Files

200 lines
7.8 KiB
Python

"""In-memory fake of recon-job data so the dashboard/results views can run
without a database or the production `app.core.config` / `app.core.refdata`
modules. Replace with real DB-backed code when wiring up to the real repo.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import date, datetime, time, timedelta
from typing import Optional
@dataclass
class FakeReconJob:
    """Plain in-memory record describing one reconciliation job run.

    Only ``id``, ``name`` and ``as_at_date`` are required; every other field
    has a default so callers can build partially-populated demo jobs.
    """

    # --- identity -----------------------------------------------------------
    id: int
    name: str
    as_at_date: date

    # --- state --------------------------------------------------------------
    # The fake data in this module uses "completed", "failed", "running" and
    # "created"; the field itself is an unconstrained string.
    status: str = "completed"
    # Free-text explanation; populated for failed jobs in the fake data.
    status_reason: str = ""

    # --- provenance ---------------------------------------------------------
    recon_config_reference: str = "demo-config"
    username: str = "demo.user"

    # --- timeline (each stays None until a value is supplied) ---------------
    due_datetime: Optional[datetime] = None
    start_datetime: Optional[datetime] = None
    finish_datetime: Optional[datetime] = None

    # --- outcome ------------------------------------------------------------
    # Summary dict for the run, or None when no results are attached.
    results: Optional[dict] = None
def _at(day: date, hour: int, minute: int = 0) -> datetime:
return datetime.combine(day, time(hour, minute))
def get_fake_jobs(now: datetime) -> list[FakeReconJob]:
    """Build the demo recon-job list, anchored on *now*.

    The list is rebuilt on every call, in this order:

    1. five jobs for yesterday (ids 101-105),
    2. five jobs for today (ids 201-205),
    3. a burst of seven "created" jobs sharing one due time (ids 210-216),
    4. generated history for the six days before yesterday (ids from 300).

    Args:
        now: Reference instant. May be naive or timezone-aware; every
            generated datetime carries ``now.tzinfo`` so that it can be
            compared with *now*.

    Returns:
        A new list of ``FakeReconJob`` instances.
    """
    tz = now.tzinfo
    today = now.date()
    yesterday = today - timedelta(days=1)

    def at(day: date, hour: int, minute: int = 0) -> datetime:
        # _at() yields naive datetimes. Stamp them with now's tzinfo so the
        # comparisons against ``now`` below do not raise TypeError when the
        # caller passes an aware datetime. For a naive ``now`` this is a no-op.
        return _at(day, hour, minute).replace(tzinfo=tz)

    def upcoming(hour: int, fallback: timedelta) -> datetime:
        # Not-yet-started jobs must stay in the future: use today's slot while
        # it is still ahead of now, otherwise push the due time out from now.
        slot = at(today, hour)
        return slot if now < slot else now + fallback

    sample_result_a = {
        "Job": "FX Settlement Recon",
        "As-at": yesterday.isoformat(),
        "Matched": 1284,
        "Unmatched": 12,
        "Status": "Matched",
        "Flag": "None",
    }
    sample_result_b = {
        "Job": "Cash vs Ledger",
        "As-at": yesterday.isoformat(),
        "Matched": 980,
        "Unmatched": 47,
        "Status": "Unmatched",
        "Flag": "Threshold Breach",
    }
    sample_result_c = {
        "Job": "Intraday Liquidity Sweep",
        "As-at": today.isoformat(),
        "Matched": 311,
        "Unmatched": 0,
        "Status": "Matched",
        "Flag": "None",
    }

    jobs: list[FakeReconJob] = [
        # Yesterday — completed runs and one failure
        FakeReconJob(
            id=101, name="FX Settlement Recon", as_at_date=yesterday,
            recon_config_reference="fx-settlement",
            due_datetime=at(yesterday, 6, 0),
            start_datetime=at(yesterday, 6, 2),
            finish_datetime=at(yesterday, 6, 47),
            status="completed", results=sample_result_a,
        ),
        FakeReconJob(
            id=102, name="Cash vs Ledger", as_at_date=yesterday,
            recon_config_reference="cash-vs-ledger",
            due_datetime=at(yesterday, 9, 30),
            start_datetime=at(yesterday, 9, 33),
            finish_datetime=at(yesterday, 10, 12),
            status="completed", results=sample_result_b,
        ),
        FakeReconJob(
            id=103, name="Intraday Liquidity Sweep", as_at_date=yesterday,
            recon_config_reference="intraday-liquidity",
            due_datetime=at(yesterday, 13, 0),
            start_datetime=at(yesterday, 13, 4),
            finish_datetime=at(yesterday, 13, 35),
            status="completed",
            results={
                "Job": "Intraday Liquidity Sweep",
                "As-at": yesterday.isoformat(),
                "Matched": 290,
                "Unmatched": 4,
                "Status": "Matched",
                "Flag": "None",
            },
        ),
        FakeReconJob(
            id=104, name="EOD Position Recon", as_at_date=yesterday,
            recon_config_reference="eod-positions",
            due_datetime=at(yesterday, 18, 0),
            start_datetime=at(yesterday, 18, 5),
            finish_datetime=at(yesterday, 19, 50),
            status="completed",
        ),
        FakeReconJob(
            id=105, name="Regulatory Reporting Recon", as_at_date=yesterday,
            recon_config_reference="monthly-reg",
            due_datetime=at(yesterday, 22, 0),
            start_datetime=at(yesterday, 22, 3),
            finish_datetime=at(yesterday, 22, 9),
            status="failed", status_reason="Source feed unavailable",
        ),
        # Today — mix of completed, running, scheduled, and a failure
        FakeReconJob(
            id=201, name="FX Settlement Recon", as_at_date=today,
            recon_config_reference="fx-settlement",
            due_datetime=at(today, 6, 0),
            start_datetime=at(today, 6, 3),
            finish_datetime=at(today, 6, 51),
            status="completed", results=sample_result_c,
        ),
        FakeReconJob(
            id=202, name="Cash vs Ledger", as_at_date=today,
            recon_config_reference="cash-vs-ledger",
            due_datetime=at(today, 9, 30),
            # Started 18 minutes ago, but never earlier than today's midnight.
            start_datetime=max(now - timedelta(minutes=18), at(today, 0, 0)),
            finish_datetime=None,
            status="running",
        ),
        FakeReconJob(
            id=203, name="Intraday Liquidity Sweep", as_at_date=today,
            recon_config_reference="intraday-liquidity",
            due_datetime=upcoming(13, timedelta(hours=1)),
            status="created",
        ),
        FakeReconJob(
            id=204, name="EOD Position Recon", as_at_date=today,
            recon_config_reference="eod-positions",
            due_datetime=upcoming(18, timedelta(hours=2)),
            status="created",
        ),
        FakeReconJob(
            id=205, name="AML Spot Check", as_at_date=today,
            recon_config_reference="ad-hoc-aml",
            due_datetime=at(today, 8, 0),
            start_datetime=at(today, 8, 2),
            finish_datetime=at(today, 8, 5),
            status="failed", status_reason="Schema mismatch",
        ),
    ]

    # A burst of jobs all scheduled at the same time (16:00) — the engine will
    # serialise these so the dashboard caps the stack at 3 lanes and chains the
    # rest horizontally on the bottom lane.
    burst_due = upcoming(16, timedelta(minutes=30))
    burst_names = [
        ("Branch Cash Recon", "branch-cash"),
        ("Card Settlement Recon", "card-settlement"),
        ("ATM Float Recon", "atm-float"),
        ("Loan Disbursement Recon", "loan-disb"),
        ("Mortgage Payment Recon", "mortgage-pmt"),
        ("Term Deposit Recon", "term-deposit"),
        ("Wire Transfer Recon", "wire-transfer"),
    ]
    jobs.extend(
        FakeReconJob(
            id=210 + i, name=name, as_at_date=today,
            recon_config_reference=ref,
            due_datetime=burst_due,
            status="created",
        )
        for i, (name, ref) in enumerate(burst_names)
    )

    # Six further days of history (2-7 days back, i.e. excluding the two days
    # above) so totals look real.
    history_template = [
        ("FX Settlement Recon", "fx-settlement", 6, 45, "completed"),
        ("Cash vs Ledger", "cash-vs-ledger", 9, 42, "completed"),
        ("Intraday Liquidity Sweep", "intraday-liquidity", 13, 30, "completed"),
        ("EOD Position Recon", "eod-positions", 18, 105, "completed"),
    ]
    next_id = 300
    for days_back in range(2, 8):
        day = today - timedelta(days=days_back)
        for name, ref, hour, duration_min, status in history_template:
            start = at(day, hour, 5)
            jobs.append(FakeReconJob(
                id=next_id, name=name, as_at_date=day,
                recon_config_reference=ref,
                due_datetime=at(day, hour, 0),
                start_datetime=start,
                finish_datetime=start + timedelta(minutes=duration_min),
                status=status,
                # Same key set and order as the hand-written results above;
                # the numbers are derived from the id so they vary per job
                # yet stay deterministic.
                results={
                    "Job": name,
                    "As-at": day.isoformat(),
                    "Matched": 800 + next_id % 500,
                    "Unmatched": (next_id * 3) % 60,
                    "Status": "Matched" if (next_id % 5) else "Unmatched",
                    "Flag": "None",
                },
            ))
            next_id += 1
        # Sprinkle in occasional failures
        if days_back in (3, 5):
            jobs.append(FakeReconJob(
                id=next_id, name="Customer Master Recon", as_at_date=day,
                recon_config_reference="weekly-customers",
                due_datetime=at(day, 4, 0),
                start_datetime=at(day, 4, 1),
                finish_datetime=at(day, 4, 7),
                status="failed", status_reason="Upstream timeout",
            ))
            next_id += 1
    return jobs