[Airflow] 아파치 Airflow - Xcom

Cross Communication / Task 간 데이터 공유(push, pull) / Avoid top level code

Posted by Wonyong Jang on July 27, 2024 · 9 mins read

1. Xcom (Cross Communication) 란?

Airflow DAG 내에 Task 간에 데이터 공유를 위해 사용되는 기술이다.

더 자세한 내용은 공식문서를 참고하자.

예를 들면, Task1의 수행 중 내용이나 결과를 Task2에서 사용 또는 입력으로 주고 싶은 경우
있을 수 있다.

주로 작은 규모의 데이터 공유를 위해 사용되며, Xcom 내용은 메타 DB의 Xcom 테이블에 값이 저장된다.

대용량 데이터 공유를 위해서는 s3, hdfs 등의 외부 솔루션을 권장한다.


2. Xcom 사용 방법

이제 Xcom을 사용하는 몇 가지 방법을 알아보고 예제를 통해 살펴보도록 하자.

  • PythonOperator return 값을 이용한 Xcom 사용
  • push-pull을 이용한 Xcom 사용
  • Jinja templates을 이용한 Xcom 사용

2-1) PythonOperator return 값을 이용한 Xcom 사용

PythonOperator에서 return을 하면 Airflow xcom에 자동으로 push 되기에 return하는 함수를 만들어 하나의 task로 실행시켰다.

def return_xcom():
    return "xcom!"
    
return_xcom = PythonOperator(
    task_id = 'return_xcom',
    python_callable = return_xcom,
    dag = dag
)

스크린샷 2024-07-28 오후 4 26 15

2-1) push-pull 을 이용한 Xcom 사용

PythonOperator에서 return을 하는 방법 이외에도 아래와 같이 context['ti']를 이용하여 xcom에 push, pull 하여 데이터를 주고 받는 것이 가능하다.

PythonOperator를 사용하는 경우 return과 push를 하나의 task에서 중복하여 사용할 수 있으며, 해당 데이터를 전달 받는 곳에서 전달 받는 방식이 다르다는 것을 코드로 확인해보자.

### task_instance 는 ti와 동일한 의미로 축약하여 사용할 수 있다.   

### 중요! 
### return으로 xcom을 사용하는 경우 xcom_pull(task_ids)를 사용해 데이터를 전달 받는다.   
### push 하는 경우에는 key-value 형식에 따라 데이터를 주고 받게 된다.   

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator

# ...

dag = DAG(
    'push_pull_xcom_test',
    default_args=default_args,
    description='My first tutorial bash DAG',
    schedule_interval= None
)

def return_xcom():
    return "xcom!"

def xcom_push_test(**context):
    xcom_value = "xcom_push_value"
    
    ## push 로 xcom을 사용하는 경우 key-value로 데이터를 전달 받을 수 있다.   
    context['task_instance'].xcom_push(key='xcom_push_value', value=xcom_value)

    ## return으로 xcom을 사용하는 경우 task_ids로 데이터를 전달 받을 수 있다.
    return "xcom_return_value"

def xcom_pull_test(**context):
    # return 으로 xcom 사용한 데이터 전달 받기 (task_ids)
    xcom_return = context["task_instance"].xcom_pull(task_ids='return_xcom')
    
    # push 로 xcom 사용한 데이터 전달 받기 (key-value)
    xcom_push_value = context['ti'].xcom_pull(key='xcom_push_value')
    
    # return 으로 xcom 사용한 데이터 전달 받기 (task_ids)
    xcom_push_return_value = context['ti'].xcom_pull(task_ids='xcom_push_task')

    print("xcom_return : {}".format(xcom_return)) 
    print("xcom_push_value : {}".format(xcom_push_value)) 
    print("xcom_push_return_value : {}".format(xcom_push_return_value))

return_xcom = PythonOperator(
    task_id = 'return_xcom',
    python_callable = return_xcom,
    dag = dag
)

