Skip to content

Commit 2cc8cdf

Browse files
committed
address review comments
1 parent ae75269 commit 2cc8cdf

File tree

7 files changed

+12
-24
lines changed

7 files changed

+12
-24
lines changed

clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -567,7 +567,7 @@ public SubscriptionBasedReadOnlyStoreRepository getStoreRepository() {
567567
* For user system stores, this returns the SystemStoreAttributes from the parent user store.
568568
* For regular stores and shared system stores, this returns the Store object directly.
569569
*/
570-
public final Object getStore(String storeName) {
570+
public final String getStore(String storeName) {
571571
VeniceSystemStoreType systemStoreType = VeniceSystemStoreType.getSystemStoreType(storeName);
572572
if (systemStoreType != null) {
573573
// it is a user system store
@@ -576,12 +576,12 @@ public final Object getStore(String storeName) {
576576
Map<String, SystemStoreAttributes> systemStores = userStore.getSystemStores();
577577
for (Map.Entry<String, SystemStoreAttributes> systemStoreEntries: systemStores.entrySet()) {
578578
if (storeName.startsWith(systemStoreEntries.getKey())) {
579-
return systemStoreEntries.getValue();
579+
return Utils.getRealTimeTopicName(systemStoreEntries.getValue());
580580
}
581581
}
582582
return null;
583583
} else {
584-
return storeRepository.getStore(storeName);
584+
return Utils.getRealTimeTopicName(storeRepository.getStore(storeName));
585585
}
586586
}
587587

clients/da-vinci-client/src/main/java/com/linkedin/davinci/helix/HelixParticipationService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -362,7 +362,7 @@ private void asyncStart() {
362362
instance.getNodeId(),
363363
valueSchemaEntry,
364364
updateSchemaEntry,
365-
helixReadOnlyStoreRepository::getStore);
365+
storeName -> Utils.getRealTimeTopicName(helixReadOnlyStoreRepository.getStore(storeName)));
366366

367367
// Record replica status in Zookeeper.
368368
// Need to be started before connecting to ZK, otherwise some notification will not be sent by this notifier.

internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreVeniceWriterCache.java

Lines changed: 4 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,8 @@
11
package com.linkedin.venice.pushstatushelper;
22

33
import com.linkedin.venice.common.VeniceSystemStoreUtils;
4-
import com.linkedin.venice.meta.Store;
5-
import com.linkedin.venice.meta.StoreInfo;
6-
import com.linkedin.venice.meta.SystemStoreAttributes;
74
import com.linkedin.venice.serialization.avro.AvroProtocolDefinition;
85
import com.linkedin.venice.serialization.avro.VeniceAvroKafkaSerializer;
9-
import com.linkedin.venice.utils.Utils;
106
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
117
import com.linkedin.venice.writer.VeniceWriter;
128
import com.linkedin.venice.writer.VeniceWriterFactory;
@@ -29,32 +25,24 @@ public class PushStatusStoreVeniceWriterCache implements AutoCloseable {
2925
private final Map<String, VeniceWriter> veniceWriters = new VeniceConcurrentHashMap<>();
3026
private final Schema valueSchema;
3127
private final Schema updateSchema;
32-
private final Function<String, Object> storeResolver;
28+
private final Function<String, String> rtNameResolver;
3329

3430
// writerFactory Used for instantiating VeniceWriter
3531
public PushStatusStoreVeniceWriterCache(
3632
VeniceWriterFactory writerFactory,
3733
Schema valueSchema,
3834
Schema updateSchema,
39-
Function<String, Object> storeResolver) {
35+
Function<String, String> rtNameResolver) {
4036
this.writerFactory = writerFactory;
4137
this.valueSchema = valueSchema;
4238
this.updateSchema = updateSchema;
43-
this.storeResolver = storeResolver;
39+
this.rtNameResolver = rtNameResolver;
4440
}
4541

4642
public VeniceWriter prepareVeniceWriter(String storeName) {
4743
return veniceWriters.computeIfAbsent(storeName, s -> {
48-
Object store = storeResolver.apply(VeniceSystemStoreUtils.getDaVinciPushStatusStoreName(storeName));
49-
String rtTopic;
44+
String rtTopic = rtNameResolver.apply(VeniceSystemStoreUtils.getDaVinciPushStatusStoreName(storeName));
5045

51-
if (store instanceof Store) {
52-
rtTopic = Utils.getRealTimeTopicName((Store) store);
53-
} else if (store instanceof StoreInfo) {
54-
rtTopic = Utils.getRealTimeTopicName((StoreInfo) store);
55-
} else {
56-
rtTopic = Utils.getRealTimeTopicName((SystemStoreAttributes) store);
57-
}
5846
VeniceWriterOptions options = new VeniceWriterOptions.Builder(rtTopic)
5947
.setKeyPayloadSerializer(
6048
new VeniceAvroKafkaSerializer(

internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreWriter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public PushStatusStoreWriter(
5656
String instanceName,
5757
SchemaEntry valueSchemaEntry,
5858
DerivedSchemaEntry updateSchemaEntry,
59-
Function<String, Object> storeResolver) {
59+
Function<String, String> storeResolver) {
6060
this(
6161
new PushStatusStoreVeniceWriterCache(
6262
writerFactory,

internal/venice-test-common/src/integrationTest/java/com/linkedin/venice/endToEnd/PushStatusStoreTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -292,7 +292,7 @@ public void testIncrementalPushStatusReadingFromPushStatusStoreInController() th
292292
"dummyInstance",
293293
valueSchemaEntry,
294294
updateSchemaEntry,
295-
(storeName) -> controllerClient.getStore(storeName).getStore());
295+
(storeName) -> Utils.getRealTimeTopicName(controllerClient.getStore(storeName).getStore()));
296296

297297
// After deleting the inc push status belonging to just one partition we should expect
298298
// SOIP from the controller since other partition has replicas with EOIP status

services/venice-controller/src/main/java/com/linkedin/venice/controller/UserSystemStoreLifeCycleHelper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,7 +165,7 @@ public static void deleteSystemStore(
165165
throw new VeniceException("Unknown system store type: " + systemStoreName);
166166
}
167167
// skip truncating system store RT topics if it's parent fabric as it's not created for parent fabric
168-
if (!admin.isParent()) {
168+
if (!admin.isParent() && systemStore != null) {
169169
admin.truncateKafkaTopic(Utils.getRealTimeTopicName(systemStore));
170170
}
171171
} else {

services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -683,7 +683,7 @@ public VeniceHelixAdmin(
683683
CONTROLLER_HEARTBEAT_INSTANCE_NAME,
684684
valueSchemaEntry,
685685
updateSchemaEntry,
686-
storeName -> getStore(discoverCluster(storeName), storeName));
686+
storeName -> Utils.getRealTimeTopicName(getStore(discoverCluster(storeName), storeName)));
687687
});
688688

689689
clusterToLiveClusterConfigRepo = new VeniceConcurrentHashMap<>();

0 commit comments

Comments
 (0)