알아볼 내용
- DAG 정의 방법
- DAG arguments
1. DAG 기본 구성
하나의 DAG를 작성하기 위해 필요한 요소는 다음과 같다
- DAG 속성 정의
- DAG를 구성하는 여러 Task 정의
- Task간의 실행 순서 결정
먼저 DAG를 정의하는 방식과 여러 속성값에 대해서 알아보자
2. DAG 정의 방식
Airflow에서는 다양한 방식으로 DAG를 선언할 수 있다
1) with 블록 사용
from airflow.sdk import DAG
from airflow.operators.empty import EmptyOperator
import datetime
with DAG(
dag_id="my_dag",
start_date=datetime.datetime(2021, 1, 1),
schedule="@daily",
) as dag:
EmptyOperator(task_id="task")
2) Standard Constructor
from airflow.sdk import DAG
from airflow.operators.empty import EmptyOperator
import datetime
dag = DAG(
dag_id="my_dag",
start_date=datetime.datetime(2021, 1, 1),
schedule="@daily",
)
EmptyOperator(task_id="task", dag=dag)
3) Decorator 방식
from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator
import datetime
@dag(start_date=datetime.datetime(2021, 1, 1), schedule="@daily")
def my_dag():
EmptyOperator(task_id="task")
my_dag()
3. DAG Arguments
DAG-level 인자
dag_id
: DAG의 고유 이름description
: DAG 설명schedule
: 실행 주기- 프리셋 문자열
@once
: 한 번만 실행@daily
: 매일 0시에 실행@weekly
: 매주 실행@hourly
: 매시간 실행
- Crontab 형식
"0 9 * * *"
→ 매일 오전 9시"*/15 * * * *"
→ 15분마다 실행
- 프리셋 문자열
start_date
: DAG 실행 시작 시각 (pendulum.datetime()
사용 권장)pendulum
은 Airflow에서 표준 datetime 모듈보다 더 강력하고 직관적인 날짜/시간 처리 라이브러리- 특히 timezone-aware datetime 객체를 쉽게 생성할 수 있어 Airflow의 DAG 스케줄링에 매우 적합
catchup
: 누락된 DAG 실행을 보완할지 여부 (False
권장)tags
: UI 필터링용 태그 리스트 (예:["example", "etl"]
)max_active_runs
: 동시에 실행 가능한 DAG 인스턴스 수dagrun_timeout
: DAG 실행 최대 제한 시간 (datetime.timedelta
)default_args
: task 공통 인자 딕셔너리- DAG에 포함되는 여러 Task가 동일한 arguments를 갖는 경우 많음
- 매번 각각 선언해주는 것보다 DAG의 default_argument에 dictionary 형태로 전달해주면 자동으로 해당 DAG에 속하는 Operator에 적용
params
: 템플릿에서 사용할 사용자 정의 파라미터 딕셔너리
default_args 내부에서 자주 쓰는 인자
owner
: task 소유자 (예:"airflow"
)depends_on_past
: 이전 task 성공 여부에 따라 현재 task 실행 결정start_date
: task-level 시작 시각 (보통 DAG에서 설정하므로 생략 가능)email
: 알림 받을 이메일 주소 리스트email_on_failure
: 실패 시 이메일 전송 여부email_on_retry
: 재시도 시 이메일 전송 여부retries
: 재시도 횟수retry_delay
: 재시도 간 시간 간격 (datetime.timedelta
)
Params
DAG를 실행시킬 때, 매번 다른 날짜의 데이터를 처리하거나, 실험적으로 파라미터 값을 바꿔보고 싶을 때 코드 수정 없이 UI에서 바로 값만 바꿔 실행할 수 있다
아래 코드는 기본적으로 “Hello, Airflow”를 출력하는 DAG인데,
UI에서 Trigger를 누르면 나오는 입력창에 원하는 문자열을 입력하면 형식에 맞게 그 문자열을 출력한다
import logging
from airflow.sdk import DAG, Param, get_current_context, task
# DAG 정의: 'greet'라는 문자열 파라미터를 받음 (기본값: "Airflow")
with DAG(
dag_id="param_test",
params={"greet": Param("Airflow", type="string", minLength=1, maxLength=20)},
) as dag:
@task.python
def hello_task():
ctx = get_current_context()
logger = logging.getLogger("airflow.task")
# 사용자가 입력한 greet 파라미터 값을 출력
logger.info(f"Hello, {ctx['params']['greet']}!")
hello_task()