Suricata Network Traffic Data Analysis
스파크 관련한 몇 개의 포스트를 정리한다.
개용는 이렇다.
SURICATA 트래픽 분석 툴을 활용해서 현재 방화벽을 타고 오가는 트래픽에 대해서
FLOW, TLS, FILEINFO, DNS, HTTP, DHCP, SMTP, SSH, ALERT 등의
다양한 이벤트를 실시간 수집 / 가공 / 분석 + ML/AI 서비스를 접목한다.
중요한건 이들 모두를 한 번에 확인하는(마치 Graph DB 같은) 것이 아니라
특정 이벤트에 대한 정보만 필터링하여 분석 한 후에
다시 Kafka 내부 토픽으로 전달하는 방식을 취한다.
기본적으로!
CentOS 7 LTS 를 사용한다.
필수적으로!
1. Java
export JAVA_HOME=/usr/lib/jvm/java-1.8.0-openjdk-1.8.0.191.b12-1.el7_6.x86_64
export PATH=${JAVA_HOME}/bin:$PATH
2. Spark
export SPARK_HOME=/home/david/env3.5/spark/spark-2.3.1-bin-hadoop2.7
export PATH=${SPARK_HOME}/bin:$PATH
3. Python
export PYTHONPATH=${SPARK_HOME}/python:${SPARK_HOME}/python/lib/py4j-0.10.7-src.zip:${SPARK_HOME}/python/lib/pyspark.zip
export PATH=$PATH:${SPARK_HOME}/python
4. Scala
export SCALA_HOME=/usr/lib/scala
export PATH=$PATH:$SCALA_HOME/bin
옵션으로!
1. Jupyter
export PYSPARK_DRIVER_PYTHON=jupyter
export PYSPARK_DRIVER_PYTHON_OPTS='notebook'
bash_profile 에다가 이쁘게 적어두자.
Kafka, Redis 관련해서는 각자 알아서 입맛에 맛게 설치하면 된다.
나는 Docker + K8S 로 간단하게 클러스터링 구성을 통해서 사용한다.
Logstash 또는 Fluentd 를 사용해서 File, Syslog 등의 데이터를 운반 할 수 있는데
개인적으로는 Fluentd 를 추천한다.
RDBMS 는 MariaDB 또는 PostgreSQL 추천한다.
New SQL 사용도 추천한다. VoltDB는 작은 Demo 버젼으로 사용 가능하다.
No SQL 은 Elasticsearch 또는 Cassandra 쓰자.
개인적으로 Cassandra 추천한다. Graph DB 까지 같이 볼 수 있다.
WEB API(REST) 쪽은 당연히 React + GraphQL 이다.
솔직히 웹 쪽은 별 이유가 없다. 그냥 대세를 따른다.
서버사이드로 오면 이제 고민이 많아 진다.
개인적으로는 Flask API 를 사용한다. 여러 프레임워크와 잘 붙는다.
가장 큰 이점은 개발이 빠르고 관리가 용이하다.
Scala의 Play를 사용하는 것도 추천한다.
Spring은 기본이니까 알아서 선택한다.
Deep Learning은 역시나 역시다.
Tensorflow + Keras 조합을 추천한다.
PyTorch, MXNET 등의 프레임워크도 좋다.
머신러닝을 꼭 Deep 하게 하지 않아도 된다.
파이썬에서 제공하는 머신러닝 라이브러리를 사용해도 좋고,
스파크에서 제공하는 MLlib 모듈을 사용해도 좋다.
다시 본론으로!
우리는 SURICATA 에서 발생하는 이벤트 중에서 DHCP에 주목해 본다.
사용자가 DHCP 서버에 붙는 목적은 이렇다.
1. 무선 네트워크 쓰려고
2. 유선 네트워크 쓰려고
3. 해킹할라고
DHCP를 통해서 사용자는 자동으로 IP를 할당 받고,
이를 통해서 인터넷을 사용 할 수 있다.
간단하게 DHCP 에 붙는 모습을 보면,
간단하게 보면 OFFER를 던지고 해당 GATEWAY를 통해서 실제 요청이 넘어가고 최종 응답을 받는 형태이다.
수많은 사용자가 DHCP 요청 / 응답을 수행한다고 보면,
이를 실시간으로 어느 사용자가 DHCP를 통해 IP를 부여받고,
몇 번의 빈도를 나타내는지 등의
기본적인 행위에 대한 분석을 수행 할 수 있다.
일단 해당 데이터를 Kafka에 넣고 Spark 를 통해서 Streaming 연결을 수행한다.
# 01. 필요한 모듈 불러오기
from __future__ import print_function
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import json
# 02. Main 함수
if __name__ == "__main__":
# 03. PySpark 세션 설정
spark = SparkSession.builder.getOrCreate()
# 04. 스키마 구조 설계 및 선언
schema = StructType([
StructField("src_ip", StringType(), True),
StructField("dest_ip", StringType(), True),
StructField("timestamp", StringType(), True),
StructField("event_type", StringType(), True),
StructField("dhcp", StructType([
StructField("type", StringType(), True), # request
StructField("id", StringType(), True),
StructField("client_mac", StringType(), True),
StructField("hostname", StringType(), True),
StructField("client_ip", StringType(), True),
StructField("assigned_ip", StringType(), True),
StructField("requested_ip", StringType(), True)
]))
])
# 05. 스트림 데이터 읽기(From Kafka To Spark Structured SQL)
suricata_log = spark.readStream.\
format("kafka").\
option("kafka.bootstrap.servers", "<IP_ADDRESS>:<PORT>").\
option("subscribe", "suricata"). \
option("kafkaConsumer.pollTimeoutMs", "6000"). \
option("failOnDataLoss", False).\
load()
# 06. 데이터 가공(필터)
suricata_sql_log = suricata_log \
.selectExpr("cast (value as string) as json", "cast (timestamp as timestamp) as timestamps") \
.select(from_json("json", schema).alias("data"), "timestamps") \
.selectExpr("data.src_ip as src_ip",
"data.event_type as event_type",
"data.dest_ip as dest_ip",
"data.timestamp as timestamp",
"data.dhcp.type as type",
"data.dhcp.id as id",
"data.dhcp.client_mac as mac",
"data.dhcp.client_ip as cip",
"data.dhcp.hostname as hostname",
"data.dhcp.assigned_ip as aip",
"data.dhcp.requested_ip as rip",
"timestamps as ts") \
.where('event_type = "dhcp" and src_ip != "61.82.88.6" and hostname is not null') \
.groupBy('id' ,window('ts', '30 minutes')) \
.agg(max('src_ip'), max('hostname'), max('cip'), max('rip'), max('ts'), count('id')) \
.orderBy("max(ts)", ascending=False)
# 일단 Console을 통해서 동작 여부를 판단한다.
# 07. 스트림 데이터 쓰기(콘솔)
query = suricata_sql_log.writeStream.\
outputMode('complete').\
option('numRows', 50). \ # 최대 50 행에 대해서만 출력(너무 많으면 한 화면에서 안보임)
option('truncate', False). \
format('console').\
start()
# 08. 스트림 데이터 쓰기(Kafka)
#query = suricata_sql_log.writeStream.format("kafka") \
# .option("kafka.bootstrap.servers", "<IP_ADDRESS>:<PORT>") \
# .option("topic", "suricata_dhcp_processed") \
# .option("checkpointLocation", "/home/tmp/suricata_dhcp") \
# .outputMode('complete') \
# .start()
query.awaitTermination()
이렇게 소스만 달랑 남기는걸 개인적으로 그렇게 좋게 여기지 않지만,
이것을 글로 설명하기에는 너무 바쁜 세상에 살고 있다.
이제 스파크를 통해서 실행해 보려고 하는데,
MESOS 이야기를 거르고 갈 수가 없다.
하지만 바쁜 세상에 살고 있는 우리가 모든 것을 다 핸들링 할 수 없기에
그냥 StandAlone 으로 실행하자.
MESOS + 알파, AirFlow, WatchDog 등 다양한 오픈소스를 함께 사용하고
어느정도 이런 Architect에 대한 큰 그림을 가지고 가야만이
환경구축과 개발에서 나오는 돌발 이슈를 제어 할 수 있다.
그래서 클라우드, 클라우드 하나보다.
실행은 이렇게!
spark-submit --master mesos://192.168.2.12:5050 --packages org.apache.kafka:kafka-clients:0.10.0.1,org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1 --supervise --executor-memory 2G --total-executor-cores 2 /home/david/env3.5/aiohttp_test/spark_kafka_dhcp.py
콘솔을 1분정도 들여다 보면,
다양한 사용자가 DHCP를 통해 인터넷을 사용하고 있다.
끝.