brunch

You can make anything
by writing

C.S.Lewis

by Master Seo Nov 14. 2023

EKS9탄-13. EKS DB -Kafka-13/18


실시간 데이터를 받아  소비하는 카프카에 대해 알아보자.

 


<1> 카프카 사용 사례

<2> 카프카 특징

<3> Strimzi 소개 = 카프카 오퍼레이트 

<4> 카프카 클러스터 생성 (Zookeeper)

<5> 테스트용 파드 생성 후 카프카 클러스터 정보 확인

<6> Kafka UI - 링크

<7> (참고) 모니터링 : 프로메테우스 & 그라파나 






<1> 카프카 사용 사례


1

if(kakao) 2021 : 스마트 메시지 서비스 개발기 - 링크

카프카 스트림즈 사용 : 사용자의 반응 로그(imp, click)를 취합하여 저장하고 유저 데이터를 매핑하여 처리하는 용도



제네시스 – 광고추천팀의 카프카 기반 스트리밍 데이터 플랫폼

https://tech.kakao.com/2022/04/13/kafka-connect-streaming-data-platform/



2

if(kakao) 2019 : 카카오 T 대 “서포터즈 기사" 개발 사례 - 링크

이점 : 메시지 흐름 단순화, 메시지의 영속성 보장, 하나의 이벤트 메시지를 동시에 여러 서비스에서 소비, 메시지 버저닝



3

[넷플릭스 기술 블로그] 스트림 프로세싱 플랫폼 - 링크

Anatomy of a single streaming job



4

[DEVIEW 2021] 데이터 드리븐으로 서비스 운영 100% 자동화하기 : 로깅 데이터와 카프카를 활용 - 링크

적용 예시 : 쿠키런! 킹덩에서 레벨 3을 달성하면 보석 100개를 드립니다 → 게임 데이터를 연산 후 미션 달성 판단 시 보상 재화를 인벤토리로 지급



5

[DEVIEW 2021] 하루 n억개 웹툰 로그를 처리하는 Realtime Application 만들기 - 링크




<2> 카프카 특징


1

높은 처리량 : 묶음 단위 배치 처리로 대용량 실시간 로그 데이터 처리 적합

확장성 : 브로커 스케일 인/아웃

영속성 : 다른 메시징 플랫폼과 다르게 전송받은 데이터를 메모리에 저장하지 않고 파일 시스템에 저장

고가용성 : 서버/브로커 장애 시 데이터 복제로 인해 지속적으로 데이터 처리 가능



2

KSQL - 별도의 코드를 작성하지 않고도 익숙한 SQL 기반으로 실시간 처리가 가능, 스트림/배치 처리

프로듀서 = Producer : 데이터(메시지, 이벤트)를 만들어서 카프카에 주는 쪽

acks : 프로듀서가 전송한 데이터를 얼마나 신뢰성 높게 저장할지 지정

min.insync.replicas 옵션값을 2로 설정했을 때부터 acks 를 all 로 설정하는 의미가 있다

컨슈머 = Consumer : 카프카로 부터 데이터(메시지, 이벤트)를 빼내서 소비하는 쪽

주키퍼 ZooKeeper : 분산 애플리케이션에서 (메타데이터를 관리하는) 코디네이터 역할을 하는 애플리케이션

브로커 Broker : 카프카 애플리케이션이 설치된 서버 또는 노드 ⇒ 묶어서 클러스터

브로커 역할= 컨트롤러 : 클러스터의 다수 브로커 중 한 대가 컨트롤러의 역할

카프카 커넥트 : 프로듀서 역할을 하는 소스 커넥터 source connector 와 컨슈머 역할을 하는 싱크 커넥터 sink connector 2가지로 나뉨

https://www.confluent.io/blog/apache-kafka-goes-1-0/





<3> Strimzi 소개 = 카프카 오퍼레이트 


1

카프카 소개

구성요소 : 카프카 클러스터, 주키퍼, 카프카 커넥트Connect, 미러메이커MirrorMaker, 카프카 브리지Bridge, 카프카 익스포터Exporter - 링크


2

카프카 주요 요소


