Commit 198e17d

refactor: use upstream UnifiedLog analyzeAndValidateRecords [INK-127]
Now that analyzeAndValidateRecords is available on the upstream UnifiedLog.java, make use of it by adjusting how the method is accessed.
1 parent 13d2055 commit 198e17d
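
For context, the refactor replaces a private copy of the validation helpers in the inkless module with calls to the now-static upstream methods. The sketch below illustrates the resulting call pattern against the new public static signatures shown in the diffs. It is a minimal, hypothetical caller, not code from this commit: the class name, variable names, and the BrokerTopicStats import path are assumptions.

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.storage.internals.log.AppendOrigin;
import org.apache.kafka.storage.internals.log.LogAppendInfo;
import org.apache.kafka.storage.internals.log.LogConfig;
import org.apache.kafka.storage.log.metrics.BrokerTopicStats; // assumption: package may differ by Kafka version
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static org.apache.kafka.storage.internals.log.UnifiedLog.analyzeAndValidateRecords;
import static org.apache.kafka.storage.internals.log.UnifiedLog.trimInvalidBytes;

// Hypothetical caller: validates a produce request's batches with the upstream
// helper and trims trailing invalid bytes, mirroring what the inkless wrapper
// now does instead of carrying its own copy of this logic.
class UpstreamValidationSketch {
    private static final Logger LOGGER = LoggerFactory.getLogger(UpstreamValidationSketch.class);

    static MemoryRecords validateAndTrim(TopicPartition topicPartition,
                                         LogConfig config,
                                         MemoryRecords records,
                                         long logStartOffset,
                                         int leaderEpoch,
                                         BrokerTopicStats brokerTopicStats) {
        // Static upstream helper: checks CRCs, batch sizes, base offsets and offset
        // monotonicity, and marks rejected bytes on the broker topic stats.
        LogAppendInfo appendInfo = analyzeAndValidateRecords(
            topicPartition, config, records, logStartOffset,
            AppendOrigin.CLIENT,
            false,        // ignoreRecordSize: enforce the configured max message size
            true,         // requireOffsetsMonotonic
            leaderEpoch,
            LOGGER,
            brokerTopicStats);

        if (appendInfo.validBytes() <= 0) {
            // Nothing valid to append; the real code path returns the LogAppendInfo here.
            return MemoryRecords.EMPTY;
        }
        // Drop any partial or invalid bytes at the tail before appending.
        return trimInvalidBytes(topicPartition, records, appendInfo);
    }
}

The inkless validateAndAppendBatch follows the same shape, passing its class-level LOGGER and the constant LEADER_EPOCH, as the first diff below shows.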

File tree: 2 files changed, +20 -190 lines

storage/inkless/src/main/java/io/aiven/inkless/produce/UnifiedLog.java

Lines changed: 4 additions & 179 deletions

@@ -16,20 +16,13 @@
  */
 package io.aiven.inkless.produce;

-import org.apache.kafka.common.InvalidRecordException;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.compress.Compression;
-import org.apache.kafka.common.errors.CorruptRecordException;
-import org.apache.kafka.common.errors.OffsetOutOfRangeException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.MemoryRecords;
-import org.apache.kafka.common.record.MutableRecordBatch;
-import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.RecordBatch;
-import org.apache.kafka.common.record.RecordValidationStats;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.requests.ProduceResponse;
 import org.apache.kafka.common.utils.PrimitiveRef;
@@ -38,8 +31,6 @@
 import org.apache.kafka.server.common.RequestLocal;
 import org.apache.kafka.server.record.BrokerCompressionType;
 import org.apache.kafka.storage.internals.log.AppendOrigin;
-import org.apache.kafka.storage.internals.log.LeaderHwChange;
-import org.apache.kafka.storage.internals.log.LocalLog;
 import org.apache.kafka.storage.internals.log.LogAppendInfo;
 import org.apache.kafka.storage.internals.log.LogConfig;
 import org.apache.kafka.storage.internals.log.LogValidator;
@@ -48,17 +39,13 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import java.nio.ByteBuffer;
-import java.util.Collections;
 import java.util.Map;
-import java.util.Optional;
-import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;

 import static org.apache.kafka.storage.internals.log.UnifiedLog.UNKNOWN_OFFSET;
+import static org.apache.kafka.storage.internals.log.UnifiedLog.analyzeAndValidateRecords;
+import static org.apache.kafka.storage.internals.log.UnifiedLog.trimInvalidBytes;

-// TODO: This method is being migrated to Java and this is a placeholder for when it becomes available
-// on UnifiedLog.java from apache/kafka#19030
+// Stub to keep code that may eventually be moved to upstream UnifiedLog
 class UnifiedLog {
     private static final Logger LOGGER = LoggerFactory.getLogger(UnifiedLog.class);

@@ -67,169 +54,6 @@ class UnifiedLog {
     // Using 0 as for inkless the leader epoch is not used
     public static final int LEADER_EPOCH = LeaderAndIsr.INITIAL_LEADER_EPOCH;

-    /**
-     * Validate the following:
-     * <ol>
-     * <li> each message matches its CRC
-     * <li> each message size is valid (if ignoreRecordSize is false)
-     * <li> that the sequence numbers of the incoming record batches are consistent with the existing state and with each other
-     * <li> that the offsets are monotonically increasing (if requireOffsetsMonotonic is true)
-     * </ol>
-     * <p>
-     * Also compute the following quantities:
-     * <ol>
-     * <li> First offset in the message set
-     * <li> Last offset in the message set
-     * <li> Number of messages
-     * <li> Number of valid bytes
-     * <li> Whether the offsets are monotonically increasing
-     * <li> Whether any compression codec is used (if many are used, then the last one is given)
-     * </ol>
-     */
-    static LogAppendInfo analyzeAndValidateRecords(TopicPartition topicPartition,
-                                                   LogConfig config,
-                                                   MemoryRecords records,
-                                                   long logStartOffset,
-                                                   AppendOrigin origin,
-                                                   boolean ignoreRecordSize,
-                                                   boolean requireOffsetsMonotonic,
-                                                   int leaderEpoch,
-                                                   BrokerTopicStats brokerTopicStats) {
-        int validBytesCount = 0;
-        long firstOffset = LocalLog.UNKNOWN_OFFSET;
-        long lastOffset = -1L;
-        int lastLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH;
-        CompressionType sourceCompression = CompressionType.NONE;
-        boolean monotonic = true;
-        long maxTimestamp = RecordBatch.NO_TIMESTAMP;
-        boolean readFirstMessage = false;
-        long lastOffsetOfFirstBatch = -1L;
-        boolean skipRemainingBatches = false;
-
-        for (MutableRecordBatch batch : records.batches()) {
-            // we only validate V2 and higher to avoid potential compatibility issues with older clients
-            if (batch.magic() >= RecordBatch.MAGIC_VALUE_V2 && batch.baseOffset() != 0) {
-                throw new InvalidRecordException("The baseOffset of the record batch in the append to " + topicPartition + " should " +
-                    "be 0, but it is " + batch.baseOffset());
-            }
-
-            /* During replication of uncommitted data it is possible for the remote replica to send record batches after it lost
-             * leadership. This can happen if sending FETCH responses is slow. There is a race between sending the FETCH
-             * response and the replica truncating and appending to the log. The replicating replica resolves this issue by only
-             * persisting up to the current leader epoch used in the fetch request. See KAFKA-18723 for more details.
-             */
-            skipRemainingBatches = skipRemainingBatches || hasHigherPartitionLeaderEpoch(batch, origin, leaderEpoch);
-            if (skipRemainingBatches) {
-                LOGGER.info("Skipping batch {} from an origin of {} because its partition leader epoch {} is higher than the replica's current leader epoch {}",
-                    batch, origin, batch.partitionLeaderEpoch(), leaderEpoch);
-            } else {
-                // update the first offset if on the first message. For magic versions older than 2, we use the last offset
-                // to avoid the need to decompress the data (the last offset can be obtained directly from the wrapper message).
-                // For magic version 2, we can get the first offset directly from the batch header.
-                // When appending to the leader, we will update LogAppendInfo.baseOffset with the correct value. In the follower
-                // case, validation will be more lenient.
-                // Also indicate whether we have the accurate first offset or not
-                if (!readFirstMessage) {
-                    if (batch.magic() >= RecordBatch.MAGIC_VALUE_V2) {
-                        firstOffset = batch.baseOffset();
-                    }
-                    lastOffsetOfFirstBatch = batch.lastOffset();
-                    readFirstMessage = true;
-                }
-
-                // check that offsets are monotonically increasing
-                if (lastOffset >= batch.lastOffset()) {
-                    monotonic = false;
-                }
-
-                // update the last offset seen
-                lastOffset = batch.lastOffset();
-                lastLeaderEpoch = batch.partitionLeaderEpoch();
-
-                // Check if the message sizes are valid.
-                int batchSize = batch.sizeInBytes();
-                if (!ignoreRecordSize && batchSize > config.maxMessageSize()) {
-                    brokerTopicStats.topicStats(topicPartition.topic()).bytesRejectedRate().mark(records.sizeInBytes());
-                    brokerTopicStats.allTopicsStats().bytesRejectedRate().mark(records.sizeInBytes());
-                    throw new RecordTooLargeException("The record batch size in the append to " + topicPartition + " is " + batchSize + " bytes " +
-                        "which exceeds the maximum configured value of " + config.maxMessageSize() + ").");
-                }
-
-                // check the validity of the message by checking CRC
-                if (!batch.isValid()) {
-                    brokerTopicStats.allTopicsStats().invalidMessageCrcRecordsPerSec().mark();
-                    throw new CorruptRecordException("Record is corrupt (stored crc = " + batch.checksum() + ") in topic partition " + topicPartition + ".");
-                }
-
-                if (batch.maxTimestamp() > maxTimestamp) {
-                    maxTimestamp = batch.maxTimestamp();
-                }
-
-                validBytesCount += batchSize;
-
-                CompressionType batchCompression = CompressionType.forId(batch.compressionType().id);
-                // sourceCompression is only used on the leader path, which only contains one batch if version is v2 or messages are compressed
-                if (batchCompression != CompressionType.NONE) {
-                    sourceCompression = batchCompression;
-                }
-            }
-
-            if (requireOffsetsMonotonic && !monotonic) {
-                throw new OffsetOutOfRangeException("Out of order offsets found in append to " + topicPartition + ": " +
-                    StreamSupport.stream(records.records().spliterator(), false)
-                        .map(Record::offset)
-                        .map(String::valueOf)
-                        .collect(Collectors.joining(",")));
-            }
-        }
-        Optional<Integer> lastLeaderEpochOpt = (lastLeaderEpoch != RecordBatch.NO_PARTITION_LEADER_EPOCH)
-            ? Optional.of(lastLeaderEpoch)
-            : Optional.empty();
-
-        return new LogAppendInfo(firstOffset, lastOffset, lastLeaderEpochOpt, maxTimestamp,
-            RecordBatch.NO_TIMESTAMP, logStartOffset, RecordValidationStats.EMPTY, sourceCompression,
-            validBytesCount, lastOffsetOfFirstBatch, Collections.emptyList(), LeaderHwChange.NONE);
-    }
-
-    /**
-     * Return true if the record batch has a higher leader epoch than the specified leader epoch
-     *
-     * @param batch the batch to validate
-     * @param origin the reason for appending the record batch
-     * @param leaderEpoch the epoch to compare
-     * @return true if the append reason is replication and the batch's partition leader epoch is
-     *         greater than the specified leaderEpoch, otherwise false
-     */
-    private static boolean hasHigherPartitionLeaderEpoch(RecordBatch batch, AppendOrigin origin, int leaderEpoch) {
-        return origin == AppendOrigin.REPLICATION
-            && batch.partitionLeaderEpoch() != RecordBatch.NO_PARTITION_LEADER_EPOCH
-            && batch.partitionLeaderEpoch() > leaderEpoch;
-    }
-
-    /**
-     * Trim any invalid bytes from the end of this message set (if there are any)
-     *
-     * @param records The records to trim
-     * @param info The general information of the message set
-     * @return A trimmed message set. This may be the same as what was passed in, or it may not.
-     */
-    static MemoryRecords trimInvalidBytes(TopicPartition topicPartition, MemoryRecords records, LogAppendInfo info) {
-        int validBytes = info.validBytes();
-        if (validBytes < 0) {
-            throw new CorruptRecordException("Cannot append record batch with illegal length " + validBytes + " to " +
-                "log for " + topicPartition + ". A possible cause is a corrupted produce request.");
-        }
-        if (validBytes == records.sizeInBytes()) {
-            return records;
-        } else {
-            // trim invalid bytes
-            ByteBuffer validByteBuffer = records.buffer().duplicate();
-            validByteBuffer.limit(validBytes);
-            return MemoryRecords.readableRecords(validByteBuffer);
-        }
-    }
-
-
     // Similar to UnifiedLog.append(...)
     static LogAppendInfo validateAndAppendBatch(
         final Time time,
@@ -252,6 +76,7 @@ static LogAppendInfo validateAndAppendBatch(
             false,
             true, // ensures that offsets across batches on the same partition grow monotonically
             LEADER_EPOCH,
+            LOGGER,
             brokerTopicStats);

         if (appendInfo.validBytes() <= 0) {

storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java

Lines changed: 16 additions & 11 deletions

@@ -1078,14 +1078,14 @@ private LogAppendInfo append(MemoryRecords records,
         // This will ensure that any log data can be recovered with the correct topic ID in the case of failure.
         maybeFlushMetadataFile();

-        LogAppendInfo appendInfo = analyzeAndValidateRecords(records, origin, ignoreRecordSize, !validateAndAssignOffsets, leaderEpoch);
+        LogAppendInfo appendInfo = analyzeAndValidateRecords(topicPartition, config(), records, logStartOffset, origin, ignoreRecordSize, !validateAndAssignOffsets, leaderEpoch, logger, brokerTopicStats);

         // return if we have no valid messages or if this is a duplicate of the last appended entry
         if (appendInfo.validBytes() <= 0) {
             return appendInfo;
         } else {
             // trim any invalid bytes or partial messages before appending it to the on-disk log
-            final MemoryRecords trimmedRecords = trimInvalidBytes(records, appendInfo);
+            final MemoryRecords trimmedRecords = trimInvalidBytes(topicPartition, records, appendInfo);
             // they are valid, insert them in the log
             synchronized (lock) {
                 return maybeHandleIOException(
@@ -1420,11 +1420,16 @@ private boolean batchMissingRequiredVerification(MutableRecordBatch batch, Verif
     * <li> Whether any compression codec is used (if many are used, then the last one is given)
     * </ol>
     */
-    private LogAppendInfo analyzeAndValidateRecords(MemoryRecords records,
-                                                    AppendOrigin origin,
-                                                    boolean ignoreRecordSize,
-                                                    boolean requireOffsetsMonotonic,
-                                                    int leaderEpoch) {
+    public static LogAppendInfo analyzeAndValidateRecords(TopicPartition topicPartition,
+                                                          LogConfig config,
+                                                          MemoryRecords records,
+                                                          long logStartOffset,
+                                                          AppendOrigin origin,
+                                                          boolean ignoreRecordSize,
+                                                          boolean requireOffsetsMonotonic,
+                                                          int leaderEpoch,
+                                                          Logger logger,
+                                                          BrokerTopicStats brokerTopicStats) {
         int validBytesCount = 0;
         long firstOffset = UnifiedLog.UNKNOWN_OFFSET;
         long lastOffset = -1L;
@@ -1482,11 +1487,11 @@ private LogAppendInfo analyzeAndValidateRecords(MemoryRecords records,

             // Check if the message sizes are valid.
             int batchSize = batch.sizeInBytes();
-            if (!ignoreRecordSize && batchSize > config().maxMessageSize()) {
+            if (!ignoreRecordSize && batchSize > config.maxMessageSize()) {
                 brokerTopicStats.topicStats(topicPartition.topic()).bytesRejectedRate().mark(records.sizeInBytes());
                 brokerTopicStats.allTopicsStats().bytesRejectedRate().mark(records.sizeInBytes());
                 throw new RecordTooLargeException("The record batch size in the append to " + topicPartition + " is " + batchSize + " bytes " +
-                    "which exceeds the maximum configured value of " + config().maxMessageSize() + ").");
+                    "which exceeds the maximum configured value of " + config.maxMessageSize() + ").");
             }

             // check the validity of the message by checking CRC
@@ -1534,7 +1539,7 @@ private LogAppendInfo analyzeAndValidateRecords(MemoryRecords records,
     * @return true if the append reason is replication and the batch's partition leader epoch is
     *         greater than the specified leaderEpoch, otherwise false
     */
-    private boolean hasHigherPartitionLeaderEpoch(RecordBatch batch, AppendOrigin origin, int leaderEpoch) {
+    private static boolean hasHigherPartitionLeaderEpoch(RecordBatch batch, AppendOrigin origin, int leaderEpoch) {
         return origin == AppendOrigin.REPLICATION
             && batch.partitionLeaderEpoch() != RecordBatch.NO_PARTITION_LEADER_EPOCH
             && batch.partitionLeaderEpoch() > leaderEpoch;
@@ -1547,7 +1552,7 @@ private boolean hasHigherPartitionLeaderEpoch(RecordBatch batch, AppendOrigin or
     * @param info The general information of the message set
     * @return A trimmed message set. This may be the same as what was passed in, or it may not.
     */
-    private MemoryRecords trimInvalidBytes(MemoryRecords records, LogAppendInfo info) {
+    public static MemoryRecords trimInvalidBytes(TopicPartition topicPartition, MemoryRecords records, LogAppendInfo info) {
         int validBytes = info.validBytes();
         if (validBytes < 0) {
             throw new CorruptRecordException("Cannot append record batch with illegal length " + validBytes + " to " +
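
Because the helper is now public static, it can be exercised directly in a unit test without standing up a full UnifiedLog instance. Below is a rough JUnit-style sketch of such a test; the constructors used for LogConfig, BrokerTopicStats, and the MemoryRecords.withRecords overload are assumptions about this Kafka tree and are not part of this commit.

import java.util.Map;

import org.apache.kafka.common.InvalidRecordException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.compress.Compression;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.storage.internals.log.AppendOrigin;
import org.apache.kafka.storage.internals.log.LogConfig;
import org.apache.kafka.storage.log.metrics.BrokerTopicStats; // assumption: package may differ by Kafka version
import org.junit.jupiter.api.Test;
import org.slf4j.LoggerFactory;

import static org.apache.kafka.storage.internals.log.UnifiedLog.analyzeAndValidateRecords;
import static org.junit.jupiter.api.Assertions.assertThrows;

class AnalyzeAndValidateRecordsSketchTest {

    @Test
    void rejectsBatchesWithNonZeroBaseOffset() {
        TopicPartition tp = new TopicPartition("test-topic", 0);
        // A v2 batch whose base offset is 5 rather than 0 (assumed withRecords overload).
        MemoryRecords records = MemoryRecords.withRecords(5L, Compression.NONE,
            new SimpleRecord("value".getBytes()));

        // The shared validation rejects client batches that do not start at offset 0.
        assertThrows(InvalidRecordException.class, () ->
            analyzeAndValidateRecords(tp, new LogConfig(Map.of()), records,
                0L,                    // logStartOffset
                AppendOrigin.CLIENT,
                false,                 // ignoreRecordSize
                true,                  // requireOffsetsMonotonic
                0,                     // leaderEpoch
                LoggerFactory.getLogger(AnalyzeAndValidateRecordsSketchTest.class),
                new BrokerTopicStats()));
    }
}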
