Files
css-test/app/views/views.py
T

487 lines
16 KiB
Python

"""User-interface views.
The original codebase pulls recon jobs from a database. For this workspace we
fake the data so the dashboard/results pages run without `app.core.config`,
`app.core.refdata`, or a database.
"""
from pathlib import Path
from datetime import date, datetime, time, timedelta
from fastapi import APIRouter, Request
from fastapi.responses import HTMLResponse
from fastapi.templating import Jinja2Templates
from app.models.recon_job import ReconJob # noqa: F401 (validates that the real model imports cleanly)
from app.service.fake_jobs import FakeReconJob, get_fake_jobs
from app.service.fake_configs import FakeReconConfig, get_fake_configs
router = APIRouter(prefix="", tags=["User_Interface"])
_project_root = Path(__file__).resolve().parents[2]
templates = Jinja2Templates(directory=str(_project_root / "data" / "templates"))
SECONDS_IN_DAY = 24 * 60 * 60
def _frac(dt: datetime, day_start: datetime) -> float:
delta = (dt - day_start).total_seconds() / SECONDS_IN_DAY
return max(0.0, min(1.0, delta))
def _build_day_timeline(
jobs: list[FakeReconJob],
day: date,
now: datetime,
) -> dict:
"""Per-day timeline entries with fractional left/width (%) and a kind:
"completed", "running", or "scheduled".
"""
day_start = datetime.combine(day, time.min)
day_end = day_start + timedelta(days=1)
entries = []
for j in jobs:
start = j.start_datetime
finish = j.finish_datetime
due = j.due_datetime
if start is not None and start < day_end:
run_end = finish if finish is not None else now
if run_end <= day_start:
continue
left = _frac(start, day_start)
right = _frac(run_end, day_start)
if right <= left:
right = min(1.0, left + 0.005)
entries.append({
"id": j.id,
"name": j.name,
"kind": "completed" if finish is not None else "running",
"status": j.status,
"left": left * 100.0,
"width": (right - left) * 100.0,
"start": start.isoformat(timespec="minutes"),
"finish": finish.isoformat(timespec="minutes") if finish else None,
"due": due.isoformat(timespec="minutes") if due else None,
})
continue
if due is not None and day_start <= due < day_end and due >= now:
left = _frac(due, day_start)
entries.append({
"id": j.id,
"name": j.name,
"kind": "scheduled",
"status": j.status,
"left": left * 100.0,
"width": 0.6,
"start": None,
"finish": None,
"due": due.isoformat(timespec="minutes"),
})
entries.sort(key=lambda e: (e["left"], e["kind"] == "scheduled"))
# Lane (row) assignment so overlapping/adjacent bars stack vertically.
# An entry occupies [left, left+width); for collision purposes scheduled
# markers are widened slightly so back-to-back scheduled jobs separate.
GAP = 0.2 # % units
MIN_COLLISION_WIDTH = 1.2
lane_ends: list[float] = []
for e in entries:
coll_width = max(e["width"], MIN_COLLISION_WIDTH)
coll_end = e["left"] + coll_width
placed = False
for i, end in enumerate(lane_ends):
if e["left"] >= end + GAP:
lane_ends[i] = coll_end
e["lane"] = i
placed = True
break
if not placed:
e["lane"] = len(lane_ends)
lane_ends.append(coll_end)
return {
"date": day,
"is_today": day == now.date(),
"now_pct": _frac(now, day_start) * 100.0 if day_start <= now < day_end else None,
"hours": list(range(0, 25)),
"entries": entries,
"lane_count": max(1, len(lane_ends)),
}
def _fmt_duration(seconds: float) -> str:
seconds = int(max(0, seconds))
h, rem = divmod(seconds, 3600)
m, s = divmod(rem, 60)
if h:
return f"{h}h {m:02d}m"
return f"{m}m {s:02d}s"
def _jobs_for_day(jobs: list[FakeReconJob], day: date) -> list[FakeReconJob]:
return [j for j in jobs if j.as_at_date == day]
def _status_counts(jobs: list[FakeReconJob]) -> dict[str, int]:
counts = {"completed": 0, "failed": 0, "running": 0, "created": 0, "other": 0}
for j in jobs:
key = j.status if j.status in counts else "other"
counts[key] += 1
return counts
def _success_rate(counts: dict[str, int]) -> float | None:
finished = counts["completed"] + counts["failed"]
return (counts["completed"] / finished * 100.0) if finished else None
def _compute_metrics(
jobs: list[FakeReconJob],
configs: list[FakeReconConfig],
now: datetime,
) -> dict:
today = now.date()
yesterday = today - timedelta(days=1)
week_ago = today - timedelta(days=6)
today_jobs = _jobs_for_day(jobs, today)
yesterday_jobs = _jobs_for_day(jobs, yesterday)
week_jobs = [j for j in jobs if week_ago <= j.as_at_date <= today]
today_counts = _status_counts(today_jobs)
yesterday_counts = _status_counts(yesterday_jobs)
total_counts = _status_counts(jobs)
week_counts = _status_counts(week_jobs)
# --- Config metrics ---
active_configs = [c for c in configs if c.status != "draft"]
by_freq: dict[str, int] = {}
for c in active_configs:
by_freq[c.frequency] = by_freq.get(c.frequency, 0) + 1
by_freq_sorted = sorted(by_freq.items(), key=lambda kv: -kv[1])
config_status_counts: dict[str, int] = {}
for c in configs:
config_status_counts[c.status] = config_status_counts.get(c.status, 0) + 1
# --- Performance ---
durations_today = [
(j.finish_datetime - j.start_datetime).total_seconds()
for j in today_jobs
if j.start_datetime and j.finish_datetime
]
avg_duration_today = (sum(durations_today) / len(durations_today)) if durations_today else None
longest_today = None
longest_today_seconds = 0.0
if today_jobs:
for j in today_jobs:
if j.start_datetime and j.finish_datetime:
secs = (j.finish_datetime - j.start_datetime).total_seconds()
if longest_today is None or secs > longest_today_seconds:
longest_today = j
longest_today_seconds = secs
# On-time = job started within 15 min of due_datetime (when both known)
on_time = late = 0
for j in today_jobs:
if j.due_datetime and j.start_datetime:
if (j.start_datetime - j.due_datetime).total_seconds() <= 15 * 60:
on_time += 1
else:
late += 1
on_time_rate_today = (on_time / (on_time + late) * 100.0) if (on_time + late) else None
# --- Match quality (from results) ---
def _matched_unmatched(js: list[FakeReconJob]) -> tuple[int, int]:
m = u = 0
for j in js:
r = j.results or {}
m += int(r.get("Matched") or 0)
u += int(r.get("Unmatched") or 0)
return m, u
m_today, u_today = _matched_unmatched(today_jobs)
m_week, u_week = _matched_unmatched(week_jobs)
match_rate_today = (m_today / (m_today + u_today) * 100.0) if (m_today + u_today) else None
match_rate_week = (m_week / (m_week + u_week) * 100.0) if (m_week + u_week) else None
# --- Failures ---
recent_failures = sorted(
(j for j in jobs if j.status == "failed"),
key=lambda j: (j.finish_datetime or j.start_datetime or datetime.min),
reverse=True,
)[:3]
# --- Upcoming today ---
upcoming_today = sorted(
(j for j in today_jobs
if j.start_datetime is None and j.due_datetime is not None and j.due_datetime >= now),
key=lambda j: j.due_datetime or datetime.max,
)[:5]
return {
"jobs": {
"today": today_counts,
"yesterday": yesterday_counts,
"week": week_counts,
"total": total_counts,
"success_rate_today": _success_rate(today_counts),
"success_rate_yesterday": _success_rate(yesterday_counts),
"success_rate_week": _success_rate(week_counts),
"success_rate_total": _success_rate(total_counts),
},
"configs": {
"active": len(active_configs),
"total": len(configs),
"draft": config_status_counts.get("draft", 0),
"archived": config_status_counts.get("archived", 0),
"by_frequency": by_freq_sorted,
},
"performance": {
"avg_duration_today": _fmt_duration(avg_duration_today) if avg_duration_today is not None else None,
"longest_today_name": longest_today.name if longest_today else None,
"longest_today_duration": (
_fmt_duration(longest_today_seconds) if longest_today else None
),
"on_time_rate_today": on_time_rate_today,
"on_time": on_time,
"late": late,
},
"match_quality": {
"matched_today": m_today,
"unmatched_today": u_today,
"match_rate_today": match_rate_today,
"matched_week": m_week,
"unmatched_week": u_week,
"match_rate_week": match_rate_week,
},
"recent_failures": [
{
"id": j.id,
"name": j.name,
"as_at": j.as_at_date.isoformat(),
"reason": j.status_reason or "",
"when": (
_when.isoformat(timespec="minutes")
if (_when := j.finish_datetime or j.start_datetime) is not None
else ""
),
}
for j in recent_failures
],
"upcoming_today": [
{
"id": j.id,
"name": j.name,
"due": j.due_datetime.strftime("%H:%M") if j.due_datetime else "",
}
for j in upcoming_today
],
}
@router.get("/", response_class=HTMLResponse)
async def dashboard_view(request: Request):
now = datetime.now()
today = now.date()
yesterday = today - timedelta(days=1)
jobs = get_fake_jobs(now)
configs = get_fake_configs(now)
timelines = [
_build_day_timeline(jobs, yesterday, now),
_build_day_timeline(jobs, today, now),
]
metrics = _compute_metrics(jobs, configs, now)
return templates.TemplateResponse(
request=request,
name="dashboard.html",
context={
"title": "Dashboard",
"timelines": timelines,
"metrics": metrics,
},
)
@router.get("/results", response_class=HTMLResponse)
async def results_view(
request: Request,
cursor: int = 999999,
recon_job_name: str | None = None,
as_at_date: str | None = None,
):
PAGE_SIZE = 20
now = datetime.now()
jobs = get_fake_jobs(now)
if recon_job_name:
jobs = [j for j in jobs if j.name == recon_job_name]
if as_at_date:
jobs = [j for j in jobs if j.as_at_date.isoformat() == as_at_date]
jobs.sort(key=lambda j: j.id, reverse=True)
page = [j for j in jobs if j.id < cursor][:PAGE_SIZE]
result_rows = [
{"job_id": j.id, "name": j.name, "data": j.results}
for j in page if j.results
]
next_cursor = page[-1].id - 1 if page else None
prev_cursor = page[0].id + PAGE_SIZE if page else None
return templates.TemplateResponse(
request=request,
name="results.html",
context={
"title": "Results",
"result_rows": result_rows,
"next_cursor": next_cursor,
"prev_cursor": prev_cursor,
"recon_job_name": recon_job_name,
"as_at_date": as_at_date,
},
)
def _build_history_timeline(
related: list[FakeReconJob],
now: datetime,
) -> dict:
"""Timeline of executions of a single recon-config from the first run to now.
Each entry is a marker positioned by ``as_at_date`` along a [first, today] axis.
"""
if not related:
return {"entries": [], "first_date": None, "last_date": now.date(), "days": 0, "ticks": []}
dates = sorted({j.as_at_date for j in related})
first = dates[0]
last = now.date()
span_days = max((last - first).days, 1)
entries = []
for j in sorted(related, key=lambda x: (x.as_at_date, x.start_datetime or datetime.min)):
days_in = (j.as_at_date - first).days
left = (days_in / span_days) * 100.0
when = j.finish_datetime or j.start_datetime or j.due_datetime
entries.append({
"id": j.id,
"name": j.name,
"status": j.status,
"kind": (
"running" if j.status == "running"
else "scheduled" if (j.start_datetime is None)
else "failed" if j.status == "failed"
else "completed"
),
"left": max(0.0, min(100.0, left)),
"as_at": j.as_at_date.isoformat(),
"when": when.isoformat(timespec="minutes") if when else None,
})
# axis ticks: spread up to ~6 evenly-spaced day labels across the span
tick_count = min(6, span_days + 1)
ticks = []
for i in range(tick_count):
frac = i / max(tick_count - 1, 1)
day = first + timedelta(days=int(round(frac * span_days)))
ticks.append({"left": frac * 100.0, "label": day.strftime("%d %b")})
return {
"entries": entries,
"first_date": first,
"last_date": last,
"days": span_days,
"ticks": ticks,
}
@router.get("/jobs/{job_id}", response_class=HTMLResponse)
async def job_detail_view(request: Request, job_id: int):
now = datetime.now()
jobs = get_fake_jobs(now)
configs = get_fake_configs(now)
job = next((j for j in jobs if j.id == job_id), None)
if job is None:
return HTMLResponse(
f"<h1>Job #{job_id} not found</h1>"
f"<p><a href='/'>Back to dashboard</a></p>",
status_code=404,
)
# Group related executions by recon_config_reference (fallback to name)
related = [
j for j in jobs
if j.recon_config_reference == job.recon_config_reference
or j.name == job.name
]
history = _build_history_timeline(related, now)
config = next(
(c for c in configs if c.reference == job.recon_config_reference), None
)
# Duration (if finished or currently running)
duration_str = None
if job.start_datetime:
end = job.finish_datetime or now
duration_str = _fmt_duration((end - job.start_datetime).total_seconds())
# Lateness vs due
lateness_str = None
if job.start_datetime and job.due_datetime:
delta = (job.start_datetime - job.due_datetime).total_seconds()
if delta <= 0:
lateness_str = f"on time ({_fmt_duration(-delta)} early)"
else:
lateness_str = f"{_fmt_duration(delta)} late"
# Aggregate stats across history
finished_related = [
j for j in related
if j.start_datetime is not None and j.finish_datetime is not None
]
finished_durations = [
(j.finish_datetime - j.start_datetime).total_seconds()
for j in finished_related
if j.start_datetime is not None and j.finish_datetime is not None
]
avg_dur = (sum(finished_durations) / len(finished_durations)) if finished_durations else None
success_count = sum(1 for j in related if j.status == "completed")
fail_count = sum(1 for j in related if j.status == "failed")
finished_count = success_count + fail_count
success_rate = (success_count / finished_count * 100.0) if finished_count else None
return templates.TemplateResponse(
request=request,
name="job_detail.html",
context={
"title": f"{job.name} #{job.id}",
"job": job,
"config": config,
"history": history,
"duration_str": duration_str,
"lateness_str": lateness_str,
"stats": {
"total": len(related),
"completed": success_count,
"failed": fail_count,
"success_rate": success_rate,
"avg_duration": _fmt_duration(avg_dur) if avg_dur is not None else None,
},
},
)