주키퍼 ZooKeeper : 카프카의 **메타데이터** **관리** 및 **브로커**의 정상 **상태 점검** health check 을 담당

 IMPORTANT : **KRaft** mode is **not ready** for **production** in Apache **Kafka** or in **Strimzi**- Link

카프카 Kafka 또는 카프카 클러스터 Kafka cluster :  여러 대의 브로커를 구성한 클러스터를 의미

브로커 broker : 카프카 애플리케이션이 설치된 서버 또는 노드를 말함

프로듀서 producer : 카프카로 메시지를 보내는 역할을 하는 클라이언트를 총칭

컨슈머 consumer : 카프카에서 메시지를 꺼내가는 역할을 하는 클라이언트를 총칭

토픽 topic : 카프카는 메시지 피드들을 토픽으로 구분하고, 각 토픽의 이름은 카프카 내에서 고유함

파티션 partition : 병렬 처리 및 고성능을 얻기 위해 하나의 토픽을 여러 개로 나눈 것을 말함

세그먼트 segment : 프로듀서가 전송한 실제 메시지가 브로커의 로컬 디스크에 저장되는 파일

메시지 message 또는 레코드 record : 프로듀서가 브로커로 전송하거나 컨슈머가 읽어가는 데이터 조각을 말함


3

Strimzi 는 쿠버네티스 환경에서 카프카Kafka 운영 관리에 도움을 주는 Operator 입니다 


https://strimzi.io/docs/operators/in-development/overview.html


https://strimzi.io/downloads/


Strimzi v0.38.0 버전 출시 : Apache Kafka 지원 버전 - v3.5.0, v3.5.1, v3.6.0



4

Strimzi Operator 설치 with Helm : v0.38.0 - Chart


# 네임스페이스 생성

kubectl create namespace kafka



# Repo 추가

helm repo add strimzi https://strimzi.io/charts/


helm show values strimzi/strimzi-kafka-operator



# 차트 설치 : 오퍼레이터 파드 설치

helm install kafka-operator strimzi/strimzi-kafka-operator --version 0.38.0 --namespace kafka



# 배포한 리소스 확인 : Operator 디플로이먼트(파드)

kubectl get deploy,pod -n kafka

kubectl get-all -n kafka



# 오퍼레이터가 지원하는 카프카 버전 확인

kubectl describe deploy -n kafka | grep KAFKA_IMAGES: -A3

      STRIMZI_KAFKA_IMAGES:                               3.5.0=quay.io/strimzi/kafka:0.38.0-kafka-3.5.0

                                                          3.5.1=quay.io/strimzi/kafka:0.38.0-kafka-3.5.1

                                                          3.6.0=quay.io/strimzi/kafka:0.38.0-kafka-3.6.0



# 배포한 리소스 확인 

CRDs - 각각이 제공 기능으로 봐도됨!


kubectl get crd | grep strimzi


kafkabridges.kafka.strimzi.io                2023-11-17T01:48:44Z

kafkaconnectors.kafka.strimzi.io             2023-11-17T01:48:44Z

kafkaconnects.kafka.strimzi.io               2023-11-17T01:48:44Z

kafkamirrormaker2s.kafka.strimzi.io          2023-11-17T01:48:45Z

kafkamirrormakers.kafka.strimzi.io           2023-11-17T01:48:44Z

kafkanodepools.kafka.strimzi.io              2023-11-17T01:48:45Z

kafkarebalances.kafka.strimzi.io             2023-11-17T01:48:45Z

kafkas.kafka.strimzi.io                      2023-11-17T01:48:43Z

kafkatopics.kafka.strimzi.io                 2023-11-17T01:48:44Z

kafkausers.kafka.strimzi.io                  2023-11-17T01:48:44Z

strimzipodsets.core.strimzi.io               2023-11-17T01:48:44Z

// 스트림즈도 스테이트 풀셋을 안쓴다.



# (참고) CRD 상세 정보 확인

kubectl describe crd kafkas.kafka.strimzi.io

kubectl describe crd kafkatopics.kafka.strimzi.io



5

삭제

helm uninstall kafka-operator -n kafka && kubectl delete ns kafka





<4> 카프카 클러스터 생성 (Zookeeper)


