Add app/models/recon_result.py
This commit is contained in:
@@ -0,0 +1,347 @@
|
||||
import logging
|
||||
from typing import Optional, Annotated, Union, Type, Any
|
||||
from datetime import date, datetime
|
||||
import xml.etree.ElementTree as ET
|
||||
import math
|
||||
|
||||
|
||||
from pydantic import BaseModel, ConfigDict, Field, WithJsonSchema, field_serializer
|
||||
import pandas as pd
|
||||
import numpy as np
|
||||
|
||||
from app.core.config import config
|
||||
from app.core.refdata import ReconResultStates
|
||||
from app.core.exceptions import ReconExecption
|
||||
from app.engine.loaders import url_to_schema
|
||||
from app.core.file_util import url_to_filepath
|
||||
|
||||
DataFrame = Annotated[
|
||||
pd.DataFrame,
|
||||
WithJsonSchema({'type': 'object', 'additionalProperties': 'str'}),
|
||||
]
|
||||
|
||||
|
||||
NaTType: Type[Any] = type(pd.NaT)
|
||||
|
||||
sanitized_model_config: dict[str, Any] = {
|
||||
"arbitrary_types_allowed": True,
|
||||
"json_encoders": {
|
||||
NaTType: lambda v: None,
|
||||
np.integer: int,
|
||||
np.floating: lambda v: None if (math.isnan(v) or math.isinf(v)) else float(v),
|
||||
np.bool_: bool,
|
||||
np.ndarray: lambda v: v.tolist(),
|
||||
pd.Timestamp: lambda v: v.isoformat(),
|
||||
pd.Timedelta: lambda v: v.total_seconds(),
|
||||
pd.Period: str,
|
||||
float: lambda v: None if (math.isnan(v) or math.isinf(v)) else v,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
def sanitize_for_json(obj: Any) -> Any:
|
||||
"""Recursively convert Pandas/NumPy types into JSON-safe Python types."""
|
||||
if obj is None:
|
||||
return None
|
||||
if obj is pd.NaT or isinstance(obj, NaTType):
|
||||
return None
|
||||
if isinstance(obj, (np.integer,)):
|
||||
return int(obj)
|
||||
if isinstance(obj, (np.floating,)):
|
||||
return None if (math.isnan(obj) or math.isinf(obj)) else float(obj)
|
||||
if isinstance(obj, (np.bool_,)):
|
||||
return bool(obj)
|
||||
if isinstance(obj, (np.ndarray,)):
|
||||
return [sanitize_for_json(x) for x in obj.tolist()]
|
||||
if isinstance(obj, pd.Timestamp):
|
||||
return obj.isoformat()
|
||||
if isinstance(obj, pd.Timedelta):
|
||||
return obj.total_seconds()
|
||||
if isinstance(obj, pd.Period):
|
||||
return str(obj)
|
||||
if isinstance(obj, (list, tuple, set)):
|
||||
return [sanitize_for_json(x) for x in obj]
|
||||
if isinstance(obj, dict):
|
||||
return {k: sanitize_for_json(v) for k, v in obj.items()}
|
||||
return obj
|
||||
|
||||
|
||||
class Profile(BaseModel):
|
||||
model_config = ConfigDict(**sanitized_model_config)
|
||||
|
||||
url: str
|
||||
name: str
|
||||
row_count: int
|
||||
total_nulls: int
|
||||
threshold_test_results: dict[str,str] = {}
|
||||
query: Optional[str] = None
|
||||
table_file: Optional[str] = None
|
||||
|
||||
|
||||
from typing import Optional
|
||||
from pydantic import BaseModel, Field
|
||||
|
||||
|
||||
class ReconResultValue(BaseModel):
|
||||
column_name: str
|
||||
column_value_raw: Any = Field(alias="_column_value")
|
||||
ordinal_position: int
|
||||
data_type: str
|
||||
character_maximum_length: int
|
||||
numeric_precision: int
|
||||
numeric_scale: int
|
||||
allow_null: int
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
column_name: str,
|
||||
column_value_raw: Any,
|
||||
ordinal_position: int,
|
||||
data_type: str,
|
||||
character_maximum_length: int,
|
||||
numeric_precision: int,
|
||||
numeric_scale: int,
|
||||
allow_null: int
|
||||
):
|
||||
# Pass values to BaseModel using keyword args for validation
|
||||
super().__init__(
|
||||
column_name=column_name,
|
||||
_column_value=column_value_raw,
|
||||
ordinal_position=ordinal_position,
|
||||
data_type=data_type,
|
||||
character_maximum_length=character_maximum_length,
|
||||
numeric_precision=numeric_precision,
|
||||
numeric_scale=numeric_scale,
|
||||
allow_null=allow_null
|
||||
)
|
||||
|
||||
@property
|
||||
def column_value(self) -> str:
|
||||
"""Return processed column value."""
|
||||
val = self.column_value_raw
|
||||
|
||||
if val is None:
|
||||
if not self.allow_null:
|
||||
logging.warning(f"Result published with empty value for non-nullable field: {self.column_name}")
|
||||
return "Undefined"
|
||||
return ""
|
||||
|
||||
s = str(val)
|
||||
s = s.replace('\t', ' ').replace('\n', ' ').replace('\r', ' ')
|
||||
while ' ' in s:
|
||||
s = s.replace(' ', ' ')
|
||||
s = s.strip()
|
||||
|
||||
if self.character_maximum_length > 0 and len(s) > self.character_maximum_length:
|
||||
return s[:self.character_maximum_length]
|
||||
|
||||
return s
|
||||
|
||||
|
||||
|
||||
class ReconResult(BaseModel):
|
||||
model_config = ConfigDict(**sanitized_model_config)
|
||||
|
||||
recon_job_id: int
|
||||
recon_job_name: str
|
||||
recon_config_reference: str
|
||||
recon_config_name: str
|
||||
business_process: str
|
||||
as_at_date: date
|
||||
data_type: str
|
||||
frequency: str
|
||||
start_timestamp: datetime = Field(default_factory=datetime.now)
|
||||
end_timestamp: Optional[datetime] = None
|
||||
recon_state: Optional[ReconResultStates] = None
|
||||
source_profiles: list[Profile] = []
|
||||
dest_profile: Optional[Profile] = None
|
||||
|
||||
problems: dict[str,dict|str] = {}
|
||||
@field_serializer("problems")
|
||||
def serialize_problems(self, problems: dict, _info):
|
||||
"""Automatically sanitize problems before JSON serialization."""
|
||||
return sanitize_for_json(problems)
|
||||
|
||||
environment: str = ""
|
||||
|
||||
pattern: str
|
||||
pattern_result: Optional[bool] = None
|
||||
pattern_failed_row_count: int = 0
|
||||
pattern_rows_reconciled: int = 0
|
||||
|
||||
username: Optional[str] = None
|
||||
due_timestamp: Optional[datetime] = None
|
||||
|
||||
_result_values: list[list[ReconResultValue]] = []
|
||||
@property
|
||||
def result_values(self):
|
||||
if self._result_values == []:
|
||||
self.create_recon_result_values()
|
||||
return self._result_values
|
||||
|
||||
|
||||
_pattern_result_description: Optional[str] = None
|
||||
@property
|
||||
def pattern_result_description(self) -> Optional[str]:
|
||||
"""
|
||||
Escaped special characters like newlines, tabs, etc.
|
||||
"""
|
||||
if self._pattern_result_description is None:
|
||||
return None
|
||||
return self._pattern_result_description.encode('unicode_escape').decode('utf-8')
|
||||
|
||||
@pattern_result_description.setter
|
||||
def pattern_result_description(self, value: Optional[str]) -> None:
|
||||
if value is not None and not isinstance(value, str):
|
||||
raise TypeError("pattern_result_description must be a string or None")
|
||||
self._pattern_result_description = value
|
||||
|
||||
|
||||
def create_recon_result_values(self) -> None:
|
||||
if self.dest_profile is None:
|
||||
raise ReconExecption("No dest profile created")
|
||||
if self.recon_state is None:
|
||||
raise ReconExecption("Unexpeded null recon_state")
|
||||
|
||||
target_schema = url_to_schema(self.dest_profile.url)
|
||||
|
||||
for source in self.source_profiles:
|
||||
row_values:list[ReconResultValue] = []
|
||||
source_schema = url_to_schema(source.url)
|
||||
|
||||
row_values.append( ReconResultValue("Environment", f"{self.environment}", 1, "varchar", 100, 0, 0, 0))
|
||||
row_values.append( ReconResultValue("Rec Number", f"{self.recon_job_id}", 2, "int", 0, 10, 0, 0))
|
||||
row_values.append( ReconResultValue("Business Ref", f"{self.recon_config_name}", 3, "varchar", 200, 0, 0, 0))
|
||||
row_values.append( ReconResultValue("Business Data Type", f"{self.data_type}", 4, "varchar", 200, 0, 0, 0))
|
||||
row_values.append( ReconResultValue("Business Process", f"{self.business_process}", 5, "varchar", 200, 0, 0, 0))
|
||||
row_values.append( ReconResultValue("Config Reference", f"{self.recon_config_reference}", 6, "varchar", 200, 0, 0, 0))
|
||||
row_values.append( ReconResultValue("Recon Date", f"{self.as_at_date.isoformat()}", 7, "datetime", 0, 0, 0, 0))
|
||||
row_values.append( ReconResultValue("Run Date", f"{self.start_timestamp.isoformat()}", 8, "datetime", 0, 0, 0, 0))
|
||||
row_values.append( ReconResultValue("Frequency", f"{self.frequency}", 9, "varchar", 30, 0, 0, 0))
|
||||
row_values.append( ReconResultValue("Overall Status", f"{self.recon_state.value}", 10, "varchar", 150, 0, 0, 0))
|
||||
row_values.append( ReconResultValue("Data Reconciled", f"{self.pattern_result}".lower(), 11, "boolean", 0, 0, 0, 0))
|
||||
row_values.append( ReconResultValue("Data Recon Reason", f"{self.pattern_result_description}", 12, "varchar", 1024, 0, 0, 0))
|
||||
row_values.append( ReconResultValue("Records Failed Count", f"{self.pattern_failed_row_count}", 13, "int", 0, 10, 0, 0))
|
||||
row_values.append( ReconResultValue("Source Profile Results", f"{source.threshold_test_results}", 14, "varchar", 512, 0, 0, 0))
|
||||
row_values.append( ReconResultValue("Source System/DB", f"{source.name}", 15, "varchar", 512, 0, 0, 0))
|
||||
row_values.append( ReconResultValue("Source Schema", f"{source_schema}", 16, "varchar", 512, 0, 0, 1))
|
||||
row_values.append( ReconResultValue("Source Table/File", f"{source.table_file}", 17, "varchar", 512, 0, 0, 0))
|
||||
row_values.append( ReconResultValue("Source URL", f"{source.url}", 18, "varchar", 512, 0, 0, 0))
|
||||
row_values.append( ReconResultValue("Source Records", f"{source.row_count}", 19, "int", 0, 0, 0, 0))
|
||||
row_values.append( ReconResultValue("Target Profile Results", f"{self.dest_profile.threshold_test_results}", 20, "varchar", 512, 0, 0, 0))
|
||||
row_values.append( ReconResultValue("Target System/DB", f"{self.dest_profile.name}", 21, "varchar", 512, 0, 0, 0))
|
||||
row_values.append( ReconResultValue("Target Schema", f"{target_schema}", 22, "varchar", 512, 0, 0, 1))
|
||||
row_values.append( ReconResultValue("Target Table/File", f"{self.dest_profile.table_file}", 23, "varchar", 512, 0, 0, 0))
|
||||
row_values.append( ReconResultValue("Target URL", f"{self.dest_profile.url}", 24, "varchar", 512, 0, 0, 0))
|
||||
row_values.append( ReconResultValue("Target Records", f"{self.dest_profile.row_count}", 25, "int", 0, 10, 0, 0))
|
||||
row_values.append( ReconResultValue("Rows Reconciled", f"{self.pattern_rows_reconciled}", 26, "int", 0, 10, 0, 0))
|
||||
row_values.append( ReconResultValue("Start Timestamp", f"{self.start_timestamp.isoformat()}", 27, "datetime", 0, 0, 0, 1))
|
||||
row_values.append( ReconResultValue("End Timestamp", self.end_timestamp.isoformat() if self.end_timestamp else "", 28, "datetime", 0, 0, 0, 1))
|
||||
|
||||
row_values.append( ReconResultValue("Username", self.username if self.username else "", 29, "varchar", 256, 0, 0, 1))
|
||||
row_values.append( ReconResultValue("Due Timestamp", self.due_timestamp.isoformat() if self.due_timestamp else "", 30, "datetime", 0, 0, 0, 1))
|
||||
row_values.append( ReconResultValue("Pattern", self.pattern if self.pattern else "", 31, "varchar", 128, 0, 0, 1))
|
||||
row_values.append( ReconResultValue("Source Filter", source.query if source.query else "", 32, "varchar", 1024, 0, 0, 1))
|
||||
row_values.append( ReconResultValue("Target Filter", self.dest_profile.query if self.dest_profile.query else "", 32, "varchar", 1024, 0, 0, 1))
|
||||
|
||||
self._result_values.append(row_values)
|
||||
|
||||
|
||||
def to_csv(self, result_url):
|
||||
if result_url.startswith("file:") != True:
|
||||
raise ReconExecption(f"Only file system urls are allowed for the result path so far. Config has {result_url}")
|
||||
|
||||
# Create the csv dictionaries to be passed to writer
|
||||
csv_results:list[dict] = []
|
||||
for row in self.result_values:
|
||||
csv_row = {}
|
||||
for column in row:
|
||||
csv_row[column.column_name] = column.column_value
|
||||
csv_results.append(csv_row)
|
||||
|
||||
# generate filenames
|
||||
output_file_name = f"reconciliation_results_{datetime.now().strftime('%Y%m%d_%H%M%S')}.dat"
|
||||
result_filename = url_to_filepath(result_url + output_file_name)
|
||||
control_file_name = f"{result_filename}.xml"
|
||||
|
||||
# Write csv data file
|
||||
df = pd.DataFrame(csv_results)
|
||||
delimiter = "|~"
|
||||
|
||||
with open(result_filename, "w", encoding="utf-8") as f:
|
||||
# Write header
|
||||
f.write(delimiter.join(df.columns) + "\n")
|
||||
# Write rows
|
||||
for row in df.itertuples(index=False):
|
||||
f.write(delimiter.join(map(str, row)) + "\n")
|
||||
logging.info(f"Wrote results file {result_filename} with {len(df)} rows")
|
||||
|
||||
# Create the csv control file data
|
||||
control_file_content = self.output_control_file(
|
||||
file_name=output_file_name,
|
||||
number_of_records=len(csv_results),
|
||||
legacy_feed_date=self.start_timestamp.date(),
|
||||
delimiter=delimiter
|
||||
)
|
||||
# Write csv control file
|
||||
with open(control_file_name, mode='w', newline='') as file:
|
||||
file.write(control_file_content)
|
||||
logging.info(f"Wrote results control file {control_file_name}")
|
||||
|
||||
|
||||
def output_control_file(self, file_name: str, number_of_records: int, legacy_feed_date: Union[str, date], delimiter) -> str:
|
||||
|
||||
if isinstance(legacy_feed_date, date):
|
||||
legacy_feed_date = legacy_feed_date.isoformat()
|
||||
else:
|
||||
legacy_feed_date = str(legacy_feed_date)
|
||||
|
||||
root = ET.Element("Feed")
|
||||
|
||||
def add(tag: str, text: str) -> None:
|
||||
el = ET.SubElement(root, tag)
|
||||
el.text = text
|
||||
|
||||
# Header (hardcoded to match example)
|
||||
add("File", file_name)
|
||||
add("ExtractProg", "RECONRANGER")
|
||||
add("GenerationDate", f"{datetime.now().strftime('%b %d %Y %I:%M%p').upper()}")
|
||||
add("NumberOfRecords", str(number_of_records))
|
||||
add("LegacyFeedDate", legacy_feed_date)
|
||||
add("FieldDelimiter", delimiter)
|
||||
add("ContactEmailAddress", config.contact_email)
|
||||
|
||||
data_schema = ET.SubElement(root, "DataSchema")
|
||||
cols = ET.SubElement(data_schema, "COLS")
|
||||
|
||||
def add_col(
|
||||
column_name: str,
|
||||
ordinal_position: int,
|
||||
data_type: str,
|
||||
character_maximum_length: int,
|
||||
numeric_precision: int,
|
||||
numeric_scale: int,
|
||||
allow_null: int,
|
||||
) -> None:
|
||||
col = ET.SubElement(cols, "COL")
|
||||
ET.SubElement(col, "COLUMN_NAME").text = column_name
|
||||
ET.SubElement(col, "ORDINAL_POSITION").text = str(ordinal_position)
|
||||
ET.SubElement(col, "DATA_TYPE").text = data_type
|
||||
ET.SubElement(col, "CHARACTER_MAXIMUM_LENGTH").text = str(character_maximum_length)
|
||||
ET.SubElement(col, "NUMERIC_PRECISION").text = str(numeric_precision)
|
||||
ET.SubElement(col, "NUMERIC_SCALE").text = str(numeric_scale)
|
||||
ET.SubElement(col, "ALLOW_NULL").text = str(allow_null)
|
||||
|
||||
for column in self.result_values[0]:
|
||||
add_col(
|
||||
column.column_name.replace(" ","_").replace("/","_"),
|
||||
column.ordinal_position,
|
||||
column.data_type,
|
||||
column.character_maximum_length,
|
||||
column.numeric_precision,
|
||||
column.numeric_scale,
|
||||
column.allow_null
|
||||
)
|
||||
|
||||
xml_bytes = ET.tostring(root, encoding="utf-8", xml_declaration=False)
|
||||
return xml_bytes.decode("utf-8")
|
||||
Reference in New Issue
Block a user