Skip to content
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
15 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions Libraries/Opc.Ua.Server/Fluent/INodeBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,16 @@ public interface INodeBuilder
/// </summary>
INodeBuilder OnEvent(EventNotificationHandler handler);

/// <summary>
/// Opts the resolved node into dynamic scaling of event consumer
/// tasks. When enabled, the <see cref="MonitoredNode2"/> for
/// this node scales consumer tasks with the number of event
/// monitored items, improving throughput under high event loads.
/// The Server node (<see cref="ObjectIds.Server"/>) is always
/// auto-opted-in regardless of this call.
/// </summary>
INodeBuilder EnableMultipleEventConsumers();
Comment thread
marcschier marked this conversation as resolved.
Outdated

/// <summary>
/// Resolves a child of the current node by browse name. Used by
/// source-generated typed traversal wrappers to walk one segment
Expand Down
7 changes: 7 additions & 0 deletions Libraries/Opc.Ua.Server/Fluent/NodeBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,13 @@ public INodeBuilder OnEvent(EventNotificationHandler handler)
return this;
}

/// <inheritdoc/>
public INodeBuilder EnableMultipleEventConsumers()
{
m_parent.RegisterMultiConsumerNode(Node);
return this;
}

/// <inheritdoc/>
public INodeBuilder Child(QualifiedName browseName)
{
Expand Down
14 changes: 14 additions & 0 deletions Libraries/Opc.Ua.Server/Fluent/NodeManagerBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -455,6 +455,20 @@ internal void RegisterNodeRemoved(NodeState node, NodeLifecycleHandler handler)
m_nodeRemoved[node.NodeId] = handler;
}

internal void RegisterMultiConsumerNode(NodeState node)
{
if (NodeManager is not AsyncCustomNodeManager acnm)
{
throw ServiceResultException.Create(
StatusCodes.BadConfigurationError,
"EnableMultipleEventConsumers requires the node manager to derive from AsyncCustomNodeManager. " +
"Manager type '{0}' does not qualify.",
NodeManager?.GetType().FullName ?? "(unknown)");
}

acnm.MultiConsumerNodeIds[node.NodeId] = true;
}

private NodeState ResolveNodeId(NodeId nodeId)
{
if (nodeId.IsNull)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -762,6 +762,12 @@ public ValueTask<ServiceResult> ValidateRolePermissionsAsync(
return new ValueTask<ServiceResult>(ServiceResult.Good);
}

/// <inheritdoc/>
public bool IsMultipleEventConsumerNode(NodeId nodeId)
{
return false;
}

/// <summary>
/// Frees any unmanaged resources.
/// </summary>
Expand Down
30 changes: 30 additions & 0 deletions Libraries/Opc.Ua.Server/NodeManager/AsyncCustomNodeManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,29 @@ protected ConcurrentDictionary<uint, IMonitoredItem> MonitoredItems
/// <inheritdoc/>
public INodeManager SyncNodeManager => m_syncNodeManager;

/// <summary>
/// Marks a node for dynamic scaling of event consumer tasks. When a
/// monitored node is created for a node in this set, the
/// <see cref="MonitoredNode2"/> will scale its consumer tasks with
/// the number of event monitored items. The Server node
/// (<see cref="ObjectIds.Server"/>) is always auto-opted-in.
/// </summary>
/// <param name="nodeId">The <see cref="NodeId"/> of the node to opt in.</param>
protected void EnableMultipleEventConsumers(NodeId nodeId)
{
if (nodeId.IsNull)
{
throw new ArgumentException("NodeId must not be null.", nameof(nodeId));
}
MultiConsumerNodeIds[nodeId] = true;
}

/// <inheritdoc/>
public bool IsMultipleEventConsumerNode(NodeId nodeId)
{
return MultiConsumerNodeIds.ContainsKey(nodeId);
}

/// <summary>
/// Sets the namespaces supported by the NodeManager.
/// </summary>
Expand Down Expand Up @@ -6008,6 +6031,13 @@ public void Dispose()
/// </summary>
protected SemaphoreSlim m_monitoredItemSemaphore = new(1, 1);

/// <summary>
/// Set of <see cref="NodeId"/>s that opt into multiple event consumer
/// task handling. Nodes in this set will use dynamic scaling of
/// consumer tasks based on the number of event monitored items.
/// </summary>
internal NodeIdDictionary<bool> MultiConsumerNodeIds { get; } = new();
Comment thread
romanett marked this conversation as resolved.

/// <summary>
/// Counter for the NodeIdFactory.New Method
/// </summary>
Expand Down
7 changes: 7 additions & 0 deletions Libraries/Opc.Ua.Server/NodeManager/INodeManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -915,6 +915,13 @@ ValueTask<ServiceResult> ValidateRolePermissionsAsync(
NodeId nodeId,
PermissionType requestedPermission,
CancellationToken cancellationToken = default);

/// <summary>
/// Returns <c>true</c> if the specified node has opted into
/// multiple event consumer task handling.
/// </summary>
/// <param name="nodeId">The <see cref="NodeId"/> to check.</param>
bool IsMultipleEventConsumerNode(NodeId nodeId);
}

