Skip to content

Commit 2ba03b2

Browse files
KAFKA-20610: Added abstraction classes for share fetch reads (1/N) (#22389)
The PR adds abstracted classes to move replica manager dependency from share fetch related classes. The change is also needed by Share Group DLQ manager to read data for specific offset. Reviewers: Andrew Schofield <aschofield@confluent.io>, Sushant Mahajan <smahajan@confluent.io>
1 parent 80ee0a9 commit 2ba03b2

4 files changed

Lines changed: 332 additions & 0 deletions

File tree

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package kafka.server.share;
18+
19+
import kafka.server.QuotaFactory;
20+
import kafka.server.ReplicaManager;
21+
22+
import org.apache.kafka.common.TopicIdPartition;
23+
import org.apache.kafka.common.requests.FetchRequest;
24+
import org.apache.kafka.server.share.LogReader;
25+
import org.apache.kafka.server.storage.log.FetchParams;
26+
import org.apache.kafka.storage.internals.log.LogReadResult;
27+
28+
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
30+
31+
import java.util.LinkedHashMap;
32+
import java.util.Optional;
33+
import java.util.Set;
34+
import java.util.stream.Collectors;
35+
36+
import scala.Tuple2;
37+
import scala.collection.Seq;
38+
import scala.jdk.javaapi.CollectionConverters;
39+
import scala.runtime.BoxedUnit;
40+
41+
/**
42+
* Implementation of {@link LogReader} that reads records from the local log
43+
* via {@link ReplicaManager#readFromLog}.
44+
*/
45+
public class ReplicaManagerLogReader implements LogReader {
46+
47+
private static final Logger log = LoggerFactory.getLogger(ReplicaManagerLogReader.class);
48+
49+
private final ReplicaManager replicaManager;
50+
51+
public ReplicaManagerLogReader(ReplicaManager replicaManager) {
52+
this.replicaManager = replicaManager;
53+
}
54+
55+
@Override
56+
public LinkedHashMap<TopicIdPartition, LogReadResult> read(
57+
FetchParams fetchParams,
58+
Set<TopicIdPartition> partitionsToFetch,
59+
LinkedHashMap<TopicIdPartition, Long> topicPartitionFetchOffsets,
60+
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes) {
61+
62+
if (partitionsToFetch.isEmpty()) {
63+
return new LinkedHashMap<>();
64+
}
65+
66+
LinkedHashMap<TopicIdPartition, FetchRequest.PartitionData> topicPartitionData = new LinkedHashMap<>();
67+
topicPartitionFetchOffsets.forEach((topicIdPartition, fetchOffset) ->
68+
topicPartitionData.put(topicIdPartition,
69+
new FetchRequest.PartitionData(
70+
topicIdPartition.topicId(),
71+
fetchOffset,
72+
0,
73+
partitionMaxBytes.get(topicIdPartition),
74+
Optional.empty())
75+
));
76+
77+
Seq<Tuple2<TopicIdPartition, LogReadResult>> responseLogResult = replicaManager.readFromLog(
78+
fetchParams,
79+
CollectionConverters.asScala(
80+
partitionsToFetch.stream().map(topicIdPartition ->
81+
new Tuple2<>(topicIdPartition, topicPartitionData.get(topicIdPartition))).collect(Collectors.toList())
82+
),
83+
QuotaFactory.UNBOUNDED_QUOTA,
84+
true);
85+
86+
LinkedHashMap<TopicIdPartition, LogReadResult> responseData = new LinkedHashMap<>();
87+
responseLogResult.foreach(tpLogResult -> {
88+
responseData.put(tpLogResult._1(), tpLogResult._2());
89+
return BoxedUnit.UNIT;
90+
});
91+
92+
log.trace("Data successfully retrieved by replica manager: {}", responseData);
93+
return responseData;
94+
}
95+
}
Lines changed: 122 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package kafka.server.share;
18+
19+
import kafka.cluster.Partition;
20+
import kafka.server.ReplicaManager;
21+
22+
import org.apache.kafka.common.IsolationLevel;
23+
import org.apache.kafka.common.TopicIdPartition;
24+
import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
25+
import org.apache.kafka.common.errors.OffsetNotAvailableException;
26+
import org.apache.kafka.common.record.internal.FileRecords;
27+
import org.apache.kafka.common.requests.ListOffsetsRequest;
28+
import org.apache.kafka.server.partition.PartitionListener;
29+
import org.apache.kafka.server.share.PartitionMetadataProvider;
30+
import org.apache.kafka.server.storage.log.FetchIsolation;
31+
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
32+
import org.apache.kafka.storage.internals.log.LogOffsetSnapshot;
33+
34+
import org.slf4j.Logger;
35+
import org.slf4j.LoggerFactory;
36+
37+
import java.util.Optional;
38+
39+
import scala.Some;
40+
41+
/**
42+
* Implementation of {@link PartitionMetadataProvider} backed by {@link ReplicaManager}.
43+
*/
44+
public class ReplicaManagerPartitionMetadataProvider implements PartitionMetadataProvider {
45+
46+
private static final Logger log = LoggerFactory.getLogger(ReplicaManagerPartitionMetadataProvider.class);
47+
48+
private final ReplicaManager replicaManager;
49+
50+
public ReplicaManagerPartitionMetadataProvider(ReplicaManager replicaManager) {
51+
this.replicaManager = replicaManager;
52+
}
53+
54+
@Override
55+
public long offsetForEarliestTimestamp(TopicIdPartition topicIdPartition, int leaderEpoch) {
56+
Optional<FileRecords.TimestampAndOffset> timestampAndOffset = replicaManager.fetchOffsetForTimestamp(
57+
topicIdPartition.topicPartition(), ListOffsetsRequest.EARLIEST_TIMESTAMP, scala.Option.empty(),
58+
Optional.of(leaderEpoch), true).timestampAndOffsetOpt();
59+
if (timestampAndOffset.isEmpty()) {
60+
throw new OffsetNotAvailableException("Offset for earliest timestamp not found for topic partition: " + topicIdPartition);
61+
}
62+
return timestampAndOffset.get().offset;
63+
}
64+
65+
@Override
66+
public long offsetForLatestTimestamp(TopicIdPartition topicIdPartition, int leaderEpoch) {
67+
Optional<FileRecords.TimestampAndOffset> timestampAndOffset = replicaManager.fetchOffsetForTimestamp(
68+
topicIdPartition.topicPartition(), ListOffsetsRequest.LATEST_TIMESTAMP, new Some<>(IsolationLevel.READ_UNCOMMITTED),
69+
Optional.of(leaderEpoch), true).timestampAndOffsetOpt();
70+
if (timestampAndOffset.isEmpty()) {
71+
throw new OffsetNotAvailableException("Offset for latest timestamp not found for topic partition: " + topicIdPartition);
72+
}
73+
return timestampAndOffset.get().offset;
74+
}
75+
76+
@Override
77+
public long offsetForTimestamp(TopicIdPartition topicIdPartition, long timestamp, int leaderEpoch) {
78+
Optional<FileRecords.TimestampAndOffset> timestampAndOffset = replicaManager.fetchOffsetForTimestamp(
79+
topicIdPartition.topicPartition(), timestamp, new Some<>(IsolationLevel.READ_UNCOMMITTED),
80+
Optional.of(leaderEpoch), true).timestampAndOffsetOpt();
81+
if (timestampAndOffset.isEmpty()) {
82+
throw new OffsetNotAvailableException("Offset for timestamp " + timestamp + " not found for topic partition: " + topicIdPartition);
83+
}
84+
return timestampAndOffset.get().offset;
85+
}
86+
87+
@Override
88+
public Optional<LogOffsetMetadata> endOffsetMetadata(TopicIdPartition topicIdPartition, FetchIsolation isolation) {
89+
Partition partition = leaderPartition(topicIdPartition);
90+
LogOffsetSnapshot offsetSnapshot = partition.fetchOffsetSnapshot(Optional.empty(), true);
91+
if (isolation == FetchIsolation.LOG_END)
92+
return Optional.of(offsetSnapshot.logEndOffset());
93+
else if (isolation == FetchIsolation.HIGH_WATERMARK)
94+
return Optional.of(offsetSnapshot.highWatermark());
95+
else
96+
return Optional.of(offsetSnapshot.lastStableOffset());
97+
}
98+
99+
@Override
100+
public int leaderEpoch(TopicIdPartition topicIdPartition) {
101+
return leaderPartition(topicIdPartition).getLeaderEpoch();
102+
}
103+
104+
@Override
105+
public boolean addPartitionListener(TopicIdPartition topicIdPartition, PartitionListener listener) {
106+
return replicaManager.maybeAddListener(topicIdPartition.topicPartition(), listener);
107+
}
108+
109+
@Override
110+
public void removePartitionListener(TopicIdPartition topicIdPartition, PartitionListener listener) {
111+
replicaManager.removeListener(topicIdPartition.topicPartition(), listener);
112+
}
113+
114+
private Partition leaderPartition(TopicIdPartition topicIdPartition) {
115+
Partition partition = replicaManager.getPartitionOrException(topicIdPartition.topicPartition());
116+
if (!partition.isLeader()) {
117+
log.debug("The broker is not the leader for topic partition: {}", topicIdPartition.topicPartition());
118+
throw new NotLeaderOrFollowerException();
119+
}
120+
return partition;
121+
}
122+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.server.share;
18+
19+
import org.apache.kafka.common.TopicIdPartition;
20+
import org.apache.kafka.server.storage.log.FetchParams;
21+
import org.apache.kafka.storage.internals.log.LogReadResult;
22+
23+
import java.util.LinkedHashMap;
24+
import java.util.Set;
25+
26+
/**
27+
* Abstraction for reading records from log.
28+
*/
29+
public interface LogReader {
30+
31+
/**
32+
* Read records for the given partitions starting at the specified offsets.
33+
*
34+
* @param fetchParams The fetch parameters (isolation level, maxBytes, etc.)
35+
* @param partitionsToFetch The set of partitions to actually fetch (after filtering erroneous ones)
36+
* @param topicPartitionFetchOffsets The fetch offset per partition
37+
* @param partitionMaxBytes The max bytes per partition
38+
* @return A map of partition to log read result
39+
*/
40+
LinkedHashMap<TopicIdPartition, LogReadResult> read(
41+
FetchParams fetchParams,
42+
Set<TopicIdPartition> partitionsToFetch,
43+
LinkedHashMap<TopicIdPartition, Long> topicPartitionFetchOffsets,
44+
LinkedHashMap<TopicIdPartition, Integer> partitionMaxBytes);
45+
}
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.kafka.server.share;
18+
19+
import org.apache.kafka.common.TopicIdPartition;
20+
import org.apache.kafka.server.partition.PartitionListener;
21+
import org.apache.kafka.server.storage.log.FetchIsolation;
22+
import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
23+
24+
import java.util.Optional;
25+
26+
/**
27+
* Abstraction for partition metadata operations.
28+
*/
29+
public interface PartitionMetadataProvider {
30+
31+
/**
32+
* Resolve the offset for the earliest timestamp.
33+
*/
34+
long offsetForEarliestTimestamp(TopicIdPartition topicIdPartition, int leaderEpoch);
35+
36+
/**
37+
* Resolve the offset for the latest timestamp.
38+
*/
39+
long offsetForLatestTimestamp(TopicIdPartition topicIdPartition, int leaderEpoch);
40+
41+
/**
42+
* Resolve the offset for a specific timestamp.
43+
*/
44+
long offsetForTimestamp(TopicIdPartition topicIdPartition, long timestamp, int leaderEpoch);
45+
46+
/**
47+
* Get the end offset metadata for minBytes estimation.
48+
*
49+
* @return The end offset metadata based on the given fetch isolation, or
50+
* {@link Optional#empty()} when no local partition metadata is available.
51+
*/
52+
Optional<LogOffsetMetadata> endOffsetMetadata(TopicIdPartition topicIdPartition, FetchIsolation isolation);
53+
54+
/**
55+
* Get the leader epoch for a partition.
56+
*/
57+
int leaderEpoch(TopicIdPartition topicIdPartition);
58+
59+
/**
60+
* Register a partition listener for state change notifications.
61+
*
62+
* @return true if the listener was successfully added.
63+
*/
64+
boolean addPartitionListener(TopicIdPartition topicIdPartition, PartitionListener listener);
65+
66+
/**
67+
* Remove a previously registered partition listener.
68+
*/
69+
void removePartitionListener(TopicIdPartition topicIdPartition, PartitionListener listener);
70+
}

0 commit comments

Comments
 (0)