347 lines
15 KiB
Python
347 lines
15 KiB
Python
import logging
|
|
from typing import Optional, Annotated, Union, Type, Any
|
|
from datetime import date, datetime
|
|
import xml.etree.ElementTree as ET
|
|
import math
|
|
|
|
|
|
from pydantic import BaseModel, ConfigDict, Field, WithJsonSchema, field_serializer
|
|
import pandas as pd
|
|
import numpy as np
|
|
|
|
from app.core.config import config
|
|
from app.core.refdata import ReconResultStates
|
|
from app.core.exceptions import ReconExecption
|
|
from app.engine.loaders import url_to_schema
|
|
from app.core.file_util import url_to_filepath
|
|
|
|
DataFrame = Annotated[
|
|
pd.DataFrame,
|
|
WithJsonSchema({'type': 'object', 'additionalProperties': 'str'}),
|
|
]
|
|
|
|
|
|
NaTType: Type[Any] = type(pd.NaT)
|
|
|
|
sanitized_model_config: dict[str, Any] = {
|
|
"arbitrary_types_allowed": True,
|
|
"json_encoders": {
|
|
NaTType: lambda v: None,
|
|
np.integer: int,
|
|
np.floating: lambda v: None if (math.isnan(v) or math.isinf(v)) else float(v),
|
|
np.bool_: bool,
|
|
np.ndarray: lambda v: v.tolist(),
|
|
pd.Timestamp: lambda v: v.isoformat(),
|
|
pd.Timedelta: lambda v: v.total_seconds(),
|
|
pd.Period: str,
|
|
float: lambda v: None if (math.isnan(v) or math.isinf(v)) else v,
|
|
}
|
|
}
|
|
|
|
|
|
def sanitize_for_json(obj: Any) -> Any:
|
|
"""Recursively convert Pandas/NumPy types into JSON-safe Python types."""
|
|
if obj is None:
|
|
return None
|
|
if obj is pd.NaT or isinstance(obj, NaTType):
|
|
return None
|
|
if isinstance(obj, (np.integer,)):
|
|
return int(obj)
|
|
if isinstance(obj, (np.floating,)):
|
|
return None if (math.isnan(obj) or math.isinf(obj)) else float(obj)
|
|
if isinstance(obj, (np.bool_,)):
|
|
return bool(obj)
|
|
if isinstance(obj, (np.ndarray,)):
|
|
return [sanitize_for_json(x) for x in obj.tolist()]
|
|
if isinstance(obj, pd.Timestamp):
|
|
return obj.isoformat()
|
|
if isinstance(obj, pd.Timedelta):
|
|
return obj.total_seconds()
|
|
if isinstance(obj, pd.Period):
|
|
return str(obj)
|
|
if isinstance(obj, (list, tuple, set)):
|
|
return [sanitize_for_json(x) for x in obj]
|
|
if isinstance(obj, dict):
|
|
return {k: sanitize_for_json(v) for k, v in obj.items()}
|
|
return obj
|
|
|
|
|
|
class Profile(BaseModel):
|
|
model_config = ConfigDict(**sanitized_model_config)
|
|
|
|
url: str
|
|
name: str
|
|
row_count: int
|
|
total_nulls: int
|
|
threshold_test_results: dict[str,str] = {}
|
|
query: Optional[str] = None
|
|
table_file: Optional[str] = None
|
|
|
|
|
|
from typing import Optional
|
|
from pydantic import BaseModel, Field
|
|
|
|
|
|
class ReconResultValue(BaseModel):
|
|
column_name: str
|
|
column_value_raw: Any = Field(alias="_column_value")
|
|
ordinal_position: int
|
|
data_type: str
|
|
character_maximum_length: int
|
|
numeric_precision: int
|
|
numeric_scale: int
|
|
allow_null: int
|
|
|
|
def __init__(
|
|
self,
|
|
column_name: str,
|
|
column_value_raw: Any,
|
|
ordinal_position: int,
|
|
data_type: str,
|
|
character_maximum_length: int,
|
|
numeric_precision: int,
|
|
numeric_scale: int,
|
|
allow_null: int
|
|
):
|
|
# Pass values to BaseModel using keyword args for validation
|
|
super().__init__(
|
|
column_name=column_name,
|
|
_column_value=column_value_raw,
|
|
ordinal_position=ordinal_position,
|
|
data_type=data_type,
|
|
character_maximum_length=character_maximum_length,
|
|
numeric_precision=numeric_precision,
|
|
numeric_scale=numeric_scale,
|
|
allow_null=allow_null
|
|
)
|
|
|
|
@property
|
|
def column_value(self) -> str:
|
|
"""Return processed column value."""
|
|
val = self.column_value_raw
|
|
|
|
if val is None:
|
|
if not self.allow_null:
|
|
logging.warning(f"Result published with empty value for non-nullable field: {self.column_name}")
|
|
return "Undefined"
|
|
return ""
|
|
|
|
s = str(val)
|
|
s = s.replace('\t', ' ').replace('\n', ' ').replace('\r', ' ')
|
|
while ' ' in s:
|
|
s = s.replace(' ', ' ')
|
|
s = s.strip()
|
|
|
|
if self.character_maximum_length > 0 and len(s) > self.character_maximum_length:
|
|
return s[:self.character_maximum_length]
|
|
|
|
return s
|
|
|
|
|
|
|
|
class ReconResult(BaseModel):
|
|
model_config = ConfigDict(**sanitized_model_config)
|
|
|
|
recon_job_id: int
|
|
recon_job_name: str
|
|
recon_config_reference: str
|
|
recon_config_name: str
|
|
business_process: str
|
|
as_at_date: date
|
|
data_type: str
|
|
frequency: str
|
|
start_timestamp: datetime = Field(default_factory=datetime.now)
|
|
end_timestamp: Optional[datetime] = None
|
|
recon_state: Optional[ReconResultStates] = None
|
|
source_profiles: list[Profile] = []
|
|
dest_profile: Optional[Profile] = None
|
|
|
|
problems: dict[str,dict|str] = {}
|
|
@field_serializer("problems")
|
|
def serialize_problems(self, problems: dict, _info):
|
|
"""Automatically sanitize problems before JSON serialization."""
|
|
return sanitize_for_json(problems)
|
|
|
|
environment: str = ""
|
|
|
|
pattern: str
|
|
pattern_result: Optional[bool] = None
|
|
pattern_failed_row_count: int = 0
|
|
pattern_rows_reconciled: int = 0
|
|
|
|
username: Optional[str] = None
|
|
due_timestamp: Optional[datetime] = None
|
|
|
|
_result_values: list[list[ReconResultValue]] = []
|
|
@property
|
|
def result_values(self):
|
|
if self._result_values == []:
|
|
self.create_recon_result_values()
|
|
return self._result_values
|
|
|
|
|
|
_pattern_result_description: Optional[str] = None
|
|
@property
|
|
def pattern_result_description(self) -> Optional[str]:
|
|
"""
|
|
Escaped special characters like newlines, tabs, etc.
|
|
"""
|
|
if self._pattern_result_description is None:
|
|
return None
|
|
return self._pattern_result_description.encode('unicode_escape').decode('utf-8')
|
|
|
|
@pattern_result_description.setter
|
|
def pattern_result_description(self, value: Optional[str]) -> None:
|
|
if value is not None and not isinstance(value, str):
|
|
raise TypeError("pattern_result_description must be a string or None")
|
|
self._pattern_result_description = value
|
|
|
|
|
|
def create_recon_result_values(self) -> None:
|
|
if self.dest_profile is None:
|
|
raise ReconExecption("No dest profile created")
|
|
if self.recon_state is None:
|
|
raise ReconExecption("Unexpeded null recon_state")
|
|
|
|
target_schema = url_to_schema(self.dest_profile.url)
|
|
|
|
for source in self.source_profiles:
|
|
row_values:list[ReconResultValue] = []
|
|
source_schema = url_to_schema(source.url)
|
|
|
|
row_values.append( ReconResultValue("Environment", f"{self.environment}", 1, "varchar", 100, 0, 0, 0))
|
|
row_values.append( ReconResultValue("Rec Number", f"{self.recon_job_id}", 2, "int", 0, 10, 0, 0))
|
|
row_values.append( ReconResultValue("Business Ref", f"{self.recon_config_name}", 3, "varchar", 200, 0, 0, 0))
|
|
row_values.append( ReconResultValue("Business Data Type", f"{self.data_type}", 4, "varchar", 200, 0, 0, 0))
|
|
row_values.append( ReconResultValue("Business Process", f"{self.business_process}", 5, "varchar", 200, 0, 0, 0))
|
|
row_values.append( ReconResultValue("Config Reference", f"{self.recon_config_reference}", 6, "varchar", 200, 0, 0, 0))
|
|
row_values.append( ReconResultValue("Recon Date", f"{self.as_at_date.isoformat()}", 7, "datetime", 0, 0, 0, 0))
|
|
row_values.append( ReconResultValue("Run Date", f"{self.start_timestamp.isoformat()}", 8, "datetime", 0, 0, 0, 0))
|
|
row_values.append( ReconResultValue("Frequency", f"{self.frequency}", 9, "varchar", 30, 0, 0, 0))
|
|
row_values.append( ReconResultValue("Overall Status", f"{self.recon_state.value}", 10, "varchar", 150, 0, 0, 0))
|
|
row_values.append( ReconResultValue("Data Reconciled", f"{self.pattern_result}".lower(), 11, "boolean", 0, 0, 0, 0))
|
|
row_values.append( ReconResultValue("Data Recon Reason", f"{self.pattern_result_description}", 12, "varchar", 1024, 0, 0, 0))
|
|
row_values.append( ReconResultValue("Records Failed Count", f"{self.pattern_failed_row_count}", 13, "int", 0, 10, 0, 0))
|
|
row_values.append( ReconResultValue("Source Profile Results", f"{source.threshold_test_results}", 14, "varchar", 512, 0, 0, 0))
|
|
row_values.append( ReconResultValue("Source System/DB", f"{source.name}", 15, "varchar", 512, 0, 0, 0))
|
|
row_values.append( ReconResultValue("Source Schema", f"{source_schema}", 16, "varchar", 512, 0, 0, 1))
|
|
row_values.append( ReconResultValue("Source Table/File", f"{source.table_file}", 17, "varchar", 512, 0, 0, 0))
|
|
row_values.append( ReconResultValue("Source URL", f"{source.url}", 18, "varchar", 512, 0, 0, 0))
|
|
row_values.append( ReconResultValue("Source Records", f"{source.row_count}", 19, "int", 0, 0, 0, 0))
|
|
row_values.append( ReconResultValue("Target Profile Results", f"{self.dest_profile.threshold_test_results}", 20, "varchar", 512, 0, 0, 0))
|
|
row_values.append( ReconResultValue("Target System/DB", f"{self.dest_profile.name}", 21, "varchar", 512, 0, 0, 0))
|
|
row_values.append( ReconResultValue("Target Schema", f"{target_schema}", 22, "varchar", 512, 0, 0, 1))
|
|
row_values.append( ReconResultValue("Target Table/File", f"{self.dest_profile.table_file}", 23, "varchar", 512, 0, 0, 0))
|
|
row_values.append( ReconResultValue("Target URL", f"{self.dest_profile.url}", 24, "varchar", 512, 0, 0, 0))
|
|
row_values.append( ReconResultValue("Target Records", f"{self.dest_profile.row_count}", 25, "int", 0, 10, 0, 0))
|
|
row_values.append( ReconResultValue("Rows Reconciled", f"{self.pattern_rows_reconciled}", 26, "int", 0, 10, 0, 0))
|
|
row_values.append( ReconResultValue("Start Timestamp", f"{self.start_timestamp.isoformat()}", 27, "datetime", 0, 0, 0, 1))
|
|
row_values.append( ReconResultValue("End Timestamp", self.end_timestamp.isoformat() if self.end_timestamp else "", 28, "datetime", 0, 0, 0, 1))
|
|
|
|
row_values.append( ReconResultValue("Username", self.username if self.username else "", 29, "varchar", 256, 0, 0, 1))
|
|
row_values.append( ReconResultValue("Due Timestamp", self.due_timestamp.isoformat() if self.due_timestamp else "", 30, "datetime", 0, 0, 0, 1))
|
|
row_values.append( ReconResultValue("Pattern", self.pattern if self.pattern else "", 31, "varchar", 128, 0, 0, 1))
|
|
row_values.append( ReconResultValue("Source Filter", source.query if source.query else "", 32, "varchar", 1024, 0, 0, 1))
|
|
row_values.append( ReconResultValue("Target Filter", self.dest_profile.query if self.dest_profile.query else "", 32, "varchar", 1024, 0, 0, 1))
|
|
|
|
self._result_values.append(row_values)
|
|
|
|
|
|
def to_csv(self, result_url):
|
|
if result_url.startswith("file:") != True:
|
|
raise ReconExecption(f"Only file system urls are allowed for the result path so far. Config has {result_url}")
|
|
|
|
# Create the csv dictionaries to be passed to writer
|
|
csv_results:list[dict] = []
|
|
for row in self.result_values:
|
|
csv_row = {}
|
|
for column in row:
|
|
csv_row[column.column_name] = column.column_value
|
|
csv_results.append(csv_row)
|
|
|
|
# generate filenames
|
|
output_file_name = f"reconciliation_results_{datetime.now().strftime('%Y%m%d_%H%M%S')}.dat"
|
|
result_filename = url_to_filepath(result_url + output_file_name)
|
|
control_file_name = f"{result_filename}.xml"
|
|
|
|
# Write csv data file
|
|
df = pd.DataFrame(csv_results)
|
|
delimiter = "|~"
|
|
|
|
with open(result_filename, "w", encoding="utf-8") as f:
|
|
# Write header
|
|
f.write(delimiter.join(df.columns) + "\n")
|
|
# Write rows
|
|
for row in df.itertuples(index=False):
|
|
f.write(delimiter.join(map(str, row)) + "\n")
|
|
logging.info(f"Wrote results file {result_filename} with {len(df)} rows")
|
|
|
|
# Create the csv control file data
|
|
control_file_content = self.output_control_file(
|
|
file_name=output_file_name,
|
|
number_of_records=len(csv_results),
|
|
legacy_feed_date=self.start_timestamp.date(),
|
|
delimiter=delimiter
|
|
)
|
|
# Write csv control file
|
|
with open(control_file_name, mode='w', newline='') as file:
|
|
file.write(control_file_content)
|
|
logging.info(f"Wrote results control file {control_file_name}")
|
|
|
|
|
|
def output_control_file(self, file_name: str, number_of_records: int, legacy_feed_date: Union[str, date], delimiter) -> str:
|
|
|
|
if isinstance(legacy_feed_date, date):
|
|
legacy_feed_date = legacy_feed_date.isoformat()
|
|
else:
|
|
legacy_feed_date = str(legacy_feed_date)
|
|
|
|
root = ET.Element("Feed")
|
|
|
|
def add(tag: str, text: str) -> None:
|
|
el = ET.SubElement(root, tag)
|
|
el.text = text
|
|
|
|
# Header (hardcoded to match example)
|
|
add("File", file_name)
|
|
add("ExtractProg", "RECONRANGER")
|
|
add("GenerationDate", f"{datetime.now().strftime('%b %d %Y %I:%M%p').upper()}")
|
|
add("NumberOfRecords", str(number_of_records))
|
|
add("LegacyFeedDate", legacy_feed_date)
|
|
add("FieldDelimiter", delimiter)
|
|
add("ContactEmailAddress", config.contact_email)
|
|
|
|
data_schema = ET.SubElement(root, "DataSchema")
|
|
cols = ET.SubElement(data_schema, "COLS")
|
|
|
|
def add_col(
|
|
column_name: str,
|
|
ordinal_position: int,
|
|
data_type: str,
|
|
character_maximum_length: int,
|
|
numeric_precision: int,
|
|
numeric_scale: int,
|
|
allow_null: int,
|
|
) -> None:
|
|
col = ET.SubElement(cols, "COL")
|
|
ET.SubElement(col, "COLUMN_NAME").text = column_name
|
|
ET.SubElement(col, "ORDINAL_POSITION").text = str(ordinal_position)
|
|
ET.SubElement(col, "DATA_TYPE").text = data_type
|
|
ET.SubElement(col, "CHARACTER_MAXIMUM_LENGTH").text = str(character_maximum_length)
|
|
ET.SubElement(col, "NUMERIC_PRECISION").text = str(numeric_precision)
|
|
ET.SubElement(col, "NUMERIC_SCALE").text = str(numeric_scale)
|
|
ET.SubElement(col, "ALLOW_NULL").text = str(allow_null)
|
|
|
|
for column in self.result_values[0]:
|
|
add_col(
|
|
column.column_name.replace(" ","_").replace("/","_"),
|
|
column.ordinal_position,
|
|
column.data_type,
|
|
column.character_maximum_length,
|
|
column.numeric_precision,
|
|
column.numeric_scale,
|
|
column.allow_null
|
|
)
|
|
|
|
xml_bytes = ET.tostring(root, encoding="utf-8", xml_declaration=False)
|
|
return xml_bytes.decode("utf-8") |