당근마켓도 100만 MAU 뜨고 나서야 MSA를 도입했다는거 보면, 사실 그정도 규모 이하는 모놀리식으로 충분하다는 소리다.
도대체 왜 이렇게 MSA에 목메이게 되었는가..

이길 수 없다면 합류해라
나도 MSA하겠다
이거말고 진짜 MSA를 결심한 이유
원래 C++로 이뤄진 게임 서버와 통신하기 위해 Kafka를 사용했었다.
매칭 결과나 전적 결과를 저장하기 위해서라도 전달 과정에서 최소 한번은 디스크에 기록되어야 했다.
1. 매치메이킹의 결과를 받아서 게임 서버에 방을 생성
2. 게임이 끝나면 그 결과를 다시 웹서버로 전달
대충 목적인데 닭 잡는데 소 칼 쓰는 느낌이다.
그래서 kafka를 좀 더 효율적으로 쓰기 위해서 고도화를 고민해봤다.
일단 Redis를 이용하던 매치메이킹 기능이 스케줄러로 동작하고 있어서, 마이크로 서비스로 분산하면 메모리 부하를 줄일 수 있다는 장점이 있다.
그리고 기존의 ElaticSearch를 이용한 전적 시스템이 아주 부실했다.
기초적인 구현만 해둔 상태였고 CQRS 패턴을 이용해서 총 전적 판수, 승률, 챔프별 승률과 판수 등 사용자의 전적을 표시하면 효율적이고 재밌겠다...........ㅎ
설계부터 어느 정도 확장성을 염두해두고 러프하게 잡았는데, 역시 뭐 추가할때 편하다.
원래 기존 서비스가 reactive 기반으로 비동기 처리하고 있는데 굳이 분해 과정에서 바꿀 이유는 없다고 판단했다.
모든 서비스들은 webflux로 이뤄진다.
또한 Redis, Kafka, R2DBC(postgreSQL), Elasticsearch를 Reactive 방식으로 사용하는건 귀중한 경험이 될거여
아키텍처 분해 과정에서의 문제 해결
모든 마이크로 서비스에 공통적으로 들어가는 클래스나 IPC 통신에 관련된 요소들을 모아서 common 모듈을 생성했다.
Implementation project(path: ':common')
dependency에서 다음과 같이 사용하도록 했는데, 게임 전적 서비스에서 오류가 생겼다.
게임 전적 서비스는 C++ 게임 서버에서 게임이 종료되면서 결과 정보를 kafka를 통해 전달하면 이를 ElasticSearch에 기록하면서 사용자들의 Elo 점수를 업데이트하는 역할을 한다.
당연히 전적에 관한 토픽을 구독하는데.. reactive elasticsearch의 접속이 거부된다고 한다.
분해하기 전까지만 해도 잘 썼었는데???????
삽질의 결과... @ComponentScan 어노테이션을 생각없이 써서 생긴 문제였다.
@SpringBootApplication
@ComponentScan(basePackages = {"com.ns.common"})
public class ResultApplication {
public static void main(String[] args) {
SpringApplication.run(ResultApplication.class, args);
}
}
어휴 내가 추가적으로 해당 경로도 스캔~ 정도로 안일하게 생각했었던거지.
위 코드에서는 com.ns.common 경로만 스캔하고 있는데, 사실 전적 서비스의 경로인 "com.ns.result"도 함께 명시해줘야 한다.
사실 디버그 모드로 로그를 싹 다 뒤지고 나름 논리적으로 생각했는데도 안돼서 포기(..)하기 직전까지 갔었다.
오류 로그가 elasticsearch와의 연결 실패로만 떠서 elasticsearch쪽만 만지작거리고 있었다.
그러다가 Swagger에 Presentation Layer들이 보이지 않는 것을 보고 나서야 ComponentScan을 떠올릴 수 있었다.
응 역시 시간 박아서 안되는건 없어
IPC(Inter Process Communication)
각 마이크로 서비스간의 통신 처리를 kafka로 진행했다. 이거 하려고 시작부터 어그로끈거 맞다
솔직히 고성능 미들웨어를 기존 방식대로 게임서버와 전적, 매치를 위해 토픽 3개만 쓰던건 너무 낭비였다. 오버 테크놀로지
그래서 여러 마이크로 서비스를 지나는 분산 로직으로 구성해 kafka 사용의 비중을 최대한 늘렸다.
resistance 서비스를 MSA로 구축할때는 프로젝트가 Spring MVC 기반이라 분산 로직을 구현할때 어려움이 있었다.
예를 들어서 친구 목록을 불러온다고 하면, 사용자 서버에서 요청을 받아서 사용자 id를 친구 서버로 전달해서 정보를 받아와야 한다.
그럼 사용자 서버는 친구 서버가 작업한 내용을 주기전까지 대기해야한다.
그래서 CountDownLatch를 써서 해당 쓰레드를 블로킹 상태로 만들고, 대기하면서 해당 작업에만 집중하도록 만들어서 진행했었다.

