File tree Expand file tree Collapse file tree 5 files changed +23
-5
lines changed
clients/da-vinci-client/src
main/java/com/linkedin/davinci
test/java/com/linkedin/davinci/stats/ingestion/heartbeat
internal/venice-common/src/main/java/com/linkedin/venice
services/venice-controller/src/main/java/com/linkedin/venice/controller Expand file tree Collapse file tree 5 files changed +23
-5
lines changed Original file line number Diff line number Diff line change @@ -561,6 +561,11 @@ public SubscriptionBasedReadOnlyStoreRepository getStoreRepository() {
561561 return storeRepository ;
562562 }
563563
564+ /**
565+ * Resolves and returns the store object for the given store name.
566+ * For user system stores, this returns the SystemStoreAttributes from the parent user store.
567+ * For regular stores and shared system stores, this returns the Store object directly.
568+ */
564569 public final Object getStore (String storeName ) {
565570 VeniceSystemStoreType systemStoreType = VeniceSystemStoreType .getSystemStoreType (storeName );
566571 if (systemStoreType != null ) {
Original file line number Diff line number Diff line change 4242import com .linkedin .venice .utils .Utils ;
4343import com .linkedin .venice .utils .concurrent .VeniceConcurrentHashMap ;
4444import io .tehuti .metrics .MetricsRepository ;
45+ import java .lang .reflect .Field ;
4546import java .time .Duration ;
4647import java .util .HashMap ;
4748import java .util .HashSet ;
@@ -816,7 +817,7 @@ public void testLargestHeartbeatLag() {
816817 }
817818
818819 @ Test
819- public void testTriggerAutoResubscribe () {
820+ public void testTriggerAutoResubscribe () throws Exception {
820821 String store = "foo" ;
821822 int version = 100 ;
822823 int partition = 123 ;
@@ -831,6 +832,11 @@ public void testTriggerAutoResubscribe() {
831832
832833 HeartbeatMonitoringService heartbeatMonitoringService = mock (HeartbeatMonitoringService .class );
833834 KafkaStoreIngestionService kafkaStoreIngestionService = mock (KafkaStoreIngestionService .class );
835+ ReadOnlyStoreRepository metadataRepository = mock (ReadOnlyStoreRepository .class );
836+ doReturn (mock (Store .class )).when (metadataRepository ).getStore (store );
837+ Field metadataRepositoryField = HeartbeatMonitoringService .class .getDeclaredField ("metadataRepository" );
838+ metadataRepositoryField .setAccessible (true );
839+ metadataRepositoryField .set (heartbeatMonitoringService , metadataRepository );
834840 VeniceServerConfig serverConfig = mock (VeniceServerConfig .class );
835841 doReturn (serverConfig ).when (heartbeatMonitoringService ).getServerConfig ();
836842 doReturn (kafkaStoreIngestionService ).when (heartbeatMonitoringService ).getKafkaStoreIngestionService ();
Original file line number Diff line number Diff line change @@ -52,9 +52,11 @@ public VeniceWriter prepareVeniceWriter(String storeName) {
5252 rtTopic = Utils .getRealTimeTopicName ((Store ) store );
5353 } else if (store instanceof StoreInfo ) {
5454 rtTopic = Utils .getRealTimeTopicName ((StoreInfo ) store );
55- } else {
55+ } else // if (store instanceof SystemStoreAttributes) {
5656 rtTopic = Utils .getRealTimeTopicName ((SystemStoreAttributes ) store );
57- }
57+ // } else {
58+ // throw new VeniceException("Unexpected store type for: " + storeName + ", type: " + store.getClass().getName());
59+ // }
5860 VeniceWriterOptions options = new VeniceWriterOptions .Builder (rtTopic )
5961 .setKeyPayloadSerializer (
6062 new VeniceAvroKafkaSerializer (
Original file line number Diff line number Diff line change @@ -79,7 +79,10 @@ public class MetaStoreWriter implements Closeable {
7979 private final PubSubTopicRepository pubSubTopicRepository ;
8080 private final long closeTimeoutMs ;
8181 private final int numOfConcurrentVwCloseOps ;
82-
82+ /*
83+ * Function to resolve store names to Store objects. Used to fetch system store metadata
84+ * for determining the correct RT topic names.
85+ */
8386 public Function <String , Store > storeResolver ;
8487
8588 public MetaStoreWriter (
@@ -372,8 +375,10 @@ VeniceWriter getOrCreateMetaStoreWriter(String metaStoreName) {
372375 int largestUsedRTVersionNumber ;
373376 VeniceSystemStoreType type = VeniceSystemStoreType .getSystemStoreType (store .getName ());
374377 if (type != null && store .isSystemStore ()) {
378+ // metaStoreName is user system store
375379 largestUsedRTVersionNumber = ((SystemStore ) store ).getVeniceStore ().getLargestUsedRTVersionNumber ();
376380 } else {
381+ // metaStoreName is zkShared system store
377382 largestUsedRTVersionNumber = store .getLargestUsedRTVersionNumber ();
378383 }
379384 String rt = Utils .getRealTimeTopicName (store , largestUsedRTVersionNumber );
Original file line number Diff line number Diff line change 7979import static com .linkedin .venice .meta .HybridStoreConfigImpl .DEFAULT_REAL_TIME_TOPIC_NAME ;
8080import static com .linkedin .venice .meta .HybridStoreConfigImpl .DEFAULT_REWIND_TIME_IN_SECONDS ;
8181import static com .linkedin .venice .meta .Store .NON_EXISTING_VERSION ;
82- import com .linkedin .venice .meta .SystemStore ;
8382import static com .linkedin .venice .meta .Version .VERSION_SEPARATOR ;
8483import static com .linkedin .venice .meta .VersionStatus .CREATED ;
8584import static com .linkedin .venice .meta .VersionStatus .ERROR ;
231230import com .linkedin .venice .meta .StoreGraveyard ;
232231import com .linkedin .venice .meta .StoreInfo ;
233232import com .linkedin .venice .meta .StoreVersionInfo ;
233+ import com .linkedin .venice .meta .SystemStore ;
234234import com .linkedin .venice .meta .Version ;
235235import com .linkedin .venice .meta .VersionStatus ;
236236import com .linkedin .venice .meta .ViewConfig ;
You can’t perform that action at this time.
0 commit comments