/// <summary>
Expand Down
144 changes: 141 additions & 3 deletions Libraries/Opc.Ua.Server/NodeManager/MonitoredItem/MonitoredNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,20 +56,35 @@ public class MonitoredNode2 : IDisposable
/// <param name="nodeManager">The node manager.</param>
/// <param name="server">The server.</param>
/// <param name="node">The node.</param>
public MonitoredNode2(IAsyncNodeManager nodeManager, IServerInternal server, NodeState node)
/// <param name="enableMultipleEventConsumers">
/// When <c>true</c>, enables dynamic scaling of consumer tasks based on
/// the number of event monitored items. The Server node
/// (<see cref="ObjectIds.Server"/>) always opts in automatically.
/// </param>
public MonitoredNode2(
IAsyncNodeManager nodeManager,
IServerInternal server,
NodeState node,
bool enableMultipleEventConsumers = false)
{
NodeManager = nodeManager ?? throw new ArgumentNullException(nameof(nodeManager));
m_server = server ?? throw new ArgumentNullException(nameof(server));
Node = node ?? throw new ArgumentNullException(nameof(node));
m_logger = server.Telemetry?.CreateLogger<MonitoredNode2>();
m_useMultipleConsumers = enableMultipleEventConsumers || node.NodeId == ObjectIds.Server;
m_channel = Channel.CreateBounded<INodeNotification>(new BoundedChannelOptions(k_defaultChannelCapacity)
{
SingleReader = true,
SingleReader = !m_useMultipleConsumers,
FullMode = BoundedChannelFullMode.Wait,
AllowSynchronousContinuations = false,
});
m_consumerCts = new CancellationTokenSource();
m_consumerTask = Task.Run(() => ProcessChannelAsync(m_consumerCts.Token));

if (m_useMultipleConsumers)
{
m_additionalConsumers = new List<ConsumerEntry>();
}
}

/// <summary>
Expand Down Expand Up @@ -165,6 +180,21 @@ public void Add(IEventMonitoredItem eventItem)
EventMonitoredItems.TryAdd(eventItem.Id, eventItem);

Node.OnReportEvent = OnReportEvent;

// Scale up: add a consumer task for each new event MI beyond the first.
if (m_useMultipleConsumers && m_additionalConsumers != null)
{
lock (m_additionalConsumersLock)
{
// The primary consumer task always runs; add additional ones
// so total consumers = EventMonitoredItems.Count.
int totalDesired = EventMonitoredItems.Count;
if (totalDesired > m_additionalConsumers.Count + 1)
{
AddConsumer();
}
}
}
}

/// <summary>
Expand All @@ -180,6 +210,20 @@ public void Remove(IEventMonitoredItem eventItem)
{
Node.OnReportEvent = null;
}

