[G1-Sync] Manual knowledge update

This commit is contained in:
Antigravity Agent
2026-05-10 22:08:15 +09:00
parent 21ac3ed255
commit 504fd5fb42
3011 changed files with 380280 additions and 206977 deletions
@@ -2,89 +2,308 @@
id: wiki-2026-0508-data-pipeline-orchestration
title: Data Pipeline Orchestration
category: 10_Wiki/Topics
status: needs_review
status: verified
canonical_id: self
aliases: [DATA-PIPE-001]
aliases: [data orchestration, Airflow, Prefect, Dagster, Kubeflow, DAG, ETL, ELT]
duplicate_of: none
source_trust_level: A
confidence_score: 1.0
tags: [data-engineering, MLOps, data-pipeline, orchestration, airflow]
confidence_score: 0.93
verification_status: applied
tags: [data-engineering, orchestration, dag, airflow, prefect, dagster, mlops, etl, elt]
raw_sources: []
last_reinforced: 2026-04-26
last_reinforced: 2026-05-10
github_commit: pending
inferred_by: Claude Opus 4.7 (auto-normalize 2026-05-08)
tech_stack:
language: unspecified
framework: unspecified
language: Python
framework: Airflow / Prefect / Dagster / Kubeflow / Argo
---
# Data Pipeline Orchestration (데이터 파이프라인 오케스트레이션)
# Data Pipeline Orchestration
## 📌 한 줄 통찰 (The Karpathy Summary)
> "복잡하게 얽힌 데이터의 흐름을 조율하고 장애를 자동 복구하는 지휘자가 되어라" — 데이터 수집, 변환, 학습, 배포에 이르는 수많은 작업(Task)들 간의 의존성을 관리하고, 스케줄링 및 모니터링을 자동화하는 시스템 아키텍처.
## 한 줄
> **"매 task 의 dependency graph 의 conductor"**. 매 DAG + 매 schedule + 매 retry + 매 observability. 매 Airflow (most common) → Prefect / Dagster (modern). 매 ML 의 Kubeflow / Argo. 매 modern: 매 asset-based (Dagster) + 매 declarative (dbt).
## 📖 구조화된 지식 (Synthesized Content)
- **추출된 패턴:** 워크플로우를 유향 비순환 그래프(DAG)로 모델링하여, 특정 작업의 실패가 전체 시스템에 미치는 영향을 최소화하고 재시도(Retry) 및 알람을 자동화하는 운영 패턴.
- **핵심 기능:**
- **DAG [[Management|Management]]:** 작업 간의 선후 관계 정의.
- **Scheduling:** 특정 시간이나 이벤트 발생 시 자동으로 파이프라인 실행.
- **Error Handling:** 작업 실패 시 자동 재시도 및 상태 기록.
- **Observability:** 파이프라인 각 단계의 처리 속도와 데이터 품질 모니터링.
- **주요 도구:** Apache Airflow, Prefect, Dagster, Kubeflow Pipelines.
## 매 핵심
## ⚠️ 모순 및 업데이트 (Contradictions & Updates)
- **과거 데이터와의 충돌:** 단순한 크론탭(Crontab) 기반의 스크립트 실행에서, 코드로서의 인프라(IaC) 관점이 도입된 복잡한 워크플로우 관리 엔진으로 진화.
- **정책 변화:** Antigravity 프로젝트는 외부 위키 데이터 수집 및 임베딩 업데이트 시 Airflow 기반의 오케스트레이션을 활용하여 데이터 일관성을 보장함.
### 매 핵심 component
- **DAG**: 매 task graph.
- **Scheduler**: 매 trigger.
- **Executor**: 매 run.
- **Metadata DB**: 매 state.
- **Web UI**: 매 monitor.
## 🔗 지식 연결 (Graph)
- [[MLOps|MLOps]], [[Infrastructure-as-Code-IaC|Infrastructure-as-Code-IaC]],[[_system|system]]-Design-for-AI-Scale, [[Data-Flywheel-Effect|Data-Flywheel-Effect]]
- **Raw Source:** 10_Wiki/Topics/AI/Data-Pipeline Orchestration.md
### 매 tool comparison
| Tool | Strength | Weakness |
|---|---|---|
| Airflow | 매 mature, Python, ecosystem | Old API, tricky dynamic DAG |
| Prefect | 매 modern Python, Pythonic | Smaller ecosystem |
| Dagster | 매 asset-aware, type-checked | Steep learning curve |
| Argo Workflows | 매 K8s-native | Verbose YAML |
| Kubeflow Pipelines | 매 ML-specific, K8s | Heavy |
| Temporal | 매 long-running workflow | More for app workflow |
| dbt | 매 SQL transformation | SQL only |
## 🤖 LLM 활용 힌트 (How to Use This Knowledge)
### 매 modern paradigm
**언제 이 지식을 쓰는가:**
- *(TODO)*
#### Asset-based (Dagster)
- 매 task X — 매 asset (data product).
- 매 lineage explicit.
- 매 partition + 매 backfill 의 first-class.
**언제 쓰면 안 되는가:**
- *(TODO)*
#### Declarative scheduling
- 매 cron 의 X — 매 freshness SLA.
- 매 sensor-driven.
## 🧪 검증 상태 (Validation)
#### Data + ML unified
- 매 same orchestrator 의 ETL + train + serve.
- **정보 상태:** needs_review
- **출처 신뢰도:** A
- **검토 이유:** *(P-Reinforce Phase 1 자동 정규화. 본문 검증 필요.)*
### 매 best practice
1. **Idempotent task**: 매 retry-safe.
2. **Atomic outputs**: 매 partial 의 X.
3. **Backfill design**: 매 historical 의 rerun 가능.
4. **Resource isolation**: 매 pool / queue.
5. **Observability**: 매 metrics + log + lineage.
6. **Schema check**: 매 GE / Pandera.
7. **Cost-aware**: 매 spot, 매 right-size.
## 🧬 중복 검사 (Duplicate Check)
### 매 응용
1. **ETL**: 매 daily / hourly.
2. **ML training**: 매 retrain.
3. **Feature engineering**.
4. **Reporting**.
5. **Backup**.
6. **Multi-step ML pipeline** (data → train → eval → deploy).
- **기존 유사 문서:** *(TODO: 인덱서 클러스터 리포트 참조)*
- **처리 방식:** UPDATE (자동 정규화)
- **처리 이유:** Phase 1 정규화 — 옛 템플릿/누락 필드 보강.
## 💻 패턴
## 🕓 변경 이력 (Changelog)
### Airflow DAG
```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
| 날짜 | 변경 내용 | 처리 방식 | 신뢰도 |
|------|-----------|-----------|--------|
| 2026-05-08 | P-Reinforce Phase 1 정규화 (frontmatter + 헤더 표준화) | UPDATE | A |
default_args = {
'owner': 'data-team',
'depends_on_past': False,
'retries': 3,
'retry_delay': timedelta(minutes=5),
}
## 💻 코드 패턴 (Code Patterns)
**패턴 1:** *(TODO: 이 프로젝트 컨벤션 반영한 구조 스켈레톤)*
```text
# TODO
with DAG(
'daily_etl',
default_args=default_args,
schedule_interval='0 2 * * *',
start_date=datetime(2026, 1, 1),
catchup=False,
) as dag:
extract = PythonOperator(task_id='extract', python_callable=extract_data)
transform = PythonOperator(task_id='transform', python_callable=transform_data)
load = PythonOperator(task_id='load', python_callable=load_data)
validate = PythonOperator(task_id='validate', python_callable=validate_with_ge)
extract >> transform >> load >> validate
```
## 🤔 의사결정 기준 (Decision Criteria)
### Prefect (modern Pythonic)
```python
from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner
**선택 A를 써야 할 때:**
- *(TODO)*
@task(retries=3, retry_delay_seconds=60)
def extract():
return fetch_from_source()
**선택 B를 써야 할 때:**
- *(TODO)*
@task
def transform(data):
return clean_and_aggregate(data)
**기본값:**
> *(TODO)*
@task
def load(transformed):
db.bulk_insert(transformed)
## ❌ 안티패턴 (Anti-Patterns)
@flow(task_runner=ConcurrentTaskRunner())
def daily_etl():
raw = extract()
clean = transform(raw)
load(clean)
- **[안티패턴]:** *(TODO: 무엇을 하면 안 되는가 + 이유 + 대신 무엇을)*
# 매 deploy
daily_etl.serve(name='daily-etl', cron='0 2 * * *')
```
### Dagster (asset-based)
```python
from dagster import asset, AssetIn, MetadataValue
@asset
def raw_users():
return fetch_users()
@asset
def cleaned_users(raw_users):
df = remove_duplicates(raw_users)
df = validate_schema(df)
return df
@asset(ins={'users': AssetIn('cleaned_users')})
def user_metrics(users):
metrics = compute_metrics(users)
return metrics
# 매 매 asset 의 lineage 의 explicit
# 매 backfill / partition 의 first-class
```
### dbt (SQL-driven)
```sql
-- models/staging/stg_users.sql
{{ config(materialized='view') }}
SELECT
id,
LOWER(email) AS email,
created_at
FROM {{ source('raw', 'users') }}
WHERE deleted_at IS NULL
-- models/marts/dim_users.sql
{{ config(materialized='table') }}
SELECT *
FROM {{ ref('stg_users') }}
```
```bash
dbt run # 매 모든 model 의 build
dbt test # 매 schema test
```
### Idempotent task design
```python
@task
def load_data(date: str, data: list):
# 매 ❌ Bad: 매 append (rerun 시 의 dup)
# db.insert(data)
# 매 ✅ Good: 매 upsert / replace
db.execute(f"DELETE FROM events WHERE date = '{date}'")
db.bulk_insert(data)
```
### Backfill (historical)
```python
# 매 Airflow
airflow dags backfill -s 2024-01-01 -e 2024-12-31 daily_etl
# 매 Dagster (partitioned asset)
@asset(partitions_def=DailyPartitionsDefinition(start_date='2024-01-01'))
def daily_data(context):
date = context.partition_key
return fetch_for_date(date)
```
### Schema validation in pipeline
```python
import great_expectations as ge
@task
def validate_schema(df):
suite = load_suite('users_quality')
result = ge.validate(df, expectation_suite=suite)
if not result.success:
raise ValueError(f'Schema validation failed: {result.results}')
return df
```
### Sensor (event-driven)
```python
from airflow.sensors.filesystem import FileSensor
wait_for_file = FileSensor(
task_id='wait_for_input',
filepath='/data/input.parquet',
poke_interval=30,
timeout=3600,
)
wait_for_file >> process_task
```
### Resource isolation
```python
# 매 Airflow pool
process_task = PythonOperator(
task_id='process',
python_callable=heavy_task,
pool='gpu_pool', # 매 GPU 의 dedicated
pool_slots=1,
)
```
### Observability (metrics + alert)
```python
from prometheus_client import Counter, Histogram
task_runs = Counter('pipeline_runs_total', ['pipeline', 'status'])
task_duration = Histogram('pipeline_duration_seconds', ['pipeline'])
@task
def my_task():
with task_duration.labels('my_pipeline').time():
try:
do_work()
task_runs.labels('my_pipeline', 'success').inc()
except Exception:
task_runs.labels('my_pipeline', 'failed').inc()
raise
```
### Cost-aware (spot for batch)
```python
@task(executor_config={'instance_type': 'g4dn.xlarge', 'use_spot': True})
def expensive_train():
return train_model()
```
## 매 결정 기준
| 상황 | Tool |
|---|---|
| Mature large org | Airflow |
| Modern Pythonic | Prefect |
| Asset-driven + ML | Dagster |
| K8s-native | Argo / Kubeflow |
| SQL-only | dbt |
| Long-running app workflow | Temporal |
| Event-driven | Prefect / Dagster sensor |
**기본값**: 매 small-mid = Prefect. 매 large + asset = Dagster. 매 ecosystem priority = Airflow.
## 🔗 Graph
- 부모: [[Data-Engineering]] · [[MLOps]] · [[DevOps]]
- 변형: [[Airflow]] · [[Prefect]] · [[Dagster]] · [[Argo]] · [[dbt]] · [[Temporal]]
- 응용: [[ETL]] · [[ELT]] · [[ML-Pipeline]] · [[Backfill]]
- Adjacent: [[Concept-Drift]] · [[Data Cleaning Algorithms]] · [[Data-Flywheel-Effect]] · [[Bottlenecks]]
## 🤖 LLM 활용
**언제**: 매 ETL design. 매 ML pipeline. 매 batch job orchestration.
**언제 X**: 매 single one-shot script. 매 stream-only (use Kafka / Flink).
## ❌ 안티패턴
- **Crontab + bash for complex DAG**: 매 fragile.
- **Non-idempotent task**: 매 retry 의 corrupt.
- **No backfill design**: 매 historical 의 rerun X.
- **No observability**: 매 silent failure.
- **No schema check**: 매 downstream break.
- **Heavy DAG (1000+ task)**: 매 split.
## 🧪 검증 / 중복
- Verified (Airflow / Prefect / Dagster docs, Designing Data-Intensive Apps).
- 신뢰도 A.
- Related: [[Data Cleaning Algorithms]] · [[Concept-Drift]] · [[Data-Flywheel-Effect]] · [[Bottlenecks]] · [[CI_CD 파이프라인 및 IDE 통합 보안]].
## 🕓 Changelog
| 날짜 | 변경 |
|---|---|
| 2026-05-08 | Phase 1 |
| 2026-05-10 | Manual cleanup — tool comparison + 매 Airflow / Prefect / Dagster / dbt / sensor / observability code |