Hexagonal Architecture 마이그레이션
Hexagonal Architecture 마이그레이션
프로젝트 내의 모든 서비스의 마이그레이션이 어느정도 마무리되었다. (25-02-22)
가장 아래에서 일단락된 스프린트에 대한 후기를 작성했다.
대공사가 진행중입니다.
와 너무 양이 많다……….
그나마 통계 서비스는 내용이 적고 DynamoDB를 애초에 Adapter 형태로 설계해서 좀 편했다.
뭣도 모르고 사전순으로 피드(Feed) 서비스부터 시작했다가 뭔가 잘못됐다는걸 크게 느껴버렸음.
게시글, 공지사항, 이벤트, 댓글, 좋아요 기능에는 PostgreSQL와 Redis를 섞어 쓰고 있기 때문에 엔티티와 외부시스템이 가장 많다.
현재 피드, 매칭, 통계를 담당하는 각 마이크로 서비스는 마이그레이션을 완료한 상태고, 사용자 정보 관리쪽과 게임 결과 관리 서비스만 남은 상태이다.
나머지 서비스들을 모두 접어도 한 화면 안에 다 안들어온다
- Post UseCase
- registerBoardUseCase.create(membershipId,request)
- modifyPostUseCase.modify(request)
- deletePostUseCase.deleteById(boardId)
- updateLikeUseCase.updateLikes(membershipId, boardId)
- findPostUseCase.findPostById(request.getBoardId())
- findPostUseCase.findAll()
- findPostUseCase.findPostAllPagination(categoryId, sortedPageRequest)
- findPostUseCase.findPostResponseById(boardId)
- findPostUseCase.updatePostResponseById(boardId)
- Comment UseCase
- registerCommentUseCase.create(membershipId, request)
- modifyCommentUseCase.modify(request)
- findCommentUseCase.findByCommentId(id)
- deleteCommentUseCase.deleteByCommentId(commentId)
순환 문제(Cycle Context) 해결
Post에 관한 Port들을 구현한 Post...Adapter는 KafkaPort를 통해서 회원 서비스로부터 사용자 이름과 같은 정보를 받아오는 구조이다. (Outbound)
필연적으로 회원 서비스로부터 요청한 데이터를 받기 위해 KafkaConsume 과정이 필요한데, 이 과정은 Inbound라 Adapter와 Port를 구분해야했다.
PostService와 CommentService는 사용자 정보를 회원 서비스로부터 받기 위해 각각 KafkaTaskAdapter들을 가진다.
UseCase를 구현한 postService에서 게시글을 조회할때 댓글들도 함께 가져오기 때문에 comment..Port를 가지므로 위와 같은 형태가 나온건데, 사실 commentService와 postService 둘다 kafkaProducer, kafkaConsumer를 가지기 때문에 생긴 문제다.
The dependencies of some of the beans in the application context form a cycle:
┌─────┐
| kafkaTaskConsumerAdapter
↑ ↓
| postService
↑ ↓
| commentService
↑ ↓
| kafkaTaskProducerAdapter
└─────┘
Port와 Adapter를 통해 외부 세계의 종속을 피하려고 한건데 졸지에 갇혀버렸다.
Inbound 역할을 담당하는 KafkaConsumerAdapter에서는 Kafka 의존성만 관리하도록 하고, UseCase를 분리해서 순환 문제를 해결하고자 했다.
@PersistanceAdapter
@RequiredArgsConstructor
public class KafkaTaskConsumerAdapter {
private final ReactiveKafkaConsumerTemplate<String, Task> TaskRequestConsumerTemplate;
private final ReactiveKafkaConsumerTemplate<String, Task> TaskResponseConsumerTemplate;
private final FindPostUseCase findPostUseCase;
private final TaskConsumerService taskConsumerService;
...
}
ConsumerAdapter는 Kafka 메시지의 수신만 담당하고, 수신한 메시지는 TaskConsumerService로 전달해서 보관한다.
@Adapter
public class TaskConsumerService implements TaskConsumerPort {
private final ConcurrentHashMap<String, Task> taskResults = new ConcurrentHashMap<>();
private final int MAX_TASK_RESULT_SIZE = 5000;
@Override
public Task getTaskResults(String taskId) {
return taskResults.get(taskId);
}
@Override
public Mono<String> waitForGetUserNameTaskFeed(String taskId) {
return Flux.interval(Duration.ofMillis(500))
.map(tick -> getTaskResults(taskId))
...
.next()
.timeout(Duration.ofSeconds(3));
}
수신한 메시지를 토대로 Task들을 보관, 관리하는 TaskConsumerService로 인터페이스를 분리해서 순환 문제를 해결했다.
Hexagonal Architecture를 도입하는 과정에서 느낀 점
원래는 Adapter는 IO 작업만 담당해서 Port를 UseCase로 넘겨주는 역할이고, UseCase의 구현체에서 비즈니스 로직을 담당해야한다.
하지만 외부 시스템과 상호작용하는 Port의 구현체인 Adapter에서 어느 정도까지 구현하는지에 따라서 두 영역이 모호해짐을 경험했다.
Port의 구현체(Adapter)와 UseCase의 구현체(Service)간의 모호성
@UseCase
@RequiredArgsConstructor
public class FindStatisticsService implements FindStatisticsUseCase {
private final FindStatisticsPort findStatisticsPort;
@Override
public Mono<CountSumByChamp> findStatiscticsByChampion(String champName) {
return findStatisticsPort.queryToResultSumByChampName(champName);
}
@Override
public Mono<CountSumByMembership> findStatiscticsByUserName(String userName) {
return findStatisticsPort.queryToResultByUserName(userName);
}
}
비즈니스 로직이 이뤄져야할 UseCase 구현체(Service)에서 이렇게 단순히 Port를 호출해버리는 방식은 사실 별로 좋지 않다.
얼마나 복잡한 비즈니스인지에 따라서 외부 시스템이 교체될때, Adapter의 코드를 비즈니스 로직을 새로 작성하는 수준으로 적어버리게 될 수 있다.
외부 시스템의 변동에 유연한 대응을 위해서 지금까지 Hexagonal Architecture 해보겠다고 고생한건데 로직을… 새로 작성해????????
다행히 이 findStatisticsPort
는 복잡하지 않고 각 메서드당 한두줄 남짓한 코드라서 재사용에 무리가 없다.
@PersistanceAdapter
@RequiredArgsConstructor
public class FindStatisticsAdapter implements FindStatisticsPort {
private final QueryGateway queryGateway;
@Override
public Mono<CountSumByChamp> queryToResultSumByChampName(String champName) {
// 챔프의 전체 판수와 승률을 쿼리
return Mono.fromFuture(() ->
queryGateway.query(new QueryResultSumByChampName(champName), CountSumByChamp.class));
}
@Override
public Mono<CountSumByMembership> queryToResultByUserName(String champName) {
return Mono.fromFuture(() ->
queryGateway.query(new QueryResultSumByUserName(champName), CountSumByMembership.class));
}
}
그렇다고 해도 좋지 않음은 인지해서 특히 데이터베이스와 상호작용하는 서비스들에 대해서는 Persistence Adaper 구현에 신경쓰고 있다.
외부 시스템과의 소통은 Port의 구현체인 Adapter에서 담당하고, 비즈니스 로직은 UseCase의 구현체인 Service 수준에서 담당하도록 책임 분리를 항상 염두해두자.
Webflux에서 switchIfEmpty()
와 defaultIfEmpty()
의 차이점
둘 다 Mono.empty()
일때 대체값을 반환하는 역할이지만 동작 방식에 차이가 있다.
1. defaultIfEmpty()
Mono가 empty일 때 지정한 값을 반환하며, 데이터 변환 불가능 (Mono<T>
→ Mono<T>
유지)
flatMap 내부에서는 실행되지 않음
defaultIfEmpty()
는 Mono<String>
을 그대로 유지하며 새로운 연산을 수행하지 못해 추가적인 비동기 작업을 할 수 없음
2. switchIfEmpty()
Mono가 empty일 때 대체 Mono를 실행하며, 데이터 변환 가능 (Mono<T>
→ Mono<R>
가능)
flatMap 내부에서도 실행됨
switchIfEmpty()
는 비동기 연산을 추가할 수 있고, 다른 Mono를 반환할 수도 있다.
3. 결론
간단한 기본값을 반환하고 싶다면 defaultIfEmpty()
새로운 Mono를 실행하거나 추가적인 비동기 작업이 필요하다면 switchIfEmpty()
마이그레이션 후기
실제로 Result 서비스는 개발 환경에서 ElasticSearch를 이용해서 전적 검색 기능을 구현했다.
하지만 배포 환경에서 ElasticSearch의 클러스터 환경부터 구축하는데 리소스를 쏟을 생각은 없었고 AWS OpenSearch로 이전할 생각이었다.
그에 따라서 Adapter만 갈아끼우면 비즈니스 로직에 아무 지장없이 외부 의존성을 대체할 수 있는 Hexagonal Architecture가 매력적인 선택지였던 것 같다.
비즈니스 로직을 외부 시스템으로부터 격리해 지속 가능한 개발을 도모하는 클린 아키텍처의 일환으로, 직접 프로젝트에 녹여서 의존성을 쉽게 대체한 경험은 소중하게 다가왔다.
1차적으로 헥사고날 아키텍처의 틀을 구성했고 이제 Task, SubTask 캡슐화에 관련한 Mapper 분리나 생성자 모호성 구분 등으로 2차 리팩토링을 계획하고 있다.
아키텍처 수준에서 보완해야할 점
아직 Port를 구현한 Adapter에서 어느 정도 수준까지 비즈니스를 구현해야할지에 대한 모호함이 남아있다.
특히 membership 서비스의 UserEventSourcingAdapter에서는 이벤트 소싱에 기반한 사용자 데이터에 대한 생성, 변경을 담당하고 있다.
@Override
public Mono<User> createMemberByEvent(UserCreateRequest request) {
CreateMemberCommand axonCommand = new CreateMemberCommand(request.getAccount(), request.getName(), request.getEmail(), request.getPassword());
return Mono.fromFuture(() -> commandGateway.send(axonCommand))
.flatMap(result -> registerUserPort.create(request, (String) result))
.doOnError(throwable -> log.error("createMemberByEvent throwable : ", throwable));
}
CommandGateway와의 의존성을 느슨하게 하려고 만들었지만 수신받은 AggregateIdentifier를 토대로 조회용 데이터 모델에 접근하는 비즈니스까지 함께 담당하고 있다.
반면 result 서비스에서는 이벤트 소싱에 관한 Adapter에서 오직 Command의 전달만 구현하고 비즈니스를 담당하는 UseCase에서 결합해 로직을 이룬다.
@Override
public Mono<QueryPlayer> updateEloByEvent(String membershipId, Long balancedElo) {
return findPlayerPort.findByMembershipId(membershipId)
.flatMap(player -> {
UpdateEloCommand command = new UpdateEloCommand(player.getAggregateIdentifier(), membershipId, balancedElo);
return sendCommandPort.sendUpdatePlayer(command)
.then(Mono.defer(() -> queryToPlayerByMembershipId(membershipId)))
.doOnError(throwable -> log.error("Failed to start Elo Update Saga", throwable));
});
}
후자(Result 서비스)쪽이 더 외부 의존성으로부터 격리되어서 대체하기 쉽다고 판단했다.
아직 서비스 수준에서 두 서비스간의 비교를 좀 더 해보고 완전히 결정하고자 한다.