모든 소스 코드는 https://github.com/lkimilhol/kotlin-kafka-toy에서 확인 가능합니다!
안녕하세요. 이번에는 Kafka의 offset에 대해서 알아볼까 합니다.
Kafka 메시지를 처리하는 도중 예외를 만난다면 어떻게 될지, 그리고 offset 관련 옵션 설정에 따라서 어떻게 변할지 공부해보려고 합니다.
이는 실제 Kafka를 이용하여 실제 서비스를 할 때도 많은 도움이 될 것이라 생각합니다.
1. offset이란 무엇일까?
offset이란 무엇일까요? 바로 consumer에서 메시지를 어디까지 읽었는지 저장하는 값 이라고 볼 수 있을거 같네요.
만약 offset=4 인 경우 offset 0, 1, 2, 3은 메시지를 읽은 것으로 생각 할 수 있습니다.
위 그림과 같이 현재 offset은 4이며 다음 읽을 메시지는 5라는 것을 알 수 있습니다.
2. 메시지를 읽어보자!
그럼 실제로 메시지를 읽으면서 offset이 바뀌는 것을 확인해 보겠습니다.
먼저 테스트를 위해 local에서 kafka를 실행시켰습니다.
그리고 토픽을 하나 생성해 주겠습니다.
test라는 토픽을 생성하였습니다. 그리고 producer와 consumer의 구현인데요. kotlin을 사용하여 작성하였습니다.
먼저 producer입니다.
Producer.kt
@Service
class Producer(private val kafkaProducerTemplate: ReactiveKafkaProducerTemplate<String, KafkaMessage>) {
fun produce(topic: String, kafkaMessage: KafkaMessage) {
kafkaProducerTemplate.send(topic, kafkaMessage).subscribe()
}
}
간단하게 kafkaMessage를 작성하는 내용입니다. Produer는 API를 통해 호출하도록 했습니다.
다음은 Consumer입니다. 마찬가지로 아주 간단합니다.
Consumer.kt
@Service
class Consumer {
private var messageList = listOf<KafkaMessage>()
@KafkaListener(id = "kafkaMessages", topics = ["test"], containerFactory = "kafkaListenerContainerFactory")
fun consume(kafkaMessages: ConsumerRecords<String, KafkaMessage>) {
kafkaMessages.forEach { println(it) }
}
fun getPayload(): List<KafkaMessage> {
return messageList
}
}
@KafkaListener 애노테이션을 통해 계속해서 메시지를 처리하도록 리스너 등록을 해두었습니다. 일단 현재 offset을 확인해보도록 하죠.
보시는 바와 같이 CURRENT-OFFSET이 0으로 되어 있는 것을 확인 할 수가 있습니다.
그리고 실제로 메시지를 발행시켜 보도록 하겠습니다.
앞서 말씀드린대로 메시지의 발행은 API 요청을 통해 하였습니다. (아주 간단하기에 코드를 따로 첨부하지 않았습니다)
그리고 위와 같이 메시지를 consume 한 것을 볼 수 있는데요. 이후의 offset을 확인해보겠습니다.
offset이 증가 된 것을 볼 수 있습니다. 우리가 생각한 대로 메시지를 consume 하게 되면 offset이 증가 된 것을 확인 할 수 있었습니다.
그렇다면 다른 필드들은 어떤 의미를 나타낼까요? LOG-END-OFFSET과 LAG에 대해 간단히 살펴보겠습니다.
CURRENT-OFFSET: 현재 consumer-group에서의 offset
LOG-END-OFFSET: Producer에서 마지막으로 발행한 메시지의 offset
LAG: LOG-END-OFFSET에서 CURRENT-OFFSET을 뺀 값
3가지 필드의 의미를 기억해두고, 혹시라도 cunsumer-group의 개념을 모른다면 꼭 숙지 하도록 해야겠습니다.
3. Kafka 메시지 처리 중 예외를 만난다면?
그럼 제가 궁금했던 부분 중 하나인 메시지 처리중 예외를 만나면 offset이 어떻게 되는지 알아보도록 하겠습니다.
마찬가지로 메시지를 발행하지만 메시지를 consume 하는 과정에서 예외를 만나도록 코드를 수정하겠습니다.
Consumer.kt
@KafkaListener(id = "kafkaMessages", topics = ["test"], containerFactory = "kafkaListenerContainerFactory")
fun consume(kafkaMessages: ConsumerRecords<String, KafkaMessage>) {
throw IllegalArgumentException()
kafkaMessages.forEach { println(it) }
}
강제로 IllegalArgumentException을 throw 하도록 하였습니다. 메시지를 발행 시키고 offset에 어떤 변화가 있는지 알아보겠습니다. 현재 offset은 1입니다.
예외를 던졌습니다. 그럼 offset을 확인해 보겠습니다.
현재 offset은 그대로 1 이지만 LOG-END-OFFSET의 값이 2로 증가하였고 LAG가 1 인 것을 확인 할 수 있습니다. 즉 메시지를 consume하는 과정에서 예외를 만났기 때문에 offset이 증가 되지 않은 것을 확인 할 수 있습니다. 즉 메시지를 제대로 consume하지 않았음을 생각 할 수 있습니다.
또한 consumer listener에서 계속해서 메시지를 consume하려고 재시도를 하는 것을 확인 할 수 있습니다.
그럼 메시지를 정상적으로 consume하면 결과는 어떻게 될까요? 예외를 던지는 코드를 삭제하고 서버를 다시 실행시키겠습니다. 그러면 메시지는 정상적으로 consume 될 것입니다.
메시지가 consume 되었습니다. offset을 확인해보도록 하겠습니다.
offset이 2로 증가 되었네요. 이처럼 예외를 만난 경우 메시지가 제대로 consume되지 않아 offset이 증가하지 않았다는 것을 확인 할 수 있었습니다. 그렇다면 메시지가 DLT(dead-letter-topic)로 이동하게 되는 경우에는 어떻게 될까요?
4. 예외를 만난 메시지가 DLT로 이동하게 되면?
Spring-kafka에서는 메시지를 처리 하다가 예외를 만나게 되면 DLT로 메시지를 이동시키는 기능을 제공하고 있습니다.
보통 DLQ라고 불리우는 Dead-Letter-Queue는 메시지가 일련상의 이유로 제대로 처리 되지 않은 경우에 보내지는 Queue라고 할 수 있습니다. kafka에서는 DLT라는 용어를 사용하기에 이후 DLT라고 칭하겠습니다.
일단 메시지가 DLT로 이동 되도록 설정 값을 수정해야 합니다. 코드를 수정하겠습니다.
KafkaCofnig.kt
@Bean("kafkaListenerContainerFactory")
fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
val containerFactory = ConcurrentKafkaListenerContainerFactory<String, String>()
val recoverer = DeadLetterPublishingRecoverer(kafkaProducerTemplate())
val recoveringBackOff = RecoveringBatchErrorHandler(recoverer, FixedBackOff(2L, 1))
containerFactory.consumerFactory = consumerFactory()
containerFactory.isBatchListener = true
containerFactory.containerProperties.ackMode = ContainerProperties.AckMode.BATCH
containerFactory.setBatchErrorHandler(recoveringBackOff)
return containerFactory
}
집중해서 볼 곳은 recoverer와 recoveringBackOff입니다. 메시지는 총 1번의 재시도를 거쳐서 DLT로 이동하게 됩니다. 보시면 FixedBackOff의 두 번째 인자로 1이 선언 된 것을 확인 할 수 있습니다.
그리고 DLT의 이름은 토픽의 이름에 ".DLT" 가 붙어서 토픽이 생성되는데요. 자동으로 토픽이 생성되게 됩니다. 잘 기억해두도록 합시다.
메시지를 consume하게 되는 경우에 BatchListenerFailedException 예외를 throw 하게 된다면 메시지는 DLT로 이동하게 됩니다. 코드를 실행시켜보고 토픽을 확인해 보도록 하겠습니다.
메시지 consume 과정에서 BatchListenerFailedException 예외를 만났네요.
1번의 재시도를 실행 한 것도 확인이 됩니다. 이 메시지는 DLT로 이동했을 텐데요. 앞서 말씀드린데로 토픽의 이름에 ".DLT"가 붙은 토픽이 생성 될 것입니다. 확인은 조금 미뤄보도록 하고 offset을 확인 해 보도록 하겠습니다.
offset이 3이 된 것을 볼 수 있습니다. 즉 메시지가 DLT로 빠지면서 offset이 증가하였습니다. 그런데 DLT 토픽에 대한 정보는 없네요. 토픽이 자동으로 생성되지 않았을까요? 토픽을 생성하고 다시 테스트를 진행해야 겠습니다. 토픽을 생성해 줍니다.
이미 생성된 토픽이 있다고 합니다. 앞서 예상한대로 토픽은 자동으로 생성이 되었던 것입니다. 사실 이 글을 쓴 이유는 바로 DLT의 offset에 대해 알아보기 위해서 였는데요. offset에 대한 정보가 없을 때 이를 어떻게 처리해야 할까요? 여기서 등장하는 개념이 offset reset 입니다.
5. Kafka offset reset이란?
offset reset이란 offset에 대한 정보가 없을 때 어떻게 처리를 할지에 대한 옵션이라고 할 수 있습니다. 3가지 옵션이 존재하는데요. 살펴보도록 하겠습니다.
- latest : 가장 마지막에 있는 offset 부터 (디폴트)
- earliest : 가장 처음에 있는 offset부터
- none : offset에 대한 정보가 없으면 exception을 던짐
도통 무슨 소린지 알 수가 없는데요. 천천히 살펴봐야 곘네요. 일단 현재의 옵션은 latest 일 것이데요. none으로 설정을 하고 DLT에 있는 메시지 consume을 시도해보겠습니다. 우리는 offset에 대한 정보가 없다고 생각하고 있기 때문에 consume시 예외가 발생해야 합니다. 그럼 코드를 수정해서 DLT를 consume 하는 controller를 하나 만들어 보도록 하겠습니다.
KafkaConfig.kt
@Bean
fun dltConsumerProperties(): Map<String, Any> {
return hashMapOf(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to "localhost:9092",
ConsumerConfig.GROUP_ID_CONFIG to "dlt-consumer-group",
ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG to "true",
ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to "none",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java,
JsonDeserializer.TRUSTED_PACKAGES to "*"
)
}
@Bean
fun kafkaDLTConsumer(): KafkaConsumer<String, KafkaMessage> = KafkaConsumer(dltConsumerProperties())
DLT consumer을 위한 Bean을 등록했습니다. 그러면 이 설정을 가지고 DLT를 consume하는 service를 만들겠습니다.
DLTConsumer.kt
@Service
class DLTConsumer(private val kafkaConsumer: KafkaConsumer<String, KafkaMessage>) {
fun dltConsume() {
kafkaConsumer.subscribe(setOf("test.DLT"))
val message = kafkaConsumer.poll(Duration.ofSeconds(1L))
println(message.count())
message.forEach { println(it.toString()) }
kafkaConsumer.commitSync()
kafkaConsumer.unsubscribe()
}
}
이제 서버를 실행시키고 API reuqest를 해보도록 하겠습니다.
예상했던 대로 예외가 발생합니다. 그러면 이제 reset을 latest로 해보겠습니다. 가장 마지막이라고 하는데요. 일단 현재의 offset 정보입니다.
현재 DLT에 대한 정보는 없네요. latest로 reset을 설정하겠습니다.
offset이 1로 초기화 되었습니다. 하지만 뭔가 이상하네요. 앞서 토픽을 생성하였을때 offset은 0이었고 0번째에 있는 메시지를 읽어야 하지 않을까요?
메시지 카운트를 보니 메시지도 읽어오지 않은것으로 확인되네요.
offset을 확인해보도록 하겠습니다.
메시지는 읽어지지 않았지만 offset은 1이 되었군요. 즉 메시지 하나를 처리하지 못한 결과를 낳았습니다.
이제 earliest로 reset을 설정하겠습니다. 0번째 메시지를 잘 읽어 올 수 있을까요?
offset이 0으로 reset된 것이 확인됩니다. 메시지는 잘 읽었을까요?
메시지 카운트는 1이고 메시지도 잘 읽었습니다. offset 정보가 없는 경우에는 earliest로 제일 첫번째 offset을 읽어올 수 있네요.
6. 마치며
생각보다 길이 엄청 길어졌는데요. 실제로 offset reset에 대하여 확인을 하니 어떤 설정을 주어야 할지 명확해진거 같습니다. 추후에는 latest를 사용하는 경우와 earliest를 사용하는 경우를 고민해봐야겠네요. 긴 글 읽어주셔 감사합니다.
'ETC' 카테고리의 다른 글
[쓰면서 배우는 obsidian] 001. obsidian 설치해보기 (1) | 2024.12.10 |
---|---|
Kafka 메시지 순서 보장 확인해보기 (0) | 2021.10.17 |