Files
2026-05-26 22:34:28 +12:00

307 lines
11 KiB
Python

"""ReconConfig API proxy / fake endpoints.
When RECON_API_BASE_URL is set in the environment, POST and PUT calls, and
GET calls for a single config, are forwarded to the real API. The list
endpoint always uses fake data in this workspace so the editor has something
to load without a real database.
Also provides a /api/configs/test-url endpoint that inspects file:// URLs and
returns file metadata plus an inferred schema (no row data is returned).
Non-file URLs get a stub response with an empty schema.
"""
from __future__ import annotations
import csv
import glob
import os
import xml.etree.ElementTree as ET
from datetime import datetime
from pathlib import Path
from typing import Any
import httpx
from fastapi import APIRouter, HTTPException, Request
from fastapi.responses import JSONResponse
from app.core.refdata import ReconConfigStatus, ReconPatterns
from app.service.fake_configs import get_fake_configs
# Router that carries every ReconConfig endpoint; all routes are mounted
# under the /api/configs prefix.
router = APIRouter(
    prefix="/api/configs",
    tags=["ReconConfig"],
)

# Base URL of the real ReconConfig API, read once at import time. It is the
# empty string when RECON_API_BASE_URL is unset; any trailing "/" is removed
# so paths can be appended with a single slash.
_RECON_API_BASE = os.environ.get("RECON_API_BASE_URL", "").rstrip("/")
# ── Helpers ────────────────────────────────────────────────────────────────
def _fake_config_full(ref: str, now: datetime) -> dict[str, Any] | None:
    """Expand one fake config into a full ReconConfigRequest-shaped dict.

    Args:
        ref: Reference of the fake config to look up.
        now: Timestamp handed to ``get_fake_configs`` when generating the
            fake config list.

    Returns:
        The expanded config dict, or ``None`` when no fake config has the
        reference *ref*.
    """

    def _endpoint(name: str, url: str, csv_spec: dict[str, Any] | None) -> dict[str, Any]:
        # Source and destination share the same shape; only name, url and
        # csv_spec differ. New containers are created on every call so the
        # two endpoints never share mutable state.
        return {
            "name": name,
            "url": url,
            "comment": "",
            "day_offset": 0,
            "system_schema": {},
            "obfuscate_fields": [],
            "pci_redact_fields": [],
            "field_widths": [],
            "profile_thresholds": [],
            "index_fields": [],
            "filter": "",
            "sql": "",
            "csv_spec": csv_spec,
            "xml_spec": None,
        }

    # First fake config with a matching reference wins.
    for candidate in get_fake_configs(now):
        if candidate.reference == ref:
            break
    else:
        return None

    source = _endpoint(
        "Source System",
        "file:///data/input/{{as_at_date.strftime('%Y%m%d')}}.csv",
        {"delimiter": ",", "header": True, "encoding": "utf_8", "trailer_rows": 0, "quoting": True},
    )
    destination = _endpoint(
        "Destination System",
        "mssql://GROUPDW?db=BIODS_PROCESSING&table=EXAMPLE",
        None,
    )

    return {
        "reference": candidate.reference,
        "name": candidate.name,
        "business_process": candidate.business_process,
        "comment": "",
        "data_type": "Unknown",
        "pattern": ReconPatterns.ONE_TO_ONE.value,
        "status": candidate.status,
        "frequency": candidate.frequency,
        "start_datetime": candidate.start_datetime.isoformat() if candidate.start_datetime else None,
        "field_mapping": {},
        "sources": [source],
        "destination": destination,
    }
# ── List ────────────────────────────────────────────────────────────────────
@router.get("/")
async def list_configs():
    """Return a summary entry for every fake config.

    This endpoint never contacts the real API; the list is always built from
    ``get_fake_configs`` using the current local time.
    """
    return [
        {
            "reference": cfg.reference,
            "name": cfg.name,
            "status": cfg.status,
            "frequency": cfg.frequency,
            "business_process": cfg.business_process,
        }
        for cfg in get_fake_configs(datetime.now())
    ]
# ── Get one ─────────────────────────────────────────────────────────────────
@router.get("/{reference}")
async def get_config(reference: str):
    """Return one config, from the real API when configured, else fake data.

    When ``_RECON_API_BASE`` is non-empty the request is forwarded upstream
    and the upstream JSON body is returned with the upstream status code.
    Otherwise the config is built from the fake data set.

    Args:
        reference: Reference of the config to fetch.

    Raises:
        HTTPException: 404 when the upstream API answers 404, or when no
            fake config has this reference.
    """
    from urllib.parse import quote

    if _RECON_API_BASE:
        # Percent-encode the reference so characters such as "?", "#" or "%"
        # cannot change the path or query of the upstream URL.
        target = f"{_RECON_API_BASE}/api/configs/{quote(reference, safe='')}"
        # NOTE(review): verify=False disables TLS certificate verification for
        # the upstream call. Kept as-is; confirm this is intended outside of
        # development environments.
        async with httpx.AsyncClient(verify=False) as client:
            resp = await client.get(target)
        if resp.status_code == 404:
            raise HTTPException(404, f"Config '{reference}' not found")
        # Preserve the upstream status (as create/update do) so an upstream
        # error is not reported to the caller as a 200.
        return JSONResponse(content=resp.json(), status_code=resp.status_code)

    data = _fake_config_full(reference, datetime.now())
    if data is None:
        raise HTTPException(404, f"Config '{reference}' not found")
    return data
