import asyncio import time import uuid from typing import Callable, Any, Dict from dataclasses import dataclass, field @dataclass class Task: """처리할 작업 단위 (Actor Model 기반)""" id: str = field(default_factory=lambda: str(uuid.uuid4())) payload: Any = None created_at: float = field(default_factory=time.time) result: Any = None class QueueWorker: """ 비동기 큐 기반 워커 엔진 (Phase 2: Actor/Queue Model) 수집 계층과 처리 계층을 완전히 분리하여 확장성 및 안정성 확보. """ def __init__(self, worker_count: int = 4): self.queue = asyncio.Queue() self.worker_count = worker_count self.workers = [] self._is_running = False async def _process_task(self, worker_id: int): """개별 워커 루프: 큐에서 작업을 꺼내 처리함""" while self._is_running: task: Task = await self.queue.get() start_time = time.perf_counter() try: print(f"[Worker-{worker_id}] Processing Task {task.id}...") # 실제 처리 로직 (이곳에 InferenceEngine 연동 가능) await asyncio.sleep(0.1) # 비동기 연산 시뮬레이션 task.result = "Success" latency = (time.perf_counter() - start_time) * 1000 # Phase 3: SLO 모니터링 로그 print(f"[Worker-{worker_id}] Task {task.id} Complete. Latency: {latency:.2f}ms") except Exception as e: print(f"[Worker-{worker_id}] Task {task.id} Failed: {str(e)}") finally: self.queue.task_done() async def submit_task(self, payload: Any) -> str: """외부에서 작업을 큐에 투입 (Ingestion Layer)""" task = Task(payload=payload) await self.queue.put(task) print(f"[Ingestion] Task {task.id} submitted to queue.") return task.id async def start(self): """워커 풀 가동""" self._is_running = True self.workers = [asyncio.create_task(self._process_task(i)) for i in range(self.worker_count)] print(f"[System] Actor Queue Engine started with {self.worker_count} workers.") async def stop(self): """워커 풀 정지""" self._is_running = False for w in self.workers: w.cancel() await asyncio.gather(*self.workers, return_exceptions=True) print("[System] Actor Queue Engine stopped.") async def example_run(): engine = QueueWorker(worker_count=2) await engine.start() # 트래픽 스파이크 시뮬레이션 (버퍼링 확인) for i in range(10): await engine.submit_task(f"Data-Chunk-{i}") await asyncio.sleep(2) await engine.stop() if __name__ == "__main__": # asyncio.run(example_run()) print("Actor/Queue Engine Module Initialized (Phase 2 Ready)")