from datetime import date, datetime from typing import List, Dict from sqlalchemy import inspect, Enum from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine from sqlalchemy import String, Date, func, ForeignKey, JSON, Integer, Boolean, DateTime from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship from app.core.config import config from app.core.refdata import ReconJobStatus, ReconConfigStatus engine = create_async_engine(config.db_url, connect_args={"check_same_thread": False}) SessionLocal = async_sessionmaker(engine, close_resets_only=False) def get_async_sessionmaker(): return SessionLocal class Base(DeclarativeBase): pass def object_as_dict(obj): return { c.key: getattr(obj, c.key) for c in inspect(obj).mapper.column_attrs } class User(Base): """ Users are recorded to track who made changes and created jobs Only support users need passwords as a backup solution to JWT provided via the IDP When a valid JWT is provided via IDP and unpacked, a user record will be looked up/created """ __tablename__ = "users" username: Mapped[str] = mapped_column(String, primary_key=True) email: Mapped[str] = mapped_column(String(100), nullable=False, unique=True, index=True) disabled: Mapped[bool] = mapped_column(Boolean, nullable=False, index=True) # hashed_password: Mapped[str] = deferred(mapped_column(String(100))) hashed_password: Mapped[str] = mapped_column(String(100), nullable=True) recon_jobs: Mapped[List["ReconJob"]] = relationship( back_populates="user", cascade="all, delete-orphan" ) recon_configs: Mapped[List["ReconConfig"]] = relationship( back_populates="user", cascade="all, delete-orphan" ) class ReconConfig(Base): """ Holds the details of a reconciliation. Config info is stored as JSON. Revision and user are set by Recon Ranger. """ __tablename__ = "recon_configs" reference: Mapped[str] = mapped_column(String, primary_key=True) revision_number: Mapped[int] = mapped_column(Integer, nullable=False) status: Mapped[ReconConfigStatus] = mapped_column(String, server_default="draft") config: Mapped[dict] = mapped_column(JSON) start_datetime: Mapped[datetime] = mapped_column(DateTime, index=True, nullable=True, server_default=func.current_date()) frequency: Mapped[str] = mapped_column(String, server_default="Daily") recon_jobs: Mapped[List["ReconJob"]] = relationship( back_populates="recon_config", cascade="all, delete-orphan" ) username: Mapped[str] = mapped_column(ForeignKey("users.username"), nullable=False) user: Mapped["User"] = relationship(back_populates="recon_configs", lazy="select") __mapper_args__ = {"version_id_col": revision_number} class ReconJob(Base): """ A request to run & track a recon job using a recon config. When created, a stuck (in progress), or failed recon is updated, the recon engine will be called to run a recon """ __tablename__ = "recon_jobs" id: Mapped[int] = mapped_column(primary_key=True) name: Mapped[str] = mapped_column(String) due_datetime: Mapped[datetime] = mapped_column(DateTime, index=True, nullable=True, server_default=func.current_date()) start_datetime: Mapped[datetime] = mapped_column(DateTime, nullable=True) finish_datetime: Mapped[datetime] = mapped_column(DateTime, nullable=True) as_at_date: Mapped[date] = mapped_column(Date, index=True, server_default=func.current_date()) status: Mapped[ReconJobStatus] = mapped_column( Enum(ReconJobStatus, native_enum=False, validate_strings=False), server_default=ReconJobStatus.CREATED ) status_reason: Mapped[str] = mapped_column(String, server_default="") recon_config_reference: Mapped[str] = mapped_column(ForeignKey("recon_configs.reference"), nullable=False) recon_config: Mapped["ReconConfig"] = relationship(back_populates="recon_jobs", lazy="joined") username: Mapped[str] = mapped_column(ForeignKey("users.username"), nullable=False) user: Mapped["User"] = relationship(back_populates="recon_jobs", lazy="joined") results: Mapped[Dict] = mapped_column(JSON, nullable=True)