빅데이터 분석이나, 머신러닝 코드를 만들다 보면 필요한 것 중에 하나가 여러개의 태스크를 연결해서 수행해야 할 경우가 있다. 데이터 베이스 ETL(Extract, Transform, Load) 작업과 비슷한 흐름이라고 보면 된다.
단순하게 cron과 쉘로 순차적으로 수행하는 것으로 가능하지만, 에러가 났을 때 재처리를 하거나, 수행 결과에 따라 분기를 하는 등 조금 더 구조화된 도구가 필요하다.
Apache Airflow는 복잡한 계산을 요하는 작업 흐름과 데이터 처리 파이프라인을
조율하기 위해서 만든 오픈소스 도구이다. 실행할 Task(Operator)를 정의하고
순서에 등록, 실행, 모니터링 할 수 있다.
길이가 긴 스크립트 실행을 cron으로 돌리거나 빅데이터 처리 배치 작업을
정기적으로 수행하려고 할때 Airflow가 도움이 될 수 있다.
Airflow 상의 작업흐름은 방향성 비순환 그래프(DAG)
로 설계된다.
즉, 작업 흐름을 짤 때 그것이 어떻게 독립적으로 실행 가능한 태스크들로 나뉠 수 있을까
생각해봐야 한다. 그래야 각 태스크를 그래프로 결합하여 전체적인
논리 흐름에 맞게 합칠 수 있다.
위 그림에서 전체 Workflow를 DAG라고 하며 각각의 작업 단위를 Task라고 한다.
그래프 모양이 작업흐름의 전반적인 논리 구조를 결정한다. Airflow DAG는 여러 분기를 포함할 수 있고 작업흐름 실행 시 건너뛸 지점과 중단할 지점을 결정할 수 있다.
Airflow의 장점 중 하나는 각 태스크에서 오류가 발생할 때 재처리 작업을 편리 하게
수행할 수 있다.
먼저 Airflow를 구성하는 각 컴포넌트의 역할을 간략하게 살펴보자.
앞에서 Airflow의 기본 동작 원리를 설명하면서 Airflow에 Executor라는 개념이
있다고 언급했는데, Executor는 작업의 한 단위인 Task를 실행 하는 주체이다.
Executor에는 다양한 종류가 있고 각 종류에 따라 동작 원리가 상이하다. 현재 Airflow에는 Sequential Executor와 Debug Executor, Local Executor, Dask Executor, Celery Executor, Kubernetes Executor를 제공하고 있으며 Airflow 2.0에는 CeleryKubernetes Executor가 추가되었다.
공식문서 를 참고하여 설치를 진행해보자.
$ mkdir airflow-docker
$ cd airflow-docker
### fetch docker-compose file
$ curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.0.2/docker-compose.yaml'
$ mkdir ./dags ./logs ./plugins
$ echo -e "AIRFLOW_UID=$(id -u)\nAIRFLOW_GID=0" > .env
$ docker-compose up airflow-init
### start all service
$ docker-compose up
위의 명령어를 모두 실행 후 localhost:8080 에서 web ui를 확인할 수 있으며, 초기 id/pw 는 airflow 이다.
테스트가 완료된 이후 컨테이너와 database에 있는 데이터의 volumes 등을 아래와 같이 정리해준다.
docker-compose down --volumes --rmi all
Airflow Tutorial 페이지를 참고해서 Tutorial을 진행해보자.
simple_bash 라는 이름의 DAG를 생성할 것이다. Docker Compose를 실행한 경로(다른 경로로 이동하지 않았다면 docker-airflow/dags) 에 dags라는 디렉토리가 있을 것이다. 이 디렉토리에 simple_bash.py 파일을 생성하고, 작성을 시작한다.
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
DAG 및 DAG 내 Task들에 일괄적으로 적용할 속성 객체를 작성한다.
default_args={
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2020, 5, 16, 14, 0),
'email': ['kaven@gmail.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=1),
'execution_timeout': timedelta(seconds=300),
'on_failure_callback': SlackWebhook.airflow_failed_callback
}
Callback을 사용하여 task가 성공하거나 실패했을 경우 알람을 보내는 등의 동작을 구성할 수있다.
DAG 객체를 아래와 같이 정의할 수 있다.
dag = DAG(
'tutorial_bash',
default_args=default_args,
description='My first tutorial bash DAG',
schedule_interval= '* * * * *'
)
schedule_interval에 아래와 같이 DAG 스케줄링 간격(Cron 표현식 혹은 미리 정의된 속성 사용 가능)을
지정할 수 있다.
hello world를 출력하는 작업(say_hello)와 현재 시간을 출력하는 작업(what_time)을 정의할 것이다.
t1 = BashOperator(
task_id='say_hello',
bash_command='echo "hello world"',
dag=dag
)
t2 = BashOperator(
task_id='what_time',
bash_command='date',
dag=dag
)
t1 >> t2
BashOperator 에는 다음과 같은 속성이 존재한다.
또한 t1 >> t2 는 t1이 실행된 후 t2를 실행한다는 의미이다.(t1이 t2의 Upstream Task)
dags 폴더에 위에서 작성한 파이썬 파일을 생성 후 docker compose up 을 통해 실행하게 되면 아래와 같이 web ui에서 생성한 dag를 확인할 수 있다.
만들어진 DAG는 활성화된 상태가 아니어서(Paused) 실행되지 않는다. 실행을 위해서는 CLI나 Web UI상에서 ‘Off’ 버튼을 눌러 ‘On’ 상태로 변경해주어야 한다.
Web UI에서 확인하면 ‘Off’ 였던 상태가 ‘On’ 으로 변경되고, DAG가 실행되고 있는 것을 볼 수 있다.
DAG에서 특정 task를 클랙했을 때 아래와 같이 팝업창을 볼수 있다. 각 task에 대해 로그를 보거나 rendered 된 파라미터를 볼수도 있고, task 실패시 Clear 버튼을 통해 retry 할수도 있다.
일부 task가 실패했거나, 데이터에 변경이 발생해서 특정 시점 작업을 다시 수행해야 할 수 있다.
Dag 실행 화면의 Tree 보기 또는 Graph 보기에서 실패한 Task를
클릭하면 Task의 세부 동작을 설정할 수 있는 팝업이 출력된다.
Clear 버튼을 클릭하여 재처리를 할 수 있고 삭제 대상 Task 범위를 지정해 줄 수 있다.
특정 Task 이후 모든 Task를 재처리 해야 하는 경우 모든 Task를 일일이 클릭하지 않고 범위를 지정하여 한번에 재처리가 가능하다.
선택 가능한 삭제 대상 Task 범위는 다음과 같다.
Past, Future, Upstream, Downstream을 그림으로 표현하면 다음과 같다.
다음으로는 Apache airflow를 사용함에 있어서 혼동할 수 있는 부분과 주의사항에 대해 살펴보자.
airflow에서 DAG를 동작시키고 task instance를 확인해 보면
아래와 같이 Execution Date, Start Date, End Date를 확인 할수 있다.
excution_date를 살펴보면 의문점이 들 수 있다.
아래 stackoverflow에 airflow사용자들이 주로 혼란스러워하는 포인트가 잘 표현되어 있다.
https://stackoverflow.com/questions/39612488/airflow-trigger-dag-execution-date-is-the-next-day-why
결론 부터 말하면 execution_date는 실행날짜가 아니라 주문번호(run id)이다. 굳이 시간으로
이해하고 싶다면 예약 시간이 아니라 예약을 잡으려고 시도한 시간이라고
이해해야 한다.
아래와 같이 dag가 설정되어 있다고 가정해보자.
'start_date': datetime(2019, 6, 6),
......
schedule_interval="@daily",
......
이렇게 하면 dag는 실제로 2019년 6월 7일에 최초 실행되며, execution_date에는 2019-06-06값이 들어가게 된다.
그리고 다음날인 6월 8일에 실행되고, execution_date에는 2019-06-07값이 들어 가게 되고 하루마다 반복된다.
따라서, 위의 stackoverflow 질문자는 두 가지를 모르고 있다.
1. execution_date가 실행시간이 아니라 주문번호라는 것
주문 번호를 “execution_date”:”2016-06-20” 라고 넣고, 어째서 “2016-09-21”에
실행(fire)되냐고 묻고 있는 것이다.
주문 번호가 “2016-06-20” 이고 schedule_interval이 1day이면 “2016-09-21”에
실행 된다.
2. start_date를 now()로 설정하면 위험하다는 점
airflow에서 dynamic value를 쓰지말라고 권고 하고 있을 뿐더러, 특히 질문자처럼 execution_date 잘못 이해하고 있으면 airflow는 분명히 의도와는 다르게 동작할 것이다.
회사에서도 많이 느꼈지만 pipeline의 실패나 혹은 데이터 재처리를 할 경우가 종종 발생한다. 이 경우 과거 데이터를 재처리 시켜주는 명령인 backfill을 진행할 때가 있는데 pipeline이 실행했던 시점의 데이터와 날짜를 넣어야 하다 보니 excution_date를 날짜 변수값으로 이용해 꼭 필요한 값이라 생각한다.
UTC 환경을 전제로 작업할 경우, 즉 UTC 기준 날짜로 로그가 쌓이고 UTC 기준으로 로그를 읽을 경우 고려하지 않아도 되지만 한국 시간(UTC +9)기준으로 로그를 사용한다면, UTC 기준 15:00 ~ 24:00 사이에 서로 날짜가 다르기 때문에 문제가 발생할 수 있다.
바꿔 말하면, 한국 시간으로 오전 9시 이전에 스케줄링해야 하는 경우에 문제가 발생한다.
우리는 하루에 한번 실행되는 배치를 만든다고 가정하고 실행되는 날짜 기준으로 전날 데이터를 작업한다고 가정해보자.
아래와 같은 경우는 정상적으로 실행된 예시이다.
schedule_interval="@daily",
그런데 아래처럼 한국 시간 오전 9시 이전 시간으로 지정하면 문제가 발생한다.
schedule_interval="31 15 * * *", #(한국 시간 00:31)
2019/06/12
00:31)2019/06/12 에 2019/06/10 일자 로그 작업을 하게 된 것이다.
따라서 한국시간 기준으로 2019-06-12일에 실행되고 2019-06-10일 데이터를 작업하게 된다.
과거에 start_date를 설정하면 airflow는 과거의 task를 차례대로 실행하는 Backfill을 실행한다.
간혹 “과거 언제부터 데이터를 쭈욱 빌드해 주세요” 라는 요청을 받으면 과거 start_date를
잘 설정하기만 하면 빌드는 자동으로 과거부터 실행되어 편리하게 데이터를 빌드할 수 있다.
하지만 이런 동작을 원하지 않는 경우도 있다.
그럴 때는 DAG를 선언할 때 Catchup 설정을 False
로 해주면 backfill을 실행하지 않는다.
즉, 과거의 작업은 중요하지 않고, 현재 시점의 이후 dag만
실행되어야 한다면, 설정을 아래와 같이 변경할 수 있다.
dag = DAG(
dag_id="test_dag",
default_args=default_args,
start_date=datetime(2021, 1, 1, tzinfo=kst),
schedule_interval="0 8 * * *",
catchUp=False,
)
Reference
https://airflow.readthedocs.io/en/1.10.12/dag-run.html
https://blog.naver.com/gyrbsdl18/221561318823
https://zzsza.github.io/data/2018/01/04/airflow-1/
https://medium.com/@aldente0630/%EC%95%84%ED%8C%8C%EC%B9%98-%EC%97%90%EC%96%B4%ED%94%8C%EB%A1%9C%EC%9A%B0%EB%A1%9C-%EC%9E%91%EC%97%85%ED%9D%90%EB%A6%84-%EA%B0%9C%EB%B0%9C%ED%95%B4%EB%B3%B4%EA%B8%B0-8f3653d749b4
https://engineering.linecorp.com/ko/blog/data-engineering-with-airflow-k8s-1/
https://bomwo.cc/posts/execution_date/
https://www.bearpooh.com/154
https://www.bucketplace.com/post/2021-04-13-%EB%B2%84%ED%82%B7%ED%94%8C%EB%A0%88%EC%9D%B4%EC%8A%A4-airflow-%EB%8F%84%EC%9E%85%EA%B8%B0/