Skip to content

Commit 1657fed

Browse files
committed
fix: resolve review issues
1 parent e149f9c commit 1657fed

11 files changed

+191
-97
lines changed

src/Take.Elephant.Kafka/KafkaConsumedMessage.cs

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,17 +18,20 @@ public sealed class KafkaConsumedMessage<T>(T item, IReadOnlyDictionary<string,
1818
{
1919

2020
private readonly IReadOnlyDictionary<string, byte[]> _headers =
21-
headers is null
22-
? KafkaConsumedMessageDefaults.EmptyHeaders
23-
: new ReadOnlyDictionary<string, byte[]>(new Dictionary<string, byte[]>(headers));
21+
CreateReadOnlyHeaders(headers);
2422

2523
public T Item { get; } = item;
2624

2725
public IReadOnlyDictionary<string, byte[]> Headers => _headers;
2826

2927
public bool TryGetHeader(string key, out byte[] value)
3028
{
31-
if (!string.IsNullOrWhiteSpace(key)) return Headers.TryGetValue(key, out value);
29+
if (!string.IsNullOrWhiteSpace(key) && Headers.TryGetValue(key, out var headerValue))
30+
{
31+
value = headerValue != null ? (byte[])headerValue.Clone() : null;
32+
return true;
33+
}
34+
3235
value = null;
3336
return false;
3437

@@ -45,5 +48,22 @@ public bool TryGetHeaderAsUtf8String(string key, out string value)
4548
value = Encoding.UTF8.GetString(bytes);
4649
return true;
4750
}
51+
52+
private static IReadOnlyDictionary<string, byte[]> CreateReadOnlyHeaders(
53+
IReadOnlyDictionary<string, byte[]> headers)
54+
{
55+
if (headers is null || headers.Count == 0)
56+
{
57+
return KafkaConsumedMessageDefaults.EmptyHeaders;
58+
}
59+
60+
var result = new Dictionary<string, byte[]>(headers.Count);
61+
foreach (var header in headers)
62+
{
63+
result[header.Key] = header.Value != null ? (byte[])header.Value.Clone() : null;
64+
}
65+
66+
return new ReadOnlyDictionary<string, byte[]>(result);
67+
}
4868
}
4969
}

src/Take.Elephant.Kafka/KafkaEventStreamPublisher.cs

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Collections.Generic;
23
using System.Text;
34
using System.Threading;
45
using System.Threading.Tasks;
@@ -126,18 +127,16 @@ public async Task PublishAsync(TKey key, TEvent item, CancellationToken cancella
126127

127128
if (_headerProvider != null)
128129
{
129-
message.Headers = [];
130-
var headers = _headerProvider.GetHeaders();
131-
if (headers != null)
130+
var headers = _headerProvider.GetHeaders() ?? Array.Empty<IHeader>();
131+
foreach (var header in headers)
132132
{
133-
foreach (var header in headers)
134-
{
135-
if (header == null || header.Key == null)
136-
{
137-
continue;
138-
}
139-
message.Headers.Add(header.Key, header.GetValueBytes());
140-
}
133+
if (header == null || header.Key == null)
134+
{
135+
continue;
136+
}
137+
138+
message.Headers ??= new Headers();
139+
message.Headers.Add(header.Key, header.GetValueBytes());
141140
}
142141
}
143142

src/Take.Elephant.Kafka/KafkaHeadersConverter.cs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System.Collections.Generic;
2+
using System.Collections.ObjectModel;
23
using Confluent.Kafka;
34

45
namespace Take.Elephant.Kafka
@@ -10,11 +11,11 @@ internal static KafkaConsumedMessage<T> BuildConsumedMessage<T>(T item, Headers
1011
return new KafkaConsumedMessage<T>(item, ToDictionary(headers));
1112
}
1213

13-
internal static Dictionary<string, byte[]> ToDictionary(Headers headers)
14+
internal static IReadOnlyDictionary<string, byte[]> ToDictionary(Headers headers)
1415
{
1516
if (headers == null || headers.Count == 0)
1617
{
17-
return null;
18+
return KafkaConsumedMessageDefaults.EmptyHeaders;
1819
}
1920

2021
var result = new Dictionary<string, byte[]>(headers.Count);
@@ -29,7 +30,12 @@ internal static Dictionary<string, byte[]> ToDictionary(Headers headers)
2930
result[header.Key] = valueBytes != null ? (byte[])valueBytes.Clone() : null;
3031
}
3132

32-
return result;
33+
if (result.Count == 0)
34+
{
35+
return KafkaConsumedMessageDefaults.EmptyHeaders;
36+
}
37+
38+
return new ReadOnlyDictionary<string, byte[]>(result);
3339
}
3440
}
3541
}