기존 Statefulsets 대신 → StrimziPodSets 이 기본 설정이다.


StrimziPodSets(sps) 특징  ?

: sts 중간 파드 바로 삭제 불가, 파드 Spec 동일해야됨 (볼륨, CPU, Memory), 아키텍처 구성 용이

https://learnk8s.io/kafka-ha-kubernetes



kafka-1.yaml


apiVersion: kafka.strimzi.io/v1beta2

kind: Kafka

metadata:

  name: my-cluster

spec:

  kafka:

    version: 3.6.0

    replicas: 3

    listeners:

      - name: plain

        port: 9092

        type: internal

        tls: false

      - name: tls

        port: 9093

        type: internal

        tls: false

      - name: external

        port: 9094

        type: nodeport

        tls: false

    readinessProbe:

      initialDelaySeconds: 15

      timeoutSeconds: 5

    livenessProbe:

      initialDelaySeconds: 15

      timeoutSeconds: 5

    config:

      offsets.topic.replication.factor: 3    # 리플리케이션 3개가 만들어진다.

      transaction.state.log.replication.factor: 3

      transaction.state.log.min.isr: 2

      default.replication.factor: 3

      min.insync.replicas: 2   # 미니멈 리플리카가 2다 되면 동작이 일어난다.

      inter.broker.protocol.version: "3.6"

    storage:

      type: jbod

      volumes:

      - id: 0

        type: persistent-claim

        size: 10Gi

        deleteClaim: true

    template

      pod: 

        affinity: 

          podAntiAffinity: 

            requiredDuringSchedulingIgnoredDuringExecution: 

              - labelSelector: 

                  matchExpressions: 

                    - key: app.kubernetes.io/name

                      operator: In

                      values: 

                        - kafka

                topologyKey: "topology.ebs.csi.aws.com/zone"      # 토폴로지를 파드 어피니티를 설정함.

  zookeeper:

    replicas: 3

    readinessProbe:

      initialDelaySeconds: 15

      timeoutSeconds: 5

    livenessProbe:

      initialDelaySeconds: 15

      timeoutSeconds: 5

    storage:

      type: persistent-claim

      size: 10Gi

      deleteClaim: true

    template: 

      pod: 

        affinity: 

          podAntiAffinity: 

            requiredDuringSchedulingIgnoredDuringExecution: 

              - labelSelector: 

                  matchExpressions: 

                    - key: app.kubernetes.io/name

                      operator: In

                      values: 

                        - zookeeper

                topologyKey: "topology.ebs.csi.aws.com/zone"

  entityOperator:      # 파드안에 유저등 관리한다.   pod/my-cluster-entity-operator-644-zlkdr   3/3  

    topicOperator: {}

    userOperator: {}




1

# (옵션) 신규 터미널 : 모니터링

watch kubectl get kafka,strimzipodsets,pod,svc,endpointslice,pvc -n kafka

kubectl logs deployment/strimzi-cluster-operator -n kafka -f



# 카프카 클러스터 YAML 파일 확인 : listeners(3개), podAntiAffinity

curl -s -O https://raw.githubusercontent.com/gasida/DOIK/main/strimzi/kafka-1.yaml

cat kafka-1.yaml | yh



# 카프카 클러스터 배포 : 카프카(브로커 3개), 주키퍼(3개), entityOperator 디플로이먼트


## 배포 시 requiredDuringSchedulingIgnoredDuringExecution 지원한다.

preferredDuringSchedulingIgnoredDuringExecution 미지원 한다?


kubectl apply -f kafka-1.yaml -n kafka



# 배포된 리소스들 확인

kubectl get-all -n kafka



2

# 배포된 리소스 확인 : 주키퍼 설치 완료 후 >> 카프카 브로커 설치됨

kubectl get kafka -n kafka

AME         DESIRED KAFKA REPLICAS   DESIRED ZK REPLICAS   READY   WARNINGS

my-cluster   3                        3                     True



kubectl get cm,secret -n kafka



# 배포된 리소스 확인 : 카프카/주키퍼 strimzipodsets 생성 확인 >> sts 스테이트풀렛 사용 X

kubectl get strimzipodsets -n kafka



