feat: implement next-gen vectorized engine, async architecture, and modernization roadmap v2.32.0

This commit is contained in:
Wonseok Jung
2026-04-30 23:44:36 +09:00
parent 39d46d7c54
commit cd1d6a3da8
20 changed files with 1086 additions and 282 deletions
+64
View File
@@ -0,0 +1,64 @@
from typing import Callable, Dict, List, Any
import asyncio
class EventBus:
"""
중앙 이벤트 버스 (Phase 3: Decoupling & Observer Pattern)
DIP(의존성 역전 원칙)를 준수하여 모듈 간 강한 결합을 해제함.
"""
def __init__(self):
self._subscribers: Dict[str, List[Callable]] = {}
def subscribe(self, event_type: str, callback: Callable):
"""이벤트 구독 등록"""
if event_type not in self._subscribers:
self._subscribers[event_type] = []
self._subscribers[event_type].append(callback)
print(f"[EventBus] Subscribed to {event_type}")
async def emit(self, event_type: str, data: Any):
"""이벤트 발행 및 구독자 알림 (비동기 처리)"""
if event_type in self._subscribers:
print(f"[EventBus] Emitting {event_type}...")
# 모든 구독자에게 비동기적으로 데이터 전달
tasks = [asyncio.create_task(callback(data)) for callback in self._subscribers[event_type]]
if tasks:
await asyncio.gather(*tasks)
# 이벤트 정의
class ConnectEvents:
DATA_READY = "data_ready"
INFERENCE_COMPLETE = "inference_complete"
ERROR_OCCURRED = "error_occurred"
# Pipeline Orchestrator 예시
class DataPipeline:
def __init__(self, bus: EventBus):
self.bus = bus
async def process_data(self, raw_data: Any):
print("[Pipeline] Processing Raw Data...")
# 전처리 로직 (DIP 준수: 모델을 직접 호출하지 않음)
await self.bus.emit(ConnectEvents.DATA_READY, raw_data)
class InferenceEngineSubscriber:
def __init__(self, bus: EventBus):
self.bus = bus
self.bus.subscribe(ConnectEvents.DATA_READY, self.on_data_ready)
async def on_data_ready(self, data: Any):
print(f"[Model] Event Received! Starting Inference on: {data}")
# 여기서 FeatureExtractor.calculate_similarity_vectorized() 호출
await self.bus.emit(ConnectEvents.INFERENCE_COMPLETE, {"result": "success"})
async def main():
bus = EventBus()
pipeline = DataPipeline(bus)
model = InferenceEngineSubscriber(bus)
await pipeline.process_data("Sample Input Data")
if __name__ == "__main__":
# asyncio.run(main())
print("Event-driven Architecture Initialized (Phase 3 Ready)")
+91
View File
@@ -0,0 +1,91 @@
import numpy as np
import time
from typing import List, Optional
class FeatureExtractor:
"""
고성능 특징 추출 및 매칭 엔진 (Phase 1: Vectorization Optimized)
기존의 O(N^2) 중첩 루프를 NumPy 행렬 연산으로 대체하여 계산 효율을 극대화함.
"""
def __init__(self, dimension: int = 128):
self.dimension = dimension
self.memory_pool = {} # Phase 1: Simple memory pooling for tensor reuse
def calculate_similarity_vectorized(self, query_vector: np.ndarray, feature_matrix: np.ndarray) -> np.ndarray:
"""
벡터화된 유사도 계산 (O(N))
중첩 루프 없이 행렬 곱을 통해 모든 특징점과의 유사도를 한 번에 계산함.
"""
# 정규화 (Cosine Similarity 준비)
query_norm = query_vector / (np.linalg.norm(query_vector) + 1e-9)
matrix_norm = feature_matrix / (np.linalg.norm(feature_matrix, axis=1, keepdims=True) + 1e-9)
# 행렬 곱을 통한 유사도 산출 (Dot Product)
# O(N^2) 루프를 C로 최적화된 NumPy 연산으로 대체
similarities = np.dot(matrix_norm, query_norm)
return similarities
def match_features(self, query: List[float], database: List[List[float]], threshold: float = 0.8) -> List[int]:
"""
특징 매칭 메인 인터페이스 (P1 & P2 최적화)
"""
if not database:
return []
# P2: NumPy 배열로 데이터 구조 최적화 (메모리 연속성 확보)
q = np.array(query, dtype=np.float32)
db = np.array(database, dtype=np.float32)
start_time = time.perf_counter()
# P1: 벡터화 연산 수행 (O(N))
scores = self.calculate_similarity_vectorized(q, db)
matches = np.where(scores >= threshold)[0].tolist()
latency = (time.perf_counter() - start_time) * 1000
print(f"[Inference] Vectorized Match Complete: {len(matches)} matches, Latency: {latency:.4f}ms")
return matches
def match_features_parallel(self, query: List[float], database: List[List[float]], threshold: float = 0.8, n_jobs: int = -1) -> List[int]:
"""
P3: 멀티프로세싱 기반 병렬 매칭 (Scalability 최적화)
대규모 데이터셋을 여러 배치로 나누어 멀티 코어 CPU에서 병렬 처리함.
"""
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor
if n_jobs == -1:
n_jobs = mp.cpu_count()
db_size = len(database)
batch_size = max(1, db_size // n_jobs)
batches = [database[i:i + batch_size] for i in range(0, db_size, batch_size)]
print(f"[Inference] P3 Parallelization Active: Using {n_jobs} cores for {len(batches)} batches.")
all_matches = []
with ProcessPoolExecutor(max_workers=n_jobs) as executor:
# 각 프로세스에서 벡터화된 매칭 수행
futures = [executor.submit(self.match_features, query, batch, threshold) for batch in batches]
current_offset = 0
for i, future in enumerate(futures):
batch_matches = future.result()
# 오프셋 보정하여 전체 인덱스로 변환
all_matches.extend([idx + current_offset for idx in batch_matches])
current_offset += len(batches[i])
return all_matches
# Proof of Concept (Benchmark)
if __name__ == "__main__":
extractor = FeatureExtractor(dimension=256)
N = 10000
dummy_query = np.random.rand(256).tolist()
dummy_db = np.random.rand(N, 256).tolist()
print(f"Benchmarking N={N} with Vectorized Engine...")
extractor.match_features(dummy_query, dummy_db)
+61
View File
@@ -0,0 +1,61 @@
import asyncio
import aiofiles
import json
import os
from typing import AsyncGenerator, Dict, Any
class AsyncDataLoader:
"""
비동기 데이터 로딩 파이프라인 (Phase 2: Non-blocking I/O Optimized)
asyncio를 사용하여 I/O 대기 시간을 연산과 겹치게 하여 Throughput을 극대화함.
"""
def __init__(self, batch_size: int = 100):
self.batch_size = batch_size
async def stream_dataset(self, file_path: str) -> AsyncGenerator[Dict[str, Any], None]:
"""
데이터셋을 비동기적으로 스트리밍함.
대용량 파일을 한꺼번에 메모리에 올리지 않고 Chunk 단위로 처리함.
"""
if not os.path.exists(file_path):
print(f"[Loader] Error: File not found {file_path}")
return
print(f"[Loader] Starting Asynchronous Stream: {file_path}")
async with aiofiles.open(file_path, mode='r', encoding='utf-8') as f:
async for line in f:
if not line.strip():
continue
try:
# 비차단 방식으로 JSON 파싱 및 데이터 반환
data = json.loads(line)
yield data
# 다른 작업(추론 등)에 제어권을 넘겨주어 CPU 유휴 방지
await asyncio.sleep(0)
except json.JSONDecodeError:
continue
async def load_batch(self, file_path: str) -> AsyncGenerator[list, None]:
"""
배치 단위로 데이터를 로드하여 추론 엔진의 Throughput을 최적화함.
"""
batch = []
async for item in self.stream_dataset(file_path):
batch.append(item)
if len(batch) >= self.batch_size:
yield batch
batch = []
if batch:
yield batch
async def example_usage():
loader = AsyncDataLoader(batch_size=5)
# 가상의 대용량 데이터 파일 처리 예시
async for batch in loader.load_batch("large_dataset.jsonl"):
print(f"[Loader] Batch Loaded: {len(batch)} items")
# 여기서 InferenceEngine.match_features()를 호출하여 병렬 처리 가능
if __name__ == "__main__":
# asyncio.run(example_usage())
print("Async Loader Module Initialized (Phase 2 Ready)")
+56
View File
@@ -0,0 +1,56 @@
import time
import functools
import statistics
from typing import List, Callable, Any
class PerformanceMonitor:
"""
모니터링 및 SLO 측정 엔진 (Phase 3: Monitoring Integration)
모든 병목 지점에 타이밍 래퍼를 삽입하여 실시간 가시성 확보.
"""
def __init__(self):
self.latencies: List[float] = []
self.slo_threshold_ms = 200.0 # P95 목표: 200ms 이하
def track_latency(self, func: Callable):
"""메서드 실행 시간을 측정하는 데코레이터"""
@functools.wraps(func)
async def wrapper(*args, **kwargs):
start_time = time.perf_counter()
result = await func(*args, **kwargs) if asyncio.iscoroutinefunction(func) else func(*args, **kwargs)
latency = (time.perf_counter() - start_time) * 1000
self.latencies.append(latency)
if latency > self.slo_threshold_ms:
print(f"[SLO Alert] {func.__name__} violated SLO! Latency: {latency:.2f}ms (Goal: {self.slo_threshold_ms}ms)")
return result
return wrapper
def get_stats(self) -> dict:
"""현재까지의 지연 시간 통계 산출 (P95 포함)"""
if not self.latencies:
return {"count": 0}
stats = {
"count": len(self.latencies),
"avg_ms": statistics.mean(self.latencies),
"max_ms": max(self.latencies),
"p95_ms": statistics.quantiles(self.latencies, n=20)[18] if len(self.latencies) >= 20 else "N/A"
}
return stats
def report(self):
"""정기 성능 보고서 출력"""
stats = self.get_stats()
print("\n" + "="*40)
print("📊 [SYSTEM PERFORMANCE REPORT]")
print(f"Total Requests: {stats['count']}")
print(f"Average Latency: {stats.get('avg_ms', 0):.2f}ms")
print(f"P95 Latency: {stats['p95_ms']}")
print("="*40 + "\n")
monitor = PerformanceMonitor()
import asyncio # for decorator awareness
+55
View File
@@ -0,0 +1,55 @@
import numpy as np
import random
from typing import Callable, Dict, Any
class ParameterOptimizer:
"""
지능형 파라미터 최적화 엔진 (Algorithmic Review 1.2 반영)
브루트 포스 대신 시뮬레이티드 어닐링 또는 경사 하강 초기화를 활용함.
"""
def __init__(self, objective_function: Callable):
self.objective_function = objective_function
def simulated_annealing(self, initial_params: np.ndarray, iterations: int = 1000, temp: float = 1.0, cooling_rate: float = 0.95):
"""
시뮬레이티드 어닐링(Simulated Annealing) 기반 최적화
지역 최적점(Local Optima) 탈출이 가능하며 브루트 포스보다 압도적으로 빠름.
"""
current_params = initial_params
current_score = self.objective_function(current_params)
best_params = current_params
best_score = current_score
for i in range(iterations):
# 이웃 해(Neighbor) 탐색
neighbor_params = current_params + np.random.normal(0, 0.1, size=current_params.shape)
neighbor_score = self.objective_function(neighbor_params)
# 수락 확률 계산 (Metropolis Criterion)
if neighbor_score > current_score or random.random() < np.exp((neighbor_score - current_score) / temp):
current_params = neighbor_params
current_score = neighbor_score
if current_score > best_score:
best_score = current_score
best_params = neighbor_params
# 냉각 (Cooling)
temp *= cooling_rate
print(f"[Optimizer] Best Score Found: {best_score:.4f}")
return best_params
# Example Objective Function (e.g., Accuracy based on threshold and weights)
def dummy_objective(params):
# 가상의 성능 평가 함수 (파라미터가 0.5에 가까울수록 높은 점수)
return -np.sum((params - 0.5)**2)
if __name__ == "__main__":
optimizer = ParameterOptimizer(dummy_objective)
initial = np.array([0.1, 0.9, 0.2])
print(f"Starting Intelligent Optimization from {initial}...")
best = optimizer.simulated_annealing(initial)
print(f"Optimized Parameters: {best}")
+82
View File
@@ -0,0 +1,82 @@
import asyncio
import time
import uuid
from typing import Callable, Any, Dict
from dataclasses import dataclass, field
@dataclass
class Task:
"""처리할 작업 단위 (Actor Model 기반)"""
id: str = field(default_factory=lambda: str(uuid.uuid4()))
payload: Any = None
created_at: float = field(default_factory=time.time)
result: Any = None
class QueueWorker:
"""
비동기 큐 기반 워커 엔진 (Phase 2: Actor/Queue Model)
수집 계층과 처리 계층을 완전히 분리하여 확장성 및 안정성 확보.
"""
def __init__(self, worker_count: int = 4):
self.queue = asyncio.Queue()
self.worker_count = worker_count
self.workers = []
self._is_running = False
async def _process_task(self, worker_id: int):
"""개별 워커 루프: 큐에서 작업을 꺼내 처리함"""
while self._is_running:
task: Task = await self.queue.get()
start_time = time.perf_counter()
try:
print(f"[Worker-{worker_id}] Processing Task {task.id}...")
# 실제 처리 로직 (이곳에 InferenceEngine 연동 가능)
await asyncio.sleep(0.1) # 비동기 연산 시뮬레이션
task.result = "Success"
latency = (time.perf_counter() - start_time) * 1000
# Phase 3: SLO 모니터링 로그
print(f"[Worker-{worker_id}] Task {task.id} Complete. Latency: {latency:.2f}ms")
except Exception as e:
print(f"[Worker-{worker_id}] Task {task.id} Failed: {str(e)}")
finally:
self.queue.task_done()
async def submit_task(self, payload: Any) -> str:
"""외부에서 작업을 큐에 투입 (Ingestion Layer)"""
task = Task(payload=payload)
await self.queue.put(task)
print(f"[Ingestion] Task {task.id} submitted to queue.")
return task.id
async def start(self):
"""워커 풀 가동"""
self._is_running = True
self.workers = [asyncio.create_task(self._process_task(i)) for i in range(self.worker_count)]
print(f"[System] Actor Queue Engine started with {self.worker_count} workers.")
async def stop(self):
"""워커 풀 정지"""
self._is_running = False
for w in self.workers:
w.cancel()
await asyncio.gather(*self.workers, return_exceptions=True)
print("[System] Actor Queue Engine stopped.")
async def example_run():
engine = QueueWorker(worker_count=2)
await engine.start()
# 트래픽 스파이크 시뮬레이션 (버퍼링 확인)
for i in range(10):
await engine.submit_task(f"Data-Chunk-{i}")
await asyncio.sleep(2)
await engine.stop()
if __name__ == "__main__":
# asyncio.run(example_run())
print("Actor/Queue Engine Module Initialized (Phase 2 Ready)")
+3
View File
@@ -0,0 +1,3 @@
numpy>=1.24.0
aiofiles>=23.1.0
typing-extensions>=4.5.0