"""Result models for a reconciliation run and their export to a delimited data file plus an XML control file."""
import logging
from typing import Optional, Annotated, Union, Type, Any
from datetime import date, datetime
import xml.etree.ElementTree as ET
import math

from pydantic import BaseModel, ConfigDict, Field, WithJsonSchema, field_serializer
import pandas as pd
import numpy as np

from app.core.config import config
from app.core.refdata import ReconResultStates
from app.core.exceptions import ReconExecption
from app.engine.loaders import url_to_schema
from app.core.file_util import url_to_filepath

# Annotated alias so a pandas DataFrame can appear in a pydantic model's JSON schema.
# NOTE(review): 'additionalProperties': 'str' is not a valid JSON-Schema value
# (a schema object such as {'type': 'string'} or a boolean is expected) - confirm intent.
DataFrame = Annotated[
    pd.DataFrame,
    WithJsonSchema({'type': 'object', 'additionalProperties': 'str'}),
]

# Runtime type of the pd.NaT singleton, used for isinstance checks and encoder lookup.
NaTType: Type[Any] = type(pd.NaT)

# Shared pydantic model configuration: allows pandas/numpy field types and maps
# them (and non-finite floats) to JSON-safe values.
sanitized_model_config: dict[str, Any] = {
    "arbitrary_types_allowed": True,
    "json_encoders": {
        NaTType: lambda v: None,
        np.integer: int,
        np.floating: lambda v: None if (math.isnan(v) or math.isinf(v)) else float(v),
        np.bool_: bool,
        np.ndarray: lambda v: v.tolist(),
        pd.Timestamp: lambda v: v.isoformat(),
        pd.Timedelta: lambda v: v.total_seconds(),
        pd.Period: str,
        float: lambda v: None if (math.isnan(v) or math.isinf(v)) else v,
    }
}


def sanitize_for_json(obj: Any) -> Any:
    """Recursively convert Pandas/NumPy types into JSON-safe Python types.

    Conversions applied:
      * ``None`` / ``pd.NaT``            -> ``None``
      * NumPy integers / booleans        -> ``int`` / ``bool``
      * floats (Python or NumPy)         -> ``float``, or ``None`` when NaN/inf
      * ``np.ndarray``                   -> list (elements sanitized)
      * ``pd.Timestamp``                 -> ISO-8601 string
      * ``pd.Timedelta``                 -> total seconds
      * ``pd.Period``                    -> ``str``
      * list / tuple / set               -> list (elements sanitized)
      * dict                             -> dict with sanitized values (keys untouched)

    Anything else is returned unchanged.
    """
    if obj is None or obj is pd.NaT or isinstance(obj, NaTType):
        return None
    if isinstance(obj, np.integer):
        return int(obj)
    # Plain Python floats are handled too, so NaN/inf never leak into JSON; this
    # mirrors the ``float`` entry in ``sanitized_model_config["json_encoders"]``.
    if isinstance(obj, (float, np.floating)):
        return None if (math.isnan(obj) or math.isinf(obj)) else float(obj)
    if isinstance(obj, np.bool_):
        return bool(obj)
    if isinstance(obj, np.ndarray):
        return [sanitize_for_json(x) for x in obj.tolist()]
    if isinstance(obj, pd.Timestamp):
        return obj.isoformat()
    if isinstance(obj, pd.Timedelta):
        return obj.total_seconds()
    if isinstance(obj, pd.Period):
        return str(obj)
    if isinstance(obj, (list, tuple, set)):
        return [sanitize_for_json(x) for x in obj]
    if isinstance(obj, dict):
        return {k: sanitize_for_json(v) for k, v in obj.items()}
    return obj


class Profile(BaseModel):
    """Summary of one data set (source or destination) taking part in a reconciliation."""

    model_config = ConfigDict(**sanitized_model_config)

    url: str
    name: str
    row_count: int
    total_nulls: int
    threshold_test_results: dict[str, str] = {}
    query: Optional[str] = None
    table_file: Optional[str] = None


# Redundant re-imports (already imported at the top of the module); kept as-is.
from typing import Optional
from pydantic import BaseModel, Field


