CQRS + Event Sourcing 아키텍처
🏗️ CQRS 패턴 개요
⏱️ Command 처리 시퀀스
📖 Query 처리 시퀀스
📢 Event Sourcing 상세
Event Store 구조
Event Flow
🏛️ Aggregate 패턴
ONESHIM Event Sourcing 현황
도메인별 Aggregate
Handler 분포
| 유형 | 개수 | 역할 |
|---|---|---|
| Command Handler | 223 | 상태 변경 명령 처리 |
| Query Handler | 201 | 데이터 조회 처리 |
| Event Handler | 175 | 이벤트 기반 부수 효과 |
| 총계 | 599 | - |
구현 패턴
Command Handler 예시
class CreateUserCommandHandler:
def __init__(
self,
repository: IUserRepositoryPort,
event_bus: IEventBusPort
):
self._repository = repository
self._event_bus = event_bus
async def handle(self, command: CreateUserCommand) -> UserResponse:
# 1. Aggregate 로드 또는 생성
user = UserAggregate.create(
email=command.email,
role=command.role
)
# 2. 비즈니스 로직 실행 (이벤트 생성)
user.activate()
# 3. 이벤트 저장
await self._repository.save(user)
# 4. 이벤트 발행
for event in user.uncommitted_events:
await self._event_bus.publish(event)
return UserResponse.from_aggregate(user)
Query Handler 예시
class GetUserQueryHandler:
def __init__(
self,
read_model: IUserReadModelPort,
cache: ICachePort
):
self._read_model = read_model
self._cache = cache
async def handle(self, query: GetUserQuery) -> UserResponse:
# 1. 캐시 확인
cached = await self._cache.get(f"user:{query.user_id}")
if cached:
return cached
# 2. Read Model 조회
user = await self._read_model.get_by_id(query.user_id)
# 3. 캐시 저장
await self._cache.set(f"user:{query.user_id}", user)
return user
관련 문서: