Skip to content

Commit 763499b

Browse files
committed
Complete V1 admin topic metadata removal and fix test failures
- Remove all V1 methods, constants, and configuration keys - Fix AdminConsumptionTask to synchronize numeric offsets with positions - Add automatic offset sync in AdminMetadata.setPubSubPosition() - Update all tests to use V2 format (AdminMetadata vs Map<String, Long>) - Add generateAdminMetadata() helper for V2 test data creation - System now exclusively uses V2 admin topic metadata format Fixes test failures caused by V1/V2 data format mismatches.
1 parent b18ab05 commit 763499b

File tree

13 files changed

+80
-189
lines changed

13 files changed

+80
-189
lines changed

internal/venice-common/src/main/java/com/linkedin/venice/ConfigKeys.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2744,7 +2744,6 @@ private ConfigKeys() {
27442744
"controller.enable.realtime.topic.versioning";
27452745

27462746
public static final boolean DEFAULT_CONTROLLER_ENABLE_REAL_TIME_TOPIC_VERSIONING = false;
2747-
public final static String USE_V2_ADMIN_TOPIC_METADATA = "controller.use.v2.admin.topic.metadata";
27482747
public static final String CONTROLLER_ENABLE_HYBRID_STORE_PARTITION_COUNT_UPDATE =
27492748
"controller.enable.hybrid.store.partition.count.update";
27502749
public static final String PUSH_JOB_VIEW_CONFIGS = "push.job.view.configs";

internal/venice-common/src/main/java/com/linkedin/venice/zk/VeniceZkPaths.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,6 @@
1010
* This class contains constants that represent Venice-managed ZooKeeper paths.
1111
*/
1212
public class VeniceZkPaths {
13-
public static final String ADMIN_TOPIC_METADATA = "adminTopicMetadata";
14-
// new admin topic metadata structure is incompatible with the old one, so creating a new "v2" path
1513
public static final String ADMIN_TOPIC_METADATA_V2 = "adminTopicMetadataV2";
1614
public static final String CLUSTER_CONFIG = "ClusterConfig";
1715
public static final String DARK_CLUSTER_CONFIG = "DarkClusterConfig";
@@ -25,7 +23,7 @@ public class VeniceZkPaths {
2523

2624
/** Set of all Venice-managed ZooKeeper cluster paths */
2725
private static final Set<String> CLUSTER_ZK_PATHS_MODIFIABLE = new HashSet<>(
28-
Arrays.asList(ADMIN_TOPIC_METADATA, EXECUTION_IDS, PARENT_OFFLINE_PUSHES, ROUTERS, STORE_GRAVEYARD, STORES));
26+
Arrays.asList(EXECUTION_IDS, PARENT_OFFLINE_PUSHES, ROUTERS, STORE_GRAVEYARD, STORES));
2927
/** @see #CLUSTER_ZK_PATHS_MODIFIABLE */
3028
public static final Set<String> CLUSTER_ZK_PATHS = Collections.unmodifiableSet(CLUSTER_ZK_PATHS_MODIFIABLE);
3129
}

services/venice-controller/src/main/java/com/linkedin/venice/controller/AdminTopicMetadataAccessor.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,22 @@ public static Map<String, Long> generateMetadataMap(
4343
return metadata;
4444
}
4545

46+
/**
47+
* Generate AdminMetadata object with specified values (V2 format)
48+
*/
49+
public static AdminMetadata generateAdminMetadata(
50+
Optional<Long> localOffset,
51+
Optional<Long> upstreamOffset,
52+
Optional<Long> executionId,
53+
Optional<Long> adminOperationProtocolVersion) {
54+
AdminMetadata metadata = new AdminMetadata();
55+
executionId.ifPresent(metadata::setExecutionId);
56+
localOffset.ifPresent(metadata::setOffset);
57+
upstreamOffset.ifPresent(metadata::setUpstreamOffset);
58+
adminOperationProtocolVersion.ifPresent(metadata::setAdminOperationProtocolVersion);
59+
return metadata;
60+
}
61+
4662
/**
4763
* @return a pair of values to which the specified keys are mapped to {@linkplain AdminTopicMetadataAccessor#OFFSET_KEY}
4864
* and {@linkplain AdminTopicMetadataAccessor#UPSTREAM_OFFSET_KEY}.

services/venice-controller/src/main/java/com/linkedin/venice/controller/VeniceControllerClusterConfig.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -604,7 +604,6 @@ public class VeniceControllerClusterConfig {
604604

605605
private final Set<PushJobCheckpoints> pushJobUserErrorCheckpoints;
606606
private final boolean isRealTimeTopicVersioningEnabled;
607-
private final boolean useV2AdminTopicMetadata;
608607
private final boolean isHybridStorePartitionCountUpdateEnabled;
609608

610609
/**
@@ -1184,7 +1183,6 @@ public VeniceControllerClusterConfig(VeniceProperties props) {
11841183
this.isRealTimeTopicVersioningEnabled = props.getBoolean(
11851184
ConfigKeys.CONTROLLER_ENABLE_REAL_TIME_TOPIC_VERSIONING,
11861185
DEFAULT_CONTROLLER_ENABLE_REAL_TIME_TOPIC_VERSIONING);
1187-
this.useV2AdminTopicMetadata = props.getBoolean(ConfigKeys.USE_V2_ADMIN_TOPIC_METADATA, false);
11881186
this.isHybridStorePartitionCountUpdateEnabled =
11891187
props.getBoolean(ConfigKeys.CONTROLLER_ENABLE_HYBRID_STORE_PARTITION_COUNT_UPDATE, false);
11901188

@@ -2212,9 +2210,6 @@ public boolean isDarkCluster() {
22122210
return isDarkCluster;
22132211
}
22142212

2215-
public boolean useV2AdminTopicMetadata() {
2216-
return useV2AdminTopicMetadata;
2217-
}
22182213

22192214
public boolean isProtocolVersionAutoDetectionServiceEnabled() {
22202215
return isProtocolVersionAutoDetectionServiceEnabled;

services/venice-controller/src/main/java/com/linkedin/venice/controller/ZkAdminTopicMetadataAccessor.java

Lines changed: 3 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,15 @@
11
package com.linkedin.venice.controller;
22

3-
import static com.linkedin.venice.zk.VeniceZkPaths.ADMIN_TOPIC_METADATA;
43
import static com.linkedin.venice.zk.VeniceZkPaths.ADMIN_TOPIC_METADATA_V2;
54

6-
import com.linkedin.venice.annotation.VisibleForTesting;
75
import com.linkedin.venice.controller.kafka.consumer.AdminMetadata;
86
import com.linkedin.venice.controller.kafka.consumer.AdminMetadataJSONSerializer;
9-
import com.linkedin.venice.controller.kafka.consumer.StringToLongMapJSONSerializer;
107
import com.linkedin.venice.exceptions.VeniceException;
118
import com.linkedin.venice.helix.HelixAdapterSerializer;
129
import com.linkedin.venice.pubsub.api.PubSubSymbolicPosition;
1310
import com.linkedin.venice.utils.HelixUtils;
1411
import com.linkedin.venice.utils.TrieBasedPathResourceRegistry;
1512
import com.linkedin.venice.utils.Utils;
16-
import java.util.HashMap;
17-
import java.util.Map;
1813
import org.apache.helix.AccessOption;
1914
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
2015
import org.apache.helix.zookeeper.impl.client.ZkClient;
@@ -29,16 +24,11 @@ public class ZkAdminTopicMetadataAccessor extends AdminTopicMetadataAccessor {
2924
private static final Logger LOGGER = LogManager.getLogger(ZkAdminTopicMetadataAccessor.class);
3025
private static final int ZK_UPDATE_RETRY = 3;
3126
private static final long ZK_UPDATE_RETRY_DELAY_MS = 1000;
32-
private static final String ADMIN_TOPIC_METADATA_NODE = "/" + ADMIN_TOPIC_METADATA;
3327
private static final String ADMIN_TOPIC_METADATA_SECONDARY_NODE = "/" + ADMIN_TOPIC_METADATA_V2;
34-
private static final String ADMIN_TOPIC_METADATA_NODE_PATH_PATTERN =
35-
getAdminTopicMetadataNodePath(TrieBasedPathResourceRegistry.WILDCARD_MATCH_ANY);
3628
private static final String ADMIN_TOPIC_METADATA_NODE_V2_PATH_PATTERN =
3729
getAdminTopicV2MetadataNodePath(TrieBasedPathResourceRegistry.WILDCARD_MATCH_ANY);
3830

39-
private final ZkBaseDataAccessor<Map<String, Long>> zkMapAccessor;
4031
private final ZkBaseDataAccessor<AdminMetadata> zkMapV2Accessor;
41-
Map<String, Boolean> useV2AdminTopicMetadataMap = new HashMap<>();
4232

4333
public ZkAdminTopicMetadataAccessor(
4434
ZkClient zkClient,
@@ -48,7 +38,6 @@ public ZkAdminTopicMetadataAccessor(
4838
adapterSerializer.registerSerializer(
4939
ADMIN_TOPIC_METADATA_NODE_V2_PATH_PATTERN,
5040
new AdminMetadataJSONSerializer(clusterConfig.getPubSubPositionDeserializer()));
51-
useV2AdminTopicMetadataMap.put(clusterConfig.getClusterName(), clusterConfig.useV2AdminTopicMetadata());
5241
}
5342

5443
public ZkAdminTopicMetadataAccessor(
@@ -59,18 +48,12 @@ public ZkAdminTopicMetadataAccessor(
5948
adapterSerializer.registerSerializer(
6049
ADMIN_TOPIC_METADATA_NODE_V2_PATH_PATTERN,
6150
new AdminMetadataJSONSerializer(multiClusterConfig.getPubSubPositionDeserializer()));
62-
multiClusterConfig.getClusters()
63-
.forEach(
64-
clusterName -> useV2AdminTopicMetadataMap
65-
.put(clusterName, multiClusterConfig.getControllerConfig(clusterName).useV2AdminTopicMetadata()));
6651
}
6752

6853
private ZkAdminTopicMetadataAccessor(ZkClient zkClient, HelixAdapterSerializer adapterSerializer) {
69-
adapterSerializer.registerSerializer(ADMIN_TOPIC_METADATA_NODE_PATH_PATTERN, new StringToLongMapJSONSerializer());
7054
// TODO We get two objects (zkClient and adapterSerializer), then call a setter of the first object and pass it the
7155
// second object, effectively mutating the (zkClient) object that was passed in.
7256
zkClient.setZkSerializer(adapterSerializer);
73-
zkMapAccessor = new ZkBaseDataAccessor<>(zkClient);
7457
zkMapV2Accessor = new ZkBaseDataAccessor<>(zkClient);
7558
}
7659

@@ -81,30 +64,11 @@ private ZkAdminTopicMetadataAccessor(ZkClient zkClient, HelixAdapterSerializer a
8164
*/
8265
@Override
8366
public void updateMetadata(String clusterName, AdminMetadata metadataDelta) {
84-
updateV1Metadata(clusterName, metadataDelta);
8567
updateV2Metadata(clusterName, metadataDelta);
8668
}
8769

88-
private void updateV1Metadata(String clusterName, AdminMetadata metadata) {
89-
String path = getAdminTopicMetadataNodePath(clusterName);
90-
// Use the new toLegacyMap method to get the V1 format
91-
Map<String, Long> metadataDelta = metadata.toLegacyMap();
9270

93-
HelixUtils.compareAndUpdate(zkMapAccessor, path, ZK_UPDATE_RETRY, currentMetadataMap -> {
94-
if (currentMetadataMap == null) {
95-
currentMetadataMap = new HashMap<>();
96-
}
97-
LOGGER.info(
98-
"Updating AdminTopicMetadata map for cluster: {}. Current metadata: {}. New delta metadata: {}",
99-
clusterName,
100-
currentMetadataMap,
101-
metadataDelta);
102-
currentMetadataMap.putAll(metadataDelta);
103-
return currentMetadataMap;
104-
});
105-
}
106-
107-
public void updateV2Metadata(String clusterName, AdminMetadata metadata) {
71+
private void updateV2Metadata(String clusterName, AdminMetadata metadata) {
10872
String path = getAdminTopicV2MetadataNodePath(clusterName);
10973

11074
HelixUtils.compareAndUpdate(zkMapV2Accessor, path, ZK_UPDATE_RETRY, currentMetadata -> {
@@ -147,33 +111,14 @@ public void updateV2Metadata(String clusterName, AdminMetadata metadata) {
147111
*/
148112
@Override
149113
public AdminMetadata getMetadata(String clusterName) {
150-
return useV2AdminTopicMetadataMap.getOrDefault(clusterName, false)
151-
? getV2AdminMetadata(clusterName)
152-
: getV1AdminMetadata(clusterName);
114+
return getV2AdminMetadata(clusterName);
153115
}
154116

155-
private AdminMetadata getV1AdminMetadata(String clusterName) {
156-
int retry = ZK_UPDATE_RETRY;
157-
String path = getAdminTopicMetadataNodePath(clusterName);
158-
while (retry > 0) {
159-
try {
160-
return AdminMetadata.fromLegacyMap(zkMapAccessor.get(path, null, AccessOption.PERSISTENT));
161-
} catch (Exception e) {
162-
LOGGER.warn("Could not get the admin topic metadata map from Zk with: {}. Will retry.", path, e);
163-
retry--;
164-
Utils.sleep(ZK_UPDATE_RETRY_DELAY_MS);
165-
}
166-
}
167-
throw new VeniceException(
168-
"After " + ZK_UPDATE_RETRY + " retries still could not get the admin topic metadata map" + " from Zk with: "
169-
+ path);
170-
}
171117

172118
/**
173119
* Get the AdminMetadata object directly from ZooKeeper
174120
*/
175-
@VisibleForTesting
176-
AdminMetadata getV2AdminMetadata(String clusterName) {
121+
private AdminMetadata getV2AdminMetadata(String clusterName) {
177122
int retry = ZK_UPDATE_RETRY;
178123
String path = getAdminTopicV2MetadataNodePath(clusterName);
179124
while (retry > 0) {
@@ -194,10 +139,6 @@ AdminMetadata getV2AdminMetadata(String clusterName) {
194139
throw new VeniceException("Could not get the admin topic metadata from Zk with: " + path);
195140
}
196141

197-
static String getAdminTopicMetadataNodePath(String clusterName) {
198-
return HelixUtils.getHelixClusterZkPath(clusterName) + ADMIN_TOPIC_METADATA_NODE;
199-
}
200-
201142
static String getAdminTopicV2MetadataNodePath(String clusterName) {
202143
return HelixUtils.getHelixClusterZkPath(clusterName) + ADMIN_TOPIC_METADATA_SECONDARY_NODE;
203144
}

services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminConsumptionTask.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import com.linkedin.venice.pubsub.api.PubSubMessage;
3131
import com.linkedin.venice.pubsub.api.PubSubMessageHeader;
3232
import com.linkedin.venice.pubsub.api.PubSubMessageHeaders;
33+
import com.linkedin.venice.pubsub.adapter.kafka.common.ApacheKafkaOffsetPosition;
3334
import com.linkedin.venice.pubsub.api.PubSubPosition;
3435
import com.linkedin.venice.pubsub.api.PubSubSymbolicPosition;
3536
import com.linkedin.venice.pubsub.api.PubSubTopic;
@@ -1053,9 +1054,23 @@ private void persistAdminTopicMetadata() {
10531054
if (remoteConsumptionEnabled) {
10541055
adminMetadata.setPubSubPosition(localPositionCheckpointAtStartTime);
10551056
adminMetadata.setUpstreamPubSubPosition(lastDelegatedPosition);
1057+
// Set numeric offsets to keep them in sync with positions
1058+
if (localPositionCheckpointAtStartTime instanceof ApacheKafkaOffsetPosition) {
1059+
adminMetadata.setOffset(((ApacheKafkaOffsetPosition) localPositionCheckpointAtStartTime).getNumericOffset());
1060+
}
1061+
if (lastDelegatedPosition instanceof ApacheKafkaOffsetPosition) {
1062+
adminMetadata.setUpstreamOffset(((ApacheKafkaOffsetPosition) lastDelegatedPosition).getNumericOffset());
1063+
}
10561064
} else {
10571065
adminMetadata.setPubSubPosition(lastDelegatedPosition);
10581066
adminMetadata.setUpstreamPubSubPosition(upstreamPositionCheckpointAtStartTime);
1067+
// Set numeric offsets to keep them in sync with positions
1068+
if (lastDelegatedPosition instanceof ApacheKafkaOffsetPosition) {
1069+
adminMetadata.setOffset(((ApacheKafkaOffsetPosition) lastDelegatedPosition).getNumericOffset());
1070+
}
1071+
if (upstreamPositionCheckpointAtStartTime instanceof ApacheKafkaOffsetPosition) {
1072+
adminMetadata.setUpstreamOffset(((ApacheKafkaOffsetPosition) upstreamPositionCheckpointAtStartTime).getNumericOffset());
1073+
}
10591074
}
10601075
adminTopicMetadataAccessor.updateMetadata(clusterName, adminMetadata);
10611076
lastPersistedPosition = lastDelegatedPosition;

services/venice-controller/src/main/java/com/linkedin/venice/controller/kafka/consumer/AdminMetadata.java

Lines changed: 1 addition & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -84,51 +84,7 @@ public Map<String, Object> toMap() {
8484
return map;
8585
}
8686

87-
/**
88-
* Convert AdminMetadata to legacy Map<String, Long> format for V1 compatibility
89-
* This only includes the Long fields and excludes Position objects
90-
* @return Map<String, Long> containing only the Long fields
91-
* @deprecated Scheduled for removal
92-
*/
93-
public Map<String, Long> toLegacyMap() {
94-
Map<String, Long> legacyMap = new HashMap<>();
95-
if (executionId != null) {
96-
legacyMap.put(AdminTopicMetadataAccessor.EXECUTION_ID_KEY, executionId);
97-
}
98-
if (offset != null) {
99-
legacyMap.put(AdminTopicMetadataAccessor.OFFSET_KEY, offset);
100-
}
101-
if (upstreamOffset != null) {
102-
legacyMap.put(AdminTopicMetadataAccessor.UPSTREAM_OFFSET_KEY, upstreamOffset);
103-
}
104-
if (adminOperationProtocolVersion != null) {
105-
legacyMap.put(AdminTopicMetadataAccessor.ADMIN_OPERATION_PROTOCOL_VERSION_KEY, adminOperationProtocolVersion);
106-
}
107-
return legacyMap;
108-
}
109-
110-
/**
111-
* Factory method to create AdminMetadata from legacy Map<String, Long> format
112-
*/
113-
public static AdminMetadata fromLegacyMap(Map<String, Long> legacyMap) {
114-
AdminMetadata metadata = new AdminMetadata();
115-
if (legacyMap != null) {
116-
metadata.setExecutionId(legacyMap.get(AdminTopicMetadataAccessor.EXECUTION_ID_KEY));
117-
metadata.setOffset(legacyMap.get(AdminTopicMetadataAccessor.OFFSET_KEY));
118-
metadata.setUpstreamOffset(legacyMap.get(AdminTopicMetadataAccessor.UPSTREAM_OFFSET_KEY));
119-
metadata.setAdminOperationProtocolVersion(
120-
legacyMap.get(AdminTopicMetadataAccessor.ADMIN_OPERATION_PROTOCOL_VERSION_KEY));
121-
metadata.setPubSubPosition(
122-
metadata.getOffset().longValue() == UNDEFINED_VALUE
123-
? PubSubSymbolicPosition.EARLIEST
124-
: ApacheKafkaOffsetPosition.of(metadata.getOffset()));
125-
metadata.setUpstreamPubSubPosition(
126-
metadata.getUpstreamOffset().longValue() == UNDEFINED_VALUE
127-
? PubSubSymbolicPosition.EARLIEST
128-
: ApacheKafkaOffsetPosition.of(metadata.getUpstreamOffset()));
129-
}
130-
return metadata;
131-
}
87+
13288

13389
// Getters and setters
13490
public Long getExecutionId() {
@@ -178,12 +134,6 @@ private PubSubPosition getPubSubPosition(PubSubPosition position, Long offset) {
178134
} else {
179135
return PubSubSymbolicPosition.EARLIEST;
180136
}
181-
} else if (offset != null && offset > position.getNumericOffset()) {
182-
LOGGER.warn(
183-
"Offset {} is greater than position {}. Resetting position to offset.",
184-
offset,
185-
position.getNumericOffset());
186-
return ApacheKafkaOffsetPosition.of(offset);
187137
} else {
188138
return position;
189139
}

services/venice-controller/src/main/java/com/linkedin/venice/controller/stats/AdminConsumptionStats.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -74,12 +74,6 @@ public AdminConsumptionStats(MetricsRepository metricsRepository, String name) {
7474
adminConsumeFailCountSensor = registerSensor("failed_admin_messages", new Count());
7575
adminConsumeFailRetriableMessageCountSensor = registerSensor("failed_retriable_admin_messages", new Count());
7676
adminTopicDIVErrorReportCountSensor = registerSensor("admin_message_div_error_report_count", new Count());
77-
registerSensor(
78-
new AsyncGauge(
79-
(ignored, ignored2) -> adminConsumptionFailedPosition == null
80-
? 0L
81-
: adminConsumptionFailedPosition.getNumericOffset(),
82-
"failed_admin_message_offset"));
8377
adminConsumptionCycleDurationMsSensor =
8478
registerSensor("admin_consumption_cycle_duration_ms", new Avg(), new Min(), new Max());
8579
registerSensor(

services/venice-controller/src/test/java/com/linkedin/venice/controller/AbstractTestVeniceParentHelixAdmin.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@ public class AbstractTestVeniceParentHelixAdmin {
6464
protected static final String clusterName = "test-cluster";
6565
static final String regionName = "test-region";
6666
static final String topicName = AdminTopicUtils.getTopicNameFromClusterName(clusterName);
67-
static final String zkMetadataNodePath = ZkAdminTopicMetadataAccessor.getAdminTopicMetadataNodePath(clusterName);
67+
static final String zkMetadataNodePath = ZkAdminTopicMetadataAccessor.getAdminTopicV2MetadataNodePath(clusterName);
6868
static final int partitionId = AdminTopicUtils.ADMIN_TOPIC_PARTITION_ID;
6969
static final AdminOperationSerializer adminOperationSerializer = new AdminOperationSerializer();
7070
static final PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();

0 commit comments

Comments
 (0)