# 파이썬, # 스파크, # SQL, # Structured SQL
RDD 를 파이썬에서 사용함에 속도 / 성능 측면에서 약간의 불편(?)을 경험 할 수 있다.
이전에 언급했듯,
SQL API 를 이용하면 다른 언어들과 속적인 측면에서 크게 차이가 없다.
그래서,
파이썬에서 Structured Streaming SQL 을 구현하여 데이터를 가공하고,
이를 Kafka 와 연결하는 방법을 보여주려고 한다.
늘 말하지만,
'코드'는 필요하다면 공개 할 수 있다.
방법은 이렇다.
스파크세션(SparkSession) 을 이용한다.
from pyspark.sql import SparkSession
이 외에도,
from pyspark.sql.functions import *
from pyspark.sql.types import *
와 같이 모듈을 선언한다.
준비가 끝났다.
스파크세션을 통해서 spark 변수를 하나 만든다.
spark = SparkSession.builder.getOrCreate()
spark 변수가 생겼다. 이는,
shell 을 통한 (linux$ pyspark) 에서 쓰이는 것과 같다.
데이터를 가공하려는게 주목적이므로,
데이터의 형태를 보고 넘어간다.
{"timestamp":"2018-06-21T03:42:02.006346+0900","flow_id":1866677055494560,"event_type":"flow","src_ip":"192.168.1.163","src_port":48559,"dest_ip":"192.168.18.70","dest_port":161,"proto":"UDP","app_proto":"failed","flow":{"pkts_toserver":1,"pkts_toclient":1,"bytes_toserver":94,"bytes_toclient":97,"start":"2018-06-21T03:37:01.956832+0900","end":"2018-06-21T03:37:01.957109+0900","age":0,"state":"established","reason":"timeout","alerted":false}}
이런 데이터를,
{"src_ip":"192.168.2.11","rrname":"fe0.google.com","col3":1,"col4":"2018-06-25 18:35:16.014"}
이런 인포메이션으로!
"""
!참고!
col3, col4 는 컬럼 이름을 재지정 함으로써 바꿀 수 있다.
col3 은 rrname 의 중복 개수를 나타낸다.
"""
데이터를 위와 같이 가공하기 위해서는,
통계적으로 Grouping 과 Aggregation 이 쓰인다.
데이터의 스키마(Schema) 를 잡는다.
schema = StructType([
StructField("src_ip", StringType(), True),
StructField("dest_ip", StringType(), True),
StructField("src_port", StringType(), True),
StructField("dest_port", StringType(), True),
StructField("timestamp", StringType(), True),
StructField("event_type", StringType(), True),
StructField("dns", StructType([
StructField("type", StringType(), True),
StructField("rrname", StringType(), True)
]))
])
쉽다.
데이터에서 몇가지만 뽑는다.
Nested Node 까지 표현 할 수 있다.
log_ = spark.readStream.format("kafka").option("kafka.bootstrap.servers", "192.168.2.1:9999").option("subscribe", "topic1").option("failOnDataLoss", False).load()
이렇게 Kafka 로부터 스티리밍 데이터를 전달 받는 객체를 만들어 log_ 변수에 전달한다.
중요한건 지금 내가 Kafka 를 너무 좋아한다는 것이다.
다음으로,
데이터를 뽑아야 한다. Kafka 에서 가져오는 데이터는 특이한 스키마와 맞물려있다.
나는 value 라는 스키마에서 원하는 정보를 가져온다.
이 value 안의 정보를 다시 json 으로 바꾸고 각 key 를 통해서 정보를 가져온다.
log_mod = suricata_log.withWatermark("timestamp", "10 minutes").selectExpr("cast (value as string) as json", "cast (timestamp as timestamp) as timestamps")\
.select(from_json("json", schema).alias("data"), "timestamps") \
.selectExpr("data.src_ip", "data.event_type", "data.dns.rrname", "data.dns.type", "cast (timestamps as string)")
조금 복잡하려고 한다. withWatermark 가 무엇이고 select / selectExpr 이 무슨 차이인지도 모르겠다.
라고 느낀다면 스파크를 좀 더 공부하는게 좋다.
사실,
위의 코드에서 데이터를 Grouping 하고 Agg 할 수 있다.
하지만,
나는 가져온 데이터를 Table 에서 가지고 노는 방식으로도 해보았다.
이게 더 재미가 있다.
log_mod.createOrReplaceTempView('tb')
log_sql = spark.sql("""select (t.src_ip, t.rrname, count(t.rrname), max(t.timestamps)) as value
from tb as t
where t.type = "query" and t.src_ip = "192.168.2.1"
GROUP BY t.src_ip, t.rrname
ORDER BY count(t.rrname) DESC
""").select(to_json("value").alias("value"))
select * from ?
SQL 문법을 그대로 사용 할 수 있었다.
사실,
제대로 만든 코드는 아니다. 고쳐야 할 부분이 많다.
그건 공부하면서 조금씩 나아질 부분이라고 생각한다.
(도와달라는 뜻이다.)
거의 마지막으로,
query = suricata_sql_log.writeStream.format("kafka") \
.option("kafka.bootstrap.servers", "192.168.2.1:9092") \
.option("topic", "suricata12mod") \
.option("checkpointLocation", "/home/tmp/check2") \
.outputMode('complete') \
.start()
Kafka 로 다시 가공한 데이터를 새로운 Topic 으로 넣는다.
실질적인 코드는 여기서 끝이다.
이거 하는데 2일 죽어라 책만 봤다.
참고로,
위의 코드에서 'complete' 라고 쓰여져 있는 부분이 있다.
참고바란다.
"""Specifies how data of a streaming DataFrame/Dataset is written to a streaming sink.
Options include:
* 'append':Only the new rows in the streaming DataFrame/Dataset will be written to
the sink
* 'complete':All the rows in the streaming DataFrame/Dataset will be written to the sink
every time these is some updates
* 'update':only the rows that were updated in the streaming DataFrame/Dataset will be
written to the sink every time there are some updates. If the query doesn't contain
aggregations, it will be equivalent to 'append' mode.
.. note:: Evolving.
>>> writer = sdf.writeStream.outputMode('append')
"""
API 내용이다.
진짜 마지막으로,
query.awaitTermination()
하고 실행하면 끝!
초당 평균 200~300개 정도의 데이터를 1초안에 처리한다.
최대 600개 정도의 데이터도 처리한다 1초안에.
스파크가 알아서 해준다 모두.
나는 성능적으로 만족했다.
Spark UI 를 통해서 진해되는 작업의 상태도 알 수 있다.
이 데이터를 통해서 실시간 데쉬보드에 데이터를 뿌려볼 수 있었다.
이제 남은건,
MLlib 을 정복하는 것이다.
나는 이미 Tensorflow, Keras, Scikit-learn 모두 사용하면서,
머신러닝에 어느정도 익숙한 상태다.
추이분석, 얼굴인식, 클러스터링, 앙상블 등과 같은 용어가 MLlib 에서도 쓰이는 것을 얼핏 보았다.
다음에는,
MLlib 을 이용한 새로운 서비스를 보여주려고 한다.
회사 프로젝트라 자세히 전달하는 것은 힘들 수 있다.