코딜기

[Airflow] Airflow Jinja 템플릿 본문

Data Engineering/Airflow

[Airflow] Airflow Jinja 템플릿

코딜기 2024. 8. 13. 12:03
반응형

Airflow Jinja 템플릿이란?

  • 파이썬에서 Jinja 템플릿을 사용하는 것과 비슷하게 Jinja 템플릿을 사용해서 간단하게 데이터를 넘겨줄 수 있습니다.
  • 오퍼레이터 파라미터 입력 시 중괄호 2개를 이용하면 Airflow에서 기본적으로 제공하는 변수들을 치환된 값으로 입력할 수 있습니다. (ex: 수행 날짜, DAG_ID)
  • 모든 오퍼레이터, 모든 파라미터에 Template 변수 적용이 가능한 것은 아니기 때문에 Airflow 문서에서 어떤 파라미터에 Template 변수 적용 가능한지 확인이 필요합니다.

가장 많이 사용하는 Airflow 템플릿 변수

  • data_interval_start → DAG 스케줄의 시작 시점
  • data_interval_end → DAG 스케줄의 끝 시점
  • ds → str 형태의 시점값 (YYYY-MM-DD)
  • ds_nodash → str 형태의 시점값 (YYYYMMDD)
  • ts → str 형태의 시점값 (2018-01-01T00:00:00+00:00)
  • ts_nodash_with_tz → str 형태의 시점값 (20180101T000000+0000))
  • ts_nodash → str 형태의 시점값 (20180101T000000)

※ Airflow에서 DAG의 시점값은 헷갈릴 수 있기 때문에 꼭 이해를 해야 합니다.

  • data_interval_start는 배치작업이 시작하는 시점이 아닌, 처음 데이터를 읽는 시점입니다.
  • data_interval_end는  배치작업이 끝나는 시점이 아닌, 마지막 데이터를 읽는 시점입니다.
  • 즉, 현재 배치 작업의 시작 시점을 꺼내고 싶다면 data_interval_end를 사용해야 합니다.
  • 또한, 이전 배치 작업의 시작 시점을 꺼내고 싶다면 data_interval_start를 사용해야 합니다.

Jinja 템플릿 사용

1. Python Operator Template에서 사용가능한 파라미터 (Document로 확인하기)

  • op_kwargs
  • op_args
  • templates_dict
# 데코레이터 사용 X
def python_function1(start_date, end_date, **kwargs):
	print(start_date)
	print(end_date)

python_t1 = PythonOperator(
	task_id = 'python_t1',
	python_callable = python_function,
        # data_interval_start와 data_interval_end를 ds형식으로 표현
	op_kwargs = {'start_date':'{{data_interval_start | ds}}',
                     'end_date':'{{data_interval_end | ds}}'}

# 데코레이터 사용
# **kwargs에 제공된 Template 변수들을 사용
@task(task_id = 'python_t2')
def python_function2(**kwargs):
	print(kwargs)
	print('ds:' + kwargs['ds'])
	print('ts:' + str(kwargs['ts']))
	print('data_interval_start:' + str(kwargs['data_interval_start']))
    	print('data_interval_end:' + str(kwargs['data_interval_end']))
    	print('task_instance': + str(kwargs['ti']))
        
python_function2()

※ 파이썬 오퍼레이터는 **kwargs에 Template 변수들을 자동으로 제공해주고 있습니다.

 

2. Bash Operator Template에서 사용가능한 파라미터 (Document로 확인하기)

  • bash_command (str)
  • env (dict[str, str] | None)
  • cwd (str | None)
# bash_command 템플릿
bash_t1 = BashOperator(
    task_id = 'bash_t1',
    bash_command = 'echo "End date is {{ data_interval_end }}"'
    )

# env 템플릿
bash_t2 = BashOperator(
    task_id = 'bash_t2',
    # data_interval_start와 data_interval_end를 ds형식으로 표현
    env = {'START_DATE': '{{ data_interval_start | ds}}',
           'END_DATE':'{{ data_interval_end | ds }}'},
    bash_command = 'echo "Start date is $START_DATE " && ''echo "End date is $END_DATE"'
)
반응형
Comments