Initial commit: Financial Crime domain exemplar
This commit is contained in:
@@ -0,0 +1,445 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
MD-DDL Pre-Flight Check
|
||||
|
||||
Mechanical Level-1 syntax validation per the MD-DDL spec (1-Foundation.md).
|
||||
Run against a domain folder before committing or publishing.
|
||||
|
||||
Usage:
|
||||
python preflight.py <domain-folder>
|
||||
python preflight.py domains/customer
|
||||
python preflight.py . (run from within the domain folder)
|
||||
|
||||
Requires: Python 3.8+, pyyaml (pip install pyyaml)
|
||||
|
||||
Exit codes:
|
||||
0 no findings
|
||||
1 one or more pre-flight failures
|
||||
2 usage or invocation error
|
||||
"""
|
||||
|
||||
import re
|
||||
import sys
|
||||
import yaml
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Data types
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@dataclass
|
||||
class Finding:
|
||||
file: str
|
||||
line: int
|
||||
check: str
|
||||
message: str
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Mermaid diagram types recognised by the spec
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
MERMAID_DIAGRAM_TYPES = {
|
||||
"graph", "flowchart", "sequenceDiagram", "classDiagram",
|
||||
"stateDiagram", "stateDiagram-v2", "erDiagram", "gantt",
|
||||
"journey", "gitGraph", "pie", "quadrantChart", "requirementDiagram",
|
||||
"mindmap", "timeline", "block-beta", "packet-beta",
|
||||
"xychart-beta", "sankey-beta", "kanban", "architecture-beta",
|
||||
}
|
||||
|
||||
# YAML keys in relationship and event blocks that must name a domain entity.
|
||||
# 'actor' is deliberately excluded — event actors may be roles or external
|
||||
# systems, not MD-DDL entities.
|
||||
ENTITY_REF_KEYS = {"source", "target", "entity", "extends"}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _read(path: Path) -> str:
|
||||
return path.read_text(encoding="utf-8", errors="replace")
|
||||
|
||||
|
||||
def _extract_code_blocks(text: str, lang: str) -> list[tuple[int, str]]:
|
||||
"""Return (start_line_1indexed, content) for every fenced block of lang."""
|
||||
blocks: list[tuple[int, str]] = []
|
||||
lines = text.splitlines()
|
||||
fence = re.compile(rf"^```{re.escape(lang)}\s*$", re.IGNORECASE)
|
||||
close = re.compile(r"^```\s*$")
|
||||
in_block = False
|
||||
start = 0
|
||||
buf: list[str] = []
|
||||
for i, line in enumerate(lines, 1):
|
||||
if not in_block:
|
||||
if fence.match(line):
|
||||
in_block = True
|
||||
start = i + 1
|
||||
buf = []
|
||||
else:
|
||||
if close.match(line):
|
||||
blocks.append((start, "\n".join(buf)))
|
||||
in_block = False
|
||||
buf = []
|
||||
else:
|
||||
buf.append(line)
|
||||
return blocks
|
||||
|
||||
|
||||
def _heading_slug(text: str) -> str:
|
||||
"""GitHub-compatible anchor slug for a heading line."""
|
||||
# Strip inline markdown (bold, italic, backticks, links)
|
||||
text = re.sub(r"\[([^\]]*)\]\([^)]*\)", r"\1", text)
|
||||
text = re.sub(r"[`*_]", "", text)
|
||||
text = text.lower().strip()
|
||||
text = re.sub(r"[^\w\s-]", "", text)
|
||||
text = re.sub(r"[\s_]+", "-", text).strip("-")
|
||||
return text
|
||||
|
||||
|
||||
def _get_headings(path: Path) -> set[str]:
|
||||
"""All heading slugs in a markdown file."""
|
||||
slugs: set[str] = set()
|
||||
try:
|
||||
for line in _read(path).splitlines():
|
||||
m = re.match(r"^#{1,6}\s+(.+)$", line)
|
||||
if m:
|
||||
slugs.add(_heading_slug(m.group(1)))
|
||||
except OSError:
|
||||
pass
|
||||
return slugs
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Check 1 — YAML syntax
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def check_yaml_syntax(md_file: Path) -> list[Finding]:
|
||||
findings: list[Finding] = []
|
||||
text = _read(md_file)
|
||||
for start, content in _extract_code_blocks(text, "yaml"):
|
||||
try:
|
||||
yaml.safe_load(content)
|
||||
except yaml.YAMLError as exc:
|
||||
line_num = start
|
||||
if hasattr(exc, "problem_mark") and exc.problem_mark:
|
||||
line_num = start + exc.problem_mark.line
|
||||
msg = exc.problem if hasattr(exc, "problem") else str(exc)
|
||||
findings.append(Finding(str(md_file), line_num, "yaml-syntax",
|
||||
f"YAML parse error: {msg}"))
|
||||
return findings
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Check 2 — Mermaid syntax
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def check_mermaid_syntax(md_file: Path) -> list[Finding]:
|
||||
findings: list[Finding] = []
|
||||
text = _read(md_file)
|
||||
for start, content in _extract_code_blocks(text, "mermaid"):
|
||||
stripped = content.strip()
|
||||
if not stripped:
|
||||
findings.append(Finding(str(md_file), start, "mermaid-syntax",
|
||||
"Empty Mermaid block"))
|
||||
continue
|
||||
|
||||
lines = stripped.splitlines()
|
||||
idx = 0
|
||||
|
||||
# Skip optional YAML config block (---...---)
|
||||
if lines[0].strip() == "---":
|
||||
idx = 1
|
||||
while idx < len(lines) and lines[idx].strip() != "---":
|
||||
idx += 1
|
||||
idx += 1 # step past closing ---
|
||||
|
||||
# Find first meaningful content line
|
||||
diagram_type = None
|
||||
while idx < len(lines):
|
||||
line = lines[idx].strip()
|
||||
if line and not line.startswith("%%"):
|
||||
diagram_type = line.split()[0].rstrip(":")
|
||||
break
|
||||
idx += 1
|
||||
|
||||
if diagram_type is None:
|
||||
findings.append(Finding(str(md_file), start, "mermaid-syntax",
|
||||
"Mermaid block has no diagram type declaration"))
|
||||
elif diagram_type not in MERMAID_DIAGRAM_TYPES:
|
||||
findings.append(Finding(str(md_file), start + idx, "mermaid-syntax",
|
||||
f"Unrecognised Mermaid diagram type: '{diagram_type}'"))
|
||||
return findings
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Check 3 — Internal link integrity
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# Patterns that carry URL/path values in MD-DDL files:
|
||||
# [text](url) — standard markdown link (also catches  images)
|
||||
# href='url' — HTML anchor inside Mermaid node labels (single quotes)
|
||||
# href="url" — same, double-quote variant
|
||||
#
|
||||
# Not checked here (out of scope for domain preflight):
|
||||
# {{INCLUDE: path}} — agent/skill file directive; not used in domain folders
|
||||
# reference-style links [text][ref] / [ref]: url — only found as external URLs in README
|
||||
|
||||
_MD_LINK_RE = re.compile(r"\[[^\]]*\]\(([^)]+)\)")
|
||||
_HREF_RE = re.compile(r'href=["\']([^"\']+)["\']')
|
||||
|
||||
|
||||
def _check_url(url: str, line_num: int, md_file: Path, findings: list[Finding]) -> None:
|
||||
"""Validate a single URL extracted from md_file at line_num."""
|
||||
# Skip external links — not our concern
|
||||
if url.startswith(("http://", "https://", "mailto:")):
|
||||
return
|
||||
|
||||
# Pure same-page anchor: #heading — verify heading exists in this file
|
||||
if url.startswith("#"):
|
||||
anchor = url[1:]
|
||||
if anchor and _heading_slug(anchor) not in _get_headings(md_file):
|
||||
findings.append(Finding(str(md_file), line_num, "internal-links",
|
||||
f"Broken same-page anchor: '#{anchor}' not found in this file"))
|
||||
return
|
||||
|
||||
# File path, with optional anchor
|
||||
file_part, anchor = (url.rsplit("#", 1) if "#" in url else (url, None))
|
||||
if not file_part:
|
||||
return
|
||||
|
||||
target = (md_file.parent / file_part).resolve()
|
||||
if not target.exists():
|
||||
findings.append(Finding(str(md_file), line_num, "internal-links",
|
||||
f"Broken link: '{file_part}' does not exist"))
|
||||
elif anchor:
|
||||
if _heading_slug(anchor) not in _get_headings(target):
|
||||
findings.append(Finding(str(md_file), line_num, "internal-links",
|
||||
f"Broken anchor: '#{anchor}' not found in {file_part}"))
|
||||
|
||||
|
||||
def check_internal_links(md_file: Path) -> list[Finding]:
|
||||
findings: list[Finding] = []
|
||||
lines = _read(md_file).splitlines()
|
||||
|
||||
for line_num, line in enumerate(lines, 1):
|
||||
seen: set[str] = set()
|
||||
for m in _MD_LINK_RE.finditer(line):
|
||||
url = m.group(1).strip()
|
||||
if url not in seen:
|
||||
seen.add(url)
|
||||
_check_url(url, line_num, md_file, findings)
|
||||
for m in _HREF_RE.finditer(line):
|
||||
url = m.group(1).strip()
|
||||
if url not in seen:
|
||||
seen.add(url)
|
||||
_check_url(url, line_num, md_file, findings)
|
||||
|
||||
return findings
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Check 4 — Entity reference consistency
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def _entity_names_from_domain(domain_file: Path) -> set[str]:
|
||||
"""Extract canonical entity names from the ## Entities table in domain.md."""
|
||||
names: set[str] = set()
|
||||
text = _read(domain_file)
|
||||
in_section = False
|
||||
in_table = False
|
||||
|
||||
for line in text.splitlines():
|
||||
if re.match(r"^##\s+Entities\s*$", line):
|
||||
in_section = True
|
||||
in_table = False
|
||||
continue
|
||||
if re.match(r"^##\s+", line) and in_section:
|
||||
break
|
||||
if in_section:
|
||||
if re.match(r"^\s*Name\s*\|", line):
|
||||
in_table = True
|
||||
continue
|
||||
if in_table and "|" in line:
|
||||
# Strip separator rows (--- | --- | ...)
|
||||
cells = [c.strip() for c in line.split("|") if c.strip()]
|
||||
if not cells or re.match(r"^-+$", cells[0]):
|
||||
continue
|
||||
name_cell = cells[0]
|
||||
lm = re.match(r"\[([^\]]+)\]\([^)]*\)", name_cell)
|
||||
name = lm.group(1).strip() if lm else name_cell
|
||||
if name:
|
||||
names.add(name)
|
||||
return names
|
||||
|
||||
|
||||
def check_entity_references(domain_file: Path) -> list[Finding]:
|
||||
"""YAML source/target/actor/entity/extends values must name a domain entity."""
|
||||
findings: list[Finding] = []
|
||||
entity_names = _entity_names_from_domain(domain_file)
|
||||
if not entity_names:
|
||||
return []
|
||||
|
||||
domain_root = domain_file.parent
|
||||
|
||||
# Check entity and event detail files; skip sources/ (cross-domain references allowed)
|
||||
for md_file in domain_root.rglob("*.md"):
|
||||
rel = md_file.relative_to(domain_root)
|
||||
# Skip sources/ (cross-domain entity refs allowed) and products/
|
||||
# (the 'source' key there names a data source system, not an entity)
|
||||
if rel.parts and rel.parts[0] in {"sources", "products"}:
|
||||
continue
|
||||
|
||||
text = _read(md_file)
|
||||
for start, content in _extract_code_blocks(text, "yaml"):
|
||||
try:
|
||||
data = yaml.safe_load(content)
|
||||
except yaml.YAMLError:
|
||||
continue # caught by yaml-syntax check
|
||||
if not isinstance(data, dict):
|
||||
continue
|
||||
|
||||
block_lines = content.splitlines()
|
||||
for key in ENTITY_REF_KEYS:
|
||||
value = data.get(key)
|
||||
if not isinstance(value, str):
|
||||
continue
|
||||
if value in entity_names:
|
||||
continue
|
||||
|
||||
# Find the line number of the key within this block
|
||||
key_line = start
|
||||
for i, bl in enumerate(block_lines):
|
||||
if re.match(rf"^{re.escape(key)}\s*:", bl):
|
||||
key_line = start + i
|
||||
break
|
||||
|
||||
findings.append(Finding(
|
||||
str(md_file), key_line, "entity-references",
|
||||
f"'{key}: {value}' does not match any entity in domain.md",
|
||||
))
|
||||
return findings
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Check 5 — Domain version field
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def check_domain_version(domain_file: Path) -> list[Finding]:
|
||||
findings: list[Finding] = []
|
||||
text = _read(domain_file)
|
||||
lines = text.splitlines()
|
||||
|
||||
in_metadata = False
|
||||
collecting = False
|
||||
buf: list[str] = []
|
||||
start = 1
|
||||
found_block = False
|
||||
|
||||
for i, line in enumerate(lines, 1):
|
||||
if re.match(r"^##\s+Metadata\s*$", line):
|
||||
in_metadata = True
|
||||
continue
|
||||
if in_metadata and re.match(r"^##\s+", line):
|
||||
in_metadata = False
|
||||
if in_metadata and re.match(r"^```yaml\s*$", line):
|
||||
collecting = True
|
||||
start = i + 1
|
||||
buf = []
|
||||
continue
|
||||
if collecting:
|
||||
if re.match(r"^```\s*$", line):
|
||||
collecting = False
|
||||
in_metadata = False
|
||||
found_block = True
|
||||
content = "\n".join(buf)
|
||||
try:
|
||||
data = yaml.safe_load(content)
|
||||
if not isinstance(data, dict) or "version" not in data:
|
||||
findings.append(Finding(
|
||||
str(domain_file), start, "domain-version",
|
||||
"Metadata YAML block is missing the 'version:' field",
|
||||
))
|
||||
except yaml.YAMLError:
|
||||
pass # caught by yaml-syntax check
|
||||
break
|
||||
buf.append(line)
|
||||
|
||||
if not found_block:
|
||||
findings.append(Finding(
|
||||
str(domain_file), 1, "domain-version",
|
||||
"No YAML block found under '## Metadata' — 'version:' field cannot be verified",
|
||||
))
|
||||
return findings
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Orchestration
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def run_preflight(domain_folder: str) -> list[Finding]:
|
||||
domain_root = Path(domain_folder).resolve()
|
||||
if not domain_root.exists():
|
||||
print(f"error: path not found: {domain_folder}", file=sys.stderr)
|
||||
sys.exit(2)
|
||||
if not domain_root.is_dir():
|
||||
print(f"error: not a directory: {domain_folder}", file=sys.stderr)
|
||||
sys.exit(2)
|
||||
|
||||
domain_file = domain_root / "domain.md"
|
||||
findings: list[Finding] = []
|
||||
|
||||
# Checks 1–3: run across every .md file in the domain folder
|
||||
for md_file in sorted(domain_root.rglob("*.md")):
|
||||
findings += check_yaml_syntax(md_file)
|
||||
findings += check_mermaid_syntax(md_file)
|
||||
findings += check_internal_links(md_file)
|
||||
|
||||
# Check 4: entity reference consistency (requires domain.md)
|
||||
if domain_file.exists():
|
||||
findings += check_entity_references(domain_file)
|
||||
|
||||
# Check 5: domain version (requires domain.md)
|
||||
if domain_file.exists():
|
||||
findings += check_domain_version(domain_file)
|
||||
else:
|
||||
findings.append(Finding(
|
||||
str(domain_file), 0, "domain-version",
|
||||
"domain.md not found — is this a domain folder?",
|
||||
))
|
||||
|
||||
return findings
|
||||
|
||||
|
||||
def main() -> None:
|
||||
if len(sys.argv) != 2:
|
||||
print("Usage: python preflight.py <domain-folder>", file=sys.stderr)
|
||||
sys.exit(2)
|
||||
|
||||
findings = run_preflight(sys.argv[1])
|
||||
|
||||
if not findings:
|
||||
print("Pre-flight passed. No findings.")
|
||||
sys.exit(0)
|
||||
|
||||
# Group by check for readability
|
||||
by_check: dict[str, list[Finding]] = {}
|
||||
for f in findings:
|
||||
by_check.setdefault(f.check, []).append(f)
|
||||
|
||||
print(f"Pre-flight: {len(findings)} finding(s)\n")
|
||||
for check, group in by_check.items():
|
||||
print(f" [{check}] {len(group)} finding(s)")
|
||||
for f in group:
|
||||
path = f.file
|
||||
print(f" {path}:{f.line}")
|
||||
print(f" {f.message}")
|
||||
print()
|
||||
|
||||
sys.exit(1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user