# ── Create ──────────────────────────────────────────────────────────────────
@router.post("/")
async def create_config(request: Request):
    """Create a config: forward to the real API, or echo a workspace stub.

    When ``_RECON_API_BASE`` is non-empty the JSON body is POSTed upstream
    unchanged and the upstream JSON body and status code are relayed.
    Otherwise the body is echoed back with ``revision_number`` 1 and a
    placeholder user, with status 201.

    Raises:
        HTTPException: 422 in stub mode when the body is not a JSON object.
    """
    body = await request.json()

    if _RECON_API_BASE:
        # NOTE(review): verify=False disables TLS certificate verification for
        # the upstream call. Kept as-is; confirm this is intended.
        async with httpx.AsyncClient(verify=False) as client:
            resp = await client.post(f"{_RECON_API_BASE}/api/configs/", json=body)
        return JSONResponse(content=resp.json(), status_code=resp.status_code)

    # The stub merges the body into a dict, so anything other than a JSON
    # object would otherwise fail with a TypeError (HTTP 500).
    if not isinstance(body, dict):
        raise HTTPException(422, "Request body must be a JSON object")

    # Workspace stub: echo back with a fake revision.
    return JSONResponse(
        content={**body, "revision_number": 1, "user": {"username": "workspace"}},
        status_code=201,
    )
# ── Update ──────────────────────────────────────────────────────────────────
@router.put("/{reference}")
async def update_config(reference: str, request: Request):
    """Update a config: forward to the real API, or echo a workspace stub.

    When ``_RECON_API_BASE`` is non-empty the JSON body is PUT upstream
    unchanged and the upstream JSON body and status code are relayed.
    Otherwise the body is echoed back with ``revision_number`` incremented
    (a missing value counts as 1) and a placeholder user, with status 200.

    Args:
        reference: Reference of the config being updated.

    Raises:
        HTTPException: 422 in stub mode when the body is not a JSON object or
            ``revision_number`` is not a number.
    """
    from urllib.parse import quote

    body = await request.json()

    if _RECON_API_BASE:
        # Percent-encode the reference so characters such as "?", "#" or "%"
        # cannot change the path or query of the upstream URL.
        target = f"{_RECON_API_BASE}/api/configs/{quote(reference, safe='')}"
        # NOTE(review): verify=False disables TLS certificate verification for
        # the upstream call. Kept as-is; confirm this is intended.
        async with httpx.AsyncClient(verify=False) as client:
            resp = await client.put(target, json=body)
        return JSONResponse(content=resp.json(), status_code=resp.status_code)

    # The stub reads and merges the body as a dict; reject other JSON types
    # explicitly instead of failing with an AttributeError (HTTP 500).
    if not isinstance(body, dict):
        raise HTTPException(422, "Request body must be a JSON object")

    revision = body.get("revision_number", 1)
    # null or string revisions cannot be incremented.
    if not isinstance(revision, (int, float)):
        raise HTTPException(422, "revision_number must be a number")

    return JSONResponse(
        content={**body, "revision_number": revision + 1, "user": {"username": "workspace"}},
        status_code=200,
    )
