id: data-eng-airflow-dagster
title: Airflow / Dagster — Data Pipeline / DAG
category: Coding
status: draft
source_trust_level: B
verification_status: conceptual
created_at: 2026-05-09
updated_at: 2026-05-09
tags: data-engineering, airflow, dagster, etl, vibe-coding
tech_stack: language: Python; applicable_to: Data Engineering
applied_in:
aliases: Airflow, Dagster, Prefect, DAG, ETL, asset-based, software-defined assets

Airflow / Dagster

Data pipeline orchestrators. Airflow is task-centric and the long-standing standard. Dagster is asset-centric and more modern. Prefect offers Python-native flows. Typical uses: ETL, ML training, and scheduled jobs.

📖 Key concepts

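  • DAG: Directed Acyclic Graph, the flow of tasks and their dependencies.
  • Task / Op: a single step.
  • Asset (Dagster): a declaration that "this table is the result", which lets the output be tracked.
  • Sensor / Trigger: event-based start.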
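
To make the DAG idea concrete, here is a minimal sketch that uses only the Python standard library, not Airflow or Dagster. The task names are hypothetical and simply mirror an extract/transform/load flow. It shows how a dependency map yields a valid run order and why a cycle is rejected.

```python
from graphlib import CycleError, TopologicalSorter

# Each key is a task; its value is the set of tasks it depends on.
# The task names are hypothetical and only illustrate an ETL flow.
dependencies = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
}


def execution_order(deps):
    """Return one valid run order; raises CycleError if deps contain a cycle."""
    return list(TopologicalSorter(deps).static_order())


def has_cycle(deps):
    """Report whether the dependency map fails the 'acyclic' requirement."""
    try:
        execution_order(deps)
    except CycleError:
        return True
    return False


order = execution_order(dependencies)
print(order)
```

An orchestrator does much more than this (scheduling, retries, state), but the ordering constraint it enforces is the same one shown here.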

💻 Code patterns

Airflow DAG

from airflow.decorators import dag, task
from datetime import datetime

@dag(
    schedule='0 2 * * *',
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=['daily'],
)
def daily_report():
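    @task
    def extract():
        # load_from_postgres, aggregate and upload_to_s3 are placeholder helpers, not Airflow APIs.
        # CURRENT_DATE is the PostgreSQL expression for today's date.
        return load_from_postgres('SELECT * FROM orders WHERE date = CURRENT_DATE')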
    
    @task
    def transform(orders):
        return aggregate(orders)
    
    @task
    def load(report):
        upload_to_s3(report)
    
    load(transform(extract()))

dag = daily_report()

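Airflow Operator (legacy style)

from airflow import DAG
from airflow.operators.bash import BashOperator
# Newer postgres provider releases deprecate PostgresOperator in favor of SQLExecuteQueryOperator.
from airflow.providers.postgres.operators.postgres import PostgresOperator

# Remaining DAG arguments (such as start_date) are elided here.
dag = DAG('etl', schedule='@daily', ...)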

extract = PostgresOperator(
    task_id='extract',
    postgres_conn_id='source_db',
    sql='SELECT * FROM raw',
    dag=dag,
)

transform = BashOperator(
    task_id='transform',
    bash_command='python /scripts/transform.py',
    dag=dag,
)

extract >> transform

Dagster (modern, asset-based)

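import pandas as pd
from dagster import asset, AssetExecutionContext, Definitions

# `engine` (a SQLAlchemy engine) and `upload_to_s3` are assumed to be defined elsewhere.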

@asset
def raw_orders(context: AssetExecutionContext) -> pd.DataFrame:
    return pd.read_sql('SELECT * FROM orders', engine)

@asset
def daily_aggregates(raw_orders: pd.DataFrame) -> pd.DataFrame:
    return raw_orders.groupby('date').agg({'amount': 'sum'})

@asset
def report(daily_aggregates: pd.DataFrame) -> None:
    upload_to_s3(daily_aggregates, 'reports/daily.csv')

defs = Definitions(assets=[raw_orders, daily_aggregates, report])

→ The asset (the data itself) is the source of truth. Lineage is derived automatically from the declared dependencies.
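
The idea that lineage follows from the function signatures can be illustrated without Dagster. The sketch below is an assumption-laden toy, not Dagster's implementation: it reads each function's parameter names with `inspect` and treats them as upstream asset names, then runs the functions in dependency order. The sample rows are made up, and there is no cycle detection.

```python
import inspect


def raw_orders():
    # Made-up sample rows standing in for a database read.
    return [{"date": "2026-05-09", "amount": 100}, {"date": "2026-05-09", "amount": 50}]


def daily_aggregates(raw_orders):
    totals = {}
    for row in raw_orders:
        totals[row["date"]] = totals.get(row["date"], 0) + row["amount"]
    return totals


def report(daily_aggregates):
    return [f"{day},{total}" for day, total in sorted(daily_aggregates.items())]


def build_lineage(functions):
    """Map each asset name to the upstream names taken from its parameters."""
    return {fn.__name__: list(inspect.signature(fn).parameters) for fn in functions}


def materialize(functions):
    """Run every asset once, computing its upstream assets first."""
    by_name = {fn.__name__: fn for fn in functions}
    lineage = build_lineage(functions)
    results = {}

    def run(name):
        if name not in results:
            results[name] = by_name[name](*(run(up) for up in lineage[name]))
        return results[name]

    for name in by_name:
        run(name)
    return results


assets = [raw_orders, daily_aggregates, report]
lineage = build_lineage(assets)
results = materialize(assets)
print(lineage)
```

Because the dependency graph is a by-product of how the functions are written, there is no separate wiring step that can drift out of sync with the code.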

Schedule

from dagster import ScheduleDefinition, define_asset_job

daily_job = define_asset_job('daily', selection=['raw_orders', 'daily_aggregates', 'report'])

daily_schedule = ScheduleDefinition(
    job=daily_job,
    cron_schedule='0 2 * * *',
)

defs = Definitions(assets=[...], schedules=[daily_schedule])

Sensor (event-based)

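from dagster import sensor, RunRequest

@sensor(job=daily_job, minimum_interval_seconds=60)
def s3_sensor(context):
    # list_s3_files is a placeholder helper that returns object keys under the prefix.
    files = list_s3_files('incoming/')
    if files:
        # run_key deduplicates: the same key does not launch a second run for this sensor.
        # The 'load' key must name an op in the target job that accepts a `file` config field.
        return RunRequest(
            run_key=files[0],
            run_config={'ops': {'load': {'config': {'file': files[0]}}}},
        )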
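
The role of `run_key` in the sensor above can be sketched in plain Python. This is an illustration of the deduplication idea only, not Dagster internals: the `seen_run_keys` set and the dictionary shape of a request are assumptions made for the example.

```python
def evaluate_sensor(files, seen_run_keys):
    """Return one request per file that has not triggered a run yet.

    `seen_run_keys` is mutated so that later evaluations skip known files.
    """
    requests = []
    for name in sorted(files):
        if name not in seen_run_keys:
            seen_run_keys.add(name)
            requests.append({"run_key": name, "file": name})
    return requests


seen = set()
# First poll: both files are new.
first_tick = evaluate_sensor(["incoming/a.csv", "incoming/b.csv"], seen)
# Second poll: a.csv and b.csv are still listed, only c.csv is new.
second_tick = evaluate_sensor(["incoming/a.csv", "incoming/b.csv", "incoming/c.csv"], seen)
# Third poll: nothing changed, so nothing is requested.
third_tick = evaluate_sensor(["incoming/a.csv", "incoming/b.csv", "incoming/c.csv"], seen)
print(first_tick, second_tick, third_tick)
```

Polling the same listing repeatedly is therefore safe: a file triggers work once, even though the sensor sees it on every evaluation.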

IO Manager (where data goes)

from dagster import IOManager, io_manager
import boto3

class S3IOManager(IOManager):
    def handle_output(self, context, obj):
        boto3.client('s3').put_object(...)
    def load_input(self, context):
        return boto3.client('s3').get_object(...)

@io_manager
def s3_io_manager():
    return S3IOManager()

@asset(io_manager_key='s3_io')
def my_asset(): return ...

defs = Definitions(
    assets=[my_asset],
    resources={'s3_io': s3_io_manager},
)

→ Storage is decoupled from the asset: the same code can write to different storage in dev and prod.

Partition (time series)
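
A minimal standard-library sketch of the same separation, assuming two hypothetical stores (`InMemoryStore` for dev, `JsonFileStore` as a stand-in for remote storage). The pipeline function only talks to a `save`/`load` interface, so swapping the store does not touch the business logic.

```python
import json
import tempfile
from pathlib import Path


class InMemoryStore:
    """Dev-style storage: keeps objects in a dict."""

    def __init__(self):
        self._data = {}

    def save(self, key, obj):
        self._data[key] = obj

    def load(self, key):
        return self._data[key]


class JsonFileStore:
    """Stand-in for remote storage: one JSON file per key under a root directory."""

    def __init__(self, root):
        self._root = Path(root)

    def save(self, key, obj):
        (self._root / f"{key}.json").write_text(json.dumps(obj))

    def load(self, key):
        return json.loads((self._root / f"{key}.json").read_text())


def compute_totals(rows):
    totals = {}
    for day, amount in rows:
        totals[day] = totals.get(day, 0) + amount
    return totals


def run_pipeline(store):
    """Business logic is identical no matter which store is injected."""
    store.save("daily_totals", compute_totals([("2026-05-09", 100), ("2026-05-09", 50)]))
    return store.load("daily_totals")


dev_result = run_pipeline(InMemoryStore())
with tempfile.TemporaryDirectory() as tmp:
    file_result = run_pipeline(JsonFileStore(tmp))
print(dev_result, file_result)
```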

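import pandas as pd
from dagster import DailyPartitionsDefinition, asset
from sqlalchemy import text

daily = DailyPartitionsDefinition(start_date='2026-01-01')

@asset(partitions_def=daily)
def orders_by_day(context):
    date = context.partition_key  # '2026-05-09'
    # Bind the partition key as a parameter instead of formatting it into the SQL string.
    # `engine` is assumed to be a SQLAlchemy engine defined elsewhere.
    query = text('SELECT * FROM orders WHERE date = :day')
    return pd.read_sql(query, engine, params={'day': date})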

→ Backfill or re-run one day at a time.
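
What a day-level backfill amounts to can be sketched with the standard library alone. The `ORDERS` rows and the helper names are made up for illustration. The point is that each partition key is processed independently, so any single day can be recomputed without touching the others.

```python
from datetime import date, timedelta

# Made-up sample data: (day, amount) pairs standing in for an orders table.
ORDERS = [("2026-05-01", 10), ("2026-05-01", 5), ("2026-05-03", 7)]


def daily_partition_keys(start, end):
    """Yield ISO date keys from start up to, but not including, end."""
    day = start
    while day < end:
        yield day.isoformat()
        day += timedelta(days=1)


def total_for_day(key):
    """Compute the result for exactly one partition."""
    return sum(amount for day, amount in ORDERS if day == key)


def backfill(start, end, run_partition):
    """Run one independent computation per partition key in the range."""
    return {key: run_partition(key) for key in daily_partition_keys(start, end)}


result = backfill(date(2026, 5, 1), date(2026, 5, 4), total_for_day)
print(result)
```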

Backfill

# Dagster
dagster job backfill -j daily_job --partition-set daily

# Airflow (2.x CLI)
airflow dags backfill --start-date 2026-04-01 --end-date 2026-05-01 daily_report

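Test (Dagster, pytest-friendly)

import pandas as pd

def test_daily_aggregates():
    # An asset is a plain function, so it can be called directly with test inputs.
    from my_assets import daily_aggregates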
    
    raw = pd.DataFrame({'date': ['2026-05-09'], 'amount': [100]})
    result = daily_aggregates(raw)
    assert result.iloc[0]['amount'] == 100

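Resource (dependency injection, differs per environment)

import pandas as pd
from dagster import resource, asset
from sqlalchemy import create_engine

@resource(config_schema={'url': str})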
def postgres_engine(init_context):
    return create_engine(init_context.resource_config['url'])

@asset(required_resource_keys={'postgres'})
def my_asset(context):
    df = pd.read_sql('...', context.resources.postgres)
    return df

# config (dev / prod)
prod_resources = {'postgres': postgres_engine.configured({'url': 'postgresql://prod...'})}
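
The per-environment wiring can be sketched without Dagster as a plain lookup. Everything here is hypothetical: the `PostgresSettings` class, the environment names, and the `example.invalid` URLs are placeholders chosen for the example, not values from any real deployment.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class PostgresSettings:
    url: str


# Hypothetical URLs; real values would come from a secret store, not source code.
RESOURCES_BY_ENV = {
    "dev": {"postgres": PostgresSettings(url="postgresql://dev.example.invalid/app")},
    "prod": {"postgres": PostgresSettings(url="postgresql://prod.example.invalid/app")},
}


def resources_for(env):
    """Pick the resource set for an environment, failing loudly on unknown names."""
    try:
        return RESOURCES_BY_ENV[env]
    except KeyError:
        raise ValueError(f"unknown environment: {env!r}") from None


def describe_source(resources):
    """Asset-like function: it depends on the injected resource, not on the environment."""
    return f"reading orders from {resources['postgres'].url}"


dev_message = describe_source(resources_for("dev"))
prod_message = describe_source(resources_for("prod"))
print(dev_message)
print(prod_message)
```

The function that does the work never branches on the environment name, which is what keeps dev and prod behavior comparable.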

Lineage / observability

Dagster UI:
- Asset graph (data dependency)
- Materialization history
- Runtime / cost per asset
- Failure rate

→ "Where did this data come from?" is answered automatically.

Prefect (Python-native, simple)

from prefect import flow, task

@task(retries=3, retry_delay_seconds=10)
def extract():
    return [1, 2, 3]

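@task
def transform(data):
    # Placeholder transformation so the flow is complete.
    return [x * 2 for x in data]

@flow
def my_flow():
    data = extract()
    transform(data)

if __name__ == '__main__':
    my_flow.serve(name='daily', cron='0 2 * * *')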
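
The retry settings on `extract` can be read as follows. This is a plain-Python sketch under the assumption that `retries` counts re-attempts after the first failure, so `retries=3` allows up to four calls in total. The `run_with_retries` helper and the flaky function are hypothetical and do not use Prefect.

```python
import time


def run_with_retries(fn, retries=3, retry_delay_seconds=0.0):
    """Call fn until it succeeds or the retry budget is spent.

    Returns (result, attempts). Re-raises the last error once
    the number of failed attempts exceeds `retries`.
    """
    attempts = 0
    while True:
        attempts += 1
        try:
            return fn(), attempts
        except Exception:
            if attempts > retries:
                raise
            time.sleep(retry_delay_seconds)


def make_flaky(failures_before_success):
    """Build a function that fails a fixed number of times, then returns data."""
    state = {"calls": 0}

    def flaky():
        state["calls"] += 1
        if state["calls"] <= failures_before_success:
            raise RuntimeError("transient failure")
        return [1, 2, 3]

    return flaky, state


flaky, state = make_flaky(failures_before_success=2)
result, attempts = run_with_retries(flaky, retries=3)
print(result, attempts)
```

Retries only help if the task is safe to run more than once, which ties this setting to the idempotency point in the anti-patterns list.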

Trade-offs

Airflow:
+ Large ecosystem, stable
+ Operators for nearly every cloud / db
- Task-centric (no data-level view)
- Older design (Python 2.x heritage)

Dagster:
+ Asset-centric → automatic lineage
+ Modern Python (typing, async)
+ Friendly to local dev
- Smaller ecosystem
- Learning curve

Prefect:
+ Python-native, simplest
+ Hybrid execution
- Smaller community

Cost / scale

Airflow self-host: K8s + executor (CeleryKubernetesExecutor).
Managed:           MWAA (AWS), Cloud Composer (GCP), Astronomer.

Dagster:
Self-host or Dagster Cloud.

🤔 Decision criteria

Situation          Recommendation
New project        Dagster
Existing / large   Airflow
Simple / fast      Prefect
Cloud managed      MWAA / Composer / Dagster Cloud
Streaming          Spark / Flink (separate tooling)
dbt + python       Dagster (best integration)

Anti-patterns

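  • Giant single task: retries are expensive. Split it into smaller tasks.
  • No idempotency: backfills produce duplicates.
  • State in memory: it is lost when the next task runs on a different worker.
  • Heavy imports at DAG top level: they are re-imported every time the scheduler parses the file.
  • Direct external calls: exposed to rate limits and failures. Move them behind a separate service.
  • No logging: hard to debug.
  • Hardcoded secrets: use a vault or the connection store instead.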
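
The idempotency item is the one that most often bites during backfills, so here is a minimal sketch using the standard-library `sqlite3` module. The table name and the delete-then-insert strategy are assumptions for illustration; an upsert or a partition overwrite would serve the same purpose.

```python
import sqlite3


def load_partition(conn, day, amounts):
    """Replace all rows for one day inside a single transaction."""
    with conn:  # commits on success, rolls back if an exception is raised
        conn.execute("DELETE FROM daily_orders WHERE day = ?", (day,))
        conn.executemany(
            "INSERT INTO daily_orders (day, amount) VALUES (?, ?)",
            [(day, amount) for amount in amounts],
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE daily_orders (day TEXT, amount INTEGER)")

load_partition(conn, "2026-05-09", [100, 50])
load_partition(conn, "2026-05-09", [100, 50])  # simulated re-run or backfill

row_count, total = conn.execute(
    "SELECT COUNT(*), SUM(amount) FROM daily_orders"
).fetchone()
print(row_count, total)
```

A plain `INSERT` without the preceding `DELETE` would leave four rows after the second call, which is the duplicate problem described in the list.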

🤖 Hints for LLM use

  • New project = Dagster (asset-centric).
  • Legacy = Airflow.
  • Idempotent + partition + lineage.

🔗 Related documents