์์๋ณผ ๋ด์ฉ
- Task ์ ์, ์ํ
- Task ๊ฐ ์์ (Dependency) ์ ์
- XCom, Connection
- Operator
- Sensor, Deferrable Operator
- Hook
- Provider, Plugin
1. Task๋?
Task๋ Airflow์์ ์คํ๋๋ ์์ ์ ๊ธฐ๋ณธ ๋จ์๋ก, DAG์ ๋ ธ๋๋ก ๋ฐฐ์น๋๊ณ Dependency ์ค์ ์ ํตํด ์คํ ์์๋ฅผ ๊ฒฐ์ ํ ์ ์๋ค
Task์๋ ์ธ ๊ฐ์ง ๊ธฐ๋ณธ ์ ํ์ด ์กด์ฌํ๋ค
- Operator
- Sensor (Operator์ ์ผ์ข )
@task
๋ก ๋ง๋ ํจ์ Task
๋ด๋ถ์ ์ผ๋ก๋ ๋ชจ๋ BaseOperator
์ ์๋ธํด๋์ค์ด๋ฉฐ ๊ฐ๋
์ Task๋ ์คํ ์ธ์คํด์ค, Operator๋ ์ฌ์ฌ์ฉ ๊ฐ๋ฅํ ํ
ํ๋ฆฟ์ ์๋ฏธํ๋ค
Task ์ํ
DAG๋ฅผ ์คํํ๋ฉด ์ ์๋ ๋๋ก Task๊ฐ ์คํ๋๋๋ฐ ์ด๋ Task๋ ๋ค์ํ ์ํ๋ฅผ ๊ฐ์ง ์ ์๋ค
Airflow UI์์ Task์ ํ์ฌ ์ํ๋ฅผ ์ฝ๊ฒ ํ์
ํ๋๋ฐ ๋์์ด ๋๋ค
none
: ์์ ์ด ์์ง ์คํ ๋๊ธฐ์ด์ ๋ค์ด๊ฐ์ง ์์ ์ํ (์์กด์ฑ์ด ์์ง ์ถฉ์กฑ๋์ง ์์)scheduled
: Scheduler๊ฐ ์์ ์ ์์กด์ฑ์ด ๋ชจ๋ ์ถฉ์กฑ๋์์์ ํ์ธํ๊ณ , ์คํํด์ผ ํ๋ค๊ณ ๊ฒฐ์ ํ ์ํqueued
: ์์ ์ด Executor์ ํ ๋น๋์ด, Worker๋ฅผ ๊ธฐ๋ค๋ฆฌ๊ณ ์๋ ์ํrunning
: ์์ ์ด Worker (๋๋ ๋ก์ปฌ/๋๊ธฐ์ executor)์์ ์คํ ์ค์ธ ์ํsuccess
: ์์ ์ด ์ค๋ฅ ์์ด ์ ์์ ์ผ๋ก ์๋ฃ๋ ์ํrestarting
: ์์ ์ด ์คํ ์ค์ผ ๋ ์ธ๋ถ์์ ์ฌ์์ ์์ฒญ์ ๋ฐ์ ์ํfailed
: ์์ ์คํ ์ค ์ค๋ฅ๊ฐ ๋ฐ์ํ์ฌ ์คํจํ ์ํskipped
: branching, LatestOnly ๋ฑ์ผ๋ก ์ธํด ์์ ์ด ๊ฑด๋๋ฐ์ด์ง ์ํupstream_failed
: Upstream ์์ ์ด ์คํจํ๊ณ , ย Trigger Rules์ ๋ฐ๋ผ ์ด ์์ ์ด ํ์ํ๋ ๊ฒฝ์ฐup_for_retry
: ์์ ์ด ์คํจํ์ง๋ง, ์ฌ์๋ ํ์๊ฐ ๋จ์ ์์ด ๋ค์ ์คํ๋ ์์ ์ธ ์ํup_for_reschedule
: ์์ ์ด Sensorย ์ด๊ณ , reschedule ๋ชจ๋์์ ๋ค์ ์์ฝ ๋๊ธฐ ์ค์ธ ์ํdeferred
: ์์ ์ด trigger๋ก ์ฐ๊ธฐ๋ ์ํ (Deferred)removed
: DAG ์คํ์ด ์์๋ ์ดํ ์์ ์ด DAG์์ ์ฌ๋ผ์ง ์ํ
Task Dependency
Airflow์์๋ Task ๊ฐ ์์กด์ฑ์ ์ง๊ด์ ์ผ๋ก ์ ์ํ ์ ์๋ค
์์กด์ฑ์ ์ ์ค์ ํด์ผ DAG์ด ์ฌ๋ฐ๋ฅธ ์์๋ก ์คํ๋๊ณ , ๋ณ๋ ฌ/๋ถ๊ธฐ ์คํ๋ ์ํํ ๊ด๋ฆฌ๋๋ค.
1. ๊ธฐ๋ณธ ์ฐ๊ฒฐ ์ฐ์ฐ์
-
>>
: ์๋ฐฉํฅtask1 >> task2 # task1 ์คํ ํ task2 ์คํ task1.set_downstream(task2) # ๋์ผํ ํํ
-
<<
: ์ญ๋ฐฉํฅtask2 << task1 # task1 ์คํ ํ task2 ์คํ task2.set_upstream(task1) # ๋์ผํ ํํ
2. ๋ค์ค Task ์ฐ๊ฒฐ
-
์ฌ๋ฌ ํ์คํฌ๊ฐ ๋ชจ์ฌ์ ํ๋๋ก ํฉ์ณ์ง๋ ๊ตฌ์กฐ
[task1, task2] >> task3 # task1, task2๊ฐ ๋ชจ๋ ์ฑ๊ณตํด์ผ task3 ์คํ (Trigger Rules ์ฐธ๊ณ )
-
๋ฐ๋๋ก ํ๋์ ํ์คํฌ์์ ์ฌ๋ฌ ๊ฐ๋ก ๋ถ๊ธฐ
task1 >> [task2, task3]
3. Cross Downstream
- ๋ ๋ฆฌ์คํธ ๊ฐ์ ๋ชจ๋ ์กฐํฉ์ ์ฐ๊ฒฐ
from airflow.sdk import cross_downstream cross_downstream([task1, task2], [task3, task4]) # ๊ฒฐ๊ณผ # task1 >> task3 # task1 >> task4 # task2 >> task3 # task2 >> task4
4. Chain
- ์์ฐจ ์ฒด์ธ์ ๊ฐ๊ฒฐํ๊ฒ ํํ
from airflow.sdk import chain chain(task1, task2, task3, task4) # ๊ฒฐ๊ณผ # task1 >> task2 >> task3 >> task4
์์
task0 >> task1
[task1, task2] >> task3
task3 >> [task4, task5]
cross_downstream([task4, task5], [task6, task7])
chain(task6, task9, task10)
chain(task7, task8)
XCOM : Task๊ฐ์ ์ ๋ณด๊ตํ
Task๋ ๊ธฐ๋ณธ์ ์ผ๋ก ๋ฐ์ดํฐ๋ฅผ ์ฃผ๊ณ ๋ฐ์ง ์๋๋ฐ ์ด์ Task์์ ์ป์ ๊ฒฐ๊ณผ๋ฅผ ๋ค์ Task์์ ๋ฐ์์ ์ฌ์ฉํ๊ณ ์ถ์ ์ ์๋ค
XCom(Cross-Communication)์ Airflow์์ ํ์คํฌ ๊ฐ ์์ ๋ฐ์ดํฐ ์กฐ๊ฐ์ ์ฃผ๊ณ ๋ฐ๋ ๋ฉ์ปค๋์ฆ์ผ๋ก ๋ฐ์ดํฐ๋ฅผ key-value ํํ๋ก metadata DB (XCom ํ ์ด๋ธ)์ ์ ์ฅํ ์ ์๋ค ์ผ๋ฐ์ ์ผ๋ก ์งง์ ๋ฌธ์์ดยท์ซ์ยท๊ฒฝ๋กยท์์ JSON์ ์ ๋ฌํ๋ ์ฉ๋๋ก ์ฌ์ฉํ๋ค
๊ธฐ๋ณธ ๋์์ ๋ค์๊ณผ ๊ฐ๋ค
ti.xcom_push(key, value)
๋ก ์ ์ฅ,ti.xcom_pull(task_ids, key)
๋ก ์กฐํ ย@task
๋ก ์ ์ํ ํจ์์ Return๊ฐ์ ์๋์ผ๋ก XCom์ ์ ์ฅ- ๊ธฐ๋ณธ ์ง๋ ฌํ๋ JSON ํธํ ํ์ ๊ถ์ฅ (๋ณต์ก ๊ฐ์ฒด๋ ์ปค์คํ ์ง๋ ฌํ/๋ฐฑ์๋ ๊ณ ๋ ค)
- XCom์ TaskInstance ๋จ์๋ก ๊ธฐ๋ก๋๋ค (๊ฐ์ DAG Run, ๊ฐ์ ํ์คํฌ ์คํ ๋ฒ์)
๊ฐ์ฅ ๊ธฐ๋ณธ์ ์ธ ์์๋ ๋ค์๊ณผ ๊ฐ๋ค
import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator
def producer(**context):
ย ย # ์ด๋ค ๊ณ์ฐ/์กฐํ ๊ฒฐ๊ณผ๋ผ๊ณ ๊ฐ์
ย ย result = {"n": 7, "msg": "hello xcom"}
ย ย context["ti"].xcom_push(key="payload", value=result)
def consumer(**context):
ย ย payload = context["ti"].xcom_pull(task_ids="produce", key="payload")
ย ย print("pulled:", payload)ย # {'n': 7, 'msg': 'hello xcom'}
with DAG(
ย ย dag_id="ex_xcom_push_pull",
ย ย start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
ย ย schedule="@daily",
ย ย catchup=False,
ย ย tags=["xcom", "basic"],
) as dag:
ย ย produce = PythonOperator(task_id="produce", python_callable=producer)
ย ย consume = PythonOperator(task_id="consume", python_callable=consumer)
ย ย
ย ย produce >> consume
XCom์ ์ฌ์ฉํ ๋ ๋ฑ์ฅํ๋ Context์ 'ti'?
- Context : Scheduler๊ฐ Task๋ฅผ ์คํํ ๋ ๋๊ฒจ์ฃผ๋ metadata dictionary
- PythonOperator์
python_callable
์๋ ๊ธฐ๋ณธ์ ์ผ๋ก**context
๊ฐ ์ ๋ฌ@task
๋ก ์ ์ํ ํจ์ ์์์๋get_current_context()
๋ก ๋์ผํ ๋ด์ฉ ํ์ธ ๊ฐ๋ฅ- context์ ํฌํจ๋ key ์์
- โtiโ : ย TaskInstance ๊ฐ์ฒด (alias:
task_instance
)- โtaskโ, โdagโ : ํ์ฌ Task, DAG ๊ฐ์ฒด
- โdag_runโ : ํ์ฌ DAG Run ๊ฐ์ฒด
- โdsโ, โtsโ : ๋ ผ๋ฆฌ ์คํ์ผ์ ๋ ์ง ๋ฌธ์์ด(YYYY-MM-DD), ํ์์คํฌํ(ISO)
- โlogical_dateโ:
pendulum.DateTime
๊ฐ์ฒด- โdata_interval_startโ, โdata_interval_endโ: ์ค์ผ์ค ์ฃผ๊ธฐ์ ์์/๋ ์๊ฐ
- โparamsโ: DAG/Task์ ์ฃผ์ ํ ์ฌ์ฉ์ ํ๋ผ๋ฏธํฐ ๋์ ๋๋ฆฌ
- โvarโ : ํ ํ๋ฆฟ์์ ๋ณ์ ์ ๊ทผ์ฉ ๋งคํฌ๋ก (ํ์ด์ฌ ์ฝ๋์์๋
Variable.get()
์ฌ์ฉ ๊ถ์ฅ)- โtiโ : ํ์ฌ ์คํ ์ค์ธ ํ์คํฌ์ ์ธ์คํด์ค๋ฅผ ํํํ๋ ๊ฐ์ฒด
ti.task_id
,ti_dag_id
,ti.try_number
๋ฑ ๋ค์ํ ์์ฑ ํ์ธ ๊ฐ๋ฅ- xcom_push๋ก ๋ฃ์ ๊ฐ์ metaDB์ ์ ์ฅ๋๊ธฐ ๋๋ฌธ์ context ์ถ๋ ฅ์ผ๋ก ํ์ธ์๋จ
2. Control Flow
๊ธฐ๋ณธ์ ์ผ๋ก Task๋ ๋ชจ๋ upstream task๊ฐ ์ฑ๊ณตํด์ผ ์คํ๋์ง๋ง ๋ค์๊ณผ ๊ฐ์ ๋ฐฉ๋ฒ๋ค์ ํตํด ๋ ์ ์ฐํ ์ ์ด๊ฐ ๊ฐ๋ฅํ๋ค
Branching
๋ง์ฝ ์ด๋ค task์ ์ฑ๊ณต ์ฌ๋ถ์ ๋ฐ๋ผ ๋ค์ ์์ ์ ๋ค๋ฅด๊ฒ ๊ฐ์ ธ๊ฐ๊ณ ์ถ์ ๋ Branching์ ์ฌ์ฉํ๋ฉด ๋๋ค
@task.branch()
decorator๋ฅผ ์ฌ์ฉํ๊ฑฐ๋ BranchPythonOperator
๋ฅผ ์ฌ์ฉํด์ branching ์ญํ ์ ๋ด๋นํ๋ task๋ฅผ ํ์ด์ฌ ํจ์๋ก ์์ฑํ๋๋ฐ ์ด๋ return ๊ฐ์ ๋ค์์ ์คํํ task_id ๋ฅผ ๋ฃ์ผ๋ฉด ๋๋ค
๋ค์์ BranchPythonOperator
๋ฅผ ์ฌ์ฉํ๋ ์์์ด๋ค (decorator ์ฌ์ฉ์ docs ์ฐธ๊ณ )
from airflow import DAG
from airflow.operators.bash import BashOperatofrom airflow import DAGr
from airflow.operators.python import BranchPythonOperator
# DAG ์ ์
dag = DAG(
"branch_random_dag",
default_args=default_args,
description="0~10 ๋๋ค ๊ฐ์ ๋ฐ๋ฅธ ๋ธ๋์น DAG",
schedule="@daily",
catchup=False,
)
def generate_random_and_branch():
"""0~10 ์ฌ์ด์ ๋๋ค ๊ฐ์ ์์ฑํ๊ณ 5 ์ด์/๋ฏธ๋ง์ ๋ฐ๋ผ ๋ธ๋์น ๊ฒฐ์ """
random_value = random.randint(0, 10)
print(f"์์ฑ๋ ๋๋ค ๊ฐ: {random_value}")
if random_value >= 5:
print("๋๋ค ๊ฐ์ด 5 ์ด์์
๋๋ค. task_A๋ฅผ ์คํํฉ๋๋ค.")
return "task_A"
else:
print("๋๋ค ๊ฐ์ด 5 ๋ฏธ๋ง์
๋๋ค. task_B๋ฅผ ์คํํฉ๋๋ค.")
return "task_B"
# ๋ธ๋์น ํ์คํฌ (๋๋ค ๊ฐ ์์ฑ ๋ฐ ๋ถ๊ธฐ ๊ฒฐ์ )
branch_task = BranchPythonOperator(
task_id="branch_task",
python_callable=generate_random_and_branch,
dag=dag,
)
# Task A (๋๋ค ๊ฐ์ด 5 ์ด์์ผ ๋ ์คํ)
task_A = BashOperator(
task_id="task_A",
bash_command='echo "Task A ์คํ๋จ - ๋๋ค ๊ฐ์ด 5 ์ด์์
๋๋ค!"',
dag=dag,
)
# Task B (๋๋ค ๊ฐ์ด 5 ๋ฏธ๋ง์ผ ๋ ์คํ)
task_B = BashOperator(
task_id="task_B",
bash_command='echo "Task B ์คํ๋จ - ๋๋ค ๊ฐ์ด 5 ๋ฏธ๋ง์
๋๋ค!"',
dag=dag,
)
# ํ์คํฌ ์์กด์ฑ ์ค์
branch_task >> [task_A, task_B]
์ DAG๋ฅผ ์คํํด์ ๋์จ ๋๋ค ๊ฐ์ด 1์ด๋ผ์ 5๋ณด๋ค ์๊ธฐ ๋๋ฌธ์ task_B๊ฐ ์คํ๋์๋ค
Trigger Rules
์ผ๋ถ๋ง ์ฑ๊ณตํด๋ ๋ค์ task๋ฅผ ์คํํ๋ ๋ฑ ๋ค์ํ Trigger rules๋ฅผ ์ธ์๋ก ๋ฃ์ด์ ์์ ๋กญ๊ฒ ์ปค์คํฐ๋ง์ด์ง์ด ๊ฐ๋ฅํ๋ค
all_success
(๊ธฐ๋ณธ๊ฐ): ๋ชจ๋ upstream ์ฑ๊ณต ์ ์คํall_failed
: ๋ชจ๋ upstream ์คํจ ์ ์คํall_done
: ๋ชจ๋ upstream ์คํall_skipped
: ๋ชจ๋ upstream ์ดskipped
์ํ์ธ ๊ฒฝ์ฐ ์คํone_success
: ํ๋๋ผ๋ ์ฑ๊ณตํ๋ฉด ์คํone_failed
: ํ๋๋ผ๋ ์คํจํ๋ฉด ์คํone_done
: ํ๋๋ผ๋ ์ฑ๊ณต ํน์ ์คํจํ๋ ๊ฒฝ์ฐ ์คํnone_failed
: ์คํจ๋ง ์์ผ๋ฉด ์คํ (์ฑ๊ณต/์คํต ํผํฉ ํ์ฉ)always
: ๋ฌด์กฐ๊ฑด ์คํ- โฆ
from airflow.operators.empty import EmptyOperator
notify = EmptyOperator(
task_id="notify",
trigger_rule="one_failed"
)
Latest Only
์๋ฅผ ๋ค์ด ํ ๋ฌ์น ๋ฐ์ดํฐ๋ฅผ ์ฒ๋ฆฌํ๊ธฐ ์ํด ๊ณผ๊ฑฐ ๋ ์ง์ ๋ํ backfill์ด ํ์ํ ๊ฒฝ์ฐ๋ฅผ ์๊ฐํด๋ณด์
์๋ฆผ ์ ์ก ๋ฐ ๋ฆฌํฌํธ ์์ฑ ์์
์ ๊ฒฝ์ฐ ๊ณผ๊ฑฐ ๋ฐ์ดํฐ์๋ ๊ตณ์ด ์งํํ์ง ์์๋ ๋๋ ์์
์ผ ๊ฒ์ด๋ค
์ด๋ฐ ๊ฒฝ์ฐ Latest Only๋ฅผ ์ฌ์ฉํด ์ต์ DAG ์คํ์๋ง ํน์ ํ์คํฌ๋ฅผ ์คํํ๋๋ก ์ ์ดํ ์ ์๋ค
LatestOnlyOperator
๋ฅผ ํตํด ๊ทธ ์ดํ downstream task๋ค์ Latest DAG run ์ด ์๋ ๊ฒฝ์ฐ ์๋์ผ๋ก skipํ ์ ์๋ค
Latest DAG run? โ ํ์ฌ ์๊ฐ์ด ํค๋น DAG ์คํ ์๊ฐ๊ณผ ๋ค์ ์ค์ผ์ค ์ฌ์ด์ ์๊ณ , ์ธ๋ถ์์ ์๋์ผ๋ก trigger ๋ ์คํ์ด ์๋ ๊ฒฝ์ฐ
์ค๋ ๋ ์ง๊ฐ 2025-09-05์ด๊ณ catchup=True๋ฅผ ์ด์ฉํด์ 2025-09-01๋ถํฐ ์ค๋๊น์ง ์์ ์ ๋ชจ๋ ์ฒ๋ฆฌํ๊ฒ DAG๋ฅผ ์์ฑํ ์์์ด๋ค
import pendulum
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.operators.latest_only import LatestOnlyOperator
from airflow.sdk import DAG
from airflow.utils.trigger_rule import TriggerRule
with DAG(
dag_id="latest_only_dag",
schedule="@daily",
start_date=pendulum.datetime(2025, 9, 1, tz="UTC"),
catchup=True,
tags=["example3"],
) as dag:
latest_only = LatestOnlyOperator(task_id="latest_only")
task1 = EmptyOperator(task_id="task1")
task2 = EmptyOperator(task_id="task2")
task3 = EmptyOperator(task_id="task3")
task4 = EmptyOperator(task_id="task4", trigger_rule=TriggerRule.ALL_DONE)
latest_only >> task1 >> [task3, task4]
task2 >> [task3, task4]
๊ฐ์ฅ ์ต๊ทผ์ธ ์ค๋์ ์ ์ธํ๊ณ ์ด์ ๋ฐ์ดํฐ ์ฒ๋ฆฌ์ ๊ฒฝ์ฐ LatestOnlyOperator
๋ก ์ธํด task1๊ณผ task3๊ฐ skip๋ ๊ฒ์ ํ์ธํ ์ ์๋ค
Depends On Past
Task๊ฐ ์ด์ DAG Run์์ ์ฑ๊ณตํ์ ๋๋ง ํ์ฌ ์คํ์ ํ์ฉํ๋ ๊ธฐ๋ฅ์ผ๋ก ๊ฐ์ Task๊ฐ ์ด์ ์คํ์์ ์คํจํ๋ค๋ฉด, ํ์ฌ ์คํ๋ ์๋์ผ๋ก ๊ฑด๋๋ฐ๊ฑฐ๋ ์คํจ ์ฒ๋ฆฌ๋๋ค
๋ฐ์ดํฐ ์ ์ฌ, ETL ๋ฑ์์ ์ฐ์์ฑ์ด ์ค์ํ ๊ฒฝ์ฐ (์: ํ๋ฃจ์น ๋ฐ์ดํฐ๊ฐ ๋๋ฝ๋๋ฉด ๋ค์๋ ๋ฐ์ดํฐ ์ ์ฌ๋ ๋ง๊ณ ์ถ์ ๋) ํน์ Task๊ฐ ์์ฐจ์ ์ผ๋ก ์คํ๋์ด์ผ ํ ๋ (์: ํ์ผ ์ฒ๋ฆฌ, ๋ฐฐ์น ์์ ๋ฑ) ์ ํ์ฉ๋๋ค
Setup and Teardown
Setup๊ณผ Teardown์ Airflow์์ ๋ฆฌ์์ค์ ์์ฑ๊ณผ ์ ๋ฆฌ๋ฅผ ์๋ํํ์ฌ, ๋ฐ์ดํฐ ํ์ดํ๋ผ์ธ ์คํ ์ ๋ฆฌ์์ค ๊ด๋ฆฌ๊ฐ ๋๋ฝ๋์ง ์๋๋ก ๋์์ฃผ๋ ๊ธฐ๋ฅ์ด๋ค
3. Operator
Operator๋ ํน์ ์์ ์ ์ํํ๋๋ก ์ค๊ณ๋ ์ฌ์ฌ์ฉ ๊ฐ๋ฅํ Task ํ ํ๋ฆฟ ์ด๋ค
์ง๊ธ๊น์ง ์์์์ ์ฌ์ฉ๋ EmptyOperator
, BashOperator
, PythonOperator
๋ฑ๊ณผ ๊ฐ์ด ์ํ๋ ๊ธฐ๋ฅ์ ํธํ๊ฒ ์ฌ์ฉํ ์ ์๊ฒ ๋์์ฃผ๋ ์ญํ ์ ํ๋ค
๋ง์ด ์ฌ์ฉํ๋ ๋๋ถ๋ถ์ Operator๋ Airflow์์ ๊ธฐ๋ณธ์ผ๋ก ์ ๊ณตํ๊ฑฐ๋, ํน์ python package๋ง ์ค์น(์ฐธ๊ณ : Provider)ํ๋ฉด ์ฝ๊ฒ ์ฌ์ฉํ ์ ์๋ค
๋ํ์ ์ธ Operators
- EmptyOperator : ์๋ฌด ์์ ์ ํ์ง๋ ์์ง๋ง, ํ ์คํธ๋ ์์ ๊ฐ์ ์์กด์ฑ์ ๋ช ํํ ํ ๋ ์ฌ์ฉํ ์ ์์
- BashOperator
run_this = BashOperator( task_id="run_after_loop", bash_command="echo https://airflow.apache.org/", )
- PythonOperator : ํ๊ณ ์ถ์ ์์
์ ํ์ด์ฌ ํจ์๋ก๋ง ์์ฑํ๋ฉด ๋
def print_context(ds=None, **kwargs): """Print the Airflow context and ds variable from the context.""" print("[group] All kwargs") pprint(kwargs) print("[endgroup]") print("[group] Context variable ds") print(ds) print("[endgroup]") return "Whatever you return gets printed in the logs" run_this = PythonOperator(task_id="print_the_context", python_callable=print_context)
- BranchDateTimeOperator : DAG ์คํ ์๊ฐ์ ๋ฐ๋ผ branch ์์
์ ๊ตฌ์ฑํ๊ณ ์ถ์ ๊ฒฝ์ฐ ์ฌ์ฉ
empty_task_11 = EmptyOperator(task_id="date_in_range", dag=dag1) empty_task_21 = EmptyOperator(task_id="date_outside_range", dag=dag1) cond1 = BranchDateTimeOperator( task_id="datetime_branch", follow_task_ids_if_true=["date_in_range"], follow_task_ids_if_false=["date_outside_range"], target_upper=pendulum.datetime(2020, 10, 10, 15, 0, 0), target_lower=pendulum.datetime(2020, 10, 10, 14, 0, 0), dag=dag1, ) # Run empty_task_11 if cond1 executes between 2020-10-10 14:00:00 and 2020-10-10 15:00:00 cond1 >> [empty_task_11, empty_task_21]
- EmailOperator
- HttpOperator
- SQLExecuteQueryOperator
- DockerOperator
- HiveOperator
- S3FileTransformOperator
- SlackAPIOperator
- โฆ
Custom Operator
๊ณต์์ ์ผ๋ก ์ ๊ณต๋์ง ์์ง๋ง ์์ฃผ ์ฌ์ฉํ๋ ๊ธฐ๋ฅ์ด ์๋ ๊ฒฝ์ฐ Custom Operator๋ฅผ ์ง์ ๋ง๋ค์ด ์ฌ์ฉํ ์ ์๋ค
๋ด์ฅ Operator๋ก ํํํ๊ธฐ ์ด๋ ค์ด ๋ก์ง์ BaseOperator
์๋ธํด๋์ค๋ก ์บก์ํํ ์ฌ์ฉ์ ์ ์ ํ
ํ๋ฆฟ์ด๊ณ Airflow 3์์๋ ๊ณต์ SDK ๊ฒฝ๋ก airflow.sdk
์ BaseOperator
์์์ ๊ถ์ฅํ๊ณ ์๋ค
์ํ๋ Operator๋ฅผ ๋ง๋ค๊ธฐ ์ํด์๋ BaseOperator
ํด๋์ค๋ฅผ ์์ ๋ฐ์ ๋ ๊ฐ์ง ๋ฉ์๋๋ฅผ override ํด์ผํ๋ค
- ์์ฑ์ (
__init__
) : Operator์ ํ์ํ ํ๋ผ๋ฏธํฐ ์ง์ . ์ด ๋ DB ํธ์ถ๊ณผ ๊ฐ์ ๋ฌด๊ฑฐ์ด ์์ ์ ๊ธ์ง - ์คํ method (
execute
) : Operator๊ฐ ์คํ๋ ๋ ํธ์ถ๋๋ ์ฝ๋. Airflow context๋ฅผ ํ๋ผ๋ฏธํฐ๋ก ๋ฐ์ ์ค์ ๊ฐ์ ์ฝ์ ์ ์์
์
__init__
์์ ๋ฌด๊ฑฐ์ด ์์ ์ด ๊ธ์ง?
- Scheduler๊ฐ DAG ํ์ผ ์ฃผ๊ธฐ์ ์ผ๋ก parsingํ๋ฉฐ ๊ฐ Task์ ํด๋นํ๋ Operator Instance ๊ณ์ ์์ฑ โ
__init__
ํธ์ถ
- ๋ฐ๋ผ์ ์์ฑ์์์ DB์ฐ๊ฒฐ, API ํธ์ถ, ๋์ฉ๋ ํ์ผ ์ฝ๊ธฐ ๋ฑ์ ์ํํ๋ฉด ๋นํจ์จ์
- ๊ฐ๋ณ๊ฒ ํ๋ผ๋ฏธํฐ ์ ์ฅ๋ง ํ๋๊ฒ ํจ์จ์
execute
๋ ์ค์ ์คํ๋ ๋ ํ ๋ฒ ํธ์ถ๋๊ธฐ ๋๋ฌธ์ ์ธ๋ถ ๋ฆฌ์์ค ์ ๊ทผ์execute
์์ ํด์ผํจ
- ๋ฐ๋ผ์ Hook, Client ์์ฑ, ์ธ์ฆ ์กฐํ, ์ฟผ๋ฆฌ/์์ฒญ ์ ์ก ๋ฑ์ execute์์ ์์ฑ ๋ฐ ํธ์ถ
Custom Operator๋ฅผ ์ ์์ ์ผ๋ก importํด์ ์ฌ์ฉํ๊ธฐ ์ํด์ Airflow์ PYTHONPATH์ ํฌํจ๋ ํด๋(dags/
, config/
, plugins/
)์ ๋์ด์ผ ํ๋ค
์ผ๋ฐ์ ์ผ๋ก plugins/
ํด๋์ ๋ง์ด ์์ฑํ๊ณ ์๋ ์์๋ plugins/
์ ์ถ๊ฐํ๋ฉด ๋๋ค
# plugins/hello_operator.py
from airflow.sdk import BaseOperator # Airflow 3 ๊ณต๊ฐ SDK
class HelloOperator(BaseOperator):
def __init__(self, name: str, **kwargs):
super().__init__(**kwargs) # task_id ๋ฑ ๊ณตํต ์ธ์ ์ฒ๋ฆฌ
self.name = name
def execute(self, context):
message = f"Hello, {self.name}!"
print(message)
return message
# DAG์์ ์ฌ์ฉ
from datetime import datetime
from airflow import DAG
from hello_operator import HelloOperator
with DAG(dag_id="custom_op_example", start_date=datetime(2023, 1, 1), schedule=None, catchup=False) as dag:
hello = HelloOperator(task_id="hello_task", name="Airflow 3")
4. Sensor์ Deferrable Operator
Sensor
Airflow์์ Sensor๋ ํน์ ์กฐ๊ฑด์ด ๋ง์กฑ๋ ๋๊น์ง ๋๊ธฐ(polling or trigger) ํ๋ ์ญํ ์ ๋ด๋นํ๋ ํน์ํ Operator๋ก, ์กฐ๊ฑด์ด ์ถฉ์กฑ๋๋ฉด ๋ค์ Task ์คํ์ด ์ด์ด์ง๋๋ก ํ๋ค
๋ํ์ ์ธ Sensor๋ ๋ค์๊ณผ ๊ฐ๋ค
- BashSensor: ์ง์ ํ Bash ๋ช ๋ น์ด ๋๋ ์คํฌ๋ฆฝํธ๊ฐ ์ฑ๊ณต (๋ฆฌํด ์ฝ๋ 0)ํ ๋๊น์ง ๋๊ธฐ
- TimeDeltaSensor: ์ค์ ํ ์๊ฐ (delta)์ด ๊ฒฝ๊ณผํ ๋๊น์ง ๋๊ธฐ
- TimeSensor: ์ง์ ํ ์๊ฐ (target_time)์ด ๋ ๋๊น์ง ๋๊ธฐ
- DayOfWeekSensor: ์ง์ ํ ์์ผ์ด ๋ ๋๊น์ง ๋๊ธฐ
- FileSensor: ํน์ ๊ฒฝ๋ก์ ํ์ผ์ด ์กด์ฌํ ๋๊น์ง ๋๊ธฐ
- PythonSensor: ์ง์ ํ ํ์ด์ฌ ํจ์๊ฐ True๋ฅผ ๋ฐํํ ๋๊น์ง ์ฃผ๊ธฐ์ ์ผ๋ก ์คํํ๋ฉฐ ๋๊ธฐ
- ExternalTaskSensor: ย ๋ค๋ฅธ DAG (๋๋ ๊ฐ์ DAG์ ๋ค๋ฅธ ์คํ ์์ )์ ํน์ ์์ ์ด ์๋ฃ๋ ๋๊น์ง ํ์ฌ ์์ ์ ๋๊ธฐ
Sensor์๋ ๋๊ธฐ ๋ฐฉ์์ ๋ฐ๋ผ ๋ ๊ฐ์ง ๋ชจ๋๊ฐ ์กด์ฌํ๋ค
- poke (๊ธฐ๋ณธ): ์กฐ๊ฑด์ ์ฃผ๊ธฐ์ ์ผ๋ก ํ์ธํ๋ฉด์ Worker ์ฌ๋กฏ์ ๊ณ์ ์ ์
- reschedule: ์กฐ๊ฑด ํ์ธ ํ ์ผ์ ์๊ฐ ๋๊ธฐํ๊ณ Worker ์ฌ๋กฏ์ ๋ฐ๋ฉ, ๋ค์์ ์ฌ์ค์ผ์ค
๋ณดํต ์งง์ ๋๊ธฐ์๋ poke
, ๊ธด ๋๊ธฐ์๋ ๋ฆฌ์์ค ํจ์จ์ ์ด๋ฏ๋ก reschedule
์ ์ฌ์ฉํ๋ ๊ฒ์ด ๋ฐ๋์งํ๋ค
๋ค์์ ํน์ ๊ฒฝ๋ก์ โsensor_test.txtโ๋ผ๋ ํ์ผ์ด ์์ฑ๋๋ ๊ฒฝ์ฐ success๊ฐ ๋๋ ์์์ด๋ค
import pendulum
from airflow import DAG
from airflow.providers.standard.sensors.filesystem import FileSensor
with DAG(
dag_id="ex_file_sensor",
start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
schedule="@daily",
catchup=False,
) as dag:
wait_file = FileSensor(
task_id="wait_for_file",
filepath="/opt/airflow/plugins/sensor_test.txt",
mode="poke",
poke_interval=30, # 30์ด ๊ฐ๊ฒฉ์ผ๋ก ์ฒดํฌ
timeout=60 * 60 * 2, # 2์๊ฐ ๋๊ธฐ ํ์์์
)
์ด DAG๋ฅผ ์คํํ๊ธฐ ์ ์ local filesystem ์ ๊ทผ์ ์ํ connection์ ๋ง๋ค์ด์ผ ํ๋ค
- connection ์ด๋ฆ : โfs_defaultโ
- connection type : File (path)
- Path : โ/โ
fs_default๋ผ๋ ์ด๋ฆ์ FileSensor
๊ฐ ๊ธฐ๋ณธ๊ฐ์ผ๋ก ์ธ์ํ๋ conn_id์ด๊ณ Path๋ฅผ โ/โ๋ก ํด์ filepath์๋ ์ ๋๊ฒฝ๋ก๋ฅผ ์
๋ ฅํด์ฃผ๋ฉด ๋๋ค
์ด DAG๋ฅผ ์คํํ๋ฉด ๋ง์ดํธ๋ /opt/airflow/plugins
ํด๋์ sensor_test.txt
๋ผ๋ ์ด๋ฆ์ ํ์ผ์ด ์๊ธธ ๋๊น์ง ๊ณ์ running ์ํ๋ก ๋จ์์๊ณ ํด๋น ํ์ผ์ด ์ถ๊ฐ๋๋ฉด success ์ํ๊ฐ ๋๋ค
Deferrable Operator
Deferrable Operator๋ Airflow์ Operator ๋๋ Sensor๊ฐ ์ธ๋ถ ์กฐ๊ฑด์ ๊ธฐ๋ค๋ฆด ๋ ๋ฆฌ์์ค ํจ์จ์ ๊ทน๋ํํ๊ธฐ ์ํด ๊ณ ์๋ ํน๋ณํ ์ ํ์ผ๋ก ๋์ ์๋ฆฌ๋ ๋ค์๊ณผ ๊ฐ๋ค
- Deferrable Operator๊ฐ ํน์ ์กฐ๊ฑด์ ๊ธฐ๋ค๋ ค์ผ ํ ๋, trigger ๊ฐ์ฒด๋ฅผ ํตํด ์คํ์ ์ค๋จ
- Trigger๋ Airflow์ triggerer ์ปดํฌ๋ํธ์์ ๋น๋๊ธฐ๋ก ๋์ํ๋ฉฐ, ์กฐ๊ฑด์ด ์ถฉ์กฑ๋ ๋๊น์ง ๊ฐ์
- ์กฐ๊ฑด์ด ์ถฉ์กฑ๋๋ฉด ํธ๋ฆฌ๊ฑฐ๊ฐ ์ ํธ๋ฅผ ๋ณด๋ด Operator๊ฐ ๋ค์ ์คํ์ ์ฌ๊ฐ
๊ธฐ์กด Operator์ Sensor (poke ๋ชจ๋)๋ ์์ ์ด ๋๊ธฐ ์ํ์ฌ๋ worker slot์ ๊ณ์ ์ ์ ํ์ง๋ง, Deferrable Operator๋ ๋๊ธฐ ์ค์ worker slot์ ํด์ ํ๊ณ trigger๊ฐ ๋๊ธฐ ๋ก์ง์ ๋์ ์ฒ๋ฆฌํ๊ฒ ๋๋ค
Deferrable Operator์ ์ฅ์ ์ ๋ค์๊ณผ ๊ฐ๋ค
- ๋ฆฌ์์ค ์ ์ฝ: ๊ธด ๋๊ธฐ ์๊ฐ์ด ํ์ํ ์์ ์์๋ worker ์์์ ์ ์ ํ์ง ์์ ย
- ํ์ฅ์ฑ ๊ฐ์ : ์๋ฐฑ~์์ฒ ๊ฐ์ Sensor๊ฐ ๋์์ ๋๊ธฐํ๋๋ผ๋ Triggerer ํ๋ก์ธ์ค๊ฐ ๊ด๋ฆฌํ๋ฏ๋ก ํด๋ฌ์คํฐ ํจ์จ์ด ๋์ ย
- ์ด์ ์์ ์ฑ: ์ฅ๊ธฐ ๋๊ธฐ๋ก ์ธํด worker๊ฐ ๋ฌถ์ด์ง ์์ผ๋ฏ๋ก SLA ์ง์ฐ์ด๋ ํ ์ ์ฒด๋ฅผ ๋ฐฉ์ง
๋ณดํต ๊ธฐ์กด sensor, operator ์ด๋ฆ ๋ค์ Async
๊ฐ ๋ถ์ด์๊ฑฐ๋ ํน์ deferrable=True
๋ผ๋ ํ๋ผ๋ฏธํฐ๋ฅผ ์ค์ ํด์ฃผ๋ฉด deferrable ๋ชจ๋๋ก ๋์ํ๊ฒ ๋๋ค
Deferrable Operator ์ฌ์ฉ ์ ๋ช ๊ฐ์ง ์ฃผ์ํด์ผํ ์ ์ด ์๋ค
- Triggerer ํ๋ก์ธ์ค ์คํ ํ์:
airflow triggerer
ํ๋ก์ธ์ค๊ฐ ๋ฐ๋์ ๊ตฌ๋ ์ค์ด์ด์ผ ํจ ย - ๋ฆฌ์์ค ํ๊ณ ๊ณ ๋ ค: Triggerer๊ฐ ๋ชจ๋ deferrable ํ์คํฌ๋ฅผ ๊ฐ์ํ๋ฏ๋ก, ๋๋ฌด ๋ง์ deferrable ํ์คํฌ๊ฐ ํ๊บผ๋ฒ์ ๋ชฐ๋ฆฌ๋ฉด triggerer ํ๋ ํ์ ย
- ํธํ์ฑ: ๋ชจ๋ Operator๊ฐ deferrable ๋ฒ์ ์ ์ ๊ณตํ๋ ๊ฒ์ ์๋๋ฉฐ, ์ผ๋ถ๋ ๋ณ๋์ provider ํจํค์ง์ ํฌํจ๋จ
๋ง์ง๋ง์ผ๋ก Sensor์์ reschedule ๋ชจ๋์์ ์ฐจ์ด๋ฅผ ์ ๋ฆฌํ๋ฉด ๋ค์๊ณผ ๊ฐ๋ค
๊ตฌ๋ถ | reschedule ๋ชจ๋ | Deferrable Operator |
---|---|---|
Worker ์ ์ | ์กฐ๊ฑด ํ์ธ ํ worker ์ฌ๋กฏ์ ๋ฐ๋ฉํ์ง๋ง, ๋ค์ ์กํ์ ์คํ๋จ | worker ์ฌ๋กฏ ์์ ํด์ , Triggerer๊ฐ ๊ฐ์ |
๋๊ธฐ ์ฒ๋ฆฌ ์ฃผ์ฒด | Scheduler๊ฐ ์ฃผ๊ธฐ์ ์ผ๋ก ํ์คํฌ๋ฅผ rescheduling | Triggerer ํ๋ก์ธ์ค๊ฐ ์ด๋ฒคํธ/์กฐ๊ฑด์ ๋น๋๊ธฐ ๊ฐ์ |
๋ฆฌ์์ค ํจ์จ์ฑ | worker ์ ์ ๋ ์ค์ง๋ง ์ฃผ๊ธฐ์ rescheduling์ผ๋ก metadata DBยท์ค์ผ์ค๋ฌ ๋ถํ ์กด์ฌ | ์ฅ๊ธฐ ๋๊ธฐ์ ์ต์ ํ, metadata DB ์ ๊ทผ ์ต์ํ, ๋๊ท๋ชจ sensor์๋ ์์ ์ |
์ง์ ์ฌ๋ถ | ๋๋ถ๋ถ์ Sensor์์ mode="reschedule" ์ต์
๋ง ์ค์ ํ๋ฉด ์ฌ์ฉ ๊ฐ๋ฅ | ์ ์ฉ Async/Deferrable Operator ํ์ |
์ ํฉํ ์ํฉ | ์๋ถ~์์ญ ๋ถ ๋๊ธฐ, Sensor ์๊ฐ ๋ง์ง ์์ ๋ | ์์๊ฐ~์์ผ ์ด์ ์ฅ๊ธฐ ๋๊ธฐ, ์๋ฐฑ ๊ฐ ์ด์์ Sensor ๋ณ๋ ฌ ์คํ ์ |
5. Hook
Airflow์์ Hook์ ์ธ๋ถ ์์คํ
(DB, API, ํด๋ผ์ฐ๋ ์๋น์ค ๋ฑ)๊ณผ ์ฐ๊ฒฐํ๊ธฐ ์ํ ๊ณ ์์ค ์ธํฐํ์ด์ค๋ค. ย
BaseHook
ํด๋์ค๋ฅผ ์์ํ์ฌ ๊ตฌํํ๋ฉฐ Airflow Connections ์ ๋ณด๋ฅผ ์ฌ์ฉํด ์ธ์ฆ, ์๋ํฌ์ธํธ, ํฌํธ ๋ฑ์ ์๋์ผ๋ก ๊ฐ์ ธ์ฌ ์ ์๋ค
Hook์ด ํ์ํ ์ด์ ๋ ๋ค์๊ณผ ๊ฐ๋ค
- ์ผ๊ด๋ ์ธํฐํ์ด์ค ์ ๊ณต
- ๋ค์ํ DB/API๋ฅผ ๋์ผํ ๋ฐฉ์์ผ๋ก ์ ๊ทผ โ ์ฝ๋ ๊ฐ๋ ์ฑ, ์ ์ง๋ณด์์ฑ โ
- Connection ๊ด๋ฆฌ ํตํฉ
- Airflow UI์์ Connection์ ์ค์ ํ๋ฉด Hook์ด ์ด๋ฅผ ์๋์ผ๋ก ๋ถ๋ฌ์ด ย
- ๋น๋ฐ๋ฒํธ, ํ ํฐ ๊ฐ์ ๋ฏผ๊ฐ ์ ๋ณด๋ ์ฝ๋์ ๋ ธ์ถ๋์ง ์์
- ๋ชจ๋ํยท์ฌ์ฌ์ฉ์ฑ ย
- ์ฌ๋ฌ DAG์์ ๋์ผ Hook์ ๋ถ๋ฌ์ ํ์ฉ ๊ฐ๋ฅ ย
- ์ปค์คํ Hook ์์ฑ ์, ํ ์ฐจ์์ ๊ณตํต ์ ํธ์ฒ๋ผ ์ฌ์ฉ ๊ฐ๋ฅ
- ๋ณต์กํ ์์
์ฒ๋ฆฌ ย
- Operator ์์ค์์ ์ ๊ณตํ์ง ์๋ ๊ธฐ๋ฅ์ Hook์ผ๋ก ์ง์ ๊ตฌํ ๊ฐ๋ฅ ย
- ํ๋์ Task์์ ๋ ๊ฐ ์ด์์ Hook์ ๋ณํํด ์ฌ์ฉํ ์๋ ์์
Operator์ ๋น๊ตํ๋ฉด ๋ค์๊ณผ ๊ฐ๋ค
- Operator
- Airflow์์ ์คํ ๋จ์(Task)๋ฅผ ์ ์ ย
- ๋ด๋ถ์ ์ผ๋ก Hook์ ํธ์ถํ์ฌ ์ธ๋ถ ์์คํ ๊ณผ ์ํธ์์ฉ ย
- Hook ย
- ์ธ๋ถ ์ฐ๊ฒฐ์ ์ถ์ํ ย
- Operator์์ ์ฌ์ฉ๋๋ ํ์ ๊ตฌ์ฑ ์์ ย
- ์ง์ ์คํ๋์ง ์์ (Task๊ฐ ์๋)
Operator๋ โ๋ฌด์์ ์คํํ ์งโ, Hook์ โ์ด๋์ ์ฐ๊ฒฐํ ์งโ๋ฅผ ๋ด๋นํ๋ค๊ณ ๋ณผ ์ ์๋ค.
Connection
๋ณดํต Hook๋ค๋ฅธ ์๋น์ค์ ์ฐ๊ฒฐ์ ํ ๋ ์ฌ์ฉํ๊ธฐ ๋๋ฌธ์ ์ฐ๊ฒฐ ์ ๋ณด ๊ฐ ํ์ํ๋ฐ ์ด๋ฅผ Airflow Connection์ ๋ฑ๋กํด์ ์ฌ์ฉํ ์ ์๋ค
์๋์ ์์์ฒ๋ผ Airflow UI์์ ์ง์ ๋ฑ๋กํ๊ฑฐ๋ ์์ ํ๊ฒฝ๋ณ์๋ก docker compose์ ์ฃผ์ ํ ์ ์๋ค
์์: PostgresHook
Airflow UI์์ Admin>Connection>Add Connection์ ๋ค์ด๊ฐ์ ์๋์ ๊ฐ์ด โpostgres_defaultโ๋ผ๋ connection ์ด๋ฆ์ผ๋ก Airflow metaDB๋ก ์ฌ์ฉ ์ค์ธ postgres์ ์ฐ๊ฒฐํ ์ ์๋ค
๊ทธ ํ ์๋ DAG๋ฅผ ์คํํ๋ฉด โpostgres_defaultโ connection์ ์ด์ฉํด PostgresHook
์ด Postgres์ ์ ์ํ๊ฒ ๋๊ณ โSELECT now()::timestamp, current_database()โ ๋ผ๋ ์ฟผ๋ฆฌ๋ฅผ ์คํํ ์ ์๋ค
import pendulum
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
def fetch_rows(**_):
ย ย hook = PostgresHook(postgres_conn_id="postgres_default")
ย ย rows = hook.get_records("SELECT now()::timestamp, current_database()")
ย ย print(rows)
with DAG(
ย ย dag_id="ex_hook_postgres",
ย ย start_date=pendulum.datetime(2025, 1, 1, tz="Asia/Seoul"),
ย ย schedule="@daily",
ย ย catchup=False,
ย ย tags=["hook", "postgres"],
) as dag:
ย ย read_pg = PythonOperator(
ย ย ย ย task_id="read_postgres",
ย ย ย ย python_callable=fetch_rows,
ย ย )
Hook๋ Operator์ ๋ง์ฐฌ๊ฐ์ง๋ก BaseHook
์ ์์๋ฐ์ Custom์ผ๋ก ์ง์ ์์ฑํ ์ ์๋ค
6. Provider์ Plugin
Provider
Provider๋ ํน์ ์๋น์ค๋ ํ๋ซํผ๊ณผ์ ํตํฉ์ ํจํค์ง ๋จ์๋ก ์ ๊ณตํ๋ ํ์ฅ ๋ชจ๋ ์งํฉ์ผ๋ก Operator, Hook, Sensor, Transfer, ์ฐ๊ฒฐ ํ์ , ๋ก๊ทธ ํธ๋ค๋ฌ, ์ํฌ๋ฆฟ ๋ฐฑ์๋, ์๋ฆผ ๋ฑ ๋ค์ํ ํ์ฅ์ ํฌํจํ ์ ์๋ค
์์ฃผ ๋ค์ํ Provider ํจํค์ง๊ฐ ์๊ธฐ ๋๋ฌธ์ ๊ฐ๋จํ๊ฒ airflow ํ๊ฒฝ์ installํด์ ์์ ๋กญ๊ฒ ์ถ๊ฐ๊ธฐ๋ฅ์ ํ์ฉํ ์ ์๋ค
์๋ฅผ ๋ค์ด airflow๋ฅผ ํตํด spark ๊ด๋ จ ์์
์ ํ๊ณ ์ถ์ ๋ apache-airflow-providers-apache-spark
๋ฅผ ์ฌ์ฉํ๋ฉด ๋๋ค
์ค์น ๋ฐฉ๋ฒ์ ๋ค์๊ณผ ๊ฐ๋ค
- airflow dockerfile์ ํด๋น pythonํจํค์ง๋ฅผ ์ถ๊ฐํด์ ๋ค์ ๋น๋ํด์ ์ฌ์ฉ (๊ถ์ฅ)
.env
์_PIP_ADDITIONAL_REQUIREMENTS='apache-airflow-providders-apache-spark'
๋ฅผ ์ถ๊ฐํ๊ณ docker compose ์คํ (์์ ๊ฐ๋ฐ์ฉ)
ํด๋น provider docs๋ฅผ ๋ณด๋ฉด Connection types, Decorators, Operators ๋ฑ ์ด ํจํค์ง์์ ์ ๊ณตํ๋ ๊ธฐ๋ฅ๋ค์ ๋ชจ์์ ๋ณผ ์ ์๋ค
๋ค์ docs๋ฅผ ์ฐธ๊ณ ํด custom provider๋ฅผ ์ง์ ๋ง๋ค ์ ์๋ค
Plugin
Plugin์ Airflow์ ๋์๊ณผ UI๋ฅผ ํ์ฅ/๋ณ๊ฒฝํ๊ธฐ ์ํ ๋ฉ์ปค๋์ฆ์ด๋ค
์์ custom operator๋ฅผ ๋ง๋ค ๋ ๋ดค๋ฏ์ด plugins/
์ง์ ๊ธฐ๋ฅ์ ์ถ๊ฐํ ์ ์๋ ๊ฒ์ด๋ผ ๋ณผ ์ ์๋๋ฐ
๋จ์ํ operator, hook ๋ฑ์ ์ถ๊ฐํ๋ ๊ฒ์ ๋์ด์ UI, Flask ๋ทฐ, Admin ํ๋ฉด ๋ฑ Airflow์ ์์ฒด ๊ธฐ๋ฅ์ ๋ํ custom๋ ๊ฐ๋ฅํ๋ค
์ฆ, Provider๊ฐ ํน์ ์๋น์ค์์ ์ฐ๋์ ์ํ ๊ณต์ ๋ฐฐํฌ ๋ชจ๋์ด๋ผ๋ฉด Plugin์ UI, Flask ํ์ฅ๊น์ง ๊ฐ๋ฅํ ์ฌ์ฉ์ ์ ์ ๋ชจ๋๋ก Provider๋ฅผ ํฌ๊ดํ๋ ๊ฐ๋ ์ด๋ค
๋น์ฐํ ์ง์ custom plugin์ ๊ตฌํํ ์ ์์ผ๋ฉฐ, airflow-executor์ฒ๋ผ ํจํค์งํ ๋ plugin๋ ์๋ค
DAG/ํ์คํฌ ์ํ๋ฅผ Prometheus๋ก ๋
ธ์ถํ๋ ํ๋ฌ๊ทธ์ธ์ผ๋ก, ํด๋น ํจํค์ง๋ฅผ ์ค์นํ๋ฉด http://localhost:8080/admin/metrics
๊ฒฝ๋ก๊ฐ ์ถ๊ฐ๋์ด Prometheus์์ ์ด๋ฅผ ์ธ์ํด์ ๋ชจ๋ํฐ๋ง์ ํธํ๊ฒ ํ ์ ์๋ค
7. ์ฃผ์ ๊ฐ๋ ๋น๊ต
๊ฐ๋ | ๋ชฉ์ /์ญํ | ํ์ฅ์ฑ | ์ฌ์ฉ ๋ฐฉ๋ฒ |
---|---|---|---|
Task | DAG ๋ด ์คํ ๋จ์๋ก์ Operator ์ธ์คํด์ค ๋๋ @task ํจ์๊ฐ ์์ฑํ๋ ๋
ธ๋ | ์์ฒด ํ์ฅ ๊ฐ๋ ์ ์์, ํ ํ๋ฆฟ(Operator) ๋ค์ํ๋ก ํํ ํ์ฅ | DAG์์ Operator ์ธ์คํด์ค ์์ฑ ๋๋ @task ์ฌ์ฉ, ์์กด์ฑ ์ฐ๊ฒฐ |
Operator | ์ฌ์ฌ์ฉ ๊ฐ๋ฅํ ์์
ํ
ํ๋ฆฟ์ผ๋ก execute() ์ ์ํ ๋ก์ง ์ ์ | BaseOperator ์์์ผ๋ก ์ปค์คํ
๊ตฌํ ๊ฐ๋ฅ | DAG์์ ํด๋์ค๋ฅผ ์ธ์คํด์คํํ์ฌ Task ์์ฑ |
Sensor | ์ธ๋ถ ์กฐ๊ฑด(ํ์ผ ์กด์ฌ, API ์๋ต, ํน์ ์๊ฐ ๋ฑ)์ ๋ง์กฑํ ๋๊น์ง ๋๊ธฐํ๋ ํน์ Operator | BaseSensorOperator ์์, Deferrable ๋ฒ์ ๊ตฌํ ๊ฐ๋ฅ | mode="poke" ๋๋ mode="reschedule" ๋ก ์ฌ์ฉ, ์ผ๋ถ๋ Async/Deferrable Sensor๋ก ์ฌ์ฉ |
Hook | ์ธ๋ถ ์์คํ ์ธํฐํ์ด์ค, Connection๊ณผ ์ฐ๊ณ๋ ๊ณ ์์ค API ์ ๊ณต | BaseHook ์์์ผ๋ก ์ปค์คํ
Hook ๊ตฌํ ๊ฐ๋ฅ | ๋ณดํต Operator ๋ด๋ถ์์ ์์ฑยทํธ์ถ, ํ์ ์ PythonOperator์์ ์ง์ ์ฌ์ฉ |
Provider | ํน์ ์๋น์ค ์ฐ๋์ ํจํค์ง ๋จ์๋ก ์ ๊ณต, ์ฝ์ด ์ธ ํ์ฅ ๋ฌถ์ | ์ฝ์ด์ ๋ ๋ฆฝ ๋ฒ์ , ์ ๊ท/์ฌ๋ด Provider ์ ์ยท๋ฐฐํฌ ๊ฐ๋ฅ | pip install apache-airflow-providers-... ์ค์น ํ ๋ชจ๋์์ ํด๋์ค ์ํฌํธ |
Plugin | Airflow ๋์/UI ํ์ฅ์ ์ํ ํ๋ฌ๊ทธ์ธ ์์คํ | Flask/FastAPI ๋ทฐ, ๋ฆฌ์ค๋, ๋งคํฌ๋ก ๋ฑ ์ ์ญ ํ์ฅ ๊ตฌ์ฑ ๊ฐ๋ฅ | plugins/ ํด๋ ๋๋ ํจํค์ง ์ํธ๋ฆฌํฌ์ธํธ๋ก ๋ฑ๋ก ํ ์ฌ์์ |