# ── Schema inference helpers ────────────────────────────────────────────────
_DATE_FORMATS = ["%Y-%m-%d", "%d/%m/%Y", "%m/%d/%Y", "%Y%m%d", "%d-%m-%Y"]
_DATETIME_FORMATS = ["%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%d %H:%M:%S.%f"]
def _infer_type(values: list[str]) -> str:
sample = [v.strip() for v in values if v.strip()][:30]
if not sample:
return "str"
try:
[int(v.replace(",", "")) for v in sample]
return "int"
except ValueError:
pass
try:
[float(v.replace(",", "")) for v in sample]
return "float"
except ValueError:
pass
for fmt in _DATETIME_FORMATS:
try:
[datetime.strptime(v, fmt) for v in sample]
return f"datetime('{fmt}')"
except ValueError:
pass
for fmt in _DATE_FORMATS:
try:
[datetime.strptime(v, fmt) for v in sample]
return f"date('{fmt}')"
except ValueError:
pass
return "str"
def _csv_schema(path: str, delimiter: str = ",", has_header: bool = True) -> dict[str, str]:
try:
with open(path, newline="", encoding="utf-8", errors="replace") as f:
reader = csv.reader(f, delimiter=delimiter)
rows = [r for _, r in zip(range(31), reader)]
if not rows:
return {}
headers = rows[0] if has_header else [f"col_{i}" for i in range(len(rows[0]))]
data_rows = rows[1:] if has_header else rows
return {
col: _infer_type([r[i] for r in data_rows if i < len(r)])
for i, col in enumerate(headers)
if col.strip()
}
except Exception:
return {}
def _xml_schema(path: str, xpathstr: str = "./*") -> dict[str, str]:
try:
tree = ET.parse(path)
root = tree.getroot()
elements = root.findall(xpathstr) or list(root)
if not elements:
return {}
schema: dict[str, str] = {}
for el in elements[:1]:
for child in el:
schema[child.tag] = "str"
for attr in el.attrib:
schema[f"@{attr}"] = "str"
return schema
except Exception:
return {}
# ── Test URL ────────────────────────────────────────────────────────────────
@router.post("/test-url")
async def test_url(request: Request):
    """Return file/DB metadata and inferred schema — no row data ever returned.

    Request body (JSON object):
        url: URL to inspect. Anything not starting with ``file://`` gets a
            stub ``"db"`` response with an empty schema.
        as_at_date: ``YYYYMMDD`` string substituted into date templates;
            missing, null or unparseable values fall back to today.
        csv_spec: Optional object; ``delimiter`` and ``header`` (or the older
            ``has_header``) control CSV schema inference.

    For ``file://`` URLs the supported ``{{as_at_date.strftime(...)}}`` and
    ``{{today.strftime(...)}}`` templates (``%Y%m%d`` and ``%Y-%m-%d`` only)
    are substituted, then the path is treated as a glob pattern or an exact
    path. The response holds per-file size/mtime and the schema inferred from
    the first matching file.

    Raises:
        HTTPException: 422 when the body is not a JSON object, or ``url`` /
            ``csv_spec`` has the wrong JSON type.
    """
    # NOTE(review): this stats and parses whatever server-side path the caller
    # supplies. Confirm the endpoint is only reachable by trusted users.
    from datetime import date as date_type

    body = await request.json()
    if not isinstance(body, dict):
        raise HTTPException(422, "Request body must be a JSON object")

    # `or` fallbacks make an explicit JSON null behave like a missing key;
    # config-shaped payloads legitimately carry "csv_spec": null.
    url = body.get("url") or ""
    if not isinstance(url, str):
        raise HTTPException(422, "url must be a string")
    csv_spec = body.get("csv_spec") or {}
    if not isinstance(csv_spec, dict):
        raise HTTPException(422, "csv_spec must be an object")

    # ── DB URL — schema inspection wired up per-connector when available ──────
    if not url.startswith("file://"):
        scheme = url.split("://")[0] if "://" in url else url
        return {"type": "db", "scheme": scheme, "schema": {}, "found": False}

    # ── file:// URL ──────────────────────────────────────────────────────────
    raw_path = url[len("file://"):]
    today = date_type.today()
    try:
        as_at = datetime.strptime(body.get("as_at_date"), "%Y%m%d").date()
    except (TypeError, ValueError):
        # TypeError: missing, null or non-string value. ValueError: bad format.
        as_at = today

    resolved = (
        raw_path
        .replace("{{as_at_date.strftime('%Y%m%d')}}", as_at.strftime("%Y%m%d"))
        .replace("{{today.strftime('%Y%m%d')}}", today.strftime("%Y%m%d"))
        .replace("{{as_at_date.strftime('%Y-%m-%d')}}", as_at.strftime("%Y-%m-%d"))
        .replace("{{today.strftime('%Y-%m-%d')}}", today.strftime("%Y-%m-%d"))
    )

    def _file_info(p: str) -> dict:
        # Size and modification time only; file contents are never read here.
        try:
            st = os.stat(p)
            return {"path": p, "size_bytes": st.st_size,
                    "modified": datetime.fromtimestamp(st.st_mtime).isoformat(timespec="seconds")}
        except OSError:
            return {"path": p, "error": "Could not stat file"}

    def _schema_for(p: str) -> dict[str, str]:
        if p.lower().endswith(".xml"):
            return _xml_schema(p)
        delimiter = csv_spec.get("delimiter", ",") or ","
        # Stored configs name this flag "header"; "has_header" is the key
        # this endpoint originally read, so it keeps precedence when present.
        has_header = csv_spec.get("has_header", csv_spec.get("header", True))
        return _csv_schema(p, delimiter=delimiter, has_header=has_header)

    # Glob pattern. A leftover "{" (e.g. an unsupported template) also takes
    # this branch and will normally match nothing.
    if "*" in resolved or "?" in resolved or "{" in resolved:
        matches = sorted(glob.glob(resolved))
        if not matches:
            return {"type": "file", "resolved": resolved, "found": False,
                    "message": "No files matched the pattern.", "schema": {}}
        return {
            "type": "file", "resolved": resolved, "found": True,
            "matches": len(matches),
            # Cap the listing at 10 files; "matches" still reports the total.
            "files": [_file_info(p) for p in matches[:10]],
            "schema": _schema_for(matches[0]),
        }

    # Exact path
    p = Path(resolved)
    if not p.exists():
        return {"type": "file", "resolved": resolved, "found": False,
                "message": f"File not found: {resolved}", "schema": {}}
    try:
        schema = _schema_for(str(p))
        return {
            "type": "file", "resolved": resolved, "found": True,
            "matches": 1,
            "files": [_file_info(str(p))],
            "schema": schema,
        }
    except OSError as e:
        return {"type": "file", "resolved": resolved, "found": False,
                "message": str(e), "schema": {}}