kafka

우당탕탕 C++로 Apache Kafka 통신하기 (rdkafka, modern-cpp-kafka)

downfa11 2024. 11. 21. 15:28

librdkafka 라이브러리를 통해서 Windows 환경에서 C++로 Apache Kafka를 사용하고자 했다.

백엔드로 구축한 로비 서버와 dedicated 서버간의 통신하기 위한 메세징 큐로 뛰어난 성능의 이벤트 브로커인 Kafka를 채택했었다.

IO Completion Port로 윈도우 환경에서 비동기, Non-Blocking IO 통신을 구축한 게임 서버와 통신해야할 로비 서버도 역시 블로킹 동작을 막기 위해서 Spring Webflux로 Netty 기반의 비동기 Non-Blocking IO를 구현했다.

서버간의 IPC 역시 블로킹 작업이 이뤄지면 오히려 일반적인 구조의 서버보다 느려지기 때문에 비동기로 이뤄져야한다!!

순전히 흥미본위로 시작한 기술 프로젝트에 가깝지만, 너무 어려워도 재밌다.

modern-cpp-kafka

• Need boost headers (for boost::optional) : auto.offset.reset"earliest", "latest" 또는 "none"

설치

https://docs.progress.com/bundle/openedge-kafka-guide/page/Install-the-Apache-Kafka-CC-library-on-Windows.html

참고한 자료 (공식 문서)

https://github.com/confluentinc/librdkafka/blob/master/examples/README.md

rdkafka를 통해서 kafka producer와 consumer를 간단하게 구현해서 테스트해봤음.

 

Docker-compose로 미들웨어 서비스들을 한꺼번에 키고 있다.

그중에서 Kakfa-ui를 통해서 시각화하면서 편리하게 테스트해볼 수 있었음.

 

 

아래의 트러블 슈팅 과정에서 후술하겠지만, rdkafka 라이브러리에 쓰이는 functionalwinsock2 간의 함수 충돌이 있어서 삽질을 좀 오래 했었다. 사실 해결 못할 줄 알고 중간자 미들웨어로 mailslot을 두고 통신하게 구현했었다.

 

처음에는 kafka properties를 설정한 뒤에 librdkafka 라이브러리에서 어떤 이유에서인지 winsock2와 충돌하는 현상임을 알아냈다.

 

그래서 포트 충돌을 의심했음. 얼마전에 방화벽을 잘못 건들여서 어어 하다가 다시 설정하면서 문제 생긴게 많아서.. 인바운드 규칙 설정이 덜 됐나..?

 

 

 

24-04-08 미들웨어 서버를 하나 더 둬서 통신하기로 했음

로비 서버(spring webflux) - Apache Kafka - middleware - 게임 서버(IOCP socket)

 

여기서 middleware는 winsock 소켓 구현과 rdkafka의 충돌로 인해서 따로 kafka의 메시지만을 송수신하는 역할로, 게임 서버와 MailSlot을 통해 프로세스간 통신을 한다.

일단은 임시적으로 구현한걸로 치고 rdkafka 오픈소스를 다시 읽어보겠다. 소스를 수정해서 미들웨어 없이 게임 서버와 직접적으로 Kakfa 통신이 가능하다면 제일 좋지만 그래도 mailslot의 성능적인 요소는 문제 될게 없어서 대충 쓸만하다고 본다.

 

  • kafka core 모듈화해서 프로젝트에 함께 저장
  • 게임 서버에서 mailslot으로 matchResponse를 받고, gameResult를 전송하기 (직렬화, 역직렬화)
  • 클라이언트와 게임서버간의 인가 재확인
  • 인가 과정에서 알수 없는 생성자 오류(해결)
  • 이전 버전 게임 서버의 매치메이킹 로직을 수정해서 픽창을 구현

mailSlot으로 게임 서버 프로세스로 kafka 메세지를 전달한다.

해당 스크린샷은 로비 서버에서 가져온 매칭 정보를 kafka-mailslot을 통해 게임 서버에서 받은 내용이다.

그러고 해결해서 잘 쓰고 있다!!

 

 

 

