brunch

You can make anything
by writing

C.S.Lewis

by Master Seo Nov 14. 2023

38탄-14. EKS DB -Kafka 2/3


<1> 토픽 생성 및 관리

<2> 토픽에 메시지 보내고 받기 : kafka-ui 해당 topic 에서 Live Mode 로 확인

<3> KafkaCat 툴사용해보기  : NodePort로 접속 확인 - GithubBlog

<4> 로그 세그먼트

<5> 메시지 키를 통해서 정해진 파티션을 통해서 메시지 전달 순서를 보장 및 분산 확인

<6> 장애 발생 1 및 동작 확인: 강제로 kafka or Zookeeper 파드 1개 삭제

<7> 장애 발생 2 : 강제로 토픽 리더 파드가 있는 노드를 drain = 노드 장애





<1> 토픽 생성 및 관리


mytopic.yaml


apiVersion: kafka.strimzi.io/v1beta2

kind: KafkaTopic

metadata:

  name: ${TOPICNAME}

  labels:

    strimzi.io/cluster: "my-cluster"

spec:

  partitions: 1

  replicas: 3

  config:

    retention.ms: 7200000

    segment.bytes: 1073741824

    min.insync.replicas: 2




2

# 토픽 모니터링 : Kafka-UI 같이 확인 >> 설정 반응이 조금 느릴 수 있음

watch -d kubectl get kafkatopics -n kafka




# 토픽 Topic 생성 (kubectl native) : 파티션 1개 리플리케이션 3개, envsubst 활용

curl -s -O https://raw.githubusercontent.com/gasida/DOIK/main/3/mytopic.yaml

cat mytopic.yaml | yh


TOPICNAME=mytopic1 envsubst < mytopic.yaml | kubectl apply -f - -n kafka



3

# 토픽 생성 확인 (kubectl native)

kubectl get kafkatopics -n kafka

NAME       CLUSTER      PARTITIONS   REPLICATION FACTOR   READY

mytopic1   my-cluster   1            3

...

kubectl exec -it ds/myclient -- kafka-topics.sh --bootstrap-server $SVCDNS --list | grep mytopic

mytopic1



# 토픽 상세 정보 확인 : 설정값 미 지정 시 기본값이 적용

kubectl exec -it ds/myclient -- kafka-topics.sh --bootstrap-server $SVCDNS --topic mytopic1 --describe

Topic: mytopic1

TopicId: hz--p3HXTRq-54EVuuY68w

PartitionCount: 1

ReplicationFactor: 3

Configs: min.insync.replicas=2

segment.bytes=1073741824

retention.ms=7200000

message.format.version=3.0-IV1

Topic: mytopic1Partition: 0Leader: 2Replicas: 2,0,1Isr: 2,0,1



4

# 토픽 Topic 생성 : 파티션 1개 리플리케이션 3개

kubectl exec -it ds/myclient -- kafka-topics.sh --create --bootstrap-server $SVCDNS --topic mytopic2 --partitions 1 --replication-factor 3 --config retention.ms=172800000



# 토픽 생성 확인

kubectl get kafkatopics -n kafka

NAME       CLUSTER      PARTITIONS   REPLICATION FACTOR   READY

mytopic1   my-cluster   1            3                    True

mytopic2   my-cluster   1            3                    True

...


kubectl exec -it ds/myclient -- kafka-topics.sh --bootstrap-server $SVCDNS --list | grep mytopic

mytopic1

mytopic2




# 토픽 상세 정보 확인

kubectl exec -it ds/myclient -- kafka-topics.sh --bootstrap-server $SVCDNS --topic mytopic2 --describe

Topic: mytopic2TopicId: 965ASQDmQfiuIxPiiC9RPQPartitionCount: 1ReplicationFactor: 3Configs: min.insync.replicas=2,retention.ms=172800000,message.format.version=3.0-IV1

Topic: mytopic2Partition: 0Leader: 0Replicas: 0,2,1Isr: 0,2,1



5

# 토픽의 파티션 갯수 늘리기

kubectl exec -it ds/myclient -- kafka-topics.sh --bootstrap-server $SVCDNS --topic mytopic2 --alter --partitions 2


