# Job configs live in the database as a single JSON document; the classes in
# this module give that JSON a validated shape.
# Reasons for storing JSON rather than normalised tables:
# - The stored record is what the API call contained, not a schema-migrated copy.
# - Only the job config JSON field needs maintaining, not the many config
#   tables a relational layout would need.
# - The recon pattern chosen for a job decides which config is required, so a
#   3rd normal form layout would be complex.
# Recon workers read the loaded JSON as a dictionary, so they must cope
# gracefully with config schema changes.

from datetime import datetime
from typing import List, Optional, Dict
from enum import Enum
import operator

from pydantic import BaseModel, Field

from app.core.refdata import ReconPatterns, ReconConfigStatus, ProfileFields
from app.models.recon_auth import UserResponse
from app.db.schema import ReconConfig as ReconConfigSchema
from app.core.config import db_config_map


# Named run frequencies; each member's value is its human-readable label.
# (Documented with comments, not docstrings, so generated schemas are unchanged.)
class Frequencies(str, Enum):
    AD_HOC = "Ad Hoc"
    INTRA_DAY = "Intra Day"
    DAILY = "Daily"
    WEEKLY = "Weekly"
    MONTHLY = "Monthly"
    QUARTERLY = "Quarterly"


# Comparison tests that can be rendered as a symbol or as a callable from the
# standard `operator` module.
class ComparisonOperators(str, Enum):
    GREATER_THAN = "Greater Than"
    LESS_THAN = "Less Than"
    EQUAL_TO = "Equal To"

    @property
    def to_symbol(self):
        """Return the textual symbol (">", "<" or "=") for this comparison."""
        symbol_for = {
            ComparisonOperators.GREATER_THAN: ">",
            ComparisonOperators.LESS_THAN: "<",
            ComparisonOperators.EQUAL_TO: "=",
        }
        return symbol_for[self]

    @property
    def to_operator(self):
        """Return the two-argument `operator` function for this comparison."""
        function_for = {
            ComparisonOperators.GREATER_THAN: operator.gt,
            ComparisonOperators.LESS_THAN: operator.lt,
            ComparisonOperators.EQUAL_TO: operator.eq,
        }
        return function_for[self]


# One threshold entry: a profile field, a numeric value and the comparison
# to use between them.
class ProfileThreshold(BaseModel):
    profile_field: ProfileFields
    value: float
    test: ComparisonOperators

    # Enum-typed fields are stored and emitted as their plain values.
    model_config = {"use_enum_values": True}


# CSV parsing options and their defaults.
class CSVSpec(BaseModel):
    delimiter: str = ","
    header: bool = True
    encoding: str = "utf_8"
    trailer_rows: int = 0
    quoting: bool = True


# XML parsing options and their defaults.
class XMLSpec(BaseModel):
    xpathstr: str = "./*"
    encoding: str = "utf-8"


# Shared description / example text used by the field declarations.
url_description = (
    "URL to source. If CSV, date can be specified via template which has these variables available: "
    "today, as_at_date, as_at_offset (job date + system config offset), today_offset. "
    "For DB connections we map connection name to correct name for the environment. "
    f"Options are: {db_config_map.keys()} "
)

# The doubled braces below are literal template markers, not f-string escapes.
url_examples = [
    "file:///Z:/PredatorArchive/PredatorArchive_Customer_File_{{as_at_date.strftime('%Y%m%d')}}.csv",
    "file:///Z:/PredatorArchive/PredatorArchive_Customer_File_{{today.strftime('%Y%m%d')}}.csv",
    "oracle://NETREVEAL?db=NETREVEAL&table=CUSTOMERS",
    "snowflake://DATAPRODUCT?db=PDP_ONYX&schema=ODS&table=CUSTOMERS_SCD",
    "mssql://GROUPDW?db=BIODS_PROCESSING&table=CUSTOMERS",
]

schema_description = (
    'An ordered dictionary of column names & column types. '
    'Types can be "int","str","date(\'format\')","datetime(\'format\')","float"'
)

schema_examples = [
    {},
    {
        "my column name": "str",
        "an integer column": "int",
        "Update_Datetime": "datetime(%Y-%m-%d %H:%M:%S.%f)",
        "a float field": "float",
    },
]