무작정 기다리는게 굉장히 위험한게, 친구 서버가 장애 발생시 해당 요청은 무한 루프에 빠지게 된다. 그 지연 시간동안 다른 작업을 처리하지 못하고 쓰레드가 붙잡히게 되니 관리하기 어려웠었다.
하지만 비동기로 작동하니 이 await가 편해졌다.

오예
작업이 완료되길 기다리면서도 해당 쓰레드를 블로킹시키지 않고 다른 작업을 할 수 있어서 노는 스레드가 없어진다.
다른 일하면서 놀다가도 해당 작업이 완료되었음을 시스템이 이벤트 방식으로 감지해서 필요한 처리를 한다.
sendTask("Post",task) //kafka를 통해 Task를 전송
.thenMany(waitForTaskResult(task.getTaskID()))
.flatMap(result -> {
// 받아온 결과 Task에 관한 로직 처리
})
다음과 같이 IPC를 담당하는 Task 클래스를 설계해두면 watiForTaskResult 함수에서 결과를 받아오는 동안 기다릴 수 있다.
작업의 format을 설계해 데이터를 요청하면, 해당 토픽을 구독하던 메신저들은 이를 받아서 해결한다.
참고로 @KafkaListener 어노테이션은 Spring MVC 위에서 동기적으로 수행하기에 reactive 환경에서는 사용할 수 없다.
고도화를 진행하면서 뭔가 잘못 되어가고 있음을 느껴버렸다.
Kafka Topic 설계
다른 마이크로 서비스에서 해당 서비스로 요청이 들어온 경우, 작업을 수행해서 전달하기 위해 topic.membership 토픽을 구독 하고 있다.
하지만 해당 서비스의 로직중에서 다른 서비스로부터 받은 데이터를 써야하는 경우, 다음과 같이 kafka로 메시지를 받고 있는데 여기서 받아야할 메시지가 앞의 run에서 소비되고 있는 상황.
두 가지 방법론이 생각났다.
Topic을 요청(request), 반응(response)로 나누기
- 해당 서비스의 작업에 다른 서비스로부터 요청을 받아와야하는 경우(task.*.request)
- 다른 서비스의 작업에서 해당 서비스의 작업이 필요한 경우(task.*.response)
Topic의 파티션에 대해서 Consumer-group을 나누기
해당 토픽에 여러 consumer-group이 있다면, 하나의 메시지를 서로 독립적으로 소비한다.
- consumer-group을 request, response로 나눠서 받아도 동일한 메시지에 대해서 서로 그룹별로 독립적으로 소비한다.
아무래도 topic을 세분화하는게 더 세련된 방식이라고 생각했고, 다음과 같이 로직을 나눴다.
1. ApplicationRunner를 구현한 후 run 함수를 오버라이딩해서 애플리케이션이 시작될때 실행되도록 해서 receiveAutoAck().subscribe()를 계속하는 방식
2. 로직 중간에 kafka 메시지를 전송하고, 다른 마이크로 서비스에서 작업을 처리한 뒤에 다시 메시지가 도착하는 것을 기다리는 방식
response.* 방식처럼 다른 서비스에서 온 요청 메시지를 소비하는 경우는 1번을 따르고 request.* 방식은 2번의 경우로 이뤄질 거다.
task.membership.request와 task.membership.response 토픽을 구독하는데 하나의 consumer-group을 쓰고 있었다.
→ 해당 토픽들이 파티션 1개씩만 가지고 있다면 각 컨슈머는 하나의 토픽만 구독한다.
하지만 k8s 환경에서 각 마이크로 서비스마다 여러 노드를 구성하기 때문에 복수 개의 컨슈머가 생길 예정이다. 미리 감안해둬야한다
2번 방식으로 진행했다가 생긴 컨슈머 활성화 오류