kubectl exec -it ds/myclient -- kafka-topics.sh --bootstrap-server $SVCDNS --topic mytopic2 --describe

Topic: mytopic2TopicId: 965ASQDmQfiuIxPiiC9RPQPartitionCount: 2ReplicationFactor: 3Configs: min.insync.replicas=2,retention.ms=172800000,message.format.version=3.0-IV1

Topic: mytopic2  Partition: 0  Leader: 0  Replicas: 0,2,1Isr: 0,2,1

Topic: mytopic2  Partition: 1  Leader: 1  Replicas: 1,2,0Isr: 1,2,0




# Kafka-UI 같이 확인


# 실습 구성도 그림 확인


# 토픽의 파티션 갯수 줄이기(안됨)

kubectl exec -it ds/myclient -- kafka-topics.sh --bootstrap-server $SVCDNS --topic mytopic2 --alter --partitions 1

Error while executing topic command : Topic currently has 2 partitions, which is higher than the requested 1.

[2022-06-03 14:59:31,427] ERROR org.apache.kafka.common.errors.InvalidPartitionsException: Topic currently has 2 partitions, which is higher than the requested 1.

 (kafka.admin.TopicCommand$)



# 토픽 일부 옵션 설정 : min.insync.replicas=2 를 min.insync.replicas=3 으로 수정


kubectl exec -it ds/myclient -- kafka-configs.sh --bootstrap-server $SVCDNS --topic mytopic2 --alter -add-config min.insync.replicas=3


kubectl exec -it ds/myclient -- kafka-topics.sh --bootstrap-server $SVCDNS --topic mytopic2 --describe

Topic: mytopic2TopicId: 965ASQDmQfiuIxPiiC9RPQPartitionCount: 2ReplicationFactor: 3Configs: min.insync.replicas=3,retention.ms=172800000,message.format.version=3.0-IV1

Topic: mytopic2Partition: 0Leader: 0Replicas: 0,2,1Isr: 0,2,1

Topic: mytopic2Partition: 1Leader: 1Replicas: 1,2,0Isr: 1,2,0




# 토픽 일부 옵션 설정 : 다음 실습을 위해 min.insync.replicas=2 로 다시 수정


kubectl exec -it ds/myclient -- kafka-configs.sh --bootstrap-server $SVCDNS --topic mytopic2 --alter -add-config min.insync.replicas=2









<2> 토픽에 메시지 보내고 받기 : kafka-ui 해당 topic 에서 Live Mode 로 확인


kafka-ui  

 >  mytopic1 > Message >  Live mode로 변경하라.



# 토픽 모니터링

watch -d kubectl get kafkatopics -n kafka



# 사용 스크립트

kafka-console-producer.sh

kafka-console-consumer.sh



# 토픽에 데이터 넣어보기

kubectl exec -it ds/myclient -- kafka-console-producer.sh --bootstrap-server $SVCDNS --topic mytopic1

> hello

> world

> 0

> 1

> 2

CTRL+D 로 빠져나오기



# 토픽 데이터 확인

kubectl exec -it ds/myclient -- kafka-console-consumer.sh --bootstrap-server $SVCDNS --topic mytopic1 --from-beginning

hello

world

0

1

2

CTRL+C 로 빠져나오기





# 토픽에 데이터(메시지키+메시지값) 넣어보기


kubectl exec -it ds/myclient -- kafka-console-producer.sh --bootstrap-server $SVCDNS --topic mytopic1 --property "parse.key=true" --property "key.separator=:"

>key1:doik1

>key2:doik2

>key3:doik3

CTRL+D 로 빠져나오기




# 토픽에 데이터(메시지키+메시지값) 확인


kubectl exec -it ds/myclient -- kafka-console-consumer.sh --bootstrap-server $SVCDNS --topic mytopic1 --property print.key=true --property key.separator="-" --from-beginning

null-hello

null-world

null-0

null-1

null-2

key1-doik1

key2-doik2

key3-doik3

CTRL+C 로 빠져나오기



# 토픽에 데이터 최대 컨슘 메시지 갯수 확인


