backend

Axon Framework를 이용해서 이벤트 소싱을 도입하자

downfa11 2024. 12. 31. 17:39

원래는 이벤트 소싱으로 도메인을 먼저 관리하고, CQRS 패턴을 이용해서 Command와 Query를 분리하면서 책임을 명확히 하는 식으로 단점을 보완하는게 맞다.

본 프로젝트는 이와 다르게, 전적 통계를 위해 CQRS 패턴을 먼저 구현해서 Axon Server 없이 Kafka만을 이용해서 게임이 끝날때의 전적 종료 이벤트를 받아 작업했다.

그러다가 member 서비스에서 사용자 데이터를 관리하는데 있어서 전적 결과에 따른 실력 점수(Elo, MMR 등)의 변동은 비즈니스적으로 정합성 관리가 중요하다고 판단했다.

따라서 인과는 좀 다르지만 뒤늦게 이벤트 소싱을 통해 사용자 정보를 관리하게 되었다.

아직 사용자 서비스에 대한 CQRS는 구현되지 않았고, 전적 쿼리 서비스와 전적 서비스간의 이벤트 처리도 새로이 들어선 Axon Server를 이용하지 않고 기존 Kafka 방식을 따르고 있다.

아, Saga 패턴을 이용한 분산 트랜잭션은 진짜 생각 없다.

안필요하면 안할거고 필요해도 가능하면 사가 안하고 싶다..

Axon을 이용한 이벤트 소싱(Event Sourcing)

이벤트 소싱 : 애플리케이션의 모든 상태 변경 이벤트를 순서에 맞게 저장해 Status를 만든다.

모든 데이터의 변경을 의미하는 작업들에 대해서 최종적으로 변경된 정보(rdb)만 바라보는 것이 아니라, 그냥 가장 최근에 수신받은 Event를 기준으로 특정 도메인의 정합성을 유지하는거

데이터의 변경 이력이 모두 기록되어 시스템 상태를 언제든지 재구성할 수 있다는 장점이 있다.

그리고 장애나 문제 발생시, 애플리케이션이 복구되면 메시지 브로커(혹은 이벤트 브로커)에 의존해 일관성(트랜잭션)을 구현할 수 있다.

치명적인 단점은.. 한번 이벤트를 잘못 처리하면, 전체 트랜잭션과 정합성이 깨져서 심각한 문제가 된다.

이렇게 되면 어떤 이벤트에서 문제가 생긴 건지도 파악하기 어려움...

 

사용자 데이터를 이벤트 소싱으로 구현하자

public Mono<Void> createMemberByEvent(UserCreateRequest request) {
  CreateMemberCommand axonCommand = new CreateMemberCommand(request.getAccount(), request.getName(), request.getEmail(), request.getPassword());

  return Mono.fromFuture(() -> commandGateway.send(axonCommand))
     .doOnSuccess(result -> create(request, result.toString()).subscribe())
     .doOnError(throwable -> {
         log.error("createMemberByEvent throwable : ", throwable);
           }).then();
}

CreatedMemberMoneyCommand는 서비스 내에서 다루는 명령(Command)이고, 이를 통해 axon에서 사용할 MemberMoneyCreatedCommand를 생성한다.

 

* CommandGateway.send()로 이벤트 큐에 전달 (이벤트 소싱)

@Aggregate
@NoArgsConstructor
public class MemberAggregate {
    @AggregateIdentifier
    private String id;

    @CommandHandler
    public MemberAggregate(CreateMemberCommand command){
        log.info("CreateMemberCommand Handler: "+id);
        apply(new CreateMemberEvent(command.getAccount() ... ));
}

위에서 발행한 MemberMoneyCreatedCommand 를 받아서 처리하는 사용자 Aggreagate

* 받은 커맨드를 통해서 CommandHandler에서는 데이터가 변경 혹은 생성되었다는 이벤트를 생성한다. (저장)

@EventSourcingHandler
public void on(MemberCreatedEvent event) {
	log.info("MemberCreatedEvent Sourcing Handler.");
	id = UUID.randomUUID().toString();
}

apply를 받는 EventSourcingHandler에서 해당 Aggregate를 새롭게 생성한다.

자 그럼, 사용자 Aggregate의 변동도 이벤트로 구현해서 만들 수 있지 않겠나??

public class CreateMemberCommand {
	@NotNull
	@TargetAggregateIdentifier
	private String aggregateIdenfier;
	private String membershipId;
	…
}

@TargetAggregateIdentifier : Axon에서 해당 command를 통해서 aggregateIdentifier를 변경시키는게 목적이라 어노테이션을 통해 주입시켜준다.

Saga 패턴으로 분산 트랜잭션 구현하기 - rollback

생각 없었는데, 이벤트 소싱과 Saga의 이벤트 핸들링의 차이점에 대해 궁금해서 찾아보다가 살짝 테스트 삼아 작성했다..

 

@Saga
@NoArgsConstructor
public class MemberModifySaga {
	private transient CommandGateway commandGateway;
	
