Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/buildandtest.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ jobs:
matrix:
# os: [ubuntu-latest, windows-latest, macOS-latest] - disable mac os due to cost
os: [ubuntu-latest, windows-latest]
csproj: [Security.Certificates, Types, Core, Core.Encoders, Core.Security, Server, Client, Client.ComplexTypes, History, InformationModel, Lds, PubSub, Sessions, Subscriptions, Configuration, Gds, SourceGeneration.Stack, SourceGeneration, SourceGeneration.Core, WotCon]
csproj: [Security.Certificates, Types, Core, Core.Encoders, Core.Security, Server, Client, Client.ComplexTypes, History, InformationModel, Lds, PubSub, Sessions, Subscriptions, Subscriptions.Classic, Configuration, Gds, SourceGeneration.Stack, SourceGeneration, SourceGeneration.Core, WotCon]
include:
- framework: 'net10.0'
dotnet-version: '10.0.x'
Expand Down
140 changes: 140 additions & 0 deletions Libraries/Opc.Ua.Client/Fluent/ManagedSessionExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@
* ======================================================================*/

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Opc.Ua.Client.Subscriptions;

namespace Opc.Ua.Client
{
Expand Down Expand Up @@ -156,5 +162,139 @@ public static bool TryAddMonitoredItem(
configure(new Subscriptions.MonitoredItems.MonitoredItemOptions { StartNodeId = nodeId }),
out monitoredItem);
}

/// <summary>
/// Persist every subscription managed by <paramref name="session"/>
/// (or a caller-supplied subset) to <paramref name="destination"/>
/// in OPC UA binary encoding. The format starts with the
/// session's namespace and server URI tables so the snapshot is
/// portable across sessions whose tables index URIs in different
/// positions.
/// </summary>
/// <param name="session">Session whose
/// <see cref="ManagedSession.SubscriptionManager"/> is being
/// snapshotted.</param>
/// <param name="destination">Writable destination stream.</param>
/// <param name="subscriptions">Optional subset of subscriptions
/// to include. When <c>null</c> every subscription currently
/// managed by <paramref name="session"/> is included.</param>
/// <param name="ct">Cancellation token.</param>
public static ValueTask SaveSubscriptionsAsync(
this ManagedSession session,
Stream destination,
IEnumerable<ISubscription>? subscriptions = null,
CancellationToken ct = default)
{
if (session == null)
{
throw new ArgumentNullException(nameof(session));
}
return session.SubscriptionManager.SaveAsync(
destination, session.MessageContext, subscriptions, ct);
}

/// <summary>
/// Restore subscriptions previously persisted by
/// <see cref="SaveSubscriptionsAsync"/>. Each restored subscription is
/// re-registered with
/// <see cref="ManagedSession.SubscriptionManager"/>.
/// </summary>
/// <param name="session">Session that owns the V2 subscription
/// manager and supplies the active message context.</param>
/// <param name="source">Readable source stream produced by
/// <see cref="SaveSubscriptionsAsync"/>.</param>
/// <param name="handlerFactory">Factory invoked once per
/// restored subscription to construct the application's
/// <see cref="ISubscriptionNotificationHandler"/>. The factory
/// receives the per-subscription stable name captured in the
/// snapshot.</param>
/// <param name="transferSubscriptions">When <c>true</c> the
/// restored subscriptions take over the original server-side
/// state via <c>TransferSubscriptions</c>; if that fails for any
/// subscription the V2 manager falls back to recreate.</param>
/// <param name="ct">Cancellation token.</param>
public static ValueTask<IReadOnlyList<ISubscription>> LoadSubscriptionsAsync(
this ManagedSession session, Stream source,
Func<string, ISubscriptionNotificationHandler> handlerFactory,
bool transferSubscriptions = false, CancellationToken ct = default)
{
if (session == null)
{
throw new ArgumentNullException(nameof(session));
}
return session.SubscriptionManager.LoadAsync(source,
session.MessageContext, handlerFactory,
transferSubscriptions, ct);
}

/// <summary>
/// Capture an in-memory snapshot of every subscription managed
/// by <paramref name="session"/>. The returned list of
/// <see cref="SubscriptionStateSnapshot"/>s can be persisted by
/// the caller in any format and later passed to
/// <see cref="RestoreSubscriptionsAsync"/>.
/// </summary>
public static IReadOnlyList<SubscriptionStateSnapshot> SnapshotSubscriptions(
this ManagedSession session)
{
if (session == null)
{
throw new ArgumentNullException(nameof(session));
}
var result = new List<SubscriptionStateSnapshot>();
foreach (ISubscription s in session.SubscriptionManager.Items)
{
if (s is Subscriptions.Subscription concrete)
{
result.Add(concrete.Snapshot());
}
}
return result;
}

/// <summary>
/// Restore a list of <see cref="SubscriptionStateSnapshot"/>s
/// previously captured by <see cref="SnapshotSubscriptions"/>.
/// </summary>
/// <param name="session">Session that owns the V2 subscription
/// manager.</param>
/// <param name="states">Snapshots to restore.</param>
/// <param name="handlerFactory">Factory invoked once per
/// snapshot to construct the application's notification
/// handler. The factory receives the snapshot itself so callers
/// can route by options or per-item metadata.</param>
/// <param name="transferSubscriptions">When <c>true</c> the
/// restored subscriptions take over the original server-side
/// state via <c>TransferSubscriptions</c>; if that fails for
/// any subscription the V2 manager falls back to recreate.</param>
/// <param name="ct">Cancellation token.</param>
public static async ValueTask<IReadOnlyList<ISubscription>> RestoreSubscriptionsAsync(
this ManagedSession session,
IReadOnlyList<SubscriptionStateSnapshot> states,
Func<SubscriptionStateSnapshot, ISubscriptionNotificationHandler> handlerFactory,
bool transferSubscriptions = false,
CancellationToken ct = default)
{
if (session == null)
{
throw new ArgumentNullException(nameof(session));
}
if (states == null)
{
throw new ArgumentNullException(nameof(states));
}
if (handlerFactory == null)
{
throw new ArgumentNullException(nameof(handlerFactory));
}
var result = new List<ISubscription>(states.Count);
foreach (SubscriptionStateSnapshot state in states)
{
result.Add(await session.SubscriptionManager.RestoreAsync(
handlerFactory(state), state, transferSubscriptions, ct)
.ConfigureAwait(false));
}
return result;
}
}
}
1 change: 1 addition & 0 deletions Libraries/Opc.Ua.Client/Opc.Ua.Client.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
<InternalsVisibleTo Include="Opc.Ua.Session.Tests" />
<InternalsVisibleTo Include="Opc.Ua.Sessions.Tests" />
<InternalsVisibleTo Include="Opc.Ua.Subscriptions.Tests" />
<InternalsVisibleTo Include="Opc.Ua.Subscriptions.Classic.Tests" />
<InternalsVisibleTo Include="Opc.Ua.Subscriptions.Durable.Tests" />
<InternalsVisibleTo Include="Opc.Ua.Features.Tests" />
<InternalsVisibleTo Include="Opc.Ua.Stress.Tests" />
Expand Down
7 changes: 5 additions & 2 deletions Libraries/Opc.Ua.Client/Session/DefaultSessionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,11 @@ public class DefaultSessionFactory : ISessionFactory

/// <summary>
/// Optional subscription engine factory to use when constructing
/// a <see cref="Session"/>. When <c>null</c>, the session uses the
/// classic engine (<see cref="ClassicSubscriptionEngineFactory"/>).
Comment thread
marcschier marked this conversation as resolved.
/// a <see cref="Session"/>. When <c>null</c>, the session uses
/// the classic engine (<see cref="ClassicSubscriptionEngineFactory"/>).
/// New <see cref="ManagedSession"/> code paths default to the V2
/// engine (<see cref="DefaultSubscriptionEngineFactory"/>) via
/// the <c>ManagedSessionBuilder</c>.
/// </summary>
public ISubscriptionEngineFactory? SubscriptionEngineFactory { get; init; }

Expand Down
10 changes: 6 additions & 4 deletions Libraries/Opc.Ua.Client/Session/DefaultSubscriptionEngine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -198,13 +198,14 @@ public EngineContextAdapter(ISubscriptionEngineContext context)
public IManagedSubscription CreateSubscription(
ISubscriptionNotificationHandler handler,
IOptionsMonitor<Subscriptions.SubscriptionOptions> options,
IMessageAckQueue queue)
IMessageAckQueue queue,
Subscriptions.SubscriptionLoadState? loadState = null)
{
var subscriptionContext =
new SubscriptionContextAdapter(m_context);
return new DefaultSubscription(
subscriptionContext, handler, queue,
options, m_context.Telemetry);
options, m_context.Telemetry, loadState);
}

