빅데이터 분석이나, 머신러닝 코드를 만들다 보면 필요한 것 중에 하나가 여러개의 태스크를 연결해서 수행해야 할 경우가 있다. 데이터 베이스 ETL(Extract, Transform, Load) 작업과 비슷한 흐름이라고 보면 된다.
단순하게 cron과 쉘로 순차적으로 수행하는 것으로 가능하지만, 에러가 났을 때 재처리를 하거나, 수행 결과에 따라 분기를 하는 등 조금 더 구조화된 도구가 필요하다.
Apache Airflow는 복잡한 계산을 요하는 작업 흐름과 데이터 처리 파이프라인을
조율하기 위해서 만든 오픈소스 도구이다. 실행할 Task(Operator)를 정의하고
순서에 등록, 실행, 모니터링 할 수 있다.
길이가 긴 스크립트 실행을 cron으로 돌리거나 빅데이터 처리 배치 작업을
정기적으로 수행하려고 할때 Airflow가 도움이 될 수 있다.
Airflow 상의 작업흐름은 방향성 비순환 그래프(DAG)로 설계된다.
즉, 작업 흐름을 짤 때 그것이 어떻게 독립적으로 실행 가능한 태스크들로 나뉠 수 있을까
생각해봐야 한다. 그래야 각 태스크를 그래프로 결합하여 전체적인
논리 흐름에 맞게 합칠 수 있다.
그래프 모양이 작업흐름의 전반적인 논리 구조를 결정한다. Airflow DAG는 여러 분기를 포함할 수 있고 작업흐름 실행 시 건너뛸 지점과 중단할 지점을 결정할 수 있다.
Airflow의 장점 중 하나는 각 태스크에서 오류가 발생할 때 재처리 작업을 편리 하게
수행할 수 있다.
먼저 Airflow를 구성하는 각 컴포넌트의 역할을 간략하게 살펴보자.
Scheduler : 가장 중추적인 역할을 수행하며 모든 DAG(Directed Acyclic Graph)와 태스크를 모니터링하고 관리한다. 그리고 주기적으로 실행해야 할 태스크를 찾고 해당 태스크를 실행 가능한 상태로 변경
Webserver : workflow(DAG)의 실행, 재시작, 로그, 코드 등 모두 WebServer UI로 관리 가능
Kerberos : 인증처리를 위한 티켓 갱신(ticket renewer) 프로세스(선택사항)
DAG Script : 개발자가 작성한 Python 워크 플로우 스크립트
MetaDB : Airflow 메타데이터 저장소이다. 어떤 DAG가 존재하고 어떤 태스크로 구성되었는지, 어떤 태스크가 실행 중이고 또 실행 가능한 상태인지 등의 많은 정보가 기입된다.
Executor : 태스크 인스턴스를 실행하는 주체이며 종류가 다양하다.
Worker : 실제 작업을 수행하는 주체이며 워커의 동작 방식은 Executor의 종류에 따라 상이하다.
앞에서 Airflow의 기본 동작 원리를 설명하면서 Airflow에 Executor라는 개념이
있다고 언급했는데, Executor는 작업의 한 단위인 Task를 실행 하는 주체이다.
Executor에는 다양한 종류가 있고 각 종류에 따라 동작 원리가 상이하다. 현재 Airflow에는 Sequential Executor와 Debug Executor, Local Executor, Dask Executor, Celery Executor, Kubernetes Executor를 제공하고 있으며 Airflow 2.0에는 CeleryKubernetes Executor가 추가되었다.
apache/airflow를 docker images로 로컬에서 간단하게 설치하고 테스트 할 수 있다.
$ git clone https://github.com/puckel/docker-airflow
$ cd docker-airflow
$ docker pull puckel/docker-airflow
$ docker-compose -f docker-compose-LocalExecutor.yml up -d
또는
$ docker-compose -f docker-compose-CeleryExecutor.yml up -d
$ docker ps // 현재 실행중인 컨테이너 확인
자세한 사용방법은 아래 링크를 참고하자.
https://github.com/puckel/docker-airflow
그 후 local:8080을 통해 airflow web ui에 접속 가능하다.
Airflow Tutorial 페이지를 참고해서 Tutorial을 진행해보자.
simple_bash 라는 이름의 DAG를 생성할 것이다. Docker Compose를 실행한 경로(다른 경로로 이동하지 않았다면 docker-airflow/dags) 에 dags라는 디렉토리가 있을 것이다. 이 디렉토리에 simple_bash.py 파일을 생성하고, 작성을 시작한다.
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
DAG 및 DAG 내 Task들에 일괄적으로 적용할 속성 객체를 작성한다.
default_args={
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2020, 5, 16, 14, 0),
'email': ['kaven@gmail.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=1),
'on_failure_callback': SlackWebhook.airflow_failed_callback
}
Callback을 사용하여 task가 성공하거나 실패했을 경우 알람을 보내는 등의 동작을 구성할 수있다.
DAG 객체를 아래와 같이 정의할 수 있다.
dag = DAG(
'tutorial_bash',
default_args=default_args,
description='My first tutorial bash DAG',
schedule_interval= '* * * * *'
)
schedule_interval에 아래와 같이 DAG 스케줄링 간격(Cron 표현식 혹은 미리 정의된 속성 사용 가능)을
지정할 수 있다.
hello world를 출력하는 작업(say_hello)와 현재 시간을 출력하는 작업(what_time)을 정의할 것이다.
t1 = BashOperator(
task_id='say_hello',
bash_command='echo "hello world"',
dag=dag
)
t2 = BashOperator(
task_id='what_time',
bash_command='date',
dag=dag
)
t1 >> t2
BashOperator 에는 다음과 같은 속성이 존재한다.
또한 t1 >> t2 는 t1이 실행된 후 t2를 실행한다는 의미이다.(t1이 t2의 Upstream Task)
Airflow CLI로 방금 만든 DAG가 잘 반영되었는지 확인해보자. 원래는 airflow list_dags 명령어로 Airflow에 등록된 DAG 목록을 출력할 수 있는데, 여기서는 Docker Compose로 띄워 놓았기 때문에 airflow list_dags 명령어 앞에 docker-compose -f docker-compose-CeleryExecutor.yml run –rm webserver를 붙여주어야 한다.
단, docker-airflow 위치에서 아래 명령어를 실행 한다.
$ docker-compose -f docker-compose-CeleryExecutor.yml run --rm webserver airflow list_dags
-------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
tutorial
tutorial_bash
WebServer에서도 일정 시간이 지나면 아래와 같이 tutorial_bash가 만들어진 것을 확인 할 수 있다.
만들어진 DAG는 활성화된 상태가 아니어서(Paused) 실행되지 않는다. 실행을 위해서는 CLI나 Web UI상에서 ‘Off’ 버튼을 눌러 ‘On’ 상태로 변경해주어야 한다.
CLI Command는 airflow unpause [DAG ID] 로써 여기서는 Docker compose 명령어와 함께 아래와 같이 실행하면 된다.
> docker-compose -f docker-compose-CeleryExecutor.yml run --rm webserver airflow unpause tutorial_bash
[2020-05-16 06:04:41,772] INFO - Filling up the DagBag from /usr/local/airflow/dags/tutorial_bash.py
Dag: tutorial_bash, paused: False
Web UI에서 확인하면 ‘Off’ 였던 상태가 ‘On’ 으로 변경되고, DAG가 실행되고 있는 것을 볼 수 있다.
DAG에서 특정 tak를 클랙했을 때 아래와 같이 팝업창을 볼수 있다. 각 task에 대해 로그를 보거나 rendered 된 파라미터를 볼수도 있고, task 실패시 Clear 버튼을 통해 retry 할수도 있다.
여기서 선택한 task 부터 다시 돌리고 싶을 경우 Future 를 선택 후 Clear를
클릭한다.
다음으로는 Apache airflow를 사용함에 있어서 혼동할 수 있는 부분과 주의사항에 대해 살펴보자.
airflow에서 DAG를 동작시키고 task instance를 확인해 보면
아래와 같이 Execution Date, Start Date, End Date를 확인 할수 있다.
excution_date를 살펴보면 의문점이 들 수 있다.
아래 stackoverflow에 airflow사용자들이 주로 혼란스러워하는 포인트가 잘 표현되어 있다.
https://stackoverflow.com/questions/39612488/airflow-trigger-dag-execution-date-is-the-next-day-why
결론 부터 말하면 execution_date는 실행날짜가 아니라 주문번호(run id)이다. 굳이 시간으로
이해하고 싶다면 예약 시간이 아니라 예약을 잡으려고 시도한 시간이라고
이해해야 한다.
아래와 같이 dag가 설정되어 있다고 가정해보자.
'start_date': datetime(2019, 6, 6),
......
schedule_interval="@daily",
......
이렇게 하면 dag는 실제로 2019년 6월 7일에 최초 실행되며, execution_date에는 2019-06-06값이 들어가게 된다.
그리고 다음날인 6월 8일에 실행되고, execution_date에는 2019-06-07값이 들어 가게 되고 하루마다 반복된다.
따라서, 위의 stackoverflow 질문자는 두 가지를 모르고 있다.
1. execution_date가 실행시간이 아니라 주문번호라는 것
주문 번호를 “execution_date”:”2016-06-20” 라고 넣고, 어째서 “2016-09-21”에
실행(fire)되냐고 묻고 있는 것이다.
주문 번호가 “2016-06-20” 이고 schedule_interval이 1day이면 “2016-09-21”에
실행 된다.
2. start_date를 now()로 설정하면 위험하다는 점
airflow에서 dynamic value를 쓰지말라고 권고 하고 있을 뿐더러, 특히 질문자처럼 execution_date 잘못 이해하고 있으면 airflow는 분명히 의도와는 다르게 동작할 것이다.
회사에서도 많이 느꼈지만 pipeline의 실패나 혹은 데이터 재처리를 할 경우가 종종 발생한다. 이 경우 과거 데이터를 재처리 시켜주는 명령인 backfill을 진행할 때가 있는데 pipeline이 실행했던 시점의 데이터와 날짜를 넣어야 하다 보니 excution_date를 날짜 변수값으로 이용해 꼭 필요한 값이라 생각한다.
UTC 환경을 전제로 작업할 경우, 즉 UTC 기준 날짜로 로그가 쌓이고 UTC 기준으로 로그를 읽을 경우 고려하지 않아도 되지만 한국 시간(UTC +9)기준으로 로그를 사용한다면, UTC 기준 15:00 ~ 24:00 사이에 서로 날짜가 다르기 때문에 문제가 발생할 수 있다.
바꿔 말하면, 한국 시간으로 오전 9시 이전에 스케줄링해야 하는 경우에 문제가 발생한다.
우리는 하루에 한번 실행되는 배치를 만든다고 가정하고 실행되는 날짜 기준으로 전날 데이터를 작업한다고 가정해보자.
아래와 같은 경우는 정상적으로 실행된 예시이다.
schedule_interval="@daily",
그런데 아래처럼 한국 시간 오전 9시 이전 시간으로 지정하면 문제가 발생한다.
schedule_interval="31 15 * * *", #(한국 시간 00:31)
2019/06/12
00:31)2019/06/12 에 2019/06/10 일자 로그 작업을 하게 된 것이다.
따라서 한국시간 기준으로 2019-06-12일에 실행되고 2019-06-10일 데이터를 작업하게 된다.
과거에 start_date를 설정하면 airflow는 과거의 task를 차례대로 실행하는 Backfill을 실행한다.
간혹 “과거 언제부터 데이터를 쭈욱 빌드해 주세요” 라는 요청을 받으면 과거 start_date를
잘 설정하기만 하면 빌드는 자동으로 과거부터 실행되어 편리하게 데이터를 빌드할 수 있다.
하지만 이런 동작을 원하지 않는 경우도 있다.
그럴 때는 DAG를 선언할 때 Catchup 설정을 False
로 해주면 backfill을 실행하지 않는다.
즉, 과거의 작업은 중요하지 않고, 현재 시점의 이후 dag만
실행되어야 한다면, 설정을 아래와 같이 변경할 수 있다.
dag = DAG(
dag_id="test_dag",
default_args=default_args,
start_date=datetime(2021, 1, 1, tzinfo=kst),
schedule_interval="0 8 * * *",
catchUp=False,
)
Reference
https://airflow.readthedocs.io/en/1.10.12/dag-run.html
https://blog.naver.com/gyrbsdl18/221561318823
https://zzsza.github.io/data/2018/01/04/airflow-1/
https://medium.com/@aldente0630/%EC%95%84%ED%8C%8C%EC%B9%98-%EC%97%90%EC%96%B4%ED%94%8C%EB%A1%9C%EC%9A%B0%EB%A1%9C-%EC%9E%91%EC%97%85%ED%9D%90%EB%A6%84-%EA%B0%9C%EB%B0%9C%ED%95%B4%EB%B3%B4%EA%B8%B0-8f3653d749b4
https://engineering.linecorp.com/ko/blog/data-engineering-with-airflow-k8s-1/
https://bomwo.cc/posts/execution_date/
https://leeyh0216.github.io/2020-05-16/airflow_install_and_tutorial
https://www.bucketplace.com/post/2021-04-13-%EB%B2%84%ED%82%B7%ED%94%8C%EB%A0%88%EC%9D%B4%EC%8A%A4-airflow-%EB%8F%84%EC%9E%85%EA%B8%B0/