Logstash에서 DB(MySQL, Maria) 연동하기
Logstash에서 DB(MySQL, Maria)를 연동하여 데이터를 수집하는 방법을 정리한다.
Logstash에서 DB를 연동하기 위해서는 jdbc input 플러그인을 사용한다. 플러그인 속성은 다음과 같다.
각 속성은 필드명만으로 설명이 가능할 정도로 명확하기 때문에 몇 가지 중요한 부분만 언급하고자 한다.
- jdbc_driver_class
검색엔진을 통해 나오는 내용을 보면 “com.mysql.jdbc.Driver”가 주로 언급되지만, 최신 library를 사용하면 deprecated라고 표시된다. “com.mysql.cj.jdbc.Driver”를 사용하자. (JDBC 6.X 버전에서 변경됨)
- tracking_column, tracking_column_type
파일에서 데이터를 수집할 때는 새롭게 파일이 추가되거나 파일 내용이 추가된 부분부터 데이터를 재수집할 수 있었다. DB를 사용한다면 어떻게 해야 할까? 이때 사용하는 속성이 tracking_column이다. 뒤에 더 자세히 설명하겠지만, 가장 최근 수정된 데이터를 담고 있는 필드를 지정해야 한다. 데이터 수정 일시 필드가 대표적이다.
- statement
수집하고자 하는 데이터를 만들어주는 쿼리다. 쿼리에 반드시 필요한 부분이 위에서 설명한 tracking_column과 ORDER BY 절이다. 검색 항목에 tracking_column을 지정해주고 tracking_column 필드의 오름차순(ASC)으로 정렬한다.
한 가지 더. 기존에 수집한 데이터 이후의 데이터만 수집하기 위해 WHERE 절에 UNIX_TIMESTAMP(update_datetime) > :sql_last_value AND update_datetime < NOW() 도 있지 말자. 이에 대한 자세한 설명은 아래의 블로그 글을 참고한다.
주기적으로 쿼리 수행을 통해 수집된 데이터를 ElasticSearch에 인덱싱 하기 위해서는 반드시 document_id를 지정해 줘야 한다. 이 정보가 설정되지 않는다면 쿼리가 수행될 때마다 매번 데이터가 추가로 인덱싱 된다.
때문에, 아래와 같이 document_id를 지정해주며 이때 document_id의 값은 조회하는 테이블의 PK 값으로 설정하는 게 좋다. 복합 키일 경우, mutate 필터의 add_field를 사용해서 아이디를 생성해 주면 된다.