import asyncio import aiofiles import json import os from typing import AsyncGenerator, Dict, Any class AsyncDataLoader: """ 비동기 데이터 로딩 파이프라인 (Phase 2: Non-blocking I/O Optimized) asyncio를 사용하여 I/O 대기 시간을 연산과 겹치게 하여 Throughput을 극대화함. """ def __init__(self, batch_size: int = 100): self.batch_size = batch_size async def stream_dataset(self, file_path: str) -> AsyncGenerator[Dict[str, Any], None]: """ 데이터셋을 비동기적으로 스트리밍함. 대용량 파일을 한꺼번에 메모리에 올리지 않고 Chunk 단위로 처리함. """ if not os.path.exists(file_path): print(f"[Loader] Error: File not found {file_path}") return print(f"[Loader] Starting Asynchronous Stream: {file_path}") async with aiofiles.open(file_path, mode='r', encoding='utf-8') as f: async for line in f: if not line.strip(): continue try: # 비차단 방식으로 JSON 파싱 및 데이터 반환 data = json.loads(line) yield data # 다른 작업(추론 등)에 제어권을 넘겨주어 CPU 유휴 방지 await asyncio.sleep(0) except json.JSONDecodeError: continue async def load_batch(self, file_path: str) -> AsyncGenerator[list, None]: """ 배치 단위로 데이터를 로드하여 추론 엔진의 Throughput을 최적화함. """ batch = [] async for item in self.stream_dataset(file_path): batch.append(item) if len(batch) >= self.batch_size: yield batch batch = [] if batch: yield batch async def example_usage(): loader = AsyncDataLoader(batch_size=5) # 가상의 대용량 데이터 파일 처리 예시 async for batch in loader.load_batch("large_dataset.jsonl"): print(f"[Loader] Batch Loaded: {len(batch)} items") # 여기서 InferenceEngine.match_features()를 호출하여 병렬 처리 가능 if __name__ == "__main__": # asyncio.run(example_usage()) print("Async Loader Module Initialized (Phase 2 Ready)")