membership 서버에서 해당 사용자 Id가 작성한 게시글 목록 정보를 요청한다.
send [topic.membership]: Task(taskID=181f6611-f55f-4b03-b46c-bd4293883646,
taskName=Post Response, membershipId=1,
subTaskList=[SubTask(membershipId=1, subTaskName=PostSummary
, data=PostSummary(id=1, sortStatus=announce, nickname=nickname, title=string, likes=0, comments=0, views=0, createdAt=2024-09-03T04:22:50.458223)
, taskType=post, status=ready)])
feed 서버도 해당 요청을 kafka로 잘 수신한 모습
received : SubTask(membershipId=1, subTaskName=PostSummary
, data={ id=1, sortStatus=announce, nickname=nickname, title=string, likes=0, comments=0, views=0, createdAt=2024-09-03T04:22:50.458223},
taskType=post, status=ready)
근데 feed 서버로부터 작업의 결과를 기다리고 있는 membership에서 메시지를 읽지 못하고 있다.
컨슈머의 구독(subscribe)는 ReceiverOptions.subscription()을 통해 설정되었지만 receiveAutoAck()를 호출해도 컨슈머 활성화가 되지 않고 있는다. 흠
그래서 1번 방식으로 다시 구현했다!
지연 시간이 길어지는 오류 - 속도가 느려지는 원인이 뭘까?
ApplicationRunner를 implements해서 만든 run 함수를 오버라이딩한 1번 녀석
그리고 로직 중간에 다른 마이크로 서비스로 요청을 보내고 작업 메시지 수신을 비동기로 대기하는 2번 녀석
private Flux<Task> waitForTaskResult(String taskId) {
return TaskRequestConsumerTemplate
.receiveAutoAck()
.filter(record -> taskId.equals(record.key()))
.map(record -> record.value());
}
이 두 녀석간의 속도 차이가 너무 괴랄하게 난다.
전자는 60대 ms로 짱빠른데 반면..
2024-09-04T08:10:14.897+09:00 INFO 1 --- [membership] [onsumer-group-2]
com.ns.membership.service.UserService :
TaskRequestConsumerTemplate received : Key = a02705a0-8580-425d-8afe-53deb22beeaf, Value = Task(taskID=a02705a0-8580-425d-8afe-53deb22beeaf, taskName=Post Response,
… 중략
2024-09-04T08:10:19.918+09:00 ERROR 1 --- [membership] [ parallel-5]
com.ns.membership.service.UserService :
Error occurred: Did not observe any item or terminal signal within 5000ms in 'map' (and no fallback has been configured)
데이터 다 받아오고도 5초 지연이 생긴다. 왜???
스트림 방출의 종료 신호를 보내지 않아서 지연이 계속되는 현상인가?
- take(n) : 해당 연산자는 스트림이 n개의 요소를 방출한 후 종료한다.
- first() : 첫번째 요소를 방출하고 종료한다.
- takeUntil(other) : 다른 Publisher가 방출된 경우까지 스트림을 방출. 종료 조건 설정
- filter(predicate) : 조건(predicate)을 만족하는 요소만 방출
불러오면서 filter를 통해 taskID가 동일한지 검출하는 영역이 있는데, 이 녀석을 빼고 모든 메시지를 수신하게 해도 지연은 그대로 이뤄진다. filter()는 무죄다.
timeout을 설정해두는 방안도 생각했지만 애초에 메시지를 수신하고도 지연이 오래 걸리기에 본질적인 문제점이 있다고 생각했다.
그래서 처음에는 Consumer 옵션 중 AUTO_OFFSET_RESET_CONFIG를 earliest한게 초기 지연을 발생시킨 요인이라고 판단했다.
AUTO_OFFSET_RESET_CONFIG
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG : Kafka Consumer가 어떤 offset부터 읽을지 정하는 옵션
earliest : Consumer Group에 저장된 offset이 없다면 가장 오래된 메시지부터 읽기 시작해서 데이터 누락을 방지하면서 재처리한다
latest : 가장 최근의 offset부터 읽어서 새로운 메시지만 읽는다. (default)
작업에 성공한 로직에 대해서도 지연이 길다면, 차라리 메시지를 보존하지 않고 재시도하도록 하는게 더 사용자 경험이 유용할 것이라고 판단해서 earliest 설정을 지웠다.

그래도 3초나 걸린다
근데 애초에 메시지가 몇 만개 되는 것도 아니고 한두 개 더 불러온다고 지연이 생긴다는게 말이 안된다.
내 동물적인 감각은 소비 방식의 차이 때문에 컨슈머 초기 설정으로 지연이 생기는 거라고 말해주고 있다.
함수 호출시 매번 컨슈머 연결 문제로 지연이 발생하는 걸로 추측중이다.
그래서 다음 구조로 taskResult를 모아두고 taskId에 맞는 녀석을 읽는 방식으로 변경했다.
private final ConcurrentHashMap<String, Task> taskResults = new ConcurrentHashMap<>();
private final int MAX_TASK_RESULT_SIZE = 5000;
@Override
public void run(ApplicationArguments args){ //ApplicationRunner
this.TaskRequestConsumerTemplate
.receiveAutoAck()
.doOnNext(r -> {
Task task = r.value();
taskResults.put(task.getTaskID(),task);
if(taskResults.size() > MAX_TASK_RESULT_SIZE){
taskResults.clear();
log.info("taskResults clear.");
}
})
.doOnError(e -> log.error("Error: " + e))
.subscribe();
}
애플리케이션 시작시 run 함수를 통해서 초기화할때 쓰이는 ApplicationRunner에서 subscribe()한다.
* subscribe() : 스트림을 소비하거나 에러 처리, 완료 확인시 사용되어 특정 작업을 수행
펠리컨적 사고로 일단 속도 문제를 해결
온갖 근거없는 추측과 낭설이 오가는 가운데, 이 방식으로 지연 문제를 해결해버렸다는 점이 가장 중요하다. 짱빨라서 기분좋다.
하지만 위 코드처럼 While 루프를 통한 블로킹 동작은 비효율적이다.
sleep하는거도 거슬려 죽겠다
블로킹 함수의 처리를 최대한 피해서 비동기 방식의 성능 저하를 막고자 하는게 reactor 방식의 핵심이자 본질이라 생각한다.
그래서 어케 처리할까 고민하다가 Project Reactor의 스케줄러에 대해 찾아봤다.
Project Reactor - 스케줄러 (Scheduler)
Project Reactor에서 제공되는 Pub/Sub 모델은 실행될 쓰레드를 전략적으로 선택할 수 있는 interface를 제공한다.
- publicOn(Scheduler scheduler) : 어디에서 Publish될 것인가
- subscribeOn(Scheduler scheduler) : 어디에서 Subscribe될 것인가
스케줄러(Scheduler)는 publishOn이나 subscirbeOn에서 사용 가능한 다양한 flavor를 제공
- parallel() : 빠른 Runnable 논블로킹 수행에 최적화
- reactor.schedulers.defaultPoolSize(default=CPU core 수)
- single() : low-latency Runnable 일회성 실행에 최적화
- elastic() : 더 긴 수행에 최적화 (쓰레드 수가 무한정 증가할 수 있는 blocking 작업)
- boundedElastic() : elasitc()와 동일하지만, 쓰레드 수를 제한함
- 기본 쓰레드 수 : reactor.schedulers.defaultBoundedElasticSize(default=CPU core 수 * 10)
- 쓰레드당 기본 Pool 크기 : reactor.schedulers.defaultBoundedElasticSize(default=100000)
Diffrence between boundedElastic() vs Parallel()
Schedulers.boundedElastic()는 ScheduledExecutorService 기반의 Worker에 의해 지원되며, IO 작업의 Blocking 처리에 적합한 Elastic()와 완전 동일하다. (쓰레드 총 개수의 제한 여부)
그에 반해 Schedulers.parallel()은 CPU 바운드 작업을 위한 스케줄러로, 비동기적이고 논블로킹 작업에 적합
공식 문서에서도 boundedElastic()이 블로킹 작업에 효율적이고, parallet()은 논-블로킹 작업 처리에 뛰어난 성능을 보이는 스케줄러(Scheduler)라고 소개한다.
스케줄러 사용에 대한 이해를 돕기 위해 코드를 작성해봤다.
Mono<Void> task = Mono.fromCallable(() -> {
Thread.sleep(2000); // 2000ms = 2sec
System.out.println("야용");
return null;
}).subscribeOn(Schedulers.boundedElastic());
task.subscribe(result -> {
System.out.println("작업 종료");});
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
fromCallable()을 통해 블로킹 작업을 수행시, 작업이 현재 쓰레드를 차단하지 않고 별도의 스레드에서 실행되도록 한다.
subscribe를 통해 작업을 실행하고 결과를 처리한다.
그리고 블로킹 작업이 완료되길 기다리기 위해 3초 더 대기한다.
개선과 성능 테스트
일단 boundedElastic 방식의 스케줄러를 통해 블로킹 과정을 최대한 효율적으로 사용하도록 구성해서 IPC를 구현했다.
return Mono.fromCallable(() -> {
while (true) {
Task resultTask = taskResults.get(taskId);
if (resultTask != null) {
List<PostSummary> postSummaries = convertToPostSummaries(resultTask);
return postSummaries;
}
Thread.sleep(50);
}
})
.subscribeOn(Schedulers.boundedElastic())
.timeout(Duration.ofSeconds(3));
하나의 작업에서 최대 3초를 넘지 않도록 제한해서 나중에 한번에 많은 트래픽이 몰려도 가용성에 지장을 주지 않도록 의도했다.
JMeter로 성능 테스트를 진행해봤다.
Number of Threads : 200, Ramp-up period : 1, Loop Count : 5
평균 Latency 992 ms, TPS 164.2sec가 나온다.
못봐줄 정도의 성능은 아닌데, 그래도 여러 마이크로 서비스를 로컬에서 docker-compose로 돌리니까 컴퓨터 메모리 부하가 심하다.
원하는 성능이 안나온 이유
그리고 개발 단계에서 Slf4j의 블로킹 방식의 로깅을 매 함수 호출마다 진행하면서 성능 저하가 심한 상태임을 감안해야한다.
수정 - 펠리컨적 사고의 후일담.
사실 Reactive Stream에 대한 이해 부족 문제였다.
개선끝에 다음과 같은 로직으로 이루어 진다.
Feed 서비스에서 해당 사용자 이름으로 검색할때, 토큰에 기록된 사용자의 membershipId를 통해 membership 서비스와 통신하는 로직.
public Mono<String> getUserNameByPost(Long membershipId) {
List<SubTask> subTasks = createSubTaskListPostUserNameByMembershipId(membershipId);
Task task = createTaskGetUserName(membershipId, subTasks);
return sendTask("task.membership.response",task)
.then(waitForGetUserNameTaskFeed(task.getTaskID())
.subscribeOn(Schedulers.boundedElastic()));
}
1. membershipId에 해당하는 사용자의 이름을 요청하는 SubTask들을 담아서 전달한다.
2. Kafka를 통해 membership의 응답을 처리하는 토픽으로 전송한다.
3. 그 다음, taksId에 해당하는 Task가 반환되어 돌아올때까지 비동기로 await한다.
단, subscribeOn(Schedulers.boundedElastic())으로 IO 작업 처리에 특화된 boundedElastic 쓰레드풀을 사용해서 비동기적으로 처리한다.
여러 시도를 하면서 가장 많이 겪은 실패 상황은 waitFor... 함수 동작 중에 있는 timeout을 초과해서 아래의 오류를 내는거였다.
Kafka를 통해 membership 서비스로부터 요청한 Task의 결과를 제대로 전송했음에도 말이다!
java.util.concurrent.TimeoutException: Did not observe any item or terminal signal within 3000ms in 'next' (and no fallback has been configured)
at reactor.core.publisher.FluxTimeout$TimeoutMainSubscriber.handleTimeout(FluxTimeout.java:296) ~[reactor-core-3.6.3.jar!/:3.6.3]
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
계획대로라면 비동기적으로 await하면서 Kafka를 통해 들어온 메시지의 taskId를 확인해서 비즈니스를 이어서 작업해야한다.
consume 로직에서부터 로깅해봐도 받지 못하는 현상에 답답했지만..
앞서 이야기했듯이 Reactive Stream에 대한 이해부족 문제였다.
public static Mono<String> waitForGetUserNameTaskFeed(String taskId) {
return Flux.interval(Duration.ofMillis(500))
.map(tick -> taskResults.get(taskId))
.filter(Objects::nonNull)
.take(1)
.map(task -> {
SubTask subTask = task.getSubTaskList().get(0);
return String.valueOf(subTask.getData());
})
.next()
.timeout(Duration.ofSeconds(3))
.switchIfEmpty(Mono.error(new RuntimeException("Timeout waitForGetUserNameTaskFeed for taskId " + taskId)));
}
Reactor는 기본적으로 현재 실행중인 쓰레드에서 작업을 수행한다.
즉, subscribeOn이나 publishOn을 명시하지 않으면 현재 스레드에서 모든 연산이 수행된다는 말이다.
이벤트 루프에서 저러고 있으니까 똑같이 이벤트 루프의 쓰레드를 사용하는 ReactvieKafka Consumer와 충돌해버려서 지연되던 것이다.
Reactor의 이벤트 루프(주 쓰레드)는 동시에 하나의 작업만 처리할 수 있다.
이를 해결하기 위해서 boundedElastic 쓰레드 풀을 이용해서 Kafka Consumer 쓰레드를 점유하지 않도록 하여 메시지를 정상적으로 소비할 수 있었다.
출처 및 인용.
https://stackoverflow.com/questions/61304762/difference-between-boundedelastic-vs-parallel-scheduler
https://projectreactor.io/docs/core/release/api/reactor/core/scheduler/Schedulers.html
'project > wargame' 카테고리의 다른 글
포탑의 공격 우선순위와 병종 시스템 개선 (feat. Kafka 메시지 유실) (0) | 2025.01.17 |
---|---|
매칭 서버의 대규모 트래픽 시험 검증하기 (feat. Kafka 성능 튜닝) (2) | 2025.01.04 |
사용자 경험 개선을 위한 시도 (Banner 구현, Addressable 패치) (0) | 2025.01.04 |
게임서버 개선을 위한 시행착오들 (feat. 게임서버 타이머, 매치메이킹 정책) (1) | 2025.01.04 |
CQRS 패턴을 이용한 데이터 쿼리 - 게임 전적과 통계 구현 (0) | 2025.01.03 |