brunch

You can make anything
by writing

C.S.Lewis

by 강진우 Aug 19. 2020

람다를 이용해 CF 로그를 ES에 저장하기

Journey to AWS

오늘 다룰 이야기는 제목 그대로 람다를 이용해 CF 로그를 ES에 저장하는 과정에 대한 이야기입니다. Logstash라는 좋은 툴이 있는데 왜 람다를 이용했는지에 대한 이야기와 어떤 문제를 해결하려고 했는지에 대한 이야기를 해보려고 합니다. 그럼 시작하겠습니다.


Logstash를 통한 CF 로그 파싱의 한계


Logstash를 이용해 CF의 로그를 ES에 저장하는 방법은 구글링 해보면 많은 자료들이 나옵니다. CF의 로그들이 S3에 저장되고 Logstash의 input을 S3로 지정한 후 Grok 패턴을 이용해 각 필드들을 뽑아내고 output으로 ES를 지정하는 형태로 동작하게 됩니다.

CF 로그를 logstash를 통해 처리하는 과정

특히 AWS의 프리미엄 서포트 내용 중에도 같은 내용이 있습니다. (https://aws.amazon.com/ko/premiumsupport/knowledge-center/cloudfront-logs-elasticsearch/) 이 글은 AWS ES에 저장하는 내용을 담고 있습니다만 Logstash를 통해 파싱 한 아웃풋을 각자가 운영하는 ES의 엔드포인트로 변경한다면 코드를 그대로 활용할 수도 있습니다. 하지만 Logstash를 이용한 방법에는 한 가지 문제가 있습니다. 바로 다수의 Logstash 인스턴스를 운영할 수 없다는 것입니다.

logstash offset 관리 기능

예를 들어 Kafka를 통해서 다수의 Logstash 인스턴스들이 로그를 파싱 한다고 가정했을 때 각각의 Logstash 들은 자신이 Kafka에서 몇 번째 로그를 가져오고 있는지와 같은 offset 공유를 통해서 한 번 가져온 로그를 다시 가져오지 않게 동작합니다. 물론 서로 다른 파티션에서 작업을 하기 때문에 섞이지 않기도 합니다. 하지만 S3에서 로그를 파싱 할 때는 각각의 Logstash가 어떤 파일을 가져오는지에 대한 offset 공유가 되지 않기 때문에 한 번 파싱 한 로그를 또 파싱 하거나 하는 문제가 발생합니다. 사실 CF의 로그 양이 많지 않을 경우에는 Logstash 인스턴스를 다수로 운영할 필요가 없지만, CF의 로그 양이 많을 경우에는 Logstash 인스턴스 한대로는 파싱 작업이 밀리기 시작하고 결국 로그 수집 자체가 지연되어 아무런 의미가 없어집니다. 그래서 CF의 로그를 ES로 저장하기 위한 다른 방법을 고민해야 할 필요가 생겼습니다.


S3 ObjectCreated 이벤트와 람다 함수


그리고 고민하던 끝에 람다 함수를 이용하는 어떨까 라는 생각이 들었습니다. 람다 함수를 실행할 수 있는 다양한 이벤트들 중에 S3 ObjectCreated 이벤트를 활용하여 로그를 수집하면 CF로부터 업로드된 로그 파일 하나당 람다 함수가 하나씩 붙어서 파싱 하게 되기 때문에 중복 파싱의 염려도 없고 로그가 급증할 때에도 로그가 밀리지 않기 때문입니다.

람다를 활용한 로그 파싱

실제 람다 함수의 구성


그럼 실제 람다 함수의 코드는 어떻게 구성되어 있는지 살펴보겠습니다.

S3에 업로드된 로그 파일을 람다의 /tmp에 다운로드하는 코드

우선 S3에 업로드된 로그 파일을 파싱 하기 위해서는 람다 내부에 다운로드하여야 합니다. 람다 내부는 원래 read-only 파일 시스템이지만 /tmp 디렉터리는 512MB까지 읽기/쓰기 작업을 할 수 있습니다. 따라서 /tmp 디렉터리에 임시 파일을 생성해서 업로드된 로그 파일을 람다 내부에 저장합니다.

저장된 파일은 gz 형태의 파일이기 때문에 gz 파일을 읽기 위해 gzip 라이브러리로 파일을 읽습니다. 그리고 파일을 한 줄씩 읽어낼 scanner를 생성하고 파싱 된 문서를 저장할 ES와의 커넥션을 생성합니다.

gzip 파일을 읽고 ES와 커넥션 생성

다음으로 scanner를 통해 파일을 한 줄씩 읽어서 파싱 합니다. 여기에 Grok 패턴을 적용할 수도 있지만 CF의 로그는 탭을 기준으로 나눠져 있기 때문에 간단하게 Split 함수를 사용할 수도 있습니다. 그렇게 Split으로 생성된 데이터를 CloudFrontLog라는 구조체에 넣어 줍니다.

scanner로 한 줄씩 읽고 파싱 하기

CloudFrontLog 구조체는 아래와 같이 정의되어 있습니다.

CloudFrontLog 구조체

이 부분은 조금 중요한데 간혹 Split으로 33개 미만의 필드가 생성되는 CF 로그들이 있습니다. 모든 CF 로그들이 모든 필드를 가지고 있지 않기 때문에 33개 미만의 필드가 생성되는 로그들은 우선 버리도록 해 두었습니다. 너무 복잡한 예외 처리가 오히려 복잡성을 높이지 않을까 라는 생각이었습니다.

그리고 파싱 된 데이터들은 ES의 Bulk API를 통해 한 번에 BulkSize에 저장된 개수씩 저장됩니다. BulkSize의 경우는 ES의 성능을 기반으로 유동적으로 설정하면 됩니다.

남은 Bulk 작업 수행

그리고 마지막까지 남은 Bulk 작업이 있다면 수행하게 하고 함수를 종료합니다. 이렇게 구성하면 CF 로그가 10분 단위로 천만 건이 들어오는 경우에도 로그가 밀리지 않고 잘 수집되는 것을 볼 수 있습니다.

ES를 통해 확인하는 CF 로그

마치며


Logstash를 통한 CF 로그 파싱의 한계로 인해 한동안 CF의 로그들을 수집하고 분석하지 못했었는데, 람다를 사용한 새로운 구조를 통해 순간적으로 많은 양의 CF 로그가 들어와도 밀리지 않고 수집할 수 있게 되었습니다. 이 글이 저와 같은 고민을 하시는 분들에게 좋은 자료가 되었으면 좋겠습니다. 끝까지 읽어 주셔서 감사합니다.


브런치는 최신 브라우저에 최적화 되어있습니다. IE chrome safari