# Flask, # WebSocker, # Gevent
처음 브런치를 사용하는 사람
이런 글쓰는 일에 익숙하지 않다.
파이썬을 통해서 머신러닝과 API 쪽을 다루면서
리얼타임(Real-time) 서비스의 필요성을 느꼈다.
달랑 코드만 남기고 혼자만 볼 수 있는 글을 남겨놓는 쉬운(?) 방법을 두고
이렇게 글을 쓰는 이유는
"나 또한 프로젝트를 진행하면서 다른 여러 사람들의 공유자원을 이용하여
해결하였다."
나도 다른 사람들에게 조금이나마 도움을 줄 수 있기를 바라며
이 글을 쓰기 시작한다.
프로젝트의 개요는 이렇다.
대략 10대의 Application 에서 나오는 로그 / IDS 로그 등을 Kafka, Redis 를 통하여 받고,
Spark 를 통해서 실시간 스트림 API 를 만들어 데이터를 가공,
Python Webserver 에서 사용자와의 인터페이스를 구현한다.
말로하면 정말 간단한 일이다.
실제로도 큰 작업이 없다.
나는 이 중에서 Python Webserver, Redis, React 를 통해 실시간 데이터를 처리하고,
View 에서 보여주는 간단한 데모를 전달하려고 한다.
한가지 전제조건은 IDS에서 실시간으로 데이터를 Redis로 보내고 있는 상황이다.
Redis ==> Python WebServer ==> React 구조를 기억하고.
나머지는 소스코드에 대한 설명.
1. Redis 에서 데이터 구독
r = redis.StrictRedis(host='127.0.0.1', port=6379, db=0, charset='utf-8', decode_responses=True)
p = r.pubsub()
p.subscribe('suricata')
2. WebSocket 데이터 스트리밍
if request.environ.get('wsgi.websocket'):
ws = request.environ['wsgi.websocket']
while True:
msg = p.get_message()
sleep(.5)
if msg:
in_msg = msg['data']
in_msg = json.loads(str(in_msg))
# 사실, Pandas 를 통해서 데이터를 가공하는 부분이다.
# 간단하게 데이터를 dict 화 하여 내보내는 걸로 예를 대신한다.
res = json.dumps(in_msg)
ws.send(res)
3. Yield 를 통한 데이터 스트리밍
# 위의 코드를 내부 함수로 감싸고 그 함수 자체를 리턴해준다.
@app.route('/redis_data')
def redis_data():
def redis_data_parse():
# 위의 코드를 여기로...
return Response(redis_data())
4. React 에서 데이터 처리
onSocketOpen() {
console.log('Connection established!')
}
onSocketData(message){
let decoded = JSON.parse(message.data)
this.setState({
real: decoded
});
}
onSocketClose(){
console.log('socket closed');
}
componentDidMount() {
this.socket = new WebSocket("ws://" + document.domain + ":8787/redis_data");
this.socket.onopen = () => this.onSocketOpen()
this.socket.onmessage = (m) => this.onSocketData(m)
this.socket.onclose = () => this.onSocketClose()
}
단순히 Component 에서 추가하여 데이터를 state 에 담기만 하면 실시간으로 렌더링 되는 것을 볼 수 있다.
어려울 것은 없지만, 전체적인 흐름에서는 데이터가 섞이는 구간에서 약간의 테크닉적(?)인 처리가 필요했다.
전체 코드를 통해 실습이 필요한 분들이 있다면 기꺼이 코드를 제공하고 싶다.
# Tips
비동기 서버(Flask), 웹소켓을 실행하기 위해서는,
1. from flask import Flask, Response
2. import gevent.pywsgi as wsgi
3. from gevent import monkey
4. from geventwebsocket.handler import WebSocketHandler
마지막으로!
def main():
print('start python server...')
app.debug = True
gevent_server = wsgi.WSGIServer(('0.0.0.0', 8787), app,
handler_class=WebSocketHandler)
gevent_server.serve_forever()
if __name__ == '__main__':
main()