Cursus: BloomFilter를 이용한 컨슈머 벤치마크의 정확성 검사 확장
현재 컨슈머측 벤치마크 측정은 단순하게 전체 TPS와 발행된 메시지 수와 비교해서 유실된 메시지 수만 표시한다.
broker-consumer | Total messages consumed : 100000
broker-consumer | Actually processed messages : 100000
broker-consumer | Duplicate messages filtered : 0
broker-consumer | Consume elapsed time : 398.445467ms
broker-consumer | Consume Throughput : 250975.37 msg/s
broker-consumer | Processed Throughput : 250975.37 msg/s
broker-consumer | --- Partition Message Counts ---
broker-consumer | Partition [4] : 8333 messages
broker-consumer | Partition [11] : 8333 messages
broker-consumer | Partition [8] : 8333 messages
broker-consumer | Partition [1] : 8334 messages
broker-consumer | Partition [6] : 8333 messages
broker-consumer | Partition [10] : 8333 messages
broker-consumer | Partition [7] : 8333 messages
broker-consumer | Partition [5] : 8333 messages
broker-consumer | Partition [3] : 8334 messages
broker-consumer | Partition [2] : 8334 messages
broker-consumer | Partition [0] : 8334 messages
broker-consumer | Partition [9] : 8333 messages
이를 파티션 모델에 특화된 지표를 수집하는 벤치마크로 개선하는 과정에서, 성능 측정을 위한 벤치마크와 정확도 검사를 명확히 분리할 필요가 있다고 판단했다. 이제 enable_correctness, enable_benchmark 옵션으로 나눠서 메시지 소비가 얼마나 정확하게 이뤄지는지 검사한다.
이미 producer측에서 멱등성(idempotency) 처리나 브로커측에서 중복에 대한 처리를 수행하지만, 그 처리 결과를 검증하는 구간은 종단간(end-to-end) 테스트에서 메시지를 최종적으로 소비할 컨슈머가 맡아야 한다고 생각했다.
메시지의 중복 발행 문제는 동일한 partition-offset 꼴을 갖기 때문에 필터링하기 쉬운 편이다. 하지만 브로커측의 오류로 제때 발행에 대한 ACK를 주지 못한 경우는 같은 메시지를 다른 offset으로 멱등하게 재처리(retry)한다.
이러면 같은 내용의 메시지여도 partition-offset 꼴이 달라지기 때문에 중복을 판단하는게 불가능하다. 그럼 우째요?
블룸 필터(Bloom Filter)를 통한 메시지 중복 검사
블룸 필터란?
적은 메모리를 사용해서 방대한 데이터에 대해서 요소 포함 여부를 확률적으로 확인하는 근사 자료구조이다.
요소가 존재하지 않는 경우는 확실하게 No라고 판별할 수 있지만, 존재하는 경우에는 오판(False Positive)의 가능성이 있다.
즉, “존재하지 않는다”는 사실은 정확히 알 수 있지만, “존재한다”는 판단은 일정 확률로 틀릴 수 있다.
블룸 필터는 여러 개의 해시 함수를 사용해 데이터를 비트 배열(bit array)에 매핑한다.
구현을 위해서 알아둬야할 개념은 해시 함수, 비트 연산정도이다.
블룸 필터의 동작 원리:
- 메시지 키를 k개의 해시 함수로 변환 → k개의 비트 위치 지정
- 해당 위치를 1로 세팅
- 검사 시 모든 위치가 1이면 존재 가능, 하나라도 0이면 존재하지 않음
그럼 왜 블룸 필터처럼 오판 가능성이 있는 근사 자료구조를 쓰는가?
사실 정확하게 관리할려면 Hash Table(map, set) 쓰면 된다. 평균적으로 시간 복잡도 O(1)이 나오지만 최악의 경우 O(N)이다.
참고로 Set도 내부적으로 map과 동일하게 해시 테이블을 사용하되 Value값에 더미를 넣는 식으로 구현하는 편이다.
여기에 GC 부담이나 락 관리까지 신경쓰면 된다.
나도 concurrent map이나 set쪽 쓰는게 더 코드 직관적이긴 하지만 벤치마크의 락 관리로 오버헤드 생기는건 너무 본말전도다.
그에 반면, 블룸 필터는 O(k)의 비트 연산으로 아주 적은 메모리로 처리할 수 있다. (사실 해시 연산에 대해서 O(value length)만큼 더 해야하지만 넘어가자)
| 자료구조 | 메모리 사용량 | 비고 |
| Bloom filter | m (bits, -n * ln(fp) / (ln2)^2) | n=100,000, fp=0.001 → m ≈ 180 KB |
| Hash Table(map/set) | 약 24byte + key size/entry (offset + partition, 16byte) | 100,000(n) * 40~50byte → m ≈ 4~5 MB |
메모리 사용량만 놓고 봐도 약 23~28배 정도나 차이난다.
오판만 고려한다면 블룸 필터쪽이 락 관리나 GC 부담도 필요 없고, 비용적으로 수십 만 msg/s의 TPS에 적합하다고 볼 수 있다.
분산 시스템에서의 활용
특히 '존재하지 않는지 판단'하는 비용이 상대적으로 큰 분산 시스템에서 그 효용을 발휘한다.
Cassandra는 데이터가 존재하는지 확인하는 Disk IO를 최대한 줄이기 위해서 내부적으로 블룸 필터를 사용하고 있다.
이처럼 블룸 필터는 분산 시스템에서 대규모 메시지 처리, 높은 TPS 환경에서 메모리 효율과 성능을 동시에 잡는 단골 손님이다.
나 역시도 메시지 발행시 설정한 고유값 messageID를 블룸 필터에 넣는 식으로 적은 비용과 성능 유지라는 두 마리 토끼를 잡고자 했다.
cursus는 설계 과정부터 Kafka를 보고 공부하면서 진행했기 때문에, 메시지마다 고유값(producerID-seqNum-epoch) 역시 어느정도 이를 따른다.
블룸 필터 구현
간단하게 필요한 부분만 가져왔다. 블룸 필터의 해시 함수로 murmur쪽이 Go 표준에 없어서 안썼다.
package bench
import (
"encoding/binary"
"hash/fnv"
"math"
"sync/atomic"
)
type BloomFilter struct {
bits []uint64
m uint64
k uint64
}
func NewBloomFilter(expected uint64, fpRate float64) *BloomFilter {
m := uint64(-1 * float64(expected) * math.Log(fpRate) / (math.Ln2 * math.Ln2))
k := uint64(float64(m) / float64(expected) * math.Ln2)
size := (m + 63) / 64
return &BloomFilter{
bits: make([]uint64, size),
m: m,
k: k,
}
}
func hashf(data []byte) (uint64, uint64) {
h1 := fnv.New64a()
h1.Write(data)
sum1 := h1.Sum64()
var buf [8]byte
binary.BigEndian.PutUint64(buf[:], sum1)
h2 := fnv.New64()
h2.Write(buf[:])
sum2 := h2.Sum64()
return sum1, sum2
}
func (bf *BloomFilter) Add(data []byte) bool {
h1, h2 := hashf(data)
var seen = true
for i := uint64(0); i < bf.k; i++ {
idx := (h1 + i*h2) % bf.m
word := idx / 64
bit := uint64(1) << (idx % 64)
old := atomic.LoadUint64(&bf.bits[word])
if old&bit == 0 {
seen = false
atomic.OrUint64(&bf.bits[word], bit)
}
}
return seen
}
벤치마크는 메시지 소비시마다 Add 메서드를 통해서 넣고, 이미 블룸 필터에 존재하는 경우(seen)는 True를 반환한다.
이때 이미 k개의 해시 함수에 대해서 bits offset을 전부 차지하고 있다면 중복 메시지로 카운트한다.
정확도 검사의 결과 분석
오판 비율(fpRate)를 0.001로 설정해서 공식에 의해 비트수(m)와 해시 함수 개수(k)를 이상적으로 둔 결과, 다음과 같이 나왔다.
아, 참고로 메시지의 크기를 1KB로 설정한 벤치마크 테스트이다.
Total Messages : 100000
Elapsed Time : 0.35s
Overall TPS : 289589.07 msg/s
Duplicate Detected : 32 (Bloom filter, fp possible)
Message Loss : 0
Phase Total TPS : 282253.00
p95 Partition Avg TPS : 327776.59
p99 Partition Avg TPS : 327776.59
#10 total=8333 avgTPS=302138.2
#0 total=8334 avgTPS=305570.5
#9 total=8333 avgTPS=295242.9
#7 total=8333 avgTPS=297891.1
#2 total=8334 avgTPS=321543.7
#1 total=8334 avgTPS=332686.7
#5 total=8333 avgTPS=313307.9
#8 total=8333 avgTPS=310533.2
#3 total=8334 avgTPS=327776.6
#4 total=8333 avgTPS=279992.8
#11 total=8333 avgTPS=297874.3
#6 total=8333 avgTPS=304979.1
전체 발행한 100,000 메시지 중에서 최대 ~100건 수준의 오판(false positive)까지 허용 범위 이내이니, 32개의 중복이 나온 것 같지만 오차 범위 이내로 실제 중복은 아니었다.
False Negative는 없기 때문에 존재하지 않는 경우(데이터 유실)에 대해서 결과를 보장한다.