src/Take.Elephant.Kafka/KafkaPartitionQueue.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public Task<KafkaConsumedMessage<T>> DequeueWithHeadersOrDefaultAsync(
4040
return _receiverQueue.DequeueWithHeadersOrDefaultAsync(cancellationToken);
4141
}
4242

43-
public Task<T> DequeueAsync(CancellationToken cancellationToken)
43+
public Task<T> DequeueAsync(CancellationToken cancellationToken = default)
4444
{
4545
return _receiverQueue.DequeueAsync(cancellationToken);
4646
}

src/Take.Elephant.Kafka/SchemaRegistry/KafkaSchemaRegistrySenderQueue.cs

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ public KafkaSchemaRegistrySenderQueue(
9191
: this(
9292
producerConfig,
9393
topic,
94-
new CachedSchemaRegistryClient(schemaRegistryOptions.SchemaRegistryConfig),
94+
new CachedSchemaRegistryClient((schemaRegistryOptions ?? throw new ArgumentNullException(nameof(schemaRegistryOptions))).SchemaRegistryConfig),
9595
schemaRegistryOptions,
9696
ownsSchemaRegistryClient: true,
9797
headerProvider: headerProvider)
@@ -241,15 +241,16 @@ public virtual async Task EnqueueAsync(T item, CancellationToken cancellationTok
241241

242242
if (_headerProvider != null)
243243
{
244-
var headers = _headerProvider.GetHeaders();
245-
message.Headers = [];
246-
if (headers != null)
244+
var headers = _headerProvider.GetHeaders() ?? Array.Empty<IHeader>();
245+
foreach (var header in headers)
247246
{
248-
foreach (var header in headers)
249-
{
250-
if (header == null || header.Key == null) continue;
251-
message.Headers.Add(header.Key, header.GetValueBytes());
252-
}
247+
if (header == null || header.Key == null)
248+
{
249+
continue;
250+
}
251+
252+
message.Headers ??= new Headers();
253+
message.Headers.Add(header.Key, header.GetValueBytes());
253254
}
254255
}
255256

src/Take.Elephant.Tests/Kafka/KafkaHeadersConverterFacts.cs

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
using System;
2+
using System.Collections.Generic;
13
using Confluent.Kafka;
24
using Take.Elephant.Kafka;
35
using Xunit;
@@ -8,19 +10,21 @@ namespace Take.Elephant.Tests.Kafka
810
public class KafkaHeadersConverterFacts
911
{
1012
[Fact]
11-
public void ToDictionary_WithNullHeaders_ShouldReturnNull()
13+
public void ToDictionary_WithNullHeaders_ShouldReturnEmptyReadOnlyDictionary()
1214
{
1315
var result = KafkaHeadersConverter.ToDictionary(null);
1416

15-
Assert.Null(result);
17+
Assert.NotNull(result);
18+
Assert.Empty(result);
1619
}
1720

1821
[Fact]
19-
public void ToDictionary_WithEmptyHeaders_ShouldReturnNull()
22+
public void ToDictionary_WithEmptyHeaders_ShouldReturnEmptyReadOnlyDictionary()
2023
{
2124
var result = KafkaHeadersConverter.ToDictionary(new Headers());
2225

23-
Assert.Null(result);
26+
Assert.NotNull(result);
27+
Assert.Empty(result);
2428
}
2529

2630
[Fact]
@@ -59,6 +63,21 @@ public void ToDictionary_ShouldCloneByteArrays()
5963
Assert.Equal(new byte[] { 9, 8, 7 }, result["x-test"]);
6064
}
6165

66+
[Fact]
67+
public void ToDictionary_WithNonEmptyHeaders_ShouldReturnReadOnlyDictionary()
68+
{
69+
var headers = new Headers
70+
{
71+
new Header("x-test", new byte[] { 9, 8, 7 })
72+
};
73+
74+
var result = KafkaHeadersConverter.ToDictionary(headers);
75+
var writableResult = Assert.IsAssignableFrom<IDictionary<string, byte[]>>(result);
76+
77+
Assert.Throws<NotSupportedException>(() => writableResult["x-test"] = new byte[] { 1 });
78+
Assert.Equal(new byte[] { 9, 8, 7 }, result["x-test"]);
79+
}
80+
6281
[Fact]
6382
public void BuildConsumedMessage_ShouldPreserveItemAndHeaders()
6483
{

src/Take.Elephant.Tests/Kafka/KafkaItemEventStreamPublisherConsumerFacts.cs

Lines changed: 25 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -113,35 +113,38 @@ private void DeleteCreatedTopics()
113113
}
114114
catch (DeleteTopicsException ex)
115115
{
116-
if (ex.Results == null)
116+
if (!ShouldIgnore(ex))
117117
{
118-
continue;
118+
throw;
119119
}
120+
}
121+
}
122+
}
120123