오류1 - LNK2020 함수에서 참조되는 확인할 수 없는 외부 기호

링커 오류는 진짜 미치겟다. Microsoft 공식 문서나 stackoverflow 가도 지들도 모른다고 함…

 

내용 자체는 미리정의되지 않은 변수나 함수가 사용되어서 컴파일러가 못알아먹는단건데, dll,obj에서 오류가 뜨니 부검도 못한다.

 

대충 해당 내용으로 \*.obj 파일에서 오류가 발생한다고 한다. 출력 내용을 보자.

1>producer.obj : error LNK2019: "__declspec(dllimport) class std::basic_string<char,struct std::char_traits<char>,class std::allocator<char> > __cdecl RdKafka::err2str(enum RdKafka::ErrorCode)" (__imp_?err2str@RdKafka@@YA?AV?$basic_string@DU?$char_traits@D@std@@V?$allocator@D@2@@std@@W4ErrorCode@1@@Z)"void __cdecl produce(class RdKafka::Producer *)" (?produce@@YAXPEAVProducer@RdKafka@@@Z) 함수에서 참조되는 확인할 수 없는 외부 기호
1>producer.obj : error LNK2019: "__declspec(dllimport) public: virtual __cdecl RdKafka::DeliveryReportCb::~DeliveryReportCb(void)" (__imp_??1DeliveryReportCb@RdKafka@@UEAA@XZ)"public: virtual __cdecl DeliveryReportCallback::~DeliveryReportCallback(void)" (??1DeliveryReportCallback@@UEAA@XZ) 함수에서 참조되는 확인할 수 없는 외부 기호
1>producer.obj : error LNK2019: "__declspec(dllimport) public: __cdecl RdKafka::DeliveryReportCb::DeliveryReportCb(void)" (__imp_??0DeliveryReportCb@RdKafka@@QEAA@XZ)"public: __cdecl DeliveryReportCallback::DeliveryReportCallback(void)" (??0DeliveryReportCallback@@QEAA@XZ) 함수에서 참조되는 확인할 수 없는 외부 기호
1>producer.obj : error LNK2019: "__declspec(dllimport) public: static class RdKafka::Conf * __cdecl RdKafka::Conf::create(enum RdKafka::Conf::ConfType)" (__imp_?create@Conf@RdKafka@@SAPEAV12@W4ConfType@12@@Z)main 함수에서 참조되는 확인할 수 없는 외부 기호
1>producer.obj : error LNK2019: "__declspec(dllimport) public: static class RdKafka::Producer * __cdecl RdKafka::Producer::create(class RdKafka::Conf const *,class std::basic_string<char,struct std::char_traits<char>,class std::allocator<char> > &)" (__imp_?create@Producer@RdKafka@@SAPEAV12@PEBVConf@2@AEAV?$basic_string@DU?$char_traits@D@std@@V?$allocator@D@2@@std@@@Z)main 함수에서 참조되는 확인할 수 없는 외부 기호
1>producer.obj : error LNK2019: "__declspec(dllimport) public: static int const RdKafka::Topic::PARTITION_UA" (__imp_?PARTITION_UA@Topic@RdKafka@@2HB)"void __cdecl produce(class RdKafka::Producer *)" (?produce@@YAXPEAVProducer@RdKafka@@@Z) 함수에서 참조되는 확인할 수 없는 외부 기호

 

내용 자체는 미리정의되지 않은 변수나 함수가 사용되어서 컴파일러가 못알아먹는단 소리.

그럼 어떻게 해결해야할까? 대충 몇 가지의 경우가 있다.

  1. main 함수가 없다는 소리임
  2. x64 라이브러리인데 x32로 쓰는 등 버전 호환이 섞여서 컴파일러가 인식못한다는 소리

1번의 경우는 main함수를 정의할 수 없다는 소리인데 보통 편집 단계에서 문제를 인식할 수 있는 수준이다.

2번이 좀 많을 텐데, 이 경우 해결책은 플랫폼과 구성 모두 통일시켜줘야하고, 라이브러리 또한 마찬가지이다.

