Airflow DAG 내에 Task 간에 데이터 공유를 위해 사용되는 기술이다.
더 자세한 내용은 공식문서를 참고하자.
예를 들면, Task1의 수행 중 내용이나 결과를 Task2에서 사용 또는 입력으로 주고 싶은 경우
있을 수 있다.
주로 작은 규모의 데이터 공유를 위해 사용되며, Xcom 내용은
메타 DB의 Xcom 테이블에 값이 저장된다.
대용량 데이터 공유를 위해서는 s3, hdfs 등의 외부 솔루션을 권장한다.
이제 Xcom을 사용하는 몇 가지 방법을 알아보고 예제를 통해 살펴보도록 하자.
PythonOperator에서 return을 하면 Airflow xcom에 자동으로 push 되기에 return하는 함수를 만들어 하나의 task로 실행시켰다.
def return_xcom():
return "xcom!"
return_xcom = PythonOperator(
task_id = 'return_xcom',
python_callable = return_xcom,
dag = dag
)
PythonOperator에서 return을 하는 방법 이외에도 아래와 같이 context['ti']를
이용하여 xcom에 push, pull 하여 데이터를 주고 받는 것이 가능하다.
PythonOperator를 사용하는 경우 return과 push를 하나의 task에서 중복하여 사용할 수 있으며, 해당 데이터를 전달 받는 곳에서 전달 받는 방식이 다르다는 것을 코드로 확인해보자.
### task_instance 는 ti와 동일한 의미로 축약하여 사용할 수 있다.
### 중요!
### return으로 xcom을 사용하는 경우 xcom_pull(task_ids)를 사용해 데이터를 전달 받는다.
### push 하는 경우에는 key-value 형식에 따라 데이터를 주고 받게 된다.
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
# ...
dag = DAG(
'push_pull_xcom_test',
default_args=default_args,
description='My first tutorial bash DAG',
schedule_interval= None
)
def return_xcom():
return "xcom!"
def xcom_push_test(**context):
xcom_value = "xcom_push_value"
## push 로 xcom을 사용하는 경우 key-value로 데이터를 전달 받을 수 있다.
context['task_instance'].xcom_push(key='xcom_push_value', value=xcom_value)
## return으로 xcom을 사용하는 경우 task_ids로 데이터를 전달 받을 수 있다.
return "xcom_return_value"
def xcom_pull_test(**context):
# return 으로 xcom 사용한 데이터 전달 받기 (task_ids)
xcom_return = context["task_instance"].xcom_pull(task_ids='return_xcom')
# push 로 xcom 사용한 데이터 전달 받기 (key-value)
xcom_push_value = context['ti'].xcom_pull(key='xcom_push_value')
# return 으로 xcom 사용한 데이터 전달 받기 (task_ids)
xcom_push_return_value = context['ti'].xcom_pull(task_ids='xcom_push_task')
print("xcom_return : {}".format(xcom_return))
print("xcom_push_value : {}".format(xcom_push_value))
print("xcom_push_return_value : {}".format(xcom_push_return_value))
return_xcom = PythonOperator(
task_id = 'return_xcom',
python_callable = return_xcom,
dag = dag
)
xcom_push_task = PythonOperator(
task_id = 'xcom_push_task',
python_callable = xcom_push_test,
dag = dag
)
xcom_pull_task = PythonOperator(
task_id = 'xcom_pull_task',
python_callable = xcom_pull_test,
dag = dag
)
return_xcom >> xcom_push_task >> xcom_pull_task
Output
[2024-07-28 07:42:15,382] {logging_mixin.py:104} INFO - xcom_return : xcom!
[2024-07-28 07:42:15,384] {logging_mixin.py:104} INFO - xcom_push_value : xcom_push_value
[2024-07-28 07:42:15,385] {logging_mixin.py:104} INFO - xcom_push_return_value : xcom_return_value
위 결과를 확인해보면, return을 통해 xcom을 사용한 경우와 push를 통해 xcom을 사용한 경우 각각 데이터를 전달 받는 방법이 다른 것을 확인할 수 있다.
마지막으로 jinja template를 이용하여 전달받는 방식이다.
여기서는 BashOperator를 사용해 예제를 표현했지만 다른 Operator에서도
동일하게 사용하는 것이 가능하다.
아래 예시와 같이 jinja template을 사용하여 dag 실행시 값을 할당하여 사용한다.
{{ dag_run }}
{{ ts }}
{{ ds }}
airflow 에서 더 많은 사용 예시는 공식문서를 확인하자.
bash_xcom_push = BashOperator(
task_id='bash_xcom_push',
bash_command='echo "{{ ti.xcom_push(key="bash_xcom_push", value="bash_xcom_push_value") }}"',
dag=dag
)
bash_xcom_pull = BashOperator(
task_id='bash_xcom_pull',
bash_command='echo "{{ ti.xcom_pull(key="bash_xcom_push") }}"',
dag=dag
)
airflow는 주기적으로 모든 dag 정보를 읽어 들여 서버 동기화를 한다.
min_file_process_interval 설정에 따라 다르며 현재 default는 30초이다.
그 과정에서 top level code 들은 계속 로드를 진행하며, 특히 database connection 또는 외부 api 호출은 전체 클러스터 부하를 가져올 수 있다.
따라서 top level code를 피하기 위해 모든 코드는 함수, 클래스, 또는 with dag 문 안에
캡슐화 해야한다.
airflow best practice를 참고해보자.
아래 예제는 외부 api 호출을 top level code로 작성한 bad practice 이다.
from airflow import DAG
from airflow.decorators import task
def expensive_api_call():
print("Hello from Airflow!")
sleep(1000)
my_expensive_response = expensive_api_call()
이는 아래와 같이 task가 실행될 때만 호출하도록 해야 한다.
def expensive_api_call():
sleep(1000)
return "Hello from Airflow!"
with DAG(
dag_id="example_python_operator",
schedule=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example"],
) as dag:
@task()
def print_expensive_api_call():
my_expensive_response = expensive_api_call()
print(my_expensive_response)
또한 import 부분도 주의해야 하며, pandas, numpy, torch, tensorflow와 같은 무거운 import도 top level code를
피해야 한다.
# It's ok to import modules that are not expensive to load at top-level of a DAG file
import random
import pendulum
# Expensive imports should be avoided as top level imports, because DAG files are parsed frequently, resulting in top-level code being executed.
#
# import pandas
# import torch
# import tensorflow
#
...
@task()
def do_stuff_with_pandas_and_torch():
import pandas
import torch
Reference
https://blog.naver.com/gyrbsdl18/221561318823
https://zzsza.github.io/data/2018/01/04/airflow-1/
https://letzgorats.tistory.com/entry/Airflow-Python-Operator%EC%97%90%EC%84%9C-Xcom-%EC%82%AC%EC%9A%A9