class ReconResultValue(BaseModel):
    """One output column of a result row: its raw value plus the column metadata
    that is written to the XML control file."""

    column_name: str
    column_value_raw: Any = Field(alias="_column_value")
    ordinal_position: int
    data_type: str
    character_maximum_length: int
    numeric_precision: int
    numeric_scale: int
    allow_null: int

    def __init__(
        self,
        column_name: str,
        column_value_raw: Any,
        ordinal_position: int,
        data_type: str,
        character_maximum_length: int,
        numeric_precision: int,
        numeric_scale: int,
        allow_null: int
    ):
        """Allow positional construction; values are still validated by pydantic."""
        # Pass values to BaseModel using keyword args for validation.
        # ``column_value_raw`` must be supplied under its alias.
        super().__init__(
            column_name=column_name,
            _column_value=column_value_raw,
            ordinal_position=ordinal_position,
            data_type=data_type,
            character_maximum_length=character_maximum_length,
            numeric_precision=numeric_precision,
            numeric_scale=numeric_scale,
            allow_null=allow_null
        )

    @property
    def column_value(self) -> str:
        """Return processed column value.

        ``None`` becomes ``""`` for nullable columns, or ``"Undefined"`` (with a
        warning logged) for non-nullable ones. Otherwise the value is
        stringified, tabs/newlines/carriage returns become spaces, runs of
        spaces are collapsed to one, the result is stripped and finally
        truncated to ``character_maximum_length`` when that is positive.
        """
        val = self.column_value_raw
        if val is None:
            if not self.allow_null:
                logging.warning(
                    "Result published with empty value for non-nullable field: %s",
                    self.column_name,
                )
                return "Undefined"
            return ""

        s = str(val).replace('\t', ' ').replace('\n', ' ').replace('\r', ' ')
        # Collapse runs of spaces in a single pass. Splitting on the space
        # character and dropping the empty pieces always terminates and leaves
        # exactly one space between the remaining parts.
        s = " ".join(part for part in s.split(" ") if part)
        s = s.strip()

        limit = self.character_maximum_length
        if limit > 0 and len(s) > limit:
            return s[:limit]
        return s