// Scale down: remove a consumer task when MIs decrease (keep at least 1 total = primary).
if (m_useMultipleConsumers && m_additionalConsumers != null)
{
lock (m_additionalConsumersLock)
{
// Total consumers = 1 (primary) + m_additionalConsumers.Count
int totalDesired = Math.Max(1, EventMonitoredItems.Count);
while (m_additionalConsumers.Count + 1 > totalDesired)
{
RemoveLastConsumer();
}
}
}
}

/// <summary>
Expand Down Expand Up @@ -690,6 +734,50 @@ private void DropEventPermissionCacheEntries(uint monitoredItemId)
}
}

/// <summary>
/// Adds a new consumer task to the pool for the regular channel.
/// Must be called while holding the <see cref="m_additionalConsumersLock"/> lock.
/// </summary>
private void AddConsumer()
{
var cts = CancellationTokenSource.CreateLinkedTokenSource(m_consumerCts.Token);
var task = Task.Run(() => ProcessChannelAsync(cts.Token));
m_additionalConsumers!.Add(new ConsumerEntry(task, cts));
}

/// <summary>
/// Removes the last additional consumer task from the pool.
/// Must be called while holding the <see cref="m_additionalConsumersLock"/> lock.
/// </summary>
private void RemoveLastConsumer()
{
if (m_additionalConsumers!.Count == 0)
{
return;
}

int lastIndex = m_additionalConsumers.Count - 1;
ConsumerEntry entry = m_additionalConsumers[lastIndex];
m_additionalConsumers.RemoveAt(lastIndex);
entry.Cts.Cancel();
entry.Cts.Dispose();
}

/// <summary>
/// Represents a single consumer task and its associated cancellation token.
/// </summary>
private readonly struct ConsumerEntry
{
public ConsumerEntry(Task task, CancellationTokenSource cts)
{
Task = task;
Cts = cts;
}

public Task Task { get; }
public CancellationTokenSource Cts { get; }
}

private readonly ConcurrentDictionary<uint, (ServerSystemContext Context, int CreatedAtTicks)> m_contextCache =
new();

Expand All @@ -706,6 +794,9 @@ private void DropEventPermissionCacheEntries(uint monitoredItemId)
private readonly Channel<INodeNotification> m_channel;
private readonly CancellationTokenSource m_consumerCts;
private readonly Task m_consumerTask;
private readonly bool m_useMultipleConsumers;
private readonly List<ConsumerEntry>? m_additionalConsumers;
private readonly Lock m_additionalConsumersLock = new();
private bool m_disposed;

/// <inheritdoc/>
Expand All @@ -728,9 +819,56 @@ protected virtual void Dispose(bool disposing)

