복잡한 비즈니스 로직을 수행하고 많은 이벤트를 발행하는 서비스에서 허울좋게도 항상 트랜잭션이 성공한 후에 이벤트가 발행되진 않는다.
Spring에서 제공하는 transaction의 begin()
과 end()
의 범위와 kafka 트랜잭션의 범위를 관리해야한다.
Spring Kafka에서 트랜잭션 처리
중요한건, 트랜잭션의 관리이다. 별표 25개
프로젝트 github
https://github.com/downfa11/kafka-querydsl
GitHub - downfa11/kafka-concurrency: kafka 트랜잭션 관리와 Redisson 분산 락 처리
kafka 트랜잭션 관리와 Redisson 분산 락 처리. Contribute to downfa11/kafka-concurrency development by creating an account on GitHub.
github.com
문제가 발생하는 부분은 kafkaTemplate.send
와 Spring Transaction의 실행 시점의 차이이다.
예시 상황을 두겠다.
1. 사용자 서비스에서 사용자 등록 이후 이벤트 발행
2. 아직 transaction commit
이전이기에 db commit
은 안됐음에도 이벤트가 발행
3. 이벤트를 구독하는 서비스들(주문 등)에서 해당 userId 조회시 없어서 예외 발생
물론 @TransactionalEventListner
어노테이션을 사용해서 트랜잭션이 종료된 이후에 이벤트를 발행하면 똑같이 이벤트 발행 시점을 트랜잭션 이후로 미룰 수 있긴 하다.
하지만 이번 게시글에서는 Kafka 트랜잭션과 DB 트랜잭션을 연관시켜, DB 트랜잭션 종료 후에 이벤트를 발행하도록 관리해보겠다.
TransactionIdPrefix("tx-")
를 통해 트랜잭션 처리
Spring Transaction 이후로 이벤트를 발행하기 위해서 Kafka Transaction에 대해 설정해야한다.
일단 기존 방식대로 kafka와 transaction trace를 로깅하도록 설정하고 register()
함수를 호출해보겠다.
Producer
@Transactional
public void register(Long id, Long count, String kind) throws InterruptedException {
kafkaTemplate.send("transaction-topic", createEvent(id,count,kind));
Thread.sleep(1000);
if (id == null) {
throw new IllegalArgumentException();
}
}
Consumer
@KafkaListener(topics = "transaction-topic",groupId = "group_order")
public void create(OrderEvent event) {
log.info("received : " + event.toString());
}
로그의 출력 결과는 다음과 같이 트랜잭션이 종료되기 전에 Conumser가 이미 이벤트를 받아 처리가 되었다.
Getting transaction for [ns.example.kafka_querydsl.TransactionService.register]
Created new Producer: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@2caf282e]
Sending: ProducerRecord(topic=transaction-topic, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=null, value=OrderEvent(id=1, kind=phone, count=100), timestamp=null)
CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@2caf282e] send(ProducerRecord(topic=transaction-topic, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=null, value=OrderEvent(id=1, kind=phone, count=100), timestamp=null))
Sent: ProducerRecord(topic=transaction-topic, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [110, 115, 46, 101, 120, 97, 109, 112, 108, 101, 46, 107, 97, 102, 107, 97, 95, 113, 117, 101, 114, 121, 100, 115, 108, 46, 117, 116, 105, 108, 115, 46, 79, 114, 100, 101, 114, 69, 118, 101, 110, 116])], isReadOnly = true), key=null, value=OrderEvent(id=1, kind=phone, count=100), timestamp=null)
Sent ok: ProducerRecord(topic=transaction-topic, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [110, 115, 46, 101, 120, 97, 109, 112, 108, 101, 46, 107, 97, 102, 107, 97, 95, 113, 117, 101, 114, 121, 100, 115, 108, 46, 117, 116, 105, 108, 115, 46, 79, 114, 100, 101, 114, 69, 118, 101, 110, 116])], isReadOnly = true), key=null, value=OrderEvent(id=1, kind=phone, count=100), timestamp=null), metadata: transaction-topic-0@0
CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@2caf282e] close(PT5S)
received : OrderEvent(id=1, kind=남석, count=100)
Completing transaction for [ns.example.kafka_querydsl.TransactionService.register]
로그의 순서를 확인해보면 이벤트를 전송한 이후에 트랜잭션이 완료(Completing transaction)되는 것을 확인할 수 있다.
우리가 하고자 하는 것은 이벤트 발행과 DB 트랜잭션을 묶어서 DB 트랜잭션이 완료되고 나면 이벤트를 발행하도록 하는 것이다.
다음과 같이 TransactionIdPrefix("tx-")
를 통해 트랜잭션을 시작할 수 있다.
스프링에서 자동으로 KafkaTransactionManager
빈을 구성해서 리스너에 연결해둔다.
우리는 spring.kafka.producer.transaction-id-prefix
값을 설정하기만 하면 된다.
kafkaConfig
@Bean
public DefaultKafkaProducerFactory<String, String> producerFactory() {
DefaultKafkaProducerFactory<String, String> producerFactory = new DefaultKafkaProducerFactory<>(producerConfigs());
producerFactory.setTransactionIdPrefix("tx-");
return producerFactory;
}
자, 설정을 마치고 다시 테스트 케이스를 실행해보자.
실행 결과
Getting transaction for [ns.example.kafka_querydsl.TransactionService.register]
Created new Producer: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@654de23e]
CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@654de23e] beginTransaction()
Sending: ProducerRecord(topic=transaction-topic, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=null, value=OrderEvent(id=1, kind=남석, count=100), timestamp=null)
CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@654de23e] send(ProducerRecord(topic=transaction-topic, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=null, value=OrderEvent(id=1, kind=남석, count=100), timestamp=null))
Sent: ProducerRecord(topic=transaction-topic, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [110, 115, 46, 101, 120, 97, 109, 112, 108, 101, 46, 107, 97, 102, 107, 97, 95, 113, 117, 101, 114, 121, 100, 115, 108, 46, 117, 116, 105, 108, 115, 46, 79, 114, 100, 101, 114, 69, 118, 101, 110, 116])], isReadOnly = true), key=null, value=OrderEvent(id=1, kind=남석, count=100), timestamp=null)
Sent ok: ProducerRecord(topic=transaction-topic, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [110, 115, 46, 101, 120, 97, 109, 112, 108, 101, 46, 107, 97, 102, 107, 97, 95, 113, 117, 101, 114, 121, 100, 115, 108, 46, 117, 116, 105, 108, 115, 46, 79, 114, 100, 101, 114, 69, 118, 101, 110, 116])], isReadOnly = true), key=null, value=OrderEvent(id=1, kind=남석, count=100), timestamp=null), metadata: transaction-topic-0@10
received : OrderEvent(id=1, kind=남석, count=100)
Completing transaction for [ns.example.kafka_querydsl.TransactionService.register]
CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@654de23e] commitTransaction()
CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@654de23e] close(PT5S)
Completing transaction 이후에나 Kafka commitTransaction()
이 나오는 걸 확인할 수 있다.
즉, DB 트랜잭션이 끝나고 kafka 트랜잭션을 commit한다.
DefaultKafkaProducerFactory
에 TransactionIdPrefix
를 설정해서 kafka 트랜잭션으로 동작하게 했다.
이벤트 자체는 kafkaTemplate.send()
에서 발행되지만, Spring 트랜잭션 이후로 이벤트를 commit하게 처리한다.
kafka 트랜잭션 DB 트랜잭션과 다르게 우선 이벤트부터 발행하고 해당 이벤트에서 트랜잭션 결과를 마크한다.
이렇게 되면 이벤트 발행은 무조건 이뤄지는 거고, 구독자가 메시지를 소비하는 방식이 중요해진다.
위 결과 로그의 이상한 점이 한 가지 있다.
바로 메시지 소비 과정에서 commit 전의 데이터를 읽는 문제점이 있다.
트랜잭션 commit이 완료되면 Consumer가 받을 것을 기대했지만, 트랜잭션이 commit된 이후의 데이터를 읽는게 아니라 commit 전의 데이터를 읽는 경우가 생긴다.
spring.kafka.consumer.isolation.level=read_commited
으로 설정한다면 Kafka 트랜잭션이 commit된 이벤트만 구독하게 된다.
iolation.level
의 defualt는 read_uncommitted
이다.
아직 commit되지 않은 record까지 읽는데, 트랜잭션이 실패한 record를 포함해 모든 레코드를 읽어온다고 보면 된다.
spring.kakfa.consumer.isolation-level=read_committed
메시지 소비 로직에서 다음 트랜잭션 설정을 한 뒤 다시 테스트 해보자.
실행 결과
Getting transaction for [ns.example.kafka_querydsl.TransactionService.register]
Created new Producer: CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@3fcad23e]
CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@3fcad23e] beginTransaction()
Sending: ProducerRecord(topic=transaction-topic, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=null, value=OrderEvent(id=1, kind=남석, count=100), timestamp=null)
CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@3fcad23e] send(ProducerRecord(topic=transaction-topic, partition=null, headers=RecordHeaders(headers = [], isReadOnly = false), key=null, value=OrderEvent(id=1, kind=남석, count=100), timestamp=null))
Sent: ProducerRecord(topic=transaction-topic, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [110, 115, 46, 101, 120, 97, 109, 112, 108, 101, 46, 107, 97, 102, 107, 97, 95, 113, 117, 101, 114, 121, 100, 115, 108, 46, 117, 116, 105, 108, 115, 46, 79, 114, 100, 101, 114, 69, 118, 101, 110, 116])], isReadOnly = true), key=null, value=OrderEvent(id=1, kind=남석, count=100), timestamp=null)
Sent ok: ProducerRecord(topic=transaction-topic, partition=null, headers=RecordHeaders(headers = [RecordHeader(key = __TypeId__, value = [110, 115, 46, 101, 120, 97, 109, 112, 108, 101, 46, 107, 97, 102, 107, 97, 95, 113, 117, 101, 114, 121, 100, 115, 108, 46, 117, 116, 105, 108, 115, 46, 79, 114, 100, 101, 114, 69, 118, 101, 110, 116])], isReadOnly = true), key=null, value=OrderEvent(id=1, kind=남석, count=100), timestamp=null), metadata: transaction-topic-0@12
received : OrderEvent(id=1, kind=남석, count=100)
Completing transaction for [ns.example.kafka_querydsl.TransactionService.register]
CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@3fcad23e] commitTransaction()
CloseSafeProducer [delegate=org.apache.kafka.clients.producer.KafkaProducer@3fcad23e] close(PT5S)
드디어 모든 트랜잭션이 종료된 후에 메시지를 소비해 수신받도록 했다!!
출처 및 인용
https://www.confluent.io/blog/hands-free-kafka-replication-a-lesson-in-operational-simplicity/
https://docs.spring.io/spring-kafka/reference/kafka/transactions.html#overview
https://gunju-ko.github.io/kafka/spring-kafka/2018/03/31/Spring-KafkaTransaction.html
'kafka' 카테고리의 다른 글
Kafka가 대용량 트래픽에 뛰어난 성능을 보이는 이유 (0) | 2024.11.21 |
---|---|
동물원을 탈출한 Kafka를 잡아왔습니다 (0) | 2024.11.21 |
동물원을 탈출한 Kafka를 찾습니다 (0) | 2024.11.21 |
토이프로젝트 - Apache Kafka 성능 비교 (0) | 2024.11.21 |
우당탕탕 C++로 Apache Kafka 통신하기 (rdkafka, modern-cpp-kafka) (0) | 2024.11.21 |