kubectl exec -it ds/myclient -- kafka-console-consumer.sh --bootstrap-server $SVCDNS --topic mytopic1 --max-messages 2 --from-beginning

doik2

hello

Processed a total of 2 messages



# 토픽에서 특정 파티션만 컨슘 확인

kubectl exec -it ds/myclient -- kafka-console-consumer.sh --bootstrap-server $SVCDNS --topic mytopic1 --partition 0 --from-beginning

...

CTRL+C 로 빠져나오기



# 특정 오프셋 기준 컨슘 확인 등 다양한 조건의 확인 가능 >> 직접 찾아서 해보세요!



# 토픽 삭제 (kubectl native)

kubectl delete kafkatopics -n kafka mytopic1





<3> KafkaCat 툴사용해보기  : NodePort로 접속 확인 - GithubBlog



# kafkacat 사용


# 정보 확인 : NodePort 접속


1

#docker run -it --network=host edenhill/kcat:1.7.1 -b YOUR_BROKER -L

NODE1IP=$(kubectl get node -owide | grep 192.168.1 | awk '{print $6}')


NODEPORT=$(kubectl get svc -n kafka my-cluster-kafka-external-bootstrap -o jsonpath={.spec.ports[0].nodePort})



2

docker run -it --rm --network=host edenhill/kcat:1.7.1 -b $NODE1IP:$NODEPORT -L

Metadata for all topics (from broker -1: 127.0.0.1:30356/bootstrap):

 3 brokers:

  broker 0 at 192.168.10.103:31478 (controller)

  broker 2 at 192.168.10.101:31617

  broker 1 at 192.168.10.102:30565

4 topics:

  topic "mytopic2" with 2 partitions:

    partition 0, leader 0, replicas: 0,2,1, isrs: 0,2,1

    partition 1, leader 1, replicas: 1,2,0, isrs: 1,2,0

...


3

# 메시지 보내기 Producer mode (reads messages from stdin):

docker run -it --rm --network=host edenhill/kcat:1.7.1 -b $NODE1IP:$NODEPORT -t mytopic2 -P

1

2

3

CTRL+D 로 종료



4

UI에서 확인






<4> 컨슈머 그룹



kafka-consumer-groups.sh


1

# 토픽에 데이터 넣어보기

kubectl exec -it ds/myclient -- kafka-console-producer.sh --bootstrap-server $SVCDNS --topic mytopic2 <<EOF

101

102

103

104

105

106

107

108

109

110

EOF



kubectl exec -it ds/myclient -- kafka-console-producer.sh --bootstrap-server $SVCDNS --topic mytopic2 <<EOF

AAA

BBB

CCC

DDD

EOF


2

# 컨슈머 그룹 확인

kubectl exec -it ds/myclient -- kafka-consumer-groups.sh --bootstrap-server $SVCDNS --list

__strimzi-topic-operator-kstreams



# 컨슈머 그룹 기반으로 동작, 특정 목적을 가진 컨슈머들을 묶음으로 사용하는 것. 컨슈머그룹으로 토픽의 레코드를 가져갈 경우 어느 레코드까지 읽었는지에 대한 데이터가 브로커에 저장됨


3

## 컨슈머 그룹은 따로 생성하는 명령을 적용하는 것이 아니라, 컨슈머를 동작할 때 컨슈머 그룹이름을 지정하면 새로 생성됨

kubectl exec -it ds/myclient -- kafka-console-consumer.sh --bootstrap-server $SVCDNS --topic mytopic2 --group mygroup --from-beginning

...

CTRL+C 로 빠져나오기



4

# 컨슈머 그룹 상태 확인


## 파티션 번호, 현재까지 가져간 레코드의 오프셋, 파티션 마지막 레코드의 오프셋, 컨슈머 랙 LAG, 컨슈머 ID, 호스트 정보 확인 가능


kubectl exec -it ds/myclient -- kafka-consumer-groups.sh --bootstrap-server $SVCDNS --group mygroup --describe

Consumer group 'mygroup' has no active members.

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID

mygroup         mytopic2        0          84              84              0               -               -               -

