62 lines
2.3 KiB
Python
62 lines
2.3 KiB
Python
import asyncio
|
|
import aiofiles
|
|
import json
|
|
import os
|
|
from typing import AsyncGenerator, Dict, Any
|
|
|
|
class AsyncDataLoader:
|
|
"""
|
|
비동기 데이터 로딩 파이프라인 (Phase 2: Non-blocking I/O Optimized)
|
|
asyncio를 사용하여 I/O 대기 시간을 연산과 겹치게 하여 Throughput을 극대화함.
|
|
"""
|
|
|
|
def __init__(self, batch_size: int = 100):
|
|
self.batch_size = batch_size
|
|
|
|
async def stream_dataset(self, file_path: str) -> AsyncGenerator[Dict[str, Any], None]:
|
|
"""
|
|
데이터셋을 비동기적으로 스트리밍함.
|
|
대용량 파일을 한꺼번에 메모리에 올리지 않고 Chunk 단위로 처리함.
|
|
"""
|
|
if not os.path.exists(file_path):
|
|
print(f"[Loader] Error: File not found {file_path}")
|
|
return
|
|
|
|
print(f"[Loader] Starting Asynchronous Stream: {file_path}")
|
|
async with aiofiles.open(file_path, mode='r', encoding='utf-8') as f:
|
|
async for line in f:
|
|
if not line.strip():
|
|
continue
|
|
try:
|
|
# 비차단 방식으로 JSON 파싱 및 데이터 반환
|
|
data = json.loads(line)
|
|
yield data
|
|
# 다른 작업(추론 등)에 제어권을 넘겨주어 CPU 유휴 방지
|
|
await asyncio.sleep(0)
|
|
except json.JSONDecodeError:
|
|
continue
|
|
|
|
async def load_batch(self, file_path: str) -> AsyncGenerator[list, None]:
|
|
"""
|
|
배치 단위로 데이터를 로드하여 추론 엔진의 Throughput을 최적화함.
|
|
"""
|
|
batch = []
|
|
async for item in self.stream_dataset(file_path):
|
|
batch.append(item)
|
|
if len(batch) >= self.batch_size:
|
|
yield batch
|
|
batch = []
|
|
if batch:
|
|
yield batch
|
|
|
|
async def example_usage():
|
|
loader = AsyncDataLoader(batch_size=5)
|
|
# 가상의 대용량 데이터 파일 처리 예시
|
|
async for batch in loader.load_batch("large_dataset.jsonl"):
|
|
print(f"[Loader] Batch Loaded: {len(batch)} items")
|
|
# 여기서 InferenceEngine.match_features()를 호출하여 병렬 처리 가능
|
|
|
|
if __name__ == "__main__":
|
|
# asyncio.run(example_usage())
|
|
print("Async Loader Module Initialized (Phase 2 Ready)")
|