project/wargame

게임종료시 전적 갱신에 대한 트랜잭션 롤백 처리 (Saga 패턴, Axon Framework)

downfa11 2025. 3. 16. 17:13

적어놓고 블로그 올릴까 말까 굉장히 고민했다.

 

어려운 내용이기도 하고.. 남들이 물어봐도 잘 설명할 수 있을까?

 

내가 아직 공부가 부족하다고 생각하는 파트라 조심스럽다.

그래도 멀쩡히 구현해냈으니 숨길 필요는 없다고 생각해서 올려버렸다. 

 

* 프로젝트의 Saga 도입과 트랜잭션 장애 상황에 대한 롤백 구현을 위한 포스트라, Axon Framework나 Saga 패턴에 대한 이론적인 내용은 생략하겠다. 

 

잦은 쓰기 작업에 대한 이벤트 소싱(Event Sourcing)

사용자의 실력 점수(Elo)와 현재 진행중인 게임 방의 Id(code)는 매 게임마다 변경된다.

비즈니스상 데이터의 정합성이 매우 중요하며 장애 발생시 롤백이 구현되어야한다.

게임 완료를 이벤트로 처리하여 이벤트 소싱 처리함

 

 

AxonFramework의 Command ↔ Event 차이점

발행한 사실(Event)와 행동을 요구(Command). 엄격히 두 개념을 구분해야한다.

 

이벤트는 여러 개의 Handler에서 한꺼번에 처리할 수 있으며 비동기로 실행해 결과를 반환하지 않는다.

Command는 단 하나의 대상만 처리하며, 결과를 즉시 반환하고 Aggregate가 받아서 Command를 처리할 유일한 책임을 진다.

 

 

 

 

보상 트랜잭션 구현에서 겪은 기존 서비스의 구조적 문제

DB update에 실패했을 경우, 이미 진행한 Elo 변동은 취소할 수 없다는 문제

즉, 실패시 실력점수를 관리하는 회원 서비스에 보상 트랜잭션을 요청해야한다.

 

(Axon Server)

  • Event Queue → Consumer → Event Store → command handler(axon framework)
  • Axon Framework Saga → Axon Server Saga

 

보상 트랜잭션을 통해 실력점수를 업데이트하고, 게임이 종료되면 gameCode를 다시 비워주는 작업이 필요하다.

여기서 gameCode는 현재 게임중인 방의 Id로, 사용자가 게임중인지 판단하는데 주로 사용하고 있다.

 

그러던중 트랜잭션을 구현하는데 있어 기존 아키텍처의 구조적 결함을 발견했다.

 

 

왜 회원 서비스에서 게임 결과에 따라 바뀌는 실력점수(Elo)나 gamecode를 관리했는가?

 

이제부터라도 Aggregate로 만들고 result 서비스에서 관리하겠다.

  • MemberAggregate : 회원 서비스에서는 서비스의 회원(member)으로서 Aggregate를 관리
  • PlayerAggregate : 게임 결과 서비스에서는 게임의 플레이어(player)로서 Aggregate를 관리

 

 

 

Saga 패턴을 이용한 분산 트랜잭션 설계 (feat. AxonFramework)

게임 완료 이벤트에 대한 비즈니스 Saga 설계

직접 만들었따ㅋ

 

  1. 게임서버로부터 게임 완료 이벤트를 수신받는다. (Kafka)
  2. 정합성을 판단한 뒤, state가 Dodge, Success인지에 따라 행동한다.
  • Success 경우
    1. 우선적으로 전적 DB에 기록한다. (ElasticSearch)
    2. 해당 전적에 속하는 사용자들에게 GameFinishedCommand 전달 (membership service)
    3. 각 사용자들의 Elo 점수를 업데이트하고, gamecode를 공백으로 만든다.
  • Dodge 경우
    1. 해당 전적에 속하는 사용자들에게 GameFinishedCommand 전달 (membership service)
    2. 각 사용자들의 gamecode를 공백으로 만든다.
  • 장애 발생시
    1. 보상 트랜잭션을 전달해서 Elo 점수를 롤백한다.
    2. gamecode는 공백으로 만들어야한다.

 

