Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -2271,6 +2271,7 @@ namespace Akka.Streams.Dsl
public static Akka.Streams.Dsl.StreamRefSettings Create(Akka.Configuration.Config config) { }
public Akka.Streams.Dsl.StreamRefSettings WithBufferCapacity(int value) { }
public Akka.Streams.Dsl.StreamRefSettings WithDemandRedeliveryInterval(System.TimeSpan value) { }
public Akka.Streams.Dsl.StreamRefSettings WithFinalTerminationSignalDeadline(System.TimeSpan value) { }
public Akka.Streams.Dsl.StreamRefSettings WithSubscriptionTimeout(System.TimeSpan value) { }
}
[Akka.Annotations.ApiMayChangeAttribute()]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2269,6 +2269,7 @@ namespace Akka.Streams.Dsl
public static Akka.Streams.Dsl.StreamRefSettings Create(Akka.Configuration.Config config) { }
public Akka.Streams.Dsl.StreamRefSettings WithBufferCapacity(int value) { }
public Akka.Streams.Dsl.StreamRefSettings WithDemandRedeliveryInterval(System.TimeSpan value) { }
public Akka.Streams.Dsl.StreamRefSettings WithFinalTerminationSignalDeadline(System.TimeSpan value) { }
public Akka.Streams.Dsl.StreamRefSettings WithSubscriptionTimeout(System.TimeSpan value) { }
}
[Akka.Annotations.ApiMayChangeAttribute()]
Expand Down
74 changes: 74 additions & 0 deletions src/core/Akka.Streams.Tests/ActorMaterializerSettingsSpec.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
// -----------------------------------------------------------------------
// <copyright file="ActorMaterializerSettingsSpec.cs" company="Akka.NET Project">
// Copyright (C) 2009-2022 Lightbend Inc. <http://www.lightbend.com>
// Copyright (C) 2013-2025 .NET Foundation <https://github.com/akkadotnet/akka.net>
// </copyright>
// -----------------------------------------------------------------------

using System;
using System.Linq;
using Akka.Configuration;
using Akka.Streams.Dsl;
using Xunit;
using Xunit.Abstractions;

namespace Akka.Streams.Tests;

public class ActorMaterializerSettingsSpec: Akka.TestKit.Xunit2.TestKit
{
private readonly ActorMaterializer _materializer;
public ActorMaterializerSettingsSpec(ITestOutputHelper output) : base(Config.Empty, nameof(ActorMaterializerSettingsSpec), output)
{
_materializer = ActorMaterializer.Create(Sys);
}

[Fact]
public void ActorMaterializerSettings_Should_contain_default_values()
{
var settings = _materializer.Settings;
Assert.Equal(4, settings.InitialInputBufferSize);
Assert.Equal(16, settings.MaxInputBufferSize);
Assert.Equal(string.Empty, settings.Dispatcher);
Assert.False(settings.IsDebugLogging);
Assert.Equal(1000, settings.OutputBurstLimit);
Assert.True(settings.IsAutoFusing);
Assert.Equal(1000000000, settings.MaxFixedBufferSize);
Assert.Equal(1000, settings.SyncProcessingLimit);
Assert.False(settings.IsFuzzingMode);

var subscriptionTimeoutSettings = settings.SubscriptionTimeoutSettings;
Assert.Equal(StreamSubscriptionTimeoutTerminationMode.CancelTermination, subscriptionTimeoutSettings.Mode);
Assert.Equal(TimeSpan.FromSeconds(5), subscriptionTimeoutSettings.Timeout);

var streamRefSettings = settings.StreamRefSettings;
Assert.Equal(32, streamRefSettings.BufferCapacity);
Assert.Equal(TimeSpan.FromSeconds(1), streamRefSettings.DemandRedeliveryInterval);
Assert.Equal(TimeSpan.FromSeconds(30), streamRefSettings.SubscriptionTimeout);
Assert.Equal(TimeSpan.FromSeconds(2), streamRefSettings.FinalTerminationSignalDeadline);
}

[Fact]
public void ActorMaterializer_serialization_binding_should_be_correct()
{
var config = Sys.Settings.Config.GetConfig("akka.actor");

// Serializer should be registered
var serializers = config.GetConfig("serializers").AsEnumerable().ToDictionary(kvp => kvp.Key, kvp => kvp.Value.ToString());
Assert.Contains(serializers.Keys, s => s is "akka-stream-ref");
Assert.Equal("\"Akka.Streams.Serialization.StreamRefSerializer, Akka.Streams\"", serializers["akka-stream-ref"]);

// Serializer should have proper type binding
var binding = config.GetConfig("serialization-bindings").AsEnumerable().ToDictionary(kvp => kvp.Key, kvp => kvp.Value.ToString());
Assert.Contains(binding.Keys, s => s is "Akka.Streams.Implementation.StreamRef.SinkRefImpl, Akka.Streams");
Assert.Equal("akka-stream-ref", binding["Akka.Streams.Implementation.StreamRef.SinkRefImpl, Akka.Streams"]);

Assert.Contains(binding.Keys, s => s is "Akka.Streams.Implementation.StreamRef.SourceRefImpl, Akka.Streams");
Assert.Equal("akka-stream-ref", binding["Akka.Streams.Implementation.StreamRef.SourceRefImpl, Akka.Streams"]);

Assert.Contains(binding.Keys, s => s is "Akka.Streams.Implementation.StreamRef.IStreamRefsProtocol, Akka.Streams");
Assert.Equal("akka-stream-ref", binding["Akka.Streams.Implementation.StreamRef.IStreamRefsProtocol, Akka.Streams"]);

// Serializer should have correct id
Assert.Equal(30, config.GetInt("serialization-identifiers.\"Akka.Streams.Serialization.StreamRefSerializer, Akka.Streams\""));
}
}
2 changes: 2 additions & 0 deletions src/core/Akka.Streams/Dsl/StreamRefs.cs
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,15 @@ public StreamRefSettings(int bufferCapacity, TimeSpan demandRedeliveryInterval,
BufferCapacity = bufferCapacity;
DemandRedeliveryInterval = demandRedeliveryInterval;
SubscriptionTimeout = subscriptionTimeout;
FinalTerminationSignalDeadline = finalTerminationSignalDeadline;
}

