A high-performance, configurable data migration tool that processes XML content using a "contract-driven" approach: mapping rules define the ETL into normalized relational structures.
- OS: Windows
- SHELL: powershell
- DB: MS SQL Server
- LANGUAGE: Python
- CONNECTIVITY: pyodbc
- TESTING: pytest
- XML: lxml
# Development installation (editable)
pip install -e .
# Production installation
pip install .
# With optional dependencies
pip install -e ".[dev]" # Include development tools (pytest, black, etc.)
pip install -e ".[optional]" # Include optional features
pip install -r requirements.txt
Three main tools available:
- Configuration Display: `xml-extractor` command for system status
# Display system information and configuration
xml-extractor
# Shows: database settings, processing config, environment variables
- Direct Processing: `production_processor.py` - Flexible single-invocation processor
# Gap filling / cleanup (limit mode)
python production_processor.py --server "server" --database "db" --limit 10000
# Specific range (range mode)
python production_processor.py --server "server" --database "db" --app-id-start 1 --app-id-end 50000
# Testing with defaults (10k limit applied automatically)
python production_processor.py --server "server" --database "db"
- Chunked Processing: `run_production_processor.py` - For large datasets (>100k)
# Breaks range into chunks, spawns fresh process per chunk
python run_production_processor.py --app-id-start 1 --app-id-end 300000
# Custom chunk size (default 10k)
python run_production_processor.py --app-id-start 1 --app-id-end 1000000 --chunk-size 5000
Key Distinctions:
- `production_processor.py`: Supports both LIMIT mode (gap filling) and RANGE mode. Single process.
- `run_production_processor.py`: RANGE mode only. Chunks large ranges into fresh processes to prevent memory degradation.
# Small test run (10k records with defaults)
python production_processor.py --server "localhost\SQLEXPRESS" --database "XmlConversionDB"
# Gap filling (processes up to 50k records, skips already-processed)
python production_processor.py --server "localhost\SQLEXPRESS" --database "XmlConversionDB" --limit 50000
# Medium production run (<100k apps)
python production_processor.py --server "localhost\SQLEXPRESS" --database "XmlConversionDB" \
--app-id-start 1 --app-id-end 50000 --workers 6 --batch-size 1000
# Large production run (>100k apps - use orchestrator)
python run_production_processor.py --app-id-start 1 --app-id-end 300000
xml_extractor/
├── __init__.py                    # Main package with core exports
├── cli.py                         # Command-line interface (xml-extractor command)
├── models.py                      # Core data classes and models
├── interfaces.py                  # Abstract interfaces and base classes
├── exceptions.py                  # Custom exception classes
├── utils.py                       # Utility functions and helpers
├── config/                        # Configuration management
│   └── manager.py                 # Centralized configuration system
├── database/                      # Database operations and migration
│   ├── connection_test.py         # Database connectivity testing
│   └── migration_engine.py        # High-performance bulk insert operations
├── mapping/                       # Data transformation and mapping
│   ├── data_mapper.py             # Core XML-to-database mapping engine
│   ├── reverse_mapper.py          # Reverse mapping utilities
│   └── calculated_field_engine.py # Calculated field expression evaluation
├── parsing/                       # XML parsing and processing
│   └── xml_parser.py              # Memory-efficient XML parser
└── validation/                    # Multi-layered data validation system
    ├── data_integrity_validator.py # End-to-end validation engine
    ├── element_filter.py          # XML element filtering and validation
    ├── pre_processing_validator.py # Pre-extraction validation
    ├── validation_integration.py  # Validation orchestration
    ├── validation_models.py       # Validation data structures
    ├── test_validation_system.py  # Validation system tests
    └── README.md                  # Validation system documentation
# Production Scripts
production_processor.py # Main production processing script
# Configuration & Samples
config/
├── mapping_contract.json          # CRITICAL project contract for field mapping definitions
├── data-model.md                  # Data model specifications
├── database_config.json           # Database configuration
└── samples/                       # Sample files and documentation
    ├── configuration_summary.md
    ├── create_destination_tables.sql
    ├── enum_handling_guide.md
    ├── insert_enum_values.sql
    ├── migrate_table_logic.sql
    ├── new_datamodel_queries.sql
    ├── README.md
    ├── sample-source-xml-contact-test.xml # Key source file used in tests to validate complex mappings
    ├── test_mapping_contract.py
    └── validate_mapping_contract.sql
# Documentation
docs/
├── bulk-insert-architecture.md        # Bulk insert design and optimization
├── data-intake-and-preparation.md     # Data intake processes
├── mapping-principles.md              # Mapping system principles
├── testing-philosophy.md              # Testing approach and strategy
├── validation-and-testing-strategy.md # Validation framework
└── xml-hierarchy-corrections.md       # XML structure corrections
# Tests
tests/
├── test_end_to_end_integration.py     # End-to-end integration tests
├── test_production_xml_batch.py       # Production batch processing tests
├── test_real_sample_xml_validation.py # Real XML validation tests
└── test_xml_validation_scenarios.py   # XML validation scenarios
# Build & Dependencies
setup.py # Package setup configuration
requirements.txt # Python dependencies
README.md # This file
The XML Database Extraction System operates as a comprehensive pipeline that transforms XML content stored in database text columns into normalized relational structures:
┌──────────────────┐   ┌───────────────────┐   ┌──────────────────┐   ┌──────────────────┐
│   XML Source     │──▶│  Pre-Processing   │──▶│   Extraction     │──▶│  Data Integrity  │
│                  │   │  Validation       │   │   Pipeline       │   │  Validation      │
│ • Raw XML file   │   │ • ElementFilter   │   │ • XMLParser      │   │ • End-to-End     │
│ • Provenir data  │   │ • Business rules  │   │ • DataMapper     │   │ • Referential    │
└──────────────────┘   └───────────────────┘   └──────────────────┘   └──────────────────┘
                                 │                       │                       │
                                 ▼                       ▼                       ▼
                       ┌───────────────────┐   ┌──────────────────┐   ┌──────────────────┐
                       │ ValidationResult  │   │ Extracted Tables │   │ ValidationResult │
                       │ • Can process?    │   │ • Relational data│   │ • Quality OK?    │
                       │ • Early errors    │   │ • Ready for DB   │   │ • Detailed errors│
                       └───────────────────┘   └──────────────────┘   └──────────────────┘
- XML Source (Database) → Raw Provenir XML data from database text columns
- Pre-Processing Validation → ElementFilter + PreProcessingValidator quality gate
- Extraction Pipeline → XMLParser + DataMapper transformation engine
- Data Integrity Validation → DataIntegrityValidator quality assurance
- Database Migration → MigrationEngine bulk insert operations
- Gate 1: Pre-processing validation (can we process this XML?)
- Gate 2: Data integrity validation (is extracted data quality acceptable?)
- Gate 3: Migration success (were records successfully loaded?)
The system uses a contract-first approach where mapping contracts define the exact data structure and validation rules:
- Mapping Contracts: JSON specifications defining XML-to-database transformations
- Schema-Derived Metadata: Automatic addition of nullable/required/default_value fields
- DataMapper Validation: Ensures only contract-compliant columns are processed
- MigrationEngine Optimization: Focuses on high-performance bulk insertion of validated data
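For orientation, the snippet below loads the contract and walks its mappings. It is a minimal sketch: the path matches config/mapping_contract.json in this repository, but the key names used in the loop (table_mappings, field_mappings, nullable, default_value) are illustrative assumptions, not the contract's authoritative schema.

```python
import json

# Minimal sketch: inspect the mapping contract that drives extraction.
# Key names below are illustrative assumptions; config/mapping_contract.json
# is the authoritative definition.
with open("config/mapping_contract.json", encoding="utf-8") as f:
    contract = json.load(f)

for table in contract.get("table_mappings", []):
    print(table.get("table_name"))
    for field in table.get("field_mappings", []):
        # Schema-derived metadata (nullable/required/default_value) is merged
        # into each field mapping before the DataMapper consumes it.
        print("   ", field.get("column_name"), field.get("nullable"), field.get("default_value"))
```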
- Purpose: Memory-efficient XML parsing with selective element extraction
- Key Features: Selective parsing, contact deduplication, flattened data structures
- Integration: Provides data to DataMapper and validation components
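The parser's internals are not reproduced here; the sketch below shows the general lxml iterparse pattern that this kind of selective, memory-efficient parsing is built on. The tag names in the usage comment are placeholders, not the actual Provenir element names.

```python
from lxml import etree

def iter_elements(xml_source, wanted_tags):
    """Minimal sketch of selective, memory-efficient XML parsing with lxml.

    Streams the document, yields only the tags of interest, and clears
    processed elements so memory stays flat on large inputs. This illustrates
    the pattern behind xml_parser.py; it is not that module's actual API.
    """
    for _event, elem in etree.iterparse(xml_source, events=("end",), tag=wanted_tags):
        yield {"_tag": elem.tag, "_text": (elem.text or "").strip(), **elem.attrib}
        elem.clear()  # free the element's subtree
        while elem.getprevious() is not None:
            del elem.getparent()[0]  # drop already-processed siblings

# Usage (tag names are placeholders, not the real Provenir schema):
# for record in iter_elements("sample.xml", wanted_tags=("Contact", "Application")):
#     print(record)
```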
- Purpose: Core data transformation engine orchestrating XML-to-database conversion
- Key Features: Contract-driven column selection, calculated field evaluation, enum handling
- Recent Changes: Now handles schema-derived nullable/required/default_value validation
- Integration: Receives flattened XML from XMLParser, produces contract-compliant tables for MigrationEngine
- Purpose: Safe evaluation of calculated field expressions with cross-element references
- Key Features: SQL-like expression language, safety features, performance optimization
- Integration: Called by DataMapper for complex field calculations
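As a rough illustration of "safe evaluation" (not the engine's actual implementation), the sketch below evaluates simple arithmetic over already-extracted field values without calling eval(); the engine's real SQL-like language, including CASE expressions and cross-element references, goes well beyond this.

```python
import ast
import operator

# Minimal sketch of safe arithmetic evaluation over extracted field values.
# Only +, -, *, / over numbers and named fields are allowed; anything else
# is rejected instead of being passed to eval().
_OPS = {ast.Add: operator.add, ast.Sub: operator.sub,
        ast.Mult: operator.mul, ast.Div: operator.truediv}

def evaluate(expression, fields):
    def _eval(node):
        if isinstance(node, ast.Expression):
            return _eval(node.body)
        if isinstance(node, ast.BinOp) and type(node.op) in _OPS:
            return _OPS[type(node.op)](_eval(node.left), _eval(node.right))
        if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
            return node.value
        if isinstance(node, ast.Name):  # reference to an extracted field
            return fields[node.id]
        raise ValueError(f"Unsupported expression element: {type(node).__name__}")
    return _eval(ast.parse(expression, mode="eval"))

# evaluate("loan_amount / term_months", {"loan_amount": 12000, "term_months": 60}) -> 200.0
```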
- Purpose: High-performance bulk insertion engine for contract-compliant relational data
- Key Features: Contract-driven column handling, fast_executemany optimization, transaction safety
- Recent Changes: Simplified to focus on bulk insertion; column validation now handled by DataMapper
- Integration: Receives pre-validated tables from DataMapper, performs optimized SQL Server bulk inserts
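The core pattern behind the bulk-insert path is pyodbc's fast_executemany inside an explicit transaction. The sketch below shows only that pattern; the connection string, table, and column names are placeholders, and the real MigrationEngine layers contract-driven column handling and duplicate detection on top.

```python
import pyodbc

def bulk_insert(conn_str, table, columns, rows):
    """Sketch of a transactional bulk insert (placeholder table/columns)."""
    placeholders = ", ".join("?" for _ in columns)
    sql = f"INSERT INTO {table} ({', '.join(columns)}) VALUES ({placeholders})"
    conn = pyodbc.connect(conn_str, autocommit=False)
    try:
        cursor = conn.cursor()
        cursor.fast_executemany = True  # send parameter arrays in bulk instead of row-by-row
        cursor.executemany(sql, rows)
        conn.commit()                   # all-or-nothing per batch
    except pyodbc.Error:
        conn.rollback()
        raise
    finally:
        conn.close()
```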
- Purpose: Multi-layered validation ensuring data quality throughout the pipeline
- Components: ElementFilter, PreProcessingValidator, DataIntegrityValidator, ValidationOrchestrator
- Integration: Validates at multiple pipeline stages, provides quality gates and reporting
- Centralized Config: Environment variable-based configuration system
- Mapping Contracts: JSON-based field mapping definitions with calculated field support
- Schema Flexibility: Configurable database schema prefixes for multi-environment support
- Proven Performance: 1,477-1,691 records/minute with >95% success rate (10x above original target of 150/min)
- Scalability: Multi-worker parallel processing, configurable batch sizes
- Memory Efficiency: Streaming XML parsing, configurable memory limits
- Monitoring: Real-time progress tracking and comprehensive metrics
- MappingContract: Defines how XML data maps to relational structure
- FieldMapping: Maps XML elements/attributes to database columns with calculated field support
- RelationshipMapping: Defines parent-child relationships between tables
- ProcessingConfig: Configuration parameters for extraction operations
- ProcessingResult: Results and metrics from processing operations
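A shape-only sketch of two of these models follows; the attribute names are illustrative assumptions, and xml_extractor/models.py remains the source of truth.

```python
from dataclasses import dataclass
from typing import Optional

# Shape-only sketch; attribute names are illustrative assumptions,
# not the definitions in xml_extractor/models.py.
@dataclass
class FieldMapping:
    xml_path: str                      # XML element/attribute to read
    column_name: str                   # destination database column
    nullable: bool = True              # schema-derived metadata
    default_value: Optional[str] = None
    calculated_expression: Optional[str] = None  # handled by the CalculatedFieldEngine

@dataclass
class ProcessingConfig:
    batch_size: int = 100
    parallel_processes: int = 4
    memory_limit_mb: int = 512
```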
- Calculated Fields: Support for arithmetic expressions and CASE statements
- Contact Validation: "Last valid element" approach for duplicate handling
- Performance Monitoring: Real-time progress tracking and metrics
- Schema Flexibility: Configurable database schema prefixes for multi-environment support
- Centralized Configuration: Environment variable-based configuration management
- Schema-Derived Metadata: Automatic enhancement of mapping contracts with nullable/required/default_value fields from database schema
- Simplified MigrationEngine: Removed dynamic column filtering; now focuses purely on high-performance bulk insertion
- Consolidated Default Handling: Migrated contract-level defaults to field-level for better maintainability
- Enhanced Data Validation: Contract-compliant data processing ensures compatibility throughout the pipeline
- Comprehensive Test Coverage: 128 tests across unit, integration, and end-to-end scenarios (100% pass rate)
- Updated Documentation: Enhanced docstrings and README to reflect architectural changes
- Cleaned Configuration: Removed unused contract sections and consolidated default value handling
Issues Identified & Fixed:
- RangeS-U Lock Contention (RESOLVED)
  - Symptom: Batch processing hanging during parallel inserts
  - Root Cause: Duplicate check queries acquiring shared locks, serializing 4 workers
  - Solution: Added `WITH (NOLOCK)` to 3 duplicate detection queries in `migration_engine.py` (illustrative query shape below)
  - Result: Workers now proceed in parallel without lock serialization
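For illustration only, a duplicate-detection query with the hint looks roughly like the snippet below; the table and column names are placeholders, not the queries actually used in migration_engine.py.

```python
def existing_contact_ids(cursor, app_id):
    """Illustrative duplicate check with a NOLOCK hint (placeholder schema).

    The dirty read is acceptable because this check is only an optimisation;
    FK/PK constraints remain the final safety net.
    """
    sql = (
        "SELECT contact_id "
        "FROM dbo.contact WITH (NOLOCK) "  # no shared range locks, so workers do not serialize
        "WHERE app_id = ?"
    )
    return {row.contact_id for row in cursor.execute(sql, app_id).fetchall()}
```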
- Resume Logic Bug (RESOLVED)
  - Symptom: Consecutive runs without clearing `processing_log` would reprocess already-successful apps
  - Root Cause: WHERE clause excluded only `status='failed'`, not `status='success'`
  - Solution: Changed to `AND pl.status IN ('success', 'failed')` in `production_processor.py` (query shape sketched below)
  - Result: Second run correctly returns 0 records, enabling true resume capability
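An illustrative shape of the corrected source query follows; the XML source table name is a placeholder, and only the processing_log exclusion mirrors the fix described above.

```python
# Illustrative shape of the resume-aware source query. "dbo.xml_source" is a
# placeholder; the exclusion on processing_log mirrors the fix: apps already
# logged as 'success' or 'failed' are skipped on subsequent runs.
RESUME_QUERY = """
    SELECT s.app_id, s.xml_content
    FROM dbo.xml_source AS s
    WHERE NOT EXISTS (
        SELECT 1
        FROM dbo.processing_log AS pl
        WHERE pl.app_id = s.app_id
          AND pl.status IN ('success', 'failed')
    )
    ORDER BY s.app_id
"""
```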
- Pagination Bug (RESOLVED)
  - Symptom: OFFSET-based pagination skipped records (pattern: apps 1-20, 41-60, 81-100)
  - Root Cause: OFFSET applied after WHERE filtering, causing cursor misalignment
  - Solution: Implemented cursor-based pagination using `app_id > last_app_id` with `OFFSET 0 ROWS FETCH` (sketched below)
  - Result: Sequential processing without gaps
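A sketch of the keyset-pagination query shape follows; table and column names other than app_id are again placeholders.

```python
# Sketch of cursor-based (keyset) pagination: instead of a growing OFFSET,
# each page restarts from the last app_id seen, so filtered-out rows can no
# longer shift the window. "dbo.xml_source" is a placeholder table name.
PAGE_QUERY = """
    SELECT s.app_id, s.xml_content
    FROM dbo.xml_source AS s
    WHERE s.app_id > ?              -- last app_id from the previous page
    ORDER BY s.app_id
    OFFSET 0 ROWS FETCH NEXT ? ROWS ONLY
"""

def fetch_pages(cursor, page_size):
    last_app_id = 0
    while True:
        rows = cursor.execute(PAGE_QUERY, last_app_id, page_size).fetchall()
        if not rows:
            break
        yield rows
        last_app_id = rows[-1].app_id  # advance the keyset cursor
```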
Baseline Metrics Established:
- Optimal batch-size: 500 (on this machine)
- Throughput: 1477-1691 applications/minute (batch-size 500)
- Target was 3000+ rec/min (not achieved, CPU-bound bottleneck identified)
Tests Performed & Results:
| Batch Size | Throughput (rec/min) | Finding |
|---|---|---|
| 20 | 534 | Too small, high orchestration overhead |
| 50 | 1192 | Better, still suboptimal |
| 100 | 1791 | Good, but unstable with larger volumes |
| 500 | 1477-1691 | Optimal - consistent, reliable peak |
| 1000 | 1387 | Declining, memory pressure begins |
| 2000 | 1393 | Further decline, orchestration overhead |
Optimization Attempts (Inconclusive):
- Conditional logging (reduced DEBUG overhead): No improvement
- Connection pooling tuning: No improvement
- FK removal + index rebuild: No improvement
Root Cause Analysis:
- Bottleneck: CPU-bound processing (XML parsing with lxml, data mapping/transformation)
- Database I/O: Not a bottleneck (confirmed by FK removal test)
- Logging overhead: Negligible (confirmed by conditional logging test)
Architectural Decisions:
- Batch-size 500: Balances memory efficiency vs orchestration overhead
- 4 Workers: One per CPU core, prevents context-switching overhead
- Connection pooling disabled for SQLExpress: No benefit for local connections
- Three-layer duplicate detection: Pragmatic balance between performance and correctness
  - Layer 1: `processing_log` (fast app-level check)
  - Layer 2: Contact-level table queries with `NOLOCK` (de-duplication)
  - Layer 3: FK/PK constraints (safety net)
Documentation Cleanup:
- Consolidated 18+ WIP performance docs into a single `FINAL_PERFORMANCE_SUMMARY.md`
- Archived detailed investigation docs to `performance_tuning/archived_analysis/`
- Kept architectural decisions and methodology for future reference
# Clone repository
git clone <repository-url>
cd xml-database-extraction
# Create virtual environment
python -m venv .venv
source .venv/bin/activate # Linux/Mac
# or
.venv\Scripts\activate # Windows
# Install in development mode
pip install -e ".[dev]"
# Run tests
pytest # All tests
python run_integration_suite.py # Integration test suite (if moved to root)
# Code quality
black xml_extractor/ # Code formatting
mypy xml_extractor/ # Type checking
flake8 xml_extractor/ # Linting
# Unit tests
pytest tests/unit/
# Integration tests
pytest tests/integration/
# End-to-end tests
pytest tests/e2e/
# Comprehensive test suite
python tests/run_integration_suite.py # Runs all test categories with reporting
# Database connection
export XML_EXTRACTOR_DB_SERVER="your-sql-server"
export XML_EXTRACTOR_DB_DATABASE="YourDatabase"
export XML_EXTRACTOR_CONNECTION_STRING="Driver={ODBC Driver 17 for SQL Server};Server=...;"
# Processing configuration
export XML_EXTRACTOR_BATCH_SIZE=100
export XML_EXTRACTOR_PARALLEL_PROCESSES=4
export XML_EXTRACTOR_MEMORY_LIMIT_MB=512
# Schema configuration (for multi-environment support)
export XML_EXTRACTOR_DB_SCHEMA_PREFIX=sandbox # Optional: for non-production schemas
# Check configuration status
xml-extractor
# Test database connectivity
python production_processor.py --server "server" --database "db" --limit 1 --log-level DEBUG
See docs/production-deployment.md for a comprehensive production deployment guide including:
- Performance optimization
- Monitoring and alerting
- Database configuration
- Operational procedures
- Troubleshooting
# 1. Install package
pip install .
# 2. Test connectivity
python production_processor.py --server "prod-server" --database "DB" --limit 1
# 3. Run production batch
python production_processor.py \
--server "prod-server" \
--database "ProductionDB" \
--workers 4 \
--batch-size 100 \
--log-level ERROR
- Achieved Performance: 1,477-1,691 records/minute with >95% success rate
- Original Target: >150 records/minute (exceeded by 10x)
- Parallel Processing: Multi-worker support for high throughput
- Memory Efficient: Configurable batch sizes and memory limits
- Real-time Monitoring: Progress tracking and performance metrics
- Python 3.8+
- lxml for high-performance XML processing
- pyodbc for SQL Server connectivity
- Additional dependencies listed in requirements.txt