	@StartSaga
	@SagaEventHandler(associationProperty = "modifyRequestId")
	public void handle(ModifyRequestCreatedEvent event){
		log.info("ModifyRequestCreatedEvent Start saga");
		String modifyRequestId = event.getModifyRequestId();
		
		SagaLifecycle.associateWith("modifyRequestId", modifyRequestId);
		commandGateway.send(new CheckRegisterMembershipCommand()
		.. 중략
       );
	}
}

 

transient는 객체를 직렬화할때 제외되며, 역직렬화시 기본값(null, 0, false)로 설정된다.

→ 여기선 Saga가 길게 실행될 수 있기 때문에 직렬화되는데, commandGateway는 외부 시스템과 상호작용을 위해 필요한거라 직렬화할 필요가 없다.

@StartSaga : Event 이벤트가 발행시 Saga를 시작

@SagaEventHandler : Event 이벤트에 반응해서 해당 메소드가 호출

Axon은 이벤트를 Saga에 전달하고, 해당 Saga의 handle 메소드가 실행되는거다.

* associationProperty : 부하가 많아 Saga가 여러 개 있는 경우, 서로를 구분하기 위한 구분자로 동작

SagaLifecycle.associateWith : Saga를 특정 Id와 연결해서, RequestId가 같은 이벤트들이 모두 동일한 Saga에 의해 처리되도록 유지한다.

→ 같은 RequestId의 후속 이벤트들이 같은 Saga에 의해 처리되도록 보장

이벤트 소싱과 이벤트 핸들링(Saga)의 차이점

이벤트 소싱은 Aggregate의 상태를 데이터베이스나 스냅샷이 아닌 이벤트의 연속된 기록으로 관리하는 방식이다.

이벤트 핸들링 : 이벤트가 발행되면 Axon Framework는 해당 이벤트를 이벤트 버스(Event Bus)를 통해 브로드캐스트한다.

* 이벤트에 관심있는 핸들러들이 해당 이벤트를 처리한다.

헷갈리지말자.

이벤트 소싱을 통한 데이터 처리를 잘 작성했는지 검증해보자

본 서비스에서는 크게 사용자 생성, 사용자 정보 변경과 매 게임마다 변동되는 사용자의 실력 점수(Elo) 변동을 구현했다.