class ReconResult(BaseModel):
    """Outcome of one reconciliation job, exportable as a delimited data file
    with an accompanying XML control file (see ``to_csv``)."""

    model_config = ConfigDict(**sanitized_model_config)

    recon_job_id: int
    recon_job_name: str
    recon_config_reference: str
    recon_config_name: str
    business_process: str
    as_at_date: date
    data_type: str
    frequency: str
    start_timestamp: datetime = Field(default_factory=datetime.now)
    end_timestamp: Optional[datetime] = None
    recon_state: Optional[ReconResultStates] = None
    source_profiles: list[Profile] = []
    dest_profile: Optional[Profile] = None
    problems: dict[str, dict | str] = {}
    environment: str = ""
    pattern: str
    pattern_result: Optional[bool] = None
    pattern_failed_row_count: int = 0
    pattern_rows_reconciled: int = 0
    username: Optional[str] = None
    due_timestamp: Optional[datetime] = None

    # Private state: lazily built result rows and the raw (unescaped) description.
    _result_values: list[list[ReconResultValue]] = []
    _pattern_result_description: Optional[str] = None

    @field_serializer("problems")
    def serialize_problems(self, problems: dict, _info):
        """Automatically sanitize problems before JSON serialization."""
        return sanitize_for_json(problems)

    @property
    def result_values(self):
        """Result rows (one per source profile), built on first access."""
        if not self._result_values:
            self.create_recon_result_values()
        return self._result_values

    @property
    def pattern_result_description(self) -> Optional[str]:
        """
        Escaped special characters like newlines, tabs, etc.
        """
        if self._pattern_result_description is None:
            return None
        return self._pattern_result_description.encode('unicode_escape').decode('utf-8')

    @pattern_result_description.setter
    def pattern_result_description(self, value: Optional[str]) -> None:
        if value is not None and not isinstance(value, str):
            raise TypeError("pattern_result_description must be a string or None")
        self._pattern_result_description = value

    def create_recon_result_values(self) -> None:
        """Build one row of ``ReconResultValue`` columns per source profile.

        The rows replace any previously built rows, so calling this more than
        once does not duplicate output.

        Raises:
            ReconExecption: if ``dest_profile`` or ``recon_state`` is not set.
        """
        if self.dest_profile is None:
            raise ReconExecption("No dest profile created")
        if self.recon_state is None:
            raise ReconExecption("Unexpeded null recon_state")

        dest = self.dest_profile
        # presumably the schema name derived from the URL - confirm against url_to_schema
        target_schema = url_to_schema(dest.url)

        rows: list[list[ReconResultValue]] = []
        for source in self.source_profiles:
            source_schema = url_to_schema(source.url)

            # (column name, value, data type, max length, precision, scale, allow null)
            # Ordinal positions are assigned from list order below, which keeps
            # them unique and contiguous (1..N).
            # NOTE(review): most values are wrapped in f-strings, so a None
            # attribute is published as the text "None" rather than reaching
            # the None handling in ReconResultValue.column_value - confirm intent.
            specs: list[tuple[str, Any, str, int, int, int, int]] = [
                ("Environment", f"{self.environment}", "varchar", 100, 0, 0, 0),
                ("Rec Number", f"{self.recon_job_id}", "int", 0, 10, 0, 0),
                ("Business Ref", f"{self.recon_config_name}", "varchar", 200, 0, 0, 0),
                ("Business Data Type", f"{self.data_type}", "varchar", 200, 0, 0, 0),
                ("Business Process", f"{self.business_process}", "varchar", 200, 0, 0, 0),
                ("Config Reference", f"{self.recon_config_reference}", "varchar", 200, 0, 0, 0),
                ("Recon Date", f"{self.as_at_date.isoformat()}", "datetime", 0, 0, 0, 0),
                ("Run Date", f"{self.start_timestamp.isoformat()}", "datetime", 0, 0, 0, 0),
                ("Frequency", f"{self.frequency}", "varchar", 30, 0, 0, 0),
                ("Overall Status", f"{self.recon_state.value}", "varchar", 150, 0, 0, 0),
                ("Data Reconciled", f"{self.pattern_result}".lower(), "boolean", 0, 0, 0, 0),
                ("Data Recon Reason", f"{self.pattern_result_description}", "varchar", 1024, 0, 0, 0),
                ("Records Failed Count", f"{self.pattern_failed_row_count}", "int", 0, 10, 0, 0),
                ("Source Profile Results", f"{source.threshold_test_results}", "varchar", 512, 0, 0, 0),
                ("Source System/DB", f"{source.name}", "varchar", 512, 0, 0, 0),
                ("Source Schema", f"{source_schema}", "varchar", 512, 0, 0, 1),
                ("Source Table/File", f"{source.table_file}", "varchar", 512, 0, 0, 0),
                ("Source URL", f"{source.url}", "varchar", 512, 0, 0, 0),
                # Precision 10, consistent with every other int column.
                ("Source Records", f"{source.row_count}", "int", 0, 10, 0, 0),
                ("Target Profile Results", f"{dest.threshold_test_results}", "varchar", 512, 0, 0, 0),
                ("Target System/DB", f"{dest.name}", "varchar", 512, 0, 0, 0),
                ("Target Schema", f"{target_schema}", "varchar", 512, 0, 0, 1),
                ("Target Table/File", f"{dest.table_file}", "varchar", 512, 0, 0, 0),
                ("Target URL", f"{dest.url}", "varchar", 512, 0, 0, 0),
                ("Target Records", f"{dest.row_count}", "int", 0, 10, 0, 0),
                ("Rows Reconciled", f"{self.pattern_rows_reconciled}", "int", 0, 10, 0, 0),
                ("Start Timestamp", f"{self.start_timestamp.isoformat()}", "datetime", 0, 0, 0, 1),
                ("End Timestamp", self.end_timestamp.isoformat() if self.end_timestamp else "", "datetime", 0, 0, 0, 1),
                ("Username", self.username if self.username else "", "varchar", 256, 0, 0, 1),
                ("Due Timestamp", self.due_timestamp.isoformat() if self.due_timestamp else "", "datetime", 0, 0, 0, 1),
                ("Pattern", self.pattern if self.pattern else "", "varchar", 128, 0, 0, 1),
                ("Source Filter", source.query if source.query else "", "varchar", 1024, 0, 0, 1),
                ("Target Filter", dest.query if dest.query else "", "varchar", 1024, 0, 0, 1),
            ]

            rows.append([
                ReconResultValue(
                    column_name=name,
                    column_value_raw=value,
                    ordinal_position=position,
                    data_type=data_type,
                    character_maximum_length=max_length,
                    numeric_precision=precision,
                    numeric_scale=scale,
                    allow_null=allow_null,
                )
                for position, (name, value, data_type, max_length, precision, scale, allow_null)
                in enumerate(specs, start=1)
            ])

        # Assign rather than append so repeated calls do not accumulate rows.
        self._result_values = rows

    def to_csv(self, result_url):
        """Write the result rows to a ``|~``-delimited data file and an XML control file.

        Args:
            result_url: ``file:`` URL under which the output files are created.
                NOTE(review): the generated file name is concatenated directly
                onto this URL, so it presumably must end with a path
                separator - confirm against callers.

        Raises:
            ReconExecption: if ``result_url`` is not a ``file:`` URL, if there
                are no result rows to write, or if the rows cannot be built
                (see ``create_recon_result_values``).
        """
        if not result_url.startswith("file:"):
            raise ReconExecption(f"Only file system urls are allowed for the result path so far. Config has {result_url}")

        rows = self.result_values
        if not rows:
            # Fail before touching the file system instead of leaving a
            # header-less data file behind and crashing on the control file.
            raise ReconExecption("No result rows to write: at least one source profile is required")

        # Create the csv dictionaries to be passed to writer
        csv_results: list[dict] = [
            {column.column_name: column.column_value for column in row}
            for row in rows
        ]

        # generate filenames
        output_file_name = f"reconciliation_results_{datetime.now().strftime('%Y%m%d_%H%M%S')}.dat"
        result_filename = url_to_filepath(result_url + output_file_name)
        control_file_name = f"{result_filename}.xml"

        # Write csv data file. Values are written verbatim; the delimiter is not
        # escaped if it occurs inside a value.
        delimiter = "|~"
        header = list(csv_results[0])
        with open(result_filename, "w", encoding="utf-8") as f:
            f.write(delimiter.join(header) + "\n")
            f.writelines(
                delimiter.join(str(record[name]) for name in header) + "\n"
                for record in csv_results
            )
        logging.info("Wrote results file %s with %d rows", result_filename, len(csv_results))

        # Create the csv control file data
        control_file_content = self.output_control_file(
            file_name=output_file_name,
            number_of_records=len(csv_results),
            legacy_feed_date=self.start_timestamp.date(),
            delimiter=delimiter
        )

        # Write csv control file. The XML carries no declaration, so it must be
        # stored as UTF-8 rather than the platform default encoding.
        with open(control_file_name, mode='w', newline='', encoding="utf-8") as file:
            file.write(control_file_content)
        logging.info("Wrote results control file %s", control_file_name)

    def output_control_file(self, file_name: str, number_of_records: int, legacy_feed_date: Union[str, date], delimiter: str) -> str:
        """Return the XML control-file document describing a data file.

        Args:
            file_name: name of the data file the control file refers to.
            number_of_records: number of data rows in that file.
            legacy_feed_date: feed date; ``date``/``datetime`` values are
                rendered with ``isoformat()``, anything else with ``str()``.
            delimiter: field delimiter used in the data file.

        Returns:
            The ``<Feed>`` document as a string (no XML declaration).

        Raises:
            ReconExecption: if there are no result rows from which to derive
                the column schema.
        """
        rows = self.result_values
        if not rows:
            raise ReconExecption("No result rows available to describe in the control file")

        if isinstance(legacy_feed_date, date):
            feed_date = legacy_feed_date.isoformat()
        else:
            feed_date = str(legacy_feed_date)

        root = ET.Element("Feed")

        # Header (hardcoded to match example)
        header_fields = (
            ("File", file_name),
            ("ExtractProg", "RECONRANGER"),
            ("GenerationDate", f"{datetime.now().strftime('%b %d %Y %I:%M%p').upper()}"),
            ("NumberOfRecords", str(number_of_records)),
            ("LegacyFeedDate", feed_date),
            ("FieldDelimiter", delimiter),
            ("ContactEmailAddress", config.contact_email),
        )
        for tag, text in header_fields:
            ET.SubElement(root, tag).text = text

        data_schema = ET.SubElement(root, "DataSchema")
        cols = ET.SubElement(data_schema, "COLS")

        # Every row shares the same column layout, so the first row defines the schema.
        for column in rows[0]:
            col = ET.SubElement(cols, "COL")
            col_fields = (
                # Spaces and slashes are replaced with underscores in the published name.
                ("COLUMN_NAME", column.column_name.replace(" ", "_").replace("/", "_")),
                ("ORDINAL_POSITION", str(column.ordinal_position)),
                ("DATA_TYPE", column.data_type),
                ("CHARACTER_MAXIMUM_LENGTH", str(column.character_maximum_length)),
                ("NUMERIC_PRECISION", str(column.numeric_precision)),
                ("NUMERIC_SCALE", str(column.numeric_scale)),
                ("ALLOW_NULL", str(column.allow_null)),
            )
            for tag, text in col_fields:
                ET.SubElement(col, tag).text = text

        xml_bytes = ET.tostring(root, encoding="utf-8", xml_declaration=False)
        return xml_bytes.decode("utf-8")