그리고 기존 방식에서는 통계 서비스에서 CQRS 패턴을 사용함에 있어 ‘이벤트 소싱도 안하는데 Kafka 써도 되는거 아니야?’ 라는 생각이 이제 었다.

 

지금은 이벤트 소싱도 도입했고 Saga도 적용해야하니 기존 CQRS 패턴 사용에도 Axon Server를 이용해서 소싱하도록 적용했다.

 

 

 

매칭 서버에서 다른 마이크로 서비스로부터 요청하는 데이터 :

MatchResponse(회원 서비스), 현재 게임중인지 여부(회원 서비스)

  • 사용자의 nickname(회원 서비스), 사용자의 실력점수, 현재 게임중인지 여부(결과 서비스)

매칭 과정에서 회원 서비스와 결과 서비스에게 데이터를 요청하고 받아오는 복잡한 과정이 있지만, 분산 트랜잭션을 처리할 필요는 없다!

 

 

게임 종료 이벤트(GameFinishedEvent) 발행

게임 서버(IO Completion Port, C++)로부터 종료된 게임의 전적 결과를 Kafka를 통해 전달하면, 전적 서비스는 해당 이벤트를 발행합니다.

 

com.ns.result.adapter.in.kafka.KafkaTaskConsumerAdapter.java

    @Override
    public void sendReceivedGameFinishedEvent(SubTask subtask) {
        GameFinishedCommand axonCommand = objectMapper.convertValue(subtask.getData(), GameFinishedCommand.class);

        Mono.fromFuture(() -> commandGateway.send(axonCommand))
                .doOnSuccess(success -> log.info("GameFinishedCommand sent successfully: " + success))
                .doOnError(throwable -> log.error("Failed to send GameFinishedCommand: " + throwable))
                .then().subscribe();
    }

 

이를 통해서 해당 이벤트를 받아서 처리해야한다.

 

 

전적 기록+통계 업데이트+Elo 업데이트 시도 완료

분산 트랜잭션으로 이어지는 것을 확인했다.

 

다만 아직 Elo 업데이트 과정에서 문제를 겪고 있다.

더미로 만든 게임종료 이벤트의 Player들이 존재하지 않아서 Elo를 반영하질 않고 있음

 

이미 생성했던 PlayerAggregate인 membershipId=1, 5를 이용해서 더미 이벤트에 주입했다.

GameFinishedSaga에서 다음과 같이 Elo 변동 이벤트를 발행한다.

 

   private Mono<Void> updateElo(GameFinishedEvent event){
        boolean isWin = event.getWinTeam().equalsIgnoreCase("blue");
        //todo. blue 문자열로 받았었나? 치맨가..

        List<MembershipEloRequest> newEloRequests = eloService.updateElo(getEloRequests(event), isWin);
        log.info("updateElo 연산한 결과로 각 팀별로 업데이트해야할 값: " + newEloRequests);
        return Flux.fromIterable(newEloRequests)
                .flatMap(request -> {
                    String membershipId = String.valueOf(request.getMembershipId());
                    Long newElo = request.getElo();
                    log.info("현재 사용자:"+membershipId+"의 새롭게 반영될 Elo는 "+newElo);
                    return findPlayerPort.findByMembershipId(membershipId)
                            .flatMap(player -> Mono.fromFuture(commandGateway.send(new UpdateEloCommand(player.getAggregateIdentifier(), membershipId, newElo))))
                            .doOnError(throwable -> log.error("Failed to update Elo for membershipId: {}", membershipId, throwable));
                }).then();
    }

 

Saga의 updateElo가 UpdateEloCommand 전달 → PlayerAggregate에서 Command Handler가 받고 apply(UpdateEloEvent 발생)

 

 

여러 문제 상황을 상정한 API 테스트

  1. 정상적인 게임종료
  2. 게임이 닷지된 상황
  3. CQRS 패턴에서 필요한 조회용 DB에 기록시 오류 발생하는 경우 (rollback)

