@@ -57,6 +57,7 @@
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.StoreDataChangedListener;
import com.linkedin.venice.meta.SubscriptionBasedReadOnlyStoreRepository;
import com.linkedin.venice.meta.SystemStoreAttributes;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pubsub.api.PubSubPosition;
import com.linkedin.venice.pushmonitor.ExecutionStatus;
@@ -308,7 +309,8 @@ public DaVinciBackend(
ingestionService.getVeniceWriterFactory(),
instanceName,
valueSchemaEntry,
updateSchemaEntry);
updateSchemaEntry,
(this::getRealTimeTopicName));
}

ingestionService.start();
@@ -560,6 +562,29 @@ public SubscriptionBasedReadOnlyStoreRepository getStoreRepository() {
return storeRepository;
}

/**
* Resolves and returns the store object for the given store name.
* For user system stores, this returns the SystemStoreAttributes from the parent user store.
* For regular stores and shared system stores, this returns the Store object directly.
*/
private String getRealTimeTopicName(String storeName) {
VeniceSystemStoreType systemStoreType = VeniceSystemStoreType.getSystemStoreType(storeName);
if (systemStoreType != null) {
// it is a user system store
Contributor:

Everything else looks good, but I just found that this assumption might not be correct: there is a BATCH JOB HB system store that is ZK-shared, so not every system store is a user system store. I believe we should exclude it.

String userStoreName = VeniceSystemStoreType.extractUserStoreName(storeName);
Store userStore = storeRepository.getStore(userStoreName);
Map<String, SystemStoreAttributes> systemStores = userStore.getSystemStores();
Comment on lines +575 to +576
Copilot AI Jan 6, 2026

Potential NullPointerException when userStore is null. If the user store doesn't exist in the repository, calling userStore.getSystemStores() will throw a NullPointerException. Add a null check before accessing userStore methods.
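
A minimal sketch of the guard this comment asks for. The class below is a hypothetical stand-in, not Venice code: `StoreLookupSketch`, its nested map, and the topic strings are assumptions made for illustration. Only the behavior of returning null when nothing resolves mirrors the method under review.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the repository lookup; none of these names come from Venice. */
final class StoreLookupSketch {
  // user store name -> (system store name prefix -> real-time topic name)
  private final Map<String, Map<String, String>> systemStoresByUserStore = new HashMap<>();

  void register(String userStoreName, Map<String, String> systemStores) {
    systemStoresByUserStore.put(userStoreName, systemStores);
  }

  /** Returns the topic name, or null when the user store or the system store is unknown. */
  String resolve(String userStoreName, String systemStoreName) {
    Map<String, String> systemStores = systemStoresByUserStore.get(userStoreName);
    if (systemStores == null) {
      // The user store is not in the repository: report "unresolved" instead of throwing.
      return null;
    }
    for (Map.Entry<String, String> entry: systemStores.entrySet()) {
      if (systemStoreName.startsWith(entry.getKey())) {
        return entry.getValue();
      }
    }
    return null;
  }
}
```

Returning null for a missing user store fits the caller shown later in this diff, which already checks the resolved name against null before using it.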

for (Map.Entry<String, SystemStoreAttributes> systemStoreEntries: systemStores.entrySet()) {
if (storeName.startsWith(systemStoreEntries.getKey())) {
return Utils.getRealTimeTopicName(systemStoreEntries.getValue());
}
Comment on lines +577 to +580
Copilot AI Jan 6, 2026

Inefficient string matching using startsWith. The code iterates through all system store entries and uses storeName.startsWith(systemStoreEntries.getKey()) to find a match. This approach is fragile because it relies on prefix matching rather than exact matching. Consider using a direct lookup by comparing the full store name or using a more precise matching strategy to avoid false positives if one system store name is a prefix of another.

Suggested change
for (Map.Entry<String, SystemStoreAttributes> systemStoreEntries: systemStores.entrySet()) {
if (storeName.startsWith(systemStoreEntries.getKey())) {
return Utils.getRealTimeTopicName(systemStoreEntries.getValue());
}
SystemStoreAttributes systemStoreAttributes = systemStores.get(storeName);
if (systemStoreAttributes != null) {
return Utils.getRealTimeTopicName(systemStoreAttributes);
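
To illustrate the false-positive concern, here is a small sketch under stated assumptions. The keys `sys_a` and `sys_a_b` and the topic strings are invented for the example, and `PrefixMatchSketch` is hypothetical. Whether an exact `get(storeName)` lookup works depends on how the real map is keyed, which this page does not show, so the sketch compares first-match prefix lookup with a longest-prefix lookup instead.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical illustration of prefix ambiguity; keys and values are invented. */
final class PrefixMatchSketch {
  /** Returns the value of the first key (in iteration order) that prefixes the name. */
  static String firstPrefixMatch(Map<String, String> byPrefix, String name) {
    for (Map.Entry<String, String> entry: byPrefix.entrySet()) {
      if (name.startsWith(entry.getKey())) {
        return entry.getValue();
      }
    }
    return null;
  }

  /** Returns the value of the longest key that prefixes the name, or null if none does. */
  static String longestPrefixMatch(Map<String, String> byPrefix, String name) {
    String bestKey = null;
    for (String key: byPrefix.keySet()) {
      if (name.startsWith(key) && (bestKey == null || key.length() > bestKey.length())) {
        bestKey = key;
      }
    }
    return bestKey == null ? null : byPrefix.get(bestKey);
  }

  public static void main(String[] args) {
    Map<String, String> byPrefix = new LinkedHashMap<>();
    byPrefix.put("sys_a", "topic_a");
    byPrefix.put("sys_a_b", "topic_a_b");
    // First match depends on iteration order; longest match does not.
    System.out.println(firstPrefixMatch(byPrefix, "sys_a_b_user"));
    System.out.println(longestPrefixMatch(byPrefix, "sys_a_b_user"));
  }
}
```

With an unordered map, the first-match result can change with iteration order, while the longest-prefix result does not.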

}
return null;
} else {
return Utils.getRealTimeTopicName(storeRepository.getStore(storeName));
Copilot AI Jan 6, 2026

Potential NullPointerException when calling Utils.getRealTimeTopicName() with a null store. If storeRepository.getStore(storeName) returns null, this will fail. Add a null check before calling the utility method.

}
}

public ObjectCacheBackend getObjectCache() {
return cacheBackend.get();
}

@@ -361,7 +361,8 @@ private void asyncStart() {
ingestionService.getVeniceWriterFactory(),
instance.getNodeId(),
valueSchemaEntry,
updateSchemaEntry);
updateSchemaEntry,
storeName -> Utils.getRealTimeTopicName(helixReadOnlyStoreRepository.getStore(storeName)));

// Record replica status in Zookeeper.
// Need to be started before connecting to ZK, otherwise some notification will not be sent by this notifier.

@@ -393,7 +393,8 @@ public KafkaStoreIngestionService(
zkSharedSchemaRepository.get(),
pubSubTopicRepository,
serverConfig.getMetaStoreWriterCloseTimeoutInMS(),
serverConfig.getMetaStoreWriterCloseConcurrency());
serverConfig.getMetaStoreWriterCloseConcurrency(),
storeName -> Utils.getRealTimeTopicNameForSystemStore(metadataRepo.getStore(storeName)));
metadataRepo.registerStoreDataChangedListener(new StoreDataChangedListener() {
@Override
public void handleStoreDeleted(Store store) {
@@ -1486,7 +1487,8 @@ public InternalDaVinciRecordTransformerConfig getInternalRecordTransformerConfig
return storeNameToInternalRecordTransformerConfig.get(storeName);
}

public void attemptToPrintIngestionInfoFor(String storeName, Integer version, Integer partition, String regionName) {
public void attemptToPrintIngestionInfoFor(Store store, Integer version, Integer partition, String regionName) {
String storeName = store.getName();
try {
PubSubTopic versionTopic = pubSubTopicRepository.getTopic(Version.composeKafkaTopic(storeName, version));
StoreIngestionTask storeIngestionTask = getStoreIngestionTask(versionTopic.getName());
@@ -1508,7 +1510,7 @@ public void attemptToPrintIngestionInfoFor(String storeName, Integer version, In
String infoPrefix = "isCurrentVersion: " + (storeIngestionTask.isCurrentVersion()) + "\n";
if (storeIngestionTask.isHybridMode() && partitionConsumptionState.isEndOfPushReceived()
&& partitionConsumptionState.getLeaderFollowerState() == LeaderFollowerStateType.LEADER) {
ingestingTopic = pubSubTopicRepository.getTopic(Utils.composeRealTimeTopic(storeName));
ingestingTopic = pubSubTopicRepository.getTopic(Utils.getRealTimeTopicName(store));
Contributor:

Does this util come with a fallback? I think it is important to have a fallback inside the util method so that it does not throw an exception in some unexpected place.

Contributor Author:

Yes, it does come with a fallback: for stores not using RT versioning, it returns "storeName_rt".
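
The fallback described in this reply can be sketched roughly as follows. This is not the Venice `Utils` implementation: `RtTopicFallbackSketch`, the `configuredName` parameter, and the sample topic string are assumptions for illustration. Only the `storeName_rt` fallback shape comes from the reply above.

```java
/** Hypothetical sketch of a real-time topic name resolver with a fallback. */
final class RtTopicFallbackSketch {
  private static final String RT_SUFFIX = "_rt";

  /**
   * Returns the configured real-time topic name when one is present,
   * otherwise falls back to storeName + "_rt".
   */
  static String realTimeTopicName(String storeName, String configuredName) {
    if (configuredName == null || configuredName.isEmpty()) {
      // No versioned name configured (assumed representation): use the legacy form.
      return storeName + RT_SUFFIX;
    }
    return configuredName;
  }

  public static void main(String[] args) {
    System.out.println(realTimeTopicName("myStore", null));
    System.out.println(realTimeTopicName("myStore", "myStore_rt_v2"));
  }
}
```

Keeping the fallback inside the helper means callers such as `attemptToPrintIngestionInfoFor` do not each need their own handling for stores without a configured name.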

}
PubSubTopicPartition ingestingTopicPartition = new PubSubTopicPartitionImpl(ingestingTopic, partition);


@@ -4355,11 +4355,15 @@ private void waitUntilValueSchemaAvailable(int schemaId) throws InterruptedExcep
availableSchemaIds.set(schemaId, new Object());
// TODO: Query metastore for existence of value schema id before doing an update. During bounce of large
// cluster these metastore writes could be spiky
if (metaStoreWriter != null && !VeniceSystemStoreType.META_STORE.isSystemStore(storeName)) {
if (metaStoreWriter != null && !VeniceSystemStoreType.META_STORE.isSystemStore(storeName)
&& !VeniceSystemStoreUtils.isSystemStore(storeName)) {
String metaStoreName = VeniceSystemStoreType.META_STORE.getSystemStoreName(storeName);
PubSubTopic metaStoreRT = pubSubTopicRepository.getTopic(Utils.composeRealTimeTopic(metaStoreName));
if (getTopicManager(localKafkaServer).containsTopicWithRetries(metaStoreRT, 5)) {
metaStoreWriter.writeInUseValueSchema(storeName, versionNumber, schemaId);
String metaStoreRealTimeTopicName = metaStoreWriter.realTimeTopicNameResolver.apply(metaStoreName);
if (metaStoreRealTimeTopicName != null) {
PubSubTopic metaStoreRT = pubSubTopicRepository.getTopic(metaStoreRealTimeTopicName);
if (getTopicManager(localKafkaServer).containsTopicWithRetries(metaStoreRT, 5)) {
metaStoreWriter.writeInUseValueSchema(storeName, versionNumber, schemaId);
}
}
}
return;

@@ -647,7 +647,7 @@ void checkAndMaybeLogHeartbeatDelayMap(
heartbeatTs,
currentTimestamp);
getKafkaStoreIngestionService().attemptToPrintIngestionInfoFor(
storeName.getKey(),
metadataRepository.getStore(storeName.getKey()),
Copilot AI Jan 6, 2026

Potential NullPointerException if metadataRepository.getStore returns null. When the store is not found in the repository, calling attemptToPrintIngestionInfoFor with a null Store will cause a NullPointerException when accessing store.getName() on line 1491. Add a null check before calling attemptToPrintIngestionInfoFor or handle the null case within the method.

version.getKey(),
partition.getKey(),
region.getKey());

@@ -11,6 +11,7 @@
import static com.linkedin.venice.utils.DataProviderUtils.BOOLEAN;
import static com.linkedin.venice.utils.DataProviderUtils.allPermutationGenerator;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
@@ -34,6 +35,7 @@
import com.linkedin.venice.client.store.ClientFactory;
import com.linkedin.venice.exceptions.DiskLimitExhaustedException;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.helix.ZkClientFactory;
import com.linkedin.venice.meta.ClusterInfoProvider;
import com.linkedin.venice.meta.ReadOnlySchemaRepository;
import com.linkedin.venice.meta.SubscriptionBasedReadOnlyStoreRepository;
@@ -43,16 +45,19 @@
import com.linkedin.venice.schema.writecompute.DerivedSchemaEntry;
import com.linkedin.venice.serialization.avro.SchemaPresenceChecker;
import com.linkedin.venice.service.ICProvider;
import com.linkedin.venice.stats.ZkClientStatusStats;
import com.linkedin.venice.utils.VeniceProperties;
import io.tehuti.metrics.MetricsRepository;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.helix.zookeeper.impl.client.ZkClient;
import org.mockito.MockedConstruction;
import org.mockito.MockedStatic;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -66,6 +71,15 @@ public class DaVinciBackendTest {
private MockedStatic<ClientFactory> mockClientFactory;
private MockedConstruction<VeniceMetadataRepositoryBuilder> mockMetadataBuilder;
private MockedConstruction<SchemaPresenceChecker> mockSchemaPresenceChecker;
private MockedStatic<ZkClientFactory> mockZkFactory;

@BeforeClass
public void init() {
mockZkFactory = mockStatic(ZkClientFactory.class);
ZkClient mockZkClient = mock(ZkClient.class);
mockZkFactory.when(() -> ZkClientFactory.newZkClient(anyString())).thenReturn(mockZkClient);
doNothing().when(mockZkClient).subscribeStateChanges(any(ZkClientStatusStats.class));
}
Comment on lines +76 to +82
Copilot AI Jan 6, 2026

Potential test resource leak. The mockZkFactory MockedStatic is created in BeforeClass but never closed in an AfterClass method. MockedStatic instances should be closed to avoid resource leaks and test interference. Add an AfterClass method that calls mockZkFactory.close().


@BeforeMethod
public void setUp() throws Exception {