  • 이벤트 소싱을 통해 구현한 /users/create-member 호출
CreateMemberCommand Handler
CreateMemberEvent Sourcing Handler
createMemberByEvent result : eb725495-cc4f-4842-801e-9dec6dc337f6
 

으헤헤 Aggregate의 생성자에서 @CommandHandler로 주입한게 잘 처리되었다.

따라서 각 과정에서 로깅을 추가해 과정을 살펴보면 다음과 같이 나온다.

사용자 생성 후 Elo 변동, 사용자 정보 변경을 순서대로 진행했을때의 로그는 다음과 같다.

 

CreateMemberCommand Handler: null
CreateMemberEvent Sourcing Handler: 0a7c4869-54de-48ff-a392-60eeeea2f3d7
createMemberByEvent result : 0a7c4869-54de-48ff-a392-60eeeea2f3d7

3 번 사용자인 string의 memberAggregateIdentifier : 0a7c4869-54de-48ff-a392-60eeeea2f3d7
CreateMemberEvent Sourcing Handler: 935b42c6-2a21-44bf-9267-6c2026c5614b
ModifyMemberEloCommand Handler: 935b42c6-2a21-44bf-9267-6c2026c5614b
ModifyMemberEloEvent Sourcing Handler: 0a7c4869-54de-48ff-a392-60eeeea2f3d7
modifyMemberEloByEvent result : 0a7c4869-54de-48ff-a392-60eeeea2f3d7

3 번 사용자인 string의 memberAggregateIdentifier : 0a7c4869-54de-48ff-a392-60eeeea2f3d7
CreateMemberEvent Sourcing Handler: d4b98c6e-aad8-4715-bcdd-4453aab98c5e
ModifyMemberEloEvent Sourcing Handler: 0a7c4869-54de-48ff-a392-60eeeea2f3d7
ModifyMemberCommand Handler: 0a7c4869-54de-48ff-a392-60eeeea2f3d7
ModifyMemberEvent Sourcing Handler: 0a7c4869-54de-48ff-a392-60eeeea2f3d7
modifyMemberByEvent result : 0a7c4869-54de-48ff-a392-60eeeea2f3d7

처음에는 엥 왜 한번 작업할때마다 이전 이벤트들이 모두 중복되어서 실행되지? 하고 삽질을 좀 했다.

그러나 생성된 이벤트는 중복되지 않았고 원래 이벤트 소싱이 Aggregate의 상태를 해당 Aggregate에 적용되었던 모든 이벤트 기록으로부터 복원하는 거였다.

따라서 순차적으로 모든 이벤트 소싱 핸들러(@EventSourcingHandler)가 순차적으로 실행되면서 해당 Id의 Aggregate의 최신 상태로 복원한다.

이런 방식으로 항상 일관된 상태를 유지하게 해주며 불변성을 보장해준다.

특정 이벤트를 재처리 혹은 롤백해서 변경할 수 있도록 하는 유연성도 함께 제공해준다.

그러나 상태 변화가 많아질 수록 이벤트 저장소의 크기 문제나, 복원하는데 드는 시간이 커지는 등의 문제가 생길 수 있다.

이벤트 소싱과 기존 방식간의 성능 테스트 비교

일반적인 방식으로 구현한 Elo 변동과 이벤트 소싱을 이용한 로직의 성능을 비교해봤다. 이젠 아주 그냥 습관적으로 돌린다

Elo 실력 점수를 1~2000 사이의 랜덤 값으로 변동하는 로직을 두 개 작성했다.

1. 데이터 영속화 방식의 Elo 변동

public Mono<User> modifyMemberElo(String membershipId, Long elo) {
        return userRepository.findById(Long.parseLong(membershipId))
                .flatMap(u -> {
                    u.setElo(elo);
                    u.setUpdatedAt(LocalDateTime.now());
                    return userRepository.save(u);
                });
}

 

2. 이벤트 소싱 방식의 Elo 변동

public Mono<Void> modifyMemberEloByEvent(String membershipId, Long elo) {
        // command를 만들어서 axon-server로 보낼거야

        return userRepository.findById(Long.parseLong(membershipId))
                .flatMap(user -> {
                    String memberAggregateIdentifier = user.getAggregateIdentifier();
                    ModifyMemberEloCommand axonCommand = new ModifyMemberEloCommand(memberAggregateIdentifier, membershipId, elo);

                    return Mono.fromFuture(() -> commandGateway.send(axonCommand))
                                    .doOnSuccess(result -> updateElo(Long.parseLong(membershipId), elo, result.toString()).subscribe())
                                    .doOnError(throwable -> {
                                        log.error("modifyMemberEloByEvent throwable : ", throwable);
                                    })
                                    .then();
                });
    }

 

두 방식의 Elo 변동 로직을 Controller에서 다음과 같이 편하게 사용하면 된다.

Random random = new Random();
Long elo = random.nextLong(2000);
userService.modifyMemberEloByEvent(membershipId, elo);

 

성능 테스트는 Apaceh JMeter로 진행했고 쓰레드 수 100, 30초동안 Loop 10번을 동일하게 2-3회 예열시킨 뒤에 측정했다.

방식
Latency
TPS
데이터 영속화
7 ms
33.6 /sec
이벤트 소싱
354 ms
26.1 /sec

????????????????????????????

어라.. 비동기로 처리해서 적어도 더 성능이 좋을거라 생각했는데 의외로 성능이 떨어진다.

Spring Webflux 환경이라 비동기임에도 동기식 로깅을 덕지덕지 붙였기에, 성능이 떨어지는건 감안해도.. 어..

그만멍청하고싶다 나도 이유알고싶어

잠을 못자다가 고민을 좀 더 해봤다. 매번 해당 aggregateIdentifier를 가진 Aggregate의 이벤트들을 복원해서 최신 상태의 데이터로 만들고 나서 해당 이벤트를 쌓는다.

‘한 명의 사용자에게 호출하는’ 테스트 방식은 이벤트 소싱에 있어서 퍼포먼스가 떨어질 수 밖에 없다는 결론이 나왔다.

해당 사용자의 Elo를 랜덤으로 바꾸는 기존 테스트 방식으로는 1000번째 호출시 999개의 이전 이벤트들을 불러와서 복원하는 과정이 들어 갈수록 지연이 늘어날 수 밖에 없다.

이를 해결하기 위해서 일정 시간이 지나거나 일정 개수를 넘으면 snapshot으로 저장해두고 복원시 가져오면서 비용을 줄일 수 있을거 같다. 추후 CQRS패턴 적용과 함께 개선해보겠음

시행착오의 해결

1. Reactor의 신비함…

