Skip to content

Commit 4861297

Browse files
KAFKA-20292 [2/N]: Prepare to split TargetAssignmentBuilders (apache#22486)
To accommodate asynchronous assignments, such as those from client-side assignors and assignors offloaded to background threads, we want to split the TargetAssignmentBuilders into two: one builder for building the target assignment and another for building the target assignment records. Client-side assignors will only use the second builder. Both builders require an up-to-date view of group members at the time they are run. In the non-offloaded case, this is the same view. However, the view needs to include the unwritten member operations from the ongoing heartbeat request. Currently the operations are applied within the TargetAssignmentBuilders. To avoid duplicating the logic once the TargetAssignmentBuilders are split, we would like to lift it out and pass the TargetAssignmentBuilders the updated view of members and assignments. Add UpdatedMembersAndTargetAssignmentView, which provides updated views of members and target assignments after unwritten member joins, updates and leaves. Future commits will update the consumer, share and streams group assignment paths to use the new class. Reviewers: David Jacot <david.jacot@gmail.com>
1 parent 4a2edef commit 4861297

2 files changed

Lines changed: 314 additions & 0 deletions

File tree

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.coordinator.group.util;
18+
19+
import java.util.Collections;
20+
import java.util.Map;
21+
import java.util.Objects;
22+
23+
/**
24+
* A view of a group's members, static members, and target assignment after unwritten membership
25+
* changes have been applied.
26+
*
27+
* @param <M> The member type.
28+
* @param <A> The member's target assignment type.
29+
*/
30+
public class UpdatedMembersAndTargetAssignmentView<M, A> {
31+
32+
/**
33+
* The group members.
34+
*/
35+
private final OverlayMap<String, M> members;
36+
37+
/**
38+
* The static group members.
39+
*/
40+
private final OverlayMap<String, String> staticMembers;
41+
42+
/**
43+
* The target assignment per member id.
44+
*/
45+
private final OverlayMap<String, A> targetAssignment;
46+
47+
/**
48+
* @param members The group members. Must not be modified during the lifetime of the view.
49+
* @param staticMembers The static group members. Must not be modified during the lifetime of the view.
50+
* @param targetAssignment The target assignment per member id. Must not be modified during the lifetime of the view.
51+
*/
52+
public UpdatedMembersAndTargetAssignmentView(
53+
Map<String, M> members,
54+
Map<String, String> staticMembers,
55+
Map<String, A> targetAssignment
56+
) {
57+
this.members = new OverlayMap<>(Objects.requireNonNull(members));
58+
this.staticMembers = new OverlayMap<>(Objects.requireNonNull(staticMembers));
59+
this.targetAssignment = new OverlayMap<>(Objects.requireNonNull(targetAssignment));
60+
}
61+
62+
/**
63+
* @return The group members after updates.
64+
*/
65+
public Map<String, M> members() {
66+
return Collections.unmodifiableMap(members);
67+
}
68+
69+
/**
70+
* @return The static group members after updates.
71+
*/
72+
public Map<String, String> staticMembers() {
73+
return Collections.unmodifiableMap(staticMembers);
74+
}
75+
76+
/**
77+
* @return The target assignment per member id after updates.
78+
*/
79+
public Map<String, A> targetAssignment() {
80+
return Collections.unmodifiableMap(targetAssignment);
81+
}
82+
83+
/**
84+
* Adds or updates a member. If the member is static and there is a different existing static
85+
* member for the same instance id, the previous static member's target assignment is moved to
86+
* the new member and the previous static member is removed from the view.
87+
*
88+
* @param memberId The member id.
89+
* @param instanceId The instance id of the member, or {@code null} if the member is not static.
90+
* @param member The member to add or update.
91+
*/
92+
public void addOrUpdateMember(String memberId, String instanceId, M member) {
93+
members.put(memberId, member);
94+
if (instanceId != null) {
95+
String previousMemberId = staticMembers.put(instanceId, memberId);
96+
if (previousMemberId != null && !memberId.equals(previousMemberId)) {
97+
// A static member is being replaced. Move the assignment to the new member.
98+
A memberAssignment = targetAssignment.get(previousMemberId);
99+
if (memberAssignment != null) {
100+
targetAssignment.put(memberId, memberAssignment);
101+
}
102+
103+
// Remove the previous member.
104+
members.remove(previousMemberId);
105+
targetAssignment.remove(previousMemberId);
106+
}
107+
}
108+
}
109+
110+
/**
111+
* Removes a member.
112+
*
113+
* @param memberId The member id.
114+
* @param instanceId The instance id of the member, or {@code null} if the member is not static.
115+
*/
116+
public void removeMember(String memberId, String instanceId) {
117+
members.remove(memberId);
118+
if (instanceId != null && memberId.equals(staticMembers.get(instanceId))) {
119+
staticMembers.remove(instanceId);
120+
}
121+
targetAssignment.remove(memberId);
122+
}
123+
}
Lines changed: 191 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,191 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.coordinator.group.util;
18+
19+
import org.junit.jupiter.api.Test;
20+
21+
import java.util.Map;
22+
23+
import static org.junit.jupiter.api.Assertions.assertEquals;
24+
25+
public class UpdatedMembersAndTargetAssignmentViewTest {
26+
27+
/**
28+
* Creates an {@link UpdatedMembersAndTargetAssignmentView} with two members, one static and one
29+
* non-static.
30+
*/
31+
private static UpdatedMembersAndTargetAssignmentView<String, String> createView() {
32+
return new UpdatedMembersAndTargetAssignmentView<>(
33+
Map.of(
34+
"member-1", "Member1",
35+
"member-2", "Member2"
36+
),
37+
Map.of(
38+
"instance-id", "member-2"
39+
),
40+
Map.of(
41+
"member-1", "Assignment-member-1",
42+
"member-2", "Assignment-member-2"
43+
)
44+
);
45+
}
46+
47+
@Test
48+
public void testAddMember() {
49+
UpdatedMembersAndTargetAssignmentView<String, String> view = createView();
50+
51+
view.addOrUpdateMember("member-3", null, "Member3");
52+
53+
assertEquals(Map.of(
54+
"member-1", "Member1",
55+
"member-2", "Member2",
56+
"member-3", "Member3"
57+
), view.members());
58+
assertEquals(Map.of(
59+
"instance-id", "member-2"
60+
), view.staticMembers());
61+
assertEquals(Map.of(
62+
"member-1", "Assignment-member-1",
63+
"member-2", "Assignment-member-2"
64+
), view.targetAssignment());
65+
}
66+
67+
@Test
68+
public void testAddStaticMember() {
69+
UpdatedMembersAndTargetAssignmentView<String, String> view = createView();
70+
71+
view.addOrUpdateMember("member-3", "instance-id-2", "Member3");
72+
73+
assertEquals(Map.of(
74+
"member-1", "Member1",
75+
"member-2", "Member2",
76+
"member-3", "Member3"
77+
), view.members());
78+
assertEquals(Map.of(
79+
"instance-id", "member-2",
80+
"instance-id-2", "member-3"
81+
), view.staticMembers());
82+
assertEquals(Map.of(
83+
"member-1", "Assignment-member-1",
84+
"member-2", "Assignment-member-2"
85+
), view.targetAssignment());
86+
}
87+
88+
@Test
89+
public void testReplaceMember() {
90+
UpdatedMembersAndTargetAssignmentView<String, String> view = createView();
91+
92+
view.addOrUpdateMember("member-1", null, "Member1-updated");
93+
94+
assertEquals(Map.of(
95+
"member-1", "Member1-updated",
96+
"member-2", "Member2"
97+
), view.members());
98+
assertEquals(Map.of(
99+
"instance-id", "member-2"
100+
), view.staticMembers());
101+
assertEquals(Map.of(
102+
"member-1", "Assignment-member-1",
103+
"member-2", "Assignment-member-2"
104+
), view.targetAssignment());
105+
}
106+
107+
@Test
108+
public void testReplaceStaticMemberWithSameMemberId() {
109+
UpdatedMembersAndTargetAssignmentView<String, String> view = createView();
110+
111+
view.addOrUpdateMember("member-2", "instance-id", "Member2-updated");
112+
113+
assertEquals(Map.of(
114+
"member-1", "Member1",
115+
"member-2", "Member2-updated"
116+
), view.members());
117+
assertEquals(Map.of(
118+
"instance-id", "member-2"
119+
), view.staticMembers());
120+
assertEquals(Map.of(
121+
"member-1", "Assignment-member-1",
122+
"member-2", "Assignment-member-2"
123+
), view.targetAssignment());
124+
}
125+
126+
@Test
127+
public void testReplaceStaticMemberWithDifferentMemberId() {
128+
UpdatedMembersAndTargetAssignmentView<String, String> view = createView();
129+
130+
view.addOrUpdateMember("member-3", "instance-id", "Member3");
131+
132+
assertEquals(Map.of(
133+
"member-1", "Member1",
134+
"member-3", "Member3"
135+
), view.members());
136+
assertEquals(Map.of(
137+
"instance-id", "member-3"
138+
), view.staticMembers());
139+
assertEquals(Map.of(
140+
"member-1", "Assignment-member-1",
141+
"member-3", "Assignment-member-2"
142+
), view.targetAssignment());
143+
144+
// Removing the previous static member does not change the new static member's assignment.
145+
view.removeMember("member-2", "instance-id");
146+
147+
assertEquals(Map.of(
148+
"member-1", "Member1",
149+
"member-3", "Member3"
150+
), view.members());
151+
assertEquals(Map.of(
152+
"instance-id", "member-3"
153+
), view.staticMembers());
154+
assertEquals(Map.of(
155+
"member-1", "Assignment-member-1",
156+
"member-3", "Assignment-member-2"
157+
), view.targetAssignment());
158+
}
159+
160+
@Test
161+
public void testRemoveMember() {
162+
UpdatedMembersAndTargetAssignmentView<String, String> view = createView();
163+
164+
view.removeMember("member-1", null);
165+
166+
assertEquals(Map.of(
167+
"member-2", "Member2"
168+
), view.members());
169+
assertEquals(Map.of(
170+
"instance-id", "member-2"
171+
), view.staticMembers());
172+
assertEquals(Map.of(
173+
"member-2", "Assignment-member-2"
174+
), view.targetAssignment());
175+
}
176+
177+
@Test
178+
public void testRemoveStaticMember() {
179+
UpdatedMembersAndTargetAssignmentView<String, String> view = createView();
180+
181+
view.removeMember("member-2", "instance-id");
182+
183+
assertEquals(Map.of(
184+
"member-1", "Member1"
185+
), view.members());
186+
assertEquals(Map.of(), view.staticMembers());
187+
assertEquals(Map.of(
188+
"member-1", "Assignment-member-1"
189+
), view.targetAssignment());
190+
}
191+
}

0 commit comments

Comments
 (0)