mygroup         mytopic2        1          70              70              0               -               -               -



# 오프셋 리셋 : 가능한 이유?

 >> 컨슈머는 유연하게 메시지를 가져갈 수 있다!

kubectl exec -it ds/myclient -- kafka-consumer-groups.sh --bootstrap-server $SVCDNS --group mygroup --topic mytopic2 --reset-offsets --to-earliest --execute

GROUP                          TOPIC                          PARTITION  NEW-OFFSET

mygroup                        mytopic2                       0          0

mygroup                        mytopic2                       1          0



# 다시 컨슈머 그룹 상태 확인 : LAG 확인됨!

kubectl exec -it ds/myclient -- kafka-consumer-groups.sh --bootstrap-server $SVCDNS --group mygroup --describe

Consumer group 'mygroup' has no active members.

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID

mygroup         mytopic2        0          0               84              84              -               -               -

mygroup         mytopic2        1          0               70              70              -               -               -



5

# 모니터링

while true; do kubectl exec -it ds/myclient -- kafka-consumer-groups.sh --bootstrap-server $SVCDNS --group mygroup --describe;echo "-----"; sleep 0.5; done



# 컨슈머 그룹 메시지 소비하기 : LAG 확인!

kubectl exec -it ds/myclient -- kafka-console-consumer.sh --bootstrap-server $SVCDNS --topic mytopic2 --group mygroup

...

CTRL+C 로 빠져나오기



# 다시 컨슈머 그룹 상태 확인 : LAG 0 확인!, CONSUMER-ID 확인!

kubectl exec -it ds/myclient -- kafka-consumer-groups.sh --bootstrap-server $SVCDNS --group mygroup --describe

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                           HOST            CLIENT-ID

mygroup         mytopic2        0          84              84              0               console-consumer-d1137e74-0d1b-4d62-9b90-5142200f2ae7 /172.16.0.7     console-consumer

mygroup         mytopic2        1          70              70              0               console-consumer-d1137e74-0d1b-4d62-9b90-5142200f2ae7 /172.16.0.7     console-consumer








<4> 로그 세그먼트


로그 파일에 메시지를 저장하기 때문에 일정 기간 동안은 컨슈머들이 데이터를 가져갈 수 있다!



1

# 파드와 노드 매칭 확인

kubectl get pod -n kafka -owide | grep kafka


my-cluster-kafka-0                            1/1     Running   0          51m   192.168.1.92    ip-192-168-1-83.ap-northeast-2.compute.internal    <none>           <none>


my-cluster-kafka-1                            1/1     Running   0          51m   192.168.2.166   ip-192-168-2-24.ap-northeast-2.compute.internal    <none>           <none>


my-cluster-kafka-2                            1/1     Running   0          51m   192.168.3.68    ip-192-168-3-206.ap-northeast-2.compute.internal   <none>           <none>





# 카프카 설정 확인

kubectl describe cm -n kafka my-cluster-kafka-config

...

##########



# Kafka message logs configuration  >> 로그 디렉터리

##########

log.dirs=/var/lib/kafka/data-0/kafka-log${STRIMZI_BROKER_ID}



# 로그 저장소 확인 : 특정 토픽(파티션 개수에 따른 폴더 생성됨)에 세그먼트 확인


kubectl exec -it -n kafka my-cluster-kafka-0 -c kafka -- ls -al /var/lib/kafka/data-0/kafka-log0


kubectl exec -it -n kafka my-cluster-kafka-0 -c kafka -- ls -al /var/lib/kafka/data-0/kafka-log0/mytopic2-0


kubectl exec -it -n kafka my-cluster-kafka-1 -c kafka -- ls -al /var/lib/kafka/data-0/kafka-log1


kubectl exec -it -n kafka my-cluster-kafka-2 -c kafka -- ls -al /var/lib/kafka/data-0/kafka-log2

drwxr-xr-x     - ubuntu  3 Jun 10:30  └── kafka-log2

...

.rw-r--r--     4 ubuntu  4 Jun 00:23     ├── cleaner-offset-checkpoint

.rw-r--r--     4 ubuntu  4 Jun 01:31     ├── log-start-offset-checkpoint

