This project provides tools for gathering UDP monitoring messages from XRootD servers and sending them to a reliable message bus. It includes two binaries: shoveler for minimal processing and high throughput, and xrootd-monitoring-collector for full packet parsing and correlation.
```mermaid
graph LR
  subgraph Site
    subgraph Node 1
      node1[XRootD] -- UDP --> shoveler1{Shoveler};
    end
    subgraph Node 2
      node2[XRootD] -- UDP --> shoveler1{Shoveler};
    end
  end;
  subgraph OSG Operations
    shoveler1 -- TCP/TLS --> C[Message Bus];
    C -- Raw --> D[XRootD Collector];
    D -- Summary --> C;
    C --> E[(Storage)];
    style shoveler1 font-weight:bolder,stroke-width:4px,stroke:#E74C3C,font-size:4em,color:#E74C3C
  end;
```
- An open UDP port reachable by the XRootD servers (default: 9993). The port does not need to be open to the public internet, only to the XRootD servers.
- Outgoing network access to connect to the message bus.
- Disk space for a persistent message queue if the shoveler is disconnected from the message bus. Calculations have shown production servers generate <30 MB of data a day.
The shoveler can run on a dedicated server or on a shared server, and it does not require many resources. For example, a shoveler serving 12 production XRootD servers can be expected to consume 10-50 MB of RAM and a small fraction of a CPU core.
Binaries and packages are provided in the latest GitHub releases.
The shoveler reads configuration from multiple sources with the following precedence (highest to lowest):
- Command line arguments - Highest priority
- Environment Variables - Override config file settings
- Configuration file - Default configuration source
Both shoveler and xrootd-monitoring-collector binaries support the following flags:
```bash
# Specify custom configuration file path
./bin/shoveler -c /path/to/config.yaml
./bin/shoveler --config /path/to/config.yaml

# Display help
./bin/shoveler -h
```

An example configuration file, config.yaml, is in the repo. By default, the shoveler searches for configuration files in the following locations (in order):

- /etc/xrootd-monitoring-shoveler/config.yaml
- $HOME/.xrootd-monitoring-shoveler/config.yaml
- ./config.yaml (current directory)
- ./config/config.yaml
You can override this by specifying a custom path with -c or --config.
Every configuration option can be set via environment variables using the SHOVELER_ prefix. Nested configuration keys use underscores. For example:
- `mode` → `SHOVELER_MODE`
- `listen.port` → `SHOVELER_LISTEN_PORT`
- `amqp.url` → `SHOVELER_AMQP_URL`
- `state.entry_ttl` → `SHOVELER_STATE_ENTRY_TTL`
When running as a daemon, environment variables can be set in /etc/sysconfig/xrootd-monitoring-shoveler.
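This prefix-and-underscore mapping is the convention used by configuration libraries such as spf13/viper. The following is only an illustrative sketch of that pattern under the assumption of a viper-style setup, not the shoveler's actual configuration code:

```go
package main

import (
	"fmt"
	"strings"

	"github.com/spf13/viper"
)

func main() {
	v := viper.New()
	// All environment variables share the SHOVELER_ prefix.
	v.SetEnvPrefix("SHOVELER")
	// Nested keys such as listen.port become LISTEN_PORT.
	v.SetEnvKeyReplacer(strings.NewReplacer(".", "_"))
	v.AutomaticEnv()

	// With SHOVELER_LISTEN_PORT=9993 exported, this prints 9993.
	fmt.Println(v.GetInt("listen.port"))
}
```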
The system provides two separate binaries for different use cases:
The traditional shoveler performs minimal processing:
- Validates packet boundaries and type
- Forwards packets to message bus with minimal overhead
- Preserves current behavior for maximum throughput
- Suitable for high-volume environments
Run with: shoveler (or shoveler -c /path/to/config.yaml)
The collector performs full packet parsing and correlation:
- Parses XRootD monitoring packets according to the XRootD monitoring specification
- Correlates file open and close events to compute latency and throughput
- Maintains stateful tracking of file operations with TTL-based cleanup
- Emits structured collector records with detailed metrics
- Tracks parsing performance and state management via Prometheus metrics
- WLCG Format Conversion: Automatically converts records to WLCG format when:
  - The VO is `cms`, OR
  - The file path starts with `/store` or `/user/dteam`
  - WLCG records are sent to a separate exchange (`amqp.exchange_wlcg`) instead of the main exchange (a sketch of this rule appears below)
Run with: xrootd-monitoring-collector (or xrootd-monitoring-collector -c /path/to/config.yaml)
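As a rough illustration of the WLCG routing rule above, the decision boils down to a simple predicate. The function and its use here are a sketch, not the collector's actual API:

```go
package main

import (
	"fmt"
	"strings"
)

// needsWLCGFormat reports whether a record should be converted to WLCG
// format and routed to the amqp.exchange_wlcg exchange instead of the
// main exchange.
func needsWLCGFormat(vo, path string) bool {
	return vo == "cms" ||
		strings.HasPrefix(path, "/store") ||
		strings.HasPrefix(path, "/user/dteam")
}

func main() {
	fmt.Println(needsWLCGFormat("cms", "/eos/file.root"))   // true (VO match)
	fmt.Println(needsWLCGFormat("atlas", "/store/data"))    // true (path match)
	fmt.Println(needsWLCGFormat("atlas", "/atlas/data"))    // false
}
```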
Configure state management parameters:
```yaml
state:
  entry_ttl: 300             # Time-to-live for state entries in seconds
  max_entries: 10000         # Maximum state entries (0 for unlimited)
  disable_reverse_dns: true  # Disable reverse DNS lookups (default: true for performance)
```

See config-collector.yaml for a complete example.
General Configuration:
- `SHOVELER_DEBUG` - Enable debug logging: `true` or `false`
- `SHOVELER_VERIFY` - Verify packet format: `true` or `false` (default: `true`)
Input Configuration:
- `SHOVELER_INPUT_TYPE` - Input source: `udp`, `file`, or `rabbitmq` (default: `udp`)
- `SHOVELER_INPUT_HOST` - Input host address
- `SHOVELER_INPUT_PORT` - Input port number
- `SHOVELER_INPUT_BUFFER_SIZE` - Buffer size for UDP packets (default: `65536`)
- `SHOVELER_INPUT_BROKER_URL` - Message broker URL (for rabbitmq input)
- `SHOVELER_INPUT_TOPIC` - Message broker topic/queue name
- `SHOVELER_INPUT_QUEUE` - Alias for topic (RabbitMQ)
- `SHOVELER_INPUT_SUBSCRIPTION` - Subscription name (for message bus)
- `SHOVELER_INPUT_BASE64_ENCODED` - Packets are base64 encoded: `true` or `false` (default: `true`)
- `SHOVELER_INPUT_PATH` - File path (for file input type)
- `SHOVELER_INPUT_FOLLOW` - Follow file mode like tail: `true` or `false`
State Management (Collector Mode):
- `SHOVELER_STATE_ENTRY_TTL` - TTL for state entries in seconds (default: `300`)
- `SHOVELER_STATE_MAX_ENTRIES` - Maximum state entries, 0 for unlimited (default: `0`)
- `SHOVELER_STATE_DISABLE_REVERSE_DNS` - Disable reverse DNS lookups for performance (default: `true`)
Output Configuration (Collector Mode):
- `SHOVELER_OUTPUT_TYPE` - Output destination: `mq`, `file`, or `both` (default: `mq`)
- `SHOVELER_OUTPUT_PATH` - File path for file output
Message Queue Configuration:
- `SHOVELER_MQ` - Message queue type: `amqp` or `stomp` (default: `amqp`)
AMQP Configuration:
- `SHOVELER_AMQP_URL` - AMQP broker URL
- `SHOVELER_AMQP_EXCHANGE` - Main exchange name (default: `shoveled-xrd`)
- `SHOVELER_AMQP_EXCHANGE_CACHE` - Cache events exchange (default: `xrd-cache-events`)
- `SHOVELER_AMQP_EXCHANGE_TCP` - TCP events exchange (default: `xrd-tcp-events`)
- `SHOVELER_AMQP_EXCHANGE_TPC` - TPC events exchange (default: `xrd-tpc-events`)
- `SHOVELER_AMQP_EXCHANGE_WLCG` - WLCG formatted events exchange (default: `xrd-wlcg-events`)
- `SHOVELER_AMQP_TOKEN_LOCATION` - JWT token file path (default: `/etc/xrootd-monitoring-shoveler/token`)
- `SHOVELER_AMQP_PUBLISH_WORKERS` - Number of concurrent publishing workers for collector mode (default: `10`, forced to `1` in shoveler mode)
STOMP Configuration:
- `SHOVELER_STOMP_USER` - STOMP username
- `SHOVELER_STOMP_PASSWORD` - STOMP password
- `SHOVELER_STOMP_URL` - STOMP broker URL
- `SHOVELER_STOMP_TOPIC` - STOMP topic (default: `xrootd.shoveler`)
- `SHOVELER_STOMP_CERT` - Client certificate path
- `SHOVELER_STOMP_CERTKEY` - Client certificate key path
Listening Configuration:
- `SHOVELER_LISTEN_PORT` - UDP listening port (default: `9993`)
- `SHOVELER_LISTEN_IP` - UDP listening IP address
Output Destinations:
- `SHOVELER_OUTPUTS_DESTINATIONS` - Additional UDP destination addresses
Metrics:
- `SHOVELER_METRICS_ENABLE` - Enable Prometheus metrics: `true` or `false` (default: `true`)
- `SHOVELER_METRICS_PORT` - Metrics HTTP server port (default: `8000`)
Queue:
- `SHOVELER_QUEUE_DIRECTORY` - Persistent queue directory (default: `/var/spool/xrootd-monitoring-shoveler/queue`)
IP Mapping:
- `SHOVELER_MAP_ALL` - Map all IPs to a single address
When AMQP is the protocol used to connect, the shoveler authorizes with the message bus using a JWT. The token will eventually be issued by an automated process, but for now long-lived tokens are issued to sites.
If STOMP is the selected protocol, a username and password must be provided instead when configuring the shoveler.
If the verify option or the SHOVELER_VERIFY environment variable is set to true (the default), the shoveler performs a simple check that incoming UDP packets conform to the XRootD monitoring packet format.
When the shoveler runs on the same node as the XRootD server, or in the same private network, the source IP of the incoming XRootD packets may be a private address rather than the public one. The public IP address is used for reverse DNS lookups when summarizing the records. You can map incoming IP addresses to other addresses with the map configuration value.
To map all incoming messages to a single IP:
```yaml
map:
  all: <ip address>
```
or set the environment variable `SHOVELER_MAP_ALL=<ip address>`.
To map multiple IP addresses, the config file would be:
```yaml
map:
  <ip address>: <ip address>
  <ip address>: <ip address>
```
The shoveler is a statically linked binary, distributed as an RPM and published to Docker Hub and OSG's container hub. You will need to configure config.yaml before starting.
Install the RPM from the latest release.
Start the systemd service with:
```bash
systemctl start xrootd-monitoring-shoveler.service
```
From Docker, you can start the container from the OSG hub with the following command.
```bash
docker run -v config.yaml:/etc/xrootd-monitoring-shoveler/config.yaml hub.opensciencegrid.org/opensciencegrid/xrootd-monitoring-shoveler
```
The system uses a configurable pool of concurrent worker goroutines for publishing messages to RabbitMQ, improving throughput and resource utilization.
Worker Pool Components:
- Shared Message Queue: Single buffered channel (1000 messages) that all workers read from
- Multiple Workers: Configurable number of worker goroutines (default: 10)
- Independent Connections: Each worker maintains its own AMQP connection to the broker
- Context-Based Cancellation: Clean shutdown using Go contexts
- Automatic Token Rotation: Workers restart with new credentials when JWT tokens are updated
Collector Mode (Default: 10 workers)
- Uses configured number of workers for parallel publishing
- Workers compete for messages from the shared queue (automatic load balancing)
- Improves throughput for high-volume collector output
- Configure via `amqp.publish_workers` in YAML or the `SHOVELER_AMQP_PUBLISH_WORKERS` environment variable
Shoveler Mode (Always: 1 worker)
- Forced to use exactly 1 worker regardless of configuration
- Preserves strict message ordering required for shoveling mode
- Configuration value is ignored to ensure data integrity
- Single worker guarantees messages are published in the exact order received
Workers use a shared channel pattern instead of round-robin distribution (see the sketch after this list):
- All workers read from a single buffered channel
- Fastest available worker picks up the next message
- Natural load balancing - busy workers don't block others
- No per-worker queues, reducing memory overhead
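A minimal sketch of this shared-channel pattern in Go, simplified and not the shoveler's actual publishing code (each real worker would hold its own AMQP connection):

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Single buffered channel shared by all workers.
	messages := make(chan []byte, 1000)

	const workers = 10
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case msg, ok := <-messages:
					if !ok {
						return
					}
					// Stand-in for publishing over the worker's own connection.
					fmt.Printf("worker %d published %d bytes\n", id, len(msg))
				}
			}
		}(i)
	}

	// The fastest available worker picks up each message.
	for i := 0; i < 5; i++ {
		messages <- []byte("packet")
	}
	close(messages)
	wg.Wait()
}
```

The buffered channel also provides natural back-pressure: when every worker is busy and the buffer is full, producers block rather than dropping messages.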
YAML Configuration:

```yaml
amqp:
  url: amqps://broker.example.com:5671/
  token_location: /etc/xrootd-monitoring-shoveler/token
  publish_workers: 20  # Only used in collector mode, ignored in shoveler mode
  exchange: shoveled-xrd
```

Environment Variable:

```bash
export SHOVELER_AMQP_PUBLISH_WORKERS=20
```

Recommended worker counts based on message volume:
| Message Rate | Recommended Workers |
|---|---|
| < 1,000/sec | 5-10 workers |
| 1,000-10,000/sec | 10-20 workers |
| > 10,000/sec | 20-50 workers |
Note: Too many workers can:
- Consume excessive connections to RabbitMQ
- Increase memory overhead
- Cause contention on the broker
Monitor the shoveler_queue_size metric to tune worker count appropriately.
When using JWT token-based authentication:
- System monitors token file every 10 seconds
- On token update (file modification time changes):
- All workers' contexts are cancelled
- Workers gracefully close their AMQP connections
- New workers are created with updated credentials
- Message publishing resumes seamlessly
This ensures zero-downtime token rotation with automatic credential updates.
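A simplified sketch of the rotation logic described above; the restart hook and polling loop are illustrative only, not the shoveler's actual implementation:

```go
package main

import (
	"context"
	"log"
	"os"
	"time"
)

// watchToken polls the token file every 10 seconds and calls restart
// whenever the file's modification time changes.
func watchToken(ctx context.Context, path string, restart func()) {
	var last time.Time
	if info, err := os.Stat(path); err == nil {
		last = info.ModTime()
	}
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			info, err := os.Stat(path)
			if err != nil {
				log.Printf("stat token: %v", err)
				continue
			}
			if mod := info.ModTime(); mod.After(last) {
				last = mod
				// Cancel worker contexts and start new workers with the
				// refreshed credentials.
				restart()
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go watchToken(ctx, "/etc/xrootd-monitoring-shoveler/token", func() {
		log.Println("token updated; restarting publish workers")
	})
	time.Sleep(time.Minute) // stand-in for the application's lifetime
}
```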
The project provides two binaries with distinct processing pipelines:
- Receive UDP packet
- Optional: Validate packet header
- Package packet with metadata (IP, timestamp)
- Enqueue to message bus
- Optional: Forward to additional UDP destinations
- Receive UDP packet (or from message bus/file)
- Parse packet according to XRootD monitoring specification
- Extract structured fields (file operations, user info, etc.)
- Correlate with existing state (open/close matching)
- Calculate metrics (latency, throughput)
- Emit structured collector record
- Enqueue to message bus (or write to file)
The collector uses a TTL-based state map with automatic cleanup to track file operations across multiple packets. This enables correlation of file open events with their corresponding close events to compute accurate latency and transfer metrics.
The collector can seamlessly integrate with message bus systems like RabbitMQ. Messages can be consumed from the message bus, parsed, and published back with enriched correlation data:
```mermaid
graph LR
  MB1["Message Bus<br/>Raw Packets"] -->|consume packet| P["Packet Parser"]
  P -->|parse XRootD format| C["Correlator<br/>State Management"]
  C -->|emit record| F["Format Record"]
  F -->|enriched data| MB2["Message Bus<br/>Parsed Records"]
  style P fill:#4CAF50,color:#fff
  style C fill:#2196F3,color:#fff
  style F fill:#FF9800,color:#fff
```
The shoveler receives UDP packets and places them on a queue before they are sent to the message bus. Up to 100 messages are held in memory; once the in-memory queue exceeds 100 messages, additional messages are written to disk under the directory configured by SHOVELER_QUEUE_DIRECTORY (env) or queue_directory (yaml). A good default is /var/spool/xrootd-monitoring-shoveler/queue. Note that /var/run or /tmp should not be used, as these directories are not persistent and may be cleaned regularly by tooling such as systemd-tmpfiles.
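The spill-to-disk idea can be sketched as follows; this is only an illustration of the concept and differs in detail from the shoveler's persistent queue implementation:

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
	"time"
)

// spillQueue keeps up to cap(mem) messages in memory and writes the
// overflow to files in dir.
type spillQueue struct {
	mem chan []byte
	dir string
}

func newSpillQueue(dir string, memLimit int) (*spillQueue, error) {
	if err := os.MkdirAll(dir, 0o755); err != nil {
		return nil, err
	}
	return &spillQueue{mem: make(chan []byte, memLimit), dir: dir}, nil
}

func (q *spillQueue) Enqueue(msg []byte) error {
	select {
	case q.mem <- msg: // room in memory
		return nil
	default: // memory full: persist to disk until the publisher catches up
		name := filepath.Join(q.dir, fmt.Sprintf("%d.msg", time.Now().UnixNano()))
		return os.WriteFile(name, msg, 0o644)
	}
}

func main() {
	q, err := newSpillQueue(filepath.Join(os.TempDir(), "shoveler-queue-demo"), 100)
	if err != nil {
		panic(err)
	}
	if err := q.Enqueue([]byte("example monitoring packet")); err != nil {
		panic(err)
	}
	fmt.Println("enqueued one message; in-memory depth:", len(q.mem))
}
```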
The on-disk queue is persistent across shoveler restarts.
The queue length can be monitored through the Prometheus metric `shoveler_queue_size`.
The shoveler exports Prometheus metrics for monitoring. Common metrics include:
Shoveling Mode:
- `shoveler_packets_received` - Total packets received
- `shoveler_validations_failed` - Packets that failed validation
- `shoveler_queue_size` - Current queue size
- `shoveler_rabbitmq_reconnects` - MQ reconnection count
Collector Binary (additional):
- `shoveler_packets_parsed_ok` - Successfully parsed packets
- `shoveler_parse_errors` - Parse errors by reason
- `shoveler_state_size` - Current state map entries
- `shoveler_ttl_evictions` - State entries evicted due to TTL
- `shoveler_records_emitted` - Collector records emitted
- `shoveler_parse_time_ms` - Packet parsing time histogram
- `shoveler_request_latency_ms` - Request latency histogram
Metrics are available at http://localhost:8000/metrics by default (configurable via metrics.port).
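For reference, exposing a gauge like `shoveler_queue_size` with the Prometheus Go client looks roughly like this; it is a sketch of the pattern, not the shoveler's actual metrics code:

```go
package main

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

// queueSize mirrors the shoveler_queue_size gauge described above.
var queueSize = promauto.NewGauge(prometheus.GaugeOpts{
	Name: "shoveler_queue_size",
	Help: "Current number of messages waiting to be published.",
})

func main() {
	queueSize.Set(42) // in the real system the queue updates this value

	// Serve /metrics on the configured metrics.port (default 8000).
	http.Handle("/metrics", promhttp.Handler())
	log.Fatal(http.ListenAndServe(":8000", nil))
}
```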
Both shoveler and xrootd-monitoring-collector support pprof profiling for performance analysis and troubleshooting. Enable profiling in your configuration:
```yaml
profile:
  enable: true
  port: 3030  # Default port
```

When enabled, pprof endpoints are available at http://localhost:3030/debug/pprof/:

- `/debug/pprof/` - Index of available profiles
- `/debug/pprof/profile` - 30-second CPU profile
- `/debug/pprof/heap` - Heap memory profile
- `/debug/pprof/goroutine` - Goroutine stack traces
- `/debug/pprof/block` - Blocking profile
- `/debug/pprof/mutex` - Mutex contention profile
- `/debug/pprof/trace` - Execution trace
Example Usage:
```bash
# CPU profiling (30 seconds)
go tool pprof http://localhost:3030/debug/pprof/profile

# Heap profiling
go tool pprof http://localhost:3030/debug/pprof/heap

# View goroutines
curl http://localhost:3030/debug/pprof/goroutine?debug=1
```

Note: Profiling is disabled by default and should only be enabled when troubleshooting performance issues.
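In Go, serving these endpoints only requires the standard library's `net/http/pprof` package; the sketch below shows the general pattern and treats the `profileEnabled` flag as a stand-in for the `profile.enable` config value:

```go
package main

import (
	"log"
	"net/http"
	_ "net/http/pprof" // registers /debug/pprof handlers on the default mux
)

func main() {
	profileEnabled := true // stand-in for profile.enable in config.yaml
	if profileEnabled {
		go func() {
			// Serves the /debug/pprof/ endpoints on the configured port.
			log.Println(http.ListenAndServe("localhost:3030", nil))
		}()
	}
	select {} // rest of the application would run here
}
```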
The packet parser (parser/xrootd_parser.go) implements the XRootD monitoring specification:
Supported Packet Types:
- `=` - Map/Dictionary records - Maps numeric IDs to strings (file paths, user info)
- `f` - File open - Initiates file operation tracking
- `d` - File close - Records file statistics and operation metrics
- `t` - Time records - Timestamp and server information
- `x` - Transfer records - Data transfer metrics
- XML summary packets - Summary format packets
Key Features:
- Binary parsing with proper byte order handling (see the sketch below)
- Variable-length record support
- Packet validation (length, checksums)
- Comprehensive error handling
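To illustrate the byte-order and length handling, here is a minimal sketch of decoding the 8-byte monitoring packet header; the struct and field names are illustrative, not the parser's actual types:

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

// monHeader mirrors the XRootD monitoring header: record code, packet
// sequence, packet length, and server start time, with multi-byte fields
// in network (big-endian) byte order.
type monHeader struct {
	Code byte
	Pseq byte
	Plen uint16
	Stod uint32
}

func parseHeader(b []byte) (monHeader, error) {
	if len(b) < 8 {
		return monHeader{}, errors.New("packet shorter than header")
	}
	h := monHeader{
		Code: b[0],
		Pseq: b[1],
		Plen: binary.BigEndian.Uint16(b[2:4]),
		Stod: binary.BigEndian.Uint32(b[4:8]),
	}
	// Simple length validation against the declared packet length.
	if int(h.Plen) != len(b) {
		return monHeader{}, fmt.Errorf("length field %d != packet size %d", h.Plen, len(b))
	}
	return h, nil
}

func main() {
	pkt := []byte{'t', 1, 0, 8, 0, 0, 0, 1}
	fmt.Println(parseHeader(pkt))
}
```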
Example Usage:
```go
packet, err := parser.ParsePacket(rawBytes)
if err != nil {
	// Handle error
}

// Access parsed data
switch rec := packet.FileRecords[0].(type) {
case parser.FileCloseRecord:
	fmt.Printf("Read: %d bytes\n", rec.Xfr.Read)
}
```

The collector uses a TTL-based concurrent state map (collector/state.go) to track file operations:
Features:
- Automatic TTL-based expiration
- Background janitor for cleanup
- Configurable max entries (prevents unbounded memory growth)
- Thread-safe (RWMutex)
- O(1) operations
- Optional reverse DNS lookup (disabled by default for performance)
Example Usage:
```go
stateMap := collector.NewStateMap(
	5*time.Minute,  // TTL
	10000,          // max entries
	30*time.Second, // cleanup interval
)
defer stateMap.Stop()

stateMap.Set("key", data)
value, exists := stateMap.Get("key")
```

The correlator (collector/correlator.go) correlates file operations across packets:
Features:
- Matches file open with close events
- Calculates latency and throughput
- Handles standalone events (open without close, close without open)
- Produces structured CollectorRecord output
- Implements server ID scoping for multiple XRootD instances
Server ID Format:

- Format: `serverStart#remoteAddr`
- Purpose: Scope state maps per server instance to handle multiple XRootD servers
- Example: `1234567890#192.168.1.100`
CollectorRecord Format:
```json
{
  "@timestamp": "2025-11-20T19:33:40.526767022Z",
  "start_time": 1763663574000,
  "end_time": 1763663574000,
  "operation_time": 0,
  "read_operations": 1,
  "read": 131072,
  "write": 0,
  "filename": "/path/to/file.nc",
  "HasFileCloseMsg": 1
}
```

The input package (input/input.go) provides a unified interface for packet sources:
Implementations:
- `UDPListener`: Traditional UDP packet reception from XRootD servers
- `FileReader`: File-based packet reading for testing and replay
- `RabbitMQConsumer`: Message bus support with base64 packet decoding
Interface:

```go
type PacketSource interface {
	Start() error
	Stop() error
	Packets() <-chan []byte
}
```
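A sketch of how a consumer drives any PacketSource implementation; the stub source below exists only for this illustration and is not part of the input package:

```go
package main

import (
	"fmt"
	"log"
)

// PacketSource mirrors the interface from input/input.go.
type PacketSource interface {
	Start() error
	Stop() error
	Packets() <-chan []byte
}

// stubSource is a toy implementation used only for this illustration.
type stubSource struct{ ch chan []byte }

func (s *stubSource) Start() error {
	go func() {
		s.ch <- []byte("example packet")
		close(s.ch)
	}()
	return nil
}
func (s *stubSource) Stop() error            { return nil }
func (s *stubSource) Packets() <-chan []byte { return s.ch }

func main() {
	var source PacketSource = &stubSource{ch: make(chan []byte, 1)}
	if err := source.Start(); err != nil {
		log.Fatal(err)
	}
	defer source.Stop()

	// Drain packets until the source closes its channel.
	for raw := range source.Packets() {
		fmt.Printf("received %d bytes\n", len(raw))
	}
}
```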
The collector implements proper scoping to handle multiple XRootD servers and dictionary-based file identification:
According to the XRootD specification, dictionary mappings reduce packet size by mapping numeric IDs to strings:
- Used for file paths and user information
- Stored in `d` packet records
- Enabled when `xrootd.monitor` includes the dictid option
File open records may not include the filename directly:
- The XRootD specification allows omitting the `XrdXrootdMonFileLFN` structure
- When omitted, the filename must be looked up using the FileID in the dictionary map
- The collector automatically falls back to dictionary lookup when needed
The correlator scopes all state map entries by server ID to properly handle:
- Multiple XRootD servers reporting to the same collector
- Server restarts (detected via server start timestamp changes)
- Multiple network interfaces on the same server
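To make the scoping concrete, here is a sketch of a server-scoped dictionary lookup; the types and map layout are illustrative and not the collector's internal data structures:

```go
package main

import "fmt"

// serverID scopes state by server start time and reporting address,
// e.g. "1234567890#192.168.1.100".
func serverID(serverStart int64, remoteAddr string) string {
	return fmt.Sprintf("%d#%s", serverStart, remoteAddr)
}

func main() {
	// Dictionary of fileID -> path, kept per server instance.
	dict := map[string]map[uint32]string{}
	sid := serverID(1234567890, "192.168.1.100")
	dict[sid] = map[uint32]string{42: "/store/data/file.root"}

	// File open record without an LFN: fall back to the dictionary.
	var fileID uint32 = 42
	if path, ok := dict[sid][fileID]; ok {
		fmt.Println("resolved filename:", path)
	}
}
```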
Parser Tests: parser/xrootd_parser_test.go
- XML packet handling
- Binary packet parsing
- Length validation
- Map records, file operations
- Error cases
State Tests: collector/state_test.go
- TTL expiration
- Max entries enforcement
- Concurrent access
- Janitor cleanup
Correlator Tests: collector/correlator_test.go
- Open/close matching
- Standalone events
- Average calculations
- JSON serialization
- Server ID scoping
- Dictionary ID lookups
End-to-End Tests: integration_test.go
- Complete file operation flow
- Packet verification
- Correlation accuracy
Running Tests:
```bash
# All tests
go test ./...

# Integration tests
go test -tags integration -v .

# Specific package
go test ./parser -v

# With coverage
go test -cover ./...
```

- Overhead: Negligible (same as before)
- Throughput: Optimal for high-volume environments (>100k packets/sec)
- Memory: Minimal (queue only)
- Use Case: High-volume monitoring environments
- Parse Time: 0.01-1ms per packet (tracked via histogram)
- State Memory: Bounded by `max_entries` × ~1 KB
- CPU: Additional ~10% for parsing and correlation
- Suitable For: Moderate-volume monitoring (< 10k packets/sec)
- Memory Safety: TTL-based cleanup prevents unbounded growth
No changes required! The shoveler binary preserves existing behavior.
- Install the `xrootd-monitoring-collector` binary alongside or instead of `shoveler`.

- Configure state management (optional, defaults shown):

  ```yaml
  state:
    entry_ttl: 300      # seconds
    max_entries: 10000  # 0 for unlimited
  ```

- Run the collector:

  ```bash
  xrootd-monitoring-collector -c /path/to/config.yaml
  ```

  Or as a service:

  ```bash
  systemctl start xrootd-monitoring-collector.service
  ```

- Monitor metrics:

  ```bash
  curl http://localhost:8000/metrics | grep shoveler_
  ```

- Verify records:

  - Records are now structured CollectorRecord format
  - Check message bus for new format
- Backward Compatibility: Existing deployments continue working
- Performance: Shoveling mode optimized for throughput
- Flexibility: Choose processing level based on needs
- Prevents Memory Leaks: Automatic cleanup of stale entries
- Handles Incomplete Flows: Close without open, open without close
- Configurable: Adjust TTL based on environment
- Separation of Concerns: Parsing separate from correlation
- Testability: Easy to unit test each component
- Extensibility: Easy to add new correlation logic
- Multi-Server Support: Properly handle multiple XRootD instances
- State Isolation: Prevent cross-server state contamination
- Server Restart Detection: Detect and handle server restarts via timestamp changes
The implementation spans the following files and packages:
Core Configuration:
- `config.go` - Configuration system with mode selection and state management parameters
Parser Package (parser/):
- `xrootd_parser.go` - XRootD binary packet parsing (323 lines)
- `xrootd_parser_test.go` - Parser unit tests (190 lines)
Collector Package (collector/):
- `state.go` - TTL-based state management (150 lines)
- `state_test.go` - State map unit tests (158 lines)
- `correlator.go` - File operation correlation engine (262 lines)
- `correlator_test.go` - Correlator unit tests (245 lines)
Input Package (input/):
- `input.go` - Unified packet source interface and implementations (254 lines)
Main Application:
- `cmd/shoveler/main.go` - Shoveling mode implementation (110 lines)
- `cmd/collector/main.go` - Collector mode implementation (refactored, ~434 lines)
- `integration_test.go` - End-to-end integration tests (140 lines)
Metrics & Configuration:
- `metrics.go` - Extended Prometheus metrics (43 lines)
- `config/config-collector.yaml` - Example collector configuration (84 lines)
Total Implementation: 2,102 lines added across 15 files
Current Limitations:
- Message Bus Input - Infrastructure created but not fully tested in production
- G-Stream Packets - Parsed but not fully decoded (can be enhanced)
- Integration Tests - Limited coverage (can be expanded)
Possible Future Additions:
- Full g-stream packet decoding
- Additional correlation patterns (e.g., concurrent transfer tracking)
- Performance benchmarks and optimization
- More comprehensive integration tests
- Support for additional message bus types (Kafka, NATS)
- Distributed state management for horizontally scaled collectors
- XRootD Monitoring Protocol Specification
- Pelican Reference Implementation
- Python Collector (for parity)
For questions, issues, or troubleshooting:
- Check the configuration examples in the `config/` directory
- Review test cases for usage examples in `*_test.go` files
- Monitor Prometheus metrics for debugging at http://localhost:8000/metrics
- Enable debug logging with the environment variable `SHOVELER_DEBUG=true`
- Review packet parsing by checking collector logs for parse errors
- Verify state management by monitoring the `shoveler_state_size` metric
Distributed under the Apache 2.0 License. See LICENSE.txt for more information.
This project is supported by the National Science Foundation under Cooperative Agreements OAC-2030508 and OAC-1836650.