if (disposing)
{
// Complete the writer; the consumer drains remaining items and exits normally.
// Complete the writer; consumers drain remaining items and exit normally.
m_channel.Writer.TryComplete();

// Wait for additional consumer tasks to finish.
if (m_additionalConsumers != null)
{
ConsumerEntry[] entries;
lock (m_additionalConsumersLock)
{
entries = m_additionalConsumers.ToArray();
m_additionalConsumers.Clear();
}

Task[] tasks = Array.ConvertAll(entries, e => e.Task);

try
Comment thread
romanett marked this conversation as resolved.
{
// Bound the wait — do not block indefinitely if consumers are stuck.
bool completed = Task.WaitAll(tasks, TimeSpan.FromSeconds(5));

if (!completed)
{
m_logger?.LogWarning(
"MonitoredNode2 additional consumers did not drain within 5 s; cancelling forcibly.");

foreach (ConsumerEntry entry in entries)
{
entry.Cts.Cancel();
}

try
{
Task.WaitAll(tasks);
}
catch { }
}
}
catch (Exception ex)
{
m_logger?.LogWarning(ex, "MonitoredNode2 additional consumers faulted during shutdown.");
}
finally
{
foreach (ConsumerEntry entry in entries)
{
entry.Cts.Dispose();
}
}
}

if (m_consumerTask != null)
{
try
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,8 @@ public ISampledDataChangeMonitoredItem CreateMonitoredItem(
{
NodeState cachedNode = addNodeToComponentCache(context, handle, handle.Node);
MonitoredNodes[handle.Node.NodeId]
= monitoredNode = new MonitoredNode2(m_nodeManager, m_server, cachedNode);
= monitoredNode = new MonitoredNode2(m_nodeManager, m_server, cachedNode,
IsMultiConsumerNode(cachedNode.NodeId));
}

handle.Node = monitoredNode!.Node;
Expand Down Expand Up @@ -224,7 +225,8 @@ public bool RestoreMonitoredItem(
{
NodeState cachedNode = addNodeToComponentCache(context, handle, handle.Node);
MonitoredNodes[handle.Node.NodeId]
= monitoredNode = new MonitoredNode2(m_nodeManager, m_server, cachedNode);
= monitoredNode = new MonitoredNode2(m_nodeManager, m_server, cachedNode,
IsMultiConsumerNode(cachedNode.NodeId));
}

handle.Node = monitoredNode!.Node;
Expand Down Expand Up @@ -315,7 +317,8 @@ public bool RestoreMonitoredItem(
if (!MonitoredNodes.TryGetValue(source.NodeId, out monitoredNode!))
{
MonitoredNodes[source.NodeId]
= monitoredNode = new MonitoredNode2(m_nodeManager, m_server, source);
= monitoredNode = new MonitoredNode2(m_nodeManager, m_server, source,
IsMultiConsumerNode(source.NodeId));
}

// remove existing monitored items with the same Id prior to insertion in order to avoid duplicates
Expand All @@ -335,5 +338,10 @@ public bool RestoreMonitoredItem(

private readonly IAsyncNodeManager m_nodeManager;
private readonly IServerInternal m_server;

Comment thread
marcschier marked this conversation as resolved.
Outdated
private bool IsMultiConsumerNode(NodeId nodeId)
{
return m_nodeManager.IsMultipleEventConsumerNode(nodeId);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,8 @@ public bool RestoreMonitoredItem(
if (!MonitoredNodes.TryGetValue(source.NodeId, out monitoredNode))
{
MonitoredNodes[source.NodeId]
= monitoredNode = new MonitoredNode2(m_nodeManager, m_server, source);
= monitoredNode = new MonitoredNode2(m_nodeManager, m_server, source,
IsMultiConsumerNode(source.NodeId));
}

// remove existing monitored items with the same Id prior to insertion in order to avoid duplicates
Expand All @@ -342,5 +343,10 @@ public bool RestoreMonitoredItem(
private readonly IAsyncNodeManager m_nodeManager;
private readonly IServerInternal m_server;
private readonly SamplingGroupManager m_samplingGroupManager;

Comment thread
marcschier marked this conversation as resolved.
Outdated
private bool IsMultiConsumerNode(NodeId nodeId)
{
return m_nodeManager.IsMultipleEventConsumerNode(nodeId);
}
}
}
3 changes: 3 additions & 0 deletions Tests/Opc.Ua.Server.Tests/AsyncCustomNodeManagerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5003,6 +5003,9 @@ public ValueTask<ServiceResult> ValidateRolePermissionsAsync(
CancellationToken cancellationToken = default)
=> m_adapter.ValidateRolePermissionsAsync(operationContext, nodeId, requestedPermission, cancellationToken);

public bool IsMultipleEventConsumerNode(NodeId nodeId)
Comment thread
marcschier marked this conversation as resolved.
=> m_adapter.IsMultipleEventConsumerNode(nodeId);

public NodeIdDictionary<NodeState> RootNotifiers => m_cnm2.RootNotifiersDictionary;

public ValueTask AddRootNotifierPublicAsync(NodeState notifier, CancellationToken cancellationToken = default)
Expand Down
Loading
Loading