Reactor 환경에서는 Spring WebFlux와 Reactor-Kafka를 활용해 Kafka 메시지를 Flux 스트림으로 자연스럽게 처리하는 것이 익숙하다. 하지만 Proactor 환경에서는 Kafka를 어떻게 처리할까?
이번 글에서는 C++ 게임 서버에서 Kafka Consumer를 IOCP 이벤트 루프에 통합했던 경험을 공유한다.
1. OS가 어디까지 처리하는지에 따라서 구분한다.
데이터가 소켓 버퍼에 수신되면 OS가 수신 이벤트를 전달(reactor)하거나, 나아가서 OS가 직접 IO 작업까지 하고 완료 결과를 통보(proactor)하여서 비동기적으로 IO 멀티플렉싱을 진행한다.
쉽게 설명해서 IO 멀티플렉싱은 읽을 데이터가 왔을때 작업하는 Reactor와 실제 작업이 완료되면 통보받는 Proactor로 구분할 수 있다.
Reactor 방식이란?
- Reactor 패턴은 이벤트가 발생하면 핸들러에서 즉시 처리하도록 이벤트 루프가 등록되는 방식
- Spring WebFlux + Reactor-Kafka 환경에서는 Kafka 메시지가 Flux 스트림으로 들어오고, 비동기 체인에서 자연스럽게 처리
Proactor 방식이란?
- Proactor는 OS가 I/O 완료를 감지하고, 완료 이벤트를 애플리케이션에 알리는 구조
- Completion Port를 이용해 이벤트를 Worker Thread Pool로 전달하고, 애플리케이션은 이벤트 처리에만 집중
2. Proactor 기반으로 Kafka 메시지 처리하기에 앞서
- Reactor (Spring WebFlux, Reactor-Kafka): 메시지가 Reactor stream에 들어오고 Flux 체인에서 처리.
- Proactor (IO Completion Port): OS가 Socket/File Event를 IO 작업 완료를 통지.
PostQueuedCompletionStatus()로 이벤트를 주입하여 동일한 Event source처럼 취급.
C++ rdkafka에서의 consumer->poll() 동작 과정
// Fetch messages
inline std::vector<consumer::ConsumerRecord>
KafkaConsumer::poll(std::chrono::milliseconds timeout)
{
// Commit the offsets for these messages which had been polled last time (for "enable.auto.commit=true" case)
commitStoredOffsetsIfNecessary(CommitType::Async);
// Poll messages with librdkafka's API
std::vector<rd_kafka_message_t*> msgPtrArray(_maxPollRecords);
auto msgReceived = rd_kafka_consume_batch_queue(_rk_queue.get(), convertMsDurationToInt(timeout), msgPtrArray.data(), _maxPollRecords);
if (msgReceived < 0)
{
KAFKA_THROW_ERROR(Error(rd_kafka_last_error()));
}
// Wrap messages with ConsumerRecord
std::vector<consumer::ConsumerRecord> records(msgPtrArray.begin(), msgPtrArray.begin() + msgReceived);
// Store the offsets for all these polled messages (for "enable.auto.commit=true" case)
storeOffsetsIfNecessary(records);
return records;
}
poll()메서드는 내부적으로 Broker Socket I/O + 메시지 wrapping + offset 관리를 처리- Kafka 메시지는 이미 Consumer 라이브러리 안에서 처리되므로, OS 수준의 Completion Port 이벤트와 다름
내부 코드의 동작 원리를 살펴보면, Kafka Consumer로 받은 메시지를 굳이 IOCP 루프에 넣을 필요는 없다는걸 알 수 있다.
하지만 여기는 게임서버다.
물론 Kafka Consumer를 통해 이미 가져온 데이터를 다시 큐에 집어넣는 과정이 불필요하지만, 서로 다른 소스에서 들어오는 이벤트를 각기 다른 쓰레드 모델에서 처리하면 동기화/Lock 비용이 커지게 되는 문제점 역시 간과할 수 없다.
예시를 들어보겠다.
- Kafka 이벤트 → "매칭 서버가 뱉은 매칭 결과"를 소비해서 matchResults 목록에 추가
- 소켓 이벤트 → 클라이언트에서 룸 생성 요청 들어오면 matchResults에서 해당 매칭 결과를 꺼내와 룸을 생성
서로 다른 이벤트 루프에서 동작하는 경우, matchResults라는 공유 리소스에 접근하기 위해서 Lock 처리 및 경합이 일어난다.
하지만 같은 IOCP Queue에서 순차적으로 처리하면 락 없이도 안전하게 접근할 수 있어서 race condition 문제가 해소된다.
여러 이벤트의 일관성을 유지하면서도 실시간으로 처리하기 위해 IO Completion Port에서 통합하도록 설계 철학을 세운 것이다.
불필요한 큐 오버헤드는 OverlappedEx 객체 풀링(Pool)을 염두하고, 스마트 포인터(std::unique_ptr)로 메모리를 자동 관리하는 식으로 문제점들을 보완하고자 한다.
IOCP Worker 수준의 병렬 처리
그리고 Kafka의 Partition 단위 병렬 처리와 별개로 IOCP Worker 수준에서 또 병렬 처리를 가능하게 해준다.
공유 쓰레드에 대한 race condition와 메시지 순서가 보장되지 않는다는 점만 조심하자.
예시로 특별히 대단한건 아니고, Consumer 1개가 혼자 Partition 0,1,2를 모두 구독하는 상태라고 가정하자.

