brunch

You can make anything
by writing

C.S.Lewis

by 유윤식 Apr 24. 2019

Python: AirFlow(1)

공기의 흐름!? 클라우드 시대의 가장 유망한 프로젝트.

매우 사견이지만

AWS, AZURE 등 잘나간다는 클라우드에서 나오는 서비스의

대부분이

오픈소스를 기반으로 하거나,

오픈소스의 아이디어를 취하며 새로운 이름으로 런칭되고 있다.


사실, 이건 99% 사실이다.


AirFlow를 혹시 들어보거나 활용해본 사람을 주변에서 잘 보지 못한다.

해외의 사례는 좀 다른다.


개인이든 작은 규모의 프로젝트팀이든

AirFlow를 통한 내부 개발 로직을 관리하는 경우가 많다.


AWS, AZURE, GCP 등

산을 만들고 숲을 이룬 케이스의 클라우드 플랫폼이 운영되는,

사용자 앞단에서 가장 큰 영향력을 가지고 있는 로직이,

바로 AirFlow와 같다.


한국의 Naver가 클라우드 플랫폼을 만든다고 한다.

이미 만들었다. 주변에서 종종 전해듣기론,

아직 완성은 아니라고 한다.


일단 AIrFlow를 잠시 사용해본 경험(?)에서 보자면,


예)

    1. 머신러닝 학습 결과물(모델) 생성

    2. 웹 서비스로 적용

    3. 사용자 서비스 실행

    4. 사용자 로그 수집


이런게 있다면 먼저 1. 작업이 완성된 후에,

2. 작업을 진행 할 수 있다. (의존성 주입, Down_Stream)

그 다음, 3. 작업을 진행하고 동시에 4. 작업을 이어간다. (의존성 주입, Up_Stream)


위의 모든 사항을 부드럽게 구성하고 관리/감독 할 수 있게끔 해주는게

바로 'AIrFlow'의 역할이다.


Tutorial 간단히 따라할 수 있는 URL 공유


https://airflow.apache.org/start.html



열심히 따라해 보면 대략 이런 UI를 만나게 된다.


DAG
DAG ID: tutorial => Copy Version


AirFlow 실행

따라하세요! (자세한 설명은 Tutorial URL의 Quick Start 참고)


1. >> pip install apache-airflow

2. >> export AIRFLOW_HOME=/comp/airflow

3. >> airflow initdb

3. >> airflow webserver -p 8060 --daemon

4. >> airflow scheduler



코드하나 방출

알맞게 수정하세요!


from airflow import DAG

from airflow.operators.bash_operator import BashOperator

from datetime import datetime, timedelta

import airflow

default_args = {

    'owner': 'hello_airflow',

    'depends_on_past': False,

    'start_date': airflow.utils.dates.days_ago(10),

    'email': ['digital@openbase.co.kr'],

    'email_on_failure': True,

    'email_on_retry': True,

    'retries': 3,

    'retry_delay': timedelta(minutes=30),

    # 'queue': 'bash_queue',

    # 'pool': 'backfill',

    # 'priority_weight': 10,

    'end_date': datetime(2019, 4, 30),

}

dag = DAG('tutorial_david', default_args=default_args, schedule_interval='* * * * *')

t1 = BashOperator(

    task_id='print_date',

    bash_command='date',

    dag=dag)

t2 = BashOperator(

    task_id='sleep',

    bash_command='sleep 5',

    retries=3,

    dag=dag)

templated_command = """

    {% for i in range(5) %}

        echo "{{ ds }}"

        echo "{{ macros.ds_add(ds, 7)}}"

        echo "{{ params.my_param }}"

    {% endfor %}

"""

t3 = BashOperator(

    task_id='templated',

    bash_command=templated_command,

    params={'my_param': 'Parameter I passed in'},

    dag=dag)


t1 >> [t2, t3]



이 코드는 Quick Start 에서 나오는 tutorial 예제를 따라하면서 조금 변형된 형태이다.

하지만 큰 틀은 유지하였다.


결국 전달하고자 하는 바는 이것이다.

Ref: https://www.google.com/url?sa=i&source=images&cd=&cad=rja&uact=8&ved=2ahUKEwjfxsyK0efhAhWbxosBH


끝.

작가의 이전글 Python: m.l with Keras #8
브런치는 최신 브라우저에 최적화 되어있습니다. IE chrome safari