mysql같은 경우는 32bit를 지원하지 않아서 오랜 시간 삽질한 경험이 있다. 이번 kafka에서는 그냥 디렉토리 설정을 잘못했던거 같다. 뭔가 대단히 잘못된 거같으면 추가 라이브러리나 종속성 디렉토리를 한번 더 확인하자.

보통 오류같은거 잘안올릴려하는데, 이번건 삽질하는데 시간이 너무 녹아서 모두 염두해두자고 올렸당

 

 

오류2 - winsock2 라이브러리와 충돌 문제

시작은 rdkafka 라이브러리를 도입해서 IPC 통신을 구축할려고 했을 때인데, 소켓 접속이 안되는 오류가 떴다.

??????? 잘되던게 kafka 라이브러리 추가하자마자????

 

로그를 띄워보니, 결국 오류가 나는 로직은 winsockbind() 함수였다.

winsock에 관한 오류는 win32에서 제공하는 GetLastError()를 통해서 확인할 수 있다.

오류코드 10022라니 인자 값이 문제란건데....  그 외에는 알 방법이 없었다.

 

편법으로 통신하고자 중간자 역할의 미들웨어를 구축해서 우회하고 넘어갔었지만, 나중에 생각치도 못한 경로를 통해서 원인을 파악해버렸다.

단일 서버의 타이머 쓰레드와 관련된 로직을 구성하면서 설계한 코드가 분명 표준 라이브러리만 사용했는데도

어디선가 많이 만나봤던 bind() : 10022 오류를 띄운 것이다.

어????????그럴리가...???????????/

그전까지 winsock과 외부 라이브러리의 충돌로 단정 짓고, 자연재해는 피해가자라는 마음가짐이었지만..

(사실 머리박아봤는데 방향성이 틀려서 결국 kafka 오픈소스만 쳐다본 셈)

해결의 실마리가 보이자 또 시간을 박아넣으면서 결국 표준 라이브러리의 functional에도 bind()라는 함수가 있음을 확인했다.

 

그러니까, winsock 라이브러리와 C++ 표준 라이브러리인 functional 헤더를 모두 사용해서 충돌한 것이다.

winsockbind()는 전역함수로 전역 네임스페이스에 속해있기 때문에 _using namespace std;_ 를 했더라도 전역 bind 함수가 우선 호출된다.

 

#include <winsock2.h>
#include <functional>

using namespace std;

int main() {
    ...

    ::bind(hServSock, (SOCKADDR*)&servAdr, sizeof(servAdr)); 
    // GetLastError(): 10022 bind error

    if (listen(hServSock, 5) == SOCKET_ERROR)
        ErrorHandling("listen error");

    return 0;
}

 

그것도 모르고 개발상의 편의를 위해 using namespace std;를 하다보니, 이름이 같은 bind() 라는 함수에 대해서 winsock와 표준 라이브러리(functional)간의 충돌이 일어났던 것이다.

winsock bind()를 호출할 때는 ::bind로 namespace의 bind()를 명시적으로 호출해주면 std::bind()와 구분되어 오류를 해결할 수 있다.

 

C++표준 라이브러리의 std::bind함수를 호출할 때는 std::bind로 써줘도 좋다.

 

#include <winsock2.h>
#include <functional>

int main() {
    ::bind(socket, ...); // winsock
    std::bind(...); // functional

    return 0;
}

 

덕분에 Polling 방식으로 통신하던 중간자 미들웨어를 삭제하면서 부하나 딜레이가 훨씬 줄었다.

 

 

 

오류3 - 메시지의 크기가 커서 데이터가 깨지는 현상 (kafka Producer의 생성 위치 오류)

아오 라이브러리 헤더 파일 뜯어보고 겨우 알았네;

rdkafka Properties.h의 내용을 보면, 여타 다른 Kafka Client들처럼 Broker의 속성(property)들을 추가할 수 있다.

 

const string& brokers = "topic.test.common";
Properties props({ {"bootstrap.servers", brokers} });

