[G1-Sync] Manual knowledge update

This commit is contained in:
Antigravity Agent
2026-05-09 21:08:02 +09:00
parent f0befc887a
commit 93ec7e9056
363 changed files with 68333 additions and 64 deletions
@@ -0,0 +1,286 @@
---
id: data-eng-airflow-dagster
title: Airflow / Dagster — Data Pipeline / DAG
category: Coding
status: draft
source_trust_level: B
verification_status: conceptual
created_at: 2026-05-09
updated_at: 2026-05-09
tags: [data-engineering, airflow, dagster, etl, vibe-coding]
tech_stack: { language: "Python", applicable_to: ["Data Engineering"] }
applied_in: []
aliases: [Airflow, Dagster, Prefect, DAG, ETL, asset-based, software-defined assets]
---
# Airflow / Dagster
> Data pipeline orchestrator. **Airflow = task-centric, 옛 표준. Dagster = asset-centric, modern**. Prefect = Python-native flow. ETL / ML training / 정기 작업.
## 📖 핵심 개념
- DAG: Directed Acyclic Graph — task 흐름.
- Task / Op: 한 단계.
- Asset (Dagster): "이 table 이 결과" — 추적.
- Sensor / Trigger: event-based 시작.
## 💻 코드 패턴
### Airflow DAG
```python
from airflow.decorators import dag, task
from datetime import datetime
@dag(
schedule='0 2 * * *',
start_date=datetime(2026, 1, 1),
catchup=False,
tags=['daily'],
)
def daily_report():
@task
def extract():
return load_from_postgres('SELECT * FROM orders WHERE date = today()')
@task
def transform(orders):
return aggregate(orders)
@task
def load(report):
upload_to_s3(report)
load(transform(extract()))
dag = daily_report()
```
### Airflow Operator (legacy 스타일)
```python
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.postgres.operators.postgres import PostgresOperator
dag = DAG('etl', schedule='@daily', ...)
extract = PostgresOperator(
task_id='extract',
postgres_conn_id='source_db',
sql='SELECT * FROM raw',
dag=dag,
)
transform = BashOperator(
task_id='transform',
bash_command='python /scripts/transform.py',
dag=dag,
)
extract >> transform
```
### Dagster (modern, asset-based)
```python
from dagster import asset, AssetExecutionContext, Definitions
@asset
def raw_orders(context: AssetExecutionContext) -> pd.DataFrame:
return pd.read_sql('SELECT * FROM orders', engine)
@asset
def daily_aggregates(raw_orders: pd.DataFrame) -> pd.DataFrame:
return raw_orders.groupby('date').agg({'amount': 'sum'})
@asset
def report(daily_aggregates: pd.DataFrame) -> None:
upload_to_s3(daily_aggregates, 'reports/daily.csv')
defs = Definitions(assets=[raw_orders, daily_aggregates, report])
```
→ Asset = data 가 진짜 source. Lineage 자동.
### Schedule
```python
from dagster import ScheduleDefinition, define_asset_job
daily_job = define_asset_job('daily', selection=['raw_orders', 'daily_aggregates', 'report'])
daily_schedule = ScheduleDefinition(
job=daily_job,
cron_schedule='0 2 * * *',
)
defs = Definitions(assets=[...], schedules=[daily_schedule])
```
### Sensor (event-based)
```python
from dagster import sensor, RunRequest
@sensor(job=daily_job, minimum_interval_seconds=60)
def s3_sensor(context):
files = list_s3_files('incoming/')
if files:
return RunRequest(run_key=files[0], run_config={'ops': {'load': {'config': {'file': files[0]}}}})
```
### IO Manager (where data goes)
```python
from dagster import IOManager, io_manager
import boto3
class S3IOManager(IOManager):
def handle_output(self, context, obj):
boto3.client('s3').put_object(...)
def load_input(self, context):
return boto3.client('s3').get_object(...)
@io_manager
def s3_io_manager():
return S3IOManager()
@asset(io_manager_key='s3_io')
def my_asset(): return ...
defs = Definitions(
assets=[my_asset],
resources={'s3_io': s3_io_manager},
)
```
→ Asset 의 storage 분리 — 같은 코드 dev/prod 다른 storage.
### Partition (시계열)
```python
from dagster import DailyPartitionsDefinition, asset
daily = DailyPartitionsDefinition(start_date='2026-01-01')
@asset(partitions_def=daily)
def orders_by_day(context):
date = context.partition_key # '2026-05-09'
return pd.read_sql(f"SELECT * FROM orders WHERE date = '{date}'", engine)
```
→ Day 단위 backfill / re-run.
### Backfill
```bash
dagster job backfill -j daily_job --partition-set daily
# Airflow
airflow dags backfill --start-date 2026-04-01 --end-date 2026-05-01 daily_report
```
### Test (Dagster — pytest 친화)
```python
def test_daily_aggregates():
from my_assets import daily_aggregates
raw = pd.DataFrame({'date': ['2026-05-09'], 'amount': [100]})
result = daily_aggregates(raw)
assert result.iloc[0]['amount'] == 100
```
### Resource (DI, env 별 다름)
```python
from dagster import resource, asset
@resource
def postgres_engine(init_context):
return create_engine(init_context.resource_config['url'])
@asset(required_resource_keys={'postgres'})
def my_asset(context):
df = pd.read_sql('...', context.resources.postgres)
return df
# config (dev / prod)
prod_resources = {'postgres': postgres_engine.configured({'url': 'postgresql://prod...'})}
```
### Lineage / observability
```
Dagster UI:
- Asset graph (data dependency)
- Materialization history
- Runtime / cost per asset
- Failure rate
```
→ "이 데이터가 어디서 왔지?" 자동 답.
### Prefect (Python-native, 단순)
```python
from prefect import flow, task
@task(retries=3, retry_delay_seconds=10)
def extract():
return [1, 2, 3]
@flow
def my_flow():
data = extract()
transform(data)
my_flow.serve(name='daily', cron='0 2 * * *')
```
### Trade-offs
```
Airflow:
+ 큰 ecosystem, 안정
+ 모든 cloud / db 의 operator
- Task-centric (data 관점 X)
- 옛 design (Python 2.x heritage)
Dagster:
+ Asset-centric → lineage 자동
+ Modern Python (typing, async)
+ Local dev 친화
- 작은 ecosystem
- 학습 곡선
Prefect:
+ Python-native, simplest
+ Hybrid execution
- Smaller community
```
### Cost / scale
```
Airflow self-host: K8s + executor (CeleryKubernetesExecutor).
Managed: MWAA (AWS), Cloud Composer (GCP), Astronomer.
Dagster:
Self-host or Dagster Cloud.
```
## 🤔 의사결정 기준
| 상황 | 추천 |
|---|---|
| 새 프로젝트 | Dagster |
| 기존 / 큰 | Airflow |
| 단순 / 빠른 | Prefect |
| 클라우드 매니지드 | MWAA / Composer / Dagster Cloud |
| Streaming | Spark / Flink (별도) |
| dbt + python | Dagster (best 통합) |
## ❌ 안티패턴
- **거대 single task**: 재시도 비싸. 작게 split.
- **Idempotency 없음**: backfill 시 중복.
- **State in memory**: worker 다름 = 잃음.
- **DAG 안 큰 import**: schedule 시 매번 re-import.
- **외부 호출 직접**: rate limit / failure. 별 service.
- **Logging 없음**: 디버깅 어려움.
- **Secret hardcode**: vault / connection store.
## 🤖 LLM 활용 힌트
- 새 = Dagster (asset-centric).
- 옛 = Airflow.
- Idempotent + partition + lineage.
## 🔗 관련 문서
- [[Data_Eng_dbt]]
- [[Data_Eng_Lakehouse]]
- [[Backend_Cron_Patterns]]