알아볼 내용

  • DAG 정의 방법
  • DAG arguments

1. DAG 기본 구성

하나의 DAG를 작성하기 위해 필요한 요소는 다음과 같다

  • DAG 속성 정의
  • DAG를 구성하는 여러 Task 정의
  • Task간의 실행 순서 결정

먼저 DAG를 정의하는 방식과 여러 속성값에 대해서 알아보자


2. DAG 정의 방식

Airflow에서는 다양한 방식으로 DAG를 선언할 수 있다

1) with 블록 사용
from airflow.sdk import DAG
from airflow.operators.empty import EmptyOperator
import datetime
  
with DAG(
    dag_id="my_dag",
    start_date=datetime.datetime(2021, 1, 1),
    schedule="@daily",
) as dag:
    EmptyOperator(task_id="task")
2) Standard Constructor
from airflow.sdk import DAG
from airflow.operators.empty import EmptyOperator
import datetime
  
dag = DAG(
    dag_id="my_dag",
    start_date=datetime.datetime(2021, 1, 1),
    schedule="@daily",
)
 
EmptyOperator(task_id="task", dag=dag)
3) Decorator 방식
from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator
import datetime
  
@dag(start_date=datetime.datetime(2021, 1, 1), schedule="@daily")
def my_dag():
    EmptyOperator(task_id="task")
  
my_dag()

3. DAG Arguments

DAG-level 인자
  • dag_id: DAG의 고유 이름
  • description: DAG 설명
  • schedule: 실행 주기
    • 프리셋 문자열
      • @once: 한 번만 실행  
      • @daily: 매일 0시에 실행  
      • @weekly: 매주 실행  
      • @hourly: 매시간 실행  
    • Crontab 형식
      • "0 9 * * *" → 매일 오전 9시  
      • "*/15 * * * *" → 15분마다 실행
  • start_date: DAG 실행 시작 시각 (pendulum.datetime() 사용 권장)
    • pendulum은 Airflow에서 표준 datetime 모듈보다 더 강력하고 직관적인 날짜/시간 처리 라이브러리
    • 특히 timezone-aware datetime 객체를 쉽게 생성할 수 있어 Airflow의 DAG 스케줄링에 매우 적합
  • catchup: 누락된 DAG 실행을 보완할지 여부 (False 권장)
  • tags: UI 필터링용 태그 리스트 (예: ["example", "etl"])
  • max_active_runs: 동시에 실행 가능한 DAG 인스턴스 수
  • dagrun_timeout: DAG 실행 최대 제한 시간 (datetime.timedelta)
  • default_args: task 공통 인자 딕셔너리
    • DAG에 포함되는 여러 Task가 동일한 arguments를 갖는 경우 많음
    • 매번 각각 선언해주는 것보다 DAG의 default_argument에 dictionary 형태로 전달해주면 자동으로 해당 DAG에 속하는 Operator에 적용
  • params: 템플릿에서 사용할 사용자 정의 파라미터 딕셔너리
default_args 내부에서 자주 쓰는 인자
  • owner: task 소유자 (예: "airflow")
  • depends_on_past: 이전 task 성공 여부에 따라 현재 task 실행 결정
  • start_date: task-level 시작 시각 (보통 DAG에서 설정하므로 생략 가능)
  • email: 알림 받을 이메일 주소 리스트
  • email_on_failure: 실패 시 이메일 전송 여부
  • email_on_retry: 재시도 시 이메일 전송 여부
  • retries: 재시도 횟수
  • retry_delay: 재시도 간 시간 간격 (datetime.timedelta)
Params

DAG를 실행시킬 때, 매번 다른 날짜의 데이터를 처리하거나, 실험적으로 파라미터 값을 바꿔보고 싶을 때 코드 수정 없이 UI에서 바로 값만 바꿔 실행할 수 있다

아래 코드는 기본적으로 “Hello, Airflow”를 출력하는 DAG인데,
UI에서 Trigger를 누르면 나오는 입력창에 원하는 문자열을 입력하면 형식에 맞게 그 문자열을 출력한다

param_test.py
import logging
from airflow.sdk import DAG, Param, get_current_context, task
 
# DAG 정의: 'greet'라는 문자열 파라미터를 받음 (기본값: "Airflow")
with DAG(
    dag_id="param_test",
    params={"greet": Param("Airflow", type="string", minLength=1, maxLength=20)},
) as dag:
 
    @task.python
    def hello_task():
        ctx = get_current_context()
        logger = logging.getLogger("airflow.task")
        # 사용자가 입력한 greet 파라미터 값을 출력
        logger.info(f"Hello, {ctx['params']['greet']}!")
 
    hello_task()