    public Mono<Void> insertResultCountIncreaseEventByUserName(Long membershipId, String username, InsertResultCountDto insertResultCountDto) {
        return Mono.fromRunnable(() -> {
            log.info("page1 :" + insertResultCountDto);
        }).subscribeOn(Schedulers.boundedElastic())
                .then();
    }

 

다른 이름의 같은 로직 함수는 잘 동작함.. 왜 얘만 안되지....

fromRunnable() 블럭 안쪽이 동작하지 않는데, REST로 요청 들어올때는 subscribe() 안할텐데

아 어렵다 컴퓨터……

음 일단 REST가 아니라 Axon의 @EventHandler를 통해 주입되는 동작에서 문제가 생긴거였다.

 

@EventHandler
    public void handle(ResultRequestEvent event, ResultQueryService resultQueryService) {
        Flux.fromIterable(allClients)
                .flatMap(clientRequest -> {
                    resultQueryService.insertResultCountIncreaseEventByUserName(membershipResultEventDto);
                    return resultQueryService.insertResultCountIncreaseEventByChampName(eventDto);
                })
                .subscribe();
}

반환하는 insertResultCountIncreaseEventByChampName와 다르게 insertResultCountIncreaseEventByUserName는 결과를 반환하지 않고 있음.

병렬처리할건 아니라서 그냥 이어서 진행했다.

 

