데이터에 기반한 사용자 행위 분석 - 어떤 데이터를 수집해야할까?
공격적인지 등의 플레이 스타일이나 성향 분석도 재밌을 거 같고, 인게임 데이터를 수집해서 사용자의 행위를 분석하면서 AFK나 탈주같은 트롤 행위를 검출하면 좋을 것같다.
추후 수집된 데이터들을 기반으로 다시 고민해보겠다. 일단 수집이 먼저다.
데이터 파이프라인(Data Pipeline) 구축
e커머스 서비스들로 예시를 들자면, 구매나 배송같은 트랜잭션 작업 이외에 장바구니에 담거나 다시 꺼내고 쿠폰을 적용하는 모든 사용자 액션을 로그 데이터로 수집하고자 한다.
이런 사용자 행위를 수집하고 분석하는 데이터 파이프라인을 구축하는 것을 목표로 한다.
기존에 이미 사용하고 있던 Kafka를 통해서 데이터를 스트리밍할 생각이다.
Case 1. Kafka Consumer가 이벤트를 받아서 구축한 ELK에서 분석
Case 2. Logstash가 브로커에 담긴 데이터를 복사하도록 담당(Kafka → ElasticSearch)
- Kakfa의 game-events 토픽을 구독, ElasticSearch의 game-logs 인덱스에 저장
logstash.conf 예시
input {
kafka {
bootstrap_servers => "localhost:9092"
topics => ["game-events"]
group_id => "logstash-group"
codec => "json"
}
}
output {
elasticsearch {
hosts => ["http://localhost:9200"]
index => "game-logs"
}
}
수집 과정에서 정제가 필요한 데이터의 경우는 Kafka Streams를 활용하겠다.
웹 백엔드 수준에서 수집하는 로그 데이터
웹 백엔드에 해당하는 회원 서비스, 피드 서비스, 매칭 서비스, 전적 서비스, 전적 통계 서비스에 대한 로그 데이터를 수집
Spring AOP를 이용해서 각 엔드포인트별 요청을 로깅
LoggingProducer는 사실 중요하지 않고, loggingConfig에서 로깅을 위한 Producer를 빈으로 등록하는 것만 염두해두자
공통 모듈(common) 수준에서 각 엔드포인트에 대한 요청들을 로깅한다. (Spring AOP)
@Aspect
@Component
@RequiredArgsConstructor
public class LoggingAspect {
private final LoggingProducer loggingProducer;
@Before("execution(* com.ns.*.adapter.in.web.*.*(..))")
public void beforeMethodExecution(@NotNull JoinPoint joinPoint){
String methodName = joinPoint.getSignature().getName();
loggingProducer.sendMessage("logging","Before executing method: "+methodName)
.subscribe();
}
}
게임 서버 수준에서 수집하는 로그 데이터
게임 내 의사결정, 전투 상황, 사용자의 플레이 스타일, 커뮤니케이션 패턴 등
플레이어의 성향 분석이나 트롤링, 피딩 등의 비정상 행위 탐지, 현재 메타 분석 및 전략 추천
- 각 티어나 점수대별로 분류해야함
현재 수집하고 있는 로그 데이터 종류
- 챔피언 선택 및 밴(champion_selection): 어떤 챔피언을 선택/밴했는지, 날짜(기간별)로 구성
- 킬/데스/어시스트(KDA, combat_kda): 킬, 데스, 어시스트 발생 시간 및 대상
- 아이템 구매/판매(item_transaction): 어떤 아이템을 어떤 타이밍에 구매/판매했는지
- 위치 기반 이동 경로(player_movement): 이건 도대체… 10초마다 한번씩 수집?
- 타워 및 오브젝트 처치 참여 여부(objective_participation) : 타워 및 오브젝트에 대한 킬로그 (시간대, 챔피언)
- 딜량 및 받은 피해량(damage_statistics): 플레이어가 가한 총 피해량, 받은 피해량
- 5분 단위로 진행된 시간별 피해량과 전체 피해량
- 적 챔피언에게 가한 피해량, 구조물에 가한 피해량, 전체 피해량(유닛 피해량을 포함)
- 골드 획득량 및 소비 패턴(gold_transactions): 게임 내 경제 흐름 분석
- CS(미니언 킬 수) 패턴(cs_statistics): 특정 시간대별 CS량
- 채팅 메시지 로그(chat_message): 나중에 욕설 감지같은 기능도 생각중
5분 단위로 수집해야하는 로그 종류
gold_transactions, cs_statistics, damage_statistics
각 사용자들의 이동 경로(player_movement)는 10초마다 수집하려고 생각하고 있다.
Rdkafka(C++) - Kafka Consumer를 IOCP 기반으로 전환
기존 방식의 IOCP 서버에서는 kafka 메시지 수신(consumer→poll())을 위해서 스레드를 할당해서 처리했다.
이러면 메시지를 즉시 처리하면서 Parsing, Validation, Execution 과정이 수행되는 동안 다른 작업을 못한다.
그동안 메시지가 몰리면 Consumer 수신이 지연되면서 병목이 발생할 수 밖에 없다.
여간 찜찜했던지라, 비동기 이벤트를 등록하도록 변경해서 해결하고자 했다.
이제 데이터 수집도 할거라 자주 쓰일테니까!
대충 프로세스는 다음과 같다.
- Consumer에서 데이터가 도착하면, IO Completion Port 큐에 이벤트 Push
- Worker 쓰레드가 메시지 처리를 담당
이러면 수신하는 쓰레드 하나만 있어도 나머지 메시지 처리는 iocp의 worker 쓰레드들이 비동기로 작업하기 때문에 성능을 낼 수 있다.
기존 OverlappedEx 구조체 수정
typedef struct OverlappedEx
{
OVERLAPPED overlapped;
WSABUF wsaBuf;
int rwMode; //READ or WRITE or KAFKA
BYTE* broken_data = new BYTE[1024];
int broken_data_size = 0;
bool header_recv = false;
bool header_broken = false;
bool data_broken = false;
} PER_IO_DATA, * LPPER_IO_DATA;
읽기와 쓰기모드를 구분하기 위한 rwMode에 그냥 KAFKA를 추가한게 다이다.
PostQueuedCompletionStatus를 통한 IOCP 처리
KafkaConsumer 쓰레드에서 다음과 같이 OverlappedEx 구조체를 생성해준다.
WSABUF에 메시지를 그대로 넣을거라 조심해야한다.
auto* overlapped = new OverlappedEx();
overlapped->rwMode = KAFKA;
overlapped->wsaBuf.len = message.size();
overlapped->wsaBuf.buf = new char[message.size() + 1];
std::memcpy(overlapped->wsaBuf.buf, message.c_str(), message.size() + 1);
PostQueuedCompletionStatus(hComPor, 0, 0, (LPOVERLAPPED)overlapped);
json 타입의 메시지를 가공하고 지정된 형태에 맞는지 확인하는 과정을 여기서 넣을까 고민했다.
하지만 메시지가 많아지는 경우마다 오류가 섞이게 되면 병목이 생기게 되어서 할거면 아예 검증 로직도 Worker Thread에서 수행하도록 의도했다.
GetQueuedCompletionStatus를 통한 Worker Thread 작업 수행
kakfa 메시지 처리에 대한 Overlapped IO인 경우를 rwMode로 판단한다.
while (true)
{
BOOL result = GetQueuedCompletionStatus(hComPort, &bytesTrans, (PULONG_PTR)&handleInfo, (LPOVERLAPPED*)&ioInfo, INFINITE);
if (ioInfo->rwMode == KAFKA) {
std::cout << "KafkaMessageHandler 처리 시도" << std::endl;
MatchManager::KafkaMessageHandler(ioInfo);
delete[] ioInfo->wsaBuf.buf;
delete ioInfo;
continue;
}
...
아래는 소켓 처리
KafkaMessageHandler는 그냥 메시지를 가공하고 내용에 맞춰서 실제 로직을 처리하는 단계라 코드를 생략하겠다.
이외에도 KafkaProducer의 메시지 발행에 대해서도 실시간으로 이뤄질 수 있도록 고민해야한다.
로그 데이터로 수집하게 되면 좀 더 잦은 발행이 생길 예정