Files
connectai/core_py/queue_worker.py
T

83 lines
2.9 KiB
Python

import asyncio
import time
import uuid
from typing import Callable, Any, Dict
from dataclasses import dataclass, field
@dataclass
class Task:
"""처리할 작업 단위 (Actor Model 기반)"""
id: str = field(default_factory=lambda: str(uuid.uuid4()))
payload: Any = None
created_at: float = field(default_factory=time.time)
result: Any = None
class QueueWorker:
"""
비동기 큐 기반 워커 엔진 (Phase 2: Actor/Queue Model)
수집 계층과 처리 계층을 완전히 분리하여 확장성 및 안정성 확보.
"""
def __init__(self, worker_count: int = 4):
self.queue = asyncio.Queue()
self.worker_count = worker_count
self.workers = []
self._is_running = False
async def _process_task(self, worker_id: int):
"""개별 워커 루프: 큐에서 작업을 꺼내 처리함"""
while self._is_running:
task: Task = await self.queue.get()
start_time = time.perf_counter()
try:
print(f"[Worker-{worker_id}] Processing Task {task.id}...")
# 실제 처리 로직 (이곳에 InferenceEngine 연동 가능)
await asyncio.sleep(0.1) # 비동기 연산 시뮬레이션
task.result = "Success"
latency = (time.perf_counter() - start_time) * 1000
# Phase 3: SLO 모니터링 로그
print(f"[Worker-{worker_id}] Task {task.id} Complete. Latency: {latency:.2f}ms")
except Exception as e:
print(f"[Worker-{worker_id}] Task {task.id} Failed: {str(e)}")
finally:
self.queue.task_done()
async def submit_task(self, payload: Any) -> str:
"""외부에서 작업을 큐에 투입 (Ingestion Layer)"""
task = Task(payload=payload)
await self.queue.put(task)
print(f"[Ingestion] Task {task.id} submitted to queue.")
return task.id
async def start(self):
"""워커 풀 가동"""
self._is_running = True
self.workers = [asyncio.create_task(self._process_task(i)) for i in range(self.worker_count)]
print(f"[System] Actor Queue Engine started with {self.worker_count} workers.")
async def stop(self):
"""워커 풀 정지"""
self._is_running = False
for w in self.workers:
w.cancel()
await asyncio.gather(*self.workers, return_exceptions=True)
print("[System] Actor Queue Engine stopped.")
async def example_run():
engine = QueueWorker(worker_count=2)
await engine.start()
# 트래픽 스파이크 시뮬레이션 (버퍼링 확인)
for i in range(10):
await engine.submit_task(f"Data-Chunk-{i}")
await asyncio.sleep(2)
await engine.stop()
if __name__ == "__main__":
# asyncio.run(example_run())
print("Actor/Queue Engine Module Initialized (Phase 2 Ready)")