    @EventHandler
    public void handle(ResultRequestEvent event, ResultQueryService resultQueryService) {
        Flux.fromIterable(allClients)
                .flatMap(clientRequest -> {
                    Mono<Void> userMono = resultQueryService.insertResultCountIncreaseEventByUserName(membershipResultEventDto);
                    Mono<Void> champMono = resultQueryService.insertResultCountIncreaseEventByChampName(eventDto);
                    return userMono.then(champMono);
                })
                .subscribe();
    }

 

헹 된다.

 

2. CommandGateway is null 오류

2024-10-08T20:07:57.465+09:00 ERROR 1 --- [membership] [or-tcp-epoll-10] a.w.r.e.AbstractErrorWebExceptionHandler : [c2bbb2ca-12]  500 Server Error for HTTP POST "/users/increase-eda?membershipId=1&elo=100"
java.lang.NullPointerException: Cannot invoke "org.axonframework.commandhandling.gateway.CommandGateway.send(Object)" because "this.commandGateway" is null
        at com.ns.membership.service.UserService.lambda$modifyMemberEloByEvent$43(UserService.java:569) ~[!/:0.0.1]
        Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: 
Error has been observed at the following site(s):
        *__checkpoint ⇢ Handler com.ns.membership.controller.UserController#ModifyMemberEloByEvent(String, Long) [DispatcherHandler]
        *__checkpoint ⇢ HTTP POST "/users/increase-eda?membershipId=1&elo=100" [ExceptionHandlingWebHandler]

Axon Framework의 버전 호환문제였다. 4.10.1 최신 버전으로 해결했다.

3. 이벤트 저장소가 설정되지 않아서 이벤트 소싱이 안되는 에러

Application run failed
java.lang.IllegalStateException: Cannot apply reactive transaction to non-reactive return type [void] with specified transaction manager: org.springframework.r2dbc.connection.R2dbcTransactionManager@1b08d26f
        at org.springframework.transaction.interceptor.TransactionAspectSupport.lambda$invokeWithinTransaction$0(TransactionAspectSupport.java:362) ~[spring-tx-6.1.4.jar!/:6.1.4]
      
Deregistered handler for command 'com.ns.membership.axon.common.CreateMemberCommand' in context 'default'
Deregistered handler for command 'com.ns.membership.axon.common.ModifyMemberEloCommand' in context 'default'
Deregistered handler for command 'com.ns.membership.axon.common.ModifyMemberCommand' in context 'default'

Axon Server Dockerimage :  hub.docker.com

 

 

 

도커 이미지도 lastest-jdk-17 최신으로 업데이트했다.

Connecting to AxonServer node [axon-server:8124] failed: UNAVAILABLE: io exception


사실 Axon Server의 대시보드에서 StandAlone 설정을 Complete하는거 까먹어서 못알아먹은거다.

아래의 로깅에서 Successfully connection to axon-server:8124를 볼 수 있다!

물론 Axon 서버 대시보드에서도 membership 서비스 노드가 잘 참가한 것도 함께 확인했다.

 

 

Axon Framework를 이용한 이벤트 복원 (SnapShot, Event Replay)

매번 해당 aggregateIdentifier를 가진 Aggregate의 이벤트들을 복원해서 최신 상태의 데이터로 만들고 나서 해당 이벤트를 쌓는다.

‘한 명의 사용자에게 호출하는’ 테스트 방식은 이벤트 소싱에 있어서 퍼포먼스가 떨어질 수 밖에 없다는 결론이 나왔다.

해당 사용자의 Elo를 랜덤으로 바꾸는 기존 테스트 방식으로는 1000번째 호출시 999개의 이전 이벤트들을 불러와서 복원하는 과정이 들어 갈수록 지연이 늘어날 수 밖에 없다.

이를 해결하기 위해서 일정 시간이 지나거나 일정 개수를 넘으면 snapshot으로 저장해두고 복원시 가져오면서 비용을 줄일 수 있다.

Snapshot을 통한 이벤트 복원시 비용 감소

MSA의 Axon Framework에서 이벤트 소싱 패턴을 적용하고 있다.

Axon Framework는 아주 쉽게 Snapshot 기능을 이용할 수 있도록 지원한다.

snapshot 저장소에 n개의 이벤트 생성시마다 기록을 해두고, 직접 스토리지에서 매번 가져오는 부하가 생긴다. 따라서 이 과정에서 Java의 WeakReference를 이용한 캐싱으로 IO 성능을 개선시킬 수 있다.

두 가지 작업이 요구된다

  • Snapshot Configuration
  • Aggregate에 사용할 Snapshot 설정 명시

SnapShot Configuration

AxonConfig class에서 진행된다.

@Configuration
public class AxonConfig {

    @Bean
    public SnapshotTriggerDefinition snapshotTrigger(Snapshotter snapshotter) {
        return new EventCountSnapshotTriggerDefinition(snapshotter, 3);
    }

    @Bean
    public Cache snapshotCache() {
        return new WeakReferenceCache();
    }
}

EventCountSnapshotTriggerDefinition(snapshotter, Event 개수) : 몇 개의 Event 마다 Snapshot 저장소에 기록할 것인지 명시

 

@Bean
public SnapshotTriggerDefinition snapshotTrigger(Snapshotter snapshotter){
   return new EventCountSnapshotTriggerDefinition(snapshotter, 3);
}
  • Aggregate가 Event Replay를 하기 위해서는 각 Event마다 매번 Event Store에 접근하여 데이터를 읽어야 한다.
  • 하지만 이 방식은 매우 비효율적이고 속도도 느리다

아래의 설정은 Event Store의 데이터를 메모리에 로딩하여 Event Replay가 메모리 내에서 이루어 지도록 한다.

@Bean
public Cache snapshotCache(){
	return new WeakReferenceCache();
}

 

Aggregate에 Snapshot 명시

해당 Aggregate에 @Aggregate 어노테이션의 패러미터로 지정하면 끝난다.

 

@Aggregate(snapshotTriggerDefinition = "snapshotTrigger", cache = "snapshotCache")
public class MembershipAggregate{
	@AggregateIdentifier
	private String id;
	...
}