public string ProductPrefix => nameof(StreamRefSettings);

public StreamRefSettings WithBufferCapacity(int value) => Copy(bufferCapacity: value);
public StreamRefSettings WithDemandRedeliveryInterval(TimeSpan value) => Copy(demandRedeliveryInterval: value);
public StreamRefSettings WithSubscriptionTimeout(TimeSpan value) => Copy(subscriptionTimeout: value);
public StreamRefSettings WithFinalTerminationSignalDeadline(TimeSpan value) => Copy(finalTerminationSignalDeadline: value);

public StreamRefSettings Copy(int? bufferCapacity = null,
TimeSpan? demandRedeliveryInterval = null,
Expand Down
54 changes: 27 additions & 27 deletions src/core/Akka.Streams/reference.conf
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,17 @@ akka {
materializer {

# Initial size of buffers used in stream elements
# Note: If you change this value also change the fallback value in ActorMaterializerSettings
# Note: If you change this value also change the fallback value in ActorMaterializerSettings
initial-input-buffer-size = 4

# Maximum size of buffers used in stream elements
# Note: If you change this value also change the fallback value in ActorMaterializerSettings
# Note: If you change this value also change the fallback value in ActorMaterializerSettings
max-input-buffer-size = 16

# Fully qualified config path which holds the dispatcher configuration
# to be used by FlowMaterialiser when creating Actors.
# When this value is left empty, the default-dispatcher will be used.
# Note: If you change this value also change the fallback value in ActorMaterializerSettings
# Note: If you change this value also change the fallback value in ActorMaterializerSettings
dispatcher = ""

blocking-io-dispatcher = "akka.actor.default-blocking-io-dispatcher"
Expand All @@ -33,46 +33,46 @@ akka {
# `cancel()`ing the subscription right away
# warn - log a warning statement about the stale element (then drop the
# reference to it)
# noop - do nothing (not recommended)
# Note: If you change this value also change the fallback value in StreamSubscriptionTimeoutSettings
# noop - do nothing (not recommended)
# Note: If you change this value also change the fallback value in StreamSubscriptionTimeoutSettings
mode = cancel

# time after which a subscriber / publisher is considered stale and eligible
# for cancelation (see `akka.stream.subscription-timeout.mode`)
# Note: If you change this value also change the fallback value in StreamSubscriptionTimeoutSettings
# for cancelation (see `akka.stream.subscription-timeout.mode`)
# Note: If you change this value also change the fallback value in StreamSubscriptionTimeoutSettings
timeout = 5s
}

# Enable additional troubleshooting logging at DEBUG log level
# Note: If you change this value also change the fallback value in ActorMaterializerSettings
# Note: If you change this value also change the fallback value in ActorMaterializerSettings
debug-logging = off

# Maximum number of elements emitted in batch if downstream signals large demand
# Note: If you change this value also change the fallback value in ActorMaterializerSettings
# Note: If you change this value also change the fallback value in ActorMaterializerSettings
output-burst-limit = 1000

# Enable automatic fusing of all graphs that are run. For short-lived streams
# this may cause an initial runtime overhead, but most of the time fusing is
# desirable since it reduces the number of Actors that are created.
# Note: If you change this value also change the fallback value in ActorMaterializerSettings
# Note: If you change this value also change the fallback value in ActorMaterializerSettings
auto-fusing = on

# Those stream elements which have explicit buffers (like mapAsync, mapAsyncUnordered,
# Those stream elements which have explicit buffers (like mapAsync, mapAsyncUnordered,
# buffer, flatMapMerge, Source.actorRef, Source.queue, etc.) will preallocate a fixed
# buffer upon stream materialization if the requested buffer size is less than this
# configuration parameter. The default is very high because failing early is better
# than failing under load.
#
# Buffers sized larger than this will dynamically grow/shrink and consume more memory
# per element than the fixed size buffers.
# Note: If you change this value also change the fallback value in ActorMaterializerSettings
# Note: If you change this value also change the fallback value in ActorMaterializerSettings
max-fixed-buffer-size = 1000000000

# Maximum number of sync messages that actor can process for stream to substream communication.
# Parameter allows to interrupt synchronous processing to get upsteam/downstream messages.
# Allows to accelerate message processing that happening withing same actor but keep system responsive.
# Note: If you change this value also change the fallback value in ActorMaterializerSettings
sync-processing-limit = 1000
# Maximum number of sync messages that actor can process for stream to substream communication.
# Parameter allows to interrupt synchronous processing to get upsteam/downstream messages.
# Allows to accelerate message processing that happening withing same actor but keep system responsive.
# Note: If you change this value also change the fallback value in ActorMaterializerSettings
sync-processing-limit = 1000

debug {
# Enables the fuzzing mode which increases the chance of race conditions
Expand All @@ -82,17 +82,17 @@ akka {
# environment!
# To get the best results, try combining this setting with a throughput
# of 1 on the corresponding dispatchers.
# Note: If you change this value also change the fallback value in ActorMaterializerSettings
# Note: If you change this value also change the fallback value in ActorMaterializerSettings
fuzzing-mode = off
}

stream-ref {
# Buffer of a SinkRef that is used to batch Request elements from the other side of the stream ref
#
# The buffer will be attempted to be filled eagerly even while the local stage did not request elements,
# because the delay of requesting over network boundaries is much higher.
buffer-capacity = 32

# Demand is signalled by sending a cumulative demand message ("requesting messages until the n-th sequence number)
# Using a cumulative demand model allows us to re-deliver the demand message in case of message loss (which should
# be very rare in any case, yet possible -- mostly under connection break-down and re-establishment).
Expand All @@ -102,13 +102,13 @@ akka {
# In normal operation, demand is signalled in response to arriving elements, however if no new elements arrive
# within `demand-redelivery-interval` a re-delivery of the demand will be triggered, assuming that it may have gotten lost.
demand-redelivery-interval = 1 second

# Subscription timeout, during which the "remote side" MUST subscribe (materialize) the handed out stream ref.
# This timeout does not have to be very low in normal situations, since the remote side may also need to
# prepare things before it is ready to materialize the reference. However the timeout is needed to avoid leaking
# in-active streams which are never subscribed to.
subscription-timeout = 30 seconds

# In order to guard the receiving end of a stream ref from never terminating (since awaiting a Completion or Failed
# message) after / before a Terminated is seen, a special timeout is applied once Terminated is received by it.
# This allows us to terminate stream refs that have been targeted to other nodes which are Downed, and as such the
Expand All @@ -133,17 +133,17 @@ akka {
protocol = "TLSv1"
}
actor {

serializers {
akka-stream-ref = "Akka.Streams.Serialization.StreamRefSerializer, Akka.Streams"
}

serialization-bindings {
"Akka.Streams.Implementation.StreamRef.SinkRefImpl, Akka.Streams" = akka-stream-ref
"Akka.Streams.Implementation.StreamRef.SourceRefImpl, Akka.Streams" = akka-stream-ref
"Akka.Streams.Implementation.StreamRef.IStreamRefsProtocol, Akka.Streams" = akka-stream-ref
}

serialization-identifiers {
"Akka.Streams.Serialization.StreamRefSerializer, Akka.Streams" = 30
}
Expand Down
Loading