65 lines
2.3 KiB
Python
65 lines
2.3 KiB
Python
import asyncio
import inspect
from typing import Callable, Dict, List, Any
|
|
|
|
class EventBus:
    """Central event bus (Phase 3: Decoupling & Observer Pattern).

    Publishers and subscribers talk only to the bus, keyed by an event-type
    string, instead of calling each other directly. This follows the
    Dependency Inversion Principle and removes tight coupling between modules.
    """

    def __init__(self):
        # Maps an event type to its callbacks, kept in subscription order.
        self._subscribers: Dict[str, List[Callable]] = {}

    def subscribe(self, event_type: str, callback: Callable):
        """Register ``callback`` to be invoked when ``event_type`` is emitted.

        Args:
            event_type: Name of the event to listen for.
            callback: Callable taking the event payload as its only argument.
                It may be a plain function or a coroutine function.
        """
        self._subscribers.setdefault(event_type, []).append(callback)
        print(f"[EventBus] Subscribed to {event_type}")

    @staticmethod
    async def _invoke(callback: Callable, data: Any) -> None:
        """Call one subscriber, awaiting its result only if it is awaitable."""
        outcome = callback(data)
        # A synchronous callback has already finished at this point; only
        # coroutine-style callbacks hand back something that must be awaited.
        if inspect.isawaitable(outcome):
            await outcome

    async def emit(self, event_type: str, data: Any):
        """Publish ``data`` to every subscriber of ``event_type``.

        Each subscriber runs in its own task and this coroutine returns once
        all of them have finished. Emitting an event type nobody subscribed
        to is a silent no-op.

        Args:
            event_type: Name of the event being published.
            data: Payload passed unchanged to each callback.

        Raises:
            Exception: The first exception raised by a subscriber is
                propagated by ``asyncio.gather``.
        """
        callbacks = self._subscribers.get(event_type)
        if callbacks is None:
            return

        print(f"[EventBus] Emitting {event_type}...")
        # Wrapping each call in _invoke lets plain functions and coroutine
        # functions be scheduled uniformly; create_task alone rejects the
        # non-coroutine return value of a synchronous callback.
        tasks = [
            asyncio.create_task(self._invoke(callback, data))
            for callback in callbacks
        ]
        if tasks:
            await asyncio.gather(*tasks)
|
|
|
|
# Event definitions
|
|
class ConnectEvents:
    """Event-type names used as keys when subscribing to or emitting on the bus."""

    # Emitted by the pipeline once raw data has been handed over.
    DATA_READY: str = "data_ready"
    # Emitted by the inference subscriber after handling DATA_READY.
    INFERENCE_COMPLETE: str = "inference_complete"
    # Declared for error reporting; nothing in this file emits it yet.
    ERROR_OCCURRED: str = "error_occurred"
|
|
|
|
# Pipeline Orchestrator example
|
|
class DataPipeline:
    """Example pipeline stage that announces incoming data on the event bus."""

    def __init__(self, bus: EventBus):
        """Keep a reference to the bus used for publishing events."""
        self.bus = bus

    async def process_data(self, raw_data: Any):
        """Log the processing step, then publish ``raw_data`` as DATA_READY.

        Preprocessing would go here. The model is never called directly
        (DIP): downstream consumers react to the emitted event instead.
        """
        print("[Pipeline] Processing Raw Data...")
        event_name = ConnectEvents.DATA_READY
        await self.bus.emit(event_name, raw_data)
|
|
|
|
class InferenceEngineSubscriber:
    """Example consumer that reacts to DATA_READY and reports completion."""

    def __init__(self, bus: EventBus):
        """Store the bus and register ``on_data_ready`` for DATA_READY events."""
        self.bus = bus
        handler = self.on_data_ready
        self.bus.subscribe(ConnectEvents.DATA_READY, handler)

    async def on_data_ready(self, data: Any):
        """Handle a DATA_READY payload and emit INFERENCE_COMPLETE."""
        print(f"[Model] Event Received! Starting Inference on: {data}")
        # Placeholder: FeatureExtractor.calculate_similarity_vectorized() is
        # meant to be called here; for now a fixed success payload is sent.
        outcome = {"result": "success"}
        await self.bus.emit(ConnectEvents.INFERENCE_COMPLETE, outcome)
|
|
|
|
async def main():
    """Wire a bus, a pipeline and a subscriber together and push one sample through."""
    event_bus = EventBus()
    data_pipeline = DataPipeline(event_bus)
    # Created only for its constructor's side effect: it subscribes itself to
    # DATA_READY on the bus. The local name is otherwise unused.
    _inference_subscriber = InferenceEngineSubscriber(event_bus)

    await data_pipeline.process_data("Sample Input Data")
|
|
|
|
if __name__ == "__main__":
    # The async demo is currently disabled; running the script only prints a
    # readiness banner. To run the demo, call: asyncio.run(main())
    print("Event-driven Architecture Initialized (Phase 3 Ready)")
|