# 노드 정보 확인

kubectl describe node | more


kubectl get node --label-columns=topology.ebs.csi.aws.com/zone




kubectl describe pv | grep 'Node Affinity:' -A2



# 배포된 리소스 확인 : 배포된 파드 생성 확인

kubectl get pod -n kafka -l app.kubernetes.io/name=kafka

kubectl get pod -n kafka -l app.kubernetes.io/name=zookeeper

kubectl get pod -n kafka -l app.kubernetes.io/instance=my-cluster



# 배포된 리소스 확인 : 서비스 Service(Headless) 등 생성 확인 - listeners(3개)

kubectl get svc,endpointslice -n kafka



# 배포된 리소스 확인 : 카프카/주키퍼 파드 저장소 확인

kubectl get pvc,pv -n kafka

kubectl df-pv



# 배포된 리소스 확인 : 컨피그맵 확인

kubectl get cm -n kafka



# 컨피그맵 상세 확인

kubectl describe cm -n kafka strimzi-cluster-operator

kubectl describe cm -n kafka my-cluster-zookeeper-config

kubectl describe cm -n kafka my-cluster-entity-topic-operator-config

kubectl describe cm -n kafka my-cluster-entity-user-operator-config

kubectl describe cm -n kafka my-cluster-kafka-0

kubectl describe cm -n kafka my-cluster-kafka-1

kubectl describe cm -n kafka my-cluster-kafka-2


...(생략)...

##########

# Node / Broker ID

##########

broker.id=${STRIMZI_BROKER_ID}

node.id=${STRIMZI_BROKER_ID}

##########

# Kafka message logs configuration >> 로그 디렉터리

##########

log.dirs=/var/lib/kafka/data-0/kafka-log${STRIMZI_BROKER_ID}

...

##########

# User provided configuration

##########

default.replication.factor=3

inter.broker.protocol.version=3.6

min.insync.replicas=2

offsets.topic.replication.factor=3

transaction.state.log.min.isr=2

transaction.state.log.replication.factor=3

log.message.format.version=3.6

...



3

# kafka 클러스터 Listeners 정보 확인 : 각각 9092 평문, 9093 TLS, 세번째 정보는 External 접속 시 NodePort 정보

kubectl get kafka -n kafka my-cluster -o jsonpath={.status.listeners} | jq



# (옵션) NetworkPolicy 확인 >> 어떤 동작을 처리하는가?

kubectl get networkpolicy -n kafka

kubectl describe networkpolicy -n kafka



# (옵션) poddisruptionbudget 확인 >> 어떤 동작을 처리하는가?

kubectl get pdb -n kafka

kubectl describe pdb -n kafka





<5> 테스트용 파드 생성 후 카프카 클러스터 정보 확인


1

# 파일 다운로드

curl -s -O https://raw.githubusercontent.com/gasida/DOIK/main/strimzi/myclient.yaml

cat myclient.yaml | yh


2

# 데몬셋으로 myclient 파드 배포 : 어떤 네임스페이스에 배포되는가?

VERSION=3.6 envsubst < myclient.yaml | kubectl apply -f -


kubectl get pod -l name=kafkaclient -owide



# Kafka client 에서 제공되는 kafka 관련 도구들 확인

kubectl exec -it ds/myclient -- ls /opt/bitnami/kafka/bin



3

# 카프카 파드의 SVC 도메인이름을 변수에 지정

SVCDNS=my-cluster-kafka-bootstrap.kafka.svc:9092


echo "export SVCDNS=my-cluster-kafka-bootstrap.kafka.svc:9092" >> /etc/profile



# 브로커 정보

kubectl exec -it ds/myclient -- kafka-broker-api-versions.sh --bootstrap-server $SVCDNS

// 브로커 3개의 상세 정보가 나온다.



# 브로커에 설정된 각종 기본값 확인 : --broker --all --describe 로 조회


kubectl exec -it ds/myclient -- kafka-configs.sh --bootstrap-server $SVCDNS --broker 1 --all --describe


kubectl exec -it ds/myclient -- kafka-configs.sh --bootstrap-server $SVCDNS --broker 2 --all --describe


