mytopic.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaTopic
metadata:
name: ${TOPICNAME}
labels:
strimzi.io/cluster: "my-cluster"
spec:
partitions: 1
replicas: 3
config:
retention.ms: 7200000
segment.bytes: 1073741824
min.insync.replicas: 2
2
# 토픽 모니터링 : Kafka-UI 같이 확인 >> 설정 반응이 조금 느릴 수 있음
watch -d kubectl get kafkatopics -n kafka
# 토픽 Topic 생성 (kubectl native) : 파티션 1개 리플리케이션 3개, envsubst 활용
curl -s -O https://raw.githubusercontent.com/gasida/DOIK/main/3/mytopic.yaml
cat mytopic.yaml | yh
TOPICNAME=mytopic1 envsubst < mytopic.yaml | kubectl apply -f - -n kafka
3
# 토픽 생성 확인 (kubectl native)
kubectl get kafkatopics -n kafka
NAME CLUSTER PARTITIONS REPLICATION FACTOR READY
mytopic1 my-cluster 1 3
...
kubectl exec -it ds/myclient -- kafka-topics.sh --bootstrap-server $SVCDNS --list | grep mytopic
mytopic1
# 토픽 상세 정보 확인 : 설정값 미 지정 시 기본값이 적용
kubectl exec -it ds/myclient -- kafka-topics.sh --bootstrap-server $SVCDNS --topic mytopic1 --describe
Topic: mytopic1
TopicId: hz--p3HXTRq-54EVuuY68w
PartitionCount: 1
ReplicationFactor: 3
Configs: min.insync.replicas=2
segment.bytes=1073741824
retention.ms=7200000
message.format.version=3.0-IV1
Topic: mytopic1Partition: 0Leader: 2Replicas: 2,0,1Isr: 2,0,1
4
# 토픽 Topic 생성 : 파티션 1개 리플리케이션 3개
kubectl exec -it ds/myclient -- kafka-topics.sh --create --bootstrap-server $SVCDNS --topic mytopic2 --partitions 1 --replication-factor 3 --config retention.ms=172800000
# 토픽 생성 확인
kubectl get kafkatopics -n kafka
NAME CLUSTER PARTITIONS REPLICATION FACTOR READY
mytopic1 my-cluster 1 3 True
mytopic2 my-cluster 1 3 True
...
kubectl exec -it ds/myclient -- kafka-topics.sh --bootstrap-server $SVCDNS --list | grep mytopic
mytopic1
mytopic2
# 토픽 상세 정보 확인
kubectl exec -it ds/myclient -- kafka-topics.sh --bootstrap-server $SVCDNS --topic mytopic2 --describe
Topic: mytopic2TopicId: 965ASQDmQfiuIxPiiC9RPQPartitionCount: 1ReplicationFactor: 3Configs: min.insync.replicas=2,retention.ms=172800000,message.format.version=3.0-IV1
Topic: mytopic2Partition: 0Leader: 0Replicas: 0,2,1Isr: 0,2,1
5
# 토픽의 파티션 갯수 늘리기
kubectl exec -it ds/myclient -- kafka-topics.sh --bootstrap-server $SVCDNS --topic mytopic2 --alter --partitions 2
kubectl exec -it ds/myclient -- kafka-topics.sh --bootstrap-server $SVCDNS --topic mytopic2 --describe
Topic: mytopic2TopicId: 965ASQDmQfiuIxPiiC9RPQPartitionCount: 2ReplicationFactor: 3Configs: min.insync.replicas=2,retention.ms=172800000,message.format.version=3.0-IV1
Topic: mytopic2 Partition: 0 Leader: 0 Replicas: 0,2,1Isr: 0,2,1
Topic: mytopic2 Partition: 1 Leader: 1 Replicas: 1,2,0Isr: 1,2,0
# Kafka-UI 같이 확인
# 실습 구성도 그림 확인
# 토픽의 파티션 갯수 줄이기(안됨)
kubectl exec -it ds/myclient -- kafka-topics.sh --bootstrap-server $SVCDNS --topic mytopic2 --alter --partitions 1
Error while executing topic command : Topic currently has 2 partitions, which is higher than the requested 1.
[2022-06-03 14:59:31,427] ERROR org.apache.kafka.common.errors.InvalidPartitionsException: Topic currently has 2 partitions, which is higher than the requested 1.
(kafka.admin.TopicCommand$)
# 토픽 일부 옵션 설정 : min.insync.replicas=2 를 min.insync.replicas=3 으로 수정
kubectl exec -it ds/myclient -- kafka-configs.sh --bootstrap-server $SVCDNS --topic mytopic2 --alter -add-config min.insync.replicas=3
kubectl exec -it ds/myclient -- kafka-topics.sh --bootstrap-server $SVCDNS --topic mytopic2 --describe
Topic: mytopic2TopicId: 965ASQDmQfiuIxPiiC9RPQPartitionCount: 2ReplicationFactor: 3Configs: min.insync.replicas=3,retention.ms=172800000,message.format.version=3.0-IV1
Topic: mytopic2Partition: 0Leader: 0Replicas: 0,2,1Isr: 0,2,1
Topic: mytopic2Partition: 1Leader: 1Replicas: 1,2,0Isr: 1,2,0
# 토픽 일부 옵션 설정 : 다음 실습을 위해 min.insync.replicas=2 로 다시 수정
kubectl exec -it ds/myclient -- kafka-configs.sh --bootstrap-server $SVCDNS --topic mytopic2 --alter -add-config min.insync.replicas=2
kafka-ui
> mytopic1 > Message > Live mode로 변경하라.
# 토픽 모니터링
watch -d kubectl get kafkatopics -n kafka
# 사용 스크립트
kafka-console-producer.sh
kafka-console-consumer.sh
# 토픽에 데이터 넣어보기
kubectl exec -it ds/myclient -- kafka-console-producer.sh --bootstrap-server $SVCDNS --topic mytopic1
> hello
> world
> 0
> 1
> 2
CTRL+D 로 빠져나오기
# 토픽 데이터 확인
kubectl exec -it ds/myclient -- kafka-console-consumer.sh --bootstrap-server $SVCDNS --topic mytopic1 --from-beginning
hello
world
0
1
2
CTRL+C 로 빠져나오기
# 토픽에 데이터(메시지키+메시지값) 넣어보기
kubectl exec -it ds/myclient -- kafka-console-producer.sh --bootstrap-server $SVCDNS --topic mytopic1 --property "parse.key=true" --property "key.separator=:"
>key1:doik1
>key2:doik2
>key3:doik3
CTRL+D 로 빠져나오기
# 토픽에 데이터(메시지키+메시지값) 확인
kubectl exec -it ds/myclient -- kafka-console-consumer.sh --bootstrap-server $SVCDNS --topic mytopic1 --property print.key=true --property key.separator="-" --from-beginning
null-hello
null-world
null-0
null-1
null-2
key1-doik1
key2-doik2
key3-doik3
CTRL+C 로 빠져나오기
# 토픽에 데이터 최대 컨슘 메시지 갯수 확인
kubectl exec -it ds/myclient -- kafka-console-consumer.sh --bootstrap-server $SVCDNS --topic mytopic1 --max-messages 2 --from-beginning
doik2
hello
Processed a total of 2 messages
# 토픽에서 특정 파티션만 컨슘 확인
kubectl exec -it ds/myclient -- kafka-console-consumer.sh --bootstrap-server $SVCDNS --topic mytopic1 --partition 0 --from-beginning
...
CTRL+C 로 빠져나오기
# 특정 오프셋 기준 컨슘 확인 등 다양한 조건의 확인 가능 >> 직접 찾아서 해보세요!
# 토픽 삭제 (kubectl native)
kubectl delete kafkatopics -n kafka mytopic1
# kafkacat 사용
# 정보 확인 : NodePort 접속
1
#docker run -it --network=host edenhill/kcat:1.7.1 -b YOUR_BROKER -L
NODE1IP=$(kubectl get node -owide | grep 192.168.1 | awk '{print $6}')
NODEPORT=$(kubectl get svc -n kafka my-cluster-kafka-external-bootstrap -o jsonpath={.spec.ports[0].nodePort})
2
docker run -it --rm --network=host edenhill/kcat:1.7.1 -b $NODE1IP:$NODEPORT -L
Metadata for all topics (from broker -1: 127.0.0.1:30356/bootstrap):
3 brokers:
broker 0 at 192.168.10.103:31478 (controller)
broker 2 at 192.168.10.101:31617
broker 1 at 192.168.10.102:30565
4 topics:
topic "mytopic2" with 2 partitions:
partition 0, leader 0, replicas: 0,2,1, isrs: 0,2,1
partition 1, leader 1, replicas: 1,2,0, isrs: 1,2,0
...
3
# 메시지 보내기 Producer mode (reads messages from stdin):
docker run -it --rm --network=host edenhill/kcat:1.7.1 -b $NODE1IP:$NODEPORT -t mytopic2 -P
1
2
3
CTRL+D 로 종료
4
UI에서 확인
kafka-consumer-groups.sh
1
# 토픽에 데이터 넣어보기
kubectl exec -it ds/myclient -- kafka-console-producer.sh --bootstrap-server $SVCDNS --topic mytopic2 <<EOF
101
102
103
104
105
106
107
108
109
110
EOF
kubectl exec -it ds/myclient -- kafka-console-producer.sh --bootstrap-server $SVCDNS --topic mytopic2 <<EOF
AAA
BBB
CCC
DDD
EOF
2
# 컨슈머 그룹 확인
kubectl exec -it ds/myclient -- kafka-consumer-groups.sh --bootstrap-server $SVCDNS --list
__strimzi-topic-operator-kstreams
# 컨슈머 그룹 기반으로 동작, 특정 목적을 가진 컨슈머들을 묶음으로 사용하는 것. 컨슈머그룹으로 토픽의 레코드를 가져갈 경우 어느 레코드까지 읽었는지에 대한 데이터가 브로커에 저장됨
3
## 컨슈머 그룹은 따로 생성하는 명령을 적용하는 것이 아니라, 컨슈머를 동작할 때 컨슈머 그룹이름을 지정하면 새로 생성됨
kubectl exec -it ds/myclient -- kafka-console-consumer.sh --bootstrap-server $SVCDNS --topic mytopic2 --group mygroup --from-beginning
...
CTRL+C 로 빠져나오기
4
# 컨슈머 그룹 상태 확인
## 파티션 번호, 현재까지 가져간 레코드의 오프셋, 파티션 마지막 레코드의 오프셋, 컨슈머 랙 LAG, 컨슈머 ID, 호스트 정보 확인 가능
kubectl exec -it ds/myclient -- kafka-consumer-groups.sh --bootstrap-server $SVCDNS --group mygroup --describe
Consumer group 'mygroup' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
mygroup mytopic2 0 84 84 0 - - -
mygroup mytopic2 1 70 70 0 - - -
# 오프셋 리셋 : 가능한 이유?
>> 컨슈머는 유연하게 메시지를 가져갈 수 있다!
kubectl exec -it ds/myclient -- kafka-consumer-groups.sh --bootstrap-server $SVCDNS --group mygroup --topic mytopic2 --reset-offsets --to-earliest --execute
GROUP TOPIC PARTITION NEW-OFFSET
mygroup mytopic2 0 0
mygroup mytopic2 1 0
# 다시 컨슈머 그룹 상태 확인 : LAG 확인됨!
kubectl exec -it ds/myclient -- kafka-consumer-groups.sh --bootstrap-server $SVCDNS --group mygroup --describe
Consumer group 'mygroup' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
mygroup mytopic2 0 0 84 84 - - -
mygroup mytopic2 1 0 70 70 - - -
5
# 모니터링
while true; do kubectl exec -it ds/myclient -- kafka-consumer-groups.sh --bootstrap-server $SVCDNS --group mygroup --describe;echo "-----"; sleep 0.5; done
# 컨슈머 그룹 메시지 소비하기 : LAG 확인!
kubectl exec -it ds/myclient -- kafka-console-consumer.sh --bootstrap-server $SVCDNS --topic mytopic2 --group mygroup
...
CTRL+C 로 빠져나오기
# 다시 컨슈머 그룹 상태 확인 : LAG 0 확인!, CONSUMER-ID 확인!
kubectl exec -it ds/myclient -- kafka-consumer-groups.sh --bootstrap-server $SVCDNS --group mygroup --describe
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
mygroup mytopic2 0 84 84 0 console-consumer-d1137e74-0d1b-4d62-9b90-5142200f2ae7 /172.16.0.7 console-consumer
mygroup mytopic2 1 70 70 0 console-consumer-d1137e74-0d1b-4d62-9b90-5142200f2ae7 /172.16.0.7 console-consumer
로그 파일에 메시지를 저장하기 때문에 일정 기간 동안은 컨슈머들이 데이터를 가져갈 수 있다!
1
# 파드와 노드 매칭 확인
kubectl get pod -n kafka -owide | grep kafka
my-cluster-kafka-0 1/1 Running 0 51m 192.168.1.92 ip-192-168-1-83.ap-northeast-2.compute.internal <none> <none>
my-cluster-kafka-1 1/1 Running 0 51m 192.168.2.166 ip-192-168-2-24.ap-northeast-2.compute.internal <none> <none>
my-cluster-kafka-2 1/1 Running 0 51m 192.168.3.68 ip-192-168-3-206.ap-northeast-2.compute.internal <none> <none>
# 카프카 설정 확인
kubectl describe cm -n kafka my-cluster-kafka-config
...
##########
# Kafka message logs configuration >> 로그 디렉터리
##########
log.dirs=/var/lib/kafka/data-0/kafka-log${STRIMZI_BROKER_ID}
# 로그 저장소 확인 : 특정 토픽(파티션 개수에 따른 폴더 생성됨)에 세그먼트 확인
kubectl exec -it -n kafka my-cluster-kafka-0 -c kafka -- ls -al /var/lib/kafka/data-0/kafka-log0
kubectl exec -it -n kafka my-cluster-kafka-0 -c kafka -- ls -al /var/lib/kafka/data-0/kafka-log0/mytopic2-0
kubectl exec -it -n kafka my-cluster-kafka-1 -c kafka -- ls -al /var/lib/kafka/data-0/kafka-log1
kubectl exec -it -n kafka my-cluster-kafka-2 -c kafka -- ls -al /var/lib/kafka/data-0/kafka-log2
drwxr-xr-x - ubuntu 3 Jun 10:30 └── kafka-log2
...
.rw-r--r-- 4 ubuntu 4 Jun 00:23 ├── cleaner-offset-checkpoint
.rw-r--r-- 4 ubuntu 4 Jun 01:31 ├── log-start-offset-checkpoint
.rw-r--r-- 88 ubuntu 3 Jun 22:48 ├── meta.properties
drwxr-xr-x - ubuntu 4 Jun 00:34 ├── mytopic2-0
.rw-r--r-- 10M ubuntu 4 Jun 00:43 │ ├── 00000000000000000000.index
.rw-r--r-- 105k ubuntu 4 Jun 00:43 │ ├── 00000000000000000000.log
.rw-r--r-- 10M ubuntu 4 Jun 00:43 │ ├── 00000000000000000000.timeindex
.rw-r--r-- 8 ubuntu 4 Jun 00:34 │ ├── leader-epoch-checkpoint
.rw-r--r-- 43 ubuntu 3 Jun 23:56 │ └── partition.metadata
drwxr-xr-x - ubuntu 4 Jun 00:34 ├── mytopic2-1
.rw-r--r-- 10M ubuntu 4 Jun 00:38 │ ├── 00000000000000000000.index
.rw-r--r-- 5.2k ubuntu 4 Jun 00:43 │ ├── 00000000000000000000.log
.rw-r--r-- 10M ubuntu 4 Jun 00:38 │ ├── 00000000000000000000.timeindex
.rw-r--r-- 8 ubuntu 4 Jun 00:34 │ ├── leader-epoch-checkpoint
.rw-r--r-- 43 ubuntu 3 Jun 23:58 │ └── partition.metadata
.rw-r--r-- 1.3k ubuntu 4 Jun 01:31 ├── recovery-point-offset-checkpoint
.rw-r--r-- 1.3k ubuntu 4 Jun 01:32 └── replication-offset-checkpoint
# xxd 툴로 00000000000000000000.log 의 hexdump 내용 확인 : 보낸 메시지 내용 확인, 로그 파일에 저장된 메시지는 컨슈머가 읽어갈 수 있음
kubectl exec -it -n kafka my-cluster-kafka-0 -c kafka -- cat /var/lib/kafka/data-0/kafka-log0/mytopic2-0/00000000000000000000.log | xxd
...
00000040: 0001 0a68 656c 6c6f 0000 0000 0000 0000 ...hello........
00000050: 0100 0000 3d00 0000 0002 5259 97e5 0000 ....=.....RY....
00000060: 0000 0000 0000 0181 2536 7afb 0000 0181 ........%6z.....
00000070: 2536 7afb 0000 0000 0000 03e8 0000 0000 %6z.............
00000080: 0001 0000 0001 1600 0000 010a 776f 726c ............worl
00000090: 6400 0000 0000 0000 0002 0000 0039 0000 d............9..
# 토픽 정보 확인 : kakfa-ui
# 토픽에 데이터(메시지키+메시지값) 컨슈머 확인 : 파티션0
kubectl exec -it ds/myclient -- kafka-console-consumer.sh --bootstrap-server $SVCDNS --topic mytopic2 --partition 0 --property print.key=true --property key.separator=":"
# 토픽에 데이터(메시지키+메시지값) 컨슈머 확인 : 파티션1
kubectl exec -it ds/myclient -- kafka-console-consumer.sh --bootstrap-server $SVCDNS --topic mytopic2 --partition 1 --property print.key=true --property key.separator=":"
# 토픽에 데이터(메시지키+메시지값) 넣어보기
kubectl exec -it ds/myclient -- kafka-console-producer.sh --bootstrap-server $SVCDNS --topic mytopic2 --property "parse.key=true" --property "key.separator=:" <<EOF
key1:0
key1:1
key1:2
key2:3
key2:4
key2:5
EOF
# 모니터링 파티션 번호, 현재까지 가져간 레코드의 오프셋, 파티션 마지막 레코드의 오프셋, 컨슈머 랙 LAG, 컨슈머 ID, 호스트 정보 확인
kubectl exec -it ds/myclient -- kafka-consumer-groups.sh --bootstrap-server $SVCDNS --group mygroup --describe
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
mygroup mytopic2 0 225 228 3 - - -
mygroup mytopic2 1 75 78 3 - - -
# 토픽 삭제 (kubectl native)
kubectl delete kafkatopics -n kafka mytopic1
kubectl delete kafkatopics -n kafka mytopic2
1
# 모니터링
watch -d kubectl get pod -owide -n kafka
kubectl logs -n kafka -l name=strimzi-cluster-operator -f # Reconciliation 로그 확인
2
# 토픽 Topic 생성 (kubectl native) : 파티션 1개 리플리케이션 3개, ISR=2, envsubst 활용
TOPICNAME=mytopic3 envsubst < mytopic.yaml | kubectl apply -f - -n kafka
# 토픽 정보 확인 : 컨트롤러 브로커 위치 확인
kubectl get pod -n kafka -l app.kubernetes.io/name=kafka -owide
NODE1IP=$(kubectl get node -owide | grep 192.168.1 | awk '{print $6}')
NODEPORT=$(kubectl get svc -n kafka my-cluster-kafka-external-bootstrap -o jsonpath={.spec.ports[0].nodePort})
docker run -it --rm --network=host edenhill/kcat:1.7.1 -b $NODE1IP:$NODEPORT -L -t mytopic3 | grep controller
broker 0 at ec2-3-36-53-67.ap-northeast-2.compute.amazonaws.com:31498 (controller) >> 해당
유동 공인IP를 가진 EC2 찾기
kubectl exec -it ds/myclient -- kafka-topics.sh --bootstrap-server $SVCDNS --topic mytopic3 --describe
Topic: mytopic3TopicId: 077wfV5dSnORaZrLh3WLAwPartitionCount: 1ReplicationFactor: 3Configs: min.insync.replicas=2,segment.bytes=1073741824,retention.ms=7200000,message.format.version=3.0-IV1
Topic: mytopic3Partition: 0Leader: 0Replicas: 0,2,1Isr: 0,2,1
# 메시지 받기 : script 혹은 kafka-ui
kubectl exec -it ds/myclient -- kafka-console-consumer.sh --bootstrap-server $SVCDNS --topic mytopic3 --from-beginning
# (터미널1) for문 반복 메시지 보내기
kubectl exec -it ds/myclient -- sh -c "echo mytest | kafka-console-producer.sh --bootstrap-server $SVCDNS --topic mytopic3"
for ((i=1; i<=100; i++)); do echo "failover-test1-$i" ; kubectl exec -it ds/myclient -- sh -c "echo test1-$i | kafka-console-producer.sh --bootstrap-server $SVCDNS --topic mytopic3" ; date ; done
3
# 강제로 컨트롤러 브로커 파드 삭제(위치 확인) : 오퍼레이터가 annotate 설정을 모니터링 주기(2분)가 있어서 시간이 지나면 삭제가 실행됨
kubectl annotate pod -n kafka my-cluster-kafka-0 strimzi.io/delete-pod-and-pvc=true && kubectl get pv -w
혹은
kubectl annotate pod -n kafka my-cluster-kafka-1 strimzi.io/delete-pod-and-pvc=true && kubectl get pv -w
혹은
kubectl annotate pod -n kafka my-cluster-kafka-2 strimzi.io/delete-pod-and-pvc=true && kubectl get pv -w
# 메시지 보내고 받기 가능!
...
failover-test1-415
% Reached end of topic mytopic3 [0] at offset 491
failover-test1-416
...
4
추가 삭제~
# 강제로 주키퍼 파드 삭제
kubectl annotate pod -n kafka my-cluster-zookeeper-0 strimzi.io/delete-pod-and-pvc=true && kubectl get pv -w
5
카프카가 잘 되는가???
# 메시지 보내고 받기 가능!
프로듀셔와 컴슈밍이 잘되는지 확인 !
1
# 모니터링
watch kubectl get pod -owide -n kafka
kubetail -n kafka -l name=strimzi-cluster-operator -f # Reconciliation 로그 확인
2
# 카프카 토픽 정보 확인 : 리더파드가 있는 워커노드 위치 확인
kubectl exec -it ds/myclient -- kafka-topics.sh --bootstrap-server $SVCDNS --topic mytopic3 --describe
Topic: mytopic3TopicId: 077wfV5dSnORaZrLh3WLAwPartitionCount: 1ReplicationFactor: 3Configs: min.insync.replicas=2,segment.bytes=1073741824,retention.ms=7200000,message.format.version=3.0-IV1
Topic: mytopic3Partition: 0Leader: 0Replicas: 0,2,1Isr: 0,2,1
3
# test 토픽 리더 kafka 파드의 워커노드 확인
kubectl get pod -owide -n kafka | grep kafka
# (터미널2) 메시지 받기
kubectl exec -it ds/myclient -- kafka-console-consumer.sh --bootstrap-server $SVCDNS --topic mytopic3 --from-beginning
# (터미널1) for문 반복 메시지 보내기
for ((i=1; i<=100; i++)); do echo "failover-test2-$i" ; kubectl exec -it ds/myclient -- sh -c "echo test2-$i | kafka-console-producer.sh --bootstrap-server $SVCDNS --topic mytopic3" ; date ; done
# test 토픽 리더 kafka 파드의 워커노드에서 drain : test topic leader pod evict
# kubectl drain <<노드>> --ignore-daemonsets --delete-emptydir-data
kubectl get node
NODE=<각자 자신의 EC2 노드 이름 지정>
NODE=ip-192-168-3-96.ap-northeast-2.compute.internal
kubectl drain $NODE --delete-emptydir-data --force --ignore-daemonsets && kubectl get node -w
# 해당 워커노드 drain 확인
kubectl get kafka,strimzipodsets -n kafka
kubectl get node
# kafka 파드 상태
kubectl get pod -l app.kubernetes.io/name=kafka -n kafka
# 카프카 토픽 정보 확인
kubectl exec -it ds/myclient -- kafka-topics.sh --bootstrap-server $SVCDNS --topic mytopic3 --describe
Topic: mytopic3TopicId: 077wfV5dSnORaZrLh3WLAwPartitionCount: 1ReplicationFactor: 3Configs: min.insync.replicas=2,segment.bytes=1073741824,retention.ms=7200000,message.format.version=3.0-IV1
Topic: mytopic3Partition: 0Leader: 0Replicas: 1,0,2Isr: 2,0 # 브로커1는 not in-sync 상태
# ISR min.insync.replicas=3 으로 증가 후 메시지 보내고 받기 확인 >> ISR 기능에 대한 이해를 하자!
kubectl exec -it ds/myclient -- kafka-configs.sh --bootstrap-server $SVCDNS --topic mytopic3 --alter -add-config min.insync.replicas=3
# 메시지 보내고 받기 확인
kubectl exec -it ds/myclient -- sh -c "echo mytest | kafka-console-producer.sh --bootstrap-server $SVCDNS --topic mytopic3"
# ISR min.insync.replicas=2 으로 설정 수정
kubectl exec -it ds/myclient -- kafka-configs.sh --bootstrap-server $SVCDNS --topic mytopic3 --alter -add-config min.insync.replicas=2
# 메시지 보내고 받기 확인
kubectl exec -it ds/myclient -- sh -c "echo mytest | kafka-console-producer.sh --bootstrap-server $SVCDNS --topic mytopic3"
# 동작 확인 후 uncordon 설정
kubectl get kafka,strimzipodsets -n kafka
kubectl uncordon $NODE
https://brunch.co.kr/@topasvga/3530
위 내용은 주말 CloudNet 스터디 내용 참고하여 정리한 부분입니다.
https://gasidaseo.notion.site/gasidaseo/CloudNet-Blog-c9dfa44a27ff431dafdd2edacc8a1863