brunch

You can make anything
by writing

C.S.Lewis

by 유윤식 Jun 26. 2018

Python: Spark-Kafka-Sql(2)

# 파이썬, # 스파크, # SQL, # Structured SQL

RDD 를 파이썬에서 사용함에 속도 / 성능 측면에서 약간의 불편(?)을 경험 할 수 있다.


이전에 언급했듯,

SQL API 를 이용하면 다른 언어들과 속적인 측면에서 크게 차이가 없다.


그래서,

파이썬에서 Structured Streaming SQL 을 구현하여 데이터를 가공하고,

이를 Kafka 와 연결하는 방법을 보여주려고 한다.


늘 말하지만,

'코드'는 필요하다면 공개 할 수 있다.


방법은 이렇다.


스파크세션(SparkSession) 을 이용한다.

from pyspark.sql import SparkSession

이 외에도,

from pyspark.sql.functions import *
from pyspark.sql.types import *

와 같이 모듈을 선언한다.


준비가 끝났다.


스파크세션을 통해서 spark 변수를 하나 만든다.

spark = SparkSession.builder.getOrCreate()


spark 변수가 생겼다. 이는,

shell 을 통한 (linux$ pyspark) 에서 쓰이는 것과 같다.


데이터를 가공하려는게 주목적이므로,

데이터의 형태를 보고 넘어간다.


{"timestamp":"2018-06-21T03:42:02.006346+0900","flow_id":1866677055494560,"event_type":"flow","src_ip":"192.168.1.163","src_port":48559,"dest_ip":"192.168.18.70","dest_port":161,"proto":"UDP","app_proto":"failed","flow":{"pkts_toserver":1,"pkts_toclient":1,"bytes_toserver":94,"bytes_toclient":97,"start":"2018-06-21T03:37:01.956832+0900","end":"2018-06-21T03:37:01.957109+0900","age":0,"state":"established","reason":"timeout","alerted":false}}


이런 데이터를,

{"src_ip":"192.168.2.11","rrname":"fe0.google.com","col3":1,"col4":"2018-06-25 18:35:16.014"}

이런 인포메이션으로!


"""

!참고!

col3, col4 는 컬럼 이름을 재지정 함으로써 바꿀 수 있다.

col3 은 rrname 의 중복 개수를 나타낸다.

"""


데이터를 위와 같이 가공하기 위해서는,

통계적으로 Grouping 과 Aggregation 이 쓰인다.


데이터의 스키마(Schema) 를 잡는다.

schema = StructType([
    StructField("src_ip", StringType(), True),
    StructField("dest_ip", StringType(), True),
    StructField("src_port", StringType(), True),
    StructField("dest_port", StringType(), True),
    StructField("timestamp", StringType(), True),
    StructField("event_type", StringType(), True),
    StructField("dns", StructType([
        StructField("type", StringType(), True),
        StructField("rrname", StringType(), True)
    ]))
])


쉽다.

데이터에서 몇가지만 뽑는다.

Nested Node 까지 표현 할 수 있다.


log_ = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "192.168.2.1:9999").option("subscribe", "topic1").option("failOnDataLoss", False).load()


이렇게 Kafka 로부터 스티리밍 데이터를 전달 받는 객체를 만들어 log_ 변수에 전달한다.

중요한건 지금 내가 Kafka 를 너무 좋아한다는 것이다.


다음으로,

데이터를 뽑아야 한다. Kafka 에서 가져오는 데이터는 특이한 스키마와 맞물려있다.


나는 value 라는 스키마에서 원하는 정보를 가져온다.

이 value 안의 정보를 다시 json 으로 바꾸고 각 key 를 통해서 정보를 가져온다.


log_mod = suricata_log.withWatermark("timestamp", "10 minutes").selectExpr("cast (value as string) as json", "cast (timestamp as timestamp) as timestamps")\
    .select(from_json("json", schema).alias("data"), "timestamps") \
    .selectExpr("data.src_ip", "data.event_type", "data.dns.rrname", "data.dns.type", "cast (timestamps as string)")


조금 복잡하려고 한다. withWatermark 가 무엇이고 select / selectExpr 이 무슨 차이인지도 모르겠다.

라고 느낀다면 스파크를 좀 더 공부하는게 좋다.


사실,

위의 코드에서 데이터를 Grouping 하고 Agg 할 수 있다.

하지만,

나는 가져온 데이터를 Table 에서 가지고 노는 방식으로도 해보았다.

이게 더 재미가 있다.


log_mod.createOrReplaceTempView('tb')

log_sql = spark.sql("""select (t.src_ip, t.rrname, count(t.rrname), max(t.timestamps)) as value
                from tb as t
                where t.type = "query" and t.src_ip = "192.168.2.1"
                GROUP BY t.src_ip, t.rrname
                ORDER BY count(t.rrname) DESC
                """).select(to_json("value").alias("value"))


select * from ?

SQL 문법을 그대로 사용 할 수 있었다.


사실,

제대로 만든 코드는 아니다. 고쳐야 할 부분이 많다.

그건 공부하면서 조금씩 나아질 부분이라고 생각한다.

(도와달라는 뜻이다.)


거의 마지막으로,

query = suricata_sql_log.writeStream.format("kafka") \
    .option("kafka.bootstrap.servers", "192.168.2.1:9092") \
    .option("topic", "suricata12mod") \
    .option("checkpointLocation", "/home/tmp/check2") \
    .outputMode('complete') \
    .start()


Kafka 로 다시 가공한 데이터를 새로운 Topic 으로 넣는다.

실질적인 코드는 여기서 끝이다.


이거 하는데 2일 죽어라 책만 봤다.


참고로,

위의 코드에서 'complete' 라고 쓰여져 있는 부분이 있다.


참고바란다.

"""Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink.
        Options include:
        * 'append':Only the new rows in the streaming DataFrame/Dataset will be written to
           the sink
        * 'complete':All the rows in the streaming DataFrame/Dataset will be written to the sink
           every time these is some updates
        * 'update':only the rows that were updated in the streaming DataFrame/Dataset will be
           written to the sink every time there are some updates. If the query doesn't contain
           aggregations, it will be equivalent to 'append' mode.
       .. note:: Evolving.
        >>> writer = sdf.writeStream.outputMode('append')
"""

API 내용이다.


진짜 마지막으로,

query.awaitTermination()


하고 실행하면 끝!


초당 평균 200~300개 정도의 데이터를 1초안에 처리한다.

최대 600개 정도의 데이터도 처리한다 1초안에.

스파크가 알아서 해준다 모두.


나는 성능적으로 만족했다.

Spark UI 를 통해서 진해되는 작업의 상태도 알 수 있다.

이 데이터를 통해서 실시간 데쉬보드에 데이터를 뿌려볼 수 있었다.


이제 남은건,

MLlib 을 정복하는 것이다.


나는 이미 Tensorflow, Keras, Scikit-learn 모두 사용하면서,

머신러닝에 어느정도 익숙한 상태다.


추이분석, 얼굴인식, 클러스터링, 앙상블 등과 같은 용어가 MLlib 에서도 쓰이는 것을 얼핏 보았다.


다음에는,

MLlib 을 이용한 새로운 서비스를 보여주려고 한다.

회사 프로젝트라 자세히 전달하는 것은 힘들 수 있다.

작가의 이전글 Python: Spark-Kafka-Sql(1)
브런치는 최신 브라우저에 최적화 되어있습니다. IE chrome safari