Skip to content

Commit 14de339

Browse files
PrunklesMpdreamz
andauthored
Add ItemDropped callback to ChannelOptionsBase (#76)
* Add `BufferItemDropped` callback to `ChannelOptionsBase` * Add xmldocs to BufferItemDropped * do not call BufferItemDropped for special `null` value * small refactor so we can document why we wrap itemDropped --------- Co-authored-by: Martijn Laarman <[email protected]>
1 parent 4d98bdb commit 14de339

File tree

2 files changed

+12
-1
lines changed

2 files changed

+12
-1
lines changed

src/Elastic.Channels/BufferedChannelBase.cs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,6 +173,13 @@ protected BufferedChannelBase(TChannelOptions options, ICollection<IChannelCallb
173173
// DropWrite will make `TryWrite` always return true, which is not what we want.
174174
FullMode = BoundedChannelFullMode.Wait
175175
});
176+
177+
//we don't expose the fact TEvent is nullable to the consumer
178+
Action<TEvent?>? itemDropped = options.BufferItemDropped is null ? null : e =>
179+
{
180+
if (e is not null)
181+
options.BufferItemDropped?.Invoke(e);
182+
};
176183
InChannel = Channel.CreateBounded<TEvent?>(new BoundedChannelOptions(maxIn)
177184
{
178185
SingleReader = false,
@@ -183,7 +190,7 @@ protected BufferedChannelBase(TChannelOptions options, ICollection<IChannelCallb
183190
// wait does not block it simply signals that Writer.TryWrite should return false and be retried
184191
// DropWrite will make `TryWrite` always return true, which is not what we want.
185192
FullMode = options.BufferOptions.BoundedChannelFullMode
186-
});
193+
}, itemDropped);
187194

188195
InboundBuffer = new InboundBuffer<TEvent>(BatchExportSize, BufferOptions.OutboundBufferMaxLifetime);
189196

src/Elastic.Channels/ChannelOptionsBase.cs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@ public abstract class ChannelOptionsBase<TEvent, TResponse> : IChannelCallbacks<
2020
/// <inheritdoc cref="BufferOptions"/>
2121
public BufferOptions BufferOptions { get; set; } = new();
2222

23+
/// <summary>
24+
/// Delegate that will be called when item is being dropped from channel. See <see cref="BufferOptions.BoundedChannelFullMode"/>.
25+
/// </summary>
26+
public Action<TEvent>? BufferItemDropped { get; set; }
2327

2428
/// <summary>
2529
/// Ensures a <see cref="ChannelDiagnosticsListener{TEvent,TResponse}"/> gets registered so this <see cref="BufferedChannelBase{TChannelOptions,TEvent,TResponse}"/>

0 commit comments

Comments
 (0)