일단 큐잉을 통한 메시지 브로커부터 만들어보고.. 목표는 분산 처리하는 이벤트 브로커 구현
Cpp 뿐만 아니라 Java, Python 클라이언트까지 개발할 생각이다.
Broker를 중앙에서 관리해야하는가? 아니면 Kafka처럼 브로커를 독립 실행형으로 구현해야하는가?
젠장 중앙에서 메타데이터를 관리할 Zookeeper나 KRaft 역할을 할 녀석도 만들어야한다..
→ 일단 중앙 서버에서 관리하도록 하겠음
이벤트 브로커의 Server
서버 실행시 명령어를 통해 원하는 개수의 브로커를 생성할 수 있다.
CREATE_BROKER <IP> <Port>
: 자체적으로 brokerId를 발급하고 Metadata를 인자로 주입한다.
내부 구성은 크게 3가지 영역으로 구분했다.
1. metadata : broker나 cluster의 메타데이터를 관리
2. broker : cluster의 메시지 처리와 브로커의 동작을 수행
3. server : 클라이언트와 통신할 목적의 소켓 서버
현재 구현한 내용
1. Broker 관리(Topic 관리, Broker 추가 등)
2. Topic 및 Partition 관리
2. 메시지 처리(해당 Topic의 특정 Partition에서 수행)
3. 각 파티션의 리더 브로커와 팔로워간 Replication
4. Leader Broker 선출(Round-Robin 방식)
5. Cluster 내의 Broker, Topic 상태 모니터링
위 내용에서 클라이언트로부터 받은 명령을 처리
서버는 cluster 상태를 주기적으로 모니터링해서 클라이언트에게 전달해야함
클라이언트에서 Producer와 Consumer를 만들면 기능은 어느 정도 되는 것 같다.
disk에 저장하거나 압축하는건 안하고 싶다
파티션의 선별 과정
리더 파티션은 기본적으로 현재 broker로 설정했고, 리더를 선별하는 과정은 metadata에서 Round-robin 방식으로 선정하도록 하고 있다.
Consumer-Group을 구현하기 위해서 메시지에 포함된 Key 해시값을 기반으로 파티션에 메시지를 고정하는 기법도 구상해야한다.
Properties.h 구현
Kafka Client인 rdkafka를 Modern C++로 이전하는 오픈소스에 contribute 시도한 적이 있다.
실제 kafka처럼 설정을 범용성 있게 가져올 수 있다.
int numPartitions = properties.getIntProperty("num.partitions", 1);
int replicationFactor = properties.getIntProperty("replication.factor", defaultReplicationFactor);
int minInSyncReplicas = properties.getIntProperty("min.insync.replicas", 1);
metadata.addTopic(topicName);
테스트를 위한 함수를 작성해서 로직 확인
아직은 겉으로 볼때 그럴듯하다.
향후 계획 - 비동기 도입을 통한 이벤트 기반 모델
지금의 멀티쓰레드 방식의 동기식 winSock 모델에서 좀 다른 시도를 해볼까 한다.
이벤트 기반 모델(Event driven)을 채택해서 IOCP 모델과 함께 사용하면 재밋을 거 같아서 준비중이다.
Overedlapped IO 같은 비동기 이벤트 모델을 이용해서 각각의 브로커들은 Job을 등록하고 완료시 APC던 IOCP던 이용해서 작업의 결과를 콜백받는 구조
아마 다음 게시글은 이거 구현하는 과정이 될거다
트러블 슈팅
Cpp 이해 부족..
E0434 "TopicMetadata &" 형식(const 한정 형식 아님)의 참조를 "std::vector<PartitionInfo, std::allocator>" 형식의 값으로 초기화할 수 없습니다.
기존 방식
TopicMetadata Metadata::getTopicMetadata(const std::string& topicName) const {
std::lock_guard<std::mutex> lock(mutex);
if (topics.find(topicName) == topics.end()) {
throw std::runtime_error("Topic not found: " + topicName);
}
TopicMetadata metadata;
metadata.topicName = topicName;
metadata.partitions = topics.at(topicName);
return metadata;
}
일단 const 한정자가 있는데 TopicMetadata를 수정하려고 했다. 시그니처에서 const는 뺐다.
씁. TopicMetadata 객체를 복사해서 반환하는데, 실제 topics에 반영되지 않았다.
객체를 복사해서 반환할지 원본 데이터의 참조를 반환할지 고민이다
runtime_error - topicMetadata {topicName=<NULL> partitions={ size=2 } } TopicMetadata &..
broker의 로직 중에서 produceMessage(topicName, partitionId, message)
에서 생긴 오류
void Broker::produceMessage(std::string& topicName, int partitionId, const std::string& message) {
auto& topicMetadata = metadata.getTopicMetadata(topicName);
for (auto& partition : topicMetadata.partitions) {
if (partition.partitionId == partitionId) {
// std::lock_guard<std::mutex> lock(mutex);
partition.messages.push(message);
return;
}
}
throw std::runtime_error("Partition not found: " + std::to_string(partitionId));
}
분명 getTopicMetadata(topicName)
에서는 topicName이 분명하게 나오지만 반환값에서는 , 혹은 <문자열에 잘못된 문자가 있습니다.>으로 나왔다.
처음 metadata.getTopicMetadata(topicName)
이 반환하는 topicMetadata는 임시 객체로, 반환된 객체가 복사되거나 임시 객체로 생성된 것이다.
로컬 변수를 반환했기에 auto& topicMetadata
로 참조 선언하면 잠깐만 유효하고, 이후에 문제가 생길 수 있다.
TopicMetadata& Metadata::getTopicMetadata(std::string& topicName) {
std::lock_guard<std::mutex> lock(mutex);
auto it = topics.find(topicName);
if (it == topics.end()) {
throw std::runtime_error("Topic not found: " + topicName);
}
return it->second;
}
그래서 메모리에서 사라지지 않도록 참조를 반환했다.
이제 반환된 객체가 복사되지 않고, 실제 데이터에 대한 참조를 사용할 수 있다.
'kafka' 카테고리의 다른 글
Kafka의 내부 원리를 공부하기 위한 토이프로젝트 (0) | 2024.12.21 |
---|---|
Kafka는 고가용성을 어떻게 유지하는지 알아보자 (0) | 2024.12.17 |
Kafka가 대용량 트래픽에 뛰어난 성능을 보이는 이유 (0) | 2024.11.21 |
동물원을 탈출한 Kafka를 잡아왔습니다 (0) | 2024.11.21 |
동물원을 탈출한 Kafka를 찾습니다 (0) | 2024.11.21 |