Purpose: Shared utilities and helper functions for the GNN processing pipeline
Pipeline Step: Infrastructure module (not a numbered step)
Category: Utility Functions / Infrastructure Support
Status: ✅ Production Ready
Version: 2.0.0
Last Updated: 2026-01-21
- Pipeline orchestration and coordination utilities
- Logging and diagnostic utilities
- Configuration and argument parsing utilities
- Resource management and monitoring utilities
- Error handling and recovery utilities
- Performance tracking and optimization utilities
- Centralized logging and diagnostic system
- Argument parsing and configuration management
- Resource monitoring and performance tracking
- Error handling and recovery mechanisms
- Pipeline orchestration and coordination
- Utility functions for common operations
setup_step_logging(step_name: str, verbose: bool = False) -> logging.Logger
Description: Set up standardized logging for a pipeline step with correlation ID tracking
Parameters:
- step_name (str): Name of the pipeline step (e.g., "3_gnn")
- verbose (bool): Enable verbose logging (default: False)
Returns: logging.Logger - Configured logger instance with correlation ID
Example:
from utils import setup_step_logging
logger = setup_step_logging("3_gnn", verbose=True)

Description: Set up logging for main pipeline orchestrator
Parameters:
- log_dir (Optional[Path]): Directory for log files (default: None)
- verbose (bool): Enable verbose logging (default: False)
Returns: logging.Logger - Configured main logger instance
log_step_start(logger_or_step_name: Union[logging.Logger, str], message: str = None, step_number: int = None, **metadata) -> None
Description: Log the start of a pipeline step with performance tracking
Parameters:
- logger_or_step_name (Union[logging.Logger, str]): Logger instance or step name
- message (str, optional): Custom start message
- step_number (int, optional): Step number for display
- **metadata: Additional metadata to log
Returns: None
log_step_success(logger_or_step_name: Union[logging.Logger, str], message: str = None, step_number: int = None, **metadata) -> None
Description: Log successful completion of a pipeline step with metrics
Parameters:
- logger_or_step_name (Union[logging.Logger, str]): Logger instance or step name
- message (str, optional): Custom success message
- step_number (int, optional): Step number for display
- **metadata: Additional metadata (results, file counts, etc.)
Returns: None
log_step_error(logger_or_step_name: Union[logging.Logger, str], message: str = None, step_number: int = None, **metadata) -> None
Description: Log an error during pipeline step execution with context
Parameters:
- logger_or_step_name (Union[logging.Logger, str]): Logger instance or step name
- message (str, optional): Custom error message
- step_number (int, optional): Step number for display
- **metadata: Error context (exception, traceback, etc.)
Returns: None
log_step_warning(logger_or_step_name: Union[logging.Logger, str], message: str = None, step_number: int = None, **metadata) -> None
Description: Log a warning during pipeline step execution
Parameters:
- logger_or_step_name (Union[logging.Logger, str]): Logger instance or step name
- message (str, optional): Warning message
- step_number (int, optional): Step number for display
- **metadata: Warning context
Returns: None
Description: Get summary of performance metrics across all tracked operations
Returns: Dict[str, Any] - Performance summary with timing, memory, and resource usage
Description: Set up correlation context for request tracking
Parameters:
- step_name (str): Name of the pipeline step
- correlation_id (Optional[str]): Existing correlation ID or None to generate new
Returns: str - Correlation ID for this context
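The reuse-or-generate behavior described above can be sketched with the standard library; the `contextvars` storage and the `step-hex` ID format here are assumptions for illustration, not the module's actual scheme:

```python
import uuid
from contextvars import ContextVar
from typing import Optional

# Hypothetical module-level storage; the real module may keep this elsewhere.
_correlation_id: ContextVar[Optional[str]] = ContextVar("correlation_id", default=None)

def set_correlation_context(step_name: str, correlation_id: Optional[str] = None) -> str:
    """Reuse an existing correlation ID, or mint a new one tied to the step."""
    cid = correlation_id or f"{step_name}-{uuid.uuid4().hex[:8]}"
    _correlation_id.set(cid)
    return cid

cid = set_correlation_context("3_gnn")
```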
Description: Parse arguments for a specific pipeline step with recovery support
Parameters:
step_name(str): Name of the pipeline step
Returns: argparse.Namespace - Parsed arguments with standard pipeline options
Standard Arguments:
- --target-dir: Target directory for input files
- --output-dir: Output directory for results
- --verbose: Enable verbose logging
- --recursive: Recursively process directories
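A minimal argparse sketch of the standard options listed above; the default paths ("input", "output") are assumptions here, not documented defaults:

```python
import argparse
from pathlib import Path

def build_step_parser(step_name: str) -> argparse.ArgumentParser:
    """Parser exposing the standard pipeline options."""
    parser = argparse.ArgumentParser(prog=step_name)
    parser.add_argument("--target-dir", type=Path, default=Path("input"))
    parser.add_argument("--output-dir", type=Path, default=Path("output"))
    parser.add_argument("--verbose", action="store_true")
    parser.add_argument("--recursive", action="store_true")
    return parser

# Parse an explicit argv list instead of sys.argv for illustration
args = build_step_parser("3_gnn").parse_args(["--target-dir", "models", "--verbose"])
```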
Description: Build command-line arguments for a pipeline step
Parameters:
- step_name (str): Name of the pipeline step
- args (argparse.Namespace): Parsed arguments
Returns: List[str] - Command-line argument list
Description: Validate and convert string paths to Path objects
Parameters:
args(argparse.Namespace): Arguments with path strings
Returns: argparse.Namespace - Arguments with Path objects
Description: Get standardized output directory for a pipeline script
Parameters:
- script_name (str): Name of the script (e.g., "3_gnn.py")
- base_output_dir (Optional[Path]): Base output directory (default: Path("output"))
Returns: Path - Output directory path (e.g., "output/3_gnn_output/")
Description: Validate and optionally create output directory
Parameters:
- output_dir (Path): Output directory path
- create (bool): Create directory if it doesn't exist (default: True)
Returns: bool - True if directory is valid/created, False otherwise
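A self-contained sketch of the validate-and-create semantics, assuming only the documented signature; the writability check via os.access is an illustrative choice, not necessarily what the module does:

```python
import os
import tempfile
from pathlib import Path

def validate_output_directory(output_dir: Path, create: bool = True) -> bool:
    """True if the directory exists (or was created) and is writable."""
    try:
        if not output_dir.exists():
            if not create:
                return False
            output_dir.mkdir(parents=True, exist_ok=True)
        return output_dir.is_dir() and os.access(output_dir, os.W_OK)
    except OSError:
        return False

# Usage against a throwaway directory
with tempfile.TemporaryDirectory() as tmp:
    ok = validate_output_directory(Path(tmp) / "3_gnn_output")
```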
Description: Get current process memory usage
Returns: float - Memory usage in megabytes (MB)
ErrorRecoveryManager.recover(step_name: str, error: Exception, context: Dict[str, Any]) -> Optional[Dict[str, Any]]
Description: Attempt to recover from a step failure
Parameters:
- step_name (str): Name of the failed step
- error (Exception): The exception that occurred
- context (Dict[str, Any]): Error context and state
Returns: Optional[Dict[str, Any]] - Recovery result or None if recovery not possible
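A sketch of the recover contract: return a recovery result when a handler matches, None otherwise. The handler-registry design is an assumption about one way such a manager could work, not the module's actual architecture:

```python
from typing import Any, Callable, Dict, Optional, Type

class ErrorRecoveryManager:
    """Map exception types to recovery handlers (illustrative design)."""

    def __init__(self) -> None:
        self._handlers: Dict[Type[BaseException], Callable[..., Dict[str, Any]]] = {}

    def register(self, exc_type: Type[BaseException], handler: Callable) -> None:
        self._handlers[exc_type] = handler

    def recover(self, step_name: str, error: Exception,
                context: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Dispatch to the first handler whose exception type matches."""
        for exc_type, handler in self._handlers.items():
            if isinstance(error, exc_type):
                return handler(step_name, error, context)
        return None  # recovery not possible

manager = ErrorRecoveryManager()
manager.register(FileNotFoundError,
                 lambda step, err, ctx: {"action": "use_default_path", "step": step})
result = manager.recover("3_gnn", FileNotFoundError("missing.md"), {})
```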
format_and_log_error(logger: logging.Logger, error: Exception, context: Dict[str, Any] = None) -> None
Description: Format and log an error with full context
Parameters:
- logger (logging.Logger): Logger instance
- error (Exception): The exception to log
- context (Dict[str, Any], optional): Additional error context
Returns: None
Description: Load configuration from YAML or JSON file
Parameters:
config_path(Path): Path to configuration file
Returns: Dict[str, Any] - Configuration dictionary
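Loading either format typically dispatches on the file suffix; a minimal sketch, assuming the YAML branch requires PyYAML to be installed (the function name and suffix handling are illustrative):

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Dict

def load_config(config_path: Path) -> Dict[str, Any]:
    """Parse YAML or JSON based on the file extension."""
    text = config_path.read_text()
    if config_path.suffix in (".yaml", ".yml"):
        import yaml  # optional dependency (PyYAML)
        return yaml.safe_load(text)
    return json.loads(text)

# Usage with a temporary JSON config
with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False) as f:
    f.write('{"pipeline": {"steps": 24}}')
config = load_config(Path(f.name))
```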
Description: Get a configuration value by key
Parameters:
- key (str): Configuration key (supports dot notation, e.g., "pipeline.steps")
- default (Any): Default value if key not found
Returns: Any - Configuration value or default
Description: Set a configuration value
Parameters:
- key (str): Configuration key (supports dot notation)
- value (Any): Value to set
Returns: None
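The dot-notation lookup described above amounts to walking nested dictionaries. A self-contained sketch, assuming a module-level config dict (the `_config` store is hypothetical):

```python
from typing import Any, Dict

# Hypothetical in-memory configuration store
_config: Dict[str, Any] = {"pipeline": {"steps": ["3_gnn"], "verbose": False}}

def get_config_value(key: str, default: Any = None) -> Any:
    """Walk nested dicts along a dotted key, e.g. "pipeline.steps"."""
    node: Any = _config
    for part in key.split("."):
        if not isinstance(node, dict) or part not in node:
            return default
        node = node[part]
    return node

def set_config_value(key: str, value: Any) -> None:
    """Create intermediate dicts as needed, then assign the leaf value."""
    parts = key.split(".")
    node = _config
    for part in parts[:-1]:
        node = node.setdefault(part, {})
    node[parts[-1]] = value
```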
Description: Validate all pipeline dependencies are installed
Returns: Dict[str, bool] - Dependency status (package_name: is_installed)
Description: Check if optional dependency group is available
Parameters:
dependency_group(str): Dependency group name (e.g., "pymdp", "jax")
Returns: bool - True if dependencies are available
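Checking a dependency group without importing it can be done with importlib; the group-to-modules mapping below is an assumption for illustration, not the module's actual registry:

```python
import importlib.util
from typing import Dict, List

# Hypothetical mapping of group names to importable modules
DEPENDENCY_GROUPS: Dict[str, List[str]] = {
    "pymdp": ["pymdp"],
    "jax": ["jax", "jaxlib"],
}

def check_optional_dependencies(dependency_group: str) -> bool:
    """True only if every module in the group can be found on sys.path."""
    modules = DEPENDENCY_GROUPS.get(dependency_group, [])
    return bool(modules) and all(
        importlib.util.find_spec(name) is not None for name in modules
    )

jax_available = check_optional_dependencies("jax")
```

Using find_spec avoids actually importing (and paying the startup cost of) heavy optional packages.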
Description: Install missing dependencies
Parameters:
dependencies(List[str]): List of package names to install
Returns: bool - True if installation succeeded
Description: Track performance of an operation
Parameters:
- name (str): Operation name
- func (Callable): Function to track
- *args: Function arguments
- **kwargs: Function keyword arguments
Returns: Any - Function return value
Description: Standalone function to track operation performance
Parameters:
- name (str): Operation name
- func (Callable): Function to track
- *args: Function arguments
- **kwargs: Function keyword arguments
Returns: Any - Function return value
- pathlib - Path manipulation
- logging - Logging functionality
- argparse - Argument parsing
- typing - Type hints
- psutil - System resource monitoring
- numpy - Numerical computations
- None (base infrastructure module)
```python
LOGGING_CONFIG = {
    'console_level': 'INFO',
    'file_level': 'DEBUG',
    'correlation_tracking': True,
    'structured_logging': True
}

PERFORMANCE_CONFIG = {
    'memory_tracking': True,
    'timing_tracking': True,
    'resource_monitoring': True
}
```

```python
from utils.logging_utils import setup_step_logging

logger = setup_step_logging("3_gnn.py", verbose=True)
logger.info("Starting GNN processing")
```

```python
from utils.pipeline import get_output_dir_for_script

output_dir = get_output_dir_for_script("3_gnn.py", Path("output"))
print(f"GNN output directory: {output_dir}")
```

```python
from utils.pipeline_template import create_standardized_pipeline_script

run_script = create_standardized_pipeline_script(
    "3_gnn.py",
    process_gnn_files,
    "GNN file processing"
)

# Execute the script
exit_code = run_script()
```

```python
from utils.resource_manager import get_current_memory_usage

memory_before = get_current_memory_usage()
# ... do some work ...
memory_after = get_current_memory_usage()
print(f"Memory delta: {memory_after - memory_before} MB")
```

- Log files in configured log directory
- Performance metrics and timing data
- Error reports and recovery logs
- Configuration validation reports
output/
├── logs/
│ ├── pipeline.log
│ ├── step_logs/
│ └── error_logs/
└── performance/
├── timing_data.json
└── memory_usage.json
- Duration: Variable (utility functions)
- Memory: ~10-50MB overhead
- Status: ✅ Production Ready
- Logging: < 1ms per log entry
- Path Operations: < 1ms per operation
- Memory Monitoring: < 5ms per check
- Configuration: < 10ms per operation
- Configuration Errors: Invalid configuration parameters
- Path Errors: Invalid or inaccessible paths
- Logging Errors: Logging system failures
- Resource Errors: Resource monitoring failures
- Configuration Repair: Use default values
- Path Resolution: Resolve relative paths
- Logging Recovery: Use basic logging
- Resource Monitoring: Continue without monitoring
- All pipeline scripts and modules
- None (base infrastructure module)
- All pipeline scripts (0_template.py through 24_intelligent_analysis.py)
- All pipeline modules
Configuration → Logging Setup → Resource Monitoring → Error Handling → Performance Tracking
- src/tests/test_utils_core.py - Core utils tests
- src/tests/test_new_utils.py - Additional utils tests
- Current: 93%
- Target: 95%+
- Logging and diagnostic utilities
- Configuration and argument parsing
- Resource management and monitoring
- Error handling and recovery
- utils.get_system_info - Get system information
- utils.get_environment_info - Get environment information
- utils.get_logging_info - Get logging configuration
- utils.validate_dependencies - Validate dependencies
- utils.get_performance_metrics - Get performance metrics
```python
@mcp_tool("utils.get_system_info")
def get_system_info_tool():
    """Get system information"""
    # Implementation
```

Symptom: No log output or logs in wrong location
Cause: Logging configuration incorrect or permissions issues
Solution:
- Verify log directory exists and is writable
- Check logging level configuration
- Use the --verbose flag for detailed logging
- Review logging configuration in pipeline config
Symptom: Script fails with argument parsing errors
Cause: Argument definition mismatch or missing required arguments
Solution:
- Verify argument definitions match script usage
- Check required arguments are provided
- Review argument parser configuration
- Use the --help flag to see expected arguments
Features:
- Centralized logging system
- Argument parsing utilities
- Resource monitoring
- Performance tracking
- Error handling utilities
Known Issues:
- None currently
- Next Version: Enhanced performance monitoring
- Future: Real-time resource tracking
Last Updated: 2026-01-21
Maintainer: GNN Pipeline Team
Status: ✅ Production Ready
Version: 2.0.0
Architecture Compliance: ✅ 100% Thin Orchestrator Pattern