이번 글은 지난번 연재 글 내용과 이어지는 내용으로 producer acks 옵션과 관련이 있는 내용입니다. 제가 kafka를 운영하면서 실제 일어났던 이슈에 대해 kafka의 로그와 같이 한번 설명드리도록 하겠습니다. 지난 연재 글을 읽지 못하신 분이나 또는 읽었는데 기억이 잘 나지 않으신다면, 다시 한번 읽어보시는 편이 좋을 것 같습니다.
제가 kafka 운영을 시작한 지 얼마 지나지 않아, 운영상 broker의 config를 변경해야 하는 상황이 발생하였습니다. 공용 kafka 서비스 운영에 앞서 config 변경 작업과 config 변경 후 rolling restart에 대한 테스트는 미리 완료한 상태였기 때문에, 별 다른 고민 없이 broker의 config를 수정 후 rolling restart를 진행하였습니다. config 적용 후 rolling restart 작업이 마무리되어 갈 즈음, 해당 카프카를 사용 중이던 서비스 부서로부터 연락이 왔습니다. 지금까지 특별한 이슈 없이 잘 동작하던 consumer의 job이 이유 없이 종료된다고 하는 것이었습니다. kafka 서비스 운영에 경험이 많지 않았던 저는 당황하기 시작했고, 어떻게 대응해야 할지 조차도 몰랐습니다. 급한 대로 우선 broker의 로그를 살펴봤지만 로그 내용조차 눈에 잘 들어오지 않았습니다. 이런저런 시도 끝에 해당 서비스 부서에서 컨슈머 그룹의 이름을 변경하여 job를 새로 실행하였고, 그 이후 별다른 특이사항 없이 해당 이슈는 마무리되었습니다.
그 후, 저는 위에 말씀드린 이슈가 왜 발생했는지 정확한 원인을 파악하기 위해 broker의 로그를 다시 살펴보기 시작했습니다. 그러던 중 해당 이슈의 원인과 해결 방법에 대해 알게 되었고, 지금부터 관련 내용들을 로그와 함께 설명해 나가도록 하겠습니다.
저는 이슈가 발생한 시점에 가장 먼저 발생한 에러 로그가 해당 이슈와 연관성이 가장 높을 것이라는 생각을 했고, rolling restart 작업 시간대를 기준으로 가장 먼저 발생한 에러 로그가 무엇인지 찾기 시작했습니다. 그리고 그 시간에 가장 먼저 발생했던 로그는 아래와 같습니다.
첫 번째 발생 로그
[2016-00-00 11:05:38,979] INFO [KafkaApi-5] Closing connection due to error during produce request with correlation id 574259 from client id producer with ack=0
Topic and partition to exceptions: sampletopic-3 -> org.apache.kafka.common.errors.CorruptRecordException (kafka.server.KafkaApis)
두 번째 발생 로그
[2016-00-00 11:07:41,381] ERROR [KafkaApi-5] Error when handling request Name: FetchRequest; Version: 0; CorrelationId: 0; ClientId: ; ReplicaId: -1; MaxWait: 0 ms; MinBytes: 0 bytes; MaxBytes:2147483647 bytes; RequestInfo: ([sampletopic,3],PartitionFetchInfo(281047443,1048576)) (kafka.server.KafkaApis)
kafka.common.KafkaException: Message payload is null: Message(magic = 0, attributes = 2, crc = 1197208708, key = null, payload = null)
세 번째 발생 로그
[2016-00-00 11:08:41,448] ERROR [KafkaApi-5] Error when handling request Name: FetchRequest; Version: 0; CorrelationId: 0; ClientId: ; ReplicaId: -1; MaxWait: 0 ms; MinBytes: 0 bytes; MaxBytes:2147483647 bytes; RequestInfo: ([sampletopic,3],PartitionFetchInfo(281047443,1048576)) (kafka.server.KafkaApis)
kafka.common.KafkaException: Message payload is null: Message(magic = 0, attributes = 2, crc = 1197208708, key = null, payload = null)
kafka 운영 경험이 많지 않던 저는 로그를 보더라도 로그의 전체 내용을 이해하는데 시간이 필요했습니다. 로그 내용을 이해하는 데 있어 약간의 설명이 필요한 부분들이 있어 해당 부분에 대해 먼저 간략하게 설명하도록 하겠습니다.
먼저 INFO, ERROR 부분입니다. 이 내용은 다른 시스템 로그 등에서 볼 수 있는 로그의 레벨입니다. 최초에 발생한 로그는 로그 레벨이 INFO, 이후 로그는 ERROR레벨이라는 것을 알 수 있습니다.
다음으로 [KafkaApi-5] 부분입니다. 이 내용에서 숫자 5의 의미는 broker node id 5번을 뜻하는 내용입니다. 즉 5번 broker에서 발생한 로그라는 것을 알 수 있습니다.
샘플로 보여드린 3개의 로그 중 지금까지 설명드린 로그 부분은 동일하고, 이후 부분은 로그의 내용이 달라지기 때문에 먼저 첫 번째 로그부터 살펴보겠습니다.
로그 끝부분에 보이고 있는 client id producer with ack=0은 제가 이전 글에서 설명드렸던 프로듀서의 acks 옵션 0을 의미합니다. ack=0에 대해 간략하게 다시 한번 정리하면, producer가 보내는 데이터의 일부 손실을 감안하더라도 대기 시간 없이 빠르게 broker로 보내고 싶은 경우에 이 옵션을 사용하게 됩니다. 어느 정도의 손실을 감안하고 broker에게 보내기 때문에 broker에서는 에러가 발생하였음에도 불구하고, 로그 레벨을 INFO로 표시한 것으로 추측하고 있습니다.
로그 내용을 바탕으로 producer는 acks 옵션을 0으로 설정해서 메시지를 보내고 있었고, broker가 rolling restart 되면서 연결되어 있던 커넥션이 예기치 않게 종료되었다는 사실을 알 수 있습니다. 예기치 않게 종료되면서 정상적으로 메시지가 처리되지 않은 것을 알 수 있습니다. 그다음 부분을 보면 "sampletopic-3"이라고 있는데 sampletopic은 토픽의 이름을 나타내고, 3이라는 숫자는 해당 토픽의 파티션 번호를 나타냅니다.
로그 내용을 최종 정리해보면, producer가 5번 broker로 sampletopic이라는 토픽의 파티션 3번에 acks=0으로 데이터를 보내던 중 커넥션이 종료되면서 에러가 발생하였고, 로그 레벨은 INFO라는 의미입니다. 여기서 추가 설명드리자면, 파티션의 leader만 read와 write를 할 수 있습니다. 다시 말하자면, sampletopic의 3번 파티션의 leader는 broker 5번에 있었기 때문에 producer가 메시지를 보내고 있었습니다.
이제 두 번째 로그입니다. FetchRequest 하면서 에러가 발생하였고, 요청에 대한 내용은 [sampletopic,3]와 PartitionFetchInfo(281047443,1048576)입니다. sampletopic은 위에서 말씀드린 것처럼 토픽의 이름이고, 3은 파티션 번호입니다. 다음에 나오는 281047443이라는 숫자는 유니크한 offset을 나타내고, 1048576는 fetchSize를 의미합니다. 로그 내용을 최종 정리해보면, sampletopic이라는 토픽 파티션 3번의 offset 281047443를 가져가면서 에러가 발생한 것입니다. 첫 번째 로그에서도 sampletopic의 파티션 3번이 언급되었는데 뭔가 관련이 있을 것 같다는 느낌이 오시나요?
마지막 세 번째 로그입니다. 자세히 살펴보면 로그에 찍힌 시간만 다르고, 로그 내용이 두 번째 로그와 일치합니다. sampletopic의 파티션 3번의 offset281047443를 가져가면서 에러가 발생하였습니다.
위 로그들을 종합적으로 정리해보면, sampletopic의 파티션 3번에 대한 leader는 broker id 5번에 있었던 것입니다.(파티션의 leader만 read, write가 가능합니다) producer는 acks=0으로 메시지를 보내고 있었고, 커넥션이 종료되면서 일부 메시지가 에러가 발생했습니다. 그리고는 consumer가 sampletopic의 파티션 3번 데이터를 offset 순서대로 가져가던 중 커넥션이 종료되면서 에러가 발생한 offset 281047443의 데이터를 가져가면서 에러가 발생한 것을 의미합니다.
지금까지 설명드린 내용을 이해를 돕기 위해 간단하게 그림으로 살펴보겠습니다.
위 그림은 producer가 ack=0으로 데이터를 보내고 있던 중 offset 2에 메시지 전달이 제대로 되지 않은 내용을 나타냈습니다. 위에 설명드린 로그 내용과 비교하자면, 첫 번째 로그가 발생한 시점이라고 이해하시면 될 것 같습니다. 이어서 다음 그림 보겠습니다.
위 그림은 consumer가 offset 0, 1번까지 제대로 가져갔는데 2번 메시지를 제대로 가져가지 못하고 있는 내용입니다. 위에 설명드린 로그 내용과 비교하자면, 두 번째와 세 번째 로그가 발생한 시점이라고 이해하시면 됩니다.
지금까지 로그를 통해서 문제가 발생한 원인에 대해서 설명을 드렸습니다. 그럼 위와 같은 일이 발생했을 경우 어떻게 해결해야 할까요? 먼저 consumer 레벨에서 예외처리 등을 통해 offset을 강제로 변경해주는 방법이 있고, zookeeper에 해당 컨슈머 그룹의 offset 값이 저장되어 있기 때문에 해당 값을 변경하는 방법이 있습니다. zookeeper의 offset 값 변경 방법에는 2가지가 있습니다. 첫 번째는 zookeeper의 cli를 이용, set 명령어로 해당 offset 값을 원하는 값으로 변경할 수 있고, 두 번째 방법은 broker에서 제공하는 cli를 이용하는 변경할 수 있습니다. 첫 번째의 예제는 다음에 기회가 되면 zookeeper에 대한 설명과 같이 설명드리도록 하고, 이번 글에서는 broker에서 제공하는 cli를 이용하여 컨슈머 그룹의 offset을 업데이트하는 방법에 대해서 설명드리도록 하겠습니다.
offset 업데이트 방법을 예제로 설명드리기 위해 토픽명은 sampletopic, 주키퍼는 samplezookeeper, 컨슈머 그룹 이름은 sampleconsumer라고 가정하겠습니다. 먼저 클러스터 내 broker 중의 한대에 접근한 후 kafka가 설치되어 있는 디렉토리로 이동합니다. 저의 경우에는 kafka가 설치된 경로가 "/usr/local/kafka"입니다.
# kafka 설치 경로로 이동
> cd /usr/local/kafka
# 현재 토픽명 sampletopic을 컨슘 하는 컨슈머 그룹 sampleconsumer의 offset 정보 확인하기
> bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker —group sampleconsumer —topic sampletopic —zookeeper samplezookeeper/kafka
# 결과 보기
Group Topic Pid Offset logSize Lag Owner
sampleconsumer sampletopic 0 596842078 598314304 1472226 none
sampleconsumer sampletopic 1 596816215 598290598 1474383 none
sampleconsumer sampletopic 2 596827351 598300525 1473174 none
sampleconsumer sampletopic 3 596809301 598281656 1472355 none
sampleconsumer sampletopic 4 596794417 598267853 1473436 none
sampleconsumer sampletopic 5 596799110 598271088 1471978 none
sampleconsumer sampletopic 6 596731455 598202943 1471488 none
sampleconsumer sampletopic 7 596855211 598328887 1473676 none
kafka에서 제공해주는 kafka-run-class.sh이라는 cmd tool를 이용하여 현재 consumer group의 offset 정보를 확인할 수 있습니다. 이제 위에서 말씀드린 이슈 등으로 인하여, consumer group의 offset을 강제로 최신의 값으로 변경해보도록 하겠습니다. 해당 명령어를 실행하기 전에 consumer.properties 파일을 만들어야 하는데, 간단하게 아래와 같이 만드시면 됩니다.
# consumer.properties 내용
zookeeper.connect=samplezookeeper/kafka
zookeeper.connection.timeout.ms=6000
group.id=sampleconsumer
offsets.storage=zookeeper
dual.commit.enabled=false
# latest 값으로 offset 변경하기
> bin/kafka-run-class.sh kafka.tools.UpdateOffsetsInZK latest consumer.properties sampletopic
# 결과 보기
updating partition 0 with new offset: 598341641
updating partition 1 with new offset: 598318144
updating partition 2 with new offset: 598327935
updating partition 3 with new offset: 598309210
updating partition 4 with new offset: 598295248
updating partition 5 with new offset: 598298750
updating partition 6 with new offset: 598230432
updating partition 7 with new offset: 598356650
updated the offset for 8 partitions
위의 명령어를 이용하여 업데이트 한 이후, 현재 컨슈머 그룹의 offset 보기를 다시 실행해보면 해당 컨슈머 그룹의 offset이 변경된 것을 확인할 수 있습니다. 컨슈머 그룹의 offset 값을 최신 값으로 변경하는 작업을 진행하셨다면, 반드시 알아 두셔야 하는 내용이 있습니다. 컨슈머 그룹이 offset을 기준으로 데이터를 하나하나 모두 읽어간 것이 아니라, 중간에 읽어가지 못한 부분이 있다는 사실입니다. 아래에서 그림과 같이 설명드리도록 하겠습니다.
위의 내용에 대해 이해를 돕기 위해 그림 한 장을 준비했습니다.
consumer가 sampletopic 3번 파티션 데이터를 가져가기 하고 있고, offset 위치는 0번입니다. 0번 데이터를 가져가려고 하였으나, 이슈 등으로 인하여 가져가기가 실패하고 있습니다. 그래서 broker의 커맨드나 zookeeper의 커맨드 등을 이용하여 offset을 latest offset으로 업데이트하게 되었고, consumer는 offset 0번이 아닌 3번의 데이터부터 읽어가게 되었습니다. 결국 해당 컨슈머는 offset 0, 1, 2의 데이터는 가져가지 못한 채로 3번 이후의 offset을 계속 가져가게 됩니다.
저는 이러한 이슈를 겪은 이후로 서비스 부서에서 kafka를 이용하려고 할 때, 항상 acks 옵션을 어떻게 사용하고 있는지 체크하는 버릇이 생겼습니다. 만약 acks 옵션을 0을 사용하고 계시면, 0 보다는 1을 사용하기를 권장하고 있습니다. 최근의 kafka 지원 어플리케이션들은 대부분 ack 1 값을 default값으로 사용하고 있지만, old 버전 등의 어플리케이션에서는 default값이 0인 경우가 종종 있습니다. 현재 사용하고 계시는 producer의 옵션을 한번 체크해보시고, 가능하다면 acks=1로 사용하기를 추천합니다.^^
긴 글 읽어주셔서 감사합니다.