코딜기
[Airflow] Airflow Jinja 템플릿 본문
반응형
Airflow Jinja 템플릿이란?
- 파이썬에서 Jinja 템플릿을 사용하는 것과 비슷하게 Jinja 템플릿을 사용해서 간단하게 데이터를 넘겨줄 수 있습니다.
- 오퍼레이터 파라미터 입력 시 중괄호 2개를 이용하면 Airflow에서 기본적으로 제공하는 변수들을 치환된 값으로 입력할 수 있습니다. (ex: 수행 날짜, DAG_ID)
- 모든 오퍼레이터, 모든 파라미터에 Template 변수 적용이 가능한 것은 아니기 때문에 Airflow 문서에서 어떤 파라미터에 Template 변수 적용 가능한지 확인이 필요합니다.
가장 많이 사용하는 Airflow 템플릿 변수
- data_interval_start → DAG 스케줄의 시작 시점
- data_interval_end → DAG 스케줄의 끝 시점
- ds → str 형태의 시점값 (YYYY-MM-DD)
- ds_nodash → str 형태의 시점값 (YYYYMMDD)
- ts → str 형태의 시점값 (2018-01-01T00:00:00+00:00)
- ts_nodash_with_tz → str 형태의 시점값 (20180101T000000+0000))
- ts_nodash → str 형태의 시점값 (20180101T000000)
※ Airflow에서 DAG의 시점값은 헷갈릴 수 있기 때문에 꼭 이해를 해야 합니다.
- data_interval_start는 배치작업이 시작하는 시점이 아닌, 처음 데이터를 읽는 시점입니다.
- data_interval_end는 배치작업이 끝나는 시점이 아닌, 마지막 데이터를 읽는 시점입니다.
- 즉, 현재 배치 작업의 시작 시점을 꺼내고 싶다면 data_interval_end를 사용해야 합니다.
- 또한, 이전 배치 작업의 시작 시점을 꺼내고 싶다면 data_interval_start를 사용해야 합니다.
Jinja 템플릿 사용
1. Python Operator Template에서 사용가능한 파라미터 (Document로 확인하기)
- op_kwargs
- op_args
- templates_dict
# 데코레이터 사용 X
def python_function1(start_date, end_date, **kwargs):
print(start_date)
print(end_date)
python_t1 = PythonOperator(
task_id = 'python_t1',
python_callable = python_function,
# data_interval_start와 data_interval_end를 ds형식으로 표현
op_kwargs = {'start_date':'{{data_interval_start | ds}}',
'end_date':'{{data_interval_end | ds}}'}
# 데코레이터 사용
# **kwargs에 제공된 Template 변수들을 사용
@task(task_id = 'python_t2')
def python_function2(**kwargs):
print(kwargs)
print('ds:' + kwargs['ds'])
print('ts:' + str(kwargs['ts']))
print('data_interval_start:' + str(kwargs['data_interval_start']))
print('data_interval_end:' + str(kwargs['data_interval_end']))
print('task_instance': + str(kwargs['ti']))
python_function2()
※ 파이썬 오퍼레이터는 **kwargs에 Template 변수들을 자동으로 제공해주고 있습니다.
2. Bash Operator Template에서 사용가능한 파라미터 (Document로 확인하기)
- bash_command (str)
- env (dict[str, str] | None)
- cwd (str | None)
# bash_command 템플릿
bash_t1 = BashOperator(
task_id = 'bash_t1',
bash_command = 'echo "End date is {{ data_interval_end }}"'
)
# env 템플릿
bash_t2 = BashOperator(
task_id = 'bash_t2',
# data_interval_start와 data_interval_end를 ds형식으로 표현
env = {'START_DATE': '{{ data_interval_start | ds}}',
'END_DATE':'{{ data_interval_end | ds }}'},
bash_command = 'echo "Start date is $START_DATE " && ''echo "End date is $END_DATE"'
)
반응형
'Data Engineering > Airflow' 카테고리의 다른 글
[Airflow] Task Decorator (0) | 2024.08.12 |
---|---|
[Airflow] Email 전송 오퍼레이터 (Email Operator with.docker) (0) | 2024.08.12 |
[Airflow] Airflow에서 외부 파일 읽기 (with.docker) (0) | 2024.05.29 |
[Airflow] airflow.cfg 개념과 각 섹션들의 기능 (0) | 2024.05.14 |
[Airflow] EC2 가상 환경에 Airflow 패키지 설치하기 (0) | 2024.05.14 |
Comments