Skip to content

Commit d27a207

Browse files
committed
address review comments
1 parent 57333c5 commit d27a207

File tree

8 files changed

+73
-77
lines changed

8 files changed

+73
-77
lines changed

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -394,7 +394,7 @@ public KafkaStoreIngestionService(
394394
pubSubTopicRepository,
395395
serverConfig.getMetaStoreWriterCloseTimeoutInMS(),
396396
serverConfig.getMetaStoreWriterCloseConcurrency(),
397-
storeName -> metadataRepo.getStore(storeName));
397+
storeName -> Utils.getRealTimeTopicNameForSystemStore(metadataRepo.getStore(storeName)));
398398
metadataRepo.registerStoreDataChangedListener(new StoreDataChangedListener() {
399399
@Override
400400
public void handleStoreDeleted(Store store) {

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4355,11 +4355,12 @@ private void waitUntilValueSchemaAvailable(int schemaId) throws InterruptedExcep
43554355
availableSchemaIds.set(schemaId, new Object());
43564356
// TODO: Query metastore for existence of value schema id before doing an update. During bounce of large
43574357
// cluster these metastore writes could be spiky
4358-
if (metaStoreWriter != null && !VeniceSystemStoreType.META_STORE.isSystemStore(storeName)) {
4358+
if (metaStoreWriter != null && !VeniceSystemStoreType.META_STORE.isSystemStore(storeName)
4359+
&& !VeniceSystemStoreUtils.isSystemStore(storeName)) {
43594360
String metaStoreName = VeniceSystemStoreType.META_STORE.getSystemStoreName(storeName);
4360-
Store metaStore = metaStoreWriter.storeResolver.apply(metaStoreName);
4361-
if (metaStore != null) {
4362-
PubSubTopic metaStoreRT = pubSubTopicRepository.getTopic(Utils.getRealTimeTopicName(metaStore));
4361+
String metaStoreRealTimeTopicName = metaStoreWriter.realTimeTopicNameResolver.apply(metaStoreName);
4362+
if (metaStoreRealTimeTopicName != null) {
4363+
PubSubTopic metaStoreRT = pubSubTopicRepository.getTopic(metaStoreRealTimeTopicName);
43634364
if (getTopicManager(localKafkaServer).containsTopicWithRetries(metaStoreRT, 5)) {
43644365
metaStoreWriter.writeInUseValueSchema(storeName, versionNumber, schemaId);
43654366
}

clients/da-vinci-client/src/test/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTaskTest.java

Lines changed: 33 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -467,8 +467,9 @@ public static Object[][] sortedInputAndAAConfigProvider() {
467467
private MockStoreVersionConfigs storeAndVersionConfigsUnderTest;
468468

469469
private MetaStoreWriter mockMetaStoreWriter;
470-
private Function<String, Store> mockStoreResolver;
470+
private Function<String, String> mockStoreRealTimeTopicNameResolver;
471471
private Store mockMetaStore;
472+
private String mockMetaStoreRealTimeTopicName;
472473

473474
private static byte[] getRandomKey(Integer partition) {
474475
String randomString = Utils.getUniqueString("KeyForPartition" + partition);
@@ -591,9 +592,12 @@ public void methodSetUp() throws Exception {
591592
kafkaClusterBasedRecordThrottler = new KafkaClusterBasedRecordThrottler(kafkaUrlToRecordsThrottler);
592593

593594
mockMetaStoreWriter = mock(MetaStoreWriter.class);
594-
mockStoreResolver = mock(Function.class);
595+
mockStoreRealTimeTopicNameResolver = mock(Function.class);
595596
mockMetaStore = mock(Store.class);
596-
mockMetaStoreWriter.storeResolver = mockStoreResolver;
597+
when(mockMetaStore.getName()).thenReturn("metaStoreName");
598+
when(mockMetaStore.getLargestUsedRTVersionNumber()).thenReturn(1);
599+
mockMetaStoreRealTimeTopicName = Utils.getRealTimeTopicName(mockMetaStore);
600+
mockMetaStoreWriter.realTimeTopicNameResolver = mockStoreRealTimeTopicNameResolver;
597601

598602
mockTopicManager = mock(TopicManager.class);
599603
mockTopicManagerRepository = mock(TopicManagerRepository.class);
@@ -1489,15 +1493,16 @@ public void testMetaStoreWriterIntegration() throws Exception {
14891493
Store mockMetaStore = mock(Store.class);
14901494
when(mockMetaStore.getName()).thenReturn(metaStoreName);
14911495
when(mockMetaStore.getLargestUsedRTVersionNumber()).thenReturn(1);
1496+
String realTimeTopicName = Utils.getRealTimeTopicName(mockMetaStore);
14921497

14931498
MetaStoreWriter mockMetaStoreWriter = mock(MetaStoreWriter.class);
1494-
Function<String, Store> mockStoreResolver = mock(Function.class);
1495-
mockMetaStoreWriter.storeResolver = mockStoreResolver;
1499+
Function<String, String> mockStoreRealTimeTopicNameResolver = mock(Function.class);
1500+
mockMetaStoreWriter.realTimeTopicNameResolver = mockStoreRealTimeTopicNameResolver;
14961501

14971502
// Meta store exists
1498-
when(mockStoreResolver.apply(metaStoreName)).thenReturn(mockMetaStore);
1503+
when(mockStoreRealTimeTopicNameResolver.apply(metaStoreName)).thenReturn(realTimeTopicName);
14991504

1500-
PubSubTopic metaStoreRTTopic = pubSubTopicRepository.getTopic(Utils.getRealTimeTopicName(mockMetaStore));
1505+
PubSubTopic metaStoreRTTopic = pubSubTopicRepository.getTopic(realTimeTopicName);
15011506
when(mockTopicManager.containsTopicWithRetries(metaStoreRTTopic, 5)).thenReturn(true);
15021507

15031508
// Setup StoreIngestionTask with real metaStoreWriter
@@ -1506,7 +1511,7 @@ public void testMetaStoreWriterIntegration() throws Exception {
15061511

15071512
StoreIngestionTaskTestConfig config = new StoreIngestionTaskTestConfig(Utils.setOf(PARTITION_FOO), () -> {
15081513
// Verify that the meta store was resolved
1509-
verify(mockStoreResolver, timeout(TEST_TIMEOUT_MS).atLeastOnce()).apply(metaStoreName);
1514+
verify(mockStoreRealTimeTopicNameResolver, timeout(TEST_TIMEOUT_MS).atLeastOnce()).apply(metaStoreName);
15101515

15111516
// Verify that writeInUseValueSchema was called
15121517
verify(mockMetaStoreWriter, timeout(TEST_TIMEOUT_MS))
@@ -1534,8 +1539,8 @@ public void testMetaStoreWriterWhenMetaStoreDoesNotExist() throws Exception {
15341539
int schemaId = 1;
15351540

15361541
MetaStoreWriter mockMetaStoreWriter = mock(MetaStoreWriter.class);
1537-
Function<String, Store> mockStoreResolver = mock(Function.class);
1538-
mockMetaStoreWriter.storeResolver = mockStoreResolver;
1542+
Function<String, String> mockStoreResolver = mock(Function.class);
1543+
mockMetaStoreWriter.realTimeTopicNameResolver = mockStoreResolver;
15391544

15401545
// Meta store does NOT exist
15411546
when(mockStoreResolver.apply(metaStoreName)).thenReturn(null);
@@ -1573,24 +1578,25 @@ public void testMetaStoreWriterWhenRTTopicDoesNotExist() throws Exception {
15731578
Store mockMetaStore = mock(Store.class);
15741579
when(mockMetaStore.getName()).thenReturn(metaStoreName);
15751580
when(mockMetaStore.getLargestUsedRTVersionNumber()).thenReturn(1);
1581+
String realTimeTopicName = Utils.getRealTimeTopicName(mockMetaStore);
15761582

15771583
MetaStoreWriter mockMetaStoreWriter = mock(MetaStoreWriter.class);
1578-
Function<String, Store> mockStoreResolver = mock(Function.class);
1579-
mockMetaStoreWriter.storeResolver = mockStoreResolver;
1584+
Function<String, String> mockStoreRealTimeTopicNameResolver = mock(Function.class);
1585+
mockMetaStoreWriter.realTimeTopicNameResolver = mockStoreRealTimeTopicNameResolver;
15801586

15811587
// Meta store exists
1582-
when(mockStoreResolver.apply(metaStoreName)).thenReturn(mockMetaStore);
1588+
when(mockStoreRealTimeTopicNameResolver.apply(metaStoreName)).thenReturn(realTimeTopicName);
15831589

15841590
// But RT topic does NOT exist
1585-
PubSubTopic metaStoreRTTopic = pubSubTopicRepository.getTopic(Utils.getRealTimeTopicName(mockMetaStore));
1591+
PubSubTopic metaStoreRTTopic = pubSubTopicRepository.getTopic(realTimeTopicName);
15861592
when(mockTopicManager.containsTopicWithRetries(metaStoreRTTopic, 5)).thenReturn(false);
15871593

15881594
localVeniceWriter.broadcastStartOfPush(new HashMap<>());
15891595
localVeniceWriter.put(putKeyFoo, putValue, schemaId).get();
15901596

15911597
StoreIngestionTaskTestConfig config = new StoreIngestionTaskTestConfig(Utils.setOf(PARTITION_FOO), () -> {
15921598
// Verify that the meta store was resolved
1593-
verify(mockStoreResolver, timeout(TEST_TIMEOUT_MS).atLeastOnce()).apply(metaStoreName);
1599+
verify(mockStoreRealTimeTopicNameResolver, timeout(TEST_TIMEOUT_MS).atLeastOnce()).apply(metaStoreName);
15941600

15951601
// Verify RT topic check was made
15961602
verify(mockTopicManager, timeout(TEST_TIMEOUT_MS).atLeastOnce()).containsTopicWithRetries(metaStoreRTTopic, 5);
@@ -1618,8 +1624,8 @@ public void testMetaStoreWriterSkippedForSystemStore() throws Exception {
16181624
String systemStoreName = VeniceSystemStoreType.META_STORE.getSystemStoreName(storeNameWithoutVersionInfo);
16191625

16201626
MetaStoreWriter mockMetaStoreWriter = mock(MetaStoreWriter.class);
1621-
Function<String, Store> mockStoreResolver = mock(Function.class);
1622-
mockMetaStoreWriter.storeResolver = mockStoreResolver;
1627+
Function<String, String> mockStoreResolver = mock(Function.class);
1628+
mockMetaStoreWriter.realTimeTopicNameResolver = mockStoreResolver;
16231629

16241630
localVeniceWriter.broadcastStartOfPush(new HashMap<>());
16251631
localVeniceWriter.put(putKeyFoo, putValue, SCHEMA_ID).get();
@@ -1654,35 +1660,35 @@ public void testMetaStoreResolutionWhenMetaStoreExists() {
16541660
String STORE_NAME = "testStore";
16551661
String META_STORE_NAME = "venice_system_store_meta_store_testStore";
16561662

1657-
when(mockStoreResolver.apply(META_STORE_NAME)).thenReturn(mockMetaStore);
1663+
when(mockStoreRealTimeTopicNameResolver.apply(META_STORE_NAME)).thenReturn(mockMetaStoreRealTimeTopicName);
16581664
when(mockMetaStore.getName()).thenReturn(META_STORE_NAME);
1659-
PubSubTopic metaStoreRTTopic = pubSubTopicRepository.getTopic(Utils.getRealTimeTopicName(mockMetaStore));
1665+
PubSubTopic metaStoreRTTopic = pubSubTopicRepository.getTopic(mockMetaStoreRealTimeTopicName);
16601666
when(mockTopicManager.containsTopicWithRetries(metaStoreRTTopic, 5)).thenReturn(true);
16611667

16621668
// Execute: Call the method that uses metaStoreWriter.storeResolver
16631669
String resolvedMetaStoreName = VeniceSystemStoreType.META_STORE.getSystemStoreName(STORE_NAME);
1664-
Store resolvedStore = mockStoreResolver.apply(resolvedMetaStoreName);
1670+
String realTimeTopicName = mockStoreRealTimeTopicNameResolver.apply(resolvedMetaStoreName);
16651671

16661672
// Verify
1667-
assertNotNull(resolvedStore);
1668-
assertEquals(resolvedStore.getName(), META_STORE_NAME);
1669-
verify(mockStoreResolver, times(1)).apply(META_STORE_NAME);
1673+
assertNotNull(realTimeTopicName);
1674+
assertEquals(realTimeTopicName, mockMetaStoreRealTimeTopicName);
1675+
verify(mockStoreRealTimeTopicNameResolver, times(1)).apply(META_STORE_NAME);
16701676
}
16711677

16721678
@Test
16731679
public void testMetaStoreResolutionWhenMetaStoreDoesNotExist() {
16741680
// Setup - metaStore is null
16751681
String STORE_NAME = "testStore";
16761682
String META_STORE_NAME = "venice_system_store_meta_store_testStore";
1677-
when(mockStoreResolver.apply(META_STORE_NAME)).thenReturn(null);
1683+
when(mockStoreRealTimeTopicNameResolver.apply(META_STORE_NAME)).thenReturn(null);
16781684

16791685
// Execute
16801686
String resolvedMetaStoreName = VeniceSystemStoreType.META_STORE.getSystemStoreName(STORE_NAME);
1681-
Store resolvedStore = mockStoreResolver.apply(resolvedMetaStoreName);
1687+
String realTimeTopicName = mockStoreRealTimeTopicNameResolver.apply(resolvedMetaStoreName);
16821688

16831689
// Verify - should handle null gracefully
1684-
assertNull(resolvedStore);
1685-
verify(mockStoreResolver, times(1)).apply(META_STORE_NAME);
1690+
assertNull(realTimeTopicName);
1691+
verify(mockStoreRealTimeTopicNameResolver, times(1)).apply(META_STORE_NAME);
16861692
}
16871693

16881694
@Test

internal/venice-common/src/main/java/com/linkedin/venice/system/store/MetaStoreWriter.java

Lines changed: 8 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@
66
import com.linkedin.venice.helix.HelixReadOnlyZKSharedSchemaRepository;
77
import com.linkedin.venice.meta.Store;
88
import com.linkedin.venice.meta.StoreConfig;
9-
import com.linkedin.venice.meta.SystemStore;
109
import com.linkedin.venice.meta.ZKStore;
1110
import com.linkedin.venice.pubsub.PubSubTopicRepository;
1211
import com.linkedin.venice.pubsub.api.PubSubTopic;
@@ -21,7 +20,6 @@
2120
import com.linkedin.venice.systemstore.schemas.StoreMetaValue;
2221
import com.linkedin.venice.systemstore.schemas.StoreValueSchema;
2322
import com.linkedin.venice.systemstore.schemas.StoreValueSchemas;
24-
import com.linkedin.venice.utils.Utils;
2523
import com.linkedin.venice.utils.VeniceResourceCloseResult;
2624
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
2725
import com.linkedin.venice.writer.VeniceWriter;
@@ -83,7 +81,7 @@ public class MetaStoreWriter implements Closeable {
8381
* Function to resolve store names to Store objects. Used to fetch system store metadata
8482
* for determining the correct RT topic names.
8583
*/
86-
public Function<String, Store> storeResolver;
84+
public Function<String, String> realTimeTopicNameResolver;
8785

8886
public MetaStoreWriter(
8987
TopicManager topicManager,
@@ -92,7 +90,7 @@ public MetaStoreWriter(
9290
PubSubTopicRepository pubSubTopicRepository,
9391
long closeTimeoutMs,
9492
int numOfConcurrentVwCloseOps,
95-
Function<String, Store> storeResolver) {
93+
Function<String, String> realTimeTopicNameResolver) {
9694
/**
9795
* TODO: get the write compute schema from the constructor so that this class does not use {@link WriteComputeSchemaConverter}
9896
*/
@@ -106,7 +104,7 @@ public MetaStoreWriter(
106104
pubSubTopicRepository,
107105
closeTimeoutMs,
108106
numOfConcurrentVwCloseOps,
109-
storeResolver);
107+
realTimeTopicNameResolver);
110108
}
111109

112110
MetaStoreWriter(
@@ -117,15 +115,15 @@ public MetaStoreWriter(
117115
PubSubTopicRepository pubSubTopicRepository,
118116
long closeTimeoutMs,
119117
int numOfConcurrentVwCloseOps,
120-
Function<String, Store> storeResolver) {
118+
Function<String, String> realTimeTopicNameResolver) {
121119
this.topicManager = topicManager;
122120
this.writerFactory = writerFactory;
123121
this.derivedComputeSchema = derivedComputeSchema;
124122
this.zkSharedSchemaRepository = schemaRepo;
125123
this.pubSubTopicRepository = pubSubTopicRepository;
126124
this.closeTimeoutMs = closeTimeoutMs;
127125
this.numOfConcurrentVwCloseOps = numOfConcurrentVwCloseOps;
128-
this.storeResolver = storeResolver;
126+
this.realTimeTopicNameResolver = realTimeTopicNameResolver;
129127
}
130128

131129
/**
@@ -371,18 +369,8 @@ Map<String, VeniceWriter> getMetaStoreWriterMap() {
371369

372370
VeniceWriter getOrCreateMetaStoreWriter(String metaStoreName) {
373371
return metaStoreWriterMap.computeIfAbsent(metaStoreName, k -> {
374-
Store store = storeResolver.apply(metaStoreName);
375-
int largestUsedRTVersionNumber;
376-
VeniceSystemStoreType type = VeniceSystemStoreType.getSystemStoreType(store.getName());
377-
if (type != null && store.isSystemStore()) {
378-
// metaStoreName is user system store
379-
largestUsedRTVersionNumber = ((SystemStore) store).getVeniceStore().getLargestUsedRTVersionNumber();
380-
} else {
381-
// metaStoreName is zkShared system store
382-
largestUsedRTVersionNumber = store.getLargestUsedRTVersionNumber();
383-
}
384-
String rt = Utils.getRealTimeTopicName(store, largestUsedRTVersionNumber);
385-
PubSubTopic rtTopic = pubSubTopicRepository.getTopic(rt);
372+
String metaStoreRealTimeTopicName = realTimeTopicNameResolver.apply(metaStoreName);
373+
PubSubTopic rtTopic = pubSubTopicRepository.getTopic(metaStoreRealTimeTopicName);
386374
if (!topicManager.containsTopicAndAllPartitionsAreOnline(rtTopic)) {
387375
throw new VeniceException("Realtime topic: " + rtTopic + " doesn't exist or some partitions are not online");
388376
}
@@ -427,8 +415,7 @@ private void closeVeniceWriter(String metaStoreName, VeniceWriter veniceWriter,
427415
* to write a Control Message to the RT topic, and it could hang if the topic doesn't exist.
428416
* This check is a best-effort since the race condition is still there between topic check and closing VeniceWriter.
429417
*/
430-
PubSubTopic rtTopic =
431-
pubSubTopicRepository.getTopic(Utils.getRealTimeTopicName(storeResolver.apply(metaStoreName)));
418+
PubSubTopic rtTopic = pubSubTopicRepository.getTopic(realTimeTopicNameResolver.apply(metaStoreName));
432419
if (!topicManager.containsTopicAndAllPartitionsAreOnline(rtTopic)) {
433420
LOGGER.info(
434421
"RT topic: {} for meta system store: {} doesn't exist, will only close the internal producer without sending END_OF_SEGMENT control messages",

internal/venice-common/src/main/java/com/linkedin/venice/utils/Utils.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import com.fasterxml.jackson.core.type.TypeReference;
77
import com.fasterxml.jackson.databind.ObjectMapper;
88
import com.linkedin.avroutil1.compatibility.AvroCompatibilityHelper;
9+
import com.linkedin.venice.common.VeniceSystemStoreType;
910
import com.linkedin.venice.controllerapi.ControllerResponse;
1011
import com.linkedin.venice.exceptions.ConfigurationException;
1112
import com.linkedin.venice.exceptions.ErrorType;
@@ -748,6 +749,19 @@ public static String getSeparateRealTimeTopicName(StoreInfo storeInfo) {
748749
return getSeparateRealTimeTopicName(Utils.getRealTimeTopicName(storeInfo));
749750
}
750751

752+
public static String getRealTimeTopicNameForSystemStore(Store systemStore) {
753+
int largestUsedRTVersionNumber;
754+
VeniceSystemStoreType type = VeniceSystemStoreType.getSystemStoreType(systemStore.getName());
755+
if (type != null && systemStore.isSystemStore()) {
756+
// systemStore is a user system store
757+
largestUsedRTVersionNumber = ((SystemStore) systemStore).getVeniceStore().getLargestUsedRTVersionNumber();
758+
} else {
759+
// systemStore is a zkShared system store
760+
largestUsedRTVersionNumber = systemStore.getLargestUsedRTVersionNumber();
761+
}
762+
return Utils.getRealTimeTopicName(systemStore, largestUsedRTVersionNumber);
763+
}
764+
751765
public static int calculateTopicHashCode(PubSubTopic topic) {
752766
if (topic.isSeparateRealTimeTopic()) {
753767
String realTimeTopicName = Utils.getRealTimeTopicNameFromSeparateRealTimeTopic(topic.getName());

internal/venice-common/src/test/java/com/linkedin/venice/system/store/MetaStoreWriterTest.java

Lines changed: 8 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.linkedin.venice.schema.GeneratedSchemaID;
2525
import com.linkedin.venice.systemstore.schemas.StoreMetaKey;
2626
import com.linkedin.venice.systemstore.schemas.StoreMetaValue;
27+
import com.linkedin.venice.utils.Utils;
2728
import com.linkedin.venice.utils.VeniceResourceCloseResult;
2829
import com.linkedin.venice.writer.VeniceWriter;
2930
import com.linkedin.venice.writer.VeniceWriterFactory;
@@ -91,14 +92,9 @@ public void testGetOrCreateMetaStoreWriterWithSystemStore() {
9192
pubSubTopicRepository,
9293
5000L,
9394
2,
94-
storeName -> systemStore);
95+
storeName -> Utils.getRealTimeTopicNameForSystemStore(systemStore));
9596

96-
// Execute
97-
VeniceWriter result = metaStoreWriter.getOrCreateMetaStoreWriter(systemStoreName);
98-
99-
// Verify
100-
Assert.assertNotNull(result);
101-
verify(veniceStore, times(1)).getLargestUsedRTVersionNumber();
97+
Assert.assertNotNull(metaStoreWriter.getOrCreateMetaStoreWriter(systemStoreName));
10298
verify(systemStore, times(1)).getVeniceStore();
10399
}
104100

@@ -139,15 +135,9 @@ public void testGetOrCreateMetaStoreWriterWithRegularStore() {
139135
pubSubTopicRepository,
140136
5000L,
141137
2,
142-
storeName1 -> regularStore);
143-
144-
// Execute
145-
VeniceWriter result = metaStoreWriter.getOrCreateMetaStoreWriter(storeName);
138+
storeName1 -> Utils.getRealTimeTopicNameForSystemStore(regularStore));
146139

147-
// Verify
148-
Assert.assertNotNull(result);
149-
verify(regularStore, times(1)).getLargestUsedRTVersionNumber();
150-
// verify(regularStore, never()).getVeniceStore; // Should not call getVeniceStore() for regular stores
140+
Assert.assertNotNull(metaStoreWriter.getOrCreateMetaStoreWriter(storeName));
151141
}
152142

153143
@Test
@@ -187,14 +177,9 @@ public void testGetOrCreateMetaStoreWriterWithNonSystemStoreType() {
187177
pubSubTopicRepository,
188178
5000L,
189179
2,
190-
storeName1 -> store);
191-
192-
// Execute
193-
VeniceWriter result = metaStoreWriter.getOrCreateMetaStoreWriter(storeName);
180+
storeName1 -> Utils.getRealTimeTopicNameForSystemStore(store));
194181

195-
// Verify - should use else branch
196-
Assert.assertNotNull(result);
197-
verify(store, times(1)).getLargestUsedRTVersionNumber();
182+
Assert.assertNotNull(metaStoreWriter.getOrCreateMetaStoreWriter(storeName));
198183
}
199184

200185
@Test
@@ -268,7 +253,7 @@ public void testClose(long closeTimeoutMs, int numOfConcurrentVwCloseOps)
268253
pubSubTopicRepository,
269254
closeTimeoutMs,
270255
numOfConcurrentVwCloseOps,
271-
storeName -> metaStore);
256+
storeName -> metaStoreName + "_rt");
272257
Map<String, VeniceWriter> metaStoreWriters = metaStoreWriter.getMetaStoreWriterMap();
273258

274259
List<CompletableFuture<VeniceResourceCloseResult>> completedFutures = new ArrayList<>(20);

services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -667,7 +667,7 @@ public VeniceHelixAdmin(
667667
pubSubTopicRepository,
668668
commonConfig.getMetaStoreWriterCloseTimeoutInMS(),
669669
commonConfig.getMetaStoreWriterCloseConcurrency(),
670-
storeName -> getStore(discoverCluster(storeName), storeName));
670+
storeName -> Utils.getRealTimeTopicName(getStore(discoverCluster(storeName), storeName)));
671671
metaStoreReader = new MetaStoreReader(d2Client, commonConfig.getClusterDiscoveryD2ServiceName());
672672
pushStatusStoreReader = new PushStatusStoreReader(
673673
d2Client,

0 commit comments

Comments
 (0)