지난 글에 이어 이번에는 카프카 커넥트 오프셋 관리 기능에 대해 설명하겠습니다. 혹시라도 1부 글을 읽지 않으신 분들은 1부 글과 이어지는 내용이 있으니, 1부 글을 먼저 읽고 오시는 편이 좋을 것 같습니다.
지난 글에서 소스 커넥터는 프로듀서와 유사하고, 싱크 커넥터는 컨슈머와 유사하다는 말씀을 드린 적이 있습니다. 이번에 주로 다룰 내용은 싱크 커넥터와 오프셋 관련된 부분이므로 참고하시기 바랍니다.
싱크 커넥터는 컨슈머와 유사한 기능을 하며, 카프카의 토픽에서 데이터를 읽은 뒤 싱크 시스템으로 전송하는 역할을 합니다. 컨슈머의 경우 카프카의 토픽에서 메시지를 읽어가게 되면, 컨슈머의 점검, 컨슈머의 확장, 컨슈머의 재시작 등을 유연하게 대응하기 위해 컨슈머가 메시지를 어디까지 읽었는지 위치를 표시하게 됩니다. 이러한 위치를 카프카에서는 오프셋이라고 하고, 컨슈머 그룹마다 오프셋 위치를 별도의 공간에 저장하게 됩니다. 컨슈머 그룹의 오프셋은 초기 카프카 버전에서는 주키퍼의 지노드에 저장하였습니다. 이는 성능상의 이슈로 현재 버전의 카프카에서는 카프카의 내부 토픽인 __consumer_offset 토픽에 저장하고 있습니다.
컨슈머의 경우 해당 오프셋에 저장된 내용을 바탕으로 위치를 조정할 수 있습니다.
카프카 커넥트의 경우는 총 3개의 내부 토픽을 사용하게 되는데, 각각의 토픽과 용도는 다음과 같습니다.
config.storage.topic: 커넥트의 구성과 관련된 정보 저장
offset.storage.topic: 커넥트의 오프셋 정보 저장
status.storage.topic: 커넥트의 상태 정보 저장
.
만약 테스트나 개발 목적으로 카프카 커넥트를 단독모드로 구성하는 경우, 오프셋 정보 등은 로컬에 위치하는 별도의 파일에 저장하며, 카프카 커넥트를 분산모드로 구성하는 경우에만 커넥트에서 내부적으로 자체 관리하는 토픽에 저장합니다.
그 외에도 외부의 별도 토픽으로 오프셋을 관리하거나 개발환경에서 반복적으로 테스트할 때, 커넥터 이름을 변경하지 않고도 오프셋을 초기화 하거나 오류 처리 기능으로 해결할 수 없는 문제를 발생시키는 레코드를 건너뛰는 경우 관리자가 조치하기가 다소 까다로웠습니다.
하지만, 카프카 3.5 버전부터는 오프셋을 관리할 수 있는 REST API 기능이 추가되어, 관리자 또는 개발자 입장에서 매우 유연하게 대응할 수 있게 되었습니다.
GET /connectors/{connector}/offsets
PUT /connectors/{connector}/stop
지금부터 오프셋을 관리할 수 있는 REST API 기능에 대해 설명하겠습니다. 이 글에서는 카프카 설치와 커넥트 설치등의 내용은 따로 다루지 않고, REST API를 이용하여 오프셋을 조정하는 방법에 대해서만 설명하는 점 양해 부탁드리겠습니다.
테스트 환경은 다음과 같이 구성되어 있다고 가정하겠습니다.
카프카 버전: 3.6.1
브로커 이름: peter-kafka01.foo.bar
데이터가 저장된 토픽명: sqlite-sample-Models
사용할 커넥터: FileStreamSinkConnector
파일명: sink.txt
시나리오: FileStreamSinkConnector커넥터를 이용해 카프카의 sqlite-sample-Models토픽에서 레코드를 전부 가져왔으나, 로컬 파일의 손상 등으로 특정 오프셋으로 조정한 후 다시 레코드를 가져옴
해당 시나리오를 그림으로 표현하면 다음과 같습니다.
리눅스 명령어인 curl을 이용해 카프카 커넥트를 다루는 REST API와 출력 결과를 예제로 같이 보면서 설명하겠습니다. 가장 먼저 카프카 커넥트의 상태를 확인합니다.
$ curl http://kcluster01.foo.bar:8083 | jq
{
"version": "3.6.1",
"commit": "5e3c2b738d253ff5",
"kafka_cluster_id": "QrqgwH_BR8-md0QbhKrxpQ"
}
카프카 커넥트는 3.6.1이며, 카프카의 아이디가 표시되고 있습니다.
FileStreamSinkConnector를 실행하여 sqlite-sample-Models 토픽의 레코드를 sink.txt로 저장합니다.
$ curl -s -X POST \ -H "Content-Type: application/json" \ --data '{ "name": "File-Sink-Connector", "config": { "topics": "sqlite-sample-Models", "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "file": "sink.txt" } }' http://kcluster01.foo.bar:8083/connectors
FileStreamSinkConnector 커넥터가 정상적으로 실행되었는지 상태를 확인합니다.
$curl -s http://kcluster01.foo.bar:8083/connectors/File-Sink-Connector/status | jq
{
"name": "File-Sink-Connector",
"connector": {
"state": "RUNNING",
"worker_id": "192.168.0.4:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "192.168.0.4:8083"
}
],
"type": "sink"
}
커넥터의 상태는 RUNNING이며, 정상인 것을 확인할 수 있습니다. 그럼 sqlite-sample-Models 토픽의 레코드가 모두 sink.txt 파일에 저장되었는지 파일의 내용을 확인합니다.
$ cat sink.txt
Struct{model_id=1,model_name=The Blonde,model_base_price=23000,brand_id=1}
Struct{model_id=2,model_name=The Brunette,model_base_price=25000,brand_id=1}
Struct{model_id=3,model_name=The Red Head,model_base_price=29000,brand_id=1}
Struct{model_id=4,model_name=Hat,model_base_price=22000,brand_id=2}
Struct{model_id=5,model_name=Sweater,model_base_price=25000,brand_id=2}
Struct{model_id=6,model_name=T-Shirt,model_base_price=27000,brand_id=2}
Struct{model_id=7,model_name=Orange,model_base_price=15000,brand_id=3}
Struct{model_id=8,model_name=Blue,model_base_price=12000,brand_id=3}
Struct{model_id=9,model_name=Green,model_base_price=17000,brand_id=3}
Struct{model_id=10,model_name=LaFerrari,model_base_price=125000,brand_id=4}
Struct{model_id=11,model_name=450,model_base_price=75000,brand_id=4}
Struct{model_id=12,model_name=F12 Berlinetta,model_base_price=110000,brand_id=4}
Struct{model_id=13,model_name=F40,model_base_price=100000,brand_id=4}
Struct{model_id=14,model_name=Extra,model_base_price=30000,brand_id=5}
Struct{model_id=15,model_name=Too Much,model_base_price=35000,brand_id=5}
Struct{model_id=16,model_name=Beats,model_base_price=24000,brand_id=6}
Struct{model_id=17,model_name=Bars,model_base_price=35000,brand_id=6}
sink.txt 파일에 모든 내용이 저장된 것을 확인하였습니다. FileStreamSinkConnector 커넥터의 오프셋을 확인합니다.
$ curl -s -X GET http://kcluster01.foo.bar:8083/connectors/File-Sink-Connector/offsets | jq
{
"offsets": [
{
"partition": {
"kafka_partition": 0,
"kafka_topic": "sqlite-sample-Models"
},
"offset": {
"kafka_offset": 17
}
}
]
}
현재 오프셋은 17인 것을 알 수 있습니다. 그럼 오류가 생겼다고 가정하고, 오프셋을 10으로 변경해 보겠습니다. 오프셋을 변경하기 전에 커넥터를 중지한 후 오프셋을 변경합니다.
$ curl -s -X PUT http://kcluster01.foo.bar:8083/connectors/File-Sink-Connector/stop
$curl -s -X PATCH \
-H "Content-Type: application/json" \
--data '{
"offsets": [
{
"partition": {
"kafka_topic": "sqlite-sample-Models",
"kafka_partition": 0
},
"offset": {
"kafka_offset": 10
}
}
]
}' http://kcluster01.foo.bar:8083/connectors/File-Sink-Connector/offsets
{"message":"The offsets for this connector have been altered successfully"}
응답 메시지를 통해 오프셋이 정상적으로 변경된 것을 확인할 수 있습니다. 다시 한번 FileStreamSinkConnector 커넥터의 변경된 오프셋을 확인합니다.
$ curl -s -X GET http://kcluster01.foo.bar:8083/connectors/File-Sink-Connector/offsets | jq
{
"offsets": [
{
"partition": {
"kafka_partition": 0,
"kafka_topic": "sqlite-sample-Models"
},
"offset": {
"kafka_offset": 10
}
}
]
}
오프셋이 10으로 변경된 것을 확인하였습니다. 이제 FileStreamSinkConnector 커넥터를 재시작합니다.
$ curl -s -X PUT http://kcluster01.foo.bar:8083/connectors/File-Sink-Connector/resume
다음으로 파일의 내용을 확인해 보겠습니다.
$ cat sink.txt
Struct{model_id=1,model_name=The Blonde,model_base_price=23000,brand_id=1}
Struct{model_id=2,model_name=The Brunette,model_base_price=25000,brand_id=1}
Struct{model_id=3,model_name=The Red Head,model_base_price=29000,brand_id=1}
Struct{model_id=4,model_name=Hat,model_base_price=22000,brand_id=2}
Struct{model_id=5,model_name=Sweater,model_base_price=25000,brand_id=2}
Struct{model_id=6,model_name=T-Shirt,model_base_price=27000,brand_id=2}
Struct{model_id=7,model_name=Orange,model_base_price=15000,brand_id=3}
Struct{model_id=8,model_name=Blue,model_base_price=12000,brand_id=3}
Struct{model_id=9,model_name=Green,model_base_price=17000,brand_id=3}
Struct{model_id=10,model_name=LaFerrari,model_base_price=125000,brand_id=4}
Struct{model_id=11,model_name=450,model_base_price=75000,brand_id=4}
Struct{model_id=12,model_name=F12 Berlinetta,model_base_price=110000,brand_id=4}
Struct{model_id=13,model_name=F40,model_base_price=100000,brand_id=4}
Struct{model_id=14,model_name=Extra,model_base_price=30000,brand_id=5}
Struct{model_id=15,model_name=Too Much,model_base_price=35000,brand_id=5}
Struct{model_id=16,model_name=Beats,model_base_price=24000,brand_id=6}
Struct{model_id=17,model_name=Bars,model_base_price=35000,brand_id=6}
Struct{model_id=11,model_name=450,model_base_price=75000,brand_id=4}
Struct{model_id=12,model_name=F12 Berlinetta,model_base_price=110000,brand_id=4}
Struct{model_id=13,model_name=F40,model_base_price=100000,brand_id=4}
Struct{model_id=14,model_name=Extra,model_base_price=30000,brand_id=5}
Struct{model_id=15,model_name=Too Much,model_base_price=35000,brand_id=5}
Struct{model_id=16,model_name=Beats,model_base_price=24000,brand_id=6}
Struct{model_id=17,model_name=Bars,model_base_price=35000,brand_id=6}
파일의 내용을 확인해 보면, model_id=17 이후에 model_id=11부터 다시 추가된 것을 확인할 수 있습니다. 커넥터의 상태를 확인하느라 다소 REST API 호출이 많았지만, 결과적으로 커넥터의 오프셋을 변경한 후 레코드도 재처리되었습니다.
지금까지 카프카 커넥트의 오프셋 관리 기능을 활용하여, 손상된 데이터를 다시 처리하거나 특정 상황에서 원하는 오프셋으로 재조정할 수 있음을 확인했습니다. 특히 REST API를 통해 손쉽게 커넥터의 상태를 점검하고, 오프셋을 관리할 수 있어 관리자 입장에서 유연하게 대응할 수 있습니다.
앞서 설명드린 기능들은 단순한 테스트 시나리오에 불과하지만, 실제 운영 환경에서도 유사한 문제들이 발생할 수 있기 때문에 이를 사전에 숙지하고, 적절히 대응한다면 좋은 결과가 있을 것입니다.
앞으로 카프카 커넥트를 더욱 효율적으로 사용하기를 바라며, 이상으로 카프카 커넥트에 대한 글을 마치도록 하겠습니다. 감사합니다.