This repository was archived by the owner on Apr 9, 2022. It is now read-only.

Commit 570dad3
Merge pull request #41 from criteo/syncApril2018: "More resilient consumer + various fixes"
2 parents: 1411c98 + 55b2704

15 files changed: +304, -551 lines

appveyor.yml (+1, -1)

The NuGet pack step now runs only on tagged builds; untagged CI runs no longer attempt to pack with an empty %APPVEYOR_REPO_TAG_NAME%.

@@ -15,7 +15,7 @@ before_build:
 build_script:
 - dotnet build "kafka-sharp/kafka-sharp-netstd.sln" -c %CONFIGURATION%
 after_build:
-- dotnet pack /p:PackageVersion="%APPVEYOR_REPO_TAG_NAME%" "kafka-sharp/kafka-sharp/Kafka.netstandard.csproj" -c %CONFIGURATION% --no-build -o %APPVEYOR_BUILD_FOLDER%\artifacts
+- cmd: IF "%APPVEYOR_REPO_TAG%" == "true" (dotnet pack /p:PackageVersion="%APPVEYOR_REPO_TAG_NAME%" "kafka-sharp/kafka-sharp/Kafka.netstandard.csproj" -c %CONFIGURATION% --no-build -o %APPVEYOR_BUILD_FOLDER%\artifacts)
 test_script:
 - dotnet test "kafka-sharp/kafka-sharp.UTest/kafka.UTest.netstandard.csproj" -c %CONFIGURATION%
 cache:

kafka-sharp/kafka-sharp.UTest/Kafka.UTest.csproj (+3)

@@ -63,6 +63,9 @@
     <Reference Include="Microsoft.CSharp" />
     <Reference Include="System.Data" />
     <Reference Include="System.Net.Http" />
+    <Reference Include="System.ValueTuple">
+      <HintPath>..\packages\System.ValueTuple.4.3.0\lib\netstandard1.0\System.ValueTuple.dll</HintPath>
+    </Reference>
     <Reference Include="System.Xml" />
   </ItemGroup>
   <ItemGroup>

kafka-sharp/kafka-sharp.UTest/Mocks.cs (+5, -1)

The mock now keeps a reference to every Timer it creates: an unreferenced System.Threading.Timer is eligible for garbage collection, so a delayed response could previously be lost before its callback fired.

@@ -367,9 +367,12 @@ class EchoConnectionMock : SuccessConnectionMock
         private static int _count;
         private readonly int _responseDelayMs;

+        private static ConcurrentQueue<Timer> _timers = new ConcurrentQueue<Timer>();
+
         public static void Reset()
         {
             _count = 1;
+            _timers = new ConcurrentQueue<Timer>();
         }

         public EchoConnectionMock(bool forceErrors = false, int responseDelayMs = 0)
@@ -406,11 +409,12 @@ public override Task SendAsync(int correlationId, ReusableMemoryStream buffer, b
             if (_responseDelayMs > 0)
             {
                 var tcs = new TaskCompletionSource<bool>();
-                new Timer(_ =>
+                var timer = new Timer(_ =>
                 {
                     OnResponse(correlationId, response);
                     tcs.SetResult(true);
                 }, null, _responseDelayMs, -1);
+                _timers.Enqueue(timer);
                 return tcs.Task;
             }
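
For context, a minimal standalone sketch (not part of the commit) of the pitfall the queue above works around: a timer that nothing references may be collected before its callback runs, which is exactly how a delayed mock response could silently never arrive.

using System;
using System.Threading;

class TimerGcPitfall
{
    static void Main()
    {
        // No variable keeps the timer alive: a GC before the due time may
        // collect it, and the callback then never runs (most visible in
        // release builds, where the JIT does not extend lifetimes).
        new Timer(_ => Console.WriteLine("may never print"), null, 1000, Timeout.Infinite);

        GC.Collect();
        GC.WaitForPendingFinalizers();
        Thread.Sleep(2000);

        // Keeping a reference (as _timers does in the mock) makes it reliable.
        var kept = new Timer(_ => Console.WriteLine("always prints"), null, 1000, Timeout.Infinite);
        GC.Collect();
        Thread.Sleep(2000);
        GC.KeepAlive(kept);
    }
}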

kafka-sharp/kafka-sharp.UTest/TestConsumer.cs (+148, -497)

Large diff, not rendered here (148 lines added, 497 removed).

kafka-sharp/kafka-sharp.UTest/TestConsumerGroup.cs (+36, -15)

The consumer-group tests now block on explicit synchronization (heartbeat, commit, and consumer started/stopped events) instead of fixed sleeps and ad-hoc waits, which makes them deterministic even on slow CI machines.

@@ -182,10 +182,21 @@ Mocks InitCluster()
             return new Mocks { Cluster = cluster, Node = node, Group = group, HeartbeatCalled = heartbeatEvent};
         }

-        private void WaitForCallToHeartbeat(Mocks mock)
+        private void WaitOneSecondMaxForEvent(string name, AutoResetEvent ev)
         {
-            if (!mock.HeartbeatCalled.WaitOne(TimeSpan.FromSeconds(1)))
-                Assert.Fail("We waited 1 sec for a hearbeat to happen, but it never did.");
+            if (!ev.WaitOne(TimeSpan.FromSeconds(1)))
+            {
+                Assert.Fail("We waited 1 sec for " + name + " to happen, but it never did.");
+            }
+        }
+
+        private async Task HeartbeatFinishedProcessing(Mocks mock, ConsumeRouter router)
+        {
+            // First we wait to be sure that a heartbeat has started being processed
+            WaitOneSecondMaxForEvent("heartbeat", mock.HeartbeatCalled);
+            // Then we wait to be sure that the current message is finished processing
+            // (this message being the heartbeat or a following message)
+            await router.StopProcessingTask();
         }

         [Test]
@@ -505,7 +516,7 @@ public void TestConsumer_ConsumerGroupStartConsume()
             mocks.Node.Verify(n => n.Fetch(It.IsAny<FetchMessage>()), Times.Once); // 1 partition with specific offset
             mocks.Node.Verify(n => n.Offset(It.IsAny<OffsetMessage>()), Times.Once); // 1 partition with offset -1

-            WaitForCallToHeartbeat(mocks);
+            WaitOneSecondMaxForEvent("heartbeat", mocks.HeartbeatCalled);

             mocks.Group.Verify(g => g.Heartbeat());

@@ -540,27 +551,30 @@ public void TestConsumer_ConsumerGroupStartConsume()
             }});

             mocks.Node.Verify(n => n.Fetch(It.IsAny<FetchMessage>()), Times.Exactly(2)); // response should have triggered one more fetch
-            if (!commitEvent.WaitOne(TimeSpan.FromSeconds(1)))
-                Assert.Fail("We waited 1 sec for a commit from the group, it did not happen");
+            WaitOneSecondMaxForEvent("commit", commitEvent);
             mocks.Group.Verify(g => g.Commit(It.IsAny<IEnumerable<TopicData<OffsetCommitPartitionData>>>())); // should have auto committed

             consumer.Stop().Wait();
         }

         [Test]
-        public void TestConsumer_ConsumerGroupRestartConsume()
+        public async Task TestConsumer_ConsumerGroupRestartConsume()
         {
             var mocks = InitCluster();
             var consumer = new ConsumeRouter(mocks.Cluster.Object,
                 new Configuration { TaskScheduler = new CurrentThreadTaskScheduler(), ConsumeBatchSize = 1 }, 1);

+            var consumerStartEvent = new AutoResetEvent(false);
+            var consumerStopEvent = new AutoResetEvent(false);
+            consumer.ConsumerStopped += () => { consumerStopEvent.Set(); };
+
             consumer.StartConsumeSubscription(mocks.Group.Object, new[] { "the topic" });

             mocks.Group.Verify(g => g.Join(It.IsAny<IEnumerable<string>>()), Times.Once);
             mocks.Node.Verify(n => n.Fetch(It.IsAny<FetchMessage>()), Times.Once); // 1 partition with specific offset
             mocks.Node.Verify(n => n.Offset(It.IsAny<OffsetMessage>()), Times.Once); // 1 partition with offset -1

-            WaitForCallToHeartbeat(mocks);
+            WaitOneSecondMaxForEvent("heartbeat", mocks.HeartbeatCalled);

             mocks.Group.Verify(g => g.Heartbeat());

@@ -594,9 +608,11 @@ public void TestConsumer_ConsumerGroupRestartConsume()
                 }
             }});

-            mocks.Node.Verify(n => n.Fetch(It.IsAny<FetchMessage>()), Times.Exactly(2)); // response should have triggered one more fetch

             consumer.StopConsume("the topic", Partitions.All, Offsets.Now);
+            WaitOneSecondMaxForEvent("stop", consumerStopEvent);
+            mocks.Node.Verify(n => n.Fetch(It.IsAny<FetchMessage>()),
+                Times.Exactly(2)); // response should have triggered one more fetch

             consumer.Acknowledge(new CommonAcknowledgement<FetchResponse>
             {
@@ -628,8 +644,10 @@ public void TestConsumer_ConsumerGroupRestartConsume()
                 }
             }});

+            consumer.ConsumerStarted += () => { consumerStartEvent.Set(); };
             consumer.StartConsume("the topic", Partitions.All, Offsets.Now);

+            WaitOneSecondMaxForEvent("start", consumerStartEvent);
             mocks.Node.Verify(n => n.Fetch(It.IsAny<FetchMessage>()), Times.Exactly(3));

             consumer.Stop().Wait();
@@ -657,7 +675,7 @@ public void TestConsumer_ConsumerGroupCommit()

             consumer.StartConsumeSubscription(mocks.Group.Object, new[] { "the topic" });

-            WaitForCallToHeartbeat(mocks);
+            WaitOneSecondMaxForEvent("heartbeat", mocks.HeartbeatCalled);

             consumer.Acknowledge(new CommonAcknowledgement<FetchResponse>
             {
@@ -730,7 +748,7 @@ public async Task TestConsumer_ConsumerGroupCommitAsync()
         }

         [Test]
-        public void TestConsumer_ConsumerGroupHeartbeatErrors()
+        public async Task TestConsumer_ConsumerGroupHeartbeatErrors()
         {
             var mocks = InitCluster();
             mocks.Group.SetupGet(g => g.Configuration)
@@ -743,7 +761,7 @@ public void TestConsumer_ConsumerGroupHeartbeatErrors()

             consumer.StartConsumeSubscription(mocks.Group.Object, new[] { "the topic" });

-            WaitForCallToHeartbeat(mocks);
+            await HeartbeatFinishedProcessing(mocks, consumer);

             // At least 2 Join (one on start, one on next heartbeat)
             mocks.Group.Verify(g => g.Join(It.IsAny<IEnumerable<string>>()), Times.AtLeast(2));
@@ -761,7 +779,7 @@

             consumer.StartConsumeSubscription(mocks.Group.Object, new[] { "the topic" });

-            WaitForCallToHeartbeat(mocks);
+            await HeartbeatFinishedProcessing(mocks, consumer);

             mocks.Group.Verify(g => g.Join(It.IsAny<IEnumerable<string>>()), Times.AtLeast(2));
             // No Commit tried in case of "hard" heartbeat errors
@@ -823,7 +841,9 @@ public async Task TestConsumer_RaisesPartitionsRevokedOnRebalance()
         {
             var mocks = InitCluster();

-            mocks.Group.Setup(g => g.Heartbeat()).ReturnsAsync(ErrorCode.RebalanceInProgress);
+            mocks.Group.Setup(g => g.Heartbeat())
+                .ReturnsAsync(ErrorCode.RebalanceInProgress)
+                .Callback(() => mocks.HeartbeatCalled.Set());

             mocks.Group.SetupGet(g => g.Configuration).Returns(
                 new ConsumerGroupConfiguration { AutoCommitEveryMs = -1, SessionTimeoutMs = 10 });
@@ -841,7 +861,8 @@
             consumer.PartitionsRevoked += () => partitionsRevokedEventIsCalled = true;

             consumer.StartConsumeSubscription(mocks.Group.Object, new[] { "the topic" });
-            Thread.Sleep(20);
+
+            await HeartbeatFinishedProcessing(mocks, consumer);

             Assert.That(partitionsRevokedEventIsCalled, Is.True);
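
A condensed sketch (simplified, with hypothetical names; the real tests use Moq's .Callback(...) to signal from the mock) of the synchronization idiom these tests converge on: the mocked dependency sets an AutoResetEvent when the interesting call happens, and the test blocks on it with a bounded timeout instead of sleeping for a fixed duration.

using System;
using System.Threading;

class DeterministicWaitSketch
{
    static void Main()
    {
        var heartbeatCalled = new AutoResetEvent(false);

        // Stand-in for the mocked group invoking its callback.
        ThreadPool.QueueUserWorkItem(_ => heartbeatCalled.Set());

        // Bounded wait: fails fast with a clear message instead of hanging
        // forever, or passing by luck after an arbitrary Thread.Sleep.
        if (!heartbeatCalled.WaitOne(TimeSpan.FromSeconds(1)))
            throw new Exception("We waited 1 sec for heartbeat to happen, but it never did.");

        Console.WriteLine("heartbeat observed");
    }
}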

kafka-sharp/kafka-sharp.UTest/kafka.UTest.netstandard.csproj (+1)

@@ -29,6 +29,7 @@
     <PackageReference Include="System.Reactive.Interfaces" Version="3.1.1" />
     <PackageReference Include="System.Reactive.Linq" Version="3.1.1" />
     <PackageReference Include="System.Threading.Thread" Version="4.3.0" />
+    <PackageReference Include="System.ValueTuple" Version="4.3.0" />
   </ItemGroup>

   <PropertyGroup>

kafka-sharp/kafka-sharp/Cluster/Node.cs (+12, -3)

When deserialization of a fetch response fails, the node now reports a targeted DeserializationError for the faulty (topic, partition) carried by the ProtocolException, and a plain LocalError for the others, while still synthesizing an empty response from the original request so the consumer keeps running.

@@ -1454,8 +1454,10 @@ private void CleanUpConnection(IConnection connection, Exception exception = nul
         private static readonly long[] NoOffset = new long[0];

         // Build an empty response from a given Fetch request with error set to LocalError.
-        private static FetchResponse BuildEmptyFetchResponseFromOriginal(
-            IBatchByTopic<FetchMessage> originalRequest)
+        // If the partition and topic match the parameters, it means that the deserialization of a message
+        // for said (topic, partition) failed. The error is set to DeserializationError in this case.
+        private static FetchResponse BuildEmptyFetchResponseFromOriginal(IBatchByTopic<FetchMessage> originalRequest,
+            string faultyTopic = null, int faultyPartition = -1)
         {
             return new FetchResponse
             {
@@ -1473,7 +1475,9 @@ private static FetchResponse BuildEmptyFetchResponseFromOriginal(
                     fm =>
                         new FetchPartitionResponse
                         {
-                            ErrorCode = ErrorCode.LocalError,
+                            ErrorCode = b.Key == faultyTopic && fm.Partition == faultyPartition
+                                ? ErrorCode.DeserializationError
+                                : ErrorCode.LocalError,
                             HighWatermarkOffset = -1,
                             Partition = fm.Partition,
                             Messages = ResponseMessageListPool.EmptyList
@@ -1553,6 +1557,11 @@ private void ProcessFetchResponse(int correlationId, ReusableMemoryStream respon
                 response.Response = _serialization.DeserializeResponse<FetchResponse>(correlationId, responseData,
                     _configuration.Compatibility == Compatibility.V0_8_2 ? Basics.ApiVersion.V0 : Basics.ApiVersion.V2);
             }
+            catch (ProtocolException pex)
+            {
+                OnDecodeError(pex);
+                response.Response = BuildEmptyFetchResponseFromOriginal(originalRequest, pex.Topic, pex.Partition);
+            }
             catch (Exception ex)
             {
                 OnDecodeError(ex);

kafka-sharp/kafka-sharp/Protocol/Errors.cs (+5)

@@ -230,5 +230,10 @@ enum ErrorCode : short

         // Local error, not from brokers
         LocalError = -42,
+
+        /// <summary>
+        /// Deserialization of one message failed (bad CRC, magic number or compression)
+        /// </summary>
+        DeserializationError = -43,
     }
 }

kafka-sharp/kafka-sharp/Protocol/FetchResponse.cs (+9, -1)

@@ -81,7 +81,15 @@ public void Deserialize(ReusableMemoryStream stream, object extra, Basics.ApiVer
             Partition = BigEndianConverter.ReadInt32(stream);
             ErrorCode = (ErrorCode) BigEndianConverter.ReadInt16(stream);
             HighWatermarkOffset = BigEndianConverter.ReadInt64(stream);
-            Messages = DeserializeMessageSet(stream, extra as Deserializers);
+            try
+            {
+                Messages = DeserializeMessageSet(stream, extra as Deserializers);
+            }
+            catch (ProtocolException pEx)
+            {
+                pEx.Partition = Partition;
+                throw;
+            }
         }

         internal static List<ResponseMessage> DeserializeMessageSet(ReusableMemoryStream stream, Deserializers deserializers)

kafka-sharp/kafka-sharp/Protocol/Message.cs (+12, -3)

@@ -7,18 +7,20 @@

 namespace Kafka.Protocol
 {
-    enum MessageVersion
+    internal enum MessageVersion
     {
         V0 = 0,
         V1 = 1
     }

-    struct Message
+    internal struct Message
     {
         public object Key;
         public object Value;
         public long TimeStamp;
-        public ReusableMemoryStream SerializedKeyValue;
+
+        // Visible for tests
+        internal ReusableMemoryStream SerializedKeyValue;

         private const int MinimumValidSizeForSerializedKeyValue = 2 * 4; // At least 4 bytes for key size and 4 bytes for value size

@@ -30,6 +32,13 @@ public void SerializeKeyValue(ReusableMemoryStream target, Tuple<ISerializer, IS
             Value = null;
         }

+        public void ReleaseSerializedKeyValue()
+        {
+            // Make sure that the buffer cannot be disposed twice (not good for buffer pooling)
+            SerializedKeyValue?.Dispose();
+            SerializedKeyValue = null;
+        }
+
         public void Serialize(ReusableMemoryStream stream, CompressionCodec compressionCodec,
             Tuple<ISerializer, ISerializer> serializers, MessageVersion msgVersion)
         {
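
A small standalone sketch (simplified types, not the real ReusableMemoryStream) of why ReleaseSerializedKeyValue nulls the field: a pooled buffer must not be returned to the pool twice, so release has to be idempotent.

using System;

// Stand-in for a pooled buffer such as ReusableMemoryStream.
class PooledBuffer : IDisposable
{
    private bool _disposed;
    public void Dispose()
    {
        if (_disposed) throw new InvalidOperationException("buffer returned to pool twice");
        _disposed = true; // a real implementation would hand the buffer back to the pool here
    }
}

struct Message
{
    internal PooledBuffer SerializedKeyValue;

    public void ReleaseSerializedKeyValue()
    {
        SerializedKeyValue?.Dispose();
        SerializedKeyValue = null; // a second call is now a harmless no-op
    }
}

class Demo
{
    static void Main()
    {
        var message = new Message { SerializedKeyValue = new PooledBuffer() };
        message.ReleaseSerializedKeyValue();
        message.ReleaseSerializedKeyValue(); // safe: no double return to the pool
        Console.WriteLine("released exactly once");
    }
}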

kafka-sharp/kafka-sharp/Protocol/ProtocolException.cs (+4)

@@ -5,6 +5,10 @@ namespace Kafka.Protocol
 {
     class ProtocolException : Exception
     {
+        // Fields to identify the faulty message (useful in case the exception stops a batch)
+        internal string Topic;
+        internal int Partition;
+
         public ProtocolException(string message) : base(message)
         {
         }

kafka-sharp/kafka-sharp/Protocol/TopicData.cs (+12, -4)

@@ -37,12 +37,20 @@ public void Deserialize(ReusableMemoryStream stream, object extra, Basics.ApiVer
                 var config = extra as SerializationConfig;
                 pdExtra = config.GetDeserializersForTopic(TopicName);
             }
-            for (int i = 0; i < count; ++i)
+            try
             {
-                array[i] = new TPartitionData();
-                array[i].Deserialize(stream, pdExtra, version);
+                for (int i = 0; i < count; ++i)
+                {
+                    array[i] = new TPartitionData();
+                    array[i].Deserialize(stream, pdExtra, version);
+                }
+                PartitionsData = array;
+            }
+            catch (ProtocolException pEx)
+            {
+                pEx.Topic = TopicName;
+                throw;
             }
-            PartitionsData = array;
         }

         #endregion
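
Taken together with the FetchResponse and Node changes above, the pattern is: each layer of the deserialization stack catches the ProtocolException, tags it with the context it knows (partition, then topic) and rethrows, so the top-level handler can pinpoint the faulty message. A condensed standalone sketch (simplified types, not the library's real call chain):

using System;

class ProtocolException : Exception
{
    internal string Topic;
    internal int Partition;
    public ProtocolException(string message) : base(message) { }
}

class EnrichmentSketch
{
    // Partition level: knows the partition, tags it and rethrows.
    static void DeserializePartition(int partition)
    {
        try { throw new ProtocolException("bad CRC"); }
        catch (ProtocolException pEx) { pEx.Partition = partition; throw; }
    }

    // Topic level: knows the topic name, tags it and rethrows.
    static void DeserializeTopic(string topic)
    {
        try { DeserializePartition(3); }
        catch (ProtocolException pEx) { pEx.Topic = topic; throw; }
    }

    // Node level: finally handles the fully tagged exception.
    static void Main()
    {
        try { DeserializeTopic("the topic"); }
        catch (ProtocolException pEx)
        {
            Console.WriteLine($"deserialization failed for {pEx.Topic}/{pEx.Partition}: {pEx.Message}");
        }
    }
}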

kafka-sharp/kafka-sharp/Public/Configuration.cs (+9, -3)

@@ -55,7 +55,7 @@ public enum Compatibility
     }

     /// <summary>
-    /// In case of network errors
+    /// In case of network or protocol errors
     /// </summary>
     public enum ErrorStrategy
     {
@@ -65,7 +65,8 @@ public enum ErrorStrategy
         Discard,

         /// <summary>
-        /// Retry sending messsages (this may end up in duplicate messages)
+        /// Retry sending messages (this may end up in duplicate messages) for Producer.
+        /// Retry fetching messages (this may end up in an infinite loop) for Consumer.
         /// </summary>
         Retry
     }
@@ -142,10 +143,15 @@ public class Configuration
         public TimeSpan TemporaryIgnorePartitionTime = TimeSpan.FromSeconds(42);

         /// <summary>
-        /// Strategy in case opf network errors.
+        /// Strategy in case of network errors for Producer.
         /// </summary>
         public ErrorStrategy ErrorStrategy = ErrorStrategy.Discard;

+        /// <summary>
+        /// Strategy in case of deserialization errors for Consumer.
+        /// </summary>
+        public ErrorStrategy ConsumerErrorStrategy = ErrorStrategy.Discard;
+
         /// <summary>
         /// Time slice for batching messages. We wait that much time at most before processing
         /// a batch of messages.
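
A short usage sketch (hedged: only the two strategy properties come from this diff; the Kafka.Public namespace and the ClusterClient mentioned in the comment are assumptions about the surrounding library) showing that producer and consumer error handling are now tuned independently:

using Kafka.Public;

class ConfigurationSketch
{
    static void Main()
    {
        var configuration = new Configuration
        {
            // Producer side: drop messages on network errors (the default).
            ErrorStrategy = ErrorStrategy.Discard,

            // Consumer side: refetch after a deserialization error. As the
            // doc comment warns, Retry can loop forever on a permanently
            // corrupt message, which is why Discard is the safer default.
            ConsumerErrorStrategy = ErrorStrategy.Retry,
        };
        // configuration would then be passed to a ClusterClient (not shown).
    }
}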
