brunch

You can make anything
by writing

C.S.Lewis

by 유윤식 Nov 21. 2018

Python: Spark-Kafka-Sql(4)

# Better, # 중급, # MQS, # 파이썬, # SPARK

최근 Spark + Kafka + SQL + Python + React 관련 내용을 좀 정리.


뒤죽박죽 흩어져 있던 코드를 다시 한 번 보면서 고칠 수 있는 부분과

개념적으로 보충이 필요하거나 새롭게 나온 아키텍트와 관련해 작업을 진행.


가장 중요한건 속도|정확성|오류에 견딜 수 있는 능력.


Spark-Structured-Streaming-SQL 은 솔직히 꾀나 어려움.

서비스로 바로 옮길 수 있는 능력이라면 내부 구성과 시스템의 프로세스를 이해야함.


네트워크 패킷 데이터, 이미지 스트리밍 데이터 등등의 각종 로그 데이터를

정보화 하는 작업의 일환으로 시작.


최근에는 데이터 엔지니어 라는 포지션이 꾀나 인기가 좋은 듯.

데이터 엔지니어... 정의가 모호하지만 있어서 나쁘지 않을 듯.


작업은 Linux-CentOS7-Python3.5 에서 진행.

Kafka, Spark 버젼은 그냥 맘대로 해도 상관 없음. 최신을 안쓰고 레거시를 쓰는 사람은 좀 아는 사람(?)


데이터 구조.


schema = StructType([
    StructField("src_ip", StringType(), True),
    StructField("dest_ip", StringType(), True),
    StructField("src_port", StringType(), True),
    StructField("dest_port", StringType(), True),
    StructField("timestamp", StringType(), True),
    StructField("event_type", StringType(), True),
    StructField("dns", StructType([
        StructField("type", StringType(), True),
        StructField("rrname", StringType(), True)
    ]))
])



데이터를 보면 일단 뭘 하고 싶어하는지 데충 감이...옴.

사용하는 DNS를 그룹화 => 카운트 => 데이터셋, 대략 이러한 스텝.


회사 내부에서 DNS 사용 트렌드와 패턴을 분석할 수 있음.

다른 주제를 붙이면 나올 수 있는 꺼리는 무궁무진.


참고로 이번 작업에는 Elastic을 사용하지 않았다. New SQL 로 대체 할 수 있는지 테스트 중...


위의 데이터는 Suricata 라는  IDS 장비를 통해 생성되는 데이터를 작게 만들어 놓음 것.

이 데이터는 바로 Kafka 로 들어감.


그럼 Kafka 에서 스트리밍 받아서 위의 데이터를 SQL 정의로 그룹화 함.



suricata_log = spark.readStream.\
    format("kafka").\
    option("kafka.bootstrap.servers", "KAFKA_IP_ADDRESS:KAFKA_PORT_NUMBER").\
    option("subscribe", "topic_name"). \
    option("kafkaConsumer.pollTimeoutMs", "6000"). \
    option("failOnDataLoss", False).\
    load()


일단, 받아!

그리고 나서는,



suricata_sql_log = suricata_log \
    .selectExpr("cast (value as string) as json", "cast (timestamp as timestamp) as timestamps") \
    .select(from_json("json", schema).alias("data"), "timestamps") \
    .selectExpr("data.src_ip as src_ip", "data.event_type as event_type", \

                 "data.dns.rrname as rrname", \
                "data.dns.type as type", "timestamps as ts") \
    .where('type = "query"') \
    .where('rrname is not null') \
    .withWatermark("ts", "30 minutes") \
    .groupBy('rrname', window('ts', '30 minutes', '30 minutes')) \
    .agg(count('rrname'), max('ts')) \
    .orderBy("count(rrname)", ascending=False) \
    .selectExpr("to_json(struct(*)) as value")


이렇게 로직을 돌리면!


JSON 형태의 결과값이 떨어짐. 이걸 그대로 다시 Kafka 에 스트리밍으로 넣어!



query = suricata_sql_log.writeStream.format("kafka") \
    .option("kafka.bootstrap.servers", "KAFKA_IP_ADDRESS:KAFKA_PORT_NUMBER") \
    .option("topic", "suricata_processed") \
    .option("checkpointLocation", "/path/to/checkpoint") \
    .outputMode('complete') \
    .start()


query.awaitTermination()


이러면 Kafka에 새롭게 가공된 데이터가 들어감.

이거면 거의 90%는 다 한거임.

10%로는 웹 서버, 리액트(React) 구현 하는 것.


파이썬 웹 서버를 만들 때, 가볍게 가고 싶으면 Flask 권장. 개인적으로.

웬만하면 Gevent 사용. 개인적으로.

React 는 Redux 아키텍트로 구현. 매우 개인적으로.

Kafka 는 꼭 클러스터링 해서 쓰시고.

Spark 도 꼭 클러스터링 해서 쓰시고.


위에서 굵은 글씨체로 된 부분에 대한 부가 설명.

withWatermark('ts', '30 minutes') : 쉽게 말하면, 30분 뒤로 Time Lag 에 대해서 봐준다는 뜻.

window('ts', '30 minutes', '30 minutes') : 더 쉽게 말하면, 30분 동안의 데이터를 30분 범위에서 AGG.


즐겁게 코딩하고. 공유도 하고. 그러세요.



작가의 이전글 Python: m.l with Keras #2
브런치는 최신 브라우저에 최적화 되어있습니다. IE chrome safari