kubectl exec -it ds/myclient -- kafka-configs.sh --bootstrap-server $SVCDNS --broker 0 --all --describe



4

# 토픽 리스트 확인

kubectl exec -it ds/myclient -- kafka-topics.sh --bootstrap-server $SVCDNS --list

__consumer_offsets

__strimzi-topic-operator-kstreams-topic-store-changelog

__strimzi_store_topic



5

# 토픽 리스트 확인 (kubectl native) : PARTITIONS, REPLICATION FACTOR

kubectl get kafkatopics -n kafka





<6> Kafka UI - 링크


# 배포

helm repo add kafka-ui https://provectus.github.io/kafka-ui-charts


cat <<EOF > kafkaui-values.yml

yamlApplicationConfig:

  kafka:

    clusters:

      - name: yaml

        bootstrapServers: my-cluster-kafka-bootstrap.kafka.svc:9092

  auth:

    type: disabled

  management:

    health:

      ldap:

        enabled: false

EOF



# 설치

helm install kafka-ui kafka-ui/kafka-ui -f kafkaui-values.yml



# 접속 확인

kubectl patch svc kafka-ui -p '{"spec":{"type":"LoadBalancer"}}'


kubectl annotate service kafka-ui "external-dns.alpha.kubernetes.io/hostname=kafka-ui.$MyDomain"


echo -e "kafka-ui Web URL = http://kafka-ui.$MyDomain"





<7> (참고) 모니터링 : 프로메테우스 & 그라파나 


→ 단, 기존에 프로메테우스/그라파나 helm과 카프카 클러스터는 삭제 후 아래 과정 진행


https://anissetiouajni.com/posts/simplify-kafka-operational-tasks-with-strimzi-and-kubernetes/


https://blog.naver.com/qwerty_1234s/222768094790



1. exporter 설정된 카프카 클러스터 배포


# exporter 관련 설정 확인

curl -s -O https://raw.githubusercontent.com/gasida/DOIK/main/strimzi/kafka-2.yaml

cat kafka-2.yaml | yh

# exporter 설정된 카프카 클러스터 배포

kubectl apply -f kafka-2.yaml -n kafka



2. 예제 코드 복사


#

git clone https://github.com/AmarendraSingh88/kafka-on-kubernetes.git

cd kafka-on-kubernetes/kafka-demo/demo3-monitoring/

tree



3. 프로메테우스 설치


# 프로메테우스 설치 : --server-side 는 왜 쓸까요? 안쓰면 어떻게 될까요?

kubectl apply -f prometheus-operator-deployment.yaml -n monitoring --server-side

kubectl apply -f prometheus.yaml -n monitoring

kubectl apply -f prometheus-rules.yaml -n monitoring

kubectl apply -f strimzi-pod-monitor.yaml -n monitoring




4. 그라파나 설치


# 그라파나 설치

kubectl apply -f grafana/grafana.yaml -n monitoring

kubectl patch svc -n monitoring grafana -p '{"spec":{"type":"LoadBalancer"}}'

kubectl annotate service grafana -n monitoring "external-dns.alpha.kubernetes.io/hostname=grafana.$MyDomain"

# 접속 정보 확인

echo -e "Grafana URL = http://grafana.$MyDomain:3000"




그라파나 웹 접속 : admin / admin

그라파나 데이터 소스 설정 : 프로메테우스파드-0 에 헤드리스 접속 주소 입력 후 연결 확인


prometheus-prometheus-0.prometheus-operated.monitoring.svc.cluster.local:9090



그라파나 대시보드 추가 : 아래 파일 다운 로드 후 Import 에서 내용 입력 - Link



5. 모니터링 확인



6. (삭제 시 참고) 그라파나 서비스(Service:CLB) 삭제 :  kubectl delete svc -n monitoring grafana





위 내용은  주말 CloudNet  스터디 내용 참고하여  정리한 부분입니다.

https://gasidaseo.notion.site/gasidaseo/CloudNet-Blog-c9dfa44a27ff431dafdd2edacc8a1863  



다음

https://brunch.co.kr/@topasvga/3529



브런치는 최신 브라우저에 최적화 되어있습니다. IE chrome safari