Skip to content

Commit ff362b8

Browse files
committed
extend rt versioning to system stores
1 parent bd26e99 commit ff362b8

File tree

30 files changed

+742
-139
lines changed

30 files changed

+742
-139
lines changed

clients/da-vinci-client/src/main/java/com/linkedin/davinci/DaVinciBackend.java

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,7 @@
5757
import com.linkedin.venice.meta.Store;
5858
import com.linkedin.venice.meta.StoreDataChangedListener;
5959
import com.linkedin.venice.meta.SubscriptionBasedReadOnlyStoreRepository;
60+
import com.linkedin.venice.meta.SystemStoreAttributes;
6061
import com.linkedin.venice.meta.Version;
6162
import com.linkedin.venice.pubsub.api.PubSubPosition;
6263
import com.linkedin.venice.pushmonitor.ExecutionStatus;
@@ -308,7 +309,8 @@ public DaVinciBackend(
308309
ingestionService.getVeniceWriterFactory(),
309310
instanceName,
310311
valueSchemaEntry,
311-
updateSchemaEntry);
312+
updateSchemaEntry,
313+
(this::getStore));
312314
}
313315

314316
ingestionService.start();
@@ -559,6 +561,23 @@ public SubscriptionBasedReadOnlyStoreRepository getStoreRepository() {
559561
return storeRepository;
560562
}
561563

564+
public final Object getStore(String storeName) {
565+
VeniceSystemStoreType systemStoreType = VeniceSystemStoreType.getSystemStoreType(storeName);
566+
if (systemStoreType != null) {
567+
String userStoreName = VeniceSystemStoreType.extractUserStoreName(storeName);
568+
Store userStore = storeRepository.getStore(userStoreName);
569+
Map<String, SystemStoreAttributes> systemStores = userStore.getSystemStores();
570+
for (Map.Entry<String, SystemStoreAttributes> systemStoreEntries: systemStores.entrySet()) {
571+
if (storeName.startsWith(systemStoreEntries.getKey())) {
572+
return systemStoreEntries.getValue();
573+
}
574+
}
575+
return null;
576+
} else {
577+
return storeRepository.getStore(storeName);
578+
}
579+
}
580+
562581
public ObjectCacheBackend getObjectCache() {
563582
return cacheBackend.get();
564583
}

clients/da-vinci-client/src/main/java/com/linkedin/davinci/helix/HelixParticipationService.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -361,7 +361,8 @@ private void asyncStart() {
361361
ingestionService.getVeniceWriterFactory(),
362362
instance.getNodeId(),
363363
valueSchemaEntry,
364-
updateSchemaEntry);
364+
updateSchemaEntry,
365+
helixReadOnlyStoreRepository::getStore);
365366

