본문으로 건너뛰기

CQRS + Event Sourcing 아키텍처

🏗️ CQRS 패턴 개요


⏱️ Command 처리 시퀀스


📖 Query 처리 시퀀스


📢 Event Sourcing 상세

Event Store 구조

Event Flow


🏛️ Aggregate 패턴


ONESHIM Event Sourcing 현황

도메인별 Aggregate

Handler 분포

유형개수역할
Command Handler223상태 변경 명령 처리
Query Handler201데이터 조회 처리
Event Handler175이벤트 기반 부수 효과
총계599-

구현 패턴

Command Handler 예시

class CreateUserCommandHandler:
def __init__(
self,
repository: IUserRepositoryPort,
event_bus: IEventBusPort
):
self._repository = repository
self._event_bus = event_bus

async def handle(self, command: CreateUserCommand) -> UserResponse:
# 1. Aggregate 로드 또는 생성
user = UserAggregate.create(
email=command.email,
role=command.role
)

# 2. 비즈니스 로직 실행 (이벤트 생성)
user.activate()

# 3. 이벤트 저장
await self._repository.save(user)

# 4. 이벤트 발행
for event in user.uncommitted_events:
await self._event_bus.publish(event)

return UserResponse.from_aggregate(user)

Query Handler 예시

class GetUserQueryHandler:
def __init__(
self,
read_model: IUserReadModelPort,
cache: ICachePort
):
self._read_model = read_model
self._cache = cache

async def handle(self, query: GetUserQuery) -> UserResponse:
# 1. 캐시 확인
cached = await self._cache.get(f"user:{query.user_id}")
if cached:
return cached

# 2. Read Model 조회
user = await self._read_model.get_by_id(query.user_id)

# 3. 캐시 저장
await self._cache.set(f"user:{query.user_id}", user)

return user

관련 문서: