Files
css-test/app/r.py
T
2026-05-25 05:15:53 +00:00

176 lines
8.1 KiB
Python

# The job config is stored in the DB as a JSON document. These job config classes are used to conform the JSON.
# This is done for several reasons:
# - We have a record of what data was in the API call rather than a schema migrated record
# - We don't need to maintain the many config tables that would result - just the job config JSON field.
# - The pattern selected for a recon job defines what config is needed so a 3rd normal form would be complex
# The recon workers use the loaded JSON as a dictionary so must deal with confg schema changes gracefully.
from datetime import datetime
from typing import List, Optional, Dict
from enum import Enum
import operator
from pydantic import BaseModel, Field
from app.core.refdata import ReconPatterns, ReconConfigStatus, ProfileFields
from app.models.recon_auth import UserResponse
from app.db.schema import ReconConfig as ReconConfigSchema
from app.core.config import db_config_map
class Frequencies(str, Enum):
AD_HOC = "Ad Hoc"
INTRA_DAY = "Intra Day"
DAILY = "Daily"
WEEKLY = "Weekly"
MONTHLY = "Monthly"
QUARTERLY = "Quarterly"
class ComparisonOperators(str, Enum):
GREATER_THAN = "Greater Than"
LESS_THAN = "Less Than"
EQUAL_TO = "Equal To"
@property
def to_symbol(self):
mapping = {
self.GREATER_THAN.value: ">",
self.LESS_THAN.value: "<",
self.EQUAL_TO.value: "=",
}
return mapping[self.value]
@property
def to_operator(self):
mapping = {
self.GREATER_THAN.value: operator.gt,
self.LESS_THAN.value: operator.lt,
self.EQUAL_TO.value: operator.eq,
}
return mapping[self.value]
class ProfileThreshold(BaseModel):
profile_field: ProfileFields
value: float
test: ComparisonOperators
model_config = {
"use_enum_values": True # Always output enum values
}
class CSVSpec(BaseModel):
delimiter: str = Field(default=',')
header: bool = Field(default=True)
encoding: str = Field(default='utf_8')
trailer_rows: int = Field(default=0)
quoting: bool = Field(default=True)
class XMLSpec(BaseModel):
xpathstr: str = Field(default='./*')
encoding: str = Field(default='utf-8')
url_description = f"URL to source. If CSV, date can be specified via template which has these variables available: today, as_at_date, as_at_offset (job date + system config offset), today_offset. For DB connections we map connection name to correct name for the environment. Options are: {db_config_map.keys()} "
url_examples = [
"file:///Z:/PredatorArchive/PredatorArchive_Customer_File_{{as_at_date.strftime('%Y%m%d')}}.csv",
"file:///Z:/PredatorArchive/PredatorArchive_Customer_File_{{today.strftime('%Y%m%d')}}.csv",
"oracle://NETREVEAL?db=NETREVEAL&table=CUSTOMERS",
"snowflake://DATAPRODUCT?db=PDP_ONYX&schema=ODS&table=CUSTOMERS_SCD",
"mssql://GROUPDW?db=BIODS_PROCESSING&table=CUSTOMERS"
]
schema_description = 'An ordered dictionary of column names & column types. Types can be "int","str","date(\'format\')","datetime(\'format\')","float"'
schema_examples = [{},{"my column name":"str", "an integer column":"int", "Update_Datetime":"datetime(%Y-%m-%d %H:%M:%S.%f)", "a float field":"float"}]
filter_description = "A list of filters to load only certain rows. For a SQL connection this will appear in the WHERE part of the SELECT statement. The comparison can use the date fields (today, as_at_date, as_at_offset, today_offset) with 'strftime' formatting."
class SystemConfig(BaseModel):
url: str = Field(description=url_description, examples=url_examples)
name: str = Field(default="<system name>")
comment: Optional[str] = Field(default="")
csv_spec: Optional[CSVSpec] = Field(default=None)
xml_spec: Optional[XMLSpec] = Field(default=None)
day_offset: int = Field(default=0, description="When a config is run, for a given date, what is the number of days difference to the date in the filename or slq query.")
system_schema: Dict[str, str] = Field(default={}, description=schema_description, examples=schema_examples)
obfuscate_fields: list[str] = Field(default=[])
pci_redact_fields: list[str] = Field(default=[])
field_widths: list[int] = Field(default=[])
profile_thresholds: List[ProfileThreshold] = Field(default=[])
index_fields: list[str] = Field(default=[])
filter: Optional[str] = Field(default="", description="If provided, appended to the sql query (file and database supported) after the WHERE clause.")
sql: Optional[str] = Field(default="", description="If provided, replaces entire sql query (database supported, file tbd)")
field_mapping_description = 'A dictionary of <source>:<destination> column names where column names differ but values should reconcile. For columns not mapped here, it is expected that source(s) and destination have the same column names. Note: With CSVs the names from the system_schema field will override the column names in the header so column matching can be done this way.'
field_mapping_examples = [{},{"my source column 1":"my dest column 1"}]
class ReconConfigRequest(BaseModel):
reference: str = Field(default="Unnamed", description="User supplied id of the config")
name: str = Field(default="Unnamed", description="Only used to name the results.")
business_process: str = Field(default="Unnamed Process", description="Only used in the results.")
comment: Optional[str] = None
data_type: str = Field(default="Unknown", description="10000 foot view of what the data in the recon relates to. E.g. Customer or Transaction")
sources: List[SystemConfig]
destination: SystemConfig
pattern: ReconPatterns
status: ReconConfigStatus = Field(description="This controls where the result of recon jobs using this config go. 'draft' results will go to the draft snowflake tables, 'published' will go to the live snowflake tables which power the recon dashboards.")
field_mapping: Dict[str, str] = Field(default={}, description=field_mapping_description, examples=field_mapping_examples)
# Unlike the other recon config attributes, these map to database columns, not part of the config JSON.
frequency: Frequencies = Field(default = Frequencies.AD_HOC)
start_datetime: Optional[datetime] = None
model_config = {
"use_enum_values": True # Always output enum values
}
class ReconConfigResponse(ReconConfigRequest):
"""
Response when serializing
"""
# reference: str
revision_number: int
user: UserResponse
class ReconConfig(ReconConfigRequest):
"""
Internal Representation of the config object.
"""
# reference: str
revision_number: int
username: str
model_config = {
"from_attributes": True, # Ability to load from ORM model
# "use_enum_values": False, # TODO: ORM loading needs to load enums properly
}
def config_from_schema(recon_config_schema: ReconConfigSchema) -> ReconConfig:
""" Config uses the json 'config' field of the db to allow non-breaking schema changes
but also adds read only fields from the other table columns.
- 2 user writable fields are now in the ReconConfigSchema: frequency and start_datetime. This is to allow filtering of configs for scheduling.
We use username over User object so we don't need to do an extra db lookup for the user table.
"""
recon_config_schema.config["reference"] = recon_config_schema.reference
recon_config_schema.config["revision_number"] = recon_config_schema.revision_number
recon_config_schema.config["username"] = recon_config_schema.username
if recon_config_schema.frequency is not None:
recon_config_schema.config["frequency"] = recon_config_schema.frequency
else:
recon_config_schema.config["frequency"] = Frequencies.AD_HOC.value
if recon_config_schema.start_datetime is not None:
recon_config_schema.config["start_datetime"] = recon_config_schema.start_datetime.isoformat()
else:
recon_config_schema.config["start_datetime"] = None
recon_config_model = ReconConfig.model_validate(recon_config_schema.config)
return recon_config_model