"""ReconConfig API proxy / fake endpoints. When RECON_API_BASE_URL is set in the environment, POST and PUT calls are forwarded to the real API. GET calls always use fake data in this workspace so the editor has something to load without a real database. Also provides a /api/configs/test-url endpoint that inspects file:// URLs and returns metadata (no file contents are returned). """ from __future__ import annotations import csv import glob import os import xml.etree.ElementTree as ET from datetime import datetime from pathlib import Path from typing import Any import httpx from fastapi import APIRouter, HTTPException, Request from fastapi.responses import JSONResponse from app.core.refdata import ReconConfigStatus, ReconPatterns from app.service.fake_configs import get_fake_configs router = APIRouter(prefix="/api/configs", tags=["ReconConfig"]) _RECON_API_BASE = os.getenv("RECON_API_BASE_URL", "").rstrip("/") # ── Helpers ──────────────────────────────────────────────────────────────── def _fake_config_full(ref: str, now: datetime) -> dict[str, Any] | None: """Build a full ReconConfigRequest-shaped dict from a FakeReconConfig.""" fake = next((c for c in get_fake_configs(now) if c.reference == ref), None) if fake is None: return None return { "reference": fake.reference, "name": fake.name, "business_process": fake.business_process, "comment": "", "data_type": "Unknown", "pattern": ReconPatterns.ONE_TO_ONE.value, "status": fake.status, "frequency": fake.frequency, "start_datetime": fake.start_datetime.isoformat() if fake.start_datetime else None, "field_mapping": {}, "sources": [ { "name": "Source System", "url": "file:///data/input/{{as_at_date.strftime('%Y%m%d')}}.csv", "comment": "", "day_offset": 0, "system_schema": {}, "obfuscate_fields": [], "pci_redact_fields": [], "field_widths": [], "profile_thresholds": [], "index_fields": [], "filter": "", "sql": "", "csv_spec": {"delimiter": ",", "header": True, "encoding": "utf_8", "trailer_rows": 0, "quoting": True}, "xml_spec": None, } ], "destination": { "name": "Destination System", "url": "mssql://GROUPDW?db=BIODS_PROCESSING&table=EXAMPLE", "comment": "", "day_offset": 0, "system_schema": {}, "obfuscate_fields": [], "pci_redact_fields": [], "field_widths": [], "profile_thresholds": [], "index_fields": [], "filter": "", "sql": "", "csv_spec": None, "xml_spec": None, }, } # ── List ──────────────────────────────────────────────────────────────────── @router.get("/") async def list_configs(): now = datetime.now() items = [] for c in get_fake_configs(now): items.append({ "reference": c.reference, "name": c.name, "status": c.status, "frequency": c.frequency, "business_process": c.business_process, }) return items # ── Get one ───────────────────────────────────────────────────────────────── @router.get("/{reference}") async def get_config(reference: str): now = datetime.now() if _RECON_API_BASE: async with httpx.AsyncClient(verify=False) as client: resp = await client.get(f"{_RECON_API_BASE}/api/configs/{reference}") if resp.status_code == 404: raise HTTPException(404, f"Config '{reference}' not found") return resp.json() data = _fake_config_full(reference, now) if data is None: raise HTTPException(404, f"Config '{reference}' not found") return data # ── Create ────────────────────────────────────────────────────────────────── @router.post("/") async def create_config(request: Request): body = await request.json() if _RECON_API_BASE: async with httpx.AsyncClient(verify=False) as client: resp = await client.post(f"{_RECON_API_BASE}/api/configs/", json=body) return JSONResponse(content=resp.json(), status_code=resp.status_code) # Workspace stub: echo back with a fake revision return JSONResponse( content={**body, "revision_number": 1, "user": {"username": "workspace"}}, status_code=201, ) # ── Update ────────────────────────────────────────────────────────────────── @router.put("/{reference}") async def update_config(reference: str, request: Request): body = await request.json() if _RECON_API_BASE: async with httpx.AsyncClient(verify=False) as client: resp = await client.put(f"{_RECON_API_BASE}/api/configs/{reference}", json=body) return JSONResponse(content=resp.json(), status_code=resp.status_code) return JSONResponse( content={**body, "revision_number": body.get("revision_number", 1) + 1, "user": {"username": "workspace"}}, status_code=200, ) # ── Schema inference helpers ──────────────────────────────────────────────── _DATE_FORMATS = ["%Y-%m-%d", "%d/%m/%Y", "%m/%d/%Y", "%Y%m%d", "%d-%m-%Y"] _DATETIME_FORMATS = ["%Y-%m-%d %H:%M:%S", "%Y-%m-%dT%H:%M:%S", "%Y-%m-%d %H:%M:%S.%f"] def _infer_type(values: list[str]) -> str: sample = [v.strip() for v in values if v.strip()][:30] if not sample: return "str" try: [int(v.replace(",", "")) for v in sample] return "int" except ValueError: pass try: [float(v.replace(",", "")) for v in sample] return "float" except ValueError: pass for fmt in _DATETIME_FORMATS: try: [datetime.strptime(v, fmt) for v in sample] return f"datetime('{fmt}')" except ValueError: pass for fmt in _DATE_FORMATS: try: [datetime.strptime(v, fmt) for v in sample] return f"date('{fmt}')" except ValueError: pass return "str" def _csv_schema(path: str, delimiter: str = ",", has_header: bool = True) -> dict[str, str]: try: with open(path, newline="", encoding="utf-8", errors="replace") as f: reader = csv.reader(f, delimiter=delimiter) rows = [r for _, r in zip(range(31), reader)] if not rows: return {} headers = rows[0] if has_header else [f"col_{i}" for i in range(len(rows[0]))] data_rows = rows[1:] if has_header else rows return { col: _infer_type([r[i] for r in data_rows if i < len(r)]) for i, col in enumerate(headers) if col.strip() } except Exception: return {} def _xml_schema(path: str, xpathstr: str = "./*") -> dict[str, str]: try: tree = ET.parse(path) root = tree.getroot() elements = root.findall(xpathstr) or list(root) if not elements: return {} schema: dict[str, str] = {} for el in elements[:1]: for child in el: schema[child.tag] = "str" for attr in el.attrib: schema[f"@{attr}"] = "str" return schema except Exception: return {} # ── Test URL ──────────────────────────────────────────────────────────────── @router.post("/test-url") async def test_url(request: Request): """Return file/DB metadata and inferred schema — no row data ever returned.""" from datetime import date as date_type body = await request.json() url: str = body.get("url", "") as_at_date: str = body.get("as_at_date", datetime.now().strftime("%Y%m%d")) csv_spec: dict = body.get("csv_spec", {}) # ── DB URL — schema inspection wired up per-connector when available ────── if not url.startswith("file://"): scheme = url.split("://")[0] if "://" in url else url return {"type": "db", "scheme": scheme, "schema": {}, "found": False} # ── file:// URL ────────────────────────────────────────────────────────── raw_path = url[7:] try: as_at = datetime.strptime(as_at_date, "%Y%m%d").date() except ValueError: as_at = date_type.today() resolved = ( raw_path .replace("{{as_at_date.strftime('%Y%m%d')}}", as_at.strftime("%Y%m%d")) .replace("{{today.strftime('%Y%m%d')}}", date_type.today().strftime("%Y%m%d")) .replace("{{as_at_date.strftime('%Y-%m-%d')}}", as_at.strftime("%Y-%m-%d")) .replace("{{today.strftime('%Y-%m-%d')}}", date_type.today().strftime("%Y-%m-%d")) ) def _file_info(p: str) -> dict: try: st = os.stat(p) return {"path": p, "size_bytes": st.st_size, "modified": datetime.fromtimestamp(st.st_mtime).isoformat(timespec="seconds")} except OSError: return {"path": p, "error": "Could not stat file"} def _schema_for(p: str) -> dict[str, str]: pl = p.lower() if pl.endswith(".xml"): return _xml_schema(p) delimiter = csv_spec.get("delimiter", ",") or "," has_header = csv_spec.get("has_header", True) return _csv_schema(p, delimiter=delimiter, has_header=has_header) # Glob pattern if "*" in resolved or "?" in resolved or "{" in resolved: matches = sorted(glob.glob(resolved)) if not matches: return {"type": "file", "resolved": resolved, "found": False, "message": "No files matched the pattern.", "schema": {}} schema = _schema_for(matches[0]) return { "type": "file", "resolved": resolved, "found": True, "matches": len(matches), "files": [_file_info(p) for p in matches[:10]], "schema": schema, } # Exact path p = Path(resolved) if not p.exists(): return {"type": "file", "resolved": resolved, "found": False, "message": f"File not found: {resolved}", "schema": {}} try: schema = _schema_for(str(p)) return { "type": "file", "resolved": resolved, "found": True, "matches": 1, "files": [_file_info(str(p))], "schema": schema, } except OSError as e: return {"type": "file", "resolved": resolved, "found": False, "message": str(e), "schema": {}}