121-
var shouldIgnore = true;
122-
foreach (var result in ex.Results)
123-
{
124-
if (result == null)
125-
{
126-
continue;
127-
}
124+
private static bool ShouldIgnore(DeleteTopicsException ex)
125+
{
126+
if (ex.Results == null)
127+
{
128+
return true;
129+
}
128130

129-
if (
130-
result.Error.Code != ErrorCode.UnknownTopicOrPart
131-
&& result.Error.Code != ErrorCode.TopicDeletionDisabled
132-
)
133-
{
134-
shouldIgnore = false;
135-
break;
136-
}
137-
}
131+
foreach (var result in ex.Results)
132+
{
133+
if (result == null)
134+
{
135+
continue;
136+
}
138137

139-
if (!shouldIgnore)
140-
{
141-
throw;
142-
}
138+
if (
139+
result.Error.Code != ErrorCode.UnknownTopicOrPart
140+
&& result.Error.Code != ErrorCode.TopicDeletionDisabled
141+
)
142+
{
143+
return false;
143144
}
144145
}
146+
147+
return true;
145148
}
146149
}
147150
}

src/Take.Elephant.Tests/Kafka/KafkaItemSenderReceiverQueueFacts.cs

Lines changed: 25 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -119,35 +119,38 @@ private void DeleteCreatedTopics()
119119
}
120120
catch (DeleteTopicsException ex)
121121
{
122-
if (ex.Results == null)
122+
if (!ShouldIgnore(ex))
123123
{
124-
continue;
124+
throw;
125125
}
126+
}
127+
}
128+
}
126129

127-
var shouldIgnore = true;
128-
foreach (var result in ex.Results)
129-
{
130-
if (result == null)
131-
{
132-
continue;
133-
}
130+
private static bool ShouldIgnore(DeleteTopicsException ex)
131+
{
132+
if (ex.Results == null)
133+
{
134+
return true;
135+
}
134136

135-
if (
136-
result.Error.Code != ErrorCode.UnknownTopicOrPart
137-
&& result.Error.Code != ErrorCode.TopicDeletionDisabled
138-
)
139-
{
140-
shouldIgnore = false;
141-
break;
142-
}
143-
}
137+
foreach (var result in ex.Results)
138+
{
139+
if (result == null)
140+
{
141+
continue;
142+
}
144143

145-
if (!shouldIgnore)
146-
{
147-
throw;
148-
}
144+
if (
145+
result.Error.Code != ErrorCode.UnknownTopicOrPart
146+
&& result.Error.Code != ErrorCode.TopicDeletionDisabled
147+
)
148+
{
149+
return false;
149150
}
150151
}
152+
153+
return true;
151154
}
152155
}
153156
}

0 commit comments

Comments
 (0)