---
id: data-eng-airflow-dagster
title: Airflow / Dagster — Data Pipeline / DAG
category: Coding
status: draft
source_trust_level: B
verification_status: conceptual
created_at: 2026-05-09
updated_at: 2026-05-09
tags: [data-engineering, airflow, dagster, etl, vibe-coding]
tech_stack: { language: "Python", applicable_to: ["Data Engineering"] }
applied_in: []
aliases: [Airflow, Dagster, Prefect, DAG, ETL, asset-based, software-defined assets]
---

# Airflow / Dagster

> Data pipeline orchestrators. **Airflow = task-centric, the long-standing standard. Dagster = asset-centric, modern.** Prefect = Python-native flows. Used for ETL, ML training, and scheduled jobs.

## 📖 Core Concepts

- DAG: Directed Acyclic Graph, the flow of tasks and their dependencies.
- Task / Op: a single step.
- Asset (Dagster): a declaration that "this table is the result", which the orchestrator tracks.
- Sensor / Trigger: event-based start.

## 💻 Code Patterns

### Airflow DAG

```python
from airflow.decorators import dag, task
from datetime import datetime

# load_from_postgres, aggregate, and upload_to_s3 are project helpers
# assumed to be defined elsewhere.

@dag(
    schedule='0 2 * * *',          # every day at 02:00
    start_date=datetime(2026, 1, 1),
    catchup=False,                 # do not run missed intervals on first deploy
    tags=['daily'],
)
def daily_report():
    @task
    def extract():
        # CURRENT_DATE is the Postgres expression for today's date
        return load_from_postgres('SELECT * FROM orders WHERE date = CURRENT_DATE')

    @task
    def transform(orders):
        return aggregate(orders)

    @task
    def load(report):
        upload_to_s3(report)

    # Return values are passed between tasks via XCom
    load(transform(extract()))

# Calling the decorated function registers the DAG
daily_report()
```

### Airflow Operator (legacy style)

```python
from airflow import DAG
from airflow.operators.bash import BashOperator
# Note: newer postgres provider releases deprecate PostgresOperator
# in favor of SQLExecuteQueryOperator.
from airflow.providers.postgres.operators.postgres import PostgresOperator

dag = DAG(
    'etl',
    schedule='@daily',
    # ... (remaining DAG arguments elided)
)

extract = PostgresOperator(
    task_id='extract',
    postgres_conn_id='source_db',   # connection stored in Airflow, not in code
    sql='SELECT * FROM raw',
    dag=dag,
)

transform = BashOperator(
    task_id='transform',
    bash_command='python /scripts/transform.py',
    dag=dag,
)

# extract runs before transform
extract >> transform
```

### Dagster (modern, asset-based)

```python
import pandas as pd
from dagster import asset, AssetExecutionContext, Definitions

# `engine` (a SQLAlchemy engine) and `upload_to_s3` are assumed to be
# defined elsewhere in the project.

@asset
def raw_orders(context: AssetExecutionContext) -> pd.DataFrame:
    return pd.read_sql('SELECT * FROM orders', engine)

@asset
def daily_aggregates(raw_orders: pd.DataFrame) -> pd.DataFrame:
    # The parameter name declares the dependency on the raw_orders asset
    return raw_orders.groupby('date').agg({'amount': 'sum'})

@asset
def report(daily_aggregates: pd.DataFrame) -> None:
    upload_to_s3(daily_aggregates, 'reports/daily.csv')

defs = Definitions(assets=[raw_orders, daily_aggregates, report])
```

→ With assets, the data itself is the source of truth, and lineage is derived automatically.

### Schedule

```python
from dagster import Definitions, ScheduleDefinition, define_asset_job

daily_job = define_asset_job('daily', selection=['raw_orders', 'daily_aggregates', 'report'])

daily_schedule = ScheduleDefinition(
    job=daily_job,
    cron_schedule='0 2 * * *',   # every day at 02:00
)

# [...] is elided in the original: list the assets here
defs = Definitions(assets=[...], schedules=[daily_schedule])
```

### Sensor (event-based)

```python
from dagster import sensor, RunRequest, SkipReason

# list_s3_files is a project helper assumed to be defined elsewhere.

@sensor(job=daily_job, minimum_interval_seconds=60)
def s3_sensor(context):
    files = list_s3_files('incoming/')
    if not files:
        return SkipReason('no new files under incoming/')
    # run_key deduplicates: the same file never triggers a second run.
    # The keys in run_config must match what the target job actually defines;
    # 'load' is a placeholder op name here.
    return RunRequest(
        run_key=files[0],
        run_config={'ops': {'load': {'config': {'file': files[0]}}}},
    )
```

### IO Manager (where data goes)

```python
import boto3
from dagster import Definitions, IOManager, asset, io_manager

class S3IOManager(IOManager):
    def handle_output(self, context, obj):
        # Called after an asset returns: persist its output
        boto3.client('s3').put_object(...)

    def load_input(self, context):
        # Called before a downstream asset runs: load the stored input
        return boto3.client('s3').get_object(...)

@io_manager
def s3_io_manager():
    return S3IOManager()

@asset(io_manager_key='s3_io')
def my_asset():
    return ...

defs = Definitions(
    assets=[my_asset],
    resources={'s3_io': s3_io_manager},
)
```

→ Storage is decoupled from the asset: the same asset code can write to different storage in dev and prod.
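
The storage split described above can be illustrated without Dagster at all. The following is a minimal plain-Python sketch of the same idea, not Dagster's IO manager API: `Storage`, `InMemoryStorage`, `JsonFileStorage`, `daily_totals`, and `materialize` are hypothetical names invented for illustration, and a local JSON directory stands in for S3.

```python
import json
from pathlib import Path
from typing import Any, Protocol


class Storage(Protocol):
    """Hypothetical storage interface (plays the role of an IO manager)."""

    def save(self, key: str, obj: Any) -> None: ...
    def load(self, key: str) -> Any: ...


class InMemoryStorage:
    """Dev/test storage: keeps objects in a dict."""

    def __init__(self) -> None:
        self._data: dict[str, Any] = {}

    def save(self, key: str, obj: Any) -> None:
        self._data[key] = obj

    def load(self, key: str) -> Any:
        return self._data[key]


class JsonFileStorage:
    """Stand-in for prod storage such as S3: one JSON file per key."""

    def __init__(self, root: Path) -> None:
        self.root = root

    def save(self, key: str, obj: Any) -> None:
        (self.root / f"{key}.json").write_text(json.dumps(obj))

    def load(self, key: str) -> Any:
        return json.loads((self.root / f"{key}.json").read_text())


def daily_totals(orders: list[dict]) -> dict[str, int]:
    """Business logic only: sums amounts per date, unaware of storage."""
    totals: dict[str, int] = {}
    for order in orders:
        totals[order["date"]] = totals.get(order["date"], 0) + order["amount"]
    return totals


def materialize(storage: Storage, orders: list[dict]) -> dict[str, int]:
    """Compute the result, persist it, and read it back from storage."""
    storage.save("daily_totals", daily_totals(orders))
    return storage.load("daily_totals")
```

Calling `materialize(InMemoryStorage(), orders)` in a test and `materialize(JsonFileStorage(Path(...)), orders)` in a deployed run exercises the same `daily_totals` logic against different backends, which is the property the IO manager provides for assets.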

### Partition (time series)

```python
import pandas as pd
from dagster import DailyPartitionsDefinition, asset
from sqlalchemy import text

# `engine` (a SQLAlchemy engine) is assumed to be defined elsewhere.

daily = DailyPartitionsDefinition(start_date='2026-01-01')

@asset(partitions_def=daily)
def orders_by_day(context):
    date = context.partition_key  # '2026-05-09'
    # Bind the date as a parameter instead of formatting it into the SQL string
    return pd.read_sql(
        text('SELECT * FROM orders WHERE date = :day'),
        engine,
        params={'day': date},
    )
```

→ Enables backfills and re-runs at day granularity.

### Backfill

```bash
# Dagster: -j takes the job name ('daily', as passed to define_asset_job).
# Flags vary by version; check `dagster job backfill --help`.
dagster job backfill -j daily --partition-set daily

# Airflow 2.x
airflow dags backfill --start-date 2026-04-01 --end-date 2026-05-01 daily_report
```

### Test (Dagster, pytest-friendly)

```python
import pandas as pd

def test_daily_aggregates():
    from my_assets import daily_aggregates

    # An asset is a plain function, so it can be called directly with test data
    raw = pd.DataFrame({'date': ['2026-05-09'], 'amount': [100]})
    result = daily_aggregates(raw)
    assert result.iloc[0]['amount'] == 100
```

### Resource (DI, differs per environment)

```python
import pandas as pd
from dagster import resource, asset
from sqlalchemy import create_engine

@resource(config_schema={'url': str})
def postgres_engine(init_context):
    # The connection URL comes from config, not from code
    return create_engine(init_context.resource_config['url'])

@asset(required_resource_keys={'postgres'})
def my_asset(context):
    df = pd.read_sql('...', context.resources.postgres)
    return df

# config (dev / prod)
prod_resources = {'postgres': postgres_engine.configured({'url': 'postgresql://prod...'})}
```

### Lineage / observability

```
Dagster UI:
- Asset graph (data dependency)
- Materialization history
- Runtime / cost per asset
- Failure rate
```

→ Answers "where did this data come from?" automatically.
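
The "where did this data come from?" question above amounts to walking the asset dependency graph upstream. The following is a minimal sketch of that idea using only the standard library; it is not how Dagster implements lineage. `DEPS`, `upstream`, and `materialization_order` are hypothetical names, and the graph is hand-written to mirror the `raw_orders` → `daily_aggregates` → `report` assets from the Dagster example.

```python
from graphlib import TopologicalSorter

# Hypothetical hand-written graph: asset name -> direct upstream assets.
DEPS: dict[str, set[str]] = {
    "raw_orders": set(),
    "daily_aggregates": {"raw_orders"},
    "report": {"daily_aggregates"},
}


def upstream(asset: str, deps: dict[str, set[str]]) -> set[str]:
    """Return every asset that `asset` depends on, directly or transitively."""
    seen: set[str] = set()
    stack = list(deps.get(asset, ()))
    while stack:
        node = stack.pop()
        if node not in seen:
            seen.add(node)
            stack.extend(deps.get(node, ()))
    return seen


def materialization_order(deps: dict[str, set[str]]) -> list[str]:
    """Return an order in which every asset comes after its upstream assets.

    TopologicalSorter raises graphlib.CycleError if the graph is not acyclic,
    which is the "A" in DAG.
    """
    return list(TopologicalSorter(deps).static_order())
```

`upstream("report", DEPS)` collects both `daily_aggregates` and `raw_orders`, and `materialization_order(DEPS)` gives an execution order that respects the dependencies. An orchestrator derives the equivalent graph from the asset definitions, so it does not need to be maintained by hand.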

### Prefect (Python-native, simple)

```python
from prefect import flow, task

@task(retries=3, retry_delay_seconds=10)
def extract():
    return [1, 2, 3]

@task
def transform(data):
    # Placeholder transform step (not in the original); replace with real logic
    return [x * 2 for x in data]

@flow
def my_flow():
    data = extract()
    transform(data)

if __name__ == '__main__':
    # Serve the flow as a long-running process on a daily 02:00 schedule
    my_flow.serve(name='daily', cron='0 2 * * *')
```

### Trade-offs

```
Airflow:
+ Large ecosystem, stable
+ Operators for most major clouds and databases
- Task-centric (no data-level view)
- Older design (Python 2.x heritage)

Dagster:
+ Asset-centric → automatic lineage
+ Modern Python (typing, async)
+ Local-dev friendly
- Smaller ecosystem
- Learning curve

Prefect:
+ Python-native, simplest
+ Hybrid execution
- Smaller community
```

### Cost / scale

```
Airflow self-host: K8s + executor (CeleryKubernetesExecutor).
Managed: MWAA (AWS), Cloud Composer (GCP), Astronomer.
Dagster: Self-host or Dagster Cloud (now branded Dagster+).
```

## 🤔 Decision Criteria

| Situation | Recommendation |
|---|---|
| New project | Dagster |
| Existing / large deployment | Airflow |
| Simple / quick to set up | Prefect |
| Cloud-managed | MWAA / Composer / Dagster Cloud |
| Streaming | Spark / Flink (separate tools) |
| dbt + Python | Dagster (best integration) |

## ❌ Anti-patterns

- **Giant single task**: retries are expensive. Split it into smaller tasks.
- **No idempotency**: backfills produce duplicates.
- **State in memory**: tasks may run on different workers, so in-memory state is lost.
- **Heavy imports at DAG top level**: they are re-imported every time the scheduler parses the DAG file.
- **Direct external calls**: exposed to rate limits and failures. Route them through a separate service.
- **No logging**: debugging becomes difficult.
- **Hardcoded secrets**: use a vault or the connection store.

## 🤖 LLM Usage Hints

- New project = Dagster (asset-centric).
- Legacy / existing = Airflow.
- Idempotent + partition + lineage.

## 🔗 Related Documents

- [[Data_Eng_dbt]]
- [[Data_Eng_Lakehouse]]
- [[Backend_Cron_Patterns]]