432 lines
17 KiB
Python
432 lines
17 KiB
Python
"""
|
|
P-Reinforce Phase 1 — Duplicate Detection Indexer
==================================================
Scans 10_Wiki/Topics/, builds an index of every .md file, and emits
duplicate-candidate clusters into 20_Meta/ReviewQueue/.

Read-only with respect to wiki content. No file is modified or moved.

Outputs:
  20_Meta/ReviewQueue/_index.json              - per-file metadata
  20_Meta/ReviewQueue/duplicate_candidates.md  - human-readable cluster report
  20_Meta/ReviewQueue/_clusters.json           - machine-readable clusters

Detection channels (any one match -> candidate cluster):
  1. Normalized filename match (case-insensitive, strips spaces/underscores/hyphens/parens)
  2. Normalized frontmatter title match
  3. Normalized first-paragraph fingerprint (first 400 chars of body)
  4. Alias intersection (frontmatter aliases overlap)
     NOTE: aliases are recorded in _index.json, but build_clusters() does not
     union on them yet; only channels 1-3 currently create clusters.

Similarity tiers per P-Reinforce rules:
  >= 0.92   : near-duplicate (UPDATE candidate)
  0.80-0.92 : duplicate candidate (ReviewQueue)
  0.65-0.80 : related (link-only candidate)
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import re
|
|
import sys
|
|
import unicodedata
|
|
from collections import defaultdict
|
|
from dataclasses import dataclass, field, asdict
|
|
from difflib import SequenceMatcher
|
|
from pathlib import Path
|
|
from typing import Iterable
|
|
|
|
# Vault root and the input/output locations derived from it.
ROOT = Path(r"E:/Wiki/2nd")
TOPICS = ROOT / "10_Wiki" / "Topics"
OUT_DIR = ROOT / "20_Meta" / "ReviewQueue"
INDEX_JSON = OUT_DIR / "_index.json"
CLUSTERS_JSON = OUT_DIR / "_clusters.json"
REPORT_MD = OUT_DIR / "duplicate_candidates.md"

# Directory names pruned from the walk in iter_md_files (never descended into).
SKIP_DIR_NAMES = {".obsidian", ".git", "__pycache__", "node_modules"}

# Path components that mark "operational logs / agent runtime", not knowledge.
# Files under any of these are scanned for awareness but excluded from
# duplicate-cluster building so they don't drown out real concept duplicates.
# Matching is a plain, case-sensitive substring test against "/" + relative path.
EXCLUDE_PATH_FRAGMENTS = (
    "/sessions/",
    "/_agents/",
    "/_company/",
    "/memory/",
    "/Project_Logs/",
    "/Harness_Research_",
    "/docs/records/",
    "/_Archive_Orphans/",
    "/Post_Drafts/",
    "/UX_Scenarios/",
)

# Leading "---" ... "---" frontmatter fence; group 1 is the raw text between the fences.
FRONTMATTER_RE = re.compile(r"^---\s*\n(.*?)\n---\s*\n?", re.DOTALL)
# [[target]] or [[target|label]]; group 1 is the target only.
WIKI_LINK_RE = re.compile(r"\[\[([^\]|]+?)(?:\|[^\]]+)?\]\]")
# Triple-backtick fenced code block, fences included.
CODE_BLOCK_RE = re.compile(r"```.*?```", re.DOTALL)
# A whole ATX heading line (one to six "#" followed by whitespace).
HEADING_RE = re.compile(r"^#{1,6}\s+.*$", re.MULTILINE)
# Any run of whitespace.
WHITESPACE_RE = re.compile(r"\s+")
# Everything except ASCII digits, lowercase ASCII letters and Hangul syllables.
NONALNUM_RE = re.compile(r"[^0-9a-z가-힣]+")
|
|
|
|
|
|
@dataclass
class FileEntry:
    """Index record for one markdown file, as built by ``scan()``.

    Every instance is serialized verbatim into ``_index.json`` via
    ``dataclasses.asdict``; a subset of the fields is repeated per member in
    ``_clusters.json`` and in the markdown report.
    """

    path: str  # relative to ROOT, forward slashes
    abs_path: str  # str() of the path yielded by the directory walk
    folder: str  # immediate parent folder name under Topics
    filename: str  # base name without extension
    norm_name: str  # normalized filename for matching
    title: str  # first H1 in the body, or the filename when there is none
    norm_title: str  # normalized title for matching
    fm_id: str | None  # frontmatter "id", stringified
    fm_aliases: list[str] = field(default_factory=list)  # frontmatter "aliases" (list values only)
    fm_tags: list[str] = field(default_factory=list)  # frontmatter "tags" (list values only)
    fm_status: str | None = None  # frontmatter "status", stored as parsed
    fm_trust: str | None = None  # frontmatter "source_trust_level"
    fm_last_reinforced: str | None = None  # frontmatter "last_reinforced"
    fm_redirect_to: str | None = None  # if present, this is a merged-stub placeholder
    fm_canonical_id: str | None = None  # frontmatter "canonical_id", stringified
    body_chars: int = 0  # length of the stripped body, in characters
    body_first_para_hash: str = ""  # 12 hex chars of SHA-1 over the first 400 fingerprint chars
    body_fingerprint: str = ""  # short normalized excerpt for similarity
    is_stub: bool = False  # body < 200 chars
    is_huge: bool = False  # body > 50000 chars (~50 KB)
    is_redirect: bool = False  # already-merged redirect placeholder
    is_operational: bool = False  # under sessions/, _agents/, etc — excluded from clustering
|
|
|
|
|
|
def normalize(s: str) -> str:
    """Reduce *s* to a bare matching key.

    The text is NFKC-normalized and lowercased, then every character that is
    not an ASCII digit, a lowercase ASCII letter or a Hangul syllable is
    removed. Falsy input yields the empty string.
    """
    folded = unicodedata.normalize("NFKC", s).lower() if s else ""
    return NONALNUM_RE.sub("", folded)
|
|
|
|
|
|
def _clean_fm_item(item: str) -> str:
    """Return one frontmatter list item as a bare string.

    Surrounding whitespace and quotes are removed, and an item that is exactly
    one ``[[target]]`` / ``[[target|label]]`` wiki-link is reduced to its target.
    """
    item = item.strip().strip("'\"")
    link = WIKI_LINK_RE.fullmatch(item)
    return link.group(1) if link else item


def parse_frontmatter(text: str) -> tuple[dict, str]:
    """Split *text* into ``(frontmatter, body)`` using a cheap YAML-ish parser.

    Tolerates the malformed ``[[wiki-link]]`` tags and other quirks present in
    this wiki; there is deliberately no PyYAML dependency.

    Supported shapes:

    * ``key: value``            -> ``str`` (surrounding quotes removed)
    * ``key: [a, b, [[c]]]``    -> ``list[str]`` (inline list)
    * ``key:`` followed by ``- a`` / ``- b`` lines, indented or not
                                -> ``list[str]`` (block sequence)
    * ``key: [[Target]]``       -> ``["Target"]`` (a bare wiki-link keeps the
      list shape the inline-list branch has always produced for it, minus the
      bracket residue)
    * indented lines after a scalar are folded into it with a single space

    Returns:
        ``({}, text)`` when *text* does not start with a ``---`` fence,
        otherwise the parsed mapping and the text after the closing fence.
    """
    m = FRONTMATTER_RE.match(text)
    if not m:
        return {}, text
    body = text[m.end():]
    fm: dict = {}
    current_key: str | None = None

    for line in m.group(1).splitlines():
        stripped = line.strip()
        if not stripped or stripped.startswith("#"):
            continue

        if current_key:
            current = fm.get(current_key)
            is_item = stripped == "-" or stripped.startswith("- ")
            # Block sequence: only start one under a key whose own value was
            # empty, so "- x" after a real scalar keeps its old meaning.
            if is_item and (current == "" or isinstance(current, list)):
                items = current if isinstance(current, list) else []
                item = _clean_fm_item(stripped[1:])
                if item:
                    items.append(item)
                fm[current_key] = items
                continue
            if line.startswith((" ", "\t")):
                # Fold continuation text into a scalar. A list value is left
                # alone: stringifying it would destroy the parsed items.
                if not isinstance(current, list):
                    fm[current_key] = (str(current) + " " + stripped).strip()
                continue

        if ":" not in line:
            continue
        key, _, val = line.partition(":")
        key = key.strip()
        val = val.strip()

        bare_link = WIKI_LINK_RE.fullmatch(val)
        if bare_link:
            # Unquoted `key: [[Target]]` looks like a list to the bracket test
            # below and used to come out as ["[Target]"].
            fm[key] = [bare_link.group(1)]
        elif val.startswith("[") and val.endswith("]"):
            inner = val[1:-1].strip()
            # Split on commas, except commas that sit inside a [[...]] link.
            parts = re.split(r",(?![^\[]*\])", inner)
            fm[key] = [item for item in map(_clean_fm_item, parts) if item]
        else:
            fm[key] = val.strip("'\"")
        current_key = key

    return fm, body
|
|
|
|
|
|
def first_h1(body: str) -> str | None:
    """Return the text of the first level-1 heading in *body*, or ``None``.

    Only lines starting with ``"# "`` at column 0 count. Lines inside a fenced
    code block (``` or ~~~) are skipped, so a shell or Python ``# comment`` in
    a snippet is not mistaken for the document title.

    A heading written as a wiki-link is unwrapped: leading ``[`` and trailing
    ``]`` characters are removed and anything after the first ``|`` is dropped,
    so ``# [[Target|Label]]`` yields ``"Target"``.
    """
    in_fence = False
    for line in body.splitlines():
        if line.lstrip().startswith(("```", "~~~")):
            in_fence = not in_fence
            continue
        if in_fence:
            continue
        if line.startswith("# "):
            return line[2:].strip().lstrip("[").rstrip("]").split("|")[0].strip()
    return None
|
|
|
|
|
|
def fingerprint_body(body: str, max_chars: int = 600) -> str:
    """Build the normalized excerpt used for SequenceMatcher similarity.

    In order: fenced code blocks and heading lines are replaced by a space,
    wiki-links are reduced to their target, runs of markdown punctuation
    (``* _ ` > # -``) become a space, whitespace is collapsed, and the result
    is trimmed and lowercased. The first *max_chars* characters are returned.
    """
    text = HEADING_RE.sub(" ", CODE_BLOCK_RE.sub(" ", body))
    text = WIKI_LINK_RE.sub(lambda link: link.group(1), text)
    text = re.sub(r"[*_`>#\-]+", " ", text)
    collapsed = WHITESPACE_RE.sub(" ", text)
    return collapsed.strip().lower()[:max_chars]
|
|
|
|
|
|
def first_para_hash(body: str) -> str:
    """Return a 12-hex-digit SHA-1 prefix of the first 400 fingerprint chars.

    An empty fingerprint yields the empty string rather than a hash, so empty
    bodies never share a key.
    """
    excerpt = fingerprint_body(body, 400)
    if excerpt:
        digest = hashlib.sha1(excerpt.encode("utf-8")).hexdigest()
        return digest[:12]
    return ""
|
|
|
|
|
|
def iter_md_files(root: Path) -> Iterable[Path]:
    """Yield every markdown file under *root* in a deterministic order.

    Directories named in ``SKIP_DIR_NAMES`` are pruned and never descended
    into. Directory and file names are visited in sorted order because
    ``os.walk`` gives no ordering guarantee; without sorting, the index and the
    member order inside each cluster can differ from run to run. The ``.md``
    suffix is matched case-insensitively so ``NOTE.MD`` is not skipped.
    """
    for dirpath, dirs, files in os.walk(root):
        # Assigning through dirs[:] is what tells os.walk to prune/reorder.
        dirs[:] = sorted(d for d in dirs if d not in SKIP_DIR_NAMES)
        for name in sorted(files):
            if name.lower().endswith(".md"):
                yield Path(dirpath) / name
|
|
|
|
|
|
def scan() -> list[FileEntry]:
    """Read every markdown file under ``TOPICS`` and build its ``FileEntry``.

    Files that cannot be read are reported on stderr and skipped. Nothing is
    written; the wiki content is only read.

    Returns:
        One entry per readable file, in the order ``iter_md_files`` yields them.
    """
    entries: list[FileEntry] = []
    for p in iter_md_files(TOPICS):
        try:
            # "utf-8-sig" drops a leading byte-order mark when one is present.
            # With plain "utf-8" the BOM stays in the text, the frontmatter
            # regex no longer matches at position 0, and the whole frontmatter
            # is silently treated as body.
            text = p.read_text(encoding="utf-8-sig", errors="replace")
        except OSError as e:
            print(f"WARN read fail {p}: {e}", file=sys.stderr)
            continue

        fm, body = parse_frontmatter(text)
        filename = p.stem
        title = first_h1(body) or filename
        body_strip = body.strip()
        body_len = len(body_strip)

        # redirect_to may have been parsed as a list; keep its first item.
        redirect_to = fm.get("redirect_to")
        if isinstance(redirect_to, list):
            redirect_to = redirect_to[0] if redirect_to else None
        # Either an explicit redirect target, or a short page titled "Redirect".
        is_redirect = bool(redirect_to) or (
            title.strip().lower() == "redirect" and body_len < 400
        )

        rel_path = str(p.relative_to(ROOT)).replace("\\", "/")
        # Leading "/" lets a fragment such as "/sessions/" match the first component.
        anchored = "/" + rel_path
        is_operational = any(frag in anchored for frag in EXCLUDE_PATH_FRAGMENTS)

        aliases = fm.get("aliases")
        tags = fm.get("tags")
        entries.append(
            FileEntry(
                path=rel_path,
                abs_path=str(p),
                folder=p.parent.name,
                filename=filename,
                norm_name=normalize(filename),
                title=title,
                norm_title=normalize(title),
                fm_id=str(fm.get("id")) if fm.get("id") else None,
                # Only real lists are kept; scalar values are ignored.
                fm_aliases=aliases if isinstance(aliases, list) else [],
                fm_tags=tags if isinstance(tags, list) else [],
                fm_status=fm.get("status"),
                fm_trust=fm.get("source_trust_level"),
                fm_last_reinforced=fm.get("last_reinforced"),
                fm_redirect_to=str(redirect_to) if redirect_to else None,
                fm_canonical_id=str(fm.get("canonical_id")) if fm.get("canonical_id") else None,
                body_chars=body_len,
                body_first_para_hash=first_para_hash(body_strip),
                body_fingerprint=fingerprint_body(body_strip, 600),
                is_stub=body_len < 200,
                is_huge=body_len > 50000,
                is_redirect=is_redirect,
                is_operational=is_operational,
            )
        )
    return entries
|
|
|
|
|
|
def build_clusters(entries: list[FileEntry]) -> list[list[FileEntry]]:
    """Group entries that share an exact-match key, using union-find.

    Three key channels are used: ``norm_name``, ``norm_title`` and
    ``body_first_para_hash``.

    Redirect placeholders are NEVER joined through the title or body channels:
    they all carry the same "Redirect" title and boilerplate body, which would
    collapse them into one giant false-positive cluster. They still join via
    ``norm_name`` so that a redirect and its canonical document end up in the
    same cluster — that is the relationship worth surfacing.

    Operational files (``is_operational``) take part in no channel.

    Returns:
        Clusters with at least two members, largest first, ties ordered by the
        first member's normalized title (or name). Members keep input order.
    """
    root_of = list(range(len(entries)))

    def find(node: int) -> int:
        # Walk to the root, halving the path on the way up.
        while root_of[node] != node:
            root_of[node] = root_of[root_of[node]]
            node = root_of[node]
        return node

    def union(a: int, b: int) -> None:
        root_a = find(a)
        root_b = find(b)
        if root_a != root_b:
            root_of[root_a] = root_b

    # One bucket table for all channels; the channel tag keeps the key spaces apart.
    buckets: dict[tuple[str, str], list[int]] = defaultdict(list)
    for idx, entry in enumerate(entries):
        if entry.is_operational:
            continue  # session/agent runtime files: not knowledge candidates
        if entry.norm_name:
            buckets[("name", entry.norm_name)].append(idx)
        if entry.is_redirect:
            continue  # title and body channels are closed to redirects
        if entry.norm_title:
            buckets[("title", entry.norm_title)].append(idx)
        # body channel: only meaningful, non-stub bodies
        has_real_body = (
            bool(entry.body_first_para_hash)
            and len(entry.body_fingerprint) >= 200
            and not entry.is_stub
        )
        if has_real_body:
            buckets[("hash", entry.body_first_para_hash)].append(idx)

    for members in buckets.values():
        anchor = members[0]
        for other in members[1:]:
            union(anchor, other)

    grouped: dict[int, list[int]] = defaultdict(list)
    for idx in range(len(entries)):
        grouped[find(idx)].append(idx)

    clusters = [
        [entries[i] for i in members]
        for members in grouped.values()
        if len(members) >= 2
    ]
    # largest cluster first, then by the first member's title
    clusters.sort(key=lambda cl: (-len(cl), cl[0].norm_title or cl[0].norm_name))
    return clusters
|
|
|
|
|
|
def cluster_similarity(c: list[FileEntry]) -> dict:
    """Summarize pairwise body-fingerprint similarity inside one cluster.

    Every unordered pair of members is compared with
    ``difflib.SequenceMatcher``. A pair in which either fingerprint is empty
    cannot be compared and is scored a neutral 0.5.

    Returns:
        ``{"max", "min", "avg", "tier"}`` with the three figures rounded to
        three decimals. ``tier`` is derived from the *rounded* max so the label
        always agrees with consumers that bucket on the stored ``max`` value.
        A cluster with fewer than two members returns all 1.0 and tier "solo".
    """
    if len(c) < 2:
        return {"max": 1.0, "min": 1.0, "avg": 1.0, "tier": "solo"}

    sims: list[float] = []
    for i, left in enumerate(c):
        for right in c[i + 1:]:
            a = left.body_fingerprint
            b = right.body_fingerprint
            if not a or not b:
                sims.append(0.5)
                continue
            # autojunk must be off for character-level comparison. With the
            # default, once the second string reaches 200 chars every character
            # occurring more than ~1% of the time is dropped as a match anchor,
            # which covers nearly all of ordinary prose and collapses the ratio
            # of near-identical texts. Fingerprints are at most a few hundred
            # chars, so the full comparison stays cheap per pair.
            sims.append(SequenceMatcher(None, a, b, autojunk=False).ratio())

    mx = round(max(sims), 3)
    mn = round(min(sims), 3)
    avg = round(sum(sims) / len(sims), 3)

    if mx >= 0.92:
        tier = "near-dup (>=0.92)"
    elif mx >= 0.80:
        tier = "duplicate-candidate (0.80-0.92)"
    elif mx >= 0.65:
        tier = "related (0.65-0.80)"
    else:
        tier = "weak-link (<0.65)"
    return {"max": mx, "min": mn, "avg": avg, "tier": tier}
|
|
|
|
|
|
def write_index(entries: list[FileEntry]) -> None:
    """Dump every entry, as a plain dict, to ``INDEX_JSON``.

    The output directory is created when missing. Non-ASCII text is written
    as-is (UTF-8), with one-space indentation.
    """
    OUT_DIR.mkdir(parents=True, exist_ok=True)
    records = [asdict(entry) for entry in entries]
    with INDEX_JSON.open("w", encoding="utf-8") as out:
        json.dump(records, out, ensure_ascii=False, indent=1)
|
|
|
|
|
|
def write_clusters(clusters: list[list[FileEntry]], stats_per_cluster: list[dict]) -> None:
    """Write the machine-readable cluster list to ``CLUSTERS_JSON``.

    Args:
        clusters: Clusters as returned by ``build_clusters``.
        stats_per_cluster: One stats dict per cluster, in the same order.
            The two sequences are paired positionally.

    The output directory is created when missing, so this function no longer
    depends on ``write_index`` having been called first.
    """
    payload = [
        {
            "size": len(c),
            "stats": s,
            "members": [
                {
                    "path": e.path,
                    "folder": e.folder,
                    "filename": e.filename,
                    "title": e.title,
                    "body_chars": e.body_chars,
                    "fm_trust": e.fm_trust,
                    "fm_last_reinforced": e.fm_last_reinforced,
                    "is_stub": e.is_stub,
                }
                for e in c
            ],
        }
        for c, s in zip(clusters, stats_per_cluster)
    ]
    OUT_DIR.mkdir(parents=True, exist_ok=True)
    with CLUSTERS_JSON.open("w", encoding="utf-8") as f:
        json.dump(payload, f, ensure_ascii=False, indent=1)
|
|
|
|
|
|
def write_report(entries: list[FileEntry], clusters: list[list[FileEntry]], stats: list[dict]) -> None:
    """Render the human-readable duplicate report to ``REPORT_MD``.

    Args:
        entries: Every scanned file (used for the summary counters).
        clusters: Clusters as returned by ``build_clusters``.
        stats: One ``cluster_similarity`` dict per cluster, same order.

    The report has a summary, a "folder pair" hotspot table (how many clusters
    span each pair of folders, top 20) and one section per similarity tier,
    each capped at 80 clusters. Clusters are assigned to a tier by the stored
    ``max`` similarity. The output directory is created when missing.
    """
    n_files = len(entries)
    n_clustered = sum(len(c) for c in clusters)
    n_huge = sum(1 for e in entries if e.is_huge)
    n_redirect = sum(1 for e in entries if e.is_redirect)
    n_operational = sum(1 for e in entries if e.is_operational)
    # Count non-redirect stubs directly. Subtracting the redirect total from
    # the stub total is wrong whenever a redirect is not itself a stub, and can
    # even go negative.
    n_stub_non_redirect = sum(1 for e in entries if e.is_stub and not e.is_redirect)

    # Bucket each (cluster, stats) pair once; the summary line and the
    # per-tier sections below both read from these.
    pairs = list(zip(clusters, stats))
    near_dup = [(c, s) for c, s in pairs if s["max"] >= 0.92]
    dup_cand = [(c, s) for c, s in pairs if 0.80 <= s["max"] < 0.92]
    related = [(c, s) for c, s in pairs if 0.65 <= s["max"] < 0.80]
    weak = [(c, s) for c, s in pairs if s["max"] < 0.65]

    # For every cluster spanning several folders, count each folder pair once.
    folder_dup_pairs: dict[tuple[str, str], int] = defaultdict(int)
    for c in clusters:
        folders = sorted({e.folder for e in c})
        for i, first in enumerate(folders):
            for second in folders[i + 1:]:
                folder_dup_pairs[(first, second)] += 1

    lines: list[str] = [
        "# Duplicate Candidates (P-Reinforce Phase 1 Index)\n",
        "> 자동 생성. 이 보고서는 **변경 제안**일 뿐 실제 파일은 수정되지 않았다.\n",
        "> 사용자가 클러스터별로 검토하고 MERGE/UPDATE/CREATE/REJECT 판단을 내려야 한다.\n",
        "",
        "## 요약\n",
        f"- 총 파일: **{n_files}**",
        f"- 중복 후보 클러스터에 포함된 파일: **{n_clustered}**",
        f"- 클러스터 수: **{len(clusters)}** (>=0.92 near-dup: {len(near_dup)}, 0.80-0.92 dup-cand: {len(dup_cand)}, 0.65-0.80 related: {len(related)})",
        f"- 이미 merged (`redirect_to` 필드 보유): **{n_redirect}**",
        f"- 운영 로그 (sessions/_agents/_company 등, 클러스터링 제외): **{n_operational}**",
        f"- 지식 문서 후보 (총수 - 운영 로그): **{n_files - n_operational}**",
        f"- 빈약 stub (<200 chars, redirect 제외): **{n_stub_non_redirect}**",
        f"- 거대 문서 (>50KB): **{n_huge}**",
        "",
    ]

    if folder_dup_pairs:
        lines.append("## 폴더 간 중복 핫스팟 (Top 20)\n")
        lines.append("| 폴더 A | 폴더 B | 공유 클러스터 |")
        lines.append("|---|---|---|")
        hottest = sorted(folder_dup_pairs.items(), key=lambda item: -item[1])[:20]
        for (a, b), n in hottest:
            lines.append(f"| `{a}` | `{b}` | {n} |")
        lines.append("")

    def emit_section(title: str, bucket: list[tuple[list[FileEntry], dict]], cap: int = 80) -> None:
        """Append one tier section; an empty tier produces no output at all."""
        if not bucket:
            return
        lines.append(f"## {title} (총 {len(bucket)})\n")
        if len(bucket) > cap:
            lines.append(f"> 상위 {cap}개만 표시. 전체는 `_clusters.json` 참조.\n")
        for c, s in bucket[:cap]:
            head = c[0].title or c[0].filename
            lines.append(f"### `{head}` (members: {len(c)}, max_sim: {s['max']}, tier: {s['tier']})")
            for e in c:
                stub_tag = " *[stub]*" if e.is_stub else ""
                huge_tag = " *[huge]*" if e.is_huge else ""
                lr = e.fm_last_reinforced or "?"
                trust = e.fm_trust or "?"
                lines.append(f"- [{e.path}]({e.path}) — {e.body_chars} chars, trust={trust}, last={lr}{stub_tag}{huge_tag}")
            lines.append("")

    emit_section("🔴 Near-duplicate (>=0.92) — UPDATE 권장", near_dup)
    emit_section("🟡 Duplicate candidate (0.80-0.92) — 검토 필요", dup_cand)
    emit_section("🟢 Related (0.65-0.80) — 연결만 권장", related)
    emit_section("⚪ Weak-link (<0.65) — 동명/동일 hash지만 내용 다름", weak)

    OUT_DIR.mkdir(parents=True, exist_ok=True)
    REPORT_MD.write_text("\n".join(lines), encoding="utf-8")
|
|
|
|
|
|
def main() -> None:
    """Run the pipeline: scan, write the index, cluster, write both reports.

    Progress messages go to stderr.
    """

    def log(message: str) -> None:
        print(message, file=sys.stderr)

    log(f"[1/4] Scanning {TOPICS} ...")
    entries = scan()
    log(f" {len(entries)} files indexed")

    log(f"[2/4] Writing per-file index -> {INDEX_JSON}")
    write_index(entries)

    log("[3/4] Building duplicate clusters ...")
    clusters = build_clusters(entries)
    stats = list(map(cluster_similarity, clusters))
    log(f" {len(clusters)} clusters with >=2 members")

    log(f"[4/4] Writing report -> {REPORT_MD}")
    write_clusters(clusters, stats)
    write_report(entries, clusters, stats)
    log("DONE.")
|
|
|
|
|
|
# Run the pipeline only when executed as a script, not when imported.
if __name__ == "__main__":
    main()
|