Files
2nd/_tools/p_reinforce_index.py
T

432 lines
17 KiB
Python

"""
P-Reinforce Phase 1 — Duplicate Detection Indexer
==================================================
Scans 10_Wiki/Topics/, builds an index of every .md file, and emits
duplicate-candidate clusters into 20_Meta/ReviewQueue/.
Read-only with respect to wiki content. No file is modified or moved.
Outputs:
20_Meta/ReviewQueue/_index.json - per-file metadata
20_Meta/ReviewQueue/duplicate_candidates.md - human-readable cluster report
20_Meta/ReviewQueue/_clusters.json - machine-readable clusters
Detection channels (any one match -> candidate cluster):
1. Normalized filename match (case-insensitive, strips spaces/underscores/hyphens/parens)
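2. Normalized title match (title = first H1 heading of the body, falling back to the filename)
3. Normalized first-paragraph fingerprint (hash of the first 400 chars of the cleaned body)
4. Alias intersection (frontmatter aliases overlap) - aliases are recorded in _index.json, but are not yet used as a clustering channel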
Similarity tiers per P-Reinforce rules:
>= 0.92 : near-duplicate (UPDATE candidate)
0.80-0.92 : duplicate candidate (ReviewQueue)
0.65-0.80 : related (link-only candidate)
"""
from __future__ import annotations
import hashlib
import json
import os
import re
import sys
import unicodedata
from collections import defaultdict
from dataclasses import dataclass, field, asdict
from difflib import SequenceMatcher
from pathlib import Path
from typing import Iterable
# --- Filesystem layout --------------------------------------------------
# Vault root; all other locations below are derived from it.
ROOT = Path(r"E:/Wiki/2nd")
# Directory tree that holds the wiki topic notes.
TOPICS = ROOT / "10_Wiki" / "Topics"
# Directory that receives the generated artifacts listed below.
OUT_DIR = ROOT / "20_Meta" / "ReviewQueue"
INDEX_JSON = OUT_DIR / "_index.json"
CLUSTERS_JSON = OUT_DIR / "_clusters.json"
REPORT_MD = OUT_DIR / "duplicate_candidates.md"
# Directory names that hold tooling state rather than notes.
SKIP_DIR_NAMES = {".obsidian", ".git", "__pycache__", "node_modules"}
# Path components that mark "operational logs / agent runtime", not knowledge.
# Files under any of these are scanned for awareness but excluded from
# duplicate-cluster building so they don't drown out real concept duplicates.
EXCLUDE_PATH_FRAGMENTS = (
"/sessions/",
"/_agents/",
"/_company/",
"/memory/",
"/Project_Logs/",
# No trailing slash: this one is a name prefix, not a full component.
"/Harness_Research_",
"/docs/records/",
"/_Archive_Orphans/",
"/Post_Drafts/",
"/UX_Scenarios/",
)
# --- Precompiled patterns -----------------------------------------------
# A `---` ... `---` block at the very start of the text; group 1 is the raw
# text between the two fences.
FRONTMATTER_RE = re.compile(r"^---\s*\n(.*?)\n---\s*\n?", re.DOTALL)
# `[[target]]` or `[[target|label]]`; group 1 is the target part.
WIKI_LINK_RE = re.compile(r"\[\[([^\]|]+?)(?:\|[^\]]+)?\]\]")
# Triple-backtick fenced spans, possibly spanning several lines.
CODE_BLOCK_RE = re.compile(r"```.*?```", re.DOTALL)
# Whole heading lines: one to six `#` followed by whitespace.
HEADING_RE = re.compile(r"^#{1,6}\s+.*$", re.MULTILINE)
# Any run of whitespace.
WHITESPACE_RE = re.compile(r"\s+")
# Any run of characters other than ASCII digits, lowercase ASCII letters and
# precomposed Hangul syllables.
NONALNUM_RE = re.compile(r"[^0-9a-z가-힣]+")
@dataclass
class FileEntry:
    """Index record describing a single Markdown file.

    The first eight fields are required; everything else has a default so a
    record can be built from a file with no frontmatter and an empty body.
    """

    # -- location and naming ------------------------------------------------
    path: str  # relative to ROOT
    abs_path: str
    folder: str  # immediate parent folder name under Topics
    filename: str  # base name without extension
    norm_name: str  # normalized filename for matching
    title: str  # H1 title or filename
    norm_title: str

    # -- frontmatter --------------------------------------------------------
    fm_id: str | None
    fm_aliases: list[str] = field(default_factory=list)
    fm_tags: list[str] = field(default_factory=list)
    fm_status: str | None = None
    fm_trust: str | None = None
    fm_last_reinforced: str | None = None
    # When present, this file is a merged-stub placeholder.
    fm_redirect_to: str | None = None
    fm_canonical_id: str | None = None

    # -- body statistics ----------------------------------------------------
    body_chars: int = 0
    body_first_para_hash: str = ""
    body_fingerprint: str = ""  # short normalized excerpt for similarity

    # -- classification flags -----------------------------------------------
    is_stub: bool = False  # body < 200 chars
    is_huge: bool = False  # body > 50 KB
    is_redirect: bool = False  # already-merged redirect placeholder
    # Under sessions/, _agents/, etc. -- excluded from clustering.
    is_operational: bool = False
def normalize(s: str) -> str:
    """Reduce *s* to a compact key for fuzzy matching.

    The text is NFKC-normalized and lowercased, then every run of characters
    matched by ``NONALNUM_RE`` is deleted. Falsy input yields ``""``.
    """
    if s:
        folded = unicodedata.normalize("NFKC", s).lower()
        return NONALNUM_RE.sub("", folded)
    return ""
def _clean_list_item(item: str) -> str:
    """Trim one list item: drop surrounding quotes and unwrap ``[[link]]``."""
    item = item.strip().strip("'\"")
    link = WIKI_LINK_RE.fullmatch(item)
    # A decorated [[wiki-link]] (optionally with |label) becomes its bare target.
    return link.group(1) if link else item


def parse_frontmatter(text: str) -> tuple[dict, str]:
    """Split *text* into a frontmatter mapping and the remaining body.

    Cheap YAML-ish parser with no PyYAML dependency; it tolerates malformed
    ``[[wiki-link]]`` items and similar quirks.

    Supported value forms:

    * ``key: value``          -> string, surrounding quotes removed
    * ``key: [a, b]``         -> list of cleaned items (inline list)
    * ``key:`` followed by ``- a`` / ``- b`` lines -> list of cleaned items
      (block sequence, indented or not)
    * indented continuation lines after a string value are appended to that
      string, separated by a single space

    Returns:
        ``(frontmatter, body)``. When *text* has no leading ``---`` block the
        result is ``({}, text)``.
    """
    m = FRONTMATTER_RE.match(text)
    if not m:
        return {}, text
    body = text[m.end():]
    fm: dict = {}
    current_key: str | None = None
    for line in m.group(1).splitlines():
        stripped = line.strip()
        if not stripped or stripped.startswith("#"):
            continue
        if current_key:
            current = fm.get(current_key)
            is_seq_item = stripped == "-" or stripped.startswith("- ")
            # Block-sequence item: only valid right after an empty value or
            # while a list is already being collected for this key.
            if is_seq_item and (current == "" or isinstance(current, list)):
                if not isinstance(current, list):
                    current = fm[current_key] = []
                item = _clean_list_item(stripped[1:])
                if item:
                    current.append(item)
                continue
            if line.startswith((" ", "\t")):
                # Never fold free text into a list: that would stringify it.
                if not isinstance(current, list):
                    fm[current_key] = (str(current) + " " + stripped).strip()
                continue
        key, sep, val = line.partition(":")
        if not sep:
            continue
        key = key.strip()
        val = val.strip()
        if val.startswith("[") and val.endswith("]"):
            # Inline list; split on commas that are not inside [...] so that
            # [[a, b]] style links survive as one item.
            pieces = re.split(r",(?![^\[]*\])", val[1:-1].strip())
            fm[key] = [it for it in map(_clean_list_item, pieces) if it]
        else:
            fm[key] = val.strip("'\"")
        current_key = key
    return fm, body
def first_h1(body: str) -> str | None:
    """Return the text of the first level-1 heading in *body*, or ``None``.

    Lines inside fenced code blocks (``` or ~~~) are ignored, so a shell or
    Python comment such as ``# install deps`` is not mistaken for the title.
    Wiki-link decoration is peeled off: ``# [[Target|Label]]`` yields
    ``"Target"``.
    """
    open_fence: str | None = None
    for line in body.splitlines():
        marker = line.lstrip()[:3]
        if marker in ("```", "~~~"):
            if open_fence is None:
                open_fence = marker
            elif marker == open_fence:
                # Only the same marker that opened the fence can close it.
                open_fence = None
            continue
        if open_fence is not None:
            continue
        # "# " already excludes "##" and deeper headings.
        if line.startswith("# "):
            return line[2:].strip().lstrip("[").rstrip("]").split("|")[0].strip()
    return None
def fingerprint_body(body: str, max_chars: int = 600) -> str:
    """Build a short, normalized excerpt of *body* for similarity checks.

    Fenced code blocks and heading lines are blanked, wiki-links are replaced
    by their target, Markdown punctuation runs become spaces, whitespace is
    collapsed, and the result is lowercased. Only the leading *max_chars*
    characters are returned.
    """
    substitutions = (
        (CODE_BLOCK_RE, " "),
        (HEADING_RE, " "),
        (WIKI_LINK_RE, lambda link: link.group(1)),
        (re.compile(r"[*_`>#\-]+"), " "),
        (WHITESPACE_RE, " "),
    )
    text = body
    for pattern, replacement in substitutions:
        text = pattern.sub(replacement, text)
    return text.strip().lower()[:max_chars]
def first_para_hash(body: str) -> str:
    """Return a 12-hex-digit SHA-1 prefix of the 400-char body fingerprint.

    An empty fingerprint yields ``""`` so that blank bodies never share a
    hash with each other.
    """
    excerpt = fingerprint_body(body, 400)
    if excerpt:
        digest = hashlib.sha1(excerpt.encode("utf-8")).hexdigest()
        return digest[:12]
    return ""
def iter_md_files(root: Path) -> Iterable[Path]:
    """Yield every Markdown file below *root* in a stable, sorted order.

    Directories named in ``SKIP_DIR_NAMES`` are pruned and never descended
    into. Sorting directories and file names makes the generated index and
    report reproducible across runs and filesystems. The ``.md`` suffix is
    matched case-insensitively so ``NOTE.MD`` is not silently ignored.
    """
    for dirpath, dirs, files in os.walk(root):
        # In-place assignment is what tells os.walk which subdirs to visit.
        dirs[:] = sorted(d for d in dirs if d not in SKIP_DIR_NAMES)
        for name in sorted(files):
            if name.lower().endswith(".md"):
                yield Path(dirpath) / name
def scan() -> list[FileEntry]:
    """Read every Markdown file under ``TOPICS`` and build its index record.

    Files are decoded as ``utf-8-sig`` so a leading byte-order mark is
    dropped; otherwise the BOM would sit in front of the opening ``---`` and
    the frontmatter would go undetected. Undecodable bytes are replaced
    rather than raising. Unreadable files are reported on stderr and skipped.

    Returns:
        One ``FileEntry`` per readable file, in iteration order.
    """
    entries: list[FileEntry] = []
    for p in iter_md_files(TOPICS):
        try:
            text = p.read_text(encoding="utf-8-sig", errors="replace")
        except OSError as e:
            print(f"WARN read fail {p}: {e}", file=sys.stderr)
            continue

        fm, body = parse_frontmatter(text)
        filename = p.stem
        title = first_h1(body) or filename
        body_strip = body.strip()

        # redirect_to may have been parsed as a list; keep its first item.
        redirect_to = fm.get("redirect_to")
        if isinstance(redirect_to, list):
            redirect_to = redirect_to[0] if redirect_to else None
        # A redirect is either declared in frontmatter or is a short page
        # whose title is literally "Redirect".
        is_redirect = bool(redirect_to) or (
            title.strip().lower() == "redirect" and len(body_strip) < 400
        )

        rel_path = str(p.relative_to(ROOT)).replace("\\", "/")
        # Leading "/" lets a fragment match the very first path component.
        is_operational = any(frag in "/" + rel_path for frag in EXCLUDE_PATH_FRAGMENTS)

        # Only genuine lists are accepted for aliases/tags.
        aliases = fm.get("aliases")
        tags = fm.get("tags")
        fm_id = fm.get("id")
        canonical_id = fm.get("canonical_id")

        entries.append(
            FileEntry(
                path=rel_path,
                abs_path=str(p),
                folder=p.parent.name,
                filename=filename,
                norm_name=normalize(filename),
                title=title,
                norm_title=normalize(title),
                fm_id=str(fm_id) if fm_id else None,
                fm_aliases=aliases if isinstance(aliases, list) else [],
                fm_tags=tags if isinstance(tags, list) else [],
                fm_status=fm.get("status"),
                fm_trust=fm.get("source_trust_level"),
                fm_last_reinforced=fm.get("last_reinforced"),
                fm_redirect_to=str(redirect_to) if redirect_to else None,
                fm_canonical_id=str(canonical_id) if canonical_id else None,
                body_chars=len(body_strip),
                body_first_para_hash=first_para_hash(body_strip),
                body_fingerprint=fingerprint_body(body_strip, 600),
                is_stub=len(body_strip) < 200,
                is_huge=len(body_strip) > 50000,
                is_redirect=is_redirect,
                is_operational=is_operational,
            )
        )
    return entries
def build_clusters(entries: list[FileEntry]) -> list[list[FileEntry]]:
    """Group entries that collide on any exact-match channel.

    Channels: normalized filename, normalized title, first-paragraph hash.
    Entries are merged with a union-find structure, so matches are
    transitive across channels.

    Operational entries take part in no channel. Redirect placeholders take
    part in the filename channel only: they share boilerplate bodies and a
    common "Redirect" title, which would otherwise fuse them into one giant
    false-positive cluster, while a filename match still ties a redirect to
    its canonical document. The hash channel additionally requires a
    non-stub body with a fingerprint of at least 200 characters.

    Returns:
        Clusters with two or more members, largest first; ties are ordered
        by the first member's normalized title (or filename when empty).
    """
    root_of = list(range(len(entries)))

    def find(node: int) -> int:
        # Path halving: point each visited node at its grandparent.
        while root_of[node] != node:
            root_of[node] = root_of[root_of[node]]
            node = root_of[node]
        return node

    def union(a: int, b: int) -> None:
        root_a, root_b = find(a), find(b)
        if root_a != root_b:
            root_of[root_a] = root_b

    name_groups: dict[str, list[int]] = defaultdict(list)
    title_groups: dict[str, list[int]] = defaultdict(list)
    hash_groups: dict[str, list[int]] = defaultdict(list)

    for idx, entry in enumerate(entries):
        if entry.is_operational:
            # Session/agent runtime files are not knowledge candidates.
            continue
        if entry.norm_name:
            name_groups[entry.norm_name].append(idx)
        if entry.is_redirect:
            # Redirects stop here: no title or body channel for them.
            continue
        if entry.norm_title:
            title_groups[entry.norm_title].append(idx)
        has_meaningful_body = (
            bool(entry.body_first_para_hash)
            and len(entry.body_fingerprint) >= 200
            and not entry.is_stub
        )
        if has_meaningful_body:
            hash_groups[entry.body_first_para_hash].append(idx)

    for channel in (name_groups, title_groups, hash_groups):
        for members in channel.values():
            anchor, *others = members
            for other in others:
                union(anchor, other)

    members_by_root: dict[int, list[int]] = defaultdict(list)
    for idx in range(len(entries)):
        members_by_root[find(idx)].append(idx)

    clusters = [
        [entries[i] for i in idxs]
        for idxs in members_by_root.values()
        if len(idxs) > 1
    ]
    # Largest cluster first, then by the first member's title.
    clusters.sort(key=lambda c: (-len(c), c[0].norm_title or c[0].norm_name))
    return clusters
def cluster_similarity(c: list[FileEntry]) -> dict:
    """Summarize pairwise body-fingerprint similarity inside one cluster.

    Every unordered pair of members is compared with ``SequenceMatcher``.
    ``autojunk`` is disabled: with the default heuristic, characters that
    are frequent in a string of 200+ characters are ignored when seeding
    matches, which can score near-identical fingerprints as 0.0. A pair in
    which either fingerprint is empty cannot be compared and contributes a
    neutral 0.5.

    Returns:
        ``{"max", "min", "avg", "tier"}`` with the numbers rounded to three
        decimals. The tier label is derived from the *rounded* max so that
        it always agrees with any consumer that buckets on ``"max"``. A
        cluster with fewer than two members yields all 1.0 and tier
        ``"solo"``.
    """
    if len(c) < 2:
        return {"max": 1.0, "min": 1.0, "avg": 1.0, "tier": "solo"}

    sims: list[float] = []
    for i, first in enumerate(c):
        a = first.body_fingerprint
        for second in c[i + 1:]:
            b = second.body_fingerprint
            if not a or not b:
                sims.append(0.5)
            elif a == b:
                # Identical text: skip the quadratic comparison.
                sims.append(1.0)
            else:
                sims.append(SequenceMatcher(None, a, b, autojunk=False).ratio())

    # len(c) >= 2 guarantees at least one pair, so sims is never empty here.
    mx = round(max(sims), 3)
    mn = round(min(sims), 3)
    avg = round(sum(sims) / len(sims), 3)
    if mx >= 0.92:
        tier = "near-dup (>=0.92)"
    elif mx >= 0.80:
        tier = "duplicate-candidate (0.80-0.92)"
    elif mx >= 0.65:
        tier = "related (0.65-0.80)"
    else:
        tier = "weak-link (<0.65)"
    return {"max": mx, "min": mn, "avg": avg, "tier": tier}
def write_index(entries: list[FileEntry]) -> None:
    """Dump every entry, converted to a plain dict, into ``INDEX_JSON``.

    ``OUT_DIR`` is created first if needed. The JSON is written as UTF-8
    with non-ASCII characters kept verbatim and a one-space indent.
    """
    OUT_DIR.mkdir(parents=True, exist_ok=True)
    with INDEX_JSON.open("w", encoding="utf-8") as handle:
        records = [asdict(entry) for entry in entries]
        json.dump(records, handle, ensure_ascii=False, indent=1)
def write_clusters(clusters: list[list[FileEntry]], stats_per_cluster: list[dict]) -> None:
    """Write the machine-readable cluster list to ``CLUSTERS_JSON``.

    Each cluster becomes ``{"size", "stats", "members"}``, where every member
    carries a reduced set of fields. ``clusters`` and ``stats_per_cluster``
    are paired positionally. The target directory is created when missing,
    so this function does not depend on another writer having run first.
    """
    payload = [
        {
            "size": len(cluster),
            "stats": stats,
            "members": [
                {
                    "path": e.path,
                    "folder": e.folder,
                    "filename": e.filename,
                    "title": e.title,
                    "body_chars": e.body_chars,
                    "fm_trust": e.fm_trust,
                    "fm_last_reinforced": e.fm_last_reinforced,
                    "is_stub": e.is_stub,
                }
                for e in cluster
            ],
        }
        for cluster, stats in zip(clusters, stats_per_cluster)
    ]
    CLUSTERS_JSON.parent.mkdir(parents=True, exist_ok=True)
    with CLUSTERS_JSON.open("w", encoding="utf-8") as f:
        json.dump(payload, f, ensure_ascii=False, indent=1)
def write_report(entries: list[FileEntry], clusters: list[list[FileEntry]], stats: list[dict]) -> None:
    """Render the human-readable Markdown report into ``REPORT_MD``.

    The report has a summary, a table of folder pairs that share clusters
    (top 20), and one section per similarity tier. ``clusters`` and ``stats``
    are paired positionally; tiers are decided from ``stats[i]["max"]``.

    The stub count is the number of entries that are stubs *and* not
    redirects. It is counted directly rather than as a difference of totals,
    which would be wrong whenever a redirect page is not itself a stub. The
    target directory is created when missing.
    """
    n_files = len(entries)
    n_clustered = sum(len(c) for c in clusters)
    n_plain_stub = sum(1 for e in entries if e.is_stub and not e.is_redirect)
    n_huge = sum(1 for e in entries if e.is_huge)
    n_redirect = sum(1 for e in entries if e.is_redirect)
    n_operational = sum(1 for e in entries if e.is_operational)

    # Bucket each (cluster, stats) pair once; reused by summary and sections.
    pairs = list(zip(clusters, stats))
    near_dup = [(c, s) for c, s in pairs if s["max"] >= 0.92]
    dup_cand = [(c, s) for c, s in pairs if 0.80 <= s["max"] < 0.92]
    related = [(c, s) for c, s in pairs if 0.65 <= s["max"] < 0.80]
    weak = [(c, s) for c, s in pairs if s["max"] < 0.65]

    # Count, for every unordered folder pair, how many clusters span both.
    folder_dup_pairs: dict[tuple[str, str], int] = defaultdict(int)
    for c in clusters:
        folders = sorted({e.folder for e in c})
        for i, left in enumerate(folders):
            for right in folders[i + 1:]:
                folder_dup_pairs[(left, right)] += 1

    lines: list[str] = [
        "# Duplicate Candidates (P-Reinforce Phase 1 Index)\n",
        "> 자동 생성. 이 보고서는 **변경 제안**일 뿐 실제 파일은 수정되지 않았다.\n",
        "> 사용자가 클러스터별로 검토하고 MERGE/UPDATE/CREATE/REJECT 판단을 내려야 한다.\n",
        "",
        "## 요약\n",
        f"- 총 파일: **{n_files}**",
        f"- 중복 후보 클러스터에 포함된 파일: **{n_clustered}**",
        f"- 클러스터 수: **{len(clusters)}** (>=0.92 near-dup: {len(near_dup)}, 0.80-0.92 dup-cand: {len(dup_cand)}, 0.65-0.80 related: {len(related)})",
        f"- 이미 merged (`redirect_to` 필드 보유): **{n_redirect}**",
        f"- 운영 로그 (sessions/_agents/_company 등, 클러스터링 제외): **{n_operational}**",
        f"- 지식 문서 후보 (총수 - 운영 로그): **{n_files - n_operational}**",
        f"- 빈약 stub (<200 chars, redirect 제외): **{n_plain_stub}**",
        f"- 거대 문서 (>50KB): **{n_huge}**",
        "",
    ]

    if folder_dup_pairs:
        lines.append("## 폴더 간 중복 핫스팟 (Top 20)\n")
        lines.append("| 폴더 A | 폴더 B | 공유 클러스터 |")
        lines.append("|---|---|---|")
        for (a, b), n in sorted(folder_dup_pairs.items(), key=lambda x: -x[1])[:20]:
            lines.append(f"| `{a}` | `{b}` | {n} |")
        lines.append("")

    def emit_section(title: str, group: list[tuple[list[FileEntry], dict]], cap: int = 80) -> None:
        """Append one tier section, listing at most *cap* clusters."""
        if not group:
            return
        lines.append(f"## {title} (총 {len(group)})\n")
        if len(group) > cap:
            lines.append(f"> 상위 {cap}개만 표시. 전체는 `_clusters.json` 참조.\n")
        for c, s in group[:cap]:
            head = c[0].title or c[0].filename
            lines.append(f"### `{head}` (members: {len(c)}, max_sim: {s['max']}, tier: {s['tier']})")
            for e in c:
                stub_tag = " *[stub]*" if e.is_stub else ""
                huge_tag = " *[huge]*" if e.is_huge else ""
                lr = e.fm_last_reinforced or "?"
                trust = e.fm_trust or "?"
                lines.append(f"- [{e.path}]({e.path}) — {e.body_chars} chars, trust={trust}, last={lr}{stub_tag}{huge_tag}")
            lines.append("")

    emit_section("🔴 Near-duplicate (>=0.92) — UPDATE 권장", near_dup)
    emit_section("🟡 Duplicate candidate (0.80-0.92) — 검토 필요", dup_cand)
    emit_section("🟢 Related (0.65-0.80) — 연결만 권장", related)
    emit_section("⚪ Weak-link (<0.65) — 동명/동일 hash지만 내용 다름", weak)

    REPORT_MD.parent.mkdir(parents=True, exist_ok=True)
    REPORT_MD.write_text("\n".join(lines), encoding="utf-8")
def main() -> None:
    """Run the pipeline: scan, write index, cluster, write cluster outputs.

    Progress messages go to stderr.
    """

    def log(message: str) -> None:
        print(message, file=sys.stderr)

    log(f"[1/4] Scanning {TOPICS} ...")
    entries = scan()
    log(f" {len(entries)} files indexed")

    log(f"[2/4] Writing per-file index -> {INDEX_JSON}")
    write_index(entries)

    log("[3/4] Building duplicate clusters ...")
    clusters = build_clusters(entries)
    stats = [cluster_similarity(cluster) for cluster in clusters]
    log(f" {len(clusters)} clusters with >=2 members")

    log(f"[4/4] Writing report -> {REPORT_MD}")
    write_clusters(clusters, stats)
    write_report(entries, clusters, stats)
    log("DONE.")


# Script entry point: run the pipeline only when executed directly.
if __name__ == "__main__":
    main()