Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.linkedin.venice;

import static com.linkedin.venice.zk.VeniceZkPaths.ADMIN_TOPIC_METADATA;
import static com.linkedin.venice.zk.VeniceZkPaths.ADMIN_TOPIC_METADATA_V2;
import static com.linkedin.venice.zk.VeniceZkPaths.EXECUTION_IDS;
import static com.linkedin.venice.zk.VeniceZkPaths.PARENT_OFFLINE_PUSHES;
import static com.linkedin.venice.zk.VeniceZkPaths.ROUTERS;
Expand All @@ -23,7 +23,7 @@
@Test
public void testPathsListToTree() {
TreeNode root = ZkCopier.pathsListToTree(getPaths(), BASE_PATH);
testTreeAsserts(root);

Check failure on line 26 in clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestTreeNode.java

View workflow job for this annotation

GitHub Actions / Clients / UT & CodeCov (17)

TestTreeNode.testPathsListToTree

java.lang.AssertionError: expected [true] but found [false]

Check failure on line 26 in clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestTreeNode.java

View workflow job for this annotation

GitHub Actions / Clients / UT & CodeCov (17)

TestTreeNode.testPathsListToTree

java.lang.AssertionError: expected [true] but found [false]

Check failure on line 26 in clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestTreeNode.java

View workflow job for this annotation

GitHub Actions / Clients / UT & CodeCov (17)

TestTreeNode.testPathsListToTree

java.lang.AssertionError: expected [true] but found [false]

Check failure on line 26 in clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestTreeNode.java

View workflow job for this annotation

GitHub Actions / Clients / UT & CodeCov (17)

TestTreeNode.testPathsListToTree

java.lang.AssertionError: expected [true] but found [false]

Check failure on line 26 in clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestTreeNode.java

View workflow job for this annotation

GitHub Actions / Clients / UT & CodeCov (17)

TestTreeNode.testPathsListToTree

java.lang.AssertionError: expected [true] but found [false]

Check failure on line 26 in clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestTreeNode.java

View workflow job for this annotation

GitHub Actions / Clients / UT & CodeCov (8)

TestTreeNode.testPathsListToTree

java.lang.AssertionError: expected [true] but found [false]

Check failure on line 26 in clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestTreeNode.java

View workflow job for this annotation

GitHub Actions / Clients / UT & CodeCov (8)

TestTreeNode.testPathsListToTree

java.lang.AssertionError: expected [true] but found [false]

Check failure on line 26 in clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestTreeNode.java

View workflow job for this annotation

GitHub Actions / Clients / UT & CodeCov (8)

TestTreeNode.testPathsListToTree

java.lang.AssertionError: expected [true] but found [false]

Check failure on line 26 in clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestTreeNode.java

View workflow job for this annotation

GitHub Actions / Clients / UT & CodeCov (8)

TestTreeNode.testPathsListToTree

java.lang.AssertionError: expected [true] but found [false]

Check failure on line 26 in clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestTreeNode.java

View workflow job for this annotation

GitHub Actions / Clients / UT & CodeCov (8)

TestTreeNode.testPathsListToTree

java.lang.AssertionError: expected [true] but found [false]

Check failure on line 26 in clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestTreeNode.java

View workflow job for this annotation

GitHub Actions / Clients / UT & CodeCov (11)

TestTreeNode.testPathsListToTree

java.lang.AssertionError: expected [true] but found [false]

Check failure on line 26 in clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestTreeNode.java

View workflow job for this annotation

GitHub Actions / Clients / UT & CodeCov (11)

TestTreeNode.testPathsListToTree

java.lang.AssertionError: expected [true] but found [false]

Check failure on line 26 in clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestTreeNode.java

View workflow job for this annotation

GitHub Actions / Clients / UT & CodeCov (11)

TestTreeNode.testPathsListToTree

java.lang.AssertionError: expected [true] but found [false]

Check failure on line 26 in clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestTreeNode.java

View workflow job for this annotation

GitHub Actions / Clients / UT & CodeCov (11)

TestTreeNode.testPathsListToTree

java.lang.AssertionError: expected [true] but found [false]

Check failure on line 26 in clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestTreeNode.java

View workflow job for this annotation

GitHub Actions / Clients / UT & CodeCov (11)

TestTreeNode.testPathsListToTree

java.lang.AssertionError: expected [true] but found [false]
}

@Test
Expand Down Expand Up @@ -66,7 +66,7 @@
Assert.assertTrue(list.contains("/venice-parent/storeConfigs"));
Assert.assertTrue(list.contains("/venice-parent/cluster1"));
Assert.assertFalse(list.contains("/venice-parent/cluster1/storeConfigs"));
Assert.assertTrue(list.contains("/venice-parent/cluster1/adminTopicMetadata"));

Check failure on line 69 in clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestTreeNode.java

View workflow job for this annotation

GitHub Actions / Clients / UT & CodeCov (17)

TestTreeNode.testPathsTreeToList

java.lang.AssertionError: expected [true] but found [false]

Check failure on line 69 in clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestTreeNode.java

View workflow job for this annotation

GitHub Actions / Clients / UT & CodeCov (17)

TestTreeNode.testPathsTreeToList

java.lang.AssertionError: expected [true] but found [false]

Check failure on line 69 in clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestTreeNode.java

View workflow job for this annotation

GitHub Actions / Clients / UT & CodeCov (17)

TestTreeNode.testPathsTreeToList

java.lang.AssertionError: expected [true] but found [false]

Check failure on line 69 in clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestTreeNode.java

View workflow job for this annotation

GitHub Actions / Clients / UT & CodeCov (17)

TestTreeNode.testPathsTreeToList

java.lang.AssertionError: expected [true] but found [false]

Check failure on line 69 in clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestTreeNode.java

View workflow job for this annotation

GitHub Actions / Clients / UT & CodeCov (17)

TestTreeNode.testPathsTreeToList

java.lang.AssertionError: expected [true] but found [false]

Check failure on line 69 in clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestTreeNode.java

View workflow job for this annotation

GitHub Actions / Clients / UT & CodeCov (8)

TestTreeNode.testPathsTreeToList

java.lang.AssertionError: expected [true] but found [false]

Check failure on line 69 in clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestTreeNode.java

View workflow job for this annotation

GitHub Actions / Clients / UT & CodeCov (8)

TestTreeNode.testPathsTreeToList

java.lang.AssertionError: expected [true] but found [false]

Check failure on line 69 in clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestTreeNode.java

View workflow job for this annotation

GitHub Actions / Clients / UT & CodeCov (8)

TestTreeNode.testPathsTreeToList

java.lang.AssertionError: expected [true] but found [false]

Check failure on line 69 in clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestTreeNode.java

View workflow job for this annotation

GitHub Actions / Clients / UT & CodeCov (8)

TestTreeNode.testPathsTreeToList

java.lang.AssertionError: expected [true] but found [false]

Check failure on line 69 in clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestTreeNode.java

View workflow job for this annotation

GitHub Actions / Clients / UT & CodeCov (8)

TestTreeNode.testPathsTreeToList

java.lang.AssertionError: expected [true] but found [false]

Check failure on line 69 in clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestTreeNode.java

View workflow job for this annotation

GitHub Actions / Clients / UT & CodeCov (11)

TestTreeNode.testPathsTreeToList

java.lang.AssertionError: expected [true] but found [false]

Check failure on line 69 in clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestTreeNode.java

View workflow job for this annotation

GitHub Actions / Clients / UT & CodeCov (11)

TestTreeNode.testPathsTreeToList

java.lang.AssertionError: expected [true] but found [false]

Check failure on line 69 in clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestTreeNode.java

View workflow job for this annotation

GitHub Actions / Clients / UT & CodeCov (11)

TestTreeNode.testPathsTreeToList

java.lang.AssertionError: expected [true] but found [false]

Check failure on line 69 in clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestTreeNode.java

View workflow job for this annotation

GitHub Actions / Clients / UT & CodeCov (11)

TestTreeNode.testPathsTreeToList

java.lang.AssertionError: expected [true] but found [false]

Check failure on line 69 in clients/venice-admin-tool/src/test/java/com/linkedin/venice/TestTreeNode.java

View workflow job for this annotation

GitHub Actions / Clients / UT & CodeCov (11)

TestTreeNode.testPathsTreeToList

java.lang.AssertionError: expected [true] but found [false]
Assert.assertTrue(list.contains("/venice-parent/cluster1/adminTopicMetadata/file1"));
Assert.assertTrue(list.contains("/venice-parent/cluster1/adminTopicMetadata/file2"));
Assert.assertTrue(list.contains("/venice-parent/cluster1/adminTopicMetadata/file2/file3"));
Expand All @@ -89,7 +89,7 @@
TreeNode root = new TreeNode(BASE_PATH);
root.addChild(STORE_CONFIGS);
TreeNode cluster1 = root.addChild(CLUSTER_1);
TreeNode adminTopicMetadata = cluster1.addChild(ADMIN_TOPIC_METADATA);
TreeNode adminTopicMetadata = cluster1.addChild(ADMIN_TOPIC_METADATA_V2);
adminTopicMetadata.addChild("file1");
TreeNode file2 = adminTopicMetadata.addChild("file2");
file2.addChild("file3");
Expand All @@ -99,7 +99,7 @@
cluster1.addChild(STORES);
cluster1.addChild(STORE_GRAVEYARD);
TreeNode cluster2 = root.addChild(CLUSTER_2);
cluster2.addChild(ADMIN_TOPIC_METADATA);
cluster2.addChild(ADMIN_TOPIC_METADATA_V2);
cluster2.addChild(EXECUTION_IDS);
cluster2.addChild(PARENT_OFFLINE_PUSHES);
cluster2.addChild(ROUTERS);
Expand All @@ -123,7 +123,7 @@
Assert.assertEquals(child.getChildren().size(), 6);
testContainsChildAsserts(child);
for (TreeNode grandchild: child.getChildren().values()) {
if (grandchild.getName().equals(ADMIN_TOPIC_METADATA)) {
if (grandchild.getName().equals(ADMIN_TOPIC_METADATA_V2)) {
Assert.assertEquals(grandchild.getChildren().size(), 2);
for (TreeNode greatgrandchild: grandchild.getChildren().values()) {
if (greatgrandchild.getName().equals("file2")) {
Expand All @@ -144,7 +144,7 @@
}

private void testContainsChildAsserts(TreeNode child) {
Assert.assertTrue(child.containsChild(ADMIN_TOPIC_METADATA));
Assert.assertTrue(child.containsChild(ADMIN_TOPIC_METADATA_V2));
Assert.assertTrue(child.containsChild(EXECUTION_IDS));
Assert.assertTrue(child.containsChild(PARENT_OFFLINE_PUSHES));
Assert.assertTrue(child.containsChild(ROUTERS));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
package com.linkedin.venice;

import static com.linkedin.venice.zk.VeniceZkPaths.ADMIN_TOPIC_METADATA;
import static com.linkedin.venice.zk.VeniceZkPaths.ADMIN_TOPIC_METADATA_V2;
import static com.linkedin.venice.zk.VeniceZkPaths.EXECUTION_IDS;
import static com.linkedin.venice.zk.VeniceZkPaths.PARENT_OFFLINE_PUSHES;
import static com.linkedin.venice.zk.VeniceZkPaths.ROUTERS;
Expand Down Expand Up @@ -132,7 +132,7 @@ private void testVenicePathsContainsAsserts(List<String> venicePaths) {
}

private void testContainsChildAsserts(TreeNode child) {
Assert.assertTrue(child.containsChild(ADMIN_TOPIC_METADATA));
Assert.assertTrue(child.containsChild(ADMIN_TOPIC_METADATA_V2));
Assert.assertTrue(child.containsChild(EXECUTION_IDS));
Assert.assertTrue(child.containsChild(PARENT_OFFLINE_PUSHES));
Assert.assertTrue(child.containsChild(ROUTERS));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2744,7 +2744,6 @@ private ConfigKeys() {
"controller.enable.realtime.topic.versioning";

public static final boolean DEFAULT_CONTROLLER_ENABLE_REAL_TIME_TOPIC_VERSIONING = false;
public final static String USE_V2_ADMIN_TOPIC_METADATA = "controller.use.v2.admin.topic.metadata";
public static final String CONTROLLER_ENABLE_HYBRID_STORE_PARTITION_COUNT_UPDATE =
"controller.enable.hybrid.store.partition.count.update";
public static final String PUSH_JOB_VIEW_CONFIGS = "push.job.view.configs";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@
* This class contains constants that represent Venice-managed ZooKeeper paths.
*/
public class VeniceZkPaths {
public static final String ADMIN_TOPIC_METADATA = "adminTopicMetadata";
// new admin topic metadata structure is incompatible with the old one, so creating a new "v2" path
public static final String ADMIN_TOPIC_METADATA_V2 = "adminTopicMetadataV2";
public static final String CLUSTER_CONFIG = "ClusterConfig";
public static final String DARK_CLUSTER_CONFIG = "DarkClusterConfig";
Expand All @@ -24,8 +22,8 @@ public class VeniceZkPaths {
public static final String STORE_GRAVEYARD = "StoreGraveyard";

/** Set of all Venice-managed ZooKeeper cluster paths */
private static final Set<String> CLUSTER_ZK_PATHS_MODIFIABLE = new HashSet<>(
Arrays.asList(ADMIN_TOPIC_METADATA, EXECUTION_IDS, PARENT_OFFLINE_PUSHES, ROUTERS, STORE_GRAVEYARD, STORES));
private static final Set<String> CLUSTER_ZK_PATHS_MODIFIABLE =
new HashSet<>(Arrays.asList(EXECUTION_IDS, PARENT_OFFLINE_PUSHES, ROUTERS, STORE_GRAVEYARD, STORES));
/** @see #CLUSTER_ZK_PATHS_MODIFIABLE */
public static final Set<String> CLUSTER_ZK_PATHS = Collections.unmodifiableSet(CLUSTER_ZK_PATHS_MODIFIABLE);
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,22 @@ public static Map<String, Long> generateMetadataMap(
return metadata;
}

/**
* Generate AdminMetadata object with specified values (V2 format)
*/
public static AdminMetadata generateAdminMetadata(
Optional<Long> localOffset,
Optional<Long> upstreamOffset,
Optional<Long> executionId,
Optional<Long> adminOperationProtocolVersion) {
AdminMetadata metadata = new AdminMetadata();
executionId.ifPresent(metadata::setExecutionId);
localOffset.ifPresent(metadata::setOffset);
upstreamOffset.ifPresent(metadata::setUpstreamOffset);
adminOperationProtocolVersion.ifPresent(metadata::setAdminOperationProtocolVersion);
return metadata;
}

/**
* @return a pair of values to which the specified keys are mapped to {@linkplain AdminTopicMetadataAccessor#OFFSET_KEY}
* and {@linkplain AdminTopicMetadataAccessor#UPSTREAM_OFFSET_KEY}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,6 @@ public class VeniceControllerClusterConfig {

private final Set<PushJobCheckpoints> pushJobUserErrorCheckpoints;
private final boolean isRealTimeTopicVersioningEnabled;
private final boolean useV2AdminTopicMetadata;
private final boolean isHybridStorePartitionCountUpdateEnabled;

/**
Expand Down Expand Up @@ -1184,7 +1183,6 @@ public VeniceControllerClusterConfig(VeniceProperties props) {
this.isRealTimeTopicVersioningEnabled = props.getBoolean(
ConfigKeys.CONTROLLER_ENABLE_REAL_TIME_TOPIC_VERSIONING,
DEFAULT_CONTROLLER_ENABLE_REAL_TIME_TOPIC_VERSIONING);
this.useV2AdminTopicMetadata = props.getBoolean(ConfigKeys.USE_V2_ADMIN_TOPIC_METADATA, false);
this.isHybridStorePartitionCountUpdateEnabled =
props.getBoolean(ConfigKeys.CONTROLLER_ENABLE_HYBRID_STORE_PARTITION_COUNT_UPDATE, false);

Expand Down Expand Up @@ -2212,10 +2210,6 @@ public boolean isDarkCluster() {
return isDarkCluster;
}

public boolean useV2AdminTopicMetadata() {
return useV2AdminTopicMetadata;
}

public boolean isProtocolVersionAutoDetectionServiceEnabled() {
return isProtocolVersionAutoDetectionServiceEnabled;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,15 @@
package com.linkedin.venice.controller;

import static com.linkedin.venice.zk.VeniceZkPaths.ADMIN_TOPIC_METADATA;
import static com.linkedin.venice.zk.VeniceZkPaths.ADMIN_TOPIC_METADATA_V2;

import com.linkedin.venice.annotation.VisibleForTesting;
import com.linkedin.venice.controller.kafka.consumer.AdminMetadata;
import com.linkedin.venice.controller.kafka.consumer.AdminMetadataJSONSerializer;
import com.linkedin.venice.controller.kafka.consumer.StringToLongMapJSONSerializer;
import com.linkedin.venice.exceptions.VeniceException;
import com.linkedin.venice.helix.HelixAdapterSerializer;
import com.linkedin.venice.pubsub.api.PubSubSymbolicPosition;
import com.linkedin.venice.utils.HelixUtils;
import com.linkedin.venice.utils.TrieBasedPathResourceRegistry;
import com.linkedin.venice.utils.Utils;
import java.util.HashMap;
import java.util.Map;
import org.apache.helix.AccessOption;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.zookeeper.impl.client.ZkClient;
Expand All @@ -29,16 +24,11 @@ public class ZkAdminTopicMetadataAccessor extends AdminTopicMetadataAccessor {
private static final Logger LOGGER = LogManager.getLogger(ZkAdminTopicMetadataAccessor.class);
private static final int ZK_UPDATE_RETRY = 3;
private static final long ZK_UPDATE_RETRY_DELAY_MS = 1000;
private static final String ADMIN_TOPIC_METADATA_NODE = "/" + ADMIN_TOPIC_METADATA;
private static final String ADMIN_TOPIC_METADATA_SECONDARY_NODE = "/" + ADMIN_TOPIC_METADATA_V2;
private static final String ADMIN_TOPIC_METADATA_NODE_PATH_PATTERN =
getAdminTopicMetadataNodePath(TrieBasedPathResourceRegistry.WILDCARD_MATCH_ANY);
private static final String ADMIN_TOPIC_METADATA_NODE_V2_PATH_PATTERN =
getAdminTopicV2MetadataNodePath(TrieBasedPathResourceRegistry.WILDCARD_MATCH_ANY);

private final ZkBaseDataAccessor<Map<String, Long>> zkMapAccessor;
private final ZkBaseDataAccessor<AdminMetadata> zkMapV2Accessor;
Map<String, Boolean> useV2AdminTopicMetadataMap = new HashMap<>();

public ZkAdminTopicMetadataAccessor(
ZkClient zkClient,
Expand All @@ -48,7 +38,6 @@ public ZkAdminTopicMetadataAccessor(
adapterSerializer.registerSerializer(
ADMIN_TOPIC_METADATA_NODE_V2_PATH_PATTERN,
new AdminMetadataJSONSerializer(clusterConfig.getPubSubPositionDeserializer()));
useV2AdminTopicMetadataMap.put(clusterConfig.getClusterName(), clusterConfig.useV2AdminTopicMetadata());
}

public ZkAdminTopicMetadataAccessor(
Expand All @@ -59,18 +48,12 @@ public ZkAdminTopicMetadataAccessor(
adapterSerializer.registerSerializer(
ADMIN_TOPIC_METADATA_NODE_V2_PATH_PATTERN,
new AdminMetadataJSONSerializer(multiClusterConfig.getPubSubPositionDeserializer()));
multiClusterConfig.getClusters()
.forEach(
clusterName -> useV2AdminTopicMetadataMap
.put(clusterName, multiClusterConfig.getControllerConfig(clusterName).useV2AdminTopicMetadata()));
}

private ZkAdminTopicMetadataAccessor(ZkClient zkClient, HelixAdapterSerializer adapterSerializer) {
adapterSerializer.registerSerializer(ADMIN_TOPIC_METADATA_NODE_PATH_PATTERN, new StringToLongMapJSONSerializer());
// TODO We get two objects (zkClient and adapterSerializer), then call a setter of the first object and pass it the
// second object, effectively mutating the (zkClient) object that was passed in.
zkClient.setZkSerializer(adapterSerializer);
zkMapAccessor = new ZkBaseDataAccessor<>(zkClient);
zkMapV2Accessor = new ZkBaseDataAccessor<>(zkClient);
}

Expand All @@ -81,30 +64,10 @@ private ZkAdminTopicMetadataAccessor(ZkClient zkClient, HelixAdapterSerializer a
*/
@Override
public void updateMetadata(String clusterName, AdminMetadata metadataDelta) {
updateV1Metadata(clusterName, metadataDelta);
updateV2Metadata(clusterName, metadataDelta);
}

private void updateV1Metadata(String clusterName, AdminMetadata metadata) {
String path = getAdminTopicMetadataNodePath(clusterName);
// Use the new toLegacyMap method to get the V1 format
Map<String, Long> metadataDelta = metadata.toLegacyMap();

HelixUtils.compareAndUpdate(zkMapAccessor, path, ZK_UPDATE_RETRY, currentMetadataMap -> {
if (currentMetadataMap == null) {
currentMetadataMap = new HashMap<>();
}
LOGGER.info(
"Updating AdminTopicMetadata map for cluster: {}. Current metadata: {}. New delta metadata: {}",
clusterName,
currentMetadataMap,
metadataDelta);
currentMetadataMap.putAll(metadataDelta);
return currentMetadataMap;
});
}

public void updateV2Metadata(String clusterName, AdminMetadata metadata) {
private void updateV2Metadata(String clusterName, AdminMetadata metadata) {
String path = getAdminTopicV2MetadataNodePath(clusterName);

HelixUtils.compareAndUpdate(zkMapV2Accessor, path, ZK_UPDATE_RETRY, currentMetadata -> {
Expand Down Expand Up @@ -147,33 +110,13 @@ public void updateV2Metadata(String clusterName, AdminMetadata metadata) {
*/
@Override
public AdminMetadata getMetadata(String clusterName) {
return useV2AdminTopicMetadataMap.getOrDefault(clusterName, false)
? getV2AdminMetadata(clusterName)
: getV1AdminMetadata(clusterName);
}

private AdminMetadata getV1AdminMetadata(String clusterName) {
int retry = ZK_UPDATE_RETRY;
String path = getAdminTopicMetadataNodePath(clusterName);
while (retry > 0) {
try {
return AdminMetadata.fromLegacyMap(zkMapAccessor.get(path, null, AccessOption.PERSISTENT));
} catch (Exception e) {
LOGGER.warn("Could not get the admin topic metadata map from Zk with: {}. Will retry.", path, e);
retry--;
Utils.sleep(ZK_UPDATE_RETRY_DELAY_MS);
}
}
throw new VeniceException(
"After " + ZK_UPDATE_RETRY + " retries still could not get the admin topic metadata map" + " from Zk with: "
+ path);
return getV2AdminMetadata(clusterName);
}

/**
* Get the AdminMetadata object directly from ZooKeeper
*/
@VisibleForTesting
AdminMetadata getV2AdminMetadata(String clusterName) {
private AdminMetadata getV2AdminMetadata(String clusterName) {
int retry = ZK_UPDATE_RETRY;
String path = getAdminTopicV2MetadataNodePath(clusterName);
while (retry > 0) {
Expand All @@ -194,10 +137,6 @@ AdminMetadata getV2AdminMetadata(String clusterName) {
throw new VeniceException("Could not get the admin topic metadata from Zk with: " + path);
}

static String getAdminTopicMetadataNodePath(String clusterName) {
return HelixUtils.getHelixClusterZkPath(clusterName) + ADMIN_TOPIC_METADATA_NODE;
}

static String getAdminTopicV2MetadataNodePath(String clusterName) {
return HelixUtils.getHelixClusterZkPath(clusterName) + ADMIN_TOPIC_METADATA_SECONDARY_NODE;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.pubsub.PubSubTopicPartitionImpl;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.adapter.kafka.common.ApacheKafkaOffsetPosition;
import com.linkedin.venice.pubsub.api.DefaultPubSubMessage;
import com.linkedin.venice.pubsub.api.PubSubConsumerAdapter;
import com.linkedin.venice.pubsub.api.PubSubMessage;
Expand Down Expand Up @@ -1053,9 +1054,24 @@ private void persistAdminTopicMetadata() {
if (remoteConsumptionEnabled) {
adminMetadata.setPubSubPosition(localPositionCheckpointAtStartTime);
adminMetadata.setUpstreamPubSubPosition(lastDelegatedPosition);
// Set numeric offsets to keep them in sync with positions
if (localPositionCheckpointAtStartTime instanceof ApacheKafkaOffsetPosition) {
adminMetadata.setOffset(((ApacheKafkaOffsetPosition) localPositionCheckpointAtStartTime).getNumericOffset());
}
if (lastDelegatedPosition instanceof ApacheKafkaOffsetPosition) {
adminMetadata.setUpstreamOffset(((ApacheKafkaOffsetPosition) lastDelegatedPosition).getNumericOffset());
}
} else {
adminMetadata.setPubSubPosition(lastDelegatedPosition);
adminMetadata.setUpstreamPubSubPosition(upstreamPositionCheckpointAtStartTime);
// Set numeric offsets to keep them in sync with positions
if (lastDelegatedPosition instanceof ApacheKafkaOffsetPosition) {
adminMetadata.setOffset(((ApacheKafkaOffsetPosition) lastDelegatedPosition).getNumericOffset());
}
if (upstreamPositionCheckpointAtStartTime instanceof ApacheKafkaOffsetPosition) {
adminMetadata.setUpstreamOffset(
((ApacheKafkaOffsetPosition) upstreamPositionCheckpointAtStartTime).getNumericOffset());
}
}
adminTopicMetadataAccessor.updateMetadata(clusterName, adminMetadata);
lastPersistedPosition = lastDelegatedPosition;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,52 +84,6 @@ public Map<String, Object> toMap() {
return map;
}

/**
* Convert AdminMetadata to legacy Map<String, Long> format for V1 compatibility
* This only includes the Long fields and excludes Position objects
* @return Map<String, Long> containing only the Long fields
* @deprecated Scheduled for removal
*/
public Map<String, Long> toLegacyMap() {
Map<String, Long> legacyMap = new HashMap<>();
if (executionId != null) {
legacyMap.put(AdminTopicMetadataAccessor.EXECUTION_ID_KEY, executionId);
}
if (offset != null) {
legacyMap.put(AdminTopicMetadataAccessor.OFFSET_KEY, offset);
}
if (upstreamOffset != null) {
legacyMap.put(AdminTopicMetadataAccessor.UPSTREAM_OFFSET_KEY, upstreamOffset);
}
if (adminOperationProtocolVersion != null) {
legacyMap.put(AdminTopicMetadataAccessor.ADMIN_OPERATION_PROTOCOL_VERSION_KEY, adminOperationProtocolVersion);
}
return legacyMap;
}

/**
* Factory method to create AdminMetadata from legacy Map<String, Long> format
*/
public static AdminMetadata fromLegacyMap(Map<String, Long> legacyMap) {
AdminMetadata metadata = new AdminMetadata();
if (legacyMap != null) {
metadata.setExecutionId(legacyMap.get(AdminTopicMetadataAccessor.EXECUTION_ID_KEY));
metadata.setOffset(legacyMap.get(AdminTopicMetadataAccessor.OFFSET_KEY));
metadata.setUpstreamOffset(legacyMap.get(AdminTopicMetadataAccessor.UPSTREAM_OFFSET_KEY));
metadata.setAdminOperationProtocolVersion(
legacyMap.get(AdminTopicMetadataAccessor.ADMIN_OPERATION_PROTOCOL_VERSION_KEY));
metadata.setPubSubPosition(
metadata.getOffset().longValue() == UNDEFINED_VALUE
? PubSubSymbolicPosition.EARLIEST
: ApacheKafkaOffsetPosition.of(metadata.getOffset()));
metadata.setUpstreamPubSubPosition(
metadata.getUpstreamOffset().longValue() == UNDEFINED_VALUE
? PubSubSymbolicPosition.EARLIEST
: ApacheKafkaOffsetPosition.of(metadata.getUpstreamOffset()));
}
return metadata;
}

// Getters and setters
public Long getExecutionId() {
return executionId == null ? UNDEFINED_VALUE : executionId;
Expand Down Expand Up @@ -178,12 +132,6 @@ private PubSubPosition getPubSubPosition(PubSubPosition position, Long offset) {
} else {
return PubSubSymbolicPosition.EARLIEST;
}
} else if (offset != null && offset > position.getNumericOffset()) {
LOGGER.warn(
"Offset {} is greater than position {}. Resetting position to offset.",
offset,
position.getNumericOffset());
return ApacheKafkaOffsetPosition.of(offset);
} else {
return position;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,6 @@ public AdminConsumptionStats(MetricsRepository metricsRepository, String name) {
adminConsumeFailCountSensor = registerSensor("failed_admin_messages", new Count());
adminConsumeFailRetriableMessageCountSensor = registerSensor("failed_retriable_admin_messages", new Count());
adminTopicDIVErrorReportCountSensor = registerSensor("admin_message_div_error_report_count", new Count());
registerSensor(
new AsyncGauge(
(ignored, ignored2) -> adminConsumptionFailedPosition == null
? 0L
: adminConsumptionFailedPosition.getNumericOffset(),
"failed_admin_message_offset"));
adminConsumptionCycleDurationMsSensor =
registerSensor("admin_consumption_cycle_duration_ms", new Avg(), new Min(), new Max());
registerSensor(
Expand Down
Loading
Loading