What we'll cover

  • Task ์ •์˜, ์ƒํƒœ
  • Task ๊ฐ„ ์ˆœ์„œ (Dependency) ์ •์˜
  • XCom, Connection
  • Operator
  • Sensor, Deferrable Operator
  • Hook
  • Provider, Plugin

1. What is a Task?

A Task is the basic unit of work executed in Airflow. Tasks are placed as nodes of a DAG, and their execution order is determined by the dependencies you set between them.

Task์—๋Š” ์„ธ ๊ฐ€์ง€ ๊ธฐ๋ณธ ์œ ํ˜•์ด ์กด์žฌํ•œ๋‹ค

  • Operator
  • Sensor (a special kind of Operator)
  • Function Tasks created with @task

Internally, all of these are subclasses of BaseOperator. Conceptually, a Task is the instance that actually gets executed, while an Operator is the reusable template it is created from.

Task ์ƒํƒœ

DAG๋ฅผ ์‹คํ–‰ํ•˜๋ฉด ์ •์˜๋œ ๋Œ€๋กœ Task๊ฐ€ ์‹คํ–‰๋˜๋Š”๋ฐ ์ด๋•Œ Task๋Š” ๋‹ค์–‘ํ•œ ์ƒํƒœ๋ฅผ ๊ฐ€์งˆ ์ˆ˜ ์žˆ๋‹ค
Airflow UI์—์„œ Task์˜ ํ˜„์žฌ ์ƒํƒœ๋ฅผ ์‰ฝ๊ฒŒ ํŒŒ์•…ํ•˜๋Š”๋ฐ ๋„์›€์ด ๋œ๋‹ค

  • none: the task has not been queued for execution yet (its dependencies are not met yet)
  • scheduled: the Scheduler has confirmed that all of the task's dependencies are met and has decided it should run
  • queued: the task has been assigned to the Executor and is waiting for a Worker
  • running: the task is running on a Worker (or on a local/synchronous executor)
  • success: the task finished normally without errors
  • restarting: the task received an external restart request while it was running
  • failed: the task hit an error during execution and failed
  • skipped: the task was skipped because of branching, LatestOnly, or similar
  • upstream_failed: an upstream task failed, and according to the Trigger Rule this task needed it
  • up_for_retry: the task failed but has retries left, so it will run again
  • up_for_reschedule: the task is a Sensor in reschedule mode and is waiting to be rescheduled
  • deferred: the task has been deferred to a trigger
  • removed: the task disappeared from the DAG after the DAG run started
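The sketch below shows how these states usually connect, as a simplified transition table for a task instance's lifecycle. It is an illustrative model, not Airflow's actual state machine. Only the common paths are listed, rarer states such as restarting are left out, and `TYPICAL_TRANSITIONS` and `is_valid_path` are hypothetical names.

```python
# Simplified, hypothetical model of common task-instance state transitions.
# Not Airflow's implementation; rarer states such as "restarting" are omitted.
TYPICAL_TRANSITIONS = {
    "none": {"scheduled", "skipped", "upstream_failed", "removed"},
    "scheduled": {"queued"},
    "queued": {"running"},
    "running": {"success", "failed", "skipped", "up_for_retry", "up_for_reschedule", "deferred"},
    "up_for_retry": {"scheduled"},       # retried after the retry delay
    "up_for_reschedule": {"scheduled"},  # sensor in reschedule mode checks again later
    "deferred": {"scheduled"},           # trigger fired, task resumes on a worker
}


def is_valid_path(path):
    """Return True if every consecutive pair of states is a listed transition."""
    return all(nxt in TYPICAL_TRANSITIONS.get(cur, set()) for cur, nxt in zip(path, path[1:]))


happy_path = ["none", "scheduled", "queued", "running", "success"]
retry_path = ["none", "scheduled", "queued", "running", "up_for_retry",
              "scheduled", "queued", "running", "success"]

print(is_valid_path(happy_path))  # True
print(is_valid_path(retry_path))  # True
print(is_valid_path(["none", "running"]))  # False: not a listed transition in this model
```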
Task Dependency

Airflow์—์„œ๋Š” Task ๊ฐ„ ์˜์กด์„ฑ์„ ์ง๊ด€์ ์œผ๋กœ ์ •์˜ํ•  ์ˆ˜ ์žˆ๋‹ค
์˜์กด์„ฑ์„ ์ž˜ ์„ค์ •ํ•ด์•ผ DAG์ด ์˜ฌ๋ฐ”๋ฅธ ์ˆœ์„œ๋กœ ์‹คํ–‰๋˜๊ณ , ๋ณ‘๋ ฌ/๋ถ„๊ธฐ ์‹คํ–‰๋„ ์›ํ™œํžˆ ๊ด€๋ฆฌ๋œ๋‹ค.

1. Basic connection operators
  • >> : forward

    task1 >> task2   # run task1, then task2
    task1.set_downstream(task2)  # equivalent
  • << : reverse

    task2 << task1   # run task1, then task2
    task2.set_upstream(task1)  # equivalent
2. Connecting multiple Tasks
  • Several tasks converge into one (fan-in)

    [task1, task2] >> task3   # task3 runs only after both task1 and task2 succeed (see Trigger Rules)
  • Conversely, one task branches out into several (fan-out)

    task1 >> [task2, task3]
3. Cross Downstream
  • Connects every combination of tasks across the two lists
    from airflow.sdk import cross_downstream
     
    cross_downstream([task1, task2], [task3, task4])
     
    # Result
    # task1 >> task3
    # task1 >> task4
    # task2 >> task3
    # task2 >> task4
4. Chain
  • Expresses a sequential chain concisely
    from airflow.sdk import chain
    chain(task1, task2, task3, task4)
     
    # Result
    # task1 >> task2 >> task3 >> task4
Example
task0 >> task1
[task1, task2] >> task3
task3 >> [task4, task5]
cross_downstream([task4, task5], [task6, task7])
chain(task6, task9, task10)
chain(task7, task8)
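The toy below makes it possible to check exactly which edges the example produces. It re-implements `>>`, `cross_downstream` and `chain` so that they only record (upstream, downstream) pairs. `ToyTask`, `toy_cross_downstream` and `toy_chain` are hypothetical stand-ins written for illustration. They follow the documented behavior of Airflow's operators, but they are not Airflow's code.

```python
class ToyTask:
    """Hypothetical stand-in for an Airflow task that only records dependency edges."""
    edges = set()  # shared set of (upstream_id, downstream_id) pairs

    def __init__(self, task_id):
        self.task_id = task_id

    def __rshift__(self, other):
        # task >> task   or   task >> [task, ...]
        targets = other if isinstance(other, list) else [other]
        for t in targets:
            ToyTask.edges.add((self.task_id, t.task_id))
        return other  # returning the right side allows a >> b >> c

    def __rrshift__(self, other):
        # [task, ...] >> task : list has no >>, so Python calls this method on the right operand
        for t in other:
            ToyTask.edges.add((t.task_id, self.task_id))
        return self


def toy_cross_downstream(from_tasks, to_tasks):
    # Every task in from_tasks becomes upstream of every task in to_tasks
    for t in from_tasks:
        t >> to_tasks


def toy_chain(*tasks):
    # Connect tasks one after another: t1 >> t2 >> ... >> tn
    for up, down in zip(tasks, tasks[1:]):
        up >> down


t = {i: ToyTask(f"task{i}") for i in range(11)}
t[0] >> t[1]
[t[1], t[2]] >> t[3]
t[3] >> [t[4], t[5]]
toy_cross_downstream([t[4], t[5]], [t[6], t[7]])
toy_chain(t[6], t[9], t[10])
toy_chain(t[7], t[8])

for edge in sorted(ToyTask.edges):
    print(" >> ".join(edge))
```

The example yields 12 edges in total. In particular, task6 and task7 each have two upstream tasks (task4 and task5).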

XCom: exchanging information between Tasks

By default, Tasks do not pass data to each other, but you may want the next Task to receive and use a result produced by the previous one.

XCom (Cross-Communication) is Airflow's mechanism for passing small pieces of data between tasks. Data is stored as key-value pairs in the metadata DB (the XCom table). XCom is typically used to pass short strings, numbers, paths or small JSON objects.

๊ธฐ๋ณธ ๋™์ž‘์€ ๋‹ค์Œ๊ณผ ๊ฐ™๋‹ค

  • ti.xcom_push(key, value) ๋กœ ์ €์žฅ, ti.xcom_pull(task_ids, key) ๋กœ ์กฐํšŒ ย 
  • @task๋กœ ์ •์˜ํ•œ ํ•จ์ˆ˜์˜ Return๊ฐ’์€ ์ž๋™์œผ๋กœ XCom์— ์ €์žฅ
  • ๊ธฐ๋ณธ ์ง๋ ฌํ™”๋Š” JSON ํ˜ธํ™˜ ํƒ€์ž… ๊ถŒ์žฅ (๋ณต์žก ๊ฐ์ฒด๋Š” ์ปค์Šคํ…€ ์ง๋ ฌํ™”/๋ฐฑ์—”๋“œ ๊ณ ๋ ค)
  • XCom์€ TaskInstance ๋‹จ์œ„๋กœ ๊ธฐ๋ก๋œ๋‹ค (๊ฐ™์€ DAG Run, ๊ฐ™์€ ํƒœ์Šคํฌ ์‹คํ–‰ ๋ฒ”์œ„)

The most basic example is as follows:

import pendulum
from airflow.sdk import DAG
from airflow.providers.standard.operators.python import PythonOperator

def producer(**context):
    # Assume this is the result of some computation or lookup
    result = {"n": 7, "msg": "hello xcom"}
    context["ti"].xcom_push(key="payload", value=result)

def consumer(**context):
    payload = context["ti"].xcom_pull(task_ids="produce", key="payload")
    print("pulled:", payload)  # {'n': 7, 'msg': 'hello xcom'}

with DAG(
    dag_id="ex_xcom_push_pull",
    start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
    schedule="@daily",
    catchup=False,
    tags=["xcom", "basic"],
) as dag:
    produce = PythonOperator(task_id="produce", python_callable=producer)
    consume = PythonOperator(task_id="consume", python_callable=consumer)

    produce >> consume

XCom์„ ์‚ฌ์šฉํ•  ๋•Œ ๋“ฑ์žฅํ•˜๋Š” Context์™€ 'ti'?

  • Context : Scheduler๊ฐ€ Task๋ฅผ ์‹คํ–‰ํ•  ๋•Œ ๋„˜๊ฒจ์ฃผ๋Š” metadata dictionary
    • PythonOperator์˜ python_callable์—๋Š” ๊ธฐ๋ณธ์ ์œผ๋กœ **context๊ฐ€ ์ „๋‹ฌ
    • @task๋กœ ์ •์˜ํ•œ ํ•จ์ˆ˜ ์•ˆ์—์„œ๋„ get_current_context()๋กœ ๋™์ผํ•œ ๋‚ด์šฉ ํ™•์ธ ๊ฐ€๋Šฅ
    • context์— ํฌํ•จ๋œ key ์˜ˆ์‹œ
      • โ€˜tiโ€™ : ย TaskInstance ๊ฐ์ฒด (alias: task_instance)
      • โ€˜taskโ€™, โ€˜dagโ€™ : ํ˜„์žฌ Task, DAG ๊ฐ์ฒด
      • โ€˜dag_runโ€™ : ํ˜„์žฌ DAG Run ๊ฐ์ฒด
      • โ€˜dsโ€™, โ€˜tsโ€™ : ๋…ผ๋ฆฌ ์‹คํ–‰์ผ์˜ ๋‚ ์งœ ๋ฌธ์ž์—ด(YYYY-MM-DD), ํƒ€์ž„์Šคํƒฌํ”„(ISO)
      • โ€˜logical_dateโ€™: pendulum.DateTime ๊ฐ์ฒด
      • โ€˜data_interval_startโ€™, โ€˜data_interval_endโ€™: ์Šค์ผ€์ค„ ์ฃผ๊ธฐ์˜ ์‹œ์ž‘/๋ ์‹œ๊ฐ
      • โ€˜paramsโ€™: DAG/Task์— ์ฃผ์ž…ํ•œ ์‚ฌ์šฉ์ž ํŒŒ๋ผ๋ฏธํ„ฐ ๋”•์…”๋„ˆ๋ฆฌ
      • โ€˜varโ€™ : ํ…œํ”Œ๋ฆฟ์—์„œ ๋ณ€์ˆ˜ ์ ‘๊ทผ์šฉ ๋งคํฌ๋กœ (ํŒŒ์ด์ฌ ์ฝ”๋“œ์—์„œ๋Š” Variable.get() ์‚ฌ์šฉ ๊ถŒ์žฅ)
  • โ€˜tiโ€™ : ํ˜„์žฌ ์‹คํ–‰ ์ค‘์ธ ํƒœ์Šคํฌ์˜ ์ธ์Šคํ„ด์Šค๋ฅผ ํ‘œํ˜„ํ•˜๋Š” ๊ฐ์ฒด
    • ti.task_id, ti_dag_id, ti.try_number ๋“ฑ ๋‹ค์–‘ํ•œ ์†์„ฑ ํ™•์ธ ๊ฐ€๋Šฅ
    • xcom_push๋กœ ๋„ฃ์€ ๊ฐ’์€ metaDB์— ์ €์žฅ๋˜๊ธฐ ๋•Œ๋ฌธ์— context ์ถœ๋ ฅ์œผ๋กœ ํ™•์ธ์•ˆ๋จ

2. Control Flow

๊ธฐ๋ณธ์ ์œผ๋กœ Task๋Š” ๋ชจ๋“  upstream task๊ฐ€ ์„ฑ๊ณตํ•ด์•ผ ์‹คํ–‰๋˜์ง€๋งŒ ๋‹ค์Œ๊ณผ ๊ฐ™์€ ๋ฐฉ๋ฒ•๋“ค์„ ํ†ตํ•ด ๋” ์œ ์—ฐํ•œ ์ œ์–ด๊ฐ€ ๊ฐ€๋Šฅํ•˜๋‹ค

Branching

Use Branching when you want to choose the next task based on a condition evaluated at run time, such as the result of an earlier task.

Write the branching task as a Python function with the @task.branch() decorator or with BranchPythonOperator. The function returns the task_id of the task to run next (a list of task_ids also works), and the other tasks directly downstream of the branch are skipped.

Below is an example that uses BranchPythonOperator (see the docs for the decorator form):

import random

import pendulum
from airflow.sdk import DAG
from airflow.providers.standard.operators.bash import BashOperator
from airflow.providers.standard.operators.python import BranchPythonOperator

# Minimal default_args (assumed; adjust for your environment)
default_args = {"owner": "airflow"}

# DAG definition
dag = DAG(
    "branch_random_dag",
    default_args=default_args,
    description="DAG that branches on a random value between 0 and 10",
    start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
    schedule="@daily",
    catchup=False,
)


def generate_random_and_branch():
    """Generate a random value in 0..10 and pick a branch depending on whether it is >= 5."""
    random_value = random.randint(0, 10)
    print(f"Generated random value: {random_value}")

    if random_value >= 5:
        print("The random value is 5 or more. Running task_A.")
        return "task_A"
    else:
        print("The random value is less than 5. Running task_B.")
        return "task_B"


# Branch task (generates the random value and decides the branch)
branch_task = BranchPythonOperator(
    task_id="branch_task",
    python_callable=generate_random_and_branch,
    dag=dag,
)

# Task A (runs when the random value is 5 or more)
task_A = BashOperator(
    task_id="task_A",
    bash_command='echo "Task A ran - the random value is 5 or more!"',
    dag=dag,
)

# Task B (runs when the random value is less than 5)
task_B = BashOperator(
    task_id="task_B",
    bash_command='echo "Task B ran - the random value is less than 5!"',
    dag=dag,
)

# Task dependencies
branch_task >> [task_A, task_B]

In this run the random value was 1. Because 1 is less than 5, task_B ran and task_A was skipped.
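The skip behavior can be summarized in a few lines. This is a toy sketch, not Airflow's implementation. `resolve_branch` is a hypothetical helper. It takes the task_id (or list of task_ids) returned by the branch callable and the branch task's direct downstream task_ids, and reports which tasks run and which are skipped.

```python
def resolve_branch(returned, direct_downstream):
    """Toy model of a branch task: followed ids run, other direct downstream tasks are skipped."""
    followed = {returned} if isinstance(returned, str) else set(returned)
    unknown = followed - set(direct_downstream)
    if unknown:
        # In this model, returning an id that is not directly downstream is an error
        raise ValueError(f"not directly downstream: {sorted(unknown)}")
    return {task_id: ("run" if task_id in followed else "skipped") for task_id in direct_downstream}


print(resolve_branch("task_B", ["task_A", "task_B"]))
print(resolve_branch(["task_A", "task_B"], ["task_A", "task_B", "task_C"]))
```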

Trigger Rules

By passing a trigger_rule argument, you can customize when a task runs. For example, a task can run the next step even if only some of its upstream tasks succeeded:

  • all_success (default): runs when all upstream tasks succeeded
  • all_failed: runs when all upstream tasks are in the failed or upstream_failed state
  • all_done: runs when all upstream tasks have finished, regardless of outcome
  • all_skipped: runs when all upstream tasks are skipped
  • one_success: runs as soon as at least one upstream task succeeds
  • one_failed: runs as soon as at least one upstream task fails
  • one_done: runs as soon as at least one upstream task succeeds or fails
  • none_failed: runs as long as no upstream task failed (a mix of success and skipped is allowed)
  • always: runs unconditionally
  • …
from airflow.providers.standard.operators.empty import EmptyOperator

notify = EmptyOperator(
    task_id="notify",
    trigger_rule="one_failed",  # run as soon as any upstream task fails
)
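As a rough mental model, each rule above can be written as a predicate over the final states of the upstream tasks. This sketch assumes that every upstream task has already finished. In Airflow, the one_* rules can fire before that happens, and a rule that can no longer be met puts the task into skipped or upstream_failed rather than just "not running". `RULES` and `should_run` are hypothetical names.

```python
FAILED = {"failed", "upstream_failed"}

# Toy predicates over the final states of upstream tasks (not Airflow's implementation)
RULES = {
    "all_success": lambda states: all(s == "success" for s in states),
    "all_failed": lambda states: all(s in FAILED for s in states),
    "all_done": lambda states: True,  # every upstream is assumed finished in this model
    "all_skipped": lambda states: all(s == "skipped" for s in states),
    "one_success": lambda states: any(s == "success" for s in states),
    "one_failed": lambda states: any(s in FAILED for s in states),
    "one_done": lambda states: any(s in {"success", "failed"} for s in states),
    "none_failed": lambda states: not any(s in FAILED for s in states),
    "always": lambda states: True,
}


def should_run(trigger_rule, upstream_states):
    """Return True if a task with this rule would run, given finished upstream states."""
    return RULES[trigger_rule](upstream_states)


upstream = ["success", "failed", "skipped"]
for rule in RULES:
    print(f"{rule:<12} -> {should_run(rule, upstream)}")
```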
Latest Only

For example, suppose you need to backfill past dates to process a month's worth of data.
Tasks such as sending notifications or generating reports probably do not need to run for that past data.

In such cases, Latest Only lets you run certain tasks only in the latest DAG run.

With LatestOnlyOperator, the tasks downstream of it are skipped automatically whenever the run is not the latest DAG run.

Latest DAG run? → a run for which the current time falls between that run's scheduled time and the next scheduled time. Manually triggered runs are not checked, so their downstream tasks are not skipped.
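The "latest" check can be sketched as a time-window comparison. This toy assumes a daily schedule in which each run has a single scheduled time, and it uses the scenario of the example that follows: daily runs from 2025-09-01 to 2025-09-05, checked at 09:00 on 2025-09-05. `is_latest_run` is a hypothetical helper, and its boundary handling (inclusive start, exclusive end) is an assumption. Airflow computes the window from the DAG's timetable.

```python
from datetime import datetime, timedelta, timezone


def is_latest_run(run_time, now, period=timedelta(days=1)):
    """Toy check: the run is 'latest' if now lies between its scheduled time and the next one."""
    next_run_time = run_time + period
    return run_time <= now < next_run_time


now = datetime(2025, 9, 5, 9, 0, tzinfo=timezone.utc)  # assumed current time
run_times = [datetime(2025, 9, d, tzinfo=timezone.utc) for d in range(1, 6)]
for rt in run_times:
    status = "runs downstream" if is_latest_run(rt, now) else "skips downstream"
    print(rt.date(), status)
```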

In the example below, today is 2025-09-05, and the DAG uses catchup=True to process every run from 2025-09-01 up to today.

import pendulum
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.operators.latest_only import LatestOnlyOperator
from airflow.sdk import DAG
from airflow.utils.trigger_rule import TriggerRule
 
with DAG(
    dag_id="latest_only_dag",
    schedule="@daily",
    start_date=pendulum.datetime(2025, 9, 1, tz="UTC"),
    catchup=True,
    tags=["example3"],
) as dag:
    latest_only = LatestOnlyOperator(task_id="latest_only")
    task1 = EmptyOperator(task_id="task1")
    task2 = EmptyOperator(task_id="task2")
    task3 = EmptyOperator(task_id="task3")
    task4 = EmptyOperator(task_id="task4", trigger_rule=TriggerRule.ALL_DONE)
 
    latest_only >> task1 >> [task3, task4]
    task2 >> [task3, task4]

In every run except the most recent one (today's), task1 and task3 are skipped because of LatestOnlyOperator. task2 is not downstream of latest_only, and task4 uses TriggerRule.ALL_DONE, so both of them still run.

Depends On Past

Task๊ฐ€ ์ด์ „ DAG Run์—์„œ ์„ฑ๊ณตํ–ˆ์„ ๋•Œ๋งŒ ํ˜„์žฌ ์‹คํ–‰์„ ํ—ˆ์šฉํ•˜๋Š” ๊ธฐ๋Šฅ์œผ๋กœ ๊ฐ™์€ Task๊ฐ€ ์ด์ „ ์‹คํ–‰์—์„œ ์‹คํŒจํ–ˆ๋‹ค๋ฉด, ํ˜„์žฌ ์‹คํ–‰๋„ ์ž๋™์œผ๋กœ ๊ฑด๋„ˆ๋›ฐ๊ฑฐ๋‚˜ ์‹คํŒจ ์ฒ˜๋ฆฌ๋œ๋‹ค

๋ฐ์ดํ„ฐ ์ ์žฌ, ETL ๋“ฑ์—์„œ ์—ฐ์†์„ฑ์ด ์ค‘์š”ํ•œ ๊ฒฝ์šฐ (์˜ˆ: ํ•˜๋ฃจ์น˜ ๋ฐ์ดํ„ฐ๊ฐ€ ๋ˆ„๋ฝ๋˜๋ฉด ๋‹ค์Œ๋‚  ๋ฐ์ดํ„ฐ ์ ์žฌ๋„ ๋ง‰๊ณ  ์‹ถ์„ ๋•Œ) ํ˜น์€ Task๊ฐ€ ์ˆœ์ฐจ์ ์œผ๋กœ ์‹คํ–‰๋˜์–ด์•ผ ํ•  ๋•Œ (์˜ˆ: ํŒŒ์ผ ์ฒ˜๋ฆฌ, ๋ฐฐ์น˜ ์ž‘์—… ๋“ฑ) ์— ํ™œ์šฉ๋œ๋‹ค

Setup and Teardown

Setup and Teardown automate the creation and cleanup of resources, so that resource management is never left out when a data pipeline runs.
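In Airflow, a teardown task runs after the work it wraps even when that work fails, as long as its setup succeeded. This behaves much like a try/finally block. The toy sketch below models only that guarantee. `run_with_setup_teardown` and `failing_query` are hypothetical names. In an actual DAG, you mark tasks with `as_setup()` / `as_teardown(setups=...)` or with the `@setup` / `@teardown` decorators.

```python
def run_with_setup_teardown(setup, work, teardown):
    """Toy model of a setup -> work -> teardown group. Returns (work_state, teardown_ran)."""
    try:
        resource = setup()
    except Exception:
        # In this model, a failed setup means neither the work nor the teardown runs
        return "upstream_failed", False

    try:
        work(resource)
        work_state = "success"
    except Exception:
        work_state = "failed"
    finally:
        teardown(resource)  # cleanup runs whether or not the work succeeded
    return work_state, True


cleaned_up = []


def failing_query(resource):
    raise RuntimeError(f"query against {resource} failed")


print(run_with_setup_teardown(lambda: "tmp_table", failing_query, cleaned_up.append))  # ('failed', True)
print(cleaned_up)  # ['tmp_table']
```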


3. Operator

An Operator is a reusable Task template designed to perform a specific kind of work.

Operators such as the EmptyOperator, BashOperator and PythonOperator used in the examples so far make it easy to use the functionality you need.
Most commonly used Operators either ship with Airflow or become available simply by installing a Python package (see: Provider).

๋Œ€ํ‘œ์ ์ธ Operators
  • EmptyOperator : ์•„๋ฌด ์ž‘์—…์„ ํ•˜์ง€๋Š” ์•Š์ง€๋งŒ, ํ…Œ์ŠคํŠธ๋‚˜ ์ž‘์—… ๊ฐ„์˜ ์˜์กด์„ฑ์„ ๋ช…ํ™•ํžˆ ํ•  ๋•Œ ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ์Œ
  • BashOperator
    run_this = BashOperator(
        task_id="run_after_loop",
        bash_command="echo https://airflow.apache.org/",
    )
  • PythonOperator: just write the work you want as a Python function
    from pprint import pprint

    def print_context(ds=None, **kwargs):
        """Print the Airflow context and ds variable from the context."""
        print("[group] All kwargs")
        pprint(kwargs)
        print("[endgroup]")
        print("[group] Context variable ds")
        print(ds)
        print("[endgroup]")
        return "Whatever you return gets printed in the logs"

    run_this = PythonOperator(task_id="print_the_context", python_callable=print_context)
  • BranchDateTimeOperator: use it when you want to branch depending on the date/time at which the DAG runs
    empty_task_11 = EmptyOperator(task_id="date_in_range", dag=dag1)
    empty_task_21 = EmptyOperator(task_id="date_outside_range", dag=dag1)
     
    cond1 = BranchDateTimeOperator(
        task_id="datetime_branch",
        follow_task_ids_if_true=["date_in_range"],
        follow_task_ids_if_false=["date_outside_range"],
        target_upper=pendulum.datetime(2020, 10, 10, 15, 0, 0),
        target_lower=pendulum.datetime(2020, 10, 10, 14, 0, 0),
        dag=dag1,
    )
     
    # Run empty_task_11 if cond1 executes between 2020-10-10 14:00:00 and 2020-10-10 15:00:00
    cond1 >> [empty_task_11, empty_task_21]
  • EmailOperator
  • HttpOperator
  • SQLExecuteQueryOperator
  • DockerOperator
  • HiveOperator
  • S3FileTransformOperator
  • SlackAPIOperator
  • โ€ฆ
Custom Operator

๊ณต์‹์ ์œผ๋กœ ์ œ๊ณต๋˜์ง€ ์•Š์ง€๋งŒ ์ž์ฃผ ์‚ฌ์šฉํ•˜๋Š” ๊ธฐ๋Šฅ์ด ์žˆ๋Š” ๊ฒฝ์šฐ Custom Operator๋ฅผ ์ง์ ‘ ๋งŒ๋“ค์–ด ์‚ฌ์šฉํ•  ์ˆ˜ ์žˆ๋‹ค ๋‚ด์žฅ Operator๋กœ ํ‘œํ˜„ํ•˜๊ธฐ ์–ด๋ ค์šด ๋กœ์ง์„ BaseOperator ์„œ๋ธŒํด๋ž˜์Šค๋กœ ์บก์Аํ™”ํ•œ ์‚ฌ์šฉ์ž ์ •์˜ ํ…œํ”Œ๋ฆฟ์ด๊ณ  Airflow 3์—์„œ๋Š” ๊ณต์‹ SDK ๊ฒฝ๋กœ airflow.sdk ์˜ BaseOperator ์ƒ์†์„ ๊ถŒ์žฅํ•˜๊ณ  ์žˆ๋‹ค

์›ํ•˜๋Š” Operator๋ฅผ ๋งŒ๋“ค๊ธฐ ์œ„ํ•ด์„œ๋Š” BaseOperator ํด๋ž˜์Šค๋ฅผ ์ƒ์† ๋ฐ›์•„ ๋‘ ๊ฐ€์ง€ ๋ฉ”์„œ๋“œ๋ฅผ override ํ•ด์•ผํ•œ๋‹ค

  • ์ƒ์„ฑ์ž (__init__) : Operator์— ํ•„์š”ํ•œ ํŒŒ๋ผ๋ฏธํ„ฐ ์ง€์ •. ์ด ๋•Œ DB ํ˜ธ์ถœ๊ณผ ๊ฐ™์€ ๋ฌด๊ฑฐ์šด ์ž‘์—…์€ ๊ธˆ์ง€
  • ์‹คํ–‰ method (execute) : Operator๊ฐ€ ์‹คํ–‰๋  ๋•Œ ํ˜ธ์ถœ๋˜๋Š” ์ฝ”๋“œ. Airflow context๋ฅผ ํŒŒ๋ผ๋ฏธํ„ฐ๋กœ ๋ฐ›์•„ ์„ค์ •๊ฐ’์„ ์ฝ์„ ์ˆ˜ ์žˆ์Œ

์™œ __init__์—์„œ ๋ฌด๊ฑฐ์šด ์ž‘์—…์ด ๊ธˆ์ง€?

  • Scheduler๊ฐ€ DAG ํŒŒ์ผ ์ฃผ๊ธฐ์ ์œผ๋กœ parsingํ•˜๋ฉฐ ๊ฐ Task์— ํ•ด๋‹นํ•˜๋Š” Operator Instance ๊ณ„์† ์ƒ์„ฑ โ†’ __init__ ํ˜ธ์ถœ
    • ๋”ฐ๋ผ์„œ ์ƒ์„ฑ์ž์—์„œ DB์—ฐ๊ฒฐ, API ํ˜ธ์ถœ, ๋Œ€์šฉ๋Ÿ‰ ํŒŒ์ผ ์ฝ๊ธฐ ๋“ฑ์„ ์ˆ˜ํ–‰ํ•˜๋ฉด ๋น„ํšจ์œจ์ 
    • ๊ฐ€๋ณ๊ฒŒ ํŒŒ๋ผ๋ฏธํ„ฐ ์ €์žฅ๋งŒ ํ•˜๋Š”๊ฒŒ ํšจ์œจ์ 
  • execute๋Š” ์‹ค์ œ ์‹คํ–‰๋  ๋•Œ ํ•œ ๋ฒˆ ํ˜ธ์ถœ๋˜๊ธฐ ๋•Œ๋ฌธ์— ์™ธ๋ถ€ ๋ฆฌ์†Œ์Šค ์ ‘๊ทผ์€ execute์—์„œ ํ•ด์•ผํ•จ
    • ๋”ฐ๋ผ์„œ Hook, Client ์ƒ์„ฑ, ์ธ์ฆ ์กฐํšŒ, ์ฟผ๋ฆฌ/์š”์ฒญ ์ „์†ก ๋“ฑ์€ execute์—์„œ ์ƒ์„ฑ ๋ฐ ํ˜ธ์ถœ

Custom Operator๋ฅผ ์ •์ƒ์ ์œผ๋กœ importํ•ด์„œ ์‚ฌ์šฉํ•˜๊ธฐ ์œ„ํ•ด์„œ Airflow์˜ PYTHONPATH์— ํฌํ•จ๋œ ํด๋”(dags/, config/, plugins/)์— ๋‘์–ด์•ผ ํ•œ๋‹ค
์ผ๋ฐ˜์ ์œผ๋กœ plugins/ ํด๋”์— ๋งŽ์ด ์ž‘์„ฑํ•˜๊ณ  ์•„๋ž˜ ์˜ˆ์‹œ๋„ plugins/ ์— ์ถ”๊ฐ€ํ•˜๋ฉด ๋œ๋‹ค

# plugins/hello_operator.py
from airflow.sdk import BaseOperator  # Airflow 3 public SDK

class HelloOperator(BaseOperator):
    def __init__(self, name: str, **kwargs):
        super().__init__(**kwargs)  # handles common arguments such as task_id
        self.name = name  # only store parameters here; no heavy work

    def execute(self, context):
        message = f"Hello, {self.name}!"
        print(message)
        return message  # the return value is pushed to XCom

# Using it in a DAG file (e.g. dags/custom_op_example.py)
from datetime import datetime
from airflow.sdk import DAG
from hello_operator import HelloOperator

with DAG(dag_id="custom_op_example", start_date=datetime(2023, 1, 1), schedule=None, catchup=False) as dag:
    hello = HelloOperator(task_id="hello_task", name="Airflow 3")

4. Sensors and Deferrable Operators

Sensor

Airflow์—์„œ Sensor๋Š” ํŠน์ • ์กฐ๊ฑด์ด ๋งŒ์กฑ๋  ๋•Œ๊นŒ์ง€ ๋Œ€๊ธฐ(polling or trigger) ํ•˜๋Š” ์—ญํ• ์„ ๋‹ด๋‹นํ•˜๋Š” ํŠน์ˆ˜ํ•œ Operator๋กœ, ์กฐ๊ฑด์ด ์ถฉ์กฑ๋˜๋ฉด ๋‹ค์Œ Task ์‹คํ–‰์ด ์ด์–ด์ง€๋„๋ก ํ•œ๋‹ค

๋Œ€ํ‘œ์ ์ธ Sensor๋Š” ๋‹ค์Œ๊ณผ ๊ฐ™๋‹ค

  • BashSensor: ์ง€์ •ํ•œ Bash ๋ช…๋ น์–ด ๋˜๋Š” ์Šคํฌ๋ฆฝํŠธ๊ฐ€ ์„ฑ๊ณต (๋ฆฌํ„ด ์ฝ”๋“œ 0)ํ•  ๋•Œ๊นŒ์ง€ ๋Œ€๊ธฐ
  • TimeDeltaSensor: ์„ค์ •ํ•œ ์‹œ๊ฐ„ (delta)์ด ๊ฒฝ๊ณผํ•  ๋•Œ๊นŒ์ง€ ๋Œ€๊ธฐ
  • TimeSensor: ์ง€์ •ํ•œ ์‹œ๊ฐ (target_time)์ด ๋  ๋•Œ๊นŒ์ง€ ๋Œ€๊ธฐ
  • DayOfWeekSensor: ์ง€์ •ํ•œ ์š”์ผ์ด ๋  ๋•Œ๊นŒ์ง€ ๋Œ€๊ธฐ
  • FileSensor: ํŠน์ • ๊ฒฝ๋กœ์— ํŒŒ์ผ์ด ์กด์žฌํ•  ๋•Œ๊นŒ์ง€ ๋Œ€๊ธฐ
  • PythonSensor: ์ง€์ •ํ•œ ํŒŒ์ด์ฌ ํ•จ์ˆ˜๊ฐ€ True๋ฅผ ๋ฐ˜ํ™˜ํ•  ๋•Œ๊นŒ์ง€ ์ฃผ๊ธฐ์ ์œผ๋กœ ์‹คํ–‰ํ•˜๋ฉฐ ๋Œ€๊ธฐ
  • ExternalTaskSensor: ย ๋‹ค๋ฅธ DAG (๋˜๋Š” ๊ฐ™์€ DAG์˜ ๋‹ค๋ฅธ ์‹คํ–‰ ์‹œ์ )์˜ ํŠน์ • ์ž‘์—…์ด ์™„๋ฃŒ๋  ๋•Œ๊นŒ์ง€ ํ˜„์žฌ ์ž‘์—…์„ ๋Œ€๊ธฐ

Sensor์—๋Š” ๋Œ€๊ธฐ ๋ฐฉ์‹์— ๋”ฐ๋ผ ๋‘ ๊ฐ€์ง€ ๋ชจ๋“œ๊ฐ€ ์กด์žฌํ•œ๋‹ค

  • poke (๊ธฐ๋ณธ): ์กฐ๊ฑด์„ ์ฃผ๊ธฐ์ ์œผ๋กœ ํ™•์ธํ•˜๋ฉด์„œ Worker ์Šฌ๋กฏ์„ ๊ณ„์† ์ ์œ 
  • reschedule: ์กฐ๊ฑด ํ™•์ธ ํ›„ ์ผ์ • ์‹œ๊ฐ„ ๋Œ€๊ธฐํ•˜๊ณ  Worker ์Šฌ๋กฏ์„ ๋ฐ˜๋‚ฉ, ๋‹ค์Œ์— ์žฌ์Šค์ผ€์ค„

๋ณดํ†ต ์งง์€ ๋Œ€๊ธฐ์—๋Š” poke, ๊ธด ๋Œ€๊ธฐ์—๋Š” ๋ฆฌ์†Œ์Šค ํšจ์œจ์ ์ด๋ฏ€๋กœ reschedule ์„ ์‚ฌ์šฉํ•˜๋Š” ๊ฒƒ์ด ๋ฐ”๋žŒ์งํ•˜๋‹ค
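A back-of-the-envelope model shows why. Assume the condition becomes true after `wait_seconds`, the sensor checks every `poke_interval` seconds starting at time 0, and each check holds a worker slot for `check_seconds`. `slot_seconds` is a hypothetical estimate that ignores scheduler latency.

```python
import math


def slot_seconds(wait_seconds, poke_interval, check_seconds, mode):
    """Rough estimate of how long a sensor holds a worker slot."""
    last_check = math.ceil(wait_seconds / poke_interval)  # index of the first check that succeeds
    if mode == "poke":
        # The slot is held from the first check until the successful check finishes
        return last_check * poke_interval + check_seconds
    if mode == "reschedule":
        # The slot is held only while each check runs
        return (last_check + 1) * check_seconds
    raise ValueError(f"unknown mode: {mode}")


for mode in ("poke", "reschedule"):
    print(mode, slot_seconds(wait_seconds=3600, poke_interval=60, check_seconds=1, mode=mode))
# poke 3601
# reschedule 61
```

For a one-hour wait, poke occupies a slot for the whole hour, while reschedule occupies it for about a minute in total, spread across 61 short checks.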

๋‹ค์Œ์€ ํŠน์ • ๊ฒฝ๋กœ์— โ€˜sensor_test.txtโ€™๋ผ๋Š” ํŒŒ์ผ์ด ์ƒ์„ฑ๋˜๋Š” ๊ฒฝ์šฐ success๊ฐ€ ๋˜๋Š” ์˜ˆ์‹œ์ด๋‹ค

import pendulum
from airflow.sdk import DAG
from airflow.providers.standard.sensors.filesystem import FileSensor

with DAG(
    dag_id="ex_file_sensor",
    start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
    schedule="@daily",
    catchup=False,
) as dag:
    wait_file = FileSensor(
        task_id="wait_for_file",
        filepath="/opt/airflow/plugins/sensor_test.txt",
        mode="poke",
        poke_interval=30,  # check every 30 seconds
        timeout=60 * 60 * 2,  # give up after waiting 2 hours
    )

Before running this DAG, you need to create a connection for accessing the local filesystem:

  • Connection ID: 'fs_default'
  • Connection type: File (path)
  • Path: '/'

fs_default is the conn_id that FileSensor uses by default. With Path set to '/', you pass an absolute path as filepath.

When you run this DAG, the task stays in the running state until a file named sensor_test.txt appears in the mounted /opt/airflow/plugins folder. It becomes success once the file is added.

Deferrable Operator

A Deferrable Operator is a special kind of Operator or Sensor designed to maximize resource efficiency while waiting for an external condition. It works like this:

  • When a Deferrable Operator needs to wait for a condition, it suspends its execution and hands the wait over to a trigger object
  • The trigger runs asynchronously in Airflow's triggerer component and watches until the condition is met
  • When the condition is met, the trigger sends a signal and the Operator resumes execution

Regular Operators and Sensors in poke mode keep holding a worker slot even while they are only waiting. A Deferrable Operator releases its worker slot while waiting and lets the trigger handle the waiting logic instead.
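One triggerer process can watch many deferred tasks because their waits run as coroutines on a single asyncio event loop. The toy sketch below imitates that idea. `ToyTrigger`, `toy_triggerer` and `resume` are hypothetical names. Real Airflow triggers also implement `async def run()` and yield `TriggerEvent` objects, and the operator resumes in the method named in `self.defer(...)`. The rest of the wiring here is simplified.

```python
import asyncio
import threading


class ToyTrigger:
    """Hypothetical stand-in for a trigger: waits asynchronously, then yields one event."""

    def __init__(self, task_id, delay):
        self.task_id = task_id
        self.delay = delay

    async def run(self):
        await asyncio.sleep(self.delay)  # non-blocking: the loop serves other triggers meanwhile
        yield {"task_id": self.task_id, "thread": threading.get_ident()}


async def first_event(trigger):
    async for event in trigger.run():
        return event  # one event is enough to resume the task


async def toy_triggerer(triggers):
    # A single event loop watches every deferred task at once
    return await asyncio.gather(*(first_event(t) for t in triggers))


def resume(event):
    # Stand-in for the operator method that runs on a worker after the event fires
    return f"{event['task_id']} resumed"


events = asyncio.run(toy_triggerer([ToyTrigger(f"task_{i}", 0.05) for i in range(500)]))
results = [resume(e) for e in events]
print(len(results), results[0], len({e["thread"] for e in events}))
```

All 500 waits finish in roughly the time of one, on a single thread, without any worker slot being held.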

Deferrable Operator์˜ ์žฅ์ ์€ ๋‹ค์Œ๊ณผ ๊ฐ™๋‹ค

  • ๋ฆฌ์†Œ์Šค ์ ˆ์•ฝ: ๊ธด ๋Œ€๊ธฐ ์‹œ๊ฐ„์ด ํ•„์š”ํ•œ ์ž‘์—…์—์„œ๋„ worker ์ž์›์„ ์ ์œ ํ•˜์ง€ ์•Š์Œ ย 
  • ํ™•์žฅ์„ฑ ๊ฐœ์„ : ์ˆ˜๋ฐฑ~์ˆ˜์ฒœ ๊ฐœ์˜ Sensor๊ฐ€ ๋™์‹œ์— ๋Œ€๊ธฐํ•˜๋”๋ผ๋„ Triggerer ํ”„๋กœ์„ธ์Šค๊ฐ€ ๊ด€๋ฆฌํ•˜๋ฏ€๋กœ ํด๋Ÿฌ์Šคํ„ฐ ํšจ์œจ์ด ๋†’์Œ ย 
  • ์šด์˜ ์•ˆ์ •์„ฑ: ์žฅ๊ธฐ ๋Œ€๊ธฐ๋กœ ์ธํ•ด worker๊ฐ€ ๋ฌถ์ด์ง€ ์•Š์œผ๋ฏ€๋กœ SLA ์ง€์—ฐ์ด๋‚˜ ํ ์ ์ฒด๋ฅผ ๋ฐฉ์ง€

๋ณดํ†ต ๊ธฐ์กด sensor, operator ์ด๋ฆ„ ๋’ค์— Async๊ฐ€ ๋ถ™์–ด์žˆ๊ฑฐ๋‚˜ ํ˜น์€ deferrable=True๋ผ๋Š” ํŒŒ๋ผ๋ฏธํ„ฐ๋ฅผ ์„ค์ •ํ•ด์ฃผ๋ฉด deferrable ๋ชจ๋“œ๋กœ ๋™์ž‘ํ•˜๊ฒŒ ๋œ๋‹ค

Deferrable Operator ์‚ฌ์šฉ ์‹œ ๋ช‡ ๊ฐ€์ง€ ์ฃผ์˜ํ•ด์•ผํ•  ์ ์ด ์žˆ๋‹ค

  • Triggerer ํ”„๋กœ์„ธ์Šค ์‹คํ–‰ ํ•„์š”: airflow triggerer ํ”„๋กœ์„ธ์Šค๊ฐ€ ๋ฐ˜๋“œ์‹œ ๊ตฌ๋™ ์ค‘์ด์–ด์•ผ ํ•จ ย 
  • ๋ฆฌ์†Œ์Šค ํ•œ๊ณ„ ๊ณ ๋ ค: Triggerer๊ฐ€ ๋ชจ๋“  deferrable ํƒœ์Šคํฌ๋ฅผ ๊ฐ์‹œํ•˜๋ฏ€๋กœ, ๋„ˆ๋ฌด ๋งŽ์€ deferrable ํƒœ์Šคํฌ๊ฐ€ ํ•œ๊บผ๋ฒˆ์— ๋ชฐ๋ฆฌ๋ฉด triggerer ํŠœ๋‹ ํ•„์š” ย 
  • ํ˜ธํ™˜์„ฑ: ๋ชจ๋“  Operator๊ฐ€ deferrable ๋ฒ„์ „์„ ์ œ๊ณตํ•˜๋Š” ๊ฒƒ์€ ์•„๋‹ˆ๋ฉฐ, ์ผ๋ถ€๋Š” ๋ณ„๋„์˜ provider ํŒจํ‚ค์ง€์— ํฌํ•จ๋จ

๋งˆ์ง€๋ง‰์œผ๋กœ Sensor์—์„œ reschedule ๋ชจ๋“œ์™€์˜ ์ฐจ์ด๋ฅผ ์ •๋ฆฌํ•˜๋ฉด ๋‹ค์Œ๊ณผ ๊ฐ™๋‹ค

| Aspect | reschedule mode | Deferrable Operator |
| --- | --- | --- |
| Worker usage | Returns the worker slot after each check, but takes a slot again for the next check | Releases the worker slot completely; the Triggerer watches the condition |
| What handles the waiting | The Scheduler periodically reschedules the task | The Triggerer process watches events/conditions asynchronously |
| Resource efficiency | Less worker usage, but periodic rescheduling adds load on the metadata DB and the Scheduler | Optimized for long waits, minimal metadata DB access, stable even with many sensors |
| Availability | Works on most Sensors by just setting mode="reschedule" | Requires a dedicated Async/Deferrable Operator |
| Best suited for | Waits of minutes to tens of minutes, with a modest number of Sensors | Waits of hours to days or longer, or hundreds of Sensors running in parallel |

5. Hook

In Airflow, a Hook is a high-level interface for connecting to external systems (databases, APIs, cloud services, etc.). Hooks are implemented by subclassing BaseHook. They read Airflow Connections to pick up authentication details, endpoints, ports and so on automatically.

Hooks are useful for the following reasons:

  1. A consistent interface
    • Different DBs and APIs are accessed the same way, which improves readability and maintainability
  2. Unified Connection management
    • Once a Connection is set up in the Airflow UI, the Hook loads it automatically
    • Sensitive values such as passwords and tokens are not exposed in code
  3. Modularity and reuse
    • The same Hook can be used from many DAGs
    • A custom Hook can serve as a shared, team-wide utility
  4. Handling complex work
    • Functionality that Operators do not provide can be implemented directly with Hooks
    • A single Task can use two or more Hooks together

Compared with Operators:

  • Operator
    • Defines a unit of execution (Task) in Airflow
    • Calls Hooks internally to interact with external systems
  • Hook
    • Abstracts the connection to an external system
    • A lower-level building block used by Operators
    • Is not executed on its own (it is not a Task)

In short, an Operator decides "what to run" and a Hook decides "where to connect".

Connection

Hooks are usually used to connect to other services, so they need connection details. You can register those details as an Airflow Connection.

You can register a Connection directly in the Airflow UI, as in the example below, or inject it into docker compose as an environment variable.

Example: PostgresHook

In the Airflow UI, go to Admin > Connections > Add Connection and create a connection named 'postgres_default' that points at the Postgres instance used as the Airflow metadata DB.

After that, when you run the DAG below, PostgresHook connects to Postgres through the 'postgres_default' connection and runs the query "SELECT now()::timestamp, current_database()".

import pendulum
from airflow.sdk import DAG
from airflow.providers.standard.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook

def fetch_rows(**_):
    # The Hook reads host, port and credentials from the 'postgres_default' Connection
    hook = PostgresHook(postgres_conn_id="postgres_default")
    rows = hook.get_records("SELECT now()::timestamp, current_database()")
    print(rows)

with DAG(
    dag_id="ex_hook_postgres",
    start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
    schedule="@daily",
    catchup=False,
    tags=["hook", "postgres"],
) as dag:
    read_pg = PythonOperator(
        task_id="read_postgres",
        python_callable=fetch_rows,
    )

Hook๋„ Operator์™€ ๋งˆ์ฐฌ๊ฐ€์ง€๋กœ BaseHook์„ ์ƒ์†๋ฐ›์•„ Custom์œผ๋กœ ์ง์ ‘ ์ž‘์„ฑํ•  ์ˆ˜ ์žˆ๋‹ค


6. Providers and Plugins

Provider

A Provider is a set of extension modules that packages the integration with a particular service or platform. It can include Operators, Hooks, Sensors, Transfers, connection types, log handlers, secrets backends, notifications and more.

There is a wide range of Provider packages, so you can install them into your Airflow environment and use their extra features freely.

For example, if you want to run Spark-related work through Airflow, you can use apache-airflow-providers-apache-spark.

It can be installed in either of these ways:

  • Add the Python package to the Airflow Dockerfile and rebuild the image (recommended)
  • Add _PIP_ADDITIONAL_REQUIREMENTS='apache-airflow-providers-apache-spark' to .env and run docker compose (for temporary development only)

A provider's docs list everything the package offers, such as Connection types, Decorators and Operators.

You can also build your own custom provider by following the Airflow docs.

Plugin

A Plugin is a mechanism for extending or changing Airflow's behavior and UI.

As seen when we built the custom operator, plugins/ lets you add functionality directly.
Beyond adding operators, hooks and so on, plugins also let you customize Airflow itself, such as the UI, Flask views and Admin pages.

In other words, a Provider is an officially distributed module for integrating with a specific service. A Plugin is a user-defined module that can extend even the UI and Flask, which makes it the broader concept that covers Providers.

You can of course implement your own custom plugin, and there are also packaged plugins such as airflow-exporter.
airflow-exporter exposes DAG and task states to Prometheus. Once the package is installed, it adds the http://localhost:8080/admin/metrics endpoint, which Prometheus can scrape for easy monitoring.


7. Comparing the key concepts

| Concept | Purpose / role | Extensibility | How to use |
| --- | --- | --- | --- |
| Task | The unit of execution in a DAG; a node created by an Operator instance or an @task function | No extension mechanism of its own; its forms grow as templates (Operators) diversify | Instantiate an Operator or use @task in a DAG, then wire up dependencies |
| Operator | A reusable work template whose logic is defined in execute() | Custom implementations by subclassing BaseOperator | Instantiate the class in a DAG to create a Task |
| Sensor | A special Operator that waits until an external condition (file exists, API response, a given time, etc.) is met | Subclass BaseSensorOperator; Deferrable versions can be implemented | Use with mode="poke" or mode="reschedule"; some are available as Async/Deferrable Sensors |
| Hook | An interface to external systems; a high-level API tied to Connections | Custom Hooks by subclassing BaseHook | Usually created and called inside an Operator; used directly in a PythonOperator when needed |
| Provider | Packages integrations with specific services; a bundle of extensions outside core | Versioned independently of core; new or in-house Providers can be built and distributed | pip install apache-airflow-providers-..., then import classes from its modules |
| Plugin | Plugin system for extending Airflow's behavior and UI | Global extensions such as Flask/FastAPI views, listeners and macros | Register through the plugins/ folder or a package entry point, then restart |