brunch

You can make anything
by writing

C.S.Lewis

by 비즈스프링 Oct 30. 2023

분석 데이터-매체 데이터 연동은 어떻게 이뤄질까?(3)

지난포스팅에서는 통합매체 리포트(AIR) 데이터를 생성하기 위해 데이터 베이스를 이용하여 실제로 값이 쌓이는 과정과 결과에 대해 살펴보았습니다.


이번 포스팅에는 데이터를 통합매체 리포트에서 사용하기 위해 이루어지는 프로세스에 대해 살펴보겠습니다.


Elasticsearch (출처: Wikipedia)

Elasticsearch는 아파치 루씬(Apache Lucene)기반의 오픈 소스 분산 검색 엔진입니다. NoSQL 기반의 데이터베이스로 JSON 형식의 데이터를 저장하고 검색이 가능합니다. 또한 비정형 데이터를 색인하고 검색하는 것이 가능하고, 데이터를 기반으로 분석 작업을 진행할 수도 있습니다.


자세한 내용은 아래 링크에서도 확인 가능합니다.

Elasticsearch -온라인 서비스를 위한 빅데이터 플랫폼


비즈스프링에서는 대량의 데이터를 빠르게 읽어오고 표현하기 위해 매체통합 리포트를 조회 시, Elasticsearch에 저장되어 있는 데이터를 전달받아 화면에 표시하고 있습니다. 기존에 Logstash를 이용하여 Elasticsearch 데이터 저장에 대한 포스팅을 한 적이 있었습니다. 하지만 근래에 데이터 흐름의 변화로 인하여 Logstash가 아닌 Elasticsearch Bulk API를 이용하여 더욱 더 간단하게 Elasticsearch에 데이터를 저장할 수 있도록 변경하였습니다. 이에 대해 설명하도록 하려고 합니다.




Bulk API

Elasticsearch에서는 대량의 데이터를 한번의 API 호출로 여러 Index에 CRUD(Create, Read, Update, Delete) 처리를 할 수 있도록 Bulk API를 제공하고 있습니다. 이를 이용하면 기존의 데이터 CRUD보다 훨씬 빠른 속도로 데이터 처리가 가능한 장점이 있습니다. (자세한 내용은 해당 링크 (클릭) 에서도 확인 가능합니다.)


Bulk API는 POST 형식으로 /_bulk 주소를 호출하게 되며, body에는 형식에 맞게 작성하여 파일 혹은 내용을 첨부합니다.


POST _bulk

{ "index" : { "_index" : "test", "_id" : "1" } }

{ "field1" : "value1" }

{ "delete" : { "_index" : "test", "_id" : "2" } }

{ "create" : { "_index" : "test", "_id" : "3" } }

{ "field1" : "value3" }

{ "update" : {"_id" : "1", "_index" : "test"} }

{ "doc" : {"field2" : "value2"} }

Create: Index(DB 저장소, 이하 인덱스)에 Document(데이터, 이하 도큐멘트)를 생성한다.

Index: 인덱스의 도큐멘트를 수정한다.

Update: 인덱스의 도큐멘트를 문서의 일부 수정한다. (Index와 유사)


데이터 형식은 JSON 형식으로 작성합니다. 그리고 인덱스에 대한 정보와 원하는 행동에 맞는 데이터에(추가의 경우 추가할 내용, 삭제의 경우 삭제 할 데이터의 ID 등) 대한 내용을 작성합니다. Bulk API 처리 시, 한줄씩 처리를 하게 때문에 한줄씩 작성해야 합니다. 이를 다르게 말하면, 각 줄마다 별도의 Index를 타겟으로 처리도 가능합니다.




Python Elasticsearch Bulk Helpers

위의 Bulk API 형식을 보게되면 인덱스에 접근하려면 모든 줄에 인덱스의 정보 및 관련 데이터를 추가해야 합니다. 즉, 처음부터 Bulk API를 위한 데이터가 아니라면 기존의 Raw 데이터를 Bulk API 형식에 맞게 재가공을 해야하고, 이 과정에서 적지않은 메모리 및 디스크 Load가 일어나게 됩니다.


이러한 불편한 점을 해결하기 위해 Python에서 사용할 수 있는 Elasticsearch Python Client 패키지에서 제공하는 Bulk helpers 를 사용하고 있습니다.


해당 플러그인 공식 홈페이지에는 다음과 같이 소개하고 있습니다.


`There are several helpers for the bulk API since its requirement for specific formatting and other considerations can make it cumbersome if used directly.`