props.put("message.max.bytes", to_string(2048)); // message.max.bytes = 2048KB

 

주의해야할 점은, 이때 인자가 되는 Properties와 pram 값은 둘 다 const string& 형태이다.

기존의 Apache Kafka는 1MB 이하의 작은 데이터 단위 이벤트를 큐잉하는 목적이다.

* message.max.bytes는 default값으로 1024KB로 제공한다.

* 따라서 본인의 송수신 데이터 크기가 1MB를 초과한다면 message.max.bytes 크기를 키우거나, 메시지 압축을 진행해야한다.

특히 json으로 사용자 데이터 같은 내용을 송수신한다면 Kafka 설정을 해줘야 될 것이다.

나같은 경우는 프로젝트에서 다음과 같이 API화해서 사용했다.

*노파심에 말하는데, 크기 늘릴 생각보다 압축이나 먼저 하쇼!

 

class kafkaMessage {


public:
	const static string brokers;
	const static Topic resultTopic;
	const static Topic matchTopic;

	static Properties props;

	kafkaMessage() {
		kafkaMessage::props.put("message.max.bytes", to_string(2048));
	}
	static void KafkaSend(const Topic& topic, const string& message);
};

// kafkaMessage.cpp
Properties kafkaMessage::props({ {"bootstrap.servers", kafkaMessage::brokers} });

 

 

오류4 - record가 깨져서 전송되는 현상 (kafka Producer의 생성 위치 오류)

kafka-ui를 통해 확인한 메시지의 내용은 특수문자와 기호로 점철된 데이터여서 Consume할때 역직렬화를 실패하는 오류이다.

보통 데이터가 깨지는 현상은 유니코드 변환 과정에서 잘못되는 경우가 많아서, 먼저 UTF-8로 호환되는지에 대해서 찾아봤다.

C++의 std::string은 기본적으로 UTF-8과 호환되며, Kafka에서도 UTF-8을 호환하기에 문제가 없어야 했다.

원래는 잘 쓰던 녀석이었다니까?

 

디버깅을 위해서, Kafka 라이브러리 내의 Error.h 를 찾아서 다음과 같이 로그 코드를 작성했다.

cout << "Message (size:"<< message.size()<<")  : " << message.c_str() << endl;
// 장애 상황시 로깅
cerr << "Message failed to be delivered: "<<error.value()<<" " << error.message() << endl;

 

 

Log에서 error.value()가 "Broker: Invalid Message" 형태로 cerr 오류가 나왔다.

내가 로깅 작성한게 아니라 Kafka 라이브러리에서 제공하는 오류이다.

이 error.value()가 kafka messgae의 ErrorCode이다.

공식 위키에서 ErrorCode를 찾아보자.

https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ErrorCodes

별 의미는 없어 보인다.

결국 해결은 KafkaProducer 객체의 수명 주기와 관련있는 로직 상의 실수였다.

원래는 아래의 이전 코드처럼 KafkaProducer를 static으로 선언하여 한 번 초기화하고 유지시켰었다.

이게 한번 메시지를 보낸 이후도 찌꺼기가 남아 있어서(?) 이후의 메시지 전송에 영향을 주는 걸로 판단했다.

아래와 같이 함수 내에서 매번 새로운 객체를 생성해서 작업하고, close() 해주면서 이전의 Producer의 상태가 영향을 미치지 않도록 했다.

이전 코드

kafkaMessage::KafkaProducer producer(kafkaMessage::props);

void kafkaMessage::KafkaSend(const Topic& topic, const string& message) {
	ProducerRecord record(topic, NullKey, Value(message.c_str(), message.size()));
    ...
}

// 	producer.close()는 KafkaMessage의 소멸자에서 호출했었다.

 

 

개선한 코드

void kafkaMessage::KafkaSend(const Topic& topic, const string& message) {

	KafkaProducer producer(kafkaMessage::props);

	ProducerRecord record(topic, NullKey, Value(message.c_str(), message.size()));
    ...
	producer.close();
}

 

메모리는 아파하지만 메시지의 안정성은 구출해낼 수 있었다!