Skip to content
This repository was archived by the owner on Apr 9, 2022. It is now read-only.

Commit b887819

Browse files
author
Serge Danzanvilliers
committed
Reset Generation ID in case of Sync exceptions
- In case of exceptions during Join workflow but after JOIN was succesful (typically a client side timeout), the generation id was not reset which would prevent subsequent HEARTBEAT to fail (brokers don't care if a connection was aborted, they only take into account the generation id and member id for heartbeats). Change-Id: Ieedbbd0aa411fddf321e6653dcada9681864709f
1 parent 01651b1 commit b887819

File tree

2 files changed

+93
-57
lines changed

2 files changed

+93
-57
lines changed

kafka-sharp/kafka-sharp.UTest/TestConsumerGroup.cs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -431,6 +431,28 @@ public async Task TestConsumerGroup_JoinErrorJoin()
431431
Assert.AreEqual(ErrorCode.InconsistentGroupProtocol, assignments.ErrorCode);
432432
}
433433

434+
[Test]
435+
public async Task TestConsumerGroup_JoinExceptionSync()
436+
{
437+
var mocks = InitCluster();
438+
var group = new ConsumerGroup("the group", new ConsumerGroupConfiguration(), mocks.Cluster.Object);
439+
440+
mocks.Node.Setup(
441+
n =>
442+
n.SyncConsumerGroup(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<int>(),
443+
It.IsAny<IEnumerable<ConsumerGroupAssignment>>()))
444+
.ThrowsAsync(new Exception());
445+
446+
try
447+
{
448+
await group.Join(new[] { "the topic" });
449+
}
450+
catch
451+
{
452+
Assert.AreEqual(-1, group.Generation);
453+
}
454+
}
455+
434456
[Test]
435457
public async Task TestConsumerGroup_JoinErrorSync()
436458
{

kafka-sharp/kafka-sharp/Routing/ConsumerGroup.cs

Lines changed: 71 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -74,76 +74,90 @@ public ConsumerGroup(string groupId, ConsumerGroupConfiguration configuration, I
7474

7575
public async Task<PartitionAssignments> Join(IEnumerable<string> topics)
7676
{
77-
await RefreshCoordinator();
77+
try
78+
{
79+
await RefreshCoordinator();
7880

79-
// JoinGroup request
80-
var joinResponse =
81-
await
82-
_coordinator.JoinConsumerGroup(GroupId, MemberId, Configuration.SessionTimeoutMs,
83-
Configuration.RebalanceTimeoutMs, topics);
81+
// JoinGroup request
82+
var joinResponse =
83+
await
84+
_coordinator.JoinConsumerGroup(GroupId, MemberId, Configuration.SessionTimeoutMs,
85+
Configuration.RebalanceTimeoutMs, topics);
8486

85-
Generation = joinResponse.GenerationId;
86-
MemberId = joinResponse.MemberId ?? "";
87+
Generation = joinResponse.GenerationId;
88+
MemberId = joinResponse.MemberId ?? "";
8789

88-
if (joinResponse.ErrorCode != ErrorCode.NoError)
89-
{
90-
HandleError(joinResponse.ErrorCode);
91-
return new PartitionAssignments
90+
if (joinResponse.ErrorCode != ErrorCode.NoError)
9291
{
93-
ErrorCode = joinResponse.ErrorCode,
94-
Assignments = EmptyAssignment
95-
};
96-
}
97-
98-
bool isLeader = joinResponse.GroupMembers.Length > 0;
99-
_cluster.Logger.LogInformation(
100-
string.Format("Consumer group \"{0}\" joined. Member id is \"{1}\", generation is {2}.{3}", GroupId,
101-
MemberId, Generation, isLeader ? " I am leader." : ""));
92+
HandleError(joinResponse.ErrorCode);
93+
return new PartitionAssignments
94+
{
95+
ErrorCode = joinResponse.ErrorCode,
96+
Assignments = EmptyAssignment
97+
};
98+
}
10299

103-
// SyncGroup request
104-
var assignments = isLeader ? await LeaderAssign(joinResponse.GroupMembers) : Enumerable.Empty<ConsumerGroupAssignment>();
105-
var syncResponse = await _coordinator.SyncConsumerGroup(GroupId, MemberId, Generation, assignments);
100+
bool isLeader = joinResponse.GroupMembers.Length > 0;
101+
_cluster.Logger.LogInformation(
102+
string.Format("Consumer group \"{0}\" joined. Member id is \"{1}\", generation is {2}.{3}", GroupId,
103+
MemberId, Generation, isLeader ? " I am leader." : ""));
104+
105+
// SyncGroup request
106+
var assignments = isLeader
107+
? await LeaderAssign(joinResponse.GroupMembers)
108+
: Enumerable.Empty<ConsumerGroupAssignment>();
109+
var syncResponse = await _coordinator.SyncConsumerGroup(GroupId, MemberId, Generation, assignments);
110+
111+
_cluster.Logger.LogInformation(string.Format("Consumer group \"{0}\" synced. Assignments: {1}", GroupId,
112+
string.Join(", ",
113+
syncResponse.MemberAssignment.PartitionAssignments.Select(
114+
td =>
115+
string.Format("{0}:[{1}]", td.TopicName,
116+
string.Join(",", td.PartitionsData.Select(p => p.Partition.ToString())))))));
117+
118+
// Empty assignments, no need to fetch offsets
119+
if (!syncResponse.MemberAssignment.PartitionAssignments.Any()
120+
|| joinResponse.ErrorCode != ErrorCode.NoError)
121+
{
122+
HandleError(joinResponse.ErrorCode);
123+
return new PartitionAssignments
124+
{
125+
ErrorCode = syncResponse.ErrorCode,
126+
Assignments = EmptyAssignment
127+
};
128+
}
106129

107-
_cluster.Logger.LogInformation(string.Format("Consumer group \"{0}\" synced. Assignments: {1}", GroupId,
108-
string.Join(", ",
109-
syncResponse.MemberAssignment.PartitionAssignments.Select(
110-
td =>
111-
string.Format("{0}:[{1}]", td.TopicName,
112-
string.Join(",", td.PartitionsData.Select(p => p.Partition.ToString())))))));
130+
// FetchOffsets request (retrieve saved offsets)
131+
var offsets =
132+
await _coordinator.FetchOffsets(GroupId, syncResponse.MemberAssignment.PartitionAssignments);
113133

114-
// Empty assignments, no need to fetch offsets
115-
if (!syncResponse.MemberAssignment.PartitionAssignments.Any() || joinResponse.ErrorCode != ErrorCode.NoError)
116-
{
117-
HandleError(joinResponse.ErrorCode);
134+
// Returns assignments, each one with the offset we have have to start to read from
118135
return new PartitionAssignments
119136
{
120137
ErrorCode = syncResponse.ErrorCode,
121-
Assignments = EmptyAssignment
138+
Assignments =
139+
offsets.TopicsResponse.ToDictionary(assignment => assignment.TopicName,
140+
assignment =>
141+
new HashSet<PartitionOffset>(
142+
assignment.PartitionsData.Select(
143+
p =>
144+
new PartitionOffset
145+
{
146+
Partition = p.Partition,
147+
Offset = p.Offset < 0
148+
? Offset(Configuration.DefaultOffsetToReadFrom)
149+
// read from whatever is configured offset (end or start)
150+
: p.Offset // read from saved offset
151+
})) as ISet<PartitionOffset>)
122152
};
123153
}
124-
125-
// FetchOffsets request (retrieve saved offsets)
126-
var offsets = await _coordinator.FetchOffsets(GroupId, syncResponse.MemberAssignment.PartitionAssignments);
127-
128-
// Returns assignments, each one with the offset we have have to start to read from
129-
return new PartitionAssignments
154+
catch
130155
{
131-
ErrorCode = syncResponse.ErrorCode,
132-
Assignments =
133-
offsets.TopicsResponse.ToDictionary(assignment => assignment.TopicName,
134-
assignment =>
135-
new HashSet<PartitionOffset>(
136-
assignment.PartitionsData.Select(
137-
p =>
138-
new PartitionOffset
139-
{
140-
Partition = p.Partition,
141-
Offset = p.Offset < 0
142-
? Offset(Configuration.DefaultOffsetToReadFrom)
143-
// read from whatever is configured offset (end or start)
144-
: p.Offset // read from saved offset
145-
})) as ISet<PartitionOffset>)
146-
};
156+
// Something worng occured during the workflow (typically a timeout).
157+
// Reset Generation in case the problem occured after Join.
158+
Generation = -1;
159+
throw;
160+
}
147161
}
148162

149163
private async Task RefreshCoordinator()

0 commit comments

Comments
 (0)