Kafka는 순차적으로 여러 파티션들로부터 poll()해오고, 데이터를 병렬로 읽는다.
여기서 IOCP Worker들이 읽은 데이터를 통해 실제 로직을 처리하는 과정을 또 병렬로 처리한다고 이해하면 된다.
이해를 돕자면 Spring Java에서 @KafkaListener나 KafkaConsumer를 통해서 데이터를 poll()하면 Consumer 단위로 1개 쓰레드에서 메시지를 처리한다.
KafkaListener의 컨테이너 설정 중에는 병렬 처리에 관한 concurrency 속성이 존재하며, 이 친구는 Consumer Thread 개수를 늘려도 Partition 내의 순서가 깨지지 않는다는 특징이 있다.
별로의 쓰레드풀을 이용해서 애플리케이션 수준에서 메시지를 병렬 처리하는 경우와 IOCP Worker 방식이 동일하다고 보면 된다.
뭐 더 얘기하자면 concurrency 튜닝과 동일하게, 파티션 개수와 쓰레드 개수를 맞춰줄수록 최적의 효율을 낸다.
IO Completion Port는 일반적으로 코어 개수만큼 Worker Thread를 유지하기 때문에, 파티션 개수도 그에 맞추는 방법도 생각해 볼 수 있다.
Kafka를 IOCP와 통합하는 경우 주의해야할 점
명심해야하는 경우는, 비즈니스에서 Kafka의 메시지 순서 보장이나 세부 기능(batch, pause/resume)을 주로 활용하지 않고 있기 때문에 통합할 수 있는 것이다.
메시지의 순서가 중요한 경우, 큐에 2중으로 집어넣게 되면 순서를 엄격하게 보장할 수 없어진다. 위에서 언급한 병렬 처리로 인해서 순서에 맞게 poll해와도 결국 worker가 순서를 섞을 수 있기 때문이다.
이 경우에는 뭐 싱글 쓰레드로 Kafka 전용 Worker를 두면 되는데.. 점점 의미없는 짓이 되어가니 그럴바엔 통합을 포기하는게 낫다고 본다.
3. 실제 consumer.poll()을 IO CompletionPort로 통합하는 C++ 로직
OverlappedEx 구조체의 확장 (rwMode 추가)
#define READ 3
#define WRITE 5
#define KAFKA 7
typedef struct socketf {
SOCKET hClntSock;
SOCKADDR_IN clntAdr;
} PER_HANDLE_DATA, * LPPER_HANDLE_DATA;
typedef struct OverlappedEx {
OVERLAPPED overlapped;
WSABUF wsaBuf;
int rwMode; //READ or WRITE or KAFKA
BYTE* broken_data = new BYTE[1024];
int broken_data_size = 0;
bool header_recv = false;
bool header_broken = false;
bool data_broken = false;
} PER_IO_DATA, * LPPER_IO_DATA;
Kafka Consumer thread 동작
void MatchManager::KafkaConsumerThread(HANDLE hComPor) {
MatchManager::InitProperties();
int received_count = 1;
while (1) {
auto records = MatchManager::consumer->poll(std::chrono::milliseconds(500));
for (const auto& record : records) {
if (!record.error()) {
std::cout << std::endl;
std::string message = record.value().toString();
auto* overlapped = new OverlappedEx();
overlapped->rwMode = KAFKA;
overlapped->wsaBuf.len = (ULONG)message.size();
overlapped->wsaBuf.buf = new char[message.size() + 1];
std::memcpy(overlapped->wsaBuf.buf, message.c_str(), message.size() + 1);
PostQueuedCompletionStatus(hComPor, 0, 0, (LPOVERLAPPED)overlapped);
auto now = std::chrono::system_clock::now();
std::time_t current_time = std::chrono::system_clock::to_time_t(now);
// std::cout << "received_count : " << received_count++ << " ,timestamp: " << std::ctime(¤t_time) << std::endl;
}
else std::cerr << record.toString() << std::endl;
}
}
Finalize();
}
OverlappedEx + char[]동적 할당되는 부분은 내가 메모리 관리에 서툴러서 그렇다. 메모리 관리는 개선할 여지가 충분해서 별로 단점으로 안보인다.PostQueuedCompletionStatus()로 Kafka 메시지를 IOCP Queue에 밀어 넣었는데, Kafka 이벤트를 Socket, File I/O 이벤트와 동일한 경로로 처리하고자 의도
Kakfa로 받은 메시지 IOCP 처리
while (true) {
BOOL result = GetQueuedCompletionStatus(hComPort, &bytesTrans, (PULONG_PTR)&handleInfo, (LPOVERLAPPED*)&ioInfo, INFINITE);
if (ioInfo->rwMode == KAFKA) {
std::cout << "KafkaMessageHandler 처리 시도" << std::endl;
MatchManager::KafkaMessageHandler(ioInfo);
delete[] ioInfo->wsaBuf.buf;
delete ioInfo;
continue;
}
sock = handleInfo->hClntSock;
...
}
- 실제 처리가
LPPER_IO_DATA(OverlappedEx)를 인자로 풀어서KafkaMessageHandler에서 이뤄짐 - 다음과 같이 내부적으로 마샬링한다.
std::string message(ioInfo->wsaBuf.buf, ioInfo->wsaBuf.len);
4. 정리
Kafka Consumer는 자체 poll loop를 통해 메시지를 가져오지만, 게임 서버에서는 IOCP 기반 단일 이벤트 루프에 통합하는 방식을 도입하고자 했다.
IOCP 기반으로 통합하면 각 이벤트들을 동일한 경로에서 처리하여 동기화/락 비용이 감소하고 전체 흐름이 단순해지는 것을 기대했지만, 이 방식은 큐 삽입/삭제 오버헤드, 메모리 관리, 메시지 순서 보장 등의 단점을 가진다.
이를 완화하기 위해 OverlappedEx Pooling, 스마트 포인터 기반 메모리 관리 그리고 Kafka 제어는 Consumer에서 먼저 처리하도록 보완했다.
결과적으로 게임 서버 이벤트 처리 일관성을 높이는 trade-off를 고민해보려 했지만, 현업에서는 어떻게 쓰는지도 모르고 확신이 없다.
논리적 근거가 부족하긴 해서 앞서 언급한 단점만 그득한 아키텍처일 가능성 50%. 틀린 내용이거나 별로 안좋은거 알게 되면 바로 정정하겠습니다.
'kafka' 카테고리의 다른 글
| 실시간 처리시 ReactiveKafka의 메시지 소비 지연 현상에 대해 알아보자 (0) | 2025.06.13 |
|---|---|
| 프로젝트에 Apache Kafka 도입이 오버 엔지니어링인지 판단해보자 (feat. Kafka Benchmark) (0) | 2025.03.21 |
| Kafka 원리를 이용한 메시지 브로커를 직접 구현해보자 (3) | 2024.12.21 |
| Kafka의 내부 원리를 공부하기 위한 토이프로젝트 (0) | 2024.12.21 |
| Kafka는 고가용성을 어떻게 유지하는지 알아보자 (0) | 2024.12.17 |