Skip to content
Open
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
15 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 113 additions & 2 deletions Libraries/Opc.Ua.Server/NodeManager/MonitoredItem/MonitoredNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,20 @@ public MonitoredNode2(IAsyncNodeManager nodeManager, IServerInternal server, Nod
m_server = server ?? throw new ArgumentNullException(nameof(server));
Node = node ?? throw new ArgumentNullException(nameof(node));
m_logger = server.Telemetry?.CreateLogger<MonitoredNode2>();
m_isServerNode = node.NodeId == ObjectIds.Server;
Comment thread
marcschier marked this conversation as resolved.
Outdated
m_channel = Channel.CreateBounded<INodeNotification>(new BoundedChannelOptions(k_defaultChannelCapacity)
{
SingleReader = true,
SingleReader = !m_isServerNode,
FullMode = BoundedChannelFullMode.Wait,
AllowSynchronousContinuations = false,
});
m_consumerCts = new CancellationTokenSource();
m_consumerTask = Task.Run(() => ProcessChannelAsync(m_consumerCts.Token));

if (m_isServerNode)
{
m_additionalConsumers = new List<ConsumerEntry>();
}
}

/// <summary>
Expand Down Expand Up @@ -165,6 +171,21 @@ public void Add(IEventMonitoredItem eventItem)
EventMonitoredItems.TryAdd(eventItem.Id, eventItem);

Node.OnReportEvent = OnReportEvent;

// Scale up: add a consumer task for each new event MI beyond the first.
if (m_isServerNode && m_additionalConsumers != null)
{
lock (m_additionalConsumersLock)
{
// The primary consumer task always runs; add additional ones
// so total consumers = EventMonitoredItems.Count.
int totalDesired = EventMonitoredItems.Count;
if (totalDesired > m_additionalConsumers.Count + 1)
{
AddConsumer();
}
}
}
}

/// <summary>
Expand All @@ -180,6 +201,20 @@ public void Remove(IEventMonitoredItem eventItem)
{
Node.OnReportEvent = null;
}

// Scale down: remove a consumer task when MIs decrease (keep at least 1 total = primary).
if (m_isServerNode && m_additionalConsumers != null)
{
lock (m_additionalConsumersLock)
{
// Total consumers = 1 (primary) + m_additionalConsumers.Count
int totalDesired = Math.Max(1, EventMonitoredItems.Count);
while (m_additionalConsumers.Count + 1 > totalDesired)
{
RemoveLastConsumer();
}
}
}
}

/// <summary>
Expand Down Expand Up @@ -690,6 +725,50 @@ private void DropEventPermissionCacheEntries(uint monitoredItemId)
}
}

/// <summary>
/// Adds a new consumer task to the pool for the regular channel.
/// Must be called while holding the <see cref="m_additionalConsumersLock"/> lock.
/// </summary>
private void AddConsumer()
{
var cts = CancellationTokenSource.CreateLinkedTokenSource(m_consumerCts.Token);
var task = Task.Run(() => ProcessChannelAsync(cts.Token));
m_additionalConsumers!.Add(new ConsumerEntry(task, cts));
}

/// <summary>
/// Removes the last additional consumer task from the pool.
/// Must be called while holding the <see cref="m_additionalConsumersLock"/> lock.
/// </summary>
private void RemoveLastConsumer()
{
if (m_additionalConsumers!.Count == 0)
{
return;
}

int lastIndex = m_additionalConsumers.Count - 1;
ConsumerEntry entry = m_additionalConsumers[lastIndex];
m_additionalConsumers.RemoveAt(lastIndex);
entry.Cts.Cancel();
entry.Cts.Dispose();
}

/// <summary>
/// Represents a single consumer task and its associated cancellation token.
/// </summary>
private readonly struct ConsumerEntry
{
public ConsumerEntry(Task task, CancellationTokenSource cts)
{
Task = task;
Cts = cts;
}

public Task Task { get; }
public CancellationTokenSource Cts { get; }
}

private readonly ConcurrentDictionary<uint, (ServerSystemContext Context, int CreatedAtTicks)> m_contextCache =
new();

Expand All @@ -706,6 +785,9 @@ private void DropEventPermissionCacheEntries(uint monitoredItemId)
private readonly Channel<INodeNotification> m_channel;
private readonly CancellationTokenSource m_consumerCts;
private readonly Task m_consumerTask;
private readonly bool m_isServerNode;
private readonly List<ConsumerEntry>? m_additionalConsumers;
private readonly Lock m_additionalConsumersLock = new();
private bool m_disposed;