생각이상으로 심오하다.

 

10명이 있다고 햇을때 3명째 업데이트하고 4번째 사용자가 장애가 발생해서 롤백해야하는 상황이라 하자.

 

앞의 3명만 롤백하고 나머지 사용자들은 롤백하면 안된다.

 

비동기라 순서가 정해지지 않는데 어떻게 구분해서 일부 인원만 다시 롤백하지…?

 

 

개꿀딱따구리 드디어 롤백 성공했다!!!!! 🎉🎉🎉

membershipId가 14, 15인 사용자 두명이 각각 blueTeam, redTeam으로 게임한 결과 resultA이다.

 

이벤트 소싱을 통해서 복원한 최신 값을 토대로 승패에 따른 Elo 점수 변동을 시도하는 과정이다.

 

membershipId:14는 처리된 이후로 15을 작업하던 중 오류가 발생한 상황을 만들었다.

 

rollback start: 1/2를 통해서 둘중 한명만 작업이 완료된 상태임을 확인했고, 14만 롤백해주면 resultA 전적 반영 이전의 최신 값으로 돌아가게 된다.

 

테스트에 사용한 더미 데이터가 각각 14, 15이라 코드도 다음과 같다.

 

1번 경우 (이벤트 소싱은 완료되었지만 조회용 모델 업데이트중 오류 발생시)

private Mono<Void> updatePlayerElo(MembershipEloRequest request, String aggregateIdentifier, List<MembershipEloRequest> successfullyUpdatedPlayers) {
        String membershipId = String.valueOf(request.getMembershipId());
        Long newElo = request.getElo();
        UpdateEloCommand command = new UpdateEloCommand(aggregateIdentifier, membershipId, newElo);

        return Mono.fromFuture(() -> commandGateway.send(command))
                .then(Mono.defer(() -> {
                    if (request.getMembershipId().equals(15L)) {
                        log.error("에러 주입 - membershipId={} (intentional failure)", membershipId);
                        return Mono.error(new RuntimeException("에러 주입 - membershipId=" + membershipId));
                    }

                    return playerService.updateElo(membershipId, newElo);
                }))
                .doOnSuccess(aVoid -> {
                    successfullyUpdatedPlayers.add(request);
                    log.info("Elo 업데이트 성공: membershipId={}, newElo={}", membershipId, newElo);
                })
                .then();
    }

 

 

2번 경우 (이벤트 소싱과 조회용 모델 업데이트 모두 진행되지 않아서 오류 발생시)

private Mono<Void> updatePlayerElo(MembershipEloRequest request, String aggregateIdentifier, List<MembershipEloRequest> successfullyUpdatedPlayers) {
        String membershipId = String.valueOf(request.getMembershipId());
        Long newElo = request.getElo();
        UpdateEloCommand command = new UpdateEloCommand(aggregateIdentifier, membershipId, newElo);

        if (request.getMembershipId().equals(15L)) {
            log.error("에러 주입 - membershipId={} (intentional failure)", membershipId);
            return Mono.error(new RuntimeException("에러 주입 - membershipId=" + membershipId));
        }

        return Mono.fromFuture(() -> commandGateway.send(command))
                .then(Mono.defer(() ->  playerService.updateElo(membershipId, newElo)))
                .doOnSuccess(aVoid -> {
                    successfullyUpdatedPlayers.add(request);
                    log.info("Elo 업데이트 성공: membershipId={}, newElo={}", membershipId, newElo);
                }).then();
    }

 

 

Flux.fromIterable은 비동기로 처리하기 때문에 순서를 보장하지 않는다.

 

따라서 15를 먼저 처리하는 경우 먼저 업데이트된 내용이 없어서 아무 롤백도 진행하지 않고도 이전 상태로 만들 수 있다.

