Coroutine base Kafka message dispatcher
· 약 8분
이 문서는 Kafka 기반 비동기 메시지 처리 구조에서 Kotlin Coroutines를 이용한 Message Dispatcher의 개선 과정을 단계별 코드 예제와 함께 정리한 문서입니다.
✅ Step 1: 기본 Dispatcher
기능
- 채널을 통해 메시지를 받아 핸들러에 전달
- 예외 발생 시 로그만 출력
기본 구조
- Kafka Consumer → Coroutine Channel → Message Dispatcher → Message Handler
문제점
- 예외 발생 시 아무런 후속 조치 없음
- 메시지 반복 소비 또는 유실 가능성 있음
class MessageDispatcher(
private val channel: ReceiveChannel<DomainMessage>,
private val handler: MessageHandler
) {
suspend fun start() {
for (message in channel) {
try {
handler.handle(message)
} catch (e: Exception) {
println("❌ Error: ${e.message}")
}
}
}
}
✅ Step 2: 예외 처리 정책 추가
기능
shouldRetry(e),shouldStopConsuming(e)을 통해 예외 분류- 일시적 오류 vs 치명적 오류 구분 가능
interface ExceptionHandlingPolicy {
fun shouldRetry(e: Throwable): Boolean
fun shouldStopConsuming(e: Throwable): Boolean
}
class BasicPolicy : ExceptionHandlingPolicy {
override fun shouldRetry(e: Throwable) = false
override fun shouldStopConsuming(e: Throwable) = e is IllegalStateException
}
Dispatcher 내부:
if (policy.shouldStopConsuming(e)) {
scope.cancel("Fatal", e)
}
✅ Step 3: RetryQueue / DLQ 도입
기능
IOException같은 일시적 예외는 재시도 대상으로RetryQueue에 저장- 치명적이지 않지만 재시도 대상도 아닌 예외는
DeadLetterQueue에 저장
interface RetryQueue {
suspend fun enqueue(message: DomainMessage, reason: Throwable)
}
interface DeadLetterQueue {
suspend fun publish(message: DomainMessage, reason: Throwable)
}
✅ Step 4: Retry 시 offset commit 추가
기능
- 재시도 대상 메시지는 retryQueue에 저장한 후 Kafka에 offset을 commit
- Kafka에 동일 메시지가 다시 소비되지 않도록 함
if (policy.shouldRetry(e)) {
retryQueue.enqueue(message, e)
commitOffset(message.offset)
}