.rw-r--r--    88 ubuntu  3 Jun 22:48     ├── meta.properties

drwxr-xr-x     - ubuntu  4 Jun 00:34     ├── mytopic2-0

.rw-r--r--   10M ubuntu  4 Jun 00:43     │  ├── 00000000000000000000.index

.rw-r--r--  105k ubuntu  4 Jun 00:43     │  ├── 00000000000000000000.log

.rw-r--r--   10M ubuntu  4 Jun 00:43     │  ├── 00000000000000000000.timeindex

.rw-r--r--     8 ubuntu  4 Jun 00:34     │  ├── leader-epoch-checkpoint

.rw-r--r--    43 ubuntu  3 Jun 23:56     │  └── partition.metadata

drwxr-xr-x     - ubuntu  4 Jun 00:34     ├── mytopic2-1

.rw-r--r--   10M ubuntu  4 Jun 00:38     │  ├── 00000000000000000000.index

.rw-r--r--  5.2k ubuntu  4 Jun 00:43     │  ├── 00000000000000000000.log

.rw-r--r--   10M ubuntu  4 Jun 00:38     │  ├── 00000000000000000000.timeindex

.rw-r--r--     8 ubuntu  4 Jun 00:34     │  ├── leader-epoch-checkpoint

.rw-r--r--    43 ubuntu  3 Jun 23:58     │  └── partition.metadata

.rw-r--r--  1.3k ubuntu  4 Jun 01:31     ├── recovery-point-offset-checkpoint

.rw-r--r--  1.3k ubuntu  4 Jun 01:32     └── replication-offset-checkpoint



# xxd 툴로 00000000000000000000.log 의 hexdump 내용 확인 : 보낸 메시지 내용 확인, 로그 파일에 저장된 메시지는 컨슈머가 읽어갈 수 있음



kubectl exec -it -n kafka my-cluster-kafka-0 -c kafka -- cat /var/lib/kafka/data-0/kafka-log0/mytopic2-0/00000000000000000000.log | xxd


...

00000040: 0001 0a68 656c 6c6f 0000 0000 0000 0000  ...hello........

00000050: 0100 0000 3d00 0000 0002 5259 97e5 0000  ....=.....RY....

00000060: 0000 0000 0000 0181 2536 7afb 0000 0181  ........%6z.....

00000070: 2536 7afb 0000 0000 0000 03e8 0000 0000  %6z.............

00000080: 0001 0000 0001 1600 0000 010a 776f 726c  ............worl

00000090: 6400 0000 0000 0000 0002 0000 0039 0000  d............9..




<5> 메시지 키를 통해서 정해진 파티션을 통해서 메시지 전달 순서를 보장 및 분산 확인


# 토픽 정보 확인 : kakfa-ui



# 토픽에 데이터(메시지키+메시지값) 컨슈머 확인 : 파티션0


kubectl exec -it ds/myclient -- kafka-console-consumer.sh --bootstrap-server $SVCDNS --topic mytopic2 --partition 0 --property print.key=true --property key.separator=":"



# 토픽에 데이터(메시지키+메시지값) 컨슈머 확인 : 파티션1

kubectl exec -it ds/myclient -- kafka-console-consumer.sh --bootstrap-server $SVCDNS --topic mytopic2 --partition 1 --property print.key=true --property key.separator=":" 



# 토픽에 데이터(메시지키+메시지값) 넣어보기

kubectl exec -it ds/myclient -- kafka-console-producer.sh --bootstrap-server $SVCDNS --topic mytopic2 --property "parse.key=true" --property "key.separator=:" <<EOF

key1:0

key1:1

key1:2

key2:3

key2:4

key2:5

EOF



# 모니터링 파티션 번호, 현재까지 가져간 레코드의 오프셋, 파티션 마지막 레코드의 오프셋, 컨슈머 랙 LAG, 컨슈머 ID, 호스트 정보 확인

kubectl exec -it ds/myclient -- kafka-consumer-groups.sh --bootstrap-server $SVCDNS --group mygroup --describe

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID

mygroup         mytopic2        0          225             228             3               -               -               -