하지만 정확한 롤백을 확인하기 위해서 다음과 같이 순서를 보장해줬다.

 

    @SagaEventHandler(associationProperty = "spaceId")
    public void handle(ResultQueryUpdatedEvent event, EloService eloService) {
        log.info("ResultQueryUpdatedEvent received. Updating Elo... ");

        boolean isWin = event.getWinTeam().equalsIgnoreCase("blue");
        List<MembershipEloRequest> newEloRequests = eloService.updateElo(getEloRequests(event), isWin);
        List<MembershipEloRequest> successfullyUpdatedPlayers = new ArrayList<>();

        // 14는 업데이트하고, 15에서 오류가 발생 -> 14만 롤백해서 이전 상태로 복구
        Flux.fromIterable(newEloRequests)
                .flatMap(request -> handleEloUpdateRequest(request, successfullyUpdatedPlayers))
                .then()
                .onErrorResume(throwable -> {
                    log.error("Elo 업데이트 중 오류 발생, 롤백을 진행합니다.", throwable);
                    return handleRollback(event.getSpaceId(), successfullyUpdatedPlayers, newEloRequests);
                })
                .subscribe();
    }

 

테스트 환경에서 '사용자중 일부만 실력점수를 업데이트하고, 일부는 아직 진행되지 않은 상태에서 롤백하는 상황'을 구현하기 위해 flatMap -> concapMap으로 순서를 보장했다.

 

 

최종 롤백 테스트 결과

2025-03-06T02:53:48.120+09:00  INFO 1 --- [result] [agaProcessor]-0] c.n.r.adapter.axon.GameFinishedSaga      : GameResultSavedEvent received. Updating Result-Query...
2025-03-06T02:53:48.121+09:00  INFO 1 --- [result] [agaProcessor]-0] c.n.r.adapter.axon.GameFinishedSaga      : createResultQueryEvent success: com.ns.common.CreateResultQueryEvent@4c24cbbc
2025-03-06T02:53:48.125+09:00  INFO 1 --- [result] [agaProcessor]-0] c.n.r.adapter.axon.GameFinishedSaga      : ResultQueryUpdatedEvent successfully published
2025-03-06T02:53:48.163+09:00  INFO 1 --- [result] [agaProcessor]-0] c.n.r.adapter.axon.GameFinishedSaga      : ResultQueryUpdatedEvent received. Updating Elo... 
2025-03-06T02:53:48.255+09:00  INFO 1 --- [result] [agaProcessor]-0] c.n.r.application.service.EloService     : 업데이트 이전 Elo : [MembershipEloRequest(membershipId=16, team=blue, elo=2022), MembershipEloRequest(membershipId=17, team=red, elo=1976)]
2025-03-06T02:53:48.258+09:00  INFO 1 --- [result] [agaProcessor]-0] c.n.r.application.service.EloService     : 업데이트 이후 Elo 변동량 : [MembershipEloRequest(membershipId=16, team=blue, elo=2028), MembershipEloRequest(membershipId=17, team=red, elo=1969)]
2025-03-06T02:53:48.417+09:00  WARN 1 --- [result] [mandProcessor-0] o.s.c.annotation.AnnotationTypeMapping   : Support for convention-based annotation attribute overrides is deprecated and will be removed in Spring Framework 6.2. Please annotate the following attributes in @org.axonframework.modelling.command.AggregateIdentifier with appropriate @AliasFor declarations: [routingKey]
2025-03-06T02:53:48.448+09:00  INFO 1 --- [result] [mandProcessor-0] c.n.result.adapter.axon.PlayerAggregate  : UpdateEloEvent 16's elo: 2000 -> 2008
...
2025-03-06T02:53:48.533+09:00  INFO 1 --- [result] [ault-executor-0] c.n.r.adapter.axon.GameFinishedSaga      : Elo 이벤트 소싱 성공: membershipId=16, newElo=2028
2025-03-06T02:53:48.590+09:00  INFO 1 --- [result] [or-tcp-epoll-10] c.n.r.adapter.axon.GameFinishedSaga      : Elo 업데이트 성공: membershipId=16, newElo=2028
2025-03-06T02:53:48.607+09:00 ERROR 1 --- [result] [or-tcp-epoll-10] c.n.r.adapter.axon.GameFinishedSaga      : 에러 주입 - membershipId=17 (intentional failure)
2025-03-06T02:53:48.610+09:00 ERROR 1 --- [result] [or-tcp-epoll-10] c.n.r.adapter.axon.GameFinishedSaga      : Elo 업데이트 중 오류 발생, 롤백을 진행합니다.
java.lang.RuntimeException: 에러 주입 - membershipId=17
...
2025-03-06T02:53:48.620+09:00  INFO 1 --- [result] [or-tcp-epoll-10] c.n.r.adapter.axon.GameFinishedSaga      : rollback start: 1/2
2025-03-06T02:53:48.632+09:00  INFO 1 --- [result] [or-tcp-epoll-10] c.n.r.adapter.axon.GameFinishedSaga      : 성공적으로 업데이트된 사용자 16의 Elo를 롤백합니다. 이전 증가값:2022
2025-03-06T02:53:48.665+09:00  WARN 1 --- [result] [agaProcessor]-0] c.n.r.adapter.axon.GameFinishedSaga      : RollbackUpdateEloEvent received. Rolling back Elo changes...
2025-03-06T02:53:48.679+09:00  INFO 1 --- [result] [or-tcp-epoll-10] c.n.r.adapter.axon.GameFinishedSaga      : 모든 롤백이 완료되어서 분산 트랜잭션을 종료합니다.
2025-03-06T02:53:48.696+09:00  WARN 1 --- [result] [agaProcessor]-0] c.n.r.adapter.axon.GameFinishedSaga      : RollbackGameResultEvent received. Rolling back changes...
2025-03-06T02:53:48.714+09:00  INFO 1 --- [result] [mandProcessor-1] c.n.result.adapter.axon.PlayerAggregate  : UpdateEloEvent 16's elo: 2028 -> 2022
2025-03-06T02:53:49.270+09:00  INFO 1 --- [result] [/O dispatcher 1] c.n.r.adapter.axon.GameFinishedSaga      : c7a7c6ec-3005-40bb-818f-7d91520536e1에 해당하는 전적을 삭제합니다. true

 

 

