Skip to content

Commit d286572

Browse files
committed
address review comments
1 parent f546e9a commit d286572

File tree

8 files changed

+26
-19
lines changed

8 files changed

+26
-19
lines changed

clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -561,9 +561,15 @@ public SubscriptionBasedReadOnlyStoreRepository getStoreRepository() {
561561
return storeRepository;
562562
}
563563

564+
/**
565+
* Resolves and returns the store object for the given store name.
566+
* For user system stores, this returns the SystemStoreAttributes from the parent user store.
567+
* For regular stores and shared system stores, this returns the Store object directly.
568+
*/
564569
public final Object getStore(String storeName) {
565570
VeniceSystemStoreType systemStoreType = VeniceSystemStoreType.getSystemStoreType(storeName);
566571
if (systemStoreType != null) {
572+
// it is a user system store
567573
String userStoreName = VeniceSystemStoreType.extractUserStoreName(storeName);
568574
Store userStore = storeRepository.getStore(userStoreName);
569575
Map<String, SystemStoreAttributes> systemStores = userStore.getSystemStores();

clients/da-vinci-client/src/test/java/com/linkedin/davinci/DaVinciBackendTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,10 +71,11 @@ public class DaVinciBackendTest {
7171
private MockedStatic<ClientFactory> mockClientFactory;
7272
private MockedConstruction<VeniceMetadataRepositoryBuilder> mockMetadataBuilder;
7373
private MockedConstruction<SchemaPresenceChecker> mockSchemaPresenceChecker;
74+
private MockedStatic<ZkClientFactory> mockZkFactory;
7475

7576
@BeforeClass
7677
public void init() {
77-
MockedStatic<ZkClientFactory> mockZkFactory = mockStatic(ZkClientFactory.class);
78+
mockZkFactory = mockStatic(ZkClientFactory.class);
7879
ZkClient mockZkClient = mock(ZkClient.class);
7980
mockZkFactory.when(() -> ZkClientFactory.newZkClient(anyString())).thenReturn(mockZkClient);
8081
doNothing().when(mockZkClient).subscribeStateChanges(any(ZkClientStatusStats.class));

clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringServiceTest.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import com.linkedin.venice.utils.Utils;
4343
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
4444
import io.tehuti.metrics.MetricsRepository;
45+
import java.lang.reflect.Field;
4546
import java.time.Duration;
4647
import java.util.HashMap;
4748
import java.util.HashSet;
@@ -816,7 +817,7 @@ public void testLargestHeartbeatLag() {
816817
}
817818

818819
@Test
819-
public void testTriggerAutoResubscribe() {
820+
public void testTriggerAutoResubscribe() throws Exception {
820821
String store = "foo";
821822
int version = 100;
822823
int partition = 123;
@@ -831,6 +832,11 @@ public void testTriggerAutoResubscribe() {
831832

832833
HeartbeatMonitoringService heartbeatMonitoringService = mock(HeartbeatMonitoringService.class);
833834
KafkaStoreIngestionService kafkaStoreIngestionService = mock(KafkaStoreIngestionService.class);
835+
ReadOnlyStoreRepository metadataRepository = mock(ReadOnlyStoreRepository.class);
836+
doReturn(mock(Store.class)).when(metadataRepository).getStore(store);
837+
Field metadataRepositoryField = HeartbeatMonitoringService.class.getDeclaredField("metadataRepository");
838+
metadataRepositoryField.setAccessible(true);
839+
metadataRepositoryField.set(heartbeatMonitoringService, metadataRepository);
834840
VeniceServerConfig serverConfig = mock(VeniceServerConfig.class);
835841
doReturn(serverConfig).when(heartbeatMonitoringService).getServerConfig();
836842
doReturn(kafkaStoreIngestionService).when(heartbeatMonitoringService).getKafkaStoreIngestionService();

internal/venice-common/src/main/java/com/linkedin/venice/meta/SystemStore.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,9 +93,6 @@ private void throwUnsupportedOperationException(String method) {
9393
*/
9494
private synchronized SystemStoreAttributes fetchAndBackfillSystemStoreAttributes(boolean readAccess) {
9595
SystemStoreAttributes systemStoreAttributes = veniceStore.getSystemStores().get(systemStoreType.getPrefix());
96-
if (veniceStore.getSystemStores().size() > 0) {
97-
System.out.println();
98-
}
9996
if (systemStoreAttributes == null) {
10097
if (readAccess) {
10198
return DEFAULT_READ_ONLY_SYSTEM_STORE_ATTRIBUTE;

internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreVeniceWriterCache.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public class PushStatusStoreVeniceWriterCache implements AutoCloseable {
2929
private final Map<String, VeniceWriter> veniceWriters = new VeniceConcurrentHashMap<>();
3030
private final Schema valueSchema;
3131
private final Schema updateSchema;
32-
Function<String, Object> storeResolver;
32+
private final Function<String, Object> storeResolver;
3333

3434
// writerFactory Used for instantiating VeniceWriter
3535
public PushStatusStoreVeniceWriterCache(

internal/venice-common/src/main/java/com/linkedin/venice/system/store/MetaStoreWriter.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,10 @@ public class MetaStoreWriter implements Closeable {
7979
private final PubSubTopicRepository pubSubTopicRepository;
8080
private final long closeTimeoutMs;
8181
private final int numOfConcurrentVwCloseOps;
82-
82+
/*
83+
* Function to resolve store names to Store objects. Used to fetch system store metadata
84+
* for determining the correct RT topic names.
85+
*/
8386
public Function<String, Store> storeResolver;
8487

8588
public MetaStoreWriter(
@@ -372,11 +375,13 @@ VeniceWriter getOrCreateMetaStoreWriter(String metaStoreName) {
372375
int largestUsedRTVersionNumber;
373376
VeniceSystemStoreType type = VeniceSystemStoreType.getSystemStoreType(store.getName());
374377
if (type != null && store.isSystemStore()) {
378+
// metaStoreName is user system store
375379
largestUsedRTVersionNumber = ((SystemStore) store).getVeniceStore().getLargestUsedRTVersionNumber();
376380
} else {
381+
// metaStoreName is zkShared system store
377382
largestUsedRTVersionNumber = store.getLargestUsedRTVersionNumber();
378383
}
379-
String rt = Utils.getRealTimeTopicName(storeResolver.apply(metaStoreName), largestUsedRTVersionNumber);
384+
String rt = Utils.getRealTimeTopicName(store, largestUsedRTVersionNumber);
380385
PubSubTopic rtTopic = pubSubTopicRepository.getTopic(rt);
381386
if (!topicManager.containsTopicAndAllPartitionsAreOnline(rtTopic)) {
382387
throw new VeniceException("Realtime topic: " + rtTopic + " doesn't exist or some partitions are not online");

services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5229,7 +5229,7 @@ void preCheckStorePartitionCountUpdate(String clusterName, Store store, int newP
52295229
if (!store.isHybrid()) {
52305230
String realTimeTopic = Utils.getRealTimeTopicName(store);
52315231
if (realTimeTopic != null) {
5232-
PubSubTopic realTimeTopicForBatchStore = pubSubTopicRepository.getTopic(Utils.getRealTimeTopicName(store));
5232+
PubSubTopic realTimeTopicForBatchStore = pubSubTopicRepository.getTopic(realTimeTopic);
52335233
TopicManager topicManager = getTopicManagerForCluster(clusterConfig);
52345234
if (topicManager.containsTopic(realTimeTopicForBatchStore)) {
52355235
int rtPartitionCount = topicManager.getPartitionCount(realTimeTopicForBatchStore);

services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,6 @@
7979
import static com.linkedin.venice.meta.HybridStoreConfigImpl.DEFAULT_REAL_TIME_TOPIC_NAME;
8080
import static com.linkedin.venice.meta.HybridStoreConfigImpl.DEFAULT_REWIND_TIME_IN_SECONDS;
8181
import static com.linkedin.venice.meta.Store.NON_EXISTING_VERSION;
82-
import com.linkedin.venice.meta.SystemStore;
8382
import static com.linkedin.venice.meta.Version.VERSION_SEPARATOR;
8483
import static com.linkedin.venice.meta.VersionStatus.CREATED;
8584
import static com.linkedin.venice.meta.VersionStatus.ERROR;
@@ -231,6 +230,7 @@
231230
import com.linkedin.venice.meta.StoreGraveyard;
232231
import com.linkedin.venice.meta.StoreInfo;
233232
import com.linkedin.venice.meta.StoreVersionInfo;
233+
import com.linkedin.venice.meta.SystemStore;
234234
import com.linkedin.venice.meta.Version;
235235
import com.linkedin.venice.meta.VersionStatus;
236236
import com.linkedin.venice.meta.ViewConfig;
@@ -5601,15 +5601,7 @@ public int getLargestUsedVersion(String clusterName, String storeName) {
56015601

56025602
@Override
56035603
public int getLargestUsedRTVersion(String clusterName, String storeName) {
5604-
Map<String, ControllerClient> childControllers = getVeniceHelixAdmin().getControllerClientMap(clusterName);
5605-
int aggregatedLargestUsedRTVersionNumber = getVeniceHelixAdmin().getLargestUsedRTVersion(clusterName, storeName);
5606-
for (Map.Entry<String, ControllerClient> controller: childControllers.entrySet()) {
5607-
VersionResponse response = controller.getValue().getStoreLargestUsedVersion(clusterName, storeName);
5608-
if (response.getVersion() > aggregatedLargestUsedRTVersionNumber) {
5609-
aggregatedLargestUsedRTVersionNumber = response.getVersion();
5610-
}
5611-
}
5612-
return aggregatedLargestUsedRTVersionNumber;
5604+
return getVeniceHelixAdmin().getLargestUsedRTVersion(clusterName, storeName);
56135605
}
56145606

56155607
/**

0 commit comments

Comments
 (0)