실시간 데이터를 받아 소비하는 카프카에 대해 알아보자.
1
if(kakao) 2021 : 스마트 메시지 서비스 개발기 - 링크
카프카 스트림즈 사용 : 사용자의 반응 로그(imp, click)를 취합하여 저장하고 유저 데이터를 매핑하여 처리하는 용도
제네시스 – 광고추천팀의 카프카 기반 스트리밍 데이터 플랫폼
https://tech.kakao.com/2022/04/13/kafka-connect-streaming-data-platform/
2
if(kakao) 2019 : 카카오 T 대 “서포터즈 기사" 개발 사례 - 링크
이점 : 메시지 흐름 단순화, 메시지의 영속성 보장, 하나의 이벤트 메시지를 동시에 여러 서비스에서 소비, 메시지 버저닝
3
[넷플릭스 기술 블로그] 스트림 프로세싱 플랫폼 - 링크
Anatomy of a single streaming job
4
[DEVIEW 2021] 데이터 드리븐으로 서비스 운영 100% 자동화하기 : 로깅 데이터와 카프카를 활용 - 링크
적용 예시 : 쿠키런! 킹덩에서 레벨 3을 달성하면 보석 100개를 드립니다 → 게임 데이터를 연산 후 미션 달성 판단 시 보상 재화를 인벤토리로 지급
5
[DEVIEW 2021] 하루 n억개 웹툰 로그를 처리하는 Realtime Application 만들기 - 링크
1
높은 처리량 : 묶음 단위 배치 처리로 대용량 실시간 로그 데이터 처리 적합
확장성 : 브로커 스케일 인/아웃
영속성 : 다른 메시징 플랫폼과 다르게 전송받은 데이터를 메모리에 저장하지 않고 파일 시스템에 저장
고가용성 : 서버/브로커 장애 시 데이터 복제로 인해 지속적으로 데이터 처리 가능
2
KSQL - 별도의 코드를 작성하지 않고도 익숙한 SQL 기반으로 실시간 처리가 가능, 스트림/배치 처리
프로듀서 = Producer : 데이터(메시지, 이벤트)를 만들어서 카프카에 주는 쪽
acks : 프로듀서가 전송한 데이터를 얼마나 신뢰성 높게 저장할지 지정
min.insync.replicas 옵션값을 2로 설정했을 때부터 acks 를 all 로 설정하는 의미가 있다
컨슈머 = Consumer : 카프카로 부터 데이터(메시지, 이벤트)를 빼내서 소비하는 쪽
주키퍼 ZooKeeper : 분산 애플리케이션에서 (메타데이터를 관리하는) 코디네이터 역할을 하는 애플리케이션
브로커 Broker : 카프카 애플리케이션이 설치된 서버 또는 노드 ⇒ 묶어서 클러스터
브로커 역할= 컨트롤러 : 클러스터의 다수 브로커 중 한 대가 컨트롤러의 역할
카프카 커넥트 : 프로듀서 역할을 하는 소스 커넥터 source connector 와 컨슈머 역할을 하는 싱크 커넥터 sink connector 2가지로 나뉨
https://www.confluent.io/blog/apache-kafka-goes-1-0/
1
카프카 소개
구성요소 : 카프카 클러스터, 주키퍼, 카프카 커넥트Connect, 미러메이커MirrorMaker, 카프카 브리지Bridge, 카프카 익스포터Exporter - 링크
2
카프카 주요 요소
주키퍼 ZooKeeper : 카프카의 **메타데이터** **관리** 및 **브로커**의 정상 **상태 점검** health check 을 담당
IMPORTANT : **KRaft** mode is **not ready** for **production** in Apache **Kafka** or in **Strimzi**- Link
카프카 Kafka 또는 카프카 클러스터 Kafka cluster : 여러 대의 브로커를 구성한 클러스터를 의미
브로커 broker : 카프카 애플리케이션이 설치된 서버 또는 노드를 말함
프로듀서 producer : 카프카로 메시지를 보내는 역할을 하는 클라이언트를 총칭
컨슈머 consumer : 카프카에서 메시지를 꺼내가는 역할을 하는 클라이언트를 총칭
토픽 topic : 카프카는 메시지 피드들을 토픽으로 구분하고, 각 토픽의 이름은 카프카 내에서 고유함
파티션 partition : 병렬 처리 및 고성능을 얻기 위해 하나의 토픽을 여러 개로 나눈 것을 말함
세그먼트 segment : 프로듀서가 전송한 실제 메시지가 브로커의 로컬 디스크에 저장되는 파일
메시지 message 또는 레코드 record : 프로듀서가 브로커로 전송하거나 컨슈머가 읽어가는 데이터 조각을 말함
3
Strimzi 는 쿠버네티스 환경에서 카프카Kafka 운영 관리에 도움을 주는 Operator 입니다
https://strimzi.io/docs/operators/in-development/overview.html
Strimzi v0.38.0 버전 출시 : Apache Kafka 지원 버전 - v3.5.0, v3.5.1, v3.6.0
4
# 네임스페이스 생성
kubectl create namespace kafka
# Repo 추가
helm repo add strimzi https://strimzi.io/charts/
helm show values strimzi/strimzi-kafka-operator
# 차트 설치 : 오퍼레이터 파드 설치
helm install kafka-operator strimzi/strimzi-kafka-operator --version 0.38.0 --namespace kafka
# 배포한 리소스 확인 : Operator 디플로이먼트(파드)
kubectl get deploy,pod -n kafka
kubectl get-all -n kafka
# 오퍼레이터가 지원하는 카프카 버전 확인
kubectl describe deploy -n kafka | grep KAFKA_IMAGES: -A3
STRIMZI_KAFKA_IMAGES: 3.5.0=quay.io/strimzi/kafka:0.38.0-kafka-3.5.0
3.5.1=quay.io/strimzi/kafka:0.38.0-kafka-3.5.1
3.6.0=quay.io/strimzi/kafka:0.38.0-kafka-3.6.0
# 배포한 리소스 확인
CRDs - 각각이 제공 기능으로 봐도됨!
kubectl get crd | grep strimzi
kafkabridges.kafka.strimzi.io 2023-11-17T01:48:44Z
kafkaconnectors.kafka.strimzi.io 2023-11-17T01:48:44Z
kafkaconnects.kafka.strimzi.io 2023-11-17T01:48:44Z
kafkamirrormaker2s.kafka.strimzi.io 2023-11-17T01:48:45Z
kafkamirrormakers.kafka.strimzi.io 2023-11-17T01:48:44Z
kafkanodepools.kafka.strimzi.io 2023-11-17T01:48:45Z
kafkarebalances.kafka.strimzi.io 2023-11-17T01:48:45Z
kafkas.kafka.strimzi.io 2023-11-17T01:48:43Z
kafkatopics.kafka.strimzi.io 2023-11-17T01:48:44Z
kafkausers.kafka.strimzi.io 2023-11-17T01:48:44Z
strimzipodsets.core.strimzi.io 2023-11-17T01:48:44Z
// 스트림즈도 스테이트 풀셋을 안쓴다.
# (참고) CRD 상세 정보 확인
kubectl describe crd kafkas.kafka.strimzi.io
kubectl describe crd kafkatopics.kafka.strimzi.io
5
삭제
helm uninstall kafka-operator -n kafka && kubectl delete ns kafka
기존 Statefulsets 대신 → StrimziPodSets 이 기본 설정이다.
StrimziPodSets(sps) 특징 ?
: sts 중간 파드 바로 삭제 불가, 파드 Spec 동일해야됨 (볼륨, CPU, Memory), 아키텍처 구성 용이
https://learnk8s.io/kafka-ha-kubernetes
kafka-1.yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: Kafka
metadata:
name: my-cluster
spec:
kafka:
version: 3.6.0
replicas: 3
listeners:
- name: plain
port: 9092
type: internal
tls: false
- name: tls
port: 9093
type: internal
tls: false
- name: external
port: 9094
type: nodeport
tls: false
readinessProbe:
initialDelaySeconds: 15
timeoutSeconds: 5
livenessProbe:
initialDelaySeconds: 15
timeoutSeconds: 5
config:
offsets.topic.replication.factor: 3 # 리플리케이션 3개가 만들어진다.
transaction.state.log.replication.factor: 3
transaction.state.log.min.isr: 2
default.replication.factor: 3
min.insync.replicas: 2 # 미니멈 리플리카가 2다 되면 동작이 일어난다.
inter.broker.protocol.version: "3.6"
storage:
type: jbod
volumes:
- id: 0
type: persistent-claim
size: 10Gi
deleteClaim: true
template:
pod:
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchExpressions:
- key: app.kubernetes.io/name
operator: In
values:
- kafka
topologyKey: "topology.ebs.csi.aws.com/zone" # 토폴로지를 파드 어피니티를 설정함.
zookeeper:
replicas: 3
readinessProbe:
initialDelaySeconds: 15
timeoutSeconds: 5
livenessProbe:
initialDelaySeconds: 15
timeoutSeconds: 5
storage:
type: persistent-claim
size: 10Gi
deleteClaim: true
template:
pod:
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchExpressions:
- key: app.kubernetes.io/name
operator: In
values:
- zookeeper
topologyKey: "topology.ebs.csi.aws.com/zone"
entityOperator: # 파드안에 유저등 관리한다. pod/my-cluster-entity-operator-644-zlkdr 3/3
topicOperator: {}
userOperator: {}
1
# (옵션) 신규 터미널 : 모니터링
watch kubectl get kafka,strimzipodsets,pod,svc,endpointslice,pvc -n kafka
kubectl logs deployment/strimzi-cluster-operator -n kafka -f
# 카프카 클러스터 YAML 파일 확인 : listeners(3개), podAntiAffinity
curl -s -O https://raw.githubusercontent.com/gasida/DOIK/main/strimzi/kafka-1.yaml
cat kafka-1.yaml | yh
## 배포 시 requiredDuringSchedulingIgnoredDuringExecution 지원한다.
preferredDuringSchedulingIgnoredDuringExecution 미지원 한다?
kubectl apply -f kafka-1.yaml -n kafka
# 배포된 리소스들 확인
kubectl get-all -n kafka
2
# 배포된 리소스 확인 : 주키퍼 설치 완료 후 >> 카프카 브로커 설치됨
kubectl get kafka -n kafka
AME DESIRED KAFKA REPLICAS DESIRED ZK REPLICAS READY WARNINGS
my-cluster 3 3 True
kubectl get cm,secret -n kafka
# 배포된 리소스 확인 : 카프카/주키퍼 strimzipodsets 생성 확인 >> sts 스테이트풀렛 사용 X
kubectl get strimzipodsets -n kafka
# 노드 정보 확인
kubectl describe node | more
kubectl get node --label-columns=topology.ebs.csi.aws.com/zone
kubectl describe pv | grep 'Node Affinity:' -A2
# 배포된 리소스 확인 : 배포된 파드 생성 확인
kubectl get pod -n kafka -l app.kubernetes.io/name=kafka
kubectl get pod -n kafka -l app.kubernetes.io/name=zookeeper
kubectl get pod -n kafka -l app.kubernetes.io/instance=my-cluster
# 배포된 리소스 확인 : 서비스 Service(Headless) 등 생성 확인 - listeners(3개)
kubectl get svc,endpointslice -n kafka
# 배포된 리소스 확인 : 카프카/주키퍼 파드 저장소 확인
kubectl get pvc,pv -n kafka
kubectl df-pv
# 배포된 리소스 확인 : 컨피그맵 확인
kubectl get cm -n kafka
# 컨피그맵 상세 확인
kubectl describe cm -n kafka strimzi-cluster-operator
kubectl describe cm -n kafka my-cluster-zookeeper-config
kubectl describe cm -n kafka my-cluster-entity-topic-operator-config
kubectl describe cm -n kafka my-cluster-entity-user-operator-config
kubectl describe cm -n kafka my-cluster-kafka-0
kubectl describe cm -n kafka my-cluster-kafka-1
kubectl describe cm -n kafka my-cluster-kafka-2
...(생략)...
##########
# Node / Broker ID
##########
broker.id=${STRIMZI_BROKER_ID}
node.id=${STRIMZI_BROKER_ID}
##########
# Kafka message logs configuration >> 로그 디렉터리
##########
log.dirs=/var/lib/kafka/data-0/kafka-log${STRIMZI_BROKER_ID}
...
##########
# User provided configuration
##########
default.replication.factor=3
inter.broker.protocol.version=3.6
min.insync.replicas=2
offsets.topic.replication.factor=3
transaction.state.log.min.isr=2
transaction.state.log.replication.factor=3
log.message.format.version=3.6
...
3
# kafka 클러스터 Listeners 정보 확인 : 각각 9092 평문, 9093 TLS, 세번째 정보는 External 접속 시 NodePort 정보
kubectl get kafka -n kafka my-cluster -o jsonpath={.status.listeners} | jq
# (옵션) NetworkPolicy 확인 >> 어떤 동작을 처리하는가?
kubectl get networkpolicy -n kafka
kubectl describe networkpolicy -n kafka
# (옵션) poddisruptionbudget 확인 >> 어떤 동작을 처리하는가?
kubectl get pdb -n kafka
kubectl describe pdb -n kafka
1
# 파일 다운로드
curl -s -O https://raw.githubusercontent.com/gasida/DOIK/main/strimzi/myclient.yaml
cat myclient.yaml | yh
2
# 데몬셋으로 myclient 파드 배포 : 어떤 네임스페이스에 배포되는가?
VERSION=3.6 envsubst < myclient.yaml | kubectl apply -f -
kubectl get pod -l name=kafkaclient -owide
# Kafka client 에서 제공되는 kafka 관련 도구들 확인
kubectl exec -it ds/myclient -- ls /opt/bitnami/kafka/bin
3
# 카프카 파드의 SVC 도메인이름을 변수에 지정
SVCDNS=my-cluster-kafka-bootstrap.kafka.svc:9092
echo "export SVCDNS=my-cluster-kafka-bootstrap.kafka.svc:9092" >> /etc/profile
# 브로커 정보
kubectl exec -it ds/myclient -- kafka-broker-api-versions.sh --bootstrap-server $SVCDNS
// 브로커 3개의 상세 정보가 나온다.
# 브로커에 설정된 각종 기본값 확인 : --broker --all --describe 로 조회
kubectl exec -it ds/myclient -- kafka-configs.sh --bootstrap-server $SVCDNS --broker 1 --all --describe
kubectl exec -it ds/myclient -- kafka-configs.sh --bootstrap-server $SVCDNS --broker 2 --all --describe
kubectl exec -it ds/myclient -- kafka-configs.sh --bootstrap-server $SVCDNS --broker 0 --all --describe
4
# 토픽 리스트 확인
kubectl exec -it ds/myclient -- kafka-topics.sh --bootstrap-server $SVCDNS --list
__consumer_offsets
__strimzi-topic-operator-kstreams-topic-store-changelog
__strimzi_store_topic
5
# 토픽 리스트 확인 (kubectl native) : PARTITIONS, REPLICATION FACTOR
kubectl get kafkatopics -n kafka
# 배포
helm repo add kafka-ui https://provectus.github.io/kafka-ui-charts
cat <<EOF > kafkaui-values.yml
yamlApplicationConfig:
kafka:
clusters:
- name: yaml
bootstrapServers: my-cluster-kafka-bootstrap.kafka.svc:9092
auth:
type: disabled
management:
health:
ldap:
enabled: false
EOF
# 설치
helm install kafka-ui kafka-ui/kafka-ui -f kafkaui-values.yml
# 접속 확인
kubectl patch svc kafka-ui -p '{"spec":{"type":"LoadBalancer"}}'
kubectl annotate service kafka-ui "external-dns.alpha.kubernetes.io/hostname=kafka-ui.$MyDomain"
echo -e "kafka-ui Web URL = http://kafka-ui.$MyDomain"
→ 단, 기존에 프로메테우스/그라파나 helm과 카프카 클러스터는 삭제 후 아래 과정 진행
https://anissetiouajni.com/posts/simplify-kafka-operational-tasks-with-strimzi-and-kubernetes/
https://blog.naver.com/qwerty_1234s/222768094790
1. exporter 설정된 카프카 클러스터 배포
# exporter 관련 설정 확인
curl -s -O https://raw.githubusercontent.com/gasida/DOIK/main/strimzi/kafka-2.yaml
cat kafka-2.yaml | yh
# exporter 설정된 카프카 클러스터 배포
kubectl apply -f kafka-2.yaml -n kafka
2. 예제 코드 복사
#
git clone https://github.com/AmarendraSingh88/kafka-on-kubernetes.git
cd kafka-on-kubernetes/kafka-demo/demo3-monitoring/
tree
3. 프로메테우스 설치
# 프로메테우스 설치 : --server-side 는 왜 쓸까요? 안쓰면 어떻게 될까요?
kubectl apply -f prometheus-operator-deployment.yaml -n monitoring --server-side
kubectl apply -f prometheus.yaml -n monitoring
kubectl apply -f prometheus-rules.yaml -n monitoring
kubectl apply -f strimzi-pod-monitor.yaml -n monitoring
4. 그라파나 설치
# 그라파나 설치
kubectl apply -f grafana/grafana.yaml -n monitoring
kubectl patch svc -n monitoring grafana -p '{"spec":{"type":"LoadBalancer"}}'
kubectl annotate service grafana -n monitoring "external-dns.alpha.kubernetes.io/hostname=grafana.$MyDomain"
# 접속 정보 확인
echo -e "Grafana URL = http://grafana.$MyDomain:3000"
그라파나 웹 접속 : admin / admin
그라파나 데이터 소스 설정 : 프로메테우스파드-0 에 헤드리스 접속 주소 입력 후 연결 확인
prometheus-prometheus-0.prometheus-operated.monitoring.svc.cluster.local:9090
그라파나 대시보드 추가 : 아래 파일 다운 로드 후 Import 에서 내용 입력 - Link
5. 모니터링 확인
6. (삭제 시 참고) 그라파나 서비스(Service:CLB) 삭제 : kubectl delete svc -n monitoring grafana
위 내용은 주말 CloudNet 스터디 내용 참고하여 정리한 부분입니다.
https://gasidaseo.notion.site/gasidaseo/CloudNet-Blog-c9dfa44a27ff431dafdd2edacc8a1863
https://brunch.co.kr/@topasvga/3529