/// <inheritdoc/>
Expand Down Expand Up @@ -515,9 +516,10 @@ public DefaultSubscription(
ISubscriptionNotificationHandler handler,
IMessageAckQueue completion,
IOptionsMonitor<Subscriptions.SubscriptionOptions> options,
ITelemetryContext telemetry)
ITelemetryContext telemetry,
Subscriptions.SubscriptionLoadState? loadState = null)
: base(context, handler, completion,
options, telemetry)
options, telemetry, loadState)
{
}

Expand Down
11 changes: 8 additions & 3 deletions Libraries/Opc.Ua.Client/Session/Session.cs
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,10 @@ channel is ITransportChannel transportChannel ?
/// <param name="discoveryProfileUris">The value of profileUris used in
/// GetEndpoints() request.</param>
/// <param name="engineFactory">Optional subscription engine factory. When
/// <c>null</c> the session uses <see cref="ClassicSubscriptionEngineFactory"/>
/// by default.</param>
/// <c>null</c> the session uses <see cref="DefaultSubscriptionEngineFactory"/>
Comment thread
marcschier marked this conversation as resolved.
Outdated
/// (the V2 engine) by default. Pass
/// <see cref="ClassicSubscriptionEngineFactory.Instance"/> explicitly
/// to opt into the classic engine.</param>
/// <remarks>
/// The application configuration is used to look up the certificate if none
/// is provided. The clientCertificate must have the private key. This will
Expand Down Expand Up @@ -282,7 +284,10 @@ private Session(
// Create timer for keep alive event triggering but in off state
m_keepAliveTimer = new Timer(_ => m_keepAliveEvent.Set(), this, Timeout.Infinite, Timeout.Infinite);

// Create the subscription engine.
// Create the subscription engine. Session defaults to the
// classic engine (legacy applications + classic Subscription
// API). ManagedSession explicitly opts in to the V2 engine
// via its builder.
SubscriptionEngineFactory = engineFactory
?? ClassicSubscriptionEngineFactory.Instance;
Comment thread
marcschier marked this conversation as resolved.
m_engine = SubscriptionEngineFactory.Create(new SessionEngineContext(this));
Expand Down
36 changes: 36 additions & 0 deletions Libraries/Opc.Ua.Client/Subscription/IMonitoredItem.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
* ======================================================================*/

using System;
using System.Collections.Generic;

namespace Opc.Ua.Client.Subscriptions.MonitoredItems
{
Expand Down Expand Up @@ -85,5 +86,40 @@ public interface IMonitoredItem
/// The identifier assigned by the client.
/// </summary>
uint ClientHandle { get; }

/// <summary>
/// The client handle of the monitored item that triggers
/// this item, or <c>0</c> if no triggering relationship has
/// been recorded via
/// <see cref="ISubscription.SetTriggeringAsync"/>. Updated only
/// after a successful service call result for this link.
/// </summary>
uint TriggeringItemClientHandle { get; }

/// <summary>
/// Client handles of the monitored items that are triggered
/// by this item. Empty when this item does not currently
/// trigger any other items. Updated only after successful
/// service call results for each link.
/// </summary>
ArrayOf<uint> TriggeredItemClientHandles { get; }

/// <summary>
/// Issue an OPC UA Part 9 §5.5.7 ConditionRefresh2 method call
/// for this monitored item. The server responds by re-sending
/// the current state of every condition this item is monitoring
/// (bracketed by RefreshStartEvent and RefreshEndEvent), so the
/// client can rebuild a complete view after disconnect or
/// subscription transfer without missing currently-active
/// alarms.
/// </summary>
/// <param name="ct">Cancellation token.</param>
/// <exception cref="ServiceResultException">Raised with
/// <c>BadMonitoredItemIdInvalid</c> if this item has not been
/// created on the server yet, or with the server-returned
/// status if the method call fails.</exception>
System.Threading.Tasks.ValueTask ConditionRefreshAsync(
System.Threading.CancellationToken ct = default);
}
}