xcom_push_task = PythonOperator(
    task_id = 'xcom_push_task',
    python_callable = xcom_push_test,
    dag = dag
)

xcom_pull_task = PythonOperator(
    task_id = 'xcom_pull_task',
    python_callable = xcom_pull_test,
    dag = dag
)

return_xcom >> xcom_push_task >> xcom_pull_task

Output

[2024-07-28 07:42:15,382] {logging_mixin.py:104} INFO - xcom_return : xcom!
[2024-07-28 07:42:15,384] {logging_mixin.py:104} INFO - xcom_push_value : xcom_push_value
[2024-07-28 07:42:15,385] {logging_mixin.py:104} INFO - xcom_push_return_value : xcom_return_value

위 결과를 확인해보면, return을 통해 xcom을 사용한 경우와 push를 통해 xcom을 사용한 경우 각각 데이터를 전달 받는 방법이 다른 것을 확인할 수 있다.

스크린샷 2024-07-28 오후 4 50 04

2-3) Jinja templates을 이용한 Xcom 사용

마지막으로 jinja template를 이용하여 전달받는 방식이다.
여기서는 BashOperator를 사용해 예제를 표현했지만 다른 Operator에서도 동일하게 사용하는 것이 가능하다.

아래 예시와 같이 jinja template을 사용하여 dag 실행시 값을 할당하여 사용한다.

{{ dag_run }}
{{ ts }}
{{ ds }}

airflow 에서 더 많은 사용 예시는 공식문서를 확인하자.

bash_xcom_push = BashOperator(
    task_id='bash_xcom_push',
    bash_command='echo "{{ ti.xcom_push(key="bash_xcom_push", value="bash_xcom_push_value") }}"',
    dag=dag
)

bash_xcom_pull = BashOperator(
    task_id='bash_xcom_pull',
    bash_command='echo "{{ ti.xcom_pull(key="bash_xcom_push") }}"',
    dag=dag
)


3. top level code

airflow는 주기적으로 모든 dag 정보를 읽어 들여 서버 동기화를 한다.

min_file_process_interval 설정에 따라 다르며 현재 default는 30초이다.

그 과정에서 top level code 들은 계속 로드를 진행하며, 특히 database connection 또는 외부 api 호출은 전체 클러스터 부하를 가져올 수 있다.

따라서 top level code를 피하기 위해 모든 코드는 함수, 클래스, 또는 with dag 문 안에 캡슐화 해야한다.

airflow best practice를 참고해보자.

아래 예제는 외부 api 호출을 top level code로 작성한 bad practice 이다.

from airflow import DAG
from airflow.decorators import task


def expensive_api_call():
    print("Hello from Airflow!")
    sleep(1000)


my_expensive_response = expensive_api_call()

이는 아래와 같이 task가 실행될 때만 호출하도록 해야 한다.

def expensive_api_call():
    sleep(1000)
    return "Hello from Airflow!"


with DAG(
    dag_id="example_python_operator",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],
) as dag:

    @task()
    def print_expensive_api_call():
        my_expensive_response = expensive_api_call()
        print(my_expensive_response)

또한 import 부분도 주의해야 하며, pandas, numpy, torch, tensorflow와 같은 무거운 import도 top level code를 피해야 한다.

# It's ok to import modules that are not expensive to load at top-level of a DAG file
import random
import pendulum

# Expensive imports should be avoided as top level imports, because DAG files are parsed frequently, resulting in top-level code being executed.
#
# import pandas
# import torch
# import tensorflow
#

...


@task()
def do_stuff_with_pandas_and_torch():
    import pandas
    import torch

Reference

https://blog.naver.com/gyrbsdl18/221561318823
https://zzsza.github.io/data/2018/01/04/airflow-1/
https://letzgorats.tistory.com/entry/Airflow-Python-Operator%EC%97%90%EC%84%9C-Xcom-%EC%82%AC%EC%9A%A9