diff --git a/app/models/recon_result.py b/app/models/recon_result.py new file mode 100644 index 0000000..43c4a77 --- /dev/null +++ b/app/models/recon_result.py @@ -0,0 +1,347 @@ +import logging +from typing import Optional, Annotated, Union, Type, Any +from datetime import date, datetime +import xml.etree.ElementTree as ET +import math + + +from pydantic import BaseModel, ConfigDict, Field, WithJsonSchema, field_serializer +import pandas as pd +import numpy as np + +from app.core.config import config +from app.core.refdata import ReconResultStates +from app.core.exceptions import ReconExecption +from app.engine.loaders import url_to_schema +from app.core.file_util import url_to_filepath + +DataFrame = Annotated[ + pd.DataFrame, + WithJsonSchema({'type': 'object', 'additionalProperties': 'str'}), +] + + +NaTType: Type[Any] = type(pd.NaT) + +sanitized_model_config: dict[str, Any] = { + "arbitrary_types_allowed": True, + "json_encoders": { + NaTType: lambda v: None, + np.integer: int, + np.floating: lambda v: None if (math.isnan(v) or math.isinf(v)) else float(v), + np.bool_: bool, + np.ndarray: lambda v: v.tolist(), + pd.Timestamp: lambda v: v.isoformat(), + pd.Timedelta: lambda v: v.total_seconds(), + pd.Period: str, + float: lambda v: None if (math.isnan(v) or math.isinf(v)) else v, + } +} + + +def sanitize_for_json(obj: Any) -> Any: + """Recursively convert Pandas/NumPy types into JSON-safe Python types.""" + if obj is None: + return None + if obj is pd.NaT or isinstance(obj, NaTType): + return None + if isinstance(obj, (np.integer,)): + return int(obj) + if isinstance(obj, (np.floating,)): + return None if (math.isnan(obj) or math.isinf(obj)) else float(obj) + if isinstance(obj, (np.bool_,)): + return bool(obj) + if isinstance(obj, (np.ndarray,)): + return [sanitize_for_json(x) for x in obj.tolist()] + if isinstance(obj, pd.Timestamp): + return obj.isoformat() + if isinstance(obj, pd.Timedelta): + return obj.total_seconds() + if isinstance(obj, pd.Period): + return str(obj) + if isinstance(obj, (list, tuple, set)): + return [sanitize_for_json(x) for x in obj] + if isinstance(obj, dict): + return {k: sanitize_for_json(v) for k, v in obj.items()} + return obj + + +class Profile(BaseModel): + model_config = ConfigDict(**sanitized_model_config) + + url: str + name: str + row_count: int + total_nulls: int + threshold_test_results: dict[str,str] = {} + query: Optional[str] = None + table_file: Optional[str] = None + + +from typing import Optional +from pydantic import BaseModel, Field + + +class ReconResultValue(BaseModel): + column_name: str + column_value_raw: Any = Field(alias="_column_value") + ordinal_position: int + data_type: str + character_maximum_length: int + numeric_precision: int + numeric_scale: int + allow_null: int + + def __init__( + self, + column_name: str, + column_value_raw: Any, + ordinal_position: int, + data_type: str, + character_maximum_length: int, + numeric_precision: int, + numeric_scale: int, + allow_null: int + ): + # Pass values to BaseModel using keyword args for validation + super().__init__( + column_name=column_name, + _column_value=column_value_raw, + ordinal_position=ordinal_position, + data_type=data_type, + character_maximum_length=character_maximum_length, + numeric_precision=numeric_precision, + numeric_scale=numeric_scale, + allow_null=allow_null + ) + + @property + def column_value(self) -> str: + """Return processed column value.""" + val = self.column_value_raw + + if val is None: + if not self.allow_null: + logging.warning(f"Result published with empty value for non-nullable field: {self.column_name}") + return "Undefined" + return "" + + s = str(val) + s = s.replace('\t', ' ').replace('\n', ' ').replace('\r', ' ') + while ' ' in s: + s = s.replace(' ', ' ') + s = s.strip() + + if self.character_maximum_length > 0 and len(s) > self.character_maximum_length: + return s[:self.character_maximum_length] + + return s + + + +class ReconResult(BaseModel): + model_config = ConfigDict(**sanitized_model_config) + + recon_job_id: int + recon_job_name: str + recon_config_reference: str + recon_config_name: str + business_process: str + as_at_date: date + data_type: str + frequency: str + start_timestamp: datetime = Field(default_factory=datetime.now) + end_timestamp: Optional[datetime] = None + recon_state: Optional[ReconResultStates] = None + source_profiles: list[Profile] = [] + dest_profile: Optional[Profile] = None + + problems: dict[str,dict|str] = {} + @field_serializer("problems") + def serialize_problems(self, problems: dict, _info): + """Automatically sanitize problems before JSON serialization.""" + return sanitize_for_json(problems) + + environment: str = "" + + pattern: str + pattern_result: Optional[bool] = None + pattern_failed_row_count: int = 0 + pattern_rows_reconciled: int = 0 + + username: Optional[str] = None + due_timestamp: Optional[datetime] = None + + _result_values: list[list[ReconResultValue]] = [] + @property + def result_values(self): + if self._result_values == []: + self.create_recon_result_values() + return self._result_values + + + _pattern_result_description: Optional[str] = None + @property + def pattern_result_description(self) -> Optional[str]: + """ + Escaped special characters like newlines, tabs, etc. + """ + if self._pattern_result_description is None: + return None + return self._pattern_result_description.encode('unicode_escape').decode('utf-8') + + @pattern_result_description.setter + def pattern_result_description(self, value: Optional[str]) -> None: + if value is not None and not isinstance(value, str): + raise TypeError("pattern_result_description must be a string or None") + self._pattern_result_description = value + + + def create_recon_result_values(self) -> None: + if self.dest_profile is None: + raise ReconExecption("No dest profile created") + if self.recon_state is None: + raise ReconExecption("Unexpeded null recon_state") + + target_schema = url_to_schema(self.dest_profile.url) + + for source in self.source_profiles: + row_values:list[ReconResultValue] = [] + source_schema = url_to_schema(source.url) + + row_values.append( ReconResultValue("Environment", f"{self.environment}", 1, "varchar", 100, 0, 0, 0)) + row_values.append( ReconResultValue("Rec Number", f"{self.recon_job_id}", 2, "int", 0, 10, 0, 0)) + row_values.append( ReconResultValue("Business Ref", f"{self.recon_config_name}", 3, "varchar", 200, 0, 0, 0)) + row_values.append( ReconResultValue("Business Data Type", f"{self.data_type}", 4, "varchar", 200, 0, 0, 0)) + row_values.append( ReconResultValue("Business Process", f"{self.business_process}", 5, "varchar", 200, 0, 0, 0)) + row_values.append( ReconResultValue("Config Reference", f"{self.recon_config_reference}", 6, "varchar", 200, 0, 0, 0)) + row_values.append( ReconResultValue("Recon Date", f"{self.as_at_date.isoformat()}", 7, "datetime", 0, 0, 0, 0)) + row_values.append( ReconResultValue("Run Date", f"{self.start_timestamp.isoformat()}", 8, "datetime", 0, 0, 0, 0)) + row_values.append( ReconResultValue("Frequency", f"{self.frequency}", 9, "varchar", 30, 0, 0, 0)) + row_values.append( ReconResultValue("Overall Status", f"{self.recon_state.value}", 10, "varchar", 150, 0, 0, 0)) + row_values.append( ReconResultValue("Data Reconciled", f"{self.pattern_result}".lower(), 11, "boolean", 0, 0, 0, 0)) + row_values.append( ReconResultValue("Data Recon Reason", f"{self.pattern_result_description}", 12, "varchar", 1024, 0, 0, 0)) + row_values.append( ReconResultValue("Records Failed Count", f"{self.pattern_failed_row_count}", 13, "int", 0, 10, 0, 0)) + row_values.append( ReconResultValue("Source Profile Results", f"{source.threshold_test_results}", 14, "varchar", 512, 0, 0, 0)) + row_values.append( ReconResultValue("Source System/DB", f"{source.name}", 15, "varchar", 512, 0, 0, 0)) + row_values.append( ReconResultValue("Source Schema", f"{source_schema}", 16, "varchar", 512, 0, 0, 1)) + row_values.append( ReconResultValue("Source Table/File", f"{source.table_file}", 17, "varchar", 512, 0, 0, 0)) + row_values.append( ReconResultValue("Source URL", f"{source.url}", 18, "varchar", 512, 0, 0, 0)) + row_values.append( ReconResultValue("Source Records", f"{source.row_count}", 19, "int", 0, 0, 0, 0)) + row_values.append( ReconResultValue("Target Profile Results", f"{self.dest_profile.threshold_test_results}", 20, "varchar", 512, 0, 0, 0)) + row_values.append( ReconResultValue("Target System/DB", f"{self.dest_profile.name}", 21, "varchar", 512, 0, 0, 0)) + row_values.append( ReconResultValue("Target Schema", f"{target_schema}", 22, "varchar", 512, 0, 0, 1)) + row_values.append( ReconResultValue("Target Table/File", f"{self.dest_profile.table_file}", 23, "varchar", 512, 0, 0, 0)) + row_values.append( ReconResultValue("Target URL", f"{self.dest_profile.url}", 24, "varchar", 512, 0, 0, 0)) + row_values.append( ReconResultValue("Target Records", f"{self.dest_profile.row_count}", 25, "int", 0, 10, 0, 0)) + row_values.append( ReconResultValue("Rows Reconciled", f"{self.pattern_rows_reconciled}", 26, "int", 0, 10, 0, 0)) + row_values.append( ReconResultValue("Start Timestamp", f"{self.start_timestamp.isoformat()}", 27, "datetime", 0, 0, 0, 1)) + row_values.append( ReconResultValue("End Timestamp", self.end_timestamp.isoformat() if self.end_timestamp else "", 28, "datetime", 0, 0, 0, 1)) + + row_values.append( ReconResultValue("Username", self.username if self.username else "", 29, "varchar", 256, 0, 0, 1)) + row_values.append( ReconResultValue("Due Timestamp", self.due_timestamp.isoformat() if self.due_timestamp else "", 30, "datetime", 0, 0, 0, 1)) + row_values.append( ReconResultValue("Pattern", self.pattern if self.pattern else "", 31, "varchar", 128, 0, 0, 1)) + row_values.append( ReconResultValue("Source Filter", source.query if source.query else "", 32, "varchar", 1024, 0, 0, 1)) + row_values.append( ReconResultValue("Target Filter", self.dest_profile.query if self.dest_profile.query else "", 32, "varchar", 1024, 0, 0, 1)) + + self._result_values.append(row_values) + + + def to_csv(self, result_url): + if result_url.startswith("file:") != True: + raise ReconExecption(f"Only file system urls are allowed for the result path so far. Config has {result_url}") + + # Create the csv dictionaries to be passed to writer + csv_results:list[dict] = [] + for row in self.result_values: + csv_row = {} + for column in row: + csv_row[column.column_name] = column.column_value + csv_results.append(csv_row) + + # generate filenames + output_file_name = f"reconciliation_results_{datetime.now().strftime('%Y%m%d_%H%M%S')}.dat" + result_filename = url_to_filepath(result_url + output_file_name) + control_file_name = f"{result_filename}.xml" + + # Write csv data file + df = pd.DataFrame(csv_results) + delimiter = "|~" + + with open(result_filename, "w", encoding="utf-8") as f: + # Write header + f.write(delimiter.join(df.columns) + "\n") + # Write rows + for row in df.itertuples(index=False): + f.write(delimiter.join(map(str, row)) + "\n") + logging.info(f"Wrote results file {result_filename} with {len(df)} rows") + + # Create the csv control file data + control_file_content = self.output_control_file( + file_name=output_file_name, + number_of_records=len(csv_results), + legacy_feed_date=self.start_timestamp.date(), + delimiter=delimiter + ) + # Write csv control file + with open(control_file_name, mode='w', newline='') as file: + file.write(control_file_content) + logging.info(f"Wrote results control file {control_file_name}") + + + def output_control_file(self, file_name: str, number_of_records: int, legacy_feed_date: Union[str, date], delimiter) -> str: + + if isinstance(legacy_feed_date, date): + legacy_feed_date = legacy_feed_date.isoformat() + else: + legacy_feed_date = str(legacy_feed_date) + + root = ET.Element("Feed") + + def add(tag: str, text: str) -> None: + el = ET.SubElement(root, tag) + el.text = text + + # Header (hardcoded to match example) + add("File", file_name) + add("ExtractProg", "RECONRANGER") + add("GenerationDate", f"{datetime.now().strftime('%b %d %Y %I:%M%p').upper()}") + add("NumberOfRecords", str(number_of_records)) + add("LegacyFeedDate", legacy_feed_date) + add("FieldDelimiter", delimiter) + add("ContactEmailAddress", config.contact_email) + + data_schema = ET.SubElement(root, "DataSchema") + cols = ET.SubElement(data_schema, "COLS") + + def add_col( + column_name: str, + ordinal_position: int, + data_type: str, + character_maximum_length: int, + numeric_precision: int, + numeric_scale: int, + allow_null: int, + ) -> None: + col = ET.SubElement(cols, "COL") + ET.SubElement(col, "COLUMN_NAME").text = column_name + ET.SubElement(col, "ORDINAL_POSITION").text = str(ordinal_position) + ET.SubElement(col, "DATA_TYPE").text = data_type + ET.SubElement(col, "CHARACTER_MAXIMUM_LENGTH").text = str(character_maximum_length) + ET.SubElement(col, "NUMERIC_PRECISION").text = str(numeric_precision) + ET.SubElement(col, "NUMERIC_SCALE").text = str(numeric_scale) + ET.SubElement(col, "ALLOW_NULL").text = str(allow_null) + + for column in self.result_values[0]: + add_col( + column.column_name.replace(" ","_").replace("/","_"), + column.ordinal_position, + column.data_type, + column.character_maximum_length, + column.numeric_precision, + column.numeric_scale, + column.allow_null + ) + + xml_bytes = ET.tostring(root, encoding="utf-8", xml_declaration=False) + return xml_bytes.decode("utf-8") \ No newline at end of file