Files
connectai/core_py/inference.py
T

92 lines
3.7 KiB
Python

import numpy as np
import time
from typing import List, Optional
class FeatureExtractor:
"""
고성능 특징 추출 및 매칭 엔진 (Phase 1: Vectorization Optimized)
기존의 O(N^2) 중첩 루프를 NumPy 행렬 연산으로 대체하여 계산 효율을 극대화함.
"""
def __init__(self, dimension: int = 128):
self.dimension = dimension
self.memory_pool = {} # Phase 1: Simple memory pooling for tensor reuse
def calculate_similarity_vectorized(self, query_vector: np.ndarray, feature_matrix: np.ndarray) -> np.ndarray:
"""
벡터화된 유사도 계산 (O(N))
중첩 루프 없이 행렬 곱을 통해 모든 특징점과의 유사도를 한 번에 계산함.
"""
# 정규화 (Cosine Similarity 준비)
query_norm = query_vector / (np.linalg.norm(query_vector) + 1e-9)
matrix_norm = feature_matrix / (np.linalg.norm(feature_matrix, axis=1, keepdims=True) + 1e-9)
# 행렬 곱을 통한 유사도 산출 (Dot Product)
# O(N^2) 루프를 C로 최적화된 NumPy 연산으로 대체
similarities = np.dot(matrix_norm, query_norm)
return similarities
def match_features(self, query: List[float], database: List[List[float]], threshold: float = 0.8) -> List[int]:
"""
특징 매칭 메인 인터페이스 (P1 & P2 최적화)
"""
if not database:
return []
# P2: NumPy 배열로 데이터 구조 최적화 (메모리 연속성 확보)
q = np.array(query, dtype=np.float32)
db = np.array(database, dtype=np.float32)
start_time = time.perf_counter()
# P1: 벡터화 연산 수행 (O(N))
scores = self.calculate_similarity_vectorized(q, db)
matches = np.where(scores >= threshold)[0].tolist()
latency = (time.perf_counter() - start_time) * 1000
print(f"[Inference] Vectorized Match Complete: {len(matches)} matches, Latency: {latency:.4f}ms")
return matches
def match_features_parallel(self, query: List[float], database: List[List[float]], threshold: float = 0.8, n_jobs: int = -1) -> List[int]:
"""
P3: 멀티프로세싱 기반 병렬 매칭 (Scalability 최적화)
대규모 데이터셋을 여러 배치로 나누어 멀티 코어 CPU에서 병렬 처리함.
"""
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor
if n_jobs == -1:
n_jobs = mp.cpu_count()
db_size = len(database)
batch_size = max(1, db_size // n_jobs)
batches = [database[i:i + batch_size] for i in range(0, db_size, batch_size)]
print(f"[Inference] P3 Parallelization Active: Using {n_jobs} cores for {len(batches)} batches.")
all_matches = []
with ProcessPoolExecutor(max_workers=n_jobs) as executor:
# 각 프로세스에서 벡터화된 매칭 수행
futures = [executor.submit(self.match_features, query, batch, threshold) for batch in batches]
current_offset = 0
for i, future in enumerate(futures):
batch_matches = future.result()
# 오프셋 보정하여 전체 인덱스로 변환
all_matches.extend([idx + current_offset for idx in batch_matches])
current_offset += len(batches[i])
return all_matches
# Proof of Concept (Benchmark)
if __name__ == "__main__":
extractor = FeatureExtractor(dimension=256)
N = 10000
dummy_query = np.random.rand(256).tolist()
dummy_db = np.random.rand(N, 256).tolist()
print(f"Benchmarking N={N} with Vectorized Engine...")
extractor.match_features(dummy_query, dummy_db)