83 lines
2.9 KiB
Python
83 lines
2.9 KiB
Python
import asyncio
|
|
import time
|
|
import uuid
|
|
from typing import Callable, Any, Dict
|
|
from dataclasses import dataclass, field
|
|
|
|
def _new_task_id() -> str:
    """Return a fresh random UUID4 rendered as a string."""
    return str(uuid.uuid4())


@dataclass
class Task:
    """A single unit of work passed through the queue (actor-model style).

    Every field has a default, so ``Task()`` is valid; callers normally supply
    only ``payload``.
    """

    # Unique identifier, generated per instance when not supplied.
    id: str = field(default_factory=_new_task_id)
    # Arbitrary input data for the task.
    payload: Any = None
    # Wall-clock creation timestamp taken from time.time().
    created_at: float = field(default_factory=time.time)
    # Filled in by whoever processes the task; None until then.
    result: Any = None
|
|
|
|
class QueueWorker:
    """Asynchronous queue-backed worker engine (Phase 2: Actor/Queue model).

    Producers call ``submit_task`` to enqueue work (ingestion layer) while a
    pool of ``worker_count`` asyncio tasks pulls items off the shared queue
    and processes them (processing layer), so the two layers are decoupled.
    """

    def __init__(self, worker_count: int = 4):
        """Create an idle engine; no workers run until ``start`` is awaited.

        Args:
            worker_count: Number of concurrent worker loops that ``start``
                spawns. Must be at least 1.

        Raises:
            ValueError: If ``worker_count`` is less than 1. An engine without
                workers would accept tasks and never process them.
        """
        if worker_count < 1:
            raise ValueError(f"worker_count must be >= 1, got {worker_count}")
        self.queue = asyncio.Queue()
        self.worker_count = worker_count
        self.workers = []
        self._is_running = False

    async def _execute(self, task: "Task") -> Any:
        """Perform the work for one task and return its result.

        The default implementation only simulates an asynchronous operation.
        Override this hook to plug in real processing (for example an
        inference engine) without touching the worker loop.
        """
        await asyncio.sleep(0.1)  # simulated asynchronous work
        return "Success"

    async def _process_task(self, worker_id: int):
        """Worker loop: take tasks off the queue and process them one by one.

        Args:
            worker_id: Index of this worker, used only in log output.

        A failure while processing one task is reported and does not end the
        loop; the loop ends when the engine is stopped.
        """
        while self._is_running:
            # An idle worker is parked here, so this is where stop()'s
            # cancellation lands for it; no item has been dequeued at that
            # point and therefore no task_done() call is owed.
            task = await self.queue.get()
            start_time = time.perf_counter()

            try:
                print(f"[Worker-{worker_id}] Processing Task {task.id}...")
                task.result = await self._execute(task)

                latency = (time.perf_counter() - start_time) * 1000
                # Phase 3: SLO monitoring log
                print(f"[Worker-{worker_id}] Task {task.id} Complete. Latency: {latency:.2f}ms")
            except Exception as e:
                # Worker-level boundary: report and keep serving the queue.
                print(f"[Worker-{worker_id}] Task {task.id} Failed: {str(e)}")
            finally:
                # Always balance the get() above so queue.join() can return,
                # even when the task failed or the worker was cancelled.
                self.queue.task_done()

    async def submit_task(self, payload: Any) -> str:
        """Wrap ``payload`` in a ``Task`` and enqueue it (ingestion layer).

        Args:
            payload: Arbitrary data to be processed.

        Returns:
            The id of the newly created task.
        """
        task = Task(payload=payload)
        await self.queue.put(task)
        print(f"[Ingestion] Task {task.id} submitted to queue.")
        return task.id

    async def start(self):
        """Start the worker pool.

        Calling this while the pool is already running is a no-op. Spawning a
        second set of workers would overwrite ``self.workers`` and leave the
        first set running with no way for ``stop`` to cancel it.
        """
        if self._is_running:
            return
        self._is_running = True
        self.workers = [
            asyncio.create_task(self._process_task(i))
            for i in range(self.worker_count)
        ]
        print(f"[System] Actor Queue Engine started with {self.worker_count} workers.")

    async def stop(self):
        """Stop the worker pool by cancelling every worker task.

        Items still waiting in the queue are left untouched. After this
        returns, ``self.workers`` is empty and ``start`` may be called again.
        """
        self._is_running = False
        # Detach the handles first so the instance never exposes finished
        # tasks, then cancel and wait for each worker to unwind.
        workers, self.workers = self.workers, []
        for w in workers:
            w.cancel()
        # return_exceptions=True collects each worker's CancelledError
        # instead of raising it here.
        await asyncio.gather(*workers, return_exceptions=True)
        print("[System] Actor Queue Engine stopped.")
|
|
|
|
async def example_run():
    """Demo: run a two-worker engine against a burst of ten payloads."""
    pool = QueueWorker(worker_count=2)
    await pool.start()

    # Simulate a traffic spike so the queue's buffering can be observed.
    burst = [f"Data-Chunk-{n}" for n in range(10)]
    for chunk in burst:
        await pool.submit_task(chunk)

    # Fixed pause to let the workers run before shutting down.
    await asyncio.sleep(2)
    await pool.stop()
|
|
|
|
if __name__ == "__main__":
    # The demo is left disabled; uncomment the call below to run it.
    # asyncio.run(example_run())
    _banner = "Actor/Queue Engine Module Initialized (Phase 2 Ready)"
    print(_banner)
|