from typing import Callable, Dict, List, Any import asyncio class EventBus: """ 중앙 이벤트 버스 (Phase 3: Decoupling & Observer Pattern) DIP(의존성 역전 원칙)를 준수하여 모듈 간 강한 결합을 해제함. """ def __init__(self): self._subscribers: Dict[str, List[Callable]] = {} def subscribe(self, event_type: str, callback: Callable): """이벤트 구독 등록""" if event_type not in self._subscribers: self._subscribers[event_type] = [] self._subscribers[event_type].append(callback) print(f"[EventBus] Subscribed to {event_type}") async def emit(self, event_type: str, data: Any): """이벤트 발행 및 구독자 알림 (비동기 처리)""" if event_type in self._subscribers: print(f"[EventBus] Emitting {event_type}...") # 모든 구독자에게 비동기적으로 데이터 전달 tasks = [asyncio.create_task(callback(data)) for callback in self._subscribers[event_type]] if tasks: await asyncio.gather(*tasks) # 이벤트 정의 class ConnectEvents: DATA_READY = "data_ready" INFERENCE_COMPLETE = "inference_complete" ERROR_OCCURRED = "error_occurred" # Pipeline Orchestrator 예시 class DataPipeline: def __init__(self, bus: EventBus): self.bus = bus async def process_data(self, raw_data: Any): print("[Pipeline] Processing Raw Data...") # 전처리 로직 (DIP 준수: 모델을 직접 호출하지 않음) await self.bus.emit(ConnectEvents.DATA_READY, raw_data) class InferenceEngineSubscriber: def __init__(self, bus: EventBus): self.bus = bus self.bus.subscribe(ConnectEvents.DATA_READY, self.on_data_ready) async def on_data_ready(self, data: Any): print(f"[Model] Event Received! Starting Inference on: {data}") # 여기서 FeatureExtractor.calculate_similarity_vectorized() 호출 await self.bus.emit(ConnectEvents.INFERENCE_COMPLETE, {"result": "success"}) async def main(): bus = EventBus() pipeline = DataPipeline(bus) model = InferenceEngineSubscriber(bus) await pipeline.process_data("Sample Input Data") if __name__ == "__main__": # asyncio.run(main()) print("Event-driven Architecture Initialized (Phase 3 Ready)")