‘기존의 ES에서 제공하는 Bulk API는 고려해야 할 사항이 너무 많기 때문에 성가신 상황이 생기는데 이를 도와주는 기능입니다’ 라고 소개를 하고 있습니다. helpers를 이용하면 대량의 데이터 세트를 메모리에 로드하지 않고도 인덱싱(데이터 추가 작업) 할 수 있기 때문에 빠르고 안정적으로 데이터 추가를 할 수 있습니다.


이러한 장점을 이용하기 위해 JSON 형식으로 생성된 데이터를 읽어와 인덱싱하는 과정을 살펴 보겠습니다.




pip를 통하여 elasticsearch 설치

ES에 접속 및 인덱싱을 위해서는 elasticsearch 패키지를 설치해야 합니다. 다음과 같은 명령어로 설치를 할 수 있습니다.



pip3 install elasticsearch



패키지 설치가 끝나면 다음과 같은 py파일(Python 파일)을 작성합니다.



import sys, json

from elasticsearch import Elasticsearch, helpers

from itertools import islice

from dateutil.parser import parse

# 파일 경로

filePath = ‘20231010_bizspring.json’

# 인덱스명 prefix

indexPrefix = ‘bizspring’

# 인덱스명

indexNm = ‘test’

# 단일 인덱스 혹은 일자별 인덱스를 구분하는 변수

isSingleIndex = false

# bulk 함수를 실행할 데이터 카운트. 한번에 너무 많은 데이터를 전송하면 ES서버에서 메모리 초과 오류가 발생할 가능성이 있기 때문에 100건 정도로 나눠 전송함

unitCnt = 100

# 데이터를 저장한 카운트

sendCnt = 0

# Elasticsearch 접속

es = Elasticsearch("http://esm:9200", timeout=30, max_retries=10, retry_on_timeout=True)

# 파일 내용을 읽음

with open(filePath, 'r') as file:

    print("Start bulk upload to " + indexNm)

    # 데이터 카운트 만큼 나눠서 실행

    for lines in iter(lambda: list(islice(file, unitCnt)), []):

        # bulk helpers를 이용하여 데이터 대량 업로드 실행

        response = helpers.bulk(es, gen_data(lines))

        sendCnt += 1

        print ("bulk upload({}): {}".format(sendCnt, response))

...

def gen_data(lines):

    # 데이터를 라인별로 읽어 bulk 형식에 맞는 object로 변경하여 반환

    for line in lines:

        yield {

            "_index": create_index(line),

            "_source": line

        }

# 인덱스명을 생성하는 함수

# 비즈스프링의 경우, 일자별로 인덱스를 나누는 경우와 하나의 인덱스에서 특정 값으로 분기하는 경우가 있기 때문에 별도의 전역변수로 설정하여 사용함

def create_index(line):

    jsonData = json.loads(line)

    try:

        # 단일 인덱스의 경우 prefix + ‘_’ + 인덱스명

        # 예시의 인덱스명의 경우 ‘bizspring_test’

        if isSingleIndex == 'true':

            index = indexPrefix + '_' + indexNm

        # 일자별 인덱스의 경우 데이터에서 날짜를 추출한 뒤 변환하여 인덱스명에 사용

        # 예시의 인덱스명의 경우 ‘bizspring_test-2023-10-10’

        else:

            # 데이터 중 ‘stat_date(날짜)’를 읽음

            statdate = parse(jsonData['stat_date'])

            # 파이썬 함수인 strftime을 사용하여 yyyy-MM-dd 형식으로 변환

            suffix = statdate.strftime("%Y-%m-%d")

            index = indexPrefix + '_' + indexNm + '-' + suffix

    except Exception as ex:

        index = indexPrefix + '_' + indexNm

    return index



다음 프로그램의 흐름을 설명하면 다음과 같습니다.

Elasticsearch 서버에 접속합니다.

데이터가 저장되어 있는 파일 약 100라인 단위로 읽습니다.

Bulk 업로드를 위해 데이터를 가공하고 Bulk 업로드를 진행합니다.


위와 같이 Python과 Python Elasticsearch Bulk Helpers를 이용하면 간단하게 데이터 대량 업로드가 가능합니다.



이상으로, AIR™(Ad Integrated Report)에서 데이터를 사용하기 위해 이루어지는 프로세스에 대해 정리해보았습니다. 데이터 적재 프로세스에 대해 궁금한 점이 있다면 언제든지 문의 해주시길 바랍니다.


감사합니다.



ad@bizspring.co.kr / 02-6919-5516




마케팅에서의 데이터 활용 기술과 인사이트
No.1 Data Partner for Data-Driven Growth
비즈스프링

공식 블로그 | 페이스북 | 네이버 블로그 | 유튜브 | 트위터 | 슬라이드쉐어


브런치는 최신 브라우저에 최적화 되어있습니다. IE chrome safari