[Server]: Queue Events from the Server Node using multiple channel consumers#3823
[Server]: Queue Events from the Server Node using multiple channel consumers#3823Copilot wants to merge 15 commits into
Conversation
…ngle-channel bottleneck For the Server node (ObjectIds.Server), which subscribes to all events, the previous single-channel/single-consumer architecture serialized event processing across all event monitored items. This change creates a dedicated channel and consumer task per event MonitoredItem on the Server node, allowing concurrent event processing and eliminating the bottleneck.
Add ConsumerCount option to HistorianCaptureOptions (default 1). When ConsumerCount > 1, multiple consumer tasks drain the same bounded Channel concurrently, improving throughput when the historian provider has significant insert latency.
…nitoredNode2 Replace per-item event channels/consumers with a single shared event channel drained by multiple consumer tasks for the Server node. This reduces write amplification and simplifies lifecycle management while retaining concurrent event processing throughput. Add ServerNode_MultipleEventConsumers_AllEventsDelivered test.
Replace fixed event consumer count with dynamic scaling that starts with 1 consumer task and adds/removes consumers as event monitored items are created/deleted on the Server node. - Remove k_serverNodeEventConsumerCount constant and internal constructor - Always start with 1 event consumer task - Scale up when event MI is added (1 consumer per MI) - Scale down when event MI is removed (keep minimum of 1) - Use List<EventConsumerEntry> with lock for thread-safe scaling - Update tests to use public constructor (scaling is automatic)
…iple consumers for Server node
|
|
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #3823 +/- ##
==========================================
- Coverage 71.13% 71.13% -0.01%
==========================================
Files 778 778
Lines 143274 143351 +77
Branches 24234 24252 +18
==========================================
+ Hits 101918 101967 +49
- Misses 33005 33030 +25
- Partials 8351 8354 +3 ☔ View full report in Codecov by Sentry. 🚀 New features to boost your workflow:
|
marcschier
left a comment
There was a problem hiding this comment.
@copilot address my feedback
…th opt-out support, fix method ordering and test style
All feedback addressed in the latest commit:
|
|
@copilot check the flaky CI and fix the test to run reliably |
…QueueValue The tests were signaling on ValidateRolePermissionsAsync which fires before the cache is populated (line 545). By the time DefaultPermissionsChanged cleared the cache, it was still empty, and the consumer then populated it after the clear. The second notification then found a populated cache and skipped validation. Fix: signal on QueueValue which fires AFTER the cache is populated, ensuring the cache invalidation actually clears a populated cache.
Fixed in the latest commit. The race condition was caused by signaling on |
|
@copilot resolve the merge conflicts in this pull request |
…-case-for-consumer-task # Conflicts: # Tests/Opc.Ua.Server.Tests/MonitoredNode2Tests.cs
Resolved the merge conflicts in |
Description
This pull request introduces a scalable consumer task model for the
MonitoredNode2class, optimizing the handling of event notifications for notifier nodes. Any notifier node can opt into (or out of) dynamic scaling of consumer tasks based on the number of event monitored items, improving throughput and responsiveness under high event loads. The Server node (ObjectIds.Server) is always auto-opted-in. The implementation includes safe scaling up and down of consumers, robust disposal logic, Fluent API integration, and tests to verify correct delivery of events.Scalable consumer task management:
enableMultipleEventConsumersconstructor parameter toMonitoredNode2that enables dynamic scaling of consumer tasks to match the number of event monitored items. The Server node (ObjectIds.Server) always auto-opts-in regardless of this parameter.AllowMultipleEventConsumers(NodeId, bool enable = true)protected method onAsyncCustomNodeManagerwith an internalMultiConsumerNodeIdsregistry, so any node manager can programmatically opt in or out specific nodes.IsMultipleEventConsumerNode(NodeId)to theIAsyncNodeManagerinterface, allowing consumers to query multi-consumer opt-in status through the interface without casting toAsyncCustomNodeManager.MonitoredNodeMonitoredItemManagerandSamplingGroupMonitoredItemManageruse theIAsyncNodeManagerinterface method when creatingMonitoredNode2instances.Fluent API integration:
AllowMultipleEventConsumers(bool enable = true)method toINodeBuilderand itsNodeBuilderimplementation, allowing any notifier node to be opted in or out via the fluent builder pipeline (e.g.builder.Node("myNotifier").AllowMultipleEventConsumers()).NodeManagerBuilderroutes the registration to the owningAsyncCustomNodeManager's multi-consumer registry, supporting both opt-in and opt-out.Testing and verification:
OnMonitoredNodeChanged_DefaultPermissionsChanged_CacheInvalidatedandInvalidatePermissionCacheForSessiontests) that caused flaky CI failures. The tests now signal onQueueValue(which fires after the cache is populated) instead ofValidateRolePermissionsAsync(which fires before), ensuring cache invalidation operates on a populated cache.Related Issues
Checklist