Skip to content

Commit 001bf7e

Browse files
committed
.
1 parent ff362b8 commit 001bf7e

File tree

9 files changed

+26
-55
lines changed

9 files changed

+26
-55
lines changed

clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -561,9 +561,15 @@ public SubscriptionBasedReadOnlyStoreRepository getStoreRepository() {
561561
return storeRepository;
562562
}
563563

564+
/**
565+
* Resolves and returns the store object for the given store name.
566+
* For user system stores, this returns the SystemStoreAttributes from the parent user store.
567+
* For regular stores and shared system stores, this returns the Store object directly.
568+
*/
564569
public final Object getStore(String storeName) {
565570
VeniceSystemStoreType systemStoreType = VeniceSystemStoreType.getSystemStoreType(storeName);
566571
if (systemStoreType != null) {
572+
// it is a user system store
567573
String userStoreName = VeniceSystemStoreType.extractUserStoreName(storeName);
568574
Store userStore = storeRepository.getStore(userStoreName);
569575
Map<String, SystemStoreAttributes> systemStores = userStore.getSystemStores();

clients/da-vinci-client/src/test/java/com/linkedin/davinci/DaVinciBackendTest.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,10 +71,11 @@ public class DaVinciBackendTest {
7171
private MockedStatic<ClientFactory> mockClientFactory;
7272
private MockedConstruction<VeniceMetadataRepositoryBuilder> mockMetadataBuilder;
7373
private MockedConstruction<SchemaPresenceChecker> mockSchemaPresenceChecker;
74+
private MockedStatic<ZkClientFactory> mockZkFactory;
7475

7576
@BeforeClass
7677
public void init() {
77-
MockedStatic<ZkClientFactory> mockZkFactory = mockStatic(ZkClientFactory.class);
78+
mockZkFactory = mockStatic(ZkClientFactory.class);
7879
ZkClient mockZkClient = mock(ZkClient.class);
7980
mockZkFactory.when(() -> ZkClientFactory.newZkClient(anyString())).thenReturn(mockZkClient);
8081
doNothing().when(mockZkClient).subscribeStateChanges(any(ZkClientStatusStats.class));

clients/da-vinci-client/src/test/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringServiceTest.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import com.linkedin.venice.utils.Utils;
4343
import com.linkedin.venice.utils.concurrent.VeniceConcurrentHashMap;
4444
import io.tehuti.metrics.MetricsRepository;
45+
import java.lang.reflect.Field;
4546
import java.time.Duration;
4647
import java.util.HashMap;
4748
import java.util.HashSet;
@@ -816,7 +817,7 @@ public void testLargestHeartbeatLag() {
816817
}
817818

818819
@Test
819-
public void testTriggerAutoResubscribe() {
820+
public void testTriggerAutoResubscribe() throws Exception {
820821
String store = "foo";
821822
int version = 100;
822823
int partition = 123;
@@ -831,6 +832,11 @@ public void testTriggerAutoResubscribe() {
831832

832833
HeartbeatMonitoringService heartbeatMonitoringService = mock(HeartbeatMonitoringService.class);
833834
KafkaStoreIngestionService kafkaStoreIngestionService = mock(KafkaStoreIngestionService.class);
835+
ReadOnlyStoreRepository metadataRepository = mock(ReadOnlyStoreRepository.class);
836+
doReturn(mock(Store.class)).when(metadataRepository).getStore(store);
837+
Field metadataRepositoryField = HeartbeatMonitoringService.class.getDeclaredField("metadataRepository");
838+
metadataRepositoryField.setAccessible(true);
839+
metadataRepositoryField.set(heartbeatMonitoringService, metadataRepository);
834840
VeniceServerConfig serverConfig = mock(VeniceServerConfig.class);
835841
doReturn(serverConfig).when(heartbeatMonitoringService).getServerConfig();
836842
doReturn(kafkaStoreIngestionService).when(heartbeatMonitoringService).getKafkaStoreIngestionService();

internal/venice-common/src/main/java/com/linkedin/venice/meta/SystemStore.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -93,9 +93,6 @@ private void throwUnsupportedOperationException(String method) {
9393
*/
9494
private synchronized SystemStoreAttributes fetchAndBackfillSystemStoreAttributes(boolean readAccess) {
9595
SystemStoreAttributes systemStoreAttributes = veniceStore.getSystemStores().get(systemStoreType.getPrefix());
96-
if (veniceStore.getSystemStores().size() > 0) {
97-
System.out.println();
98-
}
9996
if (systemStoreAttributes == null) {
10097
if (readAccess) {
10198
return DEFAULT_READ_ONLY_SYSTEM_STORE_ATTRIBUTE;

internal/venice-common/src/main/java/com/linkedin/venice/pushstatushelper/PushStatusStoreVeniceWriterCache.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public class PushStatusStoreVeniceWriterCache implements AutoCloseable {
2929
private final Map<String, VeniceWriter> veniceWriters = new VeniceConcurrentHashMap<>();
3030
private final Schema valueSchema;
3131
private final Schema updateSchema;
32-
Function<String, Object> storeResolver;
32+
private final Function<String, Object> storeResolver;
3333

3434
// writerFactory Used for instantiating VeniceWriter
3535
public PushStatusStoreVeniceWriterCache(

internal/venice-common/src/main/java/com/linkedin/venice/system/store/MetaStoreWriter.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,10 @@ public class MetaStoreWriter implements Closeable {
7979
private final PubSubTopicRepository pubSubTopicRepository;
8080
private final long closeTimeoutMs;
8181
private final int numOfConcurrentVwCloseOps;
82-
82+
/*
83+
* Function to resolve store names to Store objects. Used to fetch system store metadata
84+
* for determining the correct RT topic names.
85+
*/
8386
public Function<String, Store> storeResolver;
8487

8588
public MetaStoreWriter(
@@ -372,11 +375,13 @@ VeniceWriter getOrCreateMetaStoreWriter(String metaStoreName) {
372375
int largestUsedRTVersionNumber;
373376
VeniceSystemStoreType type = VeniceSystemStoreType.getSystemStoreType(store.getName());
374377
if (type != null && store.isSystemStore()) {
378+
// metaStoreName is user system store
375379
largestUsedRTVersionNumber = ((SystemStore) store).getVeniceStore().getLargestUsedRTVersionNumber();
376380
} else {
381+
// metaStoreName is zkShared system store
377382
largestUsedRTVersionNumber = store.getLargestUsedRTVersionNumber();
378383
}
379-
String rt = Utils.getRealTimeTopicName(storeResolver.apply(metaStoreName), largestUsedRTVersionNumber);
384+
String rt = Utils.getRealTimeTopicName(store, largestUsedRTVersionNumber);
380385
PubSubTopic rtTopic = pubSubTopicRepository.getTopic(rt);
381386
if (!topicManager.containsTopicAndAllPartitionsAreOnline(rtTopic)) {
382387
throw new VeniceException("Realtime topic: " + rtTopic + " doesn't exist or some partitions are not online");

internal/venice-test-common/src/integrationTest/resources/log4j2-test.xml

Lines changed: 0 additions & 36 deletions
This file was deleted.

services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceHelixAdmin.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5232,7 +5232,7 @@ void preCheckStorePartitionCountUpdate(String clusterName, Store store, int newP
52325232
if (!store.isHybrid()) {
52335233
String realTimeTopic = Utils.getRealTimeTopicName(store);
52345234
if (realTimeTopic != null) {
5235-
PubSubTopic realTimeTopicForBatchStore = pubSubTopicRepository.getTopic(Utils.getRealTimeTopicName(store));
5235+
PubSubTopic realTimeTopicForBatchStore = pubSubTopicRepository.getTopic(realTimeTopic);
52365236
TopicManager topicManager = getTopicManagerForCluster(clusterConfig);
52375237
if (topicManager.containsTopic(realTimeTopicForBatchStore)) {
52385238
int rtPartitionCount = topicManager.getPartitionCount(realTimeTopicForBatchStore);

services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceParentHelixAdmin.java

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,6 @@
8080
import static com.linkedin.venice.meta.HybridStoreConfigImpl.DEFAULT_REAL_TIME_TOPIC_NAME;
8181
import static com.linkedin.venice.meta.HybridStoreConfigImpl.DEFAULT_REWIND_TIME_IN_SECONDS;
8282
import static com.linkedin.venice.meta.Store.NON_EXISTING_VERSION;
83-
import com.linkedin.venice.meta.SystemStore;
8483
import static com.linkedin.venice.meta.Version.VERSION_SEPARATOR;
8584
import static com.linkedin.venice.meta.VersionStatus.CREATED;
8685
import static com.linkedin.venice.meta.VersionStatus.ERROR;
@@ -232,6 +231,7 @@
232231
import com.linkedin.venice.meta.StoreGraveyard;
233232
import com.linkedin.venice.meta.StoreInfo;
234233
import com.linkedin.venice.meta.StoreVersionInfo;
234+
import com.linkedin.venice.meta.SystemStore;
235235
import com.linkedin.venice.meta.VeniceETLStrategy;
236236
import com.linkedin.venice.meta.Version;
237237
import com.linkedin.venice.meta.VersionStatus;
@@ -5608,15 +5608,7 @@ public int getLargestUsedVersion(String clusterName, String storeName) {
56085608

56095609
@Override
56105610
public int getLargestUsedRTVersion(String clusterName, String storeName) {
5611-
Map<String, ControllerClient> childControllers = getVeniceHelixAdmin().getControllerClientMap(clusterName);
5612-
int aggregatedLargestUsedRTVersionNumber = getVeniceHelixAdmin().getLargestUsedRTVersion(clusterName, storeName);
5613-
for (Map.Entry<String, ControllerClient> controller: childControllers.entrySet()) {
5614-
VersionResponse response = controller.getValue().getStoreLargestUsedVersion(clusterName, storeName);
5615-
if (response.getVersion() > aggregatedLargestUsedRTVersionNumber) {
5616-
aggregatedLargestUsedRTVersionNumber = response.getVersion();
5617-
}
5618-
}
5619-
return aggregatedLargestUsedRTVersionNumber;
5611+
return getVeniceHelixAdmin().getLargestUsedRTVersion(clusterName, storeName);
56205612
}
56215613

56225614
/**

0 commit comments

Comments
 (0)