Result-query 서비스의 통계 반영

게임이 종료되고 전적이 등록되면 해당 전적을 기반으로 통계 서비스 업데이트를 진행한다.

 

예를 들어,  op.gg에서는 다음과 같이 챔피언별 통계를 제공한다.

 

op.gg

 

CQRS 패턴 적용에 대해서 다룬 이전 게시글에서도 말했지만, 천만 판의 기록이 있다고 매번 조회할때마다 해당 기록들을 모두 분석해서 각 챔프별 승률을 조회하는 쿼리를 짜는건 상상만 해도 비효율적이다.

 

 

 

CQRS 패턴을 이용한 데이터 쿼리 - 게임 전적과 통계 구현

들어가기에 앞서리그오브레전드의 전적 검색이나 통계 시스템을 생각해보자. ​각 사용자의 전적에서는 사용한 챔피언별로 판수나 승률이 함께 조회된다. 심지어 각 챔피언별 판수나 승률, 아

downfa11.tistory.com

 

자세한 구현은 이미 작성했으니, 이번 글에서는 분산 트랜잭션을 통해 전적 업데이트와 함께 진행하도록 구성했다.

 

기존 아키텍처를 확장성 있게 짜놔서 그런가.. Saga에 의한 롤백 이벤트 처리도 별다른 비즈니스 작성 없이 진행할 수 있었다.

 

새삼 재사용성이 무슨 말인지 체감하게 된다. 

 

 

