Source code for ffiec_data_connect.data_normalizer

"""
FFIEC Data Normalizer - Phase 0 Critical Implementation

This module ensures 100% backward compatibility by normalizing REST API responses
to match SOAP format exactly. This prevents user-facing regressions when migrating
from SOAP to REST protocols.

CRITICAL DATA ISSUES ADDRESSED:
- RSSD IDs: REST integers (480228) → SOAP strings ("480228")
- ZIP Codes: REST loses leading zeros (2886) → SOAP format ("02886")
- Certificate Numbers: REST integers → SOAP strings
- Boolean Values: REST booleans → SOAP string format
- Financial Data: Preserve decimal precision

Author: FFIEC Data Connect Library
Version: Phase 0 - Data Normalization
"""

import logging
import re
from datetime import datetime
from typing import Any, Dict, List, Tuple, Union

logger = logging.getLogger(__name__)


class DataNormalizer:
    """
    Ensures 100% backward compatibility by normalizing REST responses to SOAP format.

    This class prevents data regressions by applying type coercions and format
    transformations to REST API responses, making them identical to SOAP
    responses. All methods are static; the class is used as a namespace and
    is never instantiated by the code in this module.
    """

    # Normalization rules discovered from format analysis.
    # These ensure REST responses match SOAP format exactly.
    #
    # Outer key: endpoint name. Inner key: either a field name mapped to a
    # one-argument coercion callable, or a meta-key starting with "_":
    #   "_array_items"     - callable applied to every item of a list response
    #   "_preserve_binary" - when truthy, the response is returned untouched
    TYPE_COERCIONS: Dict[str, Dict[str, Any]] = {
        "RetrievePanelOfReporters": {
            # Critical fields that change type between protocols
            "ID_RSSD": lambda x: (
                str(x) if x is not None else ""
            ),  # int → str (CRITICAL); None becomes "" rather than "None"
            "FDICCertNumber": lambda x: str(x) if x is not None else "",  # int → str
            "OCCChartNumber": lambda x: str(x) if x is not None else "",  # int → str
            "OTSDockNumber": lambda x: str(x) if x is not None else "",  # int → str
            "PrimaryABARoutNumber": lambda x: (
                str(x) if x is not None else ""
            ),  # int → str
            "ZIP": lambda x: DataNormalizer._fix_zip_code(
                x
            ),  # Fix leading zeros (CRITICAL)
            "HasFiledForReportingPeriod": lambda x: (
                str(x).lower() if x is not None else "false"
            ),  # bool → str
            # Additional fields that may need normalization
            "InstitutionName": lambda x: str(x) if x is not None else "",
            "PhysicalStreetAddress": lambda x: str(x) if x is not None else "",
            "PhysicalCity": lambda x: str(x) if x is not None else "",
            "PhysicalState": lambda x: str(x) if x is not None else "",
            "MailingStreetAddress": lambda x: str(x) if x is not None else "",
            "MailingCity": lambda x: str(x) if x is not None else "",
            "MailingState": lambda x: str(x) if x is not None else "",
            "MailingZIP": lambda x: DataNormalizer._fix_zip_code(x),
        },
        "RetrieveFilersSinceDate": {
            # Array of integers → array of strings (CRITICAL)
            "_array_items": lambda x: str(x) if x is not None else ""
        },
        "RetrieveFilersSubmissionDateTime": {
            "ID_RSSD": lambda x: (
                str(x) if x is not None else ""
            ),  # int → str (CRITICAL)
            # DateTime format consistency with the SOAP API
            "SubmissionDateTime": lambda x: DataNormalizer._normalize_datetime(x),
        },
        "RetrieveReportingPeriods": {
            # Likely already strings, but ensure consistency
            "_array_items": lambda x: DataNormalizer._normalize_date_string(x)
        },
        "RetrieveFacsimile": {
            # Binary data should remain unchanged
            "_preserve_binary": True
        },
        "RetrieveUBPRReportingPeriods": {
            # Similar to RetrieveReportingPeriods
            "_array_items": lambda x: DataNormalizer._normalize_date_string(x)
        },
        "RetrieveUBPRXBRLFacsimile": {
            # Binary XBRL data
            "_preserve_binary": True
        },
    }

    # Field-specific validation rules: regex the normalized string must match.
    VALIDATION_RULES: Dict[str, Dict[str, str]] = {
        "ZIP": {
            "pattern": r"^\d{5}$",
            "description": "5-digit ZIP code with leading zeros preserved",
        },
        "ID_RSSD": {"pattern": r"^\d+$", "description": "RSSD ID as string of digits"},
        "FDICCertNumber": {
            "pattern": r"^\d*$",
            "description": "FDIC certificate number as string",
        },
    }

    @staticmethod
    def _fix_zip_code(zip_value: Union[str, int, None]) -> str:
        """
        Fix ZIP code precision loss from REST API.

        CRITICAL: REST API loses leading zeros (02886 → 2886).
        This restores proper 5-digit format with leading zeros.

        Args:
            zip_value: ZIP code from REST API (int or str)

        Returns:
            str: ``""`` for ``None``/empty input; a non-negative int or an
            all-digit string shorter than 5 characters left-padded with
            zeros to 5 digits; any other value converted with ``str()``
            (strings are stripped of surrounding whitespace).
        """
        if zip_value is None or zip_value == "":
            return ""

        # bool is a subclass of int; padding "True"/"False" would be nonsense,
        # so booleans fall through to the generic str() branch below.
        if isinstance(zip_value, int) and not isinstance(zip_value, bool):
            if zip_value < 0:
                # Not a valid ZIP; leave it unpadded so validation flags it.
                return str(zip_value)
            return str(zip_value).zfill(5)

        if isinstance(zip_value, str):
            # Already a string, but restore any missing leading zeros
            zip_str = zip_value.strip()
            if zip_str.isdigit() and len(zip_str) < 5:
                return zip_str.zfill(5)
            return zip_str

        return str(zip_value)

    @staticmethod
    def _normalize_datetime(dt_value: Union[str, datetime, None]) -> str:
        """
        Normalize datetime format to match SOAP API format.

        SOAP Format: "12/31/2023 11:59:59 PM" (month, day and hour are not
        zero-padded; 12-hour clock).

        Args:
            dt_value: DateTime value from REST API

        Returns:
            str: ``""`` for ``None``; a stripped copy for strings; the SOAP
            representation for ``datetime`` objects; ``str(dt_value)`` for
            anything else.
        """
        if dt_value is None:
            return ""

        if isinstance(dt_value, str):
            # Already a string; only surrounding whitespace is removed
            return dt_value.strip()

        if isinstance(dt_value, datetime):
            # Built by hand instead of strftime("%-m/%-d/...") because the
            # "%-" no-padding flags are a platform extension (unsupported on
            # Windows) and "%p" depends on the active locale.
            hour_12 = dt_value.hour % 12 or 12
            meridiem = "AM" if dt_value.hour < 12 else "PM"
            return (
                f"{dt_value.month}/{dt_value.day}/{dt_value.year} "
                f"{hour_12}:{dt_value.minute:02d}:{dt_value.second:02d} {meridiem}"
            )

        return str(dt_value)

    @staticmethod
    def _normalize_date_string(date_value: Union[str, None]) -> str:
        """
        Normalize date string format for reporting periods.

        The expected shape is M/d/yyyy (one or two digits for month and day).
        Values in any other shape are logged as a warning and returned
        stripped but otherwise unchanged.

        Args:
            date_value: Date string from REST API

        Returns:
            str: ``""`` for ``None``; the stripped string for strings;
            ``str(date_value)`` for anything else.
        """
        if date_value is None:
            return ""

        if not isinstance(date_value, str):
            return str(date_value)

        date_str = date_value.strip()
        if not re.match(r"^\d{1,2}/\d{1,2}/\d{4}$", date_str):
            logger.warning(f"Unexpected date format: {date_str}")
        return date_str

    @staticmethod
    def normalize_for_validation(
        data: Any, endpoint: str, protocol: str = "REST"
    ) -> Tuple[Any, Dict[str, Any]]:
        """
        Normalize response data and return both normalized data and statistics.

        This method is designed to work with Pydantic validation by providing
        both the normalized data and metadata about transformations applied.

        Args:
            data: Raw response data from API
            endpoint: API endpoint name
            protocol: Source protocol ("REST" or "SOAP")

        Returns:
            Tuple of (normalized_data, normalization_stats). For any protocol
            other than "REST" the data is returned untouched together with
            ``{"transformations": 0, "protocol": protocol}``.
        """
        if protocol != "REST":
            return data, {"transformations": 0, "protocol": protocol}

        normalized = DataNormalizer.normalize_response(data, endpoint, protocol)
        stats = DataNormalizer.get_normalization_stats(data, normalized, endpoint)
        return normalized, stats

    @staticmethod
    def validate_pydantic_compatibility(data: Any, endpoint: str) -> Dict[str, Any]:
        """
        Check if normalized data is compatible with expected Pydantic models.

        Only the first three items of a list response are inspected.

        Args:
            data: Normalized data
            endpoint: API endpoint name

        Returns:
            Dictionary with keys ``endpoint``, ``compatible`` (bool),
            ``warnings`` (list of str) and ``recommendations`` (list of str).
            If the check itself raises, ``compatible`` is set to ``False``
            and the message is stored under ``error``.
        """
        validation_report: Dict[str, Any] = {
            "endpoint": endpoint,
            "compatible": True,
            "warnings": [],
            "recommendations": [],
        }
        warnings: List[str] = validation_report["warnings"]

        try:
            if endpoint == "RetrievePanelOfReporters":
                if isinstance(data, list):
                    for i, item in enumerate(data[:3]):  # Check first 3 items
                        if not isinstance(item, dict):
                            continue

                        # Check required fields
                        if "ID_RSSD" not in item:
                            warnings.append(f"Item {i} missing ID_RSSD")
                            validation_report["compatible"] = False
                        elif not isinstance(item["ID_RSSD"], str):
                            warnings.append(
                                f"Item {i} ID_RSSD not string: {type(item['ID_RSSD'])}"
                            )
                            validation_report["compatible"] = False

                        # NOTE(review): this requires a "Name" key while
                        # TYPE_COERCIONS lists "InstitutionName" - confirm
                        # which field name the API actually returns.
                        if "Name" not in item:
                            warnings.append(f"Item {i} missing Name")
                            validation_report["compatible"] = False

                        # A 4-digit ZIP string has lost its leading zero;
                        # this is reported but does not affect "compatible".
                        if "ZIP" in item and isinstance(item["ZIP"], str):
                            if len(item["ZIP"]) == 4 and item["ZIP"].isdigit():
                                warnings.append(
                                    f"Item {i} ZIP missing leading zero: {item['ZIP']}"
                                )
                                validation_report["recommendations"].append(
                                    "Apply DataNormalizer._fix_zip_code()"
                                )

            elif endpoint in ["RetrieveFilersSinceDate", "RSSDIDsResponse"]:
                if isinstance(data, list):
                    for i, item in enumerate(data[:3]):
                        if not isinstance(item, str):
                            warnings.append(f"RSSD ID {i} not string: {type(item)}")
                            validation_report["compatible"] = False

        except Exception as e:
            validation_report["error"] = str(e)
            validation_report["compatible"] = False

        return validation_report

    @staticmethod
    def normalize_response(data: Any, endpoint: str, protocol: str = "REST") -> Any:
        """
        Normalize REST response to match SOAP format exactly.

        This is the main entry point for data normalization.

        Args:
            data: Raw response data from API
            endpoint: API endpoint name (e.g., "RetrievePanelOfReporters")
            protocol: Source protocol ("REST" or "SOAP")

        Returns:
            The normalized data. The input is returned unchanged when the
            protocol is not "REST", when the data is empty/falsy, when no
            rules exist for ``endpoint``, or when normalization raises
            (the failure is logged, never propagated).
        """
        if protocol != "REST":
            # SOAP data already in expected format
            logger.debug(f"Skipping normalization for {protocol} protocol")
            return data

        if not data:
            # Handle empty/null responses
            return data

        if endpoint not in DataNormalizer.TYPE_COERCIONS:
            logger.warning(
                f"No normalization rules defined for endpoint '{endpoint}'. "
                f"Data may not be normalized. Available endpoints: "
                f"{list(DataNormalizer.TYPE_COERCIONS.keys())}"
            )
            return data

        logger.debug(f"Normalizing {endpoint} response from REST to SOAP format")

        try:
            coercions = DataNormalizer.TYPE_COERCIONS[endpoint]
            normalized_data = DataNormalizer._apply_normalizations(
                data, coercions, endpoint
            )

            # Validate critical fields after normalization (problems are logged)
            DataNormalizer._validate_normalized_data(normalized_data, endpoint)

            logger.debug(f"Successfully normalized {endpoint} response")
            return normalized_data

        except Exception as e:
            logger.error(f"Failed to normalize {endpoint} response: {e}")
            # Return original data if normalization fails.
            # This ensures the system continues working even if normalization has bugs.
            return data

    @staticmethod
    def _apply_normalizations(
        data: Any, coercions: Dict[str, Any], endpoint: str
    ) -> Any:
        """
        Apply normalization rules to a response of any supported shape.

        Args:
            data: Response payload (list, dict or scalar)
            coercions: Rule set for the endpoint (an entry of TYPE_COERCIONS)
            endpoint: Endpoint name, used for log context

        Returns:
            A new list/dict with coercions applied, or ``data`` itself when
            the rule set requests binary preservation or no rule applies.
        """
        # Handle binary data preservation
        if coercions.get("_preserve_binary"):
            logger.debug(f"Preserving binary data for {endpoint}")
            return data

        if isinstance(data, list):
            if "_array_items" in coercions:
                # Array of scalars: one coercion applied to every item
                coercion_func = coercions["_array_items"]
                logger.debug(f"Applying array item coercion for {endpoint}")
                return [coercion_func(item) for item in data]

            # Array of objects
            return [
                DataNormalizer._normalize_object(item, coercions, f"{endpoint}[{i}]")
                for i, item in enumerate(data)
            ]

        if isinstance(data, dict):
            return DataNormalizer._normalize_object(data, coercions, endpoint)

        # Simple value - apply direct coercion if available
        if "_simple_value" in coercions:
            return coercions["_simple_value"](data)
        return data

    @staticmethod
    def _normalize_object(
        obj: Dict[str, Any], coercions: Dict[str, Any], context: str
    ) -> Dict[str, Any]:
        """
        Apply type coercions to a dictionary object.

        Args:
            obj: Object to normalize; non-dict values are returned unchanged
            coercions: Field name → coercion callable (meta-keys starting
                with "_" are ignored)
            context: Label used in log messages

        Returns:
            A shallow copy of ``obj`` with coerced values. A field whose
            coercion raises keeps its original value and the error is logged.
        """
        if not isinstance(obj, dict):
            return obj

        normalized = obj.copy()
        normalization_count = 0

        for field, coercion_func in coercions.items():
            if field.startswith("_"):
                continue  # Skip meta-fields like _array_items
            if field not in normalized:
                continue

            try:
                original_value = normalized[field]
                normalized_value = coercion_func(original_value)

                # Only update if value actually changed
                if normalized_value != original_value:
                    normalized[field] = normalized_value
                    normalization_count += 1
                    logger.debug(
                        f"Normalized {context}.{field}: "
                        f"{type(original_value).__name__}({original_value}) → "
                        f"{type(normalized_value).__name__}({normalized_value})"
                    )
            except Exception as e:
                logger.error(f"Failed to normalize {context}.{field}: {e}")
                # Keep original value if normalization fails
                continue

        if normalization_count > 0:
            logger.debug(f"Applied {normalization_count} normalizations to {context}")

        return normalized

    @staticmethod
    def _validate_normalized_data(data: Any, endpoint: str) -> bool:
        """
        Validate normalized data meets SOAP format requirements.

        Only the first three items of a list are checked. Problems are
        logged (at most the first five), never raised.

        Args:
            data: Normalized data
            endpoint: Endpoint name, used for log context

        Returns:
            bool: ``True`` when the data is empty or no problem was found;
            ``False`` when at least one rule was violated or the validation
            itself raised.
        """
        if not data:
            return True

        validation_errors: List[str] = []

        try:
            if isinstance(data, list):
                for i, item in enumerate(data[:3]):  # Validate first few items
                    if isinstance(item, dict):
                        DataNormalizer._validate_object(
                            item, f"{endpoint}[{i}]", validation_errors
                        )
            elif isinstance(data, dict):
                DataNormalizer._validate_object(data, endpoint, validation_errors)

            if validation_errors:
                error_summary = "; ".join(validation_errors[:5])  # First 5 errors
                logger.warning(f"Validation warnings for {endpoint}: {error_summary}")
                return False

        except Exception as e:
            logger.error(f"Validation failed for {endpoint}: {e}")
            return False

        return True

    @staticmethod
    def _validate_object(obj: Dict[str, Any], context: str, errors: List[str]) -> None:
        """
        Validate an individual object against VALIDATION_RULES.

        Args:
            obj: Object whose fields are checked
            context: Label prefixed to every error message
            errors: List that error messages are appended to (mutated in place)
        """
        for field, value in obj.items():
            rule = DataNormalizer.VALIDATION_RULES.get(field)
            if rule is None:
                continue

            if not isinstance(value, str):
                # ZIP and RSSD IDs must be strings after normalization. This
                # check has to run before the regex test: both fields have a
                # rule, so a separate elif branch for them is never reached.
                if field in ("ZIP", "ID_RSSD"):
                    errors.append(
                        f"{context}.{field} must be string, got {type(value)}"
                    )
                continue

            pattern = rule.get("pattern")
            if pattern and not re.match(pattern, value):
                errors.append(
                    f"{context}.{field} format invalid: '{value}' "
                    f"(expected: {rule['description']})"
                )

    @staticmethod
    def get_normalization_stats(
        data_before: Any, data_after: Any, endpoint: str
    ) -> Dict[str, Any]:
        """
        Generate statistics about normalization transformations applied.

        Useful for monitoring and debugging normalization effectiveness.

        Args:
            data_before: Data as received from the API
            data_after: Data after normalization
            endpoint: API endpoint name

        Returns:
            Dictionary with ``endpoint``, ``timestamp``,
            ``transformations_applied``, ``fields_normalized``,
            ``data_size`` (before/after), ``type_changes`` and
            ``validation_passed``. ``validation_passed`` is ``True`` only
            when validation of ``data_after`` found no problem. An ``error``
            key is added when generating the statistics raises.
        """
        stats: Dict[str, Any] = {
            "endpoint": endpoint,
            "timestamp": datetime.now().isoformat(),
            "transformations_applied": 0,
            "fields_normalized": [],
            "data_size": {
                "before": DataNormalizer._estimate_data_size(data_before),
                "after": DataNormalizer._estimate_data_size(data_after),
            },
            "type_changes": {},
            "validation_passed": False,
        }

        try:
            # Compare before/after to count transformations
            if isinstance(data_before, list) and isinstance(data_after, list):
                for i, (before_item, after_item) in enumerate(
                    zip(data_before, data_after)
                ):
                    both_dicts = isinstance(before_item, dict) and isinstance(
                        after_item, dict
                    )
                    if both_dicts:
                        DataNormalizer._count_object_changes(
                            before_item, after_item, f"[{i}]", stats
                        )
                    elif not isinstance(
                        before_item, (dict, list)
                    ) and not isinstance(after_item, (dict, list)):
                        # Scalar arrays (e.g. RSSD ID lists coerced int → str)
                        DataNormalizer._record_value_change(
                            before_item, after_item, f"[{i}]", stats
                        )
            elif isinstance(data_before, dict) and isinstance(data_after, dict):
                DataNormalizer._count_object_changes(
                    data_before, data_after, "root", stats
                )

            # Validate final result
            stats["validation_passed"] = DataNormalizer._validate_normalized_data(
                data_after, endpoint
            )

        except Exception as e:
            logger.error(f"Failed to generate normalization stats: {e}")
            stats["error"] = str(e)

        return stats

    @staticmethod
    def _estimate_data_size(data: Any) -> int:
        """
        Estimate data size for statistics.

        Returns:
            int: Length of ``str(data)``; 0 for falsy values that are
            neither a list nor a dict.
        """
        if isinstance(data, (list, dict)):
            return len(str(data))
        return len(str(data)) if data else 0

    @staticmethod
    def _record_value_change(
        before_val: Any, after_val: Any, label: str, stats: Dict[str, Any]
    ) -> None:
        """Record one before/after difference (type or value) in ``stats``."""
        if type(before_val) is not type(after_val):
            stats["transformations_applied"] += 1
            stats["fields_normalized"].append(label)
            type_key = f"{type(before_val).__name__}_to_{type(after_val).__name__}"
            stats["type_changes"][type_key] = (
                stats["type_changes"].get(type_key, 0) + 1
            )
        elif before_val != after_val:
            stats["transformations_applied"] += 1
            stats["fields_normalized"].append(label)

    @staticmethod
    def _count_object_changes(
        before: Dict[str, Any], after: Dict[str, Any], path: str, stats: Dict[str, Any]
    ) -> Dict[str, Any]:
        """
        Count changes between before/after objects.

        Args:
            before: Object prior to normalization
            after: Object after normalization
            path: Prefix for entries added to ``fields_normalized``
            stats: Statistics dictionary, updated in place

        Returns:
            The same ``stats`` dictionary that was passed in.
        """
        for key, before_val in before.items():
            if key in after:
                DataNormalizer._record_value_change(
                    before_val, after[key], f"{path}.{key}", stats
                )
        return stats