Files
connectai/core_py/loader.py
T

62 lines
2.3 KiB
Python

import asyncio
import aiofiles
import json
import os
from typing import AsyncGenerator, Dict, Any
class AsyncDataLoader:
"""
비동기 데이터 로딩 파이프라인 (Phase 2: Non-blocking I/O Optimized)
asyncio를 사용하여 I/O 대기 시간을 연산과 겹치게 하여 Throughput을 극대화함.
"""
def __init__(self, batch_size: int = 100):
self.batch_size = batch_size
async def stream_dataset(self, file_path: str) -> AsyncGenerator[Dict[str, Any], None]:
"""
데이터셋을 비동기적으로 스트리밍함.
대용량 파일을 한꺼번에 메모리에 올리지 않고 Chunk 단위로 처리함.
"""
if not os.path.exists(file_path):
print(f"[Loader] Error: File not found {file_path}")
return
print(f"[Loader] Starting Asynchronous Stream: {file_path}")
async with aiofiles.open(file_path, mode='r', encoding='utf-8') as f:
async for line in f:
if not line.strip():
continue
try:
# 비차단 방식으로 JSON 파싱 및 데이터 반환
data = json.loads(line)
yield data
# 다른 작업(추론 등)에 제어권을 넘겨주어 CPU 유휴 방지
await asyncio.sleep(0)
except json.JSONDecodeError:
continue
async def load_batch(self, file_path: str) -> AsyncGenerator[list, None]:
"""
배치 단위로 데이터를 로드하여 추론 엔진의 Throughput을 최적화함.
"""
batch = []
async for item in self.stream_dataset(file_path):
batch.append(item)
if len(batch) >= self.batch_size:
yield batch
batch = []
if batch:
yield batch
async def example_usage():
loader = AsyncDataLoader(batch_size=5)
# 가상의 대용량 데이터 파일 처리 예시
async for batch in loader.load_batch("large_dataset.jsonl"):
print(f"[Loader] Batch Loaded: {len(batch)} items")
# 여기서 InferenceEngine.match_features()를 호출하여 병렬 처리 가능
if __name__ == "__main__":
# asyncio.run(example_usage())
print("Async Loader Module Initialized (Phase 2 Ready)")