| id | title | category | status | source_trust_level | verification_status | created_at | updated_at | tags | tech_stack | applied_in | aliases |
|---|---|---|---|---|---|---|---|---|---|---|---|
| data-eng-airflow-dagster | Airflow / Dagster — Data Pipeline / DAG | Coding | draft | B | conceptual | 2026-05-09 | 2026-05-09 | | | | |
Airflow / Dagster
Data pipeline orchestrators. Airflow is task-centric and the long-standing standard. Dagster is asset-centric and more modern. Prefect offers Python-native flows. Typical uses: ETL, ML training, scheduled jobs.
📖 Core concepts
- DAG: Directed Acyclic Graph, the flow of tasks.
- Task / Op: a single step.
- Asset (Dagster): declares "this table is the result", which makes it trackable.
- Sensor / Trigger: event-based start.
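To make the "directed acyclic graph" idea concrete, here is a minimal sketch using only the standard library (`graphlib`, Python 3.9+). The task names are hypothetical, and this is not how Airflow or Dagster schedule work internally; it only shows that a DAG yields a valid run order and that a cycle makes ordering impossible.

```python
from graphlib import CycleError, TopologicalSorter

# Map each task to the set of tasks it depends on (its upstream tasks).
# Task names are hypothetical and mirror an extract -> transform -> load flow.
dependencies = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
}

def execution_order(deps):
    """Return one valid run order; raises CycleError if deps contain a cycle."""
    return list(TopologicalSorter(deps).static_order())

def has_cycle(deps):
    """Return True when the dependency map is not a valid DAG."""
    try:
        execution_order(deps)
    except CycleError:
        return True
    return False

order = execution_order(dependencies)
print(order)
```

An orchestrator relies on the same property: a task only starts once everything upstream of it has finished.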
💻 Code patterns
Airflow DAG
from airflow.decorators import dag, task
from datetime import datetime

# load_from_postgres, aggregate and upload_to_s3 are project helpers assumed to exist.
@dag(
    schedule='0 2 * * *',  # every day at 02:00
    start_date=datetime(2026, 1, 1),
    catchup=False,
    tags=['daily'],
)
def daily_report():
    @task
    def extract():
        return load_from_postgres('SELECT * FROM orders WHERE date = CURRENT_DATE')

    @task
    def transform(orders):
        return aggregate(orders)

    @task
    def load(report):
        upload_to_s3(report)

    load(transform(extract()))

dag = daily_report()
Airflow Operator (legacy style)
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator

dag = DAG('etl', schedule='@daily', ...)

extract = PostgresOperator(
    task_id='extract',
    postgres_conn_id='source_db',
    sql='SELECT * FROM raw',
    dag=dag,
)
transform = BashOperator(
    task_id='transform',
    bash_command='python /scripts/transform.py',
    dag=dag,
)

# extract runs before transform
extract >> transform
Dagster (modern, asset-based)
import pandas as pd
from dagster import asset, AssetExecutionContext, Definitions

# `engine` is assumed to be a SQLAlchemy engine created elsewhere;
# upload_to_s3 is a project helper assumed to exist.
@asset
def raw_orders(context: AssetExecutionContext) -> pd.DataFrame:
    return pd.read_sql('SELECT * FROM orders', engine)

@asset
def daily_aggregates(raw_orders: pd.DataFrame) -> pd.DataFrame:
    return raw_orders.groupby('date').agg({'amount': 'sum'})

@asset
def report(daily_aggregates: pd.DataFrame) -> None:
    upload_to_s3(daily_aggregates, 'reports/daily.csv')

defs = Definitions(assets=[raw_orders, daily_aggregates, report])
→ The asset (the data itself) is the real source of truth. Lineage is derived automatically.
Schedule
from dagster import ScheduleDefinition, define_asset_job
daily_job = define_asset_job('daily', selection=['raw_orders', 'daily_aggregates', 'report'])
daily_schedule = ScheduleDefinition(
    job=daily_job,
    cron_schedule='0 2 * * *',
)
defs = Definitions(assets=[...], schedules=[daily_schedule])
Sensor (event-based)
from dagster import sensor, RunRequest

# list_s3_files is a project helper assumed to exist.
@sensor(job=daily_job, minimum_interval_seconds=60)
def s3_sensor(context):
    files = list_s3_files('incoming/')
    if files:
        # run_key deduplicates: the same file does not trigger a second run
        return RunRequest(
            run_key=files[0],
            run_config={'ops': {'load': {'config': {'file': files[0]}}}},
        )
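The `run_key` passed to `RunRequest` is what keeps a sensor from launching the same work twice. The following is a pure-Python simulation of that idea, not Dagster's implementation: `evaluate_sensor` and the `seen_run_keys` set are hypothetical stand-ins for the sensor tick and the run keys the orchestrator has already recorded.

```python
def evaluate_sensor(files, seen_run_keys):
    """Return the run keys to launch for this tick, skipping keys already seen.

    `files` stands in for a listing of incoming objects; `seen_run_keys` is
    the set of keys launched by earlier ticks (it is updated in place).
    """
    launched = []
    for name in files:
        if name in seen_run_keys:
            continue  # same run_key as an earlier tick: no duplicate run
        seen_run_keys.add(name)
        launched.append(name)
    return launched

seen = set()
first_tick = evaluate_sensor(["incoming/a.csv"], seen)
second_tick = evaluate_sensor(["incoming/a.csv", "incoming/b.csv"], seen)
print(first_tick, second_tick)
```

Choosing a run key that identifies the input (here, the file name) is what makes repeated sensor ticks safe.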
IO Manager (where data goes)
from dagster import IOManager, io_manager
import boto3

class S3IOManager(IOManager):
    def handle_output(self, context, obj):
        # called after an asset returns: persist the value
        boto3.client('s3').put_object(...)

    def load_input(self, context):
        # called when a downstream asset needs the value
        return boto3.client('s3').get_object(...)

@io_manager
def s3_io_manager():
    return S3IOManager()

@asset(io_manager_key='s3_io')
def my_asset(): return ...

defs = Definitions(
    assets=[my_asset],
    resources={'s3_io': s3_io_manager},
)
→ Asset storage is decoupled from asset logic: the same code can use different storage in dev and prod.
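The separation of computation from storage can be sketched without Dagster at all. In the sketch below, `InMemoryStore`, `JsonFileStore`, `daily_totals` and `materialize` are hypothetical names chosen for illustration; the point is only that the business logic never mentions where its result is written.

```python
import json
import tempfile
from pathlib import Path

class InMemoryStore:
    """Dev-style storage: keeps outputs in a dict."""
    def __init__(self):
        self._data = {}
    def save(self, key, obj):
        self._data[key] = obj
    def load(self, key):
        return self._data[key]

class JsonFileStore:
    """Prod-style stand-in: writes each output to <root>/<key>.json."""
    def __init__(self, root):
        self._root = Path(root)
    def save(self, key, obj):
        (self._root / f"{key}.json").write_text(json.dumps(obj))
    def load(self, key):
        return json.loads((self._root / f"{key}.json").read_text())

def daily_totals(orders):
    """Business logic only: no knowledge of where results are stored."""
    totals = {}
    for order in orders:
        totals[order["date"]] = totals.get(order["date"], 0) + order["amount"]
    return totals

def materialize(store, orders):
    """Compute the asset, persist it via the configured store, and read it back."""
    store.save("daily_totals", daily_totals(orders))
    return store.load("daily_totals")

orders = [
    {"date": "2026-05-09", "amount": 100},
    {"date": "2026-05-09", "amount": 50},
]
dev_result = materialize(InMemoryStore(), orders)
with tempfile.TemporaryDirectory() as tmp:
    prod_result = materialize(JsonFileStore(tmp), orders)
print(dev_result, prod_result)
```

Swapping the store is a configuration change, which is the role the `resources` mapping plays in the Dagster example.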
Partition (time series)
import pandas as pd
from dagster import DailyPartitionsDefinition, asset

daily = DailyPartitionsDefinition(start_date='2026-01-01')

# `engine` is assumed to be a SQLAlchemy engine created elsewhere.
@asset(partitions_def=daily)
def orders_by_day(context):
    date = context.partition_key  # '2026-05-09'
    return pd.read_sql(f"SELECT * FROM orders WHERE date = '{date}'", engine)
→ Enables per-day backfills and re-runs.
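A daily partition set is essentially a list of date strings, and a backfill is a loop over a slice of that list. A minimal standard-library sketch, where `daily_partition_keys` is a hypothetical helper rather than a Dagster API:

```python
from datetime import date, timedelta

def daily_partition_keys(start, end):
    """Return ISO date strings for every day in [start, end], inclusive."""
    days = (end - start).days
    return [(start + timedelta(days=i)).isoformat() for i in range(days + 1)]

keys = daily_partition_keys(date(2026, 5, 7), date(2026, 5, 9))
print(keys)
```

Each key has the same shape as `context.partition_key`, so re-running a single day means re-running the asset with that one key.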
Backfill
# Dagster
dagster job backfill -j daily_job --partition-set daily
# Airflow
airflow dags backfill --start-date 2026-04-01 --end-date 2026-05-01 daily_report
Test (Dagster, pytest-friendly)
import pandas as pd

def test_daily_aggregates():
    from my_assets import daily_aggregates
    raw = pd.DataFrame({'date': ['2026-05-09'], 'amount': [100]})
    result = daily_aggregates(raw)  # an asset can be called like a plain function
    assert result.iloc[0]['amount'] == 100
Resource (DI, differs per environment)
import pandas as pd
from dagster import resource, asset
from sqlalchemy import create_engine  # assumed: create_engine is SQLAlchemy's

@resource
def postgres_engine(init_context):
    return create_engine(init_context.resource_config['url'])

@asset(required_resource_keys={'postgres'})
def my_asset(context):
    df = pd.read_sql('...', context.resources.postgres)
    return df

# config (dev / prod)
prod_resources = {'postgres': postgres_engine.configured({'url': 'postgresql://prod...'})}
Lineage / observability
Dagster UI:
- Asset graph (data dependency)
- Materialization history
- Runtime / cost per asset
- Failure rate
→ "Where did this data come from?" is answered automatically.
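The lineage question reduces to walking the asset dependency graph upstream. The sketch below reuses the asset names from the Dagster example (`raw_orders`, `daily_aggregates`, `report`); the `upstream` mapping and the `lineage` function are hypothetical and do not reflect Dagster's internal representation.

```python
from collections import deque

# Each asset maps to the assets it reads from.
upstream = {
    "raw_orders": [],
    "daily_aggregates": ["raw_orders"],
    "report": ["daily_aggregates"],
}

def lineage(asset, graph):
    """Return every transitive upstream asset of `asset`, nearest first."""
    found = []
    queue = deque(graph.get(asset, []))
    while queue:
        current = queue.popleft()
        if current in found:
            continue  # already visited via another path
        found.append(current)
        queue.extend(graph.get(current, []))
    return found

report_lineage = lineage("report", upstream)
print(report_lineage)
```

Because assets declare their inputs as function parameters, an asset-centric tool can build this graph without extra annotations.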
Prefect (Python-native, simple)
from prefect import flow, task

@task(retries=3, retry_delay_seconds=10)
def extract():
    return [1, 2, 3]

@task
def transform(data):
    # Placeholder transform so the flow runs end to end.
    return [x * 2 for x in data]

@flow
def my_flow():
    data = extract()
    transform(data)

my_flow.serve(name='daily', cron='0 2 * * *')
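The `retries` and `retry_delay_seconds` arguments describe a simple retry loop around the task body. The following is a plain-Python sketch of that behavior, assuming `retries=3` means up to three additional attempts after the first failure; `with_retries` and `flaky_extract` are hypothetical names, and this is not Prefect's implementation.

```python
import time

def with_retries(fn, retries=3, retry_delay_seconds=0.0, sleep=time.sleep):
    """Call fn(); on an exception, retry up to `retries` more times."""
    attempts = 0
    while True:
        try:
            return fn()
        except Exception:
            if attempts >= retries:
                raise  # retries exhausted: surface the last error
            attempts += 1
            sleep(retry_delay_seconds)

calls = {"count": 0}

def flaky_extract():
    """Hypothetical task that fails twice, then succeeds."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise ConnectionError("temporary failure")
    return [1, 2, 3]

result = with_retries(flaky_extract, retries=3, retry_delay_seconds=0)
print(result, calls["count"])
```

Retries only help when the task is safe to run more than once, which is why they pair naturally with idempotent steps.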
Trade-offs
Airflow:
+ Large ecosystem, stable
+ Operators for nearly every cloud / DB
- Task-centric (no data-level view)
- Older design (Python 2.x heritage)
Dagster:
+ Asset-centric → automatic lineage
+ Modern Python (typing, async)
+ Local-dev friendly
- Smaller ecosystem
- Learning curve
Prefect:
+ Python-native, simplest
+ Hybrid execution
- Smaller community
Cost / scale
Airflow self-host: K8s + executor (CeleryKubernetesExecutor).
Managed: MWAA (AWS), Cloud Composer (GCP), Astronomer.
Dagster:
Self-host or Dagster Cloud.
🤔 Decision criteria
| Situation | Recommendation |
|---|---|
| New project | Dagster |
| Existing / large | Airflow |
| Simple / fast | Prefect |
| Cloud managed | MWAA / Composer / Dagster Cloud |
| Streaming | Spark / Flink (separate tools) |
| dbt + Python | Dagster (best integration) |
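❌ Anti-patterns
- Giant single task: retries are expensive. Split it into smaller tasks.
- No idempotency: backfills produce duplicates.
- State in memory: tasks may run on different workers, so the state is lost.
- Heavy imports inside the DAG file: they are re-imported every time the scheduler parses it.
- Calling external services directly: exposed to rate limits and failures. Isolate the calls in a separate service.
- No logging: hard to debug.
- Hardcoded secrets: use a vault / connection store instead.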
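The idempotency point can be illustrated with a "delete then insert" load per partition. This is a minimal sketch using the standard-library `sqlite3` module; the `daily_orders` table and the `load_partition` function are hypothetical, and a real warehouse may offer `MERGE` or partition overwrite instead.

```python
import sqlite3

def load_partition(conn, day, rows):
    """Replace all rows for `day` in one transaction so re-runs do not duplicate."""
    with conn:  # commits on success, rolls back on error
        conn.execute("DELETE FROM daily_orders WHERE day = ?", (day,))
        conn.executemany(
            "INSERT INTO daily_orders (day, amount) VALUES (?, ?)",
            [(day, amount) for amount in rows],
        )

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE daily_orders (day TEXT, amount INTEGER)")

# Running the same partition twice simulates a backfill over an already-loaded day.
load_partition(conn, "2026-05-09", [100, 50])
load_partition(conn, "2026-05-09", [100, 50])

row_count, total = conn.execute(
    "SELECT COUNT(*), SUM(amount) FROM daily_orders"
).fetchone()
print(row_count, total)
```

With a plain `INSERT` and no delete, the second run would have doubled both the row count and the total.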
🤖 Hints for LLM use
- New project = Dagster (asset-centric).
- Legacy = Airflow.
- Idempotent + partition + lineage.