feat: Wiki 지식 자산 업데이트 - UX Scenarios, Frontend, Game Design, Topics 추가 [2026-05-08]
This commit is contained in:
@@ -0,0 +1,431 @@
|
||||
"""
|
||||
P-Reinforce Phase 1 — Duplicate Detection Indexer
|
||||
==================================================
|
||||
Scans 10_Wiki/Topics/, builds an index of every .md file, and emits
|
||||
duplicate-candidate clusters into 20_Meta/ReviewQueue/.
|
||||
|
||||
Read-only with respect to wiki content. No file is modified or moved.
|
||||
|
||||
Outputs:
|
||||
20_Meta/ReviewQueue/_index.json - per-file metadata
|
||||
20_Meta/ReviewQueue/duplicate_candidates.md - human-readable cluster report
|
||||
20_Meta/ReviewQueue/_clusters.json - machine-readable clusters
|
||||
|
||||
Detection channels (any one match -> candidate cluster):
|
||||
1. Normalized filename match (case-insensitive, strips spaces/underscores/hyphens/parens)
|
||||
2. Normalized frontmatter title match
|
||||
3. Normalized first-paragraph fingerprint (first 400 chars of body)
|
||||
4. Alias intersection (frontmatter aliases overlap) - aliases are recorded in _index.json, but build_clusters() does not union on them yet
|
||||
|
||||
Similarity tiers per P-Reinforce rules:
|
||||
>= 0.92 : near-duplicate (UPDATE candidate)
|
||||
0.80-0.92 : duplicate candidate (ReviewQueue)
|
||||
0.65-0.80 : related (link-only candidate)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import hashlib
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
import unicodedata
|
||||
from collections import defaultdict
|
||||
from dataclasses import dataclass, field, asdict
|
||||
from difflib import SequenceMatcher
|
||||
from pathlib import Path
|
||||
from typing import Iterable
|
||||
|
||||
# Vault root plus the locations this script reads from and writes to.
ROOT = Path(r"E:/Wiki/2nd")
TOPICS = ROOT.joinpath("10_Wiki", "Topics")  # walked recursively for .md files
OUT_DIR = ROOT.joinpath("20_Meta", "ReviewQueue")  # every generated artifact lands here
INDEX_JSON = OUT_DIR.joinpath("_index.json")
CLUSTERS_JSON = OUT_DIR.joinpath("_clusters.json")
REPORT_MD = OUT_DIR.joinpath("duplicate_candidates.md")

# Directory names pruned from the walk entirely (tooling / VCS / caches).
SKIP_DIR_NAMES = {
    ".obsidian",
    ".git",
    "__pycache__",
    "node_modules",
}
|
||||
|
||||
# Substrings of a file's ROOT-relative path ("/"-prefixed, forward slashes)
# that identify operational logs / agent runtime output rather than knowledge.
# Matching files are still indexed, but they are flagged as operational and
# left out of duplicate clustering so they don't drown out real concept
# duplicates. "/Harness_Research_" deliberately has no trailing slash, so it
# matches any path component that starts with that prefix.
EXCLUDE_PATH_FRAGMENTS = (
    "/sessions/",
    "/_agents/",
    "/_company/",
    "/memory/",
    "/Project_Logs/",
    "/Harness_Research_",
    "/docs/records/",
    "/_Archive_Orphans/",
    "/Post_Drafts/",
    "/UX_Scenarios/",
)
|
||||
|
||||
# Leading "---" ... "---" block at the very start of the text; group 1 is the raw frontmatter.
FRONTMATTER_RE = re.compile(r"^---\s*\n(.*?)\n---\s*\n?", flags=re.DOTALL)
# [[Target]] or [[Target|Label]]; group 1 is the target.
WIKI_LINK_RE = re.compile(r"\[\[([^\]|]+?)(?:\|[^\]]+)?\]\]")
# Triple-backtick fenced code, including the fences themselves.
CODE_BLOCK_RE = re.compile(r"```.*?```", flags=re.DOTALL)
# A whole markdown heading line (levels 1-6).
HEADING_RE = re.compile(r"^#{1,6}\s+.*$", flags=re.MULTILINE)
# Any run of whitespace.
WHITESPACE_RE = re.compile(r"\s+")
# Anything that is not an ASCII digit, a lowercase ASCII letter or a Hangul syllable.
NONALNUM_RE = re.compile(r"[^0-9a-z가-힣]+")
|
||||
|
||||
|
||||
@dataclass
class FileEntry:
    """Index record describing a single markdown file found under TOPICS."""

    # --- location and naming -------------------------------------------
    # Path relative to ROOT.
    path: str
    abs_path: str
    # Immediate parent folder name under Topics.
    folder: str
    # Base name without extension.
    filename: str
    # Normalized filename used for matching.
    norm_name: str
    # H1 title, or the filename when there is no H1.
    title: str
    norm_title: str

    # --- frontmatter values --------------------------------------------
    fm_id: str | None
    fm_aliases: list[str] = field(default_factory=list)
    fm_tags: list[str] = field(default_factory=list)
    fm_status: str | None = None
    fm_trust: str | None = None
    fm_last_reinforced: str | None = None
    # When present, this file is a merged-stub placeholder.
    fm_redirect_to: str | None = None
    fm_canonical_id: str | None = None

    # --- body metrics --------------------------------------------------
    body_chars: int = 0
    body_first_para_hash: str = ""
    # Short normalized excerpt used for similarity scoring.
    body_fingerprint: str = ""

    # --- classification flags ------------------------------------------
    # Body shorter than 200 chars.
    is_stub: bool = False
    # Body larger than 50 KB.
    is_huge: bool = False
    # Already-merged redirect placeholder.
    is_redirect: bool = False
    # Under sessions/, _agents/, etc. — excluded from clustering.
    is_operational: bool = False
|
||||
|
||||
|
||||
def normalize(s: str) -> str:
    """Reduce *s* to a fuzzy-match key.

    The text is NFKC-normalized and lowercased, then every character matched
    by NONALNUM_RE is removed. Falsy input yields an empty string.
    """
    if s:
        folded = unicodedata.normalize("NFKC", s).lower()
        return NONALNUM_RE.sub("", folded)
    return ""
|
||||
|
||||
|
||||
def _clean_list_item(raw_item: str) -> str:
    """Strip whitespace and quotes from a list item and unwrap a [[wiki-link]]."""
    item = raw_item.strip().strip("'\"")
    link = WIKI_LINK_RE.fullmatch(item)
    return link.group(1) if link else item


def parse_frontmatter(text: str) -> tuple[dict, str]:
    """Split *text* into a frontmatter dict and the remaining body.

    Cheap YAML-ish parser. Tolerates the malformed [[wiki-link]] tags and
    other quirks present in this wiki — no PyYAML dependency.

    Supported value shapes:
      * ``key: value``                   -> str (surrounding quotes removed)
      * ``key: [a, "b", [[c]]]``         -> list[str]
      * ``key:`` followed by ``- item``  -> list[str] (YAML block sequence)
      * indented continuation lines      -> appended to the previous scalar

    Returns ``({}, text)`` unchanged when *text* does not start with a
    frontmatter block.
    """
    m = FRONTMATTER_RE.match(text)
    if not m:
        return {}, text
    raw = m.group(1)
    body = text[m.end():]
    fm: dict = {}
    current_key: str | None = None
    for line in raw.splitlines():
        stripped = line.strip()
        if not stripped or stripped.startswith("#"):
            continue

        # Block-sequence item ("- foo"), indented or not. Only treated as a
        # list item when the current key has no scalar value yet (or is
        # already a list); otherwise it falls through to the scalar
        # continuation handling below, exactly as before.
        if current_key and (stripped == "-" or stripped.startswith("- ")):
            existing = fm.get(current_key)
            if existing == "" or isinstance(existing, list):
                if not isinstance(existing, list):
                    existing = fm[current_key] = []
                item = _clean_list_item(stripped[1:])
                if item:
                    existing.append(item)
                continue

        # Indented line: continuation of the previous key's scalar value.
        if line.startswith((" ", "\t")) and current_key:
            fm[current_key] = (str(fm.get(current_key, "")) + " " + stripped).strip()
            continue
        if ":" not in line:
            continue

        key, _, val = line.partition(":")
        key = key.strip()
        val = val.strip()
        if val.startswith("[") and val.endswith("]"):
            # Inline list form. Split on commas that are not inside a nested
            # [...] so "[[a, b]]" style wiki links survive as one item.
            inner = val[1:-1].strip()
            cleaned = (_clean_list_item(part) for part in re.split(r",(?![^\[]*\])", inner))
            fm[key] = [item for item in cleaned if item]
        else:
            fm[key] = val.strip("'\"")
        current_key = key
    return fm, body
|
||||
|
||||
|
||||
def first_h1(body: str) -> str | None:
    """Return the text of the first level-1 markdown heading in *body*.

    Lines inside fenced code blocks (``` or ~~~) are skipped, so a shell or
    Python ``# comment`` in a code sample is never mistaken for the title.
    When the heading is a wiki link (``# [[Target|Label]]``) the surrounding
    brackets are removed and only the part before ``|`` is kept.

    Returns None when no level-1 heading exists outside code fences.
    """
    in_fence = False
    for line in body.splitlines():
        if line.lstrip().startswith(("```", "~~~")):
            in_fence = not in_fence
            continue
        if in_fence:
            continue
        # "# " can never be a prefix of "##...", so one check is enough.
        if line.startswith("# "):
            return line[2:].strip().lstrip("[").rstrip("]").split("|")[0].strip()
    return None
|
||||
|
||||
|
||||
def fingerprint_body(body: str, max_chars: int = 600) -> str:
    """Build the normalized excerpt used for SequenceMatcher similarity.

    Fenced code blocks and heading lines are blanked out, wiki links are
    replaced by their target, markdown punctuation (``* _ ` > # -``) becomes
    spaces, whitespace is collapsed, and the result is lowercased. Only the
    leading *max_chars* characters are returned.
    """
    text = HEADING_RE.sub(" ", CODE_BLOCK_RE.sub(" ", body))
    text = WIKI_LINK_RE.sub(r"\1", text)
    text = re.sub(r"[*_`>#\-]+", " ", text)
    collapsed = WHITESPACE_RE.sub(" ", text)
    return collapsed.strip().lower()[:max_chars]
|
||||
|
||||
|
||||
def first_para_hash(body: str) -> str:
    """Return a 12-hex-digit SHA-1 prefix of the first 400 fingerprint chars.

    An empty fingerprint yields an empty string instead of a hash.
    """
    excerpt = fingerprint_body(body, 400)
    return hashlib.sha1(excerpt.encode("utf-8")).hexdigest()[:12] if excerpt else ""
|
||||
|
||||
|
||||
def iter_md_files(root: Path) -> Iterable[Path]:
    """Yield every ``*.md`` file below *root*, skipping SKIP_DIR_NAMES folders."""
    for current, subdirs, names in os.walk(root):
        # Prune in place so os.walk never descends into the skipped folders.
        subdirs[:] = [name for name in subdirs if name not in SKIP_DIR_NAMES]
        base = Path(current)
        yield from (base / name for name in names if name.endswith(".md"))
|
||||
|
||||
|
||||
def scan() -> list[FileEntry]:
    """Index every markdown file under TOPICS and return one FileEntry each.

    Files that cannot be read are reported on stderr and skipped. Nothing is
    written or modified.

    Files are decoded as ``utf-8-sig`` so a leading byte-order mark is
    dropped. With plain ``utf-8`` the BOM stays in the text, FRONTMATTER_RE
    (anchored at position 0) fails to match, and the whole frontmatter is
    treated as body. Files without a BOM decode identically.
    """
    entries: list[FileEntry] = []
    for p in iter_md_files(TOPICS):
        try:
            text = p.read_text(encoding="utf-8-sig", errors="replace")
        except OSError as e:
            print(f"WARN read fail {p}: {e}", file=sys.stderr)
            continue

        fm, body = parse_frontmatter(text)
        filename = p.stem
        title = first_h1(body) or filename
        body_strip = body.strip()
        body_len = len(body_strip)

        # redirect_to may have been parsed as a list; use its first item.
        redirect_to = fm.get("redirect_to")
        if isinstance(redirect_to, list):
            redirect_to = redirect_to[0] if redirect_to else None
        # A redirect placeholder either declares redirect_to or is a short
        # document whose title is literally "Redirect".
        is_redirect = bool(redirect_to) or (
            title.strip().lower() == "redirect" and body_len < 400
        )

        rel_path = str(p.relative_to(ROOT)).replace("\\", "/")
        slash_path = "/" + rel_path
        is_operational = any(frag in slash_path for frag in EXCLUDE_PATH_FRAGMENTS)

        # Only genuine lists are accepted for aliases/tags; anything else -> [].
        aliases = fm.get("aliases")
        tags = fm.get("tags")
        fm_id = fm.get("id")
        canonical_id = fm.get("canonical_id")

        entries.append(
            FileEntry(
                path=rel_path,
                abs_path=str(p),
                folder=p.parent.name,
                filename=filename,
                norm_name=normalize(filename),
                title=title,
                norm_title=normalize(title),
                fm_id=str(fm_id) if fm_id else None,
                fm_aliases=aliases if isinstance(aliases, list) else [],
                fm_tags=tags if isinstance(tags, list) else [],
                fm_status=fm.get("status"),
                fm_trust=fm.get("source_trust_level"),
                fm_last_reinforced=fm.get("last_reinforced"),
                fm_redirect_to=str(redirect_to) if redirect_to else None,
                fm_canonical_id=str(canonical_id) if canonical_id else None,
                body_chars=body_len,
                body_first_para_hash=first_para_hash(body_strip),
                body_fingerprint=fingerprint_body(body_strip, 600),
                is_stub=body_len < 200,
                is_huge=body_len > 50000,
                is_redirect=is_redirect,
                is_operational=is_operational,
            )
        )
    return entries
|
||||
|
||||
|
||||
def build_clusters(entries: list[FileEntry]) -> list[list[FileEntry]]:
    """Group entries into duplicate-candidate clusters with union-find.

    Two entries are joined when they share an exact key on any channel:
    ``norm_name``, ``norm_title`` or ``body_first_para_hash``.

    * Operational entries take part in no channel.
    * Redirect placeholders are joined by name only. They are all titled
      "Redirect" and share boilerplate bodies, so the title and hash channels
      would merge them into one giant false-positive cluster. The name
      channel is kept so a redirect can land in the same cluster as a
      same-named document.
    * The hash channel also requires a non-stub body and a fingerprint of at
      least 200 characters.

    Returns only clusters with two or more members, largest first, ties
    ordered by the first member's ``norm_title`` (or ``norm_name``).
    """
    total = len(entries)
    parent = list(range(total))

    def find(node: int) -> int:
        # Locate the root, then point every node on the path straight at it.
        root = node
        while parent[root] != root:
            root = parent[root]
        while parent[node] != root:
            parent[node], node = root, parent[node]
        return root

    def union(a: int, b: int) -> None:
        root_a, root_b = find(a), find(b)
        if root_a != root_b:
            parent[root_a] = root_b

    # One bucket map for all channels; the channel tag keeps their key
    # spaces separate (a name never matches a title or a hash).
    buckets: dict[tuple[str, str], list[int]] = defaultdict(list)
    for idx, entry in enumerate(entries):
        if entry.is_operational:
            continue  # session/agent runtime files: not knowledge candidates
        if entry.norm_name:
            buckets[("name", entry.norm_name)].append(idx)
        if entry.is_redirect:
            continue  # redirects: name channel only
        if entry.norm_title:
            buckets[("title", entry.norm_title)].append(idx)
        has_real_body = not entry.is_stub and len(entry.body_fingerprint) >= 200
        if entry.body_first_para_hash and has_real_body:
            buckets[("hash", entry.body_first_para_hash)].append(idx)

    for members in buckets.values():
        anchor = members[0]
        for other in members[1:]:
            union(anchor, other)

    members_by_root: dict[int, list[int]] = defaultdict(list)
    for idx in range(total):
        members_by_root[find(idx)].append(idx)

    clusters = [
        [entries[idx] for idx in idxs]
        for idxs in members_by_root.values()
        if len(idxs) > 1
    ]
    # Largest cluster first, then by the first member's title.
    clusters.sort(key=lambda group: (-len(group), group[0].norm_title or group[0].norm_name))
    return clusters
|
||||
|
||||
|
||||
def cluster_similarity(c: list[FileEntry]) -> dict:
    """Summarize pairwise body-fingerprint similarity inside one cluster.

    Every unordered pair is scored with SequenceMatcher.ratio(). A pair in
    which either fingerprint is empty gets a neutral 0.5. The returned dict
    holds the rounded ``max``/``min``/``avg`` scores and a ``tier`` label
    chosen from the maximum. Clusters with fewer than two members are
    reported as tier "solo" with all scores 1.0.
    """
    if len(c) < 2:
        return {"max": 1.0, "min": 1.0, "avg": 1.0, "tier": "solo"}

    scores: list[float] = []
    for pos, left in enumerate(c):
        for right in c[pos + 1:]:
            fp_left, fp_right = left.body_fingerprint, right.body_fingerprint
            if fp_left and fp_right:
                scores.append(SequenceMatcher(None, fp_left, fp_right).ratio())
            else:
                scores.append(0.5)

    if not scores:
        return {"max": 0.0, "min": 0.0, "avg": 0.0, "tier": "unknown"}

    highest = max(scores)
    lowest = min(scores)
    mean = sum(scores) / len(scores)

    if highest >= 0.92:
        tier = "near-dup (>=0.92)"
    elif highest >= 0.80:
        tier = "duplicate-candidate (0.80-0.92)"
    elif highest >= 0.65:
        tier = "related (0.65-0.80)"
    else:
        tier = "weak-link (<0.65)"

    return {
        "max": round(highest, 3),
        "min": round(lowest, 3),
        "avg": round(mean, 3),
        "tier": tier,
    }
|
||||
|
||||
|
||||
def write_index(entries: list[FileEntry]) -> None:
    """Serialize every entry to INDEX_JSON, creating OUT_DIR when missing."""
    OUT_DIR.mkdir(parents=True, exist_ok=True)
    records = [asdict(entry) for entry in entries]
    serialized = json.dumps(records, ensure_ascii=False, indent=1)
    with INDEX_JSON.open("w", encoding="utf-8") as handle:
        handle.write(serialized)
|
||||
|
||||
|
||||
def write_clusters(clusters: list[list[FileEntry]], stats_per_cluster: list[dict]) -> None:
    """Write the machine-readable cluster list to CLUSTERS_JSON.

    *clusters* and *stats_per_cluster* are parallel lists: the stats at index
    i describe the cluster at index i. Each output record holds the cluster
    size, its similarity stats and a reduced view of every member.

    OUT_DIR is created when missing, so this function no longer depends on
    write_index() having run first.
    """
    member_fields = (
        "path",
        "folder",
        "filename",
        "title",
        "body_chars",
        "fm_trust",
        "fm_last_reinforced",
        "is_stub",
    )
    payload = [
        {
            "size": len(cluster),
            "stats": cluster_stats,
            "members": [
                {name: getattr(entry, name) for name in member_fields}
                for entry in cluster
            ],
        }
        for cluster, cluster_stats in zip(clusters, stats_per_cluster)
    ]
    OUT_DIR.mkdir(parents=True, exist_ok=True)
    with CLUSTERS_JSON.open("w", encoding="utf-8") as f:
        json.dump(payload, f, ensure_ascii=False, indent=1)
|
||||
|
||||
|
||||
def write_report(entries: list[FileEntry], clusters: list[list[FileEntry]], stats: list[dict]) -> None:
    """Render the human-readable duplicate report to REPORT_MD.

    *clusters* and *stats* are parallel lists (stats[i] describes
    clusters[i]). The report has a summary, a table of folder pairs that
    share clusters (top 20), and one section per similarity tier, each capped
    at 80 clusters.

    The "stub, redirect excluded" figure counts entries that are stubs and
    not redirects. The previous ``n_stub - n_redirect`` subtraction was only
    right if every redirect was also a stub, and could even go negative.
    OUT_DIR is created when missing, so the report can be written on its own.
    """
    n_files = len(entries)
    n_clustered = sum(len(c) for c in clusters)
    n_huge = sum(1 for e in entries if e.is_huge)
    n_redirect = sum(1 for e in entries if e.is_redirect)
    n_operational = sum(1 for e in entries if e.is_operational)
    n_plain_stub = sum(1 for e in entries if e.is_stub and not e.is_redirect)

    # Partition (cluster, stats) pairs by tier once; reused for the summary
    # counts and for the per-tier sections.
    pairs = list(zip(clusters, stats))
    near_dup = [(c, s) for c, s in pairs if s["max"] >= 0.92]
    dup_cand = [(c, s) for c, s in pairs if 0.80 <= s["max"] < 0.92]
    related = [(c, s) for c, s in pairs if 0.65 <= s["max"] < 0.80]
    weak = [(c, s) for c, s in pairs if s["max"] < 0.65]

    # How many clusters span each unordered pair of folders.
    folder_dup_pairs: dict[tuple[str, str], int] = defaultdict(int)
    for c in clusters:
        folders = sorted({e.folder for e in c})
        for i, folder_a in enumerate(folders):
            for folder_b in folders[i + 1:]:
                folder_dup_pairs[(folder_a, folder_b)] += 1

    lines: list[str] = []
    lines.append("# Duplicate Candidates (P-Reinforce Phase 1 Index)\n")
    lines.append("> 자동 생성. 이 보고서는 **변경 제안**일 뿐 실제 파일은 수정되지 않았다.\n")
    lines.append("> 사용자가 클러스터별로 검토하고 MERGE/UPDATE/CREATE/REJECT 판단을 내려야 한다.\n")
    lines.append("")
    lines.append("## 요약\n")
    lines.append(f"- 총 파일: **{n_files}**")
    lines.append(f"- 중복 후보 클러스터에 포함된 파일: **{n_clustered}**")
    lines.append(f"- 클러스터 수: **{len(clusters)}** (>=0.92 near-dup: {len(near_dup)}, 0.80-0.92 dup-cand: {len(dup_cand)}, 0.65-0.80 related: {len(related)})")
    lines.append(f"- 이미 merged (`redirect_to` 필드 보유): **{n_redirect}**")
    lines.append(f"- 운영 로그 (sessions/_agents/_company 등, 클러스터링 제외): **{n_operational}**")
    lines.append(f"- 지식 문서 후보 (총수 - 운영 로그): **{n_files - n_operational}**")
    lines.append(f"- 빈약 stub (<200 chars, redirect 제외): **{n_plain_stub}**")
    lines.append(f"- 거대 문서 (>50KB): **{n_huge}**")
    lines.append("")

    if folder_dup_pairs:
        lines.append("## 폴더 간 중복 핫스팟 (Top 20)\n")
        lines.append("| 폴더 A | 폴더 B | 공유 클러스터 |")
        lines.append("|---|---|---|")
        for (a, b), n in sorted(folder_dup_pairs.items(), key=lambda x: -x[1])[:20]:
            lines.append(f"| `{a}` | `{b}` | {n} |")
        lines.append("")

    def emit_section(title: str, tiered: list[tuple[list[FileEntry], dict]], cap: int = 80) -> None:
        """Append one tier section listing at most *cap* clusters."""
        if not tiered:
            return
        lines.append(f"## {title} (총 {len(tiered)})\n")
        if len(tiered) > cap:
            lines.append(f"> 상위 {cap}개만 표시. 전체는 `_clusters.json` 참조.\n")
        for c, s in tiered[:cap]:
            head = c[0].title or c[0].filename
            lines.append(f"### `{head}` (members: {len(c)}, max_sim: {s['max']}, tier: {s['tier']})")
            for e in c:
                stub_tag = " *[stub]*" if e.is_stub else ""
                huge_tag = " *[huge]*" if e.is_huge else ""
                lr = e.fm_last_reinforced or "?"
                trust = e.fm_trust or "?"
                lines.append(f"- [{e.path}]({e.path}) — {e.body_chars} chars, trust={trust}, last={lr}{stub_tag}{huge_tag}")
            lines.append("")

    emit_section("🔴 Near-duplicate (>=0.92) — UPDATE 권장", near_dup)
    emit_section("🟡 Duplicate candidate (0.80-0.92) — 검토 필요", dup_cand)
    emit_section("🟢 Related (0.65-0.80) — 연결만 권장", related)
    emit_section("⚪ Weak-link (<0.65) — 동명/동일 hash지만 내용 다름", weak)

    OUT_DIR.mkdir(parents=True, exist_ok=True)
    REPORT_MD.write_text("\n".join(lines), encoding="utf-8")
|
||||
|
||||
|
||||
def main() -> None:
    """Run the pipeline: scan, write index, cluster, write cluster JSON and report.

    Progress messages go to stderr.
    """

    def log(message: str) -> None:
        print(message, file=sys.stderr)

    log(f"[1/4] Scanning {TOPICS} ...")
    entries = scan()
    log(f" {len(entries)} files indexed")

    log(f"[2/4] Writing per-file index -> {INDEX_JSON}")
    write_index(entries)

    log("[3/4] Building duplicate clusters ...")
    clusters = build_clusters(entries)
    stats = list(map(cluster_similarity, clusters))
    log(f" {len(clusters)} clusters with >=2 members")

    log(f"[4/4] Writing report -> {REPORT_MD}")
    write_clusters(clusters, stats)
    write_report(entries, clusters, stats)
    log("DONE.")


if __name__ == "__main__":
    main()
|
||||
Reference in New Issue
Block a user