mygroup         mytopic2        1          75              78              3               -               -               -



# 토픽 삭제 (kubectl native)

kubectl delete kafkatopics -n kafka mytopic1

kubectl delete kafkatopics -n kafka mytopic2




<6> 장애 발생 1 및 동작 확인: 강제로 kafka or Zookeeper 파드 1개 삭제


1

# 모니터링

watch -d kubectl get pod -owide -n kafka



kubectl logs -n kafka -l name=strimzi-cluster-operator -f   # Reconciliation 로그 확인



2

# 토픽 Topic 생성 (kubectl native) : 파티션 1개 리플리케이션 3개, ISR=2, envsubst 활용

TOPICNAME=mytopic3 envsubst < mytopic.yaml | kubectl apply -f - -n kafka



# 토픽 정보 확인 : 컨트롤러 브로커 위치 확인


kubectl get pod -n kafka -l app.kubernetes.io/name=kafka -owide


NODE1IP=$(kubectl get node -owide | grep 192.168.1 | awk '{print $6}')


NODEPORT=$(kubectl get svc -n kafka my-cluster-kafka-external-bootstrap -o jsonpath={.spec.ports[0].nodePort})


docker run -it --rm --network=host edenhill/kcat:1.7.1 -b $NODE1IP:$NODEPORT -L -t mytopic3 | grep controller

  broker 0 at ec2-3-36-53-67.ap-northeast-2.compute.amazonaws.com:31498 (controller) >> 해당 


유동 공인IP를 가진 EC2 찾기

kubectl exec -it ds/myclient -- kafka-topics.sh --bootstrap-server $SVCDNS --topic mytopic3 --describe

Topic: mytopic3TopicId: 077wfV5dSnORaZrLh3WLAwPartitionCount: 1ReplicationFactor: 3Configs: min.insync.replicas=2,segment.bytes=1073741824,retention.ms=7200000,message.format.version=3.0-IV1

Topic: mytopic3Partition: 0Leader: 0Replicas: 0,2,1Isr: 0,2,1



# 메시지 받기 : script 혹은 kafka-ui

kubectl exec -it ds/myclient -- kafka-console-consumer.sh --bootstrap-server $SVCDNS --topic mytopic3 --from-beginning



# (터미널1) for문 반복 메시지 보내기

kubectl exec -it ds/myclient -- sh -c "echo mytest | kafka-console-producer.sh --bootstrap-server $SVCDNS --topic mytopic3"


for ((i=1; i<=100;  i++)); do echo "failover-test1-$i" ; kubectl exec -it ds/myclient -- sh -c "echo test1-$i | kafka-console-producer.sh --bootstrap-server $SVCDNS --topic mytopic3" ; date ; done



3

# 강제로 컨트롤러 브로커 파드 삭제(위치 확인) : 오퍼레이터가 annotate 설정을 모니터링 주기(2분)가 있어서 시간이 지나면 삭제가 실행됨

kubectl annotate pod -n kafka my-cluster-kafka-0 strimzi.io/delete-pod-and-pvc=true && kubectl get pv -w

혹은

kubectl annotate pod -n kafka my-cluster-kafka-1 strimzi.io/delete-pod-and-pvc=true && kubectl get pv -w

혹은

kubectl annotate pod -n kafka my-cluster-kafka-2 strimzi.io/delete-pod-and-pvc=true && kubectl get pv -w



# 메시지 보내고 받기 가능!

...

failover-test1-415

% Reached end of topic mytopic3 [0] at offset 491

failover-test1-416

...



4

추가 삭제~


# 강제로 주키퍼 파드 삭제

kubectl annotate pod -n kafka my-cluster-zookeeper-0 strimzi.io/delete-pod-and-pvc=true && kubectl get pv -w



5

카프카가 잘 되는가???


# 메시지 보내고 받기 가능!

프로듀셔와 컴슈밍이 잘되는지 확인 !





<7> 장애 발생 2 : 강제로 토픽 리더 파드가 있는 노드를 drain = 노드 장애



1

# 모니터링

watch kubectl get pod -owide -n kafka