com.ns.resultquery.adapter.axon.GameResultEventHandler.java

 @EventHandler
    public void handle(GameFinishedEvent event) {
        log.info("Result Event Received: " + event);

        List<ClientRequest> allClients = ResultQueryMapper.getAllTeamClientRequests(event);
        String winningTeam = event.getWinTeam();

        Flux.fromIterable(allClients)
                .flatMap(clientRequest -> {
                    Long winCount = clientRequest.getTeam().equals(winningTeam) ? 1L : 0L;

                    MembershipResultEventDto membershipResultEventDto = ResultQueryMapper.getMembershipResultEventDto(clientRequest, winCount);
                    ResultEventDto resultEventDto = ResultQueryMapper.getResultEventDto(clientRequest, winCount);

                    return insertUserStatisticsUseCase.insertResultCountIncreaseEventByUserName(membershipResultEventDto)
                            .then(insertChampStatisticsUseCase.insertResultCountIncreaseEventByChampName(resultEventDto));

                })
                .doOnError(throwable -> log.error("Error : " + throwable.getMessage()))
                .subscribe();
    }

 

 

GameFinishedEvent를 중복해서 사용하지 않고, 새로 CreateResultQueryEvent를 만들었다.

@Getter
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class CreateResultQueryEvent {
    private String winTeam;
    private String loseTeam;

    private List<ClientRequest> blueTeams;
    private List<ClientRequest> redTeams;
}

 

흠 근데 아직 게임 서비스가 제대로 완성되지 않았고, 당연히 통계에 어떤 데이터를 더 넣을지 확장성을 고민해야한다.

일단은 챔피언 승률 통계나 각 사용자의 전적 승률 통계를 제공하고 있어서 해당 정보만 넣었다.

 

 

 

 

모르면 당하는 오류라 기록하는 AxonFramework 트러블슈팅

SagaEventHandler의 associationProperty 오류

@SagaEventHandler(associationProperty = "gameId")에서 gameId 필드가 GameFinishedEvent에 없음 → spaceId임;... 

 

... 20 common frames omitted
Caused by: java.util.concurrent.ExecutionException: org.axonframework.common.AxonConfigurationException: SagaEventHandler public reactor.core.publisher.Mono<java.lang.Void> com.ns.result.adapter.axon.GameResultSaga.handleFailure(com.ns.common.events.GameFinishedEvent) defines a property gameId that is not defined on the Event it declares to handle (com.ns.common.events.GameFinishedEvent)
        at java.base/java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:396) ~[na:na]
        at java.base/java.util.concurrent.CompletableFuture.get(CompletableFuture.java:2096) ~[na:na]
        at org.axonframework.config.DefaultConfigurer.invokeLifecycleHandlers(DefaultConfigurer.java:885) ~[axon-configuration-4.8.0.jar!/:4.8.0]
        ... 24 common frames omitted
Caused by: org.axonframework.common.AxonConfigurationException: SagaEventHandler public reactor.core.publisher.Mono<java.lang.Void> com.ns.result.adapter.axon.GameResultSaga.handleFailure(com.ns.common.events.GameFinishedEvent) defines a property gameId that is not defined on the Event it declares to handle (com.ns.common.events.GameFinishedEvent)

 

 

 

CQRS 적용을 위한 AggregateQuery 조회시 오류

조회(Query)를 목적으로 하는 Aggregate 요청시 jackson의 직렬화 오류를 겪었다.

public Mono<FindPlayerQuery> queryToPlayerByMembershipId(String membershipId){
  return Mono.fromFuture(() -> queryGateway.query(
     new FindPlayerAggregateQuery(membershipId), FindPlayerQuery.class));
}
    
...

public Mono<FindPlayerQuery> queryToPlayerByMembershipId(String membershipId){
    return Mono.fromFuture(() -> queryGateway.query(
        new FindPlayerAggregateQuery(membershipId), FindPlayerQuery.class));
}

 

FindPlayerQuery 클래스의 생성자 및 필드가 직렬화 가능하도록 설정해야한다.

  • @JsonCreator, @JsonProperty 등

 

@Getter
@AllArgsConstructor
@NoArgsConstructor
public class FindPlayerAggregateQuery {
    private String membershipId;
}

 

조회를 목적으로 하는 질의(Query) 클래스에서 @NoArgsConstructor가 없어서 오류가 난거였다.

 

AxonFramework 사용할때 기본 생성자를 안넣어서 오류 생긴 적이 많아서 이젠 습관처럼 쓴다.