366367
// Record replica status in Zookeeper.
367368
// Need to be started before connecting to ZK, otherwise some notification will not be sent by this notifier.

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/KafkaStoreIngestionService.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -393,7 +393,8 @@ public KafkaStoreIngestionService(
393393
zkSharedSchemaRepository.get(),
394394
pubSubTopicRepository,
395395
serverConfig.getMetaStoreWriterCloseTimeoutInMS(),
396-
serverConfig.getMetaStoreWriterCloseConcurrency());
396+
serverConfig.getMetaStoreWriterCloseConcurrency(),
397+
storeName -> metadataRepo.getStore(storeName));
397398
metadataRepo.registerStoreDataChangedListener(new StoreDataChangedListener() {
398399
@Override
399400
public void handleStoreDeleted(Store store) {
@@ -1486,7 +1487,8 @@ public InternalDaVinciRecordTransformerConfig getInternalRecordTransformerConfig
14861487
return storeNameToInternalRecordTransformerConfig.get(storeName);
14871488
}
14881489

1489-
public void attemptToPrintIngestionInfoFor(String storeName, Integer version, Integer partition, String regionName) {
1490+
public void attemptToPrintIngestionInfoFor(Store store, Integer version, Integer partition, String regionName) {
1491+
String storeName = store.getName();
14901492
try {
14911493
PubSubTopic versionTopic = pubSubTopicRepository.getTopic(Version.composeKafkaTopic(storeName, version));
14921494
StoreIngestionTask storeIngestionTask = getStoreIngestionTask(versionTopic.getName());
@@ -1508,7 +1510,7 @@ public void attemptToPrintIngestionInfoFor(String storeName, Integer version, In
15081510
String infoPrefix = "isCurrentVersion: " + (storeIngestionTask.isCurrentVersion()) + "\n";
15091511
if (storeIngestionTask.isHybridMode() && partitionConsumptionState.isEndOfPushReceived()
15101512
&& partitionConsumptionState.getLeaderFollowerState() == LeaderFollowerStateType.LEADER) {
1511-
ingestingTopic = pubSubTopicRepository.getTopic(Utils.composeRealTimeTopic(storeName));
1513+
ingestingTopic = pubSubTopicRepository.getTopic(Utils.getRealTimeTopicName(store));
15121514
}
15131515
PubSubTopicPartition ingestingTopicPartition = new PubSubTopicPartitionImpl(ingestingTopic, partition);
15141516

clients/da-vinci-client/src/main/java/com/linkedin/davinci/kafka/consumer/StoreIngestionTask.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4342,9 +4342,12 @@ private void waitUntilValueSchemaAvailable(int schemaId) throws InterruptedExcep
43424342
// cluster these metastore writes could be spiky
43434343
if (metaStoreWriter != null && !VeniceSystemStoreType.META_STORE.isSystemStore(storeName)) {
43444344
String metaStoreName = VeniceSystemStoreType.META_STORE.getSystemStoreName(storeName);
4345-
PubSubTopic metaStoreRT = pubSubTopicRepository.getTopic(Utils.composeRealTimeTopic(metaStoreName));
4346-
if (getTopicManager(localKafkaServer).containsTopicWithRetries(metaStoreRT, 5)) {
4347-
metaStoreWriter.writeInUseValueSchema(storeName, versionNumber, schemaId);
4345+
Store metaStore = metaStoreWriter.storeResolver.apply(metaStoreName);
4346+
if (metaStore != null) {
4347+
PubSubTopic metaStoreRT = pubSubTopicRepository.getTopic(Utils.getRealTimeTopicName(metaStore));
4348+
if (getTopicManager(localKafkaServer).containsTopicWithRetries(metaStoreRT, 5)) {
4349+
metaStoreWriter.writeInUseValueSchema(storeName, versionNumber, schemaId);
4350+
}
43484351
}
43494352
}
43504353
return;

clients/da-vinci-client/src/main/java/com/linkedin/davinci/stats/ingestion/heartbeat/HeartbeatMonitoringService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -647,7 +647,7 @@ void checkAndMaybeLogHeartbeatDelayMap(
647647
heartbeatTs,
648648
currentTimestamp);
649649
getKafkaStoreIngestionService().attemptToPrintIngestionInfoFor(
650-
storeName.getKey(),
650+
metadataRepository.getStore(storeName.getKey()),
651651
version.getKey(),
652652
partition.getKey(),
653653
region.getKey());

clients/da-vinci-client/src/test/java/com/linkedin/davinci/DaVinciBackendTest.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import static com.linkedin.venice.utils.DataProviderUtils.BOOLEAN;
1212
import static com.linkedin.venice.utils.DataProviderUtils.allPermutationGenerator;
1313
import static org.mockito.ArgumentMatchers.any;
14+
import static org.mockito.ArgumentMatchers.anyString;
1415
import static org.mockito.Mockito.clearInvocations;
1516
import static org.mockito.Mockito.doNothing;
1617
import static org.mockito.Mockito.mock;
@@ -34,6 +35,7 @@
3435
import com.linkedin.venice.client.store.ClientFactory;
3536
import com.linkedin.venice.exceptions.DiskLimitExhaustedException;
3637
import com.linkedin.venice.exceptions.VeniceException;
38+
import com.linkedin.venice.helix.ZkClientFactory;
3739
import com.linkedin.venice.meta.ClusterInfoProvider;
3840
import com.linkedin.venice.meta.ReadOnlySchemaRepository;
3941
import com.linkedin.venice.meta.SubscriptionBasedReadOnlyStoreRepository;
@@ -43,16 +45,19 @@
4345
import com.linkedin.venice.schema.writecompute.DerivedSchemaEntry;
4446
import com.linkedin.venice.serialization.avro.SchemaPresenceChecker;
4547
import com.linkedin.venice.service.ICProvider;
48+
import com.linkedin.venice.stats.ZkClientStatusStats;
4649
import com.linkedin.venice.utils.VeniceProperties;
4750
import io.tehuti.metrics.MetricsRepository;
4851
import java.util.Optional;
4952
import java.util.Properties;
5053
import java.util.concurrent.ExecutionException;
5154
import java.util.concurrent.TimeUnit;
5255
import java.util.concurrent.TimeoutException;
56+
import org.apache.helix.zookeeper.impl.client.ZkClient;
5357
import org.mockito.MockedConstruction;
5458
import org.mockito.MockedStatic;
5559
import org.testng.annotations.AfterMethod;
60+
import org.testng.annotations.BeforeClass;
5661
import org.testng.annotations.BeforeMethod;
5762
import org.testng.annotations.DataProvider;
5863
import org.testng.annotations.Test;
@@ -67,6 +72,14 @@ public class DaVinciBackendTest {
6772
private MockedConstruction<VeniceMetadataRepositoryBuilder> mockMetadataBuilder;
6873
private MockedConstruction<SchemaPresenceChecker> mockSchemaPresenceChecker;
6974

75+
@BeforeClass
76+
public void init() {
77+
MockedStatic<ZkClientFactory> mockZkFactory = mockStatic(ZkClientFactory.class);
78+
ZkClient mockZkClient = mock(ZkClient.class);
79+
mockZkFactory.when(() -> ZkClientFactory.newZkClient(anyString())).thenReturn(mockZkClient);
80+
doNothing().when(mockZkClient).subscribeStateChanges(any(ZkClientStatusStats.class));
81+
}
82+
7083
@BeforeMethod
7184
public void setUp() throws Exception {
7285
ClientConfig clientConfig = new ClientConfig(STORE_NAME).setVeniceURL("http://localhost:7777")

0 commit comments

Comments
 (0)