Skip to content

Commit 75eafe8

Browse files
committed
SAMZA-2286: Integrate MetadataStore#putAll for improved startup time.
1 parent 159683b commit 75eafe8

File tree

10 files changed

+264
-74
lines changed

10 files changed

+264
-74
lines changed

samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskAssignmentManager.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
2727
import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
2828
import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping;
29+
import org.apache.samza.job.model.ContainerModel;
2930
import org.apache.samza.job.model.TaskMode;
3031
import org.apache.samza.metadatastore.MetadataStore;
3132
import org.apache.samza.serializers.Serde;
@@ -101,7 +102,9 @@ public Map<TaskName, TaskMode> readTaskModes() {
101102
* @param taskName the task name
102103
* @param containerId the SamzaContainer ID or {@code null} to delete the mapping
103104
* @param taskMode the mode of the task
105+
* @deprecated in favor of {@link #writeTaskContainerAssignments(Map)}
104106
*/
107+
@Deprecated
105108
public void writeTaskContainerMapping(String taskName, String containerId, TaskMode taskMode) {
106109
String existingContainerId = taskNameToContainerId.get(taskName);
107110
if (existingContainerId != null && !existingContainerId.equals(containerId)) {
@@ -121,6 +124,45 @@ public void writeTaskContainerMapping(String taskName, String containerId, TaskM
121124
}
122125
}
123126

127+
/**
128+
* Method to write task-to-container mapping and the mode of the task to {@link MetadataStore}
129+
* @param taskToContainerMapping the task name to container mapping. Container IDs that are null are deleted.
130+
*/
131+
public void writeTaskContainerAssignments(Map<String, ContainerModel> taskToContainerMapping) {
132+
HashMap<String, byte[]> containerMapping = new HashMap<>();
133+
HashMap<String, byte[]> modeMapping = new HashMap<>();
134+
135+
for (String taskName : taskToContainerMapping.keySet()) {
136+
ContainerModel container = taskToContainerMapping.get(taskName);
137+
String containerId = container.getId();
138+
TaskMode taskMode = container.getTasks().get(new TaskName(taskName)).getTaskMode();
139+
140+
String existingContainerId = taskNameToContainerId.get(taskName);
141+
if (existingContainerId != null && !existingContainerId.equals(containerId)) {
142+
LOG.info("Task \"{}\" in mode {} moved from container {} to container {}", new Object[]{taskName, taskMode, existingContainerId, containerId});
143+
} else {
144+
LOG.debug("Task \"{}\" in mode {} assigned to container {}", taskName, taskMode, containerId);
145+
}
146+
147+
if (containerId == null) {
148+
LOG.info("Deleting task: {} from the task-to-container assignment in the metadata store", taskName);
149+
taskContainerMappingMetadataStore.delete(taskName);
150+
taskModeMappingMetadataStore.delete(taskName);
151+
taskNameToContainerId.remove(taskName);
152+
} else {
153+
LOG.info("Assigning task: {} to container ID: {} in the metadata store", taskName, containerId);
154+
containerMapping.put(taskName, containerIdSerde.toBytes(containerId));
155+
modeMapping.put(taskName, taskModeSerde.toBytes(taskMode.toString()));
156+
taskNameToContainerId.put(taskName, containerId);
157+
}
158+
}
159+
160+
if (!containerMapping.isEmpty()) {
161+
taskContainerMappingMetadataStore.putAll(containerMapping);
162+
taskModeMappingMetadataStore.putAll(modeMapping);
163+
}
164+
}
165+
124166
/**
125167
* Deletes the task container info from the {@link MetadataStore} for the task names.
126168
*

samza-core/src/main/java/org/apache/samza/container/grouper/task/TaskPartitionAssignmentManager.java

Lines changed: 36 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,12 @@
1919
package org.apache.samza.container.grouper.task;
2020

2121
import com.google.common.base.Preconditions;
22+
import com.google.common.collect.ImmutableMap;
23+
import java.io.IOException;
24+
import java.util.HashMap;
25+
import java.util.List;
26+
import java.util.Map;
27+
import org.apache.commons.collections4.CollectionUtils;
2228
import org.apache.samza.SamzaException;
2329
import org.apache.samza.coordinator.stream.CoordinatorStreamValueSerde;
2430
import org.apache.samza.coordinator.stream.messages.SetTaskPartitionMapping;
@@ -30,10 +36,6 @@
3036
import org.codehaus.jackson.type.TypeReference;
3137
import org.slf4j.Logger;
3238
import org.slf4j.LoggerFactory;
33-
import java.io.IOException;
34-
import java.util.HashMap;
35-
import java.util.List;
36-
import java.util.Map;
3739

3840
/**
3941
* Used to persist and read the task-to-partition assignment information
@@ -69,25 +71,40 @@ public TaskPartitionAssignmentManager(MetadataStore metadataStore) {
6971
* Stores the task to partition assignments to the metadata store.
7072
* @param partition the system stream partition.
7173
* @param taskNames the task names to which the partition is assigned to.
74+
* @deprecated in favor of {@link #writeTaskPartitionAssignments(Map)}
7275
*/
76+
@Deprecated
7377
public void writeTaskPartitionAssignment(SystemStreamPartition partition, List<String> taskNames) {
74-
// For broadcast streams, a input system stream partition will be mapped to more than one tasks in a
75-
// SamzaContainer. Rather than storing taskName to list of SystemStreamPartitions in metadata store, here
76-
// systemStreamPartition to list of taskNames is stored. This was done due to 1 MB limit on value size in kafka.
77-
String serializedSSPAsJson = serializeSSPToJson(partition);
78-
if (taskNames == null || taskNames.isEmpty()) {
79-
LOG.info("Deleting the key: {} from the metadata store.", partition);
80-
metadataStore.delete(serializedSSPAsJson);
81-
} else {
82-
try {
83-
String taskNamesAsString = taskNamesMapper.writeValueAsString(taskNames);
84-
byte[] taskNamesAsBytes = valueSerde.toBytes(taskNamesAsString);
85-
LOG.info("Storing the partition: {} and taskNames: {} into the metadata store.", serializedSSPAsJson, taskNames);
86-
metadataStore.put(serializedSSPAsJson, taskNamesAsBytes);
87-
} catch (Exception e) {
88-
throw new SamzaException("Exception occurred when writing task to partition assignment.", e);
78+
writeTaskPartitionAssignments(ImmutableMap.of(partition, taskNames));
79+
}
80+
81+
/**
82+
* Stores the task names to {@link SystemStreamPartition} assignments to the metadata store.
83+
* @param sspToTaskNameMapping the mapped assignments to write to the metadata store. If the task name list is empty,
84+
* then the entry is deleted from the metadata store.
85+
*/
86+
public void writeTaskPartitionAssignments(Map<SystemStreamPartition, List<String>> sspToTaskNameMapping) {
87+
HashMap<String, byte[]> tasksMapping = new HashMap<>();
88+
for (SystemStreamPartition ssp: sspToTaskNameMapping.keySet()) {
89+
// For broadcast streams, a input system stream partition will be mapped to more than one tasks in a
90+
// SamzaContainer. Rather than storing taskName to list of SystemStreamPartitions in metadata store, here
91+
// systemStreamPartition to list of taskNames is stored. This was done due to 1 MB limit on value size in kafka.
92+
List<String> taskNames = sspToTaskNameMapping.get(ssp);
93+
String serializedSSPAsJson = serializeSSPToJson(ssp);
94+
if (CollectionUtils.isEmpty(taskNames)) {
95+
LOG.info("Deleting the task assignment for partition: {} from the metadata store.", ssp);
96+
metadataStore.delete(serializedSSPAsJson);
97+
} else {
98+
try {
99+
String taskNamesAsString = taskNamesMapper.writeValueAsString(taskNames);
100+
LOG.info("Assigning the partition: {} with taskNames: {} in the metadata store.", serializedSSPAsJson, taskNamesAsString);
101+
tasksMapping.put(serializedSSPAsJson, valueSerde.toBytes(taskNamesAsString));
102+
} catch (IOException e) {
103+
throw new SamzaException("Exception occurred when writing task to partition assignment.", e);
104+
}
89105
}
90106
}
107+
metadataStore.putAll(tasksMapping);
91108
}
92109

93110
/**

samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,18 +90,23 @@ public Map<TaskName, Integer> readPartitionMapping() {
9090
*/
9191
public void writePartitionMapping(Map<TaskName, Integer> changelogEntries) {
9292
LOG.debug("Updating changelog information with: ");
93+
HashMap<String, byte[]> changelogEntriesToStore = new HashMap<>();
9394
for (Map.Entry<TaskName, Integer> entry : changelogEntries.entrySet()) {
9495
Preconditions.checkNotNull(entry.getKey());
9596
String taskName = entry.getKey().getTaskName();
9697
if (entry.getValue() != null) {
9798
String changeLogPartitionId = String.valueOf(entry.getValue());
9899
LOG.debug("TaskName: {} to Partition: {}", taskName, entry.getValue());
99-
metadataStore.put(taskName, valueSerde.toBytes(changeLogPartitionId));
100+
changelogEntriesToStore.put(taskName, valueSerde.toBytes(changeLogPartitionId));
100101
} else {
101102
LOG.debug("Deleting the TaskName: {}", taskName);
102103
metadataStore.delete(taskName);
103104
}
104105
}
106+
if (!changelogEntries.isEmpty()) {
107+
LOG.info("Storing {} changelog partition assignments", changelogEntries.size());
108+
metadataStore.putAll(changelogEntriesToStore);
109+
}
105110
}
106111

107112
/**

samza-core/src/main/java/org/apache/samza/zk/ZkJobCoordinator.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import com.google.common.annotations.VisibleForTesting;
2222
import com.google.common.base.Preconditions;
23+
import com.google.common.collect.Sets;
2324
import java.io.IOException;
2425
import java.util.ArrayList;
2526
import java.util.HashMap;
@@ -29,6 +30,7 @@
2930
import java.util.Objects;
3031
import java.util.Set;
3132
import java.util.concurrent.atomic.AtomicBoolean;
33+
import java.util.stream.Collectors;
3234
import org.I0Itec.zkclient.IZkStateListener;
3335
import org.apache.samza.SamzaException;
3436
import org.apache.samza.config.Config;
@@ -316,15 +318,22 @@ void loadMetadataResources(JobModel jobModel) {
316318
metadataResourceUtil.createResources();
317319

318320
if (coordinatorStreamStore != null) {
319-
// TODO: SAMZA-2273 - publish configs async
320321
CoordinatorStreamValueSerde jsonSerde = new CoordinatorStreamValueSerde(SetConfig.TYPE);
321322
NamespaceAwareCoordinatorStreamStore configStore =
322323
new NamespaceAwareCoordinatorStreamStore(coordinatorStreamStore, SetConfig.TYPE);
323-
for (Map.Entry<String, String> entry : config.entrySet()) {
324-
byte[] serializedValue = jsonSerde.toBytes(entry.getValue());
325-
configStore.put(entry.getKey(), serializedValue);
324+
325+
// Delete configs from metadata store that are no longer needed.
326+
Set<String> configsToDelete = Sets.difference(configStore.all().keySet(), config.keySet());
327+
for (String configKeyToDelete : configsToDelete) {
328+
LOG.debug("Deleting config: {}", configKeyToDelete);
329+
configStore.delete(configKeyToDelete);
326330
}
327331

332+
// Publish current configs to metadata store.
333+
Map<String, byte[]> configsToStore =
334+
config.entrySet().stream().collect(Collectors.toMap(e -> e.getKey(), e -> jsonSerde.toBytes(e.getValue())));
335+
configStore.putAll(configsToStore);
336+
328337
// fan out the startpoints
329338
StartpointManager startpointManager = createStartpointManager();
330339
startpointManager.start();

samza-core/src/main/scala/org/apache/samza/coordinator/JobModelManager.scala

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -239,23 +239,24 @@ object JobModelManager extends Logging {
239239
// taskName to SystemStreamPartitions is done here to wire-in the data to {@see JobModel}.
240240
val sspToTaskNameMap: util.Map[SystemStreamPartition, util.List[String]] = new util.HashMap[SystemStreamPartition, util.List[String]]()
241241

242+
// The task to container mapping that will be written to the coordinator stream by the TaskAssignmentManager.
243+
val taskToContainerModelMap: util.Map[String, ContainerModel] = new util.HashMap[String, ContainerModel]()
244+
242245
for (container <- jobModel.getContainers.values()) {
243246
for ((taskName, taskModel) <- container.getTasks) {
244-
info ("Storing task: %s and container ID: %s into metadata store" format(taskName.getTaskName, container.getId))
245-
taskAssignmentManager.writeTaskContainerMapping(taskName.getTaskName, container.getId, container.getTasks.get(taskName).getTaskMode)
246-
for (partition <- taskModel.getSystemStreamPartitions) {
247-
if (!sspToTaskNameMap.containsKey(partition)) {
248-
sspToTaskNameMap.put(partition, new util.ArrayList[String]())
249-
}
250-
sspToTaskNameMap.get(partition).add(taskName.getTaskName)
247+
taskToContainerModelMap.put(taskName.getTaskName, container)
248+
for (ssp <- taskModel.getSystemStreamPartitions) {
249+
sspToTaskNameMap.putIfAbsent(ssp, new util.ArrayList[String]())
250+
sspToTaskNameMap.get(ssp).add(taskName.getTaskName)
251251
}
252252
}
253253
}
254254

255-
for ((ssp, taskNames) <- sspToTaskNameMap) {
256-
info ("Storing ssp: %s and task: %s into metadata store" format(ssp, taskNames))
257-
taskPartitionAssignmentManager.writeTaskPartitionAssignment(ssp, taskNames)
258-
}
255+
taskAssignmentManager.writeTaskContainerAssignments(taskToContainerModelMap)
256+
info("Stored %d task-to-container assignments in the metadata store." format taskToContainerModelMap.size())
257+
258+
taskPartitionAssignmentManager.writeTaskPartitionAssignments(sspToTaskNameMap)
259+
info("Stored %d partition-to-tasks assignments in the metadata store." format sspToTaskNameMap.size())
259260
}
260261

261262
/**

samza-core/src/test/java/org/apache/samza/container/grouper/task/TestTaskAssignmentManager.java

Lines changed: 47 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,21 +19,28 @@
1919

2020
package org.apache.samza.container.grouper.task;
2121

22+
import com.google.common.collect.ImmutableList;
2223
import com.google.common.collect.ImmutableMap;
2324
import java.util.HashMap;
25+
import java.util.List;
2426
import java.util.Map;
27+
import java.util.stream.Collectors;
2528
import org.apache.samza.config.Config;
2629
import org.apache.samza.config.MapConfig;
30+
import org.apache.samza.container.TaskName;
2731
import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStore;
2832
import org.apache.samza.coordinator.metadatastore.CoordinatorStreamStoreTestUtil;
2933
import org.apache.samza.coordinator.metadatastore.NamespaceAwareCoordinatorStreamStore;
3034
import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory;
3135
import org.apache.samza.coordinator.stream.messages.SetTaskContainerMapping;
3236
import org.apache.samza.coordinator.stream.messages.SetTaskModeMapping;
37+
import org.apache.samza.job.model.ContainerModel;
3338
import org.apache.samza.job.model.TaskMode;
39+
import org.apache.samza.job.model.TaskModel;
3440
import org.junit.After;
3541
import org.junit.Before;
3642
import org.junit.Test;
43+
import org.mockito.Mockito;
3744

3845
import static org.junit.Assert.assertEquals;
3946
import static org.junit.Assert.assertTrue;
@@ -59,29 +66,59 @@ public void tearDown() {
5966

6067
@Test
6168
public void testTaskAssignmentManager() {
62-
Map<String, String> expectedMap = ImmutableMap.of("Task0", "0", "Task1", "1", "Task2", "2", "Task3", "0", "Task4", "1");
69+
String t0 = "Task0";
70+
String t1 = "Task1";
71+
String t2 = "Task2";
72+
String t3 = "Task3";
73+
String t4 = "Task4";
6374

64-
for (Map.Entry<String, String> entry : expectedMap.entrySet()) {
65-
taskAssignmentManager.writeTaskContainerMapping(entry.getKey(), entry.getValue(), TaskMode.Active);
66-
}
75+
ContainerModel c0 = createMockContainerModel("0", ImmutableList.of(t0, t3));
76+
ContainerModel c1 = createMockContainerModel("1", ImmutableList.of(t1, t4));
77+
ContainerModel c2 = createMockContainerModel("2", ImmutableList.of(t2));
78+
79+
ImmutableMap<String, ContainerModel> expectedMap = ImmutableMap.of(t0, c0, t1, c1, t2, c2, t3, c0, t4, c1);
80+
81+
taskAssignmentManager.writeTaskContainerAssignments(expectedMap);
6782

6883
Map<String, String> localMap = taskAssignmentManager.readTaskAssignment();
6984

70-
assertEquals(expectedMap, localMap);
85+
assertEquals(expectedMap.entrySet().stream().collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue().getId())), localMap);
7186

7287
taskAssignmentManager.close();
7388
}
7489

90+
public ContainerModel createMockContainerModel(String containerId, List<String> taskNames) {
91+
ContainerModel mockContainerModel = Mockito.mock(ContainerModel.class);
92+
HashMap<TaskName, TaskModel> taskModelMap = new HashMap<>();
93+
94+
for (String taskName : taskNames) {
95+
TaskModel mockTaskModel = Mockito.mock(TaskModel.class);
96+
TaskName task = new TaskName(taskName);
97+
Mockito.doReturn(task).when(mockTaskModel).getTaskName();
98+
Mockito.doReturn(TaskMode.Active).when(mockTaskModel).getTaskMode();
99+
taskModelMap.put(task, mockTaskModel);
100+
}
101+
102+
Mockito.doReturn(containerId).when(mockContainerModel).getId();
103+
Mockito.doReturn(taskModelMap).when(mockContainerModel).getTasks();
104+
return mockContainerModel;
105+
}
106+
75107
@Test
76108
public void testDeleteMappings() {
77-
Map<String, String> expectedMap = ImmutableMap.of("Task0", "0", "Task1", "1");
109+
//Map<String, String> expectedMap = ImmutableMap.of("Task0", "0", "Task1", "1");
110+
String t0 = "Task0";
111+
String t1 = "Task1";
78112

79-
for (Map.Entry<String, String> entry : expectedMap.entrySet()) {
80-
taskAssignmentManager.writeTaskContainerMapping(entry.getKey(), entry.getValue(), TaskMode.Active);
81-
}
113+
ContainerModel c0 = createMockContainerModel("0", ImmutableList.of(t0));
114+
ContainerModel c1 = createMockContainerModel("1", ImmutableList.of(t1));
115+
116+
ImmutableMap<String, ContainerModel> expectedMap = ImmutableMap.of(t0, c0, t1, c1);
117+
118+
taskAssignmentManager.writeTaskContainerAssignments(expectedMap);
82119

83120
Map<String, String> localMap = taskAssignmentManager.readTaskAssignment();
84-
assertEquals(expectedMap, localMap);
121+
assertEquals(expectedMap.entrySet().stream().collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue().getId())), localMap);
85122

86123
taskAssignmentManager.deleteTaskContainerMappings(localMap.keySet());
87124

0 commit comments

Comments
 (0)