30 changes: 30 additions & 0 deletions Libraries/Opc.Ua.Client/Subscription/IMonitoredItemContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,36 @@ namespace Opc.Ua.Client.Subscriptions.MonitoredItems
/// </summary>
internal interface IMonitoredItemContext
{
/// <summary>
Comment thread
marcschier marked this conversation as resolved.
/// Server-assigned subscription id that owns this item.
/// Forwarded from <see cref="IMonitoredItemManagerContext.Id"/>
/// so per-item operations can issue service calls without
/// going back through the manager.
/// </summary>
uint SubscriptionId { get; }

/// <summary>
/// Method call services. Forwarded from
/// <see cref="IMonitoredItemManagerContext.MethodServiceSet"/>
/// for the same reason as <see cref="SubscriptionId"/>.
/// </summary>
IMethodServiceSetClientMethods MethodServiceSet { get; }

/// <summary>
/// Issue an OPC UA Part 9 §5.5.7 <c>ConditionRefresh2</c>
/// service call for the monitored item with the supplied
/// server-side <paramref name="monitoredItemServerId"/>. The
/// context already knows the subscription id and method service
/// set, so callers only forward their own server-side handle.
/// </summary>
/// <param name="monitoredItemServerId">Server-assigned monitored
/// item id (<see cref="IMonitoredItem.ServerId"/>). The item
/// must have been created on the server.</param>
/// <param name="ct">Cancellation token.</param>
System.Threading.Tasks.ValueTask ConditionRefreshAsync(
uint monitoredItemServerId,
System.Threading.CancellationToken ct = default);

/// <summary>
/// Notify item change results. This includes intermittent
/// errors trying to apply the monitored item options.
Expand Down
57 changes: 57 additions & 0 deletions Libraries/Opc.Ua.Client/Subscription/ISubscription.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
* ======================================================================*/

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using Opc.Ua.Client.Subscriptions.MonitoredItems;
Expand Down Expand Up @@ -101,5 +102,61 @@ public interface ISubscription : IAsyncDisposable
/// <param name="ct"></param>
ValueTask ConditionRefreshAsync(
CancellationToken ct = default);

/// <summary>
/// Configure triggering relationships between monitored items
/// in this subscription. The triggering item, when it reports
/// a notification, causes the linked triggered items to report
/// their next sampled value too — even when those triggered
/// items are in <see cref="MonitoringMode.Sampling"/> mode.
/// Per OPC UA Part 4 §5.13.5, the service call reports per-link
/// status; this implementation updates the local triggering
/// state on the V2 <see cref="IMonitoredItem"/>s
/// <see cref="IMonitoredItem.TriggeringItemClientHandle"/> /
/// <see cref="IMonitoredItem.TriggeredItemClientHandles"/> only
/// for links whose service result is Good. Partial failures do
/// not corrupt local state; callers inspect the returned
/// <see cref="SetTriggeringResponse"/> for per-link results.
/// </summary>
/// <param name="triggeringItemClientHandle">
/// Client handle of the monitored item that owns the
/// triggering relationships.</param>
/// <param name="linksToAdd">Client handles of items to add as
/// triggered items. May be empty.</param>
/// <param name="linksToRemove">Client handles of items to
/// remove from the triggered set. May be empty.</param>
/// <param name="ct"></param>
/// <exception cref="ArgumentException">Raised when the
/// triggering item is not known to this subscription.</exception>
/// <exception cref="ServiceResultException">Raised when the
/// subscription has not been created on the server yet.</exception>
ValueTask<SetTriggeringResponse> SetTriggeringAsync(
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This API is too complicated to use. It should be via the IMonitoredItem items in some form, change the links between items, let the engine/manager apply the changed state.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Acknowledged - agree the API is too low-level. Not changed in this round; tracked as a separate follow-up plan to redesign as per-item LinkAsTriggeredBy(item) / UnlinkAsTriggeredBy(item) that the manager batches and applies via reconciliation. Needs its own design pass.

uint triggeringItemClientHandle,
IReadOnlyList<uint> linksToAdd,
IReadOnlyList<uint> linksToRemove,
CancellationToken ct = default);

/// <summary>
/// Mark this subscription as durable on the server (OPC UA Part 4
/// §5.13.9 <c>SetSubscriptionDurable</c>). A durable subscription
/// retains its monitored item state and message queue across
/// session disconnects for the duration of the requested
/// lifetime, so a later
/// <see cref="ISubscriptionManager.RestoreAsync"/> with
/// <c>transferSubscriptions: true</c> can take over without
/// losing buffered notifications.
/// </summary>
/// <param name="lifetimeInHours">Requested lifetime, in hours.
/// The server may revise downwards.</param>
/// <param name="ct">Cancellation token.</param>
/// <returns>The server-revised lifetime, in hours.</returns>
/// <exception cref="ServiceResultException">Raised when the
/// subscription is not yet created on the server, or when the
/// server rejects the call (e.g. it has monitored items already
/// — per spec <c>SetSubscriptionDurable</c> must be called
/// before any items are added).</exception>
ValueTask<uint> SetSubscriptionDurableAsync(
uint lifetimeInHours,
CancellationToken ct = default);
}
}
Loading
Loading