filter_description = (
    "A list of filters to load only certain rows. "
    "For a SQL connection this will appear in the WHERE part of the SELECT statement. "
    "The comparison can use the date fields (today, as_at_date, as_at_offset, today_offset) "
    "with 'strftime' formatting."
)
# Configuration for one system (a source or the destination) taking part in a
# recon. Documented with comments rather than a docstring so the generated
# schema description is unchanged.
class SystemConfig(BaseModel):
    url: str = Field(description=url_description, examples=url_examples)
    name: str = Field(default="")
    comment: Optional[str] = Field(default="")
    csv_spec: Optional[CSVSpec] = Field(default=None)
    xml_spec: Optional[XMLSpec] = Field(default=None)
    day_offset: int = Field(default=0, description="When a config is run, for a given date, what is the number of days difference to the date in the filename or slq query.")
    system_schema: Dict[str, str] = Field(default={}, description=schema_description, examples=schema_examples)
    obfuscate_fields: list[str] = Field(default=[])
    pci_redact_fields: list[str] = Field(default=[])
    field_widths: list[int] = Field(default=[])
    profile_thresholds: List[ProfileThreshold] = Field(default=[])
    index_fields: list[str] = Field(default=[])
    filter: Optional[str] = Field(default="", description="If provided, appended to the sql query (file and database supported) after the WHERE clause.")
    sql: Optional[str] = Field(default="", description="If provided, replaces entire sql query (database supported, file tbd)")


field_mapping_description = 'A dictionary of : column names where column names differ but values should reconcile. For columns not mapped here, it is expected that source(s) and destination have the same column names. Note: With CSVs the names from the system_schema field will override the column names in the header so column matching can be done this way.'
field_mapping_examples = [{}, {"my source column 1": "my dest column 1"}]


# Payload accepted when a recon config is created or updated.
class ReconConfigRequest(BaseModel):
    reference: str = Field(default="Unnamed", description="User supplied id of the config")
    name: str = Field(default="Unnamed", description="Only used to name the results.")
    business_process: str = Field(default="Unnamed Process", description="Only used in the results.")
    comment: Optional[str] = None
    data_type: str = Field(default="Unknown", description="10000 foot view of what the data in the recon relates to. E.g. Customer or Transaction")
    sources: List[SystemConfig]
    destination: SystemConfig
    pattern: ReconPatterns
    status: ReconConfigStatus = Field(description="This controls where the result of recon jobs using this config go. 'draft' results will go to the draft snowflake tables, 'published' will go to the live snowflake tables which power the recon dashboards.")
    field_mapping: Dict[str, str] = Field(default={}, description=field_mapping_description, examples=field_mapping_examples)

    # Unlike the other recon config attributes, these map to database columns, not part of the config JSON.
    frequency: Frequencies = Field(default=Frequencies.AD_HOC)
    start_datetime: Optional[datetime] = None

    model_config = {
        "use_enum_values": True  # Always output enum values
    }


class ReconConfigResponse(ReconConfigRequest):
    """ Response when serializing """
    # reference: str
    revision_number: int
    user: UserResponse


class ReconConfig(ReconConfigRequest):
    """ Internal Representation of the config object. """
    # reference: str
    revision_number: int
    username: str

    model_config = {
        "from_attributes": True,  # Ability to load from ORM model
        # "use_enum_values": False,  # TODO: ORM loading needs to load enums properly
    }


def config_from_schema(recon_config_schema: ReconConfigSchema) -> ReconConfig:
    """Build the internal ``ReconConfig`` model from a stored config row.

    The row's JSON ``config`` field supplies most values, which allows
    non-breaking schema changes. Values held in the row's own columns are
    layered on top before validation:

    - ``reference``, ``revision_number`` and ``username`` (read only).
    - ``frequency`` and ``start_datetime``, the two user-writable fields kept
      as columns so configs can be filtered for scheduling.

    ``username`` is used instead of a User object so no extra lookup against
    the user table is needed.

    The overlay is applied to a shallow copy of ``recon_config_schema.config``.
    The row's JSON document is therefore left untouched, so derived keys never
    leak into (or get persisted with) the stored record of the API call.

    Args:
        recon_config_schema: Row object exposing ``config``, ``reference``,
            ``revision_number``, ``username``, ``frequency`` and
            ``start_datetime``.

    Returns:
        The validated ``ReconConfig``.

    Raises:
        TypeError: If ``recon_config_schema.config`` is ``None``.
        pydantic.ValidationError: If the merged data does not satisfy
            ``ReconConfig``.
    """
    # Shallow copy is enough: only top-level keys are assigned below.
    config = dict(recon_config_schema.config)

    config["reference"] = recon_config_schema.reference
    config["revision_number"] = recon_config_schema.revision_number
    config["username"] = recon_config_schema.username

    # Rows without a frequency fall back to ad hoc.
    if recon_config_schema.frequency is not None:
        config["frequency"] = recon_config_schema.frequency
    else:
        config["frequency"] = Frequencies.AD_HOC.value

    # The datetime is passed as an ISO-8601 string and parsed during validation.
    if recon_config_schema.start_datetime is not None:
        config["start_datetime"] = recon_config_schema.start_datetime.isoformat()
    else:
        config["start_datetime"] = None

    return ReconConfig.model_validate(config)