/// <inheritdoc/>
Expand All @@ -728,9 +810,38 @@ protected virtual void Dispose(bool disposing)

if (disposing)
{
// Complete the writer; the consumer drains remaining items and exits normally.
// Complete the writer; consumers drain remaining items and exit normally.
m_channel.Writer.TryComplete();

// Wait for additional consumer tasks to finish.
if (m_additionalConsumers != null)
{
Task[] tasks;
lock (m_additionalConsumersLock)
{
tasks = m_additionalConsumers.ConvertAll(e => e.Task).ToArray();
}

try
Comment thread
romanett marked this conversation as resolved.
{
Task.WaitAll(tasks, TimeSpan.FromSeconds(5));
}
catch (AggregateException)
{
// Ignore exceptions during shutdown
}

lock (m_additionalConsumersLock)
{
foreach (ConsumerEntry entry in m_additionalConsumers)
{
entry.Cts.Cancel();
entry.Cts.Dispose();
}
m_additionalConsumers.Clear();
}
}

if (m_consumerTask != null)
{
try
Expand Down
77 changes: 77 additions & 0 deletions Tests/Opc.Ua.Server.Tests/MonitoredNode2Tests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1443,5 +1443,82 @@ BaseEventState BuildEvent()

Assert.That(validationCalls, Is.EqualTo(2));
}

/// <summary>
/// Verifies that when the MonitoredNode2 is configured for the Server object
/// (ObjectIds.Server), consumer tasks scale up as event monitored items are added
/// and all events are delivered to all monitored items.
/// </summary>
[Test]
public void ServerNode_MultipleEventConsumers_AllEventsDelivered()
{
// Use ObjectIds.Server so the Server-node multi-consumer path is activated
var node = new BaseObjectState(null)
{
NodeId = ObjectIds.Server,
BrowseName = new QualifiedName("Server", 0)
};

int validationCalls = 0;
var nodeManagerMock = new Mock<IAsyncNodeManager>();
nodeManagerMock
.Setup(m => m.ValidateEventRolePermissionsAsync(
It.IsAny<IEventMonitoredItem>(),
It.IsAny<IFilterTarget>(),
It.IsAny<CancellationToken>()))
.Returns(() =>
{
Interlocked.Increment(ref validationCalls);
return new ValueTask<ServiceResult>(ServiceResult.Good);
});

var serverMock = new Mock<IServerInternal>();
serverMock.Setup(s => s.Auditing).Returns(false);

// Create multiple event monitored items
Mock<IEventMonitoredItem> item1Mock = CreateEventMonitoredItemMock(1u);
Mock<IEventMonitoredItem> item2Mock = CreateEventMonitoredItemMock(2u);
Mock<IEventMonitoredItem> item3Mock = CreateEventMonitoredItemMock(3u);

// Consumer tasks scale up as MIs are added (starts with 1, scales to 3)
var monitoredNode = new MonitoredNode2(
nodeManagerMock.Object, serverMock.Object, node);
monitoredNode.Add(item1Mock.Object);
monitoredNode.Add(item2Mock.Object);
monitoredNode.Add(item3Mock.Object);

ISystemContext context = new Mock<ISystemContext>().Object;

BaseEventState BuildEvent()
{
var ev = new BaseEventState(null);
ev.EventType = new PropertyState<NodeId>.Implementation<VariantBuilder>(ev) { Value = ObjectTypeIds.GeneralModelChangeEventType };
ev.SourceNode = new PropertyState<NodeId>.Implementation<VariantBuilder>(ev) { Value = ObjectIds.Server };
return ev;
}

const int eventCount = 10;
for (int i = 0; i < eventCount; i++)
{
monitoredNode.OnReportEvent(context, node, BuildEvent());
}

// Dispose drains the event channel
monitoredNode.Dispose();

// Each event should be delivered to all 3 monitored items
int item1Queued = item1Mock.Invocations
.Count(inv => inv.Method.Name == nameof(IEventMonitoredItem.QueueEvent));
int item2Queued = item2Mock.Invocations
.Count(inv => inv.Method.Name == nameof(IEventMonitoredItem.QueueEvent));
int item3Queued = item3Mock.Invocations
.Count(inv => inv.Method.Name == nameof(IEventMonitoredItem.QueueEvent));

Assert.That(item1Queued + item2Queued + item3Queued, Is.EqualTo(eventCount * 3),
"Every event must be delivered to every monitored item.");
Assert.That(item1Queued, Is.EqualTo(eventCount));
Assert.That(item2Queued, Is.EqualTo(eventCount));
Assert.That(item3Queued, Is.EqualTo(eventCount));
}
}
}
Loading