kubetail -n kafka -l name=strimzi-cluster-operator -f   # Reconciliation 로그 확인



2

# 카프카 토픽 정보 확인 : 리더파드가 있는 워커노드 위치 확인


kubectl exec -it ds/myclient -- kafka-topics.sh --bootstrap-server $SVCDNS --topic mytopic3 --describe

Topic: mytopic3TopicId: 077wfV5dSnORaZrLh3WLAwPartitionCount: 1ReplicationFactor: 3Configs: min.insync.replicas=2,segment.bytes=1073741824,retention.ms=7200000,message.format.version=3.0-IV1

Topic: mytopic3Partition: 0Leader: 0Replicas: 0,2,1Isr: 0,2,1


3

# test 토픽 리더 kafka 파드의 워커노드 확인

kubectl get pod -owide -n kafka  | grep kafka





# (터미널2) 메시지 받기

kubectl exec -it ds/myclient -- kafka-console-consumer.sh --bootstrap-server $SVCDNS --topic mytopic3 --from-beginning



# (터미널1) for문 반복 메시지 보내기

for ((i=1; i<=100;  i++)); do echo "failover-test2-$i" ; kubectl exec -it ds/myclient -- sh -c "echo test2-$i | kafka-console-producer.sh --bootstrap-server $SVCDNS --topic mytopic3" ; date ; done





# test 토픽 리더 kafka 파드의 워커노드에서 drain : test topic leader pod evict



# kubectl drain <<노드>> --ignore-daemonsets --delete-emptydir-data

kubectl get node

NODE=<각자 자신의 EC2 노드 이름 지정>


NODE=ip-192-168-3-96.ap-northeast-2.compute.internal

kubectl drain $NODE --delete-emptydir-data --force --ignore-daemonsets && kubectl get node -w



# 해당 워커노드 drain 확인

kubectl get kafka,strimzipodsets -n kafka

kubectl get node 



# kafka 파드 상태

kubectl get pod -l app.kubernetes.io/name=kafka -n kafka



# 카프카 토픽 정보 확인

kubectl exec -it ds/myclient -- kafka-topics.sh --bootstrap-server $SVCDNS --topic mytopic3 --describe

Topic: mytopic3TopicId: 077wfV5dSnORaZrLh3WLAwPartitionCount: 1ReplicationFactor: 3Configs: min.insync.replicas=2,segment.bytes=1073741824,retention.ms=7200000,message.format.version=3.0-IV1

Topic: mytopic3Partition: 0Leader: 0Replicas: 1,0,2Isr: 2,0  # 브로커1는 not in-sync 상태



# ISR min.insync.replicas=3 으로 증가 후 메시지 보내고 받기 확인 >> ISR 기능에 대한 이해를 하자!

kubectl exec -it ds/myclient -- kafka-configs.sh --bootstrap-server $SVCDNS --topic mytopic3 --alter -add-config min.insync.replicas=3



# 메시지 보내고 받기 확인

kubectl exec -it ds/myclient -- sh -c "echo mytest | kafka-console-producer.sh --bootstrap-server $SVCDNS --topic mytopic3"



# ISR min.insync.replicas=2 으로 설정 수정

kubectl exec -it ds/myclient -- kafka-configs.sh --bootstrap-server $SVCDNS --topic mytopic3 --alter -add-config min.insync.replicas=2



# 메시지 보내고 받기 확인

kubectl exec -it ds/myclient -- sh -c "echo mytest | kafka-console-producer.sh --bootstrap-server $SVCDNS --topic mytopic3"



# 동작 확인 후 uncordon 설정

kubectl get kafka,strimzipodsets -n kafka

kubectl uncordon $NODE





다음

https://brunch.co.kr/@topasvga/3530




위 내용은  주말 CloudNet  스터디 내용 참고하여  정리한 부분입니다.

https://gasidaseo.notion.site/gasidaseo/CloudNet-Blog-c9dfa44a27ff431dafdd2edacc8a1863  



매거진의 이전글 38탄-13. EKS DB -Kafka 1/3
브런치는 최신 브라우저에 최적화 되어있습니다. IE chrome safari