# File: css-test/app/models/recon_result.py
# 2026-05-25 06:45:57 +00:00 - 347 lines - 15 KiB - Python

import logging
from typing import Optional, Annotated, Union, Type, Any
from datetime import date, datetime
import xml.etree.ElementTree as ET
import math
from pydantic import BaseModel, ConfigDict, Field, WithJsonSchema, field_serializer
import pandas as pd
import numpy as np
from app.core.config import config
from app.core.refdata import ReconResultStates
from app.core.exceptions import ReconExecption
from app.engine.loaders import url_to_schema
from app.core.file_util import url_to_filepath
# Annotated alias for pandas DataFrames that attaches an explicit JSON schema
# via WithJsonSchema.
# NOTE(review): JSON Schema expects `additionalProperties` to be a boolean or a
# schema object; the string 'str' looks unintended — confirm before changing.
DataFrame = Annotated[
    pd.DataFrame,
    WithJsonSchema({'type': 'object', 'additionalProperties': 'str'}),
]
# Concrete class of the pd.NaT singleton, usable in isinstance() checks and as
# an encoder key below.
NaTType: Type[Any] = type(pd.NaT)
# Keyword arguments expanded into ConfigDict(...) by the models in this file.
# "json_encoders" maps a type to the callable that converts a value of that
# type for JSON output; NaN and +/-inf floats are mapped to None.
sanitized_model_config: dict[str, Any] = {
    "arbitrary_types_allowed": True,
    "json_encoders": {
        NaTType: lambda v: None,
        np.integer: int,
        np.floating: lambda v: None if (math.isnan(v) or math.isinf(v)) else float(v),
        np.bool_: bool,
        np.ndarray: lambda v: v.tolist(),
        pd.Timestamp: lambda v: v.isoformat(),
        pd.Timedelta: lambda v: v.total_seconds(),
        pd.Period: str,
        float: lambda v: None if (math.isnan(v) or math.isinf(v)) else v,
    }
}
def sanitize_for_json(obj: Any) -> Any:
    """Recursively convert Pandas/NumPy types into JSON-safe Python types.

    Conversions applied:
      * ``None`` and ``pd.NaT``            -> ``None``
      * NumPy integers / booleans          -> ``int`` / ``bool``
      * NumPy and built-in floats          -> ``float``, or ``None`` for NaN/inf
      * ``np.ndarray``, list, tuple, set   -> list of sanitized items
      * ``pd.Timestamp``                   -> ISO-8601 string
      * ``pd.Timedelta``                   -> total seconds
      * ``pd.Period``                      -> ``str(period)``
      * dict                               -> dict with sanitized values
        (keys are left untouched)

    Any other object is returned unchanged.
    """
    if obj is None or obj is pd.NaT or isinstance(obj, type(pd.NaT)):
        return None
    if isinstance(obj, np.integer):
        return int(obj)
    # Built-in floats are handled together with NumPy floats: ndarray.tolist()
    # yields plain Python floats, so NaN/inf would otherwise leak through the
    # recursive call and produce invalid JSON.
    if isinstance(obj, (float, np.floating)):
        return float(obj) if math.isfinite(obj) else None
    if isinstance(obj, np.bool_):
        return bool(obj)
    if isinstance(obj, np.ndarray):
        return [sanitize_for_json(item) for item in obj.tolist()]
    if isinstance(obj, pd.Timestamp):
        return obj.isoformat()
    if isinstance(obj, pd.Timedelta):
        return obj.total_seconds()
    if isinstance(obj, pd.Period):
        return str(obj)
    if isinstance(obj, (list, tuple, set)):
        return [sanitize_for_json(item) for item in obj]
    if isinstance(obj, dict):
        return {key: sanitize_for_json(value) for key, value in obj.items()}
    return obj
class Profile(BaseModel):
    # Describes one profiled dataset: where it lives (url, name, table_file),
    # how it was selected (query), its row/null counts and the outcome of its
    # threshold tests. Written as a comment rather than a docstring because
    # pydantic publishes a model's docstring as its JSON-schema description.
    model_config = ConfigDict(**sanitized_model_config)

    url: str
    name: str
    row_count: int
    total_nulls: int
    threshold_test_results: dict[str, str] = {}
    query: str | None = None
    table_file: str | None = None
from typing import Optional
from pydantic import BaseModel, Field
class ReconResultValue(BaseModel):
    # One output column of a reconciliation result row: the raw value plus the
    # column metadata (position, type, length, precision, scale, nullability)
    # that is copied into the control file schema.
    # Written as a comment rather than a docstring because pydantic publishes
    # a model's docstring as its JSON-schema description.
    column_name: str
    column_value_raw: Any = Field(alias="_column_value")
    ordinal_position: int
    data_type: str
    character_maximum_length: int
    numeric_precision: int
    numeric_scale: int
    allow_null: int

    def __init__(
        self,
        column_name: str,
        column_value_raw: Any,
        ordinal_position: int,
        data_type: str,
        character_maximum_length: int,
        numeric_precision: int,
        numeric_scale: int,
        allow_null: int
    ) -> None:
        """Allow positional construction; values are validated by BaseModel.

        ``column_value_raw`` is forwarded under its alias ``_column_value``
        because that is the name the field is populated by.
        """
        super().__init__(
            column_name=column_name,
            _column_value=column_value_raw,
            ordinal_position=ordinal_position,
            data_type=data_type,
            character_maximum_length=character_maximum_length,
            numeric_precision=numeric_precision,
            numeric_scale=numeric_scale,
            allow_null=allow_null
        )

    @property
    def column_value(self) -> str:
        """Return the value as a single-line, length-limited string.

        * ``None`` becomes ``""`` for nullable columns; for non-nullable
          columns a warning is logged and ``"Undefined"`` is returned.
        * Tabs, newlines and carriage returns become spaces, runs of spaces
          are collapsed to one, and surrounding whitespace is stripped.
        * The result is truncated to ``character_maximum_length`` when that
          limit is greater than zero.
        """
        raw = self.column_value_raw
        if raw is None:
            if not self.allow_null:
                logging.warning(f"Result published with empty value for non-nullable field: {self.column_name}")
                return "Undefined"
            return ""

        text = str(raw).replace('\t', ' ').replace('\n', ' ').replace('\r', ' ')
        # Collapse runs of spaces. The search string must be TWO spaces:
        # looping on a single space never terminates once the text has one.
        while '  ' in text:
            text = text.replace('  ', ' ')
        text = text.strip()

        limit = self.character_maximum_length
        if limit > 0 and len(text) > limit:
            return text[:limit]
        return text
class ReconResult(BaseModel):
    # Outcome of one reconciliation job: job/config identity, timing, state,
    # the source and destination profiles, and the pattern result. It can
    # render itself as delimited result rows (to_csv) together with an XML
    # control file describing those rows (output_control_file).
    # Written as a comment rather than a docstring because pydantic publishes
    # a model's docstring as its JSON-schema description.
    model_config = ConfigDict(**sanitized_model_config)

    recon_job_id: int
    recon_job_name: str
    recon_config_reference: str
    recon_config_name: str
    business_process: str
    as_at_date: date
    data_type: str
    frequency: str
    start_timestamp: datetime = Field(default_factory=datetime.now)
    end_timestamp: Optional[datetime] = None
    recon_state: Optional[ReconResultStates] = None
    source_profiles: list[Profile] = []
    dest_profile: Optional[Profile] = None
    problems: dict[str, dict | str] = {}
    environment: str = ""
    pattern: str
    pattern_result: Optional[bool] = None
    pattern_failed_row_count: int = 0
    pattern_rows_reconciled: int = 0
    username: Optional[str] = None
    due_timestamp: Optional[datetime] = None

    # Lazily built cache of result rows (one row per source profile).
    _result_values: list[list[ReconResultValue]] = []
    # Backing store for the pattern_result_description property.
    _pattern_result_description: Optional[str] = None

    @field_serializer("problems")
    def serialize_problems(self, problems: dict, _info):
        """Automatically sanitize problems before JSON serialization."""
        return sanitize_for_json(problems)

    @property
    def result_values(self) -> list[list[ReconResultValue]]:
        """Return the result rows, building them on first access.

        While the cache is empty (including when there are no source
        profiles) every access rebuilds it.
        """
        if not self._result_values:
            self.create_recon_result_values()
        return self._result_values

    @property
    def pattern_result_description(self) -> Optional[str]:
        """Return the description with special characters escaped.

        The stored text is passed through the ``unicode_escape`` codec, so
        newlines, tabs and similar characters come back as backslash escapes.
        Returns ``None`` when no description has been set.
        """
        if self._pattern_result_description is None:
            return None
        return self._pattern_result_description.encode('unicode_escape').decode('utf-8')

    @pattern_result_description.setter
    def pattern_result_description(self, value: Optional[str]) -> None:
        if value is not None and not isinstance(value, str):
            raise TypeError("pattern_result_description must be a string or None")
        self._pattern_result_description = value

    def create_recon_result_values(self) -> None:
        """(Re)build the cached result rows, one per source profile.

        The cache is replaced rather than appended to, so calling this method
        more than once does not duplicate rows.

        Raises:
            ReconExecption: if ``dest_profile`` or ``recon_state`` is unset.
        """
        dest = self.dest_profile
        if dest is None:
            raise ReconExecption("No dest profile created")
        if self.recon_state is None:
            raise ReconExecption("Unexpeded null recon_state")

        target_schema = url_to_schema(dest.url)
        rows: list[list[ReconResultValue]] = []
        for source in self.source_profiles:
            source_schema = url_to_schema(source.url)
            # Positional arguments: name, value, ordinal position, data type,
            # max length, numeric precision, numeric scale, allow_null.
            rows.append([
                ReconResultValue("Environment", f"{self.environment}", 1, "varchar", 100, 0, 0, 0),
                ReconResultValue("Rec Number", f"{self.recon_job_id}", 2, "int", 0, 10, 0, 0),
                ReconResultValue("Business Ref", f"{self.recon_config_name}", 3, "varchar", 200, 0, 0, 0),
                ReconResultValue("Business Data Type", f"{self.data_type}", 4, "varchar", 200, 0, 0, 0),
                ReconResultValue("Business Process", f"{self.business_process}", 5, "varchar", 200, 0, 0, 0),
                ReconResultValue("Config Reference", f"{self.recon_config_reference}", 6, "varchar", 200, 0, 0, 0),
                ReconResultValue("Recon Date", f"{self.as_at_date.isoformat()}", 7, "datetime", 0, 0, 0, 0),
                ReconResultValue("Run Date", f"{self.start_timestamp.isoformat()}", 8, "datetime", 0, 0, 0, 0),
                ReconResultValue("Frequency", f"{self.frequency}", 9, "varchar", 30, 0, 0, 0),
                ReconResultValue("Overall Status", f"{self.recon_state.value}", 10, "varchar", 150, 0, 0, 0),
                ReconResultValue("Data Reconciled", f"{self.pattern_result}".lower(), 11, "boolean", 0, 0, 0, 0),
                ReconResultValue("Data Recon Reason", f"{self.pattern_result_description}", 12, "varchar", 1024, 0, 0, 0),
                ReconResultValue("Records Failed Count", f"{self.pattern_failed_row_count}", 13, "int", 0, 10, 0, 0),
                ReconResultValue("Source Profile Results", f"{source.threshold_test_results}", 14, "varchar", 512, 0, 0, 0),
                ReconResultValue("Source System/DB", f"{source.name}", 15, "varchar", 512, 0, 0, 0),
                ReconResultValue("Source Schema", f"{source_schema}", 16, "varchar", 512, 0, 0, 1),
                ReconResultValue("Source Table/File", f"{source.table_file}", 17, "varchar", 512, 0, 0, 0),
                ReconResultValue("Source URL", f"{source.url}", 18, "varchar", 512, 0, 0, 0),
                # NOTE(review): every other "int" column declares precision 10;
                # this one declares 0 — confirm whether that is intentional.
                ReconResultValue("Source Records", f"{source.row_count}", 19, "int", 0, 0, 0, 0),
                ReconResultValue("Target Profile Results", f"{dest.threshold_test_results}", 20, "varchar", 512, 0, 0, 0),
                ReconResultValue("Target System/DB", f"{dest.name}", 21, "varchar", 512, 0, 0, 0),
                ReconResultValue("Target Schema", f"{target_schema}", 22, "varchar", 512, 0, 0, 1),
                ReconResultValue("Target Table/File", f"{dest.table_file}", 23, "varchar", 512, 0, 0, 0),
                ReconResultValue("Target URL", f"{dest.url}", 24, "varchar", 512, 0, 0, 0),
                ReconResultValue("Target Records", f"{dest.row_count}", 25, "int", 0, 10, 0, 0),
                ReconResultValue("Rows Reconciled", f"{self.pattern_rows_reconciled}", 26, "int", 0, 10, 0, 0),
                ReconResultValue("Start Timestamp", f"{self.start_timestamp.isoformat()}", 27, "datetime", 0, 0, 0, 1),
                ReconResultValue("End Timestamp", self.end_timestamp.isoformat() if self.end_timestamp else "", 28, "datetime", 0, 0, 0, 1),
                ReconResultValue("Username", self.username if self.username else "", 29, "varchar", 256, 0, 0, 1),
                ReconResultValue("Due Timestamp", self.due_timestamp.isoformat() if self.due_timestamp else "", 30, "datetime", 0, 0, 0, 1),
                ReconResultValue("Pattern", self.pattern if self.pattern else "", 31, "varchar", 128, 0, 0, 1),
                ReconResultValue("Source Filter", source.query if source.query else "", 32, "varchar", 1024, 0, 0, 1),
                # Ordinal 33: this column previously reused 32, giving two
                # columns the same position in the control file schema.
                ReconResultValue("Target Filter", dest.query if dest.query else "", 33, "varchar", 1024, 0, 0, 1),
            ])
        self._result_values = rows

    def to_csv(self, result_url):
        """Write the result rows and their XML control file under ``result_url``.

        The data file is named ``reconciliation_results_<timestamp>.dat`` and
        uses ``|~`` as field delimiter; the control file has the same path
        with ``.xml`` appended. ``result_url`` is concatenated with the file
        name as-is, so it is expected to end with a path separator.

        Raises:
            ReconExecption: if ``result_url`` is not a ``file:`` URL, or if
                there are no result rows to write.
        """
        if not result_url.startswith("file:"):
            raise ReconExecption(f"Only file system urls are allowed for the result path so far. Config has {result_url}")

        rows = self.result_values
        if not rows:
            # Fail before touching the file system; without rows the control
            # file schema cannot be derived.
            raise ReconExecption("No result rows to write: no source profiles available")

        # Create the csv dictionaries to be passed to writer
        csv_results: list[dict] = [
            {column.column_name: column.column_value for column in row}
            for row in rows
        ]

        # generate filenames
        output_file_name = f"reconciliation_results_{datetime.now().strftime('%Y%m%d_%H%M%S')}.dat"
        result_filename = url_to_filepath(result_url + output_file_name)
        control_file_name = f"{result_filename}.xml"

        # Write csv data file
        df = pd.DataFrame(csv_results)
        delimiter = "|~"
        with open(result_filename, "w", encoding="utf-8") as f:
            # Write header
            f.write(delimiter.join(df.columns) + "\n")
            # Write rows
            for record in df.itertuples(index=False):
                f.write(delimiter.join(map(str, record)) + "\n")
        logging.info(f"Wrote results file {result_filename} with {len(df)} rows")

        # Create the csv control file data
        control_file_content = self.output_control_file(
            file_name=output_file_name,
            number_of_records=len(csv_results),
            legacy_feed_date=self.start_timestamp.date(),
            delimiter=delimiter
        )

        # Write csv control file. The XML is produced as UTF-8 text without a
        # declaration, so it must be written as UTF-8 rather than in the
        # platform's locale encoding.
        with open(control_file_name, mode='w', newline='', encoding="utf-8") as file:
            file.write(control_file_content)
        logging.info(f"Wrote results control file {control_file_name}")

    def output_control_file(self, file_name: str, number_of_records: int, legacy_feed_date: Union[str, date], delimiter) -> str:
        """Return the XML control document describing a results data file.

        Args:
            file_name: name of the data file the control file refers to.
            number_of_records: number of data rows in that file.
            legacy_feed_date: a ``date`` (rendered with ``isoformat()``) or
                any other value (rendered with ``str()``).
            delimiter: field delimiter used in the data file.

        Returns:
            The ``<Feed>`` document as a string, without an XML declaration.

        Raises:
            ReconExecption: if there are no result rows to derive the column
                schema from.
        """
        rows = self.result_values
        if not rows:
            raise ReconExecption("Cannot build control file: no result rows available")

        if isinstance(legacy_feed_date, date):
            legacy_feed_date = legacy_feed_date.isoformat()
        else:
            legacy_feed_date = str(legacy_feed_date)

        root = ET.Element("Feed")

        # Header (hardcoded to match example)
        header = (
            ("File", file_name),
            ("ExtractProg", "RECONRANGER"),
            ("GenerationDate", f"{datetime.now().strftime('%b %d %Y %I:%M%p').upper()}"),
            ("NumberOfRecords", str(number_of_records)),
            ("LegacyFeedDate", legacy_feed_date),
            ("FieldDelimiter", delimiter),
            ("ContactEmailAddress", config.contact_email),
        )
        for tag, text in header:
            ET.SubElement(root, tag).text = text

        data_schema = ET.SubElement(root, "DataSchema")
        cols = ET.SubElement(data_schema, "COLS")

        # Every row shares the same column layout, so the first row is enough
        # to describe the schema.
        for column in rows[0]:
            col = ET.SubElement(cols, "COL")
            # Column names are made identifier-like: spaces and slashes
            # become underscores.
            ET.SubElement(col, "COLUMN_NAME").text = column.column_name.replace(" ", "_").replace("/", "_")
            ET.SubElement(col, "ORDINAL_POSITION").text = str(column.ordinal_position)
            ET.SubElement(col, "DATA_TYPE").text = column.data_type
            ET.SubElement(col, "CHARACTER_MAXIMUM_LENGTH").text = str(column.character_maximum_length)
            ET.SubElement(col, "NUMERIC_PRECISION").text = str(column.numeric_precision)
            ET.SubElement(col, "NUMERIC_SCALE").text = str(column.numeric_scale)
            ET.SubElement(col, "ALLOW_NULL").text = str(column.allow_null)

        xml_bytes = ET.tostring(root, encoding="utf-8", xml_declaration=False)
        return xml_bytes.decode("utf-8")