Commit 54804c0

refactor: use upstream UnifiedLog analyzeAndValidateRecords [INK-127]
Now that analyzeAndValidateRecords is available on the upstream UnifiedLog.java, make use of it by adjusting the method's access (it and trimInvalidBytes become public static) so the inkless module can call it directly and drop its own copies.
1 parent a44e941 commit 54804c0

File tree: 2 files changed, +26 -196 lines
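For context, the call pattern this refactor switches to looks roughly like the sketch below: the inkless module statically imports the upstream helpers instead of carrying its own copies. The parameter list mirrors the new upstream signatures in the diffs that follow; the local variable names are illustrative, not taken from the codebase.

    // Sketch: delegate to the upstream static helpers (surrounding variables assumed to be in scope).
    import static org.apache.kafka.storage.internals.log.UnifiedLog.analyzeAndValidateRecords;
    import static org.apache.kafka.storage.internals.log.UnifiedLog.trimInvalidBytes;

    LogAppendInfo appendInfo = analyzeAndValidateRecords(
        topicPartition, config, records, logStartOffset,
        origin, ignoreRecordSize, requireOffsetsMonotonic,
        leaderEpoch, logger, brokerTopicStats);
    MemoryRecords validRecords = trimInvalidBytes(topicPartition, records, appendInfo);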

storage/inkless/src/main/java/io/aiven/inkless/produce/UnifiedLog.java

Lines changed: 4 additions & 179 deletions
@@ -16,20 +16,13 @@
  */
 package io.aiven.inkless.produce;
 
-import org.apache.kafka.common.InvalidRecordException;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.compress.Compression;
-import org.apache.kafka.common.errors.CorruptRecordException;
-import org.apache.kafka.common.errors.OffsetOutOfRangeException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.record.CompressionType;
 import org.apache.kafka.common.record.MemoryRecords;
-import org.apache.kafka.common.record.MutableRecordBatch;
-import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.RecordBatch;
-import org.apache.kafka.common.record.RecordValidationStats;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.common.requests.ProduceResponse;
 import org.apache.kafka.common.utils.PrimitiveRef;
@@ -38,8 +31,6 @@
 import org.apache.kafka.server.common.RequestLocal;
 import org.apache.kafka.server.record.BrokerCompressionType;
 import org.apache.kafka.storage.internals.log.AppendOrigin;
-import org.apache.kafka.storage.internals.log.LeaderHwChange;
-import org.apache.kafka.storage.internals.log.LocalLog;
 import org.apache.kafka.storage.internals.log.LogAppendInfo;
 import org.apache.kafka.storage.internals.log.LogConfig;
 import org.apache.kafka.storage.internals.log.LogValidator;
@@ -48,17 +39,13 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.nio.ByteBuffer;
-import java.util.Collections;
 import java.util.Map;
-import java.util.Optional;
-import java.util.stream.Collectors;
-import java.util.stream.StreamSupport;
 
 import static org.apache.kafka.storage.internals.log.UnifiedLog.UNKNOWN_OFFSET;
+import static org.apache.kafka.storage.internals.log.UnifiedLog.analyzeAndValidateRecords;
+import static org.apache.kafka.storage.internals.log.UnifiedLog.trimInvalidBytes;
 
-// TODO: This method is being migrated to Java and this is a placeholder for when it becomes available
-// on UnifiedLog.java from apache/kafka#19030
+// Stub to keep code that may eventually be moved to upstream UnifiedLog
 class UnifiedLog {
     private static final Logger LOGGER = LoggerFactory.getLogger(UnifiedLog.class);
 
@@ -67,169 +54,6 @@ class UnifiedLog {
     // Using 0 as for inkless the leader epoch is not used
     public static final int LEADER_EPOCH = LeaderAndIsr.INITIAL_LEADER_EPOCH;
 
-    /**
-     * Validate the following:
-     * <ol>
-     * <li> each message matches its CRC
-     * <li> each message size is valid (if ignoreRecordSize is false)
-     * <li> that the sequence numbers of the incoming record batches are consistent with the existing state and with each other
-     * <li> that the offsets are monotonically increasing (if requireOffsetsMonotonic is true)
-     * </ol>
-     * <p>
-     * Also compute the following quantities:
-     * <ol>
-     * <li> First offset in the message set
-     * <li> Last offset in the message set
-     * <li> Number of messages
-     * <li> Number of valid bytes
-     * <li> Whether the offsets are monotonically increasing
-     * <li> Whether any compression codec is used (if many are used, then the last one is given)
-     * </ol>
-     */
-    static LogAppendInfo analyzeAndValidateRecords(TopicPartition topicPartition,
-                                                   LogConfig config,
-                                                   MemoryRecords records,
-                                                   long logStartOffset,
-                                                   AppendOrigin origin,
-                                                   boolean ignoreRecordSize,
-                                                   boolean requireOffsetsMonotonic,
-                                                   int leaderEpoch,
-                                                   BrokerTopicStats brokerTopicStats) {
-        int validBytesCount = 0;
-        long firstOffset = LocalLog.UNKNOWN_OFFSET;
-        long lastOffset = -1L;
-        int lastLeaderEpoch = RecordBatch.NO_PARTITION_LEADER_EPOCH;
-        CompressionType sourceCompression = CompressionType.NONE;
-        boolean monotonic = true;
-        long maxTimestamp = RecordBatch.NO_TIMESTAMP;
-        boolean readFirstMessage = false;
-        long lastOffsetOfFirstBatch = -1L;
-        boolean skipRemainingBatches = false;
-
-        for (MutableRecordBatch batch : records.batches()) {
-            // we only validate V2 and higher to avoid potential compatibility issues with older clients
-            if (batch.magic() >= RecordBatch.MAGIC_VALUE_V2 && batch.baseOffset() != 0) {
-                throw new InvalidRecordException("The baseOffset of the record batch in the append to " + topicPartition + " should " +
-                    "be 0, but it is " + batch.baseOffset());
-            }
-
-            /* During replication of uncommitted data it is possible for the remote replica to send record batches after it lost
-             * leadership. This can happen if sending FETCH responses is slow. There is a race between sending the FETCH
-             * response and the replica truncating and appending to the log. The replicating replica resolves this issue by only
-             * persisting up to the current leader epoch used in the fetch request. See KAFKA-18723 for more details.
-             */
-            skipRemainingBatches = skipRemainingBatches || hasHigherPartitionLeaderEpoch(batch, origin, leaderEpoch);
-            if (skipRemainingBatches) {
-                LOGGER.info("Skipping batch {} from an origin of {} because its partition leader epoch {} is higher than the replica's current leader epoch {}",
-                    batch, origin, batch.partitionLeaderEpoch(), leaderEpoch);
-            } else {
-                // update the first offset if on the first message. For magic versions older than 2, we use the last offset
-                // to avoid the need to decompress the data (the last offset can be obtained directly from the wrapper message).
-                // For magic version 2, we can get the first offset directly from the batch header.
-                // When appending to the leader, we will update LogAppendInfo.baseOffset with the correct value. In the follower
-                // case, validation will be more lenient.
-                // Also indicate whether we have the accurate first offset or not
-                if (!readFirstMessage) {
-                    if (batch.magic() >= RecordBatch.MAGIC_VALUE_V2) {
-                        firstOffset = batch.baseOffset();
-                    }
-                    lastOffsetOfFirstBatch = batch.lastOffset();
-                    readFirstMessage = true;
-                }
-
-                // check that offsets are monotonically increasing
-                if (lastOffset >= batch.lastOffset()) {
-                    monotonic = false;
-                }
-
-                // update the last offset seen
-                lastOffset = batch.lastOffset();
-                lastLeaderEpoch = batch.partitionLeaderEpoch();
-
-                // Check if the message sizes are valid.
-                int batchSize = batch.sizeInBytes();
-                if (!ignoreRecordSize && batchSize > config.maxMessageSize()) {
-                    brokerTopicStats.topicStats(topicPartition.topic()).bytesRejectedRate().mark(records.sizeInBytes());
-                    brokerTopicStats.allTopicsStats().bytesRejectedRate().mark(records.sizeInBytes());
-                    throw new RecordTooLargeException("The record batch size in the append to " + topicPartition + " is " + batchSize + " bytes " +
-                        "which exceeds the maximum configured value of " + config.maxMessageSize() + ").");
-                }
-
-                // check the validity of the message by checking CRC
-                if (!batch.isValid()) {
-                    brokerTopicStats.allTopicsStats().invalidMessageCrcRecordsPerSec().mark();
-                    throw new CorruptRecordException("Record is corrupt (stored crc = " + batch.checksum() + ") in topic partition " + topicPartition + ".");
-                }
-
-                if (batch.maxTimestamp() > maxTimestamp) {
-                    maxTimestamp = batch.maxTimestamp();
-                }
-
-                validBytesCount += batchSize;
-
-                CompressionType batchCompression = CompressionType.forId(batch.compressionType().id);
-                // sourceCompression is only used on the leader path, which only contains one batch if version is v2 or messages are compressed
-                if (batchCompression != CompressionType.NONE) {
-                    sourceCompression = batchCompression;
-                }
-            }
-
-            if (requireOffsetsMonotonic && !monotonic) {
-                throw new OffsetOutOfRangeException("Out of order offsets found in append to " + topicPartition + ": " +
-                    StreamSupport.stream(records.records().spliterator(), false)
-                        .map(Record::offset)
-                        .map(String::valueOf)
-                        .collect(Collectors.joining(",")));
-            }
-        }
-        Optional<Integer> lastLeaderEpochOpt = (lastLeaderEpoch != RecordBatch.NO_PARTITION_LEADER_EPOCH)
-            ? Optional.of(lastLeaderEpoch)
-            : Optional.empty();
-
-        return new LogAppendInfo(firstOffset, lastOffset, lastLeaderEpochOpt, maxTimestamp,
-            RecordBatch.NO_TIMESTAMP, logStartOffset, RecordValidationStats.EMPTY, sourceCompression,
-            validBytesCount, lastOffsetOfFirstBatch, Collections.emptyList(), LeaderHwChange.NONE);
-    }
-
-    /**
-     * Return true if the record batch has a higher leader epoch than the specified leader epoch
-     *
-     * @param batch the batch to validate
-     * @param origin the reason for appending the record batch
-     * @param leaderEpoch the epoch to compare
-     * @return true if the append reason is replication and the batch's partition leader epoch is
-     *         greater than the specified leaderEpoch, otherwise false
-     */
-    private static boolean hasHigherPartitionLeaderEpoch(RecordBatch batch, AppendOrigin origin, int leaderEpoch) {
-        return origin == AppendOrigin.REPLICATION
-            && batch.partitionLeaderEpoch() != RecordBatch.NO_PARTITION_LEADER_EPOCH
-            && batch.partitionLeaderEpoch() > leaderEpoch;
-    }
-
-    /**
-     * Trim any invalid bytes from the end of this message set (if there are any)
-     *
-     * @param records The records to trim
-     * @param info The general information of the message set
-     * @return A trimmed message set. This may be the same as what was passed in, or it may not.
-     */
-    static MemoryRecords trimInvalidBytes(TopicPartition topicPartition, MemoryRecords records, LogAppendInfo info) {
-        int validBytes = info.validBytes();
-        if (validBytes < 0) {
-            throw new CorruptRecordException("Cannot append record batch with illegal length " + validBytes + " to " +
-                "log for " + topicPartition + ". A possible cause is a corrupted produce request.");
-        }
-        if (validBytes == records.sizeInBytes()) {
-            return records;
-        } else {
-            // trim invalid bytes
-            ByteBuffer validByteBuffer = records.buffer().duplicate();
-            validByteBuffer.limit(validBytes);
-            return MemoryRecords.readableRecords(validByteBuffer);
-        }
-    }
-
-
     // Similar to UnifiedLog.append(...)
     static LogAppendInfo validateAndAppendBatch(
         final Time time,
@@ -252,6 +76,7 @@ static LogAppendInfo validateAndAppendBatch(
             false,
             true, // ensures that offsets across batches on the same partition grow monotonically
             LEADER_EPOCH,
+            LOGGER,
             brokerTopicStats);
 
         if (appendInfo.validBytes() <= 0) {
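The inkless call site now also supplies LOGGER explicitly, since a static upstream helper cannot reach an instance logger. Below is a sketch of the full updated call; only the trailing arguments (false, true, LEADER_EPOCH, LOGGER, brokerTopicStats) are visible in the hunk above, so the leading argument names are assumptions for illustration.

    // Sketch only: the leading argument names (topicPartition, config, records,
    // logStartOffset, origin) are assumed, not taken from the hunk above.
    final LogAppendInfo appendInfo = analyzeAndValidateRecords(
        topicPartition,     // assumed
        config,             // assumed
        records,            // assumed
        logStartOffset,     // assumed
        origin,             // assumed
        false,              // ignoreRecordSize
        true,               // ensures that offsets across batches on the same partition grow monotonically
        LEADER_EPOCH,       // inkless constant: LeaderAndIsr.INITIAL_LEADER_EPOCH
        LOGGER,             // passed explicitly now that the upstream helper is static
        brokerTopicStats);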

storage/src/main/java/org/apache/kafka/storage/internals/log/UnifiedLog.java

Lines changed: 22 additions & 17 deletions
@@ -1076,14 +1076,14 @@ private LogAppendInfo append(MemoryRecords records,
         // This will ensure that any log data can be recovered with the correct topic ID in the case of failure.
         maybeFlushMetadataFile();
 
-        LogAppendInfo appendInfo = analyzeAndValidateRecords(records, origin, ignoreRecordSize, !validateAndAssignOffsets, leaderEpoch);
+        LogAppendInfo appendInfo = analyzeAndValidateRecords(topicPartition(), config(), records, logStartOffset, origin, ignoreRecordSize, !validateAndAssignOffsets, leaderEpoch, logger, brokerTopicStats);
 
         // return if we have no valid messages or if this is a duplicate of the last appended entry
         if (appendInfo.validBytes() <= 0) {
             return appendInfo;
         } else {
             // trim any invalid bytes or partial messages before appending it to the on-disk log
-            final MemoryRecords trimmedRecords = trimInvalidBytes(records, appendInfo);
+            final MemoryRecords trimmedRecords = trimInvalidBytes(topicPartition(), records, appendInfo);
             // they are valid, insert them in the log
             synchronized (lock) {
                 return maybeHandleIOException(
@@ -1418,11 +1418,16 @@ private boolean batchMissingRequiredVerification(MutableRecordBatch batch, Verif
      * <li> Whether any compression codec is used (if many are used, then the last one is given)
      * </ol>
      */
-    private LogAppendInfo analyzeAndValidateRecords(MemoryRecords records,
-                                                    AppendOrigin origin,
-                                                    boolean ignoreRecordSize,
-                                                    boolean requireOffsetsMonotonic,
-                                                    int leaderEpoch) {
+    public static LogAppendInfo analyzeAndValidateRecords(TopicPartition topicPartition,
+                                                          LogConfig config,
+                                                          MemoryRecords records,
+                                                          long logStartOffset,
+                                                          AppendOrigin origin,
+                                                          boolean ignoreRecordSize,
+                                                          boolean requireOffsetsMonotonic,
+                                                          int leaderEpoch,
+                                                          Logger logger,
+                                                          BrokerTopicStats brokerTopicStats) {
         int validBytesCount = 0;
         long firstOffset = UnifiedLog.UNKNOWN_OFFSET;
         long lastOffset = -1L;
@@ -1441,7 +1446,7 @@ private LogAppendInfo analyzeAndValidateRecords(MemoryRecords records,
             }
             // we only validate V2 and higher to avoid potential compatibility issues with older clients
             if (batch.magic() >= RecordBatch.MAGIC_VALUE_V2 && origin == AppendOrigin.CLIENT && batch.baseOffset() != 0) {
-                throw new InvalidRecordException("The baseOffset of the record batch in the append to " + topicPartition() + " should " +
+                throw new InvalidRecordException("The baseOffset of the record batch in the append to " + topicPartition + " should " +
                     "be 0, but it is " + batch.baseOffset());
             }
 
@@ -1480,17 +1485,17 @@ private LogAppendInfo analyzeAndValidateRecords(MemoryRecords records,
 
             // Check if the message sizes are valid.
             int batchSize = batch.sizeInBytes();
-            if (!ignoreRecordSize && batchSize > config().maxMessageSize()) {
-                brokerTopicStats.topicStats(topicPartition().topic()).bytesRejectedRate().mark(records.sizeInBytes());
+            if (!ignoreRecordSize && batchSize > config.maxMessageSize()) {
+                brokerTopicStats.topicStats(topicPartition.topic()).bytesRejectedRate().mark(records.sizeInBytes());
                 brokerTopicStats.allTopicsStats().bytesRejectedRate().mark(records.sizeInBytes());
-                throw new RecordTooLargeException("The record batch size in the append to " + topicPartition() + " is " + batchSize + " bytes " +
-                    "which exceeds the maximum configured value of " + config().maxMessageSize() + ").");
+                throw new RecordTooLargeException("The record batch size in the append to " + topicPartition + " is " + batchSize + " bytes " +
+                    "which exceeds the maximum configured value of " + config.maxMessageSize() + ").");
             }
 
             // check the validity of the message by checking CRC
             if (!batch.isValid()) {
                 brokerTopicStats.allTopicsStats().invalidMessageCrcRecordsPerSec().mark();
-                throw new CorruptRecordException("Record is corrupt (stored crc = " + batch.checksum() + ") in topic partition " + topicPartition() + ".");
+                throw new CorruptRecordException("Record is corrupt (stored crc = " + batch.checksum() + ") in topic partition " + topicPartition + ".");
             }
 
             if (batch.maxTimestamp() > maxTimestamp) {
@@ -1507,7 +1512,7 @@ private LogAppendInfo analyzeAndValidateRecords(MemoryRecords records,
         }
 
         if (requireOffsetsMonotonic && !monotonic) {
-            throw new OffsetsOutOfOrderException("Out of order offsets found in append to " + topicPartition() + ": " +
+            throw new OffsetsOutOfOrderException("Out of order offsets found in append to " + topicPartition + ": " +
                 StreamSupport.stream(records.records().spliterator(), false)
                     .map(Record::offset)
                     .map(String::valueOf)
@@ -1532,7 +1537,7 @@ private LogAppendInfo analyzeAndValidateRecords(MemoryRecords records,
      * @return true if the append reason is replication and the batch's partition leader epoch is
      *         greater than the specified leaderEpoch, otherwise false
      */
-    private boolean hasHigherPartitionLeaderEpoch(RecordBatch batch, AppendOrigin origin, int leaderEpoch) {
+    private static boolean hasHigherPartitionLeaderEpoch(RecordBatch batch, AppendOrigin origin, int leaderEpoch) {
         return origin == AppendOrigin.REPLICATION
             && batch.partitionLeaderEpoch() != RecordBatch.NO_PARTITION_LEADER_EPOCH
             && batch.partitionLeaderEpoch() > leaderEpoch;
@@ -1545,11 +1550,11 @@ private boolean hasHigherPartitionLeaderEpoch(RecordBatch batch, AppendOrigin or
      * @param info The general information of the message set
      * @return A trimmed message set. This may be the same as what was passed in, or it may not.
      */
-    private MemoryRecords trimInvalidBytes(MemoryRecords records, LogAppendInfo info) {
+    public static MemoryRecords trimInvalidBytes(TopicPartition topicPartition, MemoryRecords records, LogAppendInfo info) {
         int validBytes = info.validBytes();
         if (validBytes < 0) {
             throw new CorruptRecordException("Cannot append record batch with illegal length " + validBytes + " to " +
-                "log for " + topicPartition() + ". A possible cause is a corrupted produce request.");
+                "log for " + topicPartition + ". A possible cause is a corrupted produce request.");
         }
         if (validBytes == records.sizeInBytes()) {
             return records;
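Because both helpers are now public static on the storage-layer UnifiedLog, they can be exercised directly without constructing a full log. A minimal sketch follows, assuming the MemoryRecords.withRecords(Compression, SimpleRecord...) overload and reusing the LogAppendInfo constructor shape shown in the deleted inkless code above; the class name and values are hypothetical.

    // Illustrative only: not part of the commit.
    import java.nio.charset.StandardCharsets;
    import java.util.Collections;
    import java.util.Optional;

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.compress.Compression;
    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.RecordBatch;
    import org.apache.kafka.common.record.RecordValidationStats;
    import org.apache.kafka.common.record.SimpleRecord;
    import org.apache.kafka.storage.internals.log.LeaderHwChange;
    import org.apache.kafka.storage.internals.log.LogAppendInfo;
    import org.apache.kafka.storage.internals.log.UnifiedLog;

    class TrimInvalidBytesSketch {  // hypothetical class name
        public static void main(String[] args) {
            TopicPartition tp = new TopicPartition("topic", 0);
            MemoryRecords records = MemoryRecords.withRecords(
                Compression.NONE,
                new SimpleRecord("value".getBytes(StandardCharsets.UTF_8)));

            // Pretend validation found every byte valid, mirroring the fields the removed
            // inkless copy used when building its LogAppendInfo.
            LogAppendInfo info = new LogAppendInfo(
                0L, 0L, Optional.empty(), RecordBatch.NO_TIMESTAMP,
                RecordBatch.NO_TIMESTAMP, 0L, RecordValidationStats.EMPTY, CompressionType.NONE,
                records.sizeInBytes(), 0L, Collections.emptyList(), LeaderHwChange.NONE);

            MemoryRecords trimmed = UnifiedLog.trimInvalidBytes(tp, records, info);
            // With validBytes == records.sizeInBytes(), the same instance comes back untrimmed.
            System.out.println(trimmed == records);  // expected: true
        }
    }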
