Skip to content

Commit 818888a

Browse files
committed
LogAppendResult depends on immutable arguments
1 parent 214c580 commit 818888a

File tree

5 files changed

+60
-24
lines changed

5 files changed

+60
-24
lines changed

core/src/main/scala/kafka/coordinator/group/CoordinatorPartitionWriter.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ class CoordinatorPartitionWriter(
163163
}
164164

165165
// Required offset.
166-
partitionResult.info.lastOffset + 1
166+
partitionResult.logAppendSummary.lastOffset + 1
167167
}
168168

169169
override def deleteRecords(tp: TopicPartition, deleteBeforeOffset: Long): CompletableFuture[Void] = {

core/src/main/scala/kafka/server/ReplicaManager.scala

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta}
5050
import org.apache.kafka.logger.StateChangeLogger
5151
import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
5252
import org.apache.kafka.metadata.MetadataCache
53+
import org.apache.kafka.server.LogAppendResult.LogAppendSummary
5354
import org.apache.kafka.server.common.{DirectoryEventHandler, RequestLocal, StopPartition, TransactionVersion}
5455
import org.apache.kafka.server.log.remote.TopicPartitionLog
5556
import org.apache.kafka.server.config.ReplicationConfigs
@@ -687,7 +688,7 @@ class ReplicaManager(val config: KafkaConfig,
687688
val produceStatus = buildProducePartitionStatus(localProduceResults)
688689

689690
recordValidationStatsCallback(localProduceResults.map { case (k, v) =>
690-
k -> v.info.recordValidationStats
691+
k -> v.logAppendSummary().recordValidationStats()
691692
})
692693

693694
maybeAddDelayedProduce(
@@ -770,7 +771,7 @@ class ReplicaManager(val config: KafkaConfig,
770771
case _ => None
771772
}
772773
new TopicIdPartition(topicIds.getOrElse(topicPartition.topic(), Uuid.ZERO_UUID), topicPartition) -> new LogAppendResult(
773-
LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
774+
LogAppendSummary.fromAppendInfo(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO),
774775
Optional.of(customException.getOrElse(error.exception)),
775776
customException.isDefined
776777
)
@@ -860,13 +861,13 @@ class ReplicaManager(val config: KafkaConfig,
860861
): Map[TopicIdPartition, ProducePartitionStatus] = {
861862
results.map { case (topicIdPartition, result) =>
862863
topicIdPartition -> ProducePartitionStatus(
863-
result.info.lastOffset + 1, // required offset
864+
result.logAppendSummary.lastOffset + 1, // required offset
864865
new PartitionResponse(
865866
result.error,
866-
result.info.firstOffset,
867-
result.info.logAppendTime,
868-
result.info.logStartOffset,
869-
result.info.recordErrors,
867+
result.logAppendSummary.firstOffset,
868+
result.logAppendSummary.logAppendTime,
869+
result.logAppendSummary.logStartOffset,
870+
result.logAppendSummary.recordErrors,
870871
result.errorMessage
871872
)
872873
)
@@ -880,7 +881,7 @@ class ReplicaManager(val config: KafkaConfig,
880881
actionQueue.add {
881882
() => appendResults.foreach { case (topicIdPartition, result) =>
882883
val requestKey = new TopicPartitionOperationKey(topicIdPartition.topicPartition)
883-
result.info.leaderHwChange match {
884+
result.logAppendSummary.leaderHwChange match {
884885
case LeaderHwChange.INCREASED =>
885886
// some delayed operations may be unblocked after HW changed
886887
delayedProducePurgatory.checkAndComplete(requestKey)
@@ -1403,7 +1404,7 @@ class ReplicaManager(val config: KafkaConfig,
14031404
// reject appending to internal topics if it is not allowed
14041405
if (Topic.isInternal(topicIdPartition.topic) && !internalTopicsAllowed) {
14051406
(topicIdPartition, new LogAppendResult(
1406-
LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
1407+
LogAppendSummary.fromAppendInfo(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO),
14071408
Optional.of(new InvalidTopicException(s"Cannot append to internal topic ${topicIdPartition.topic}")),
14081409
false))
14091410
} else {
@@ -1423,7 +1424,7 @@ class ReplicaManager(val config: KafkaConfig,
14231424
trace(s"${records.sizeInBytes} written to log $topicIdPartition beginning at offset " +
14241425
s"${info.firstOffset} and ending at offset ${info.lastOffset}")
14251426

1426-
(topicIdPartition, new LogAppendResult(info, Optional.empty(), false))
1427+
(topicIdPartition, new LogAppendResult(LogAppendSummary.fromAppendInfo(info), Optional.empty(), false))
14271428

14281429
} catch {
14291430
// NOTE: Failed produce requests metric is not incremented for known exceptions
@@ -1435,15 +1436,18 @@ class ReplicaManager(val config: KafkaConfig,
14351436
_: CorruptRecordException |
14361437
_: KafkaStorageException |
14371438
_: UnknownTopicIdException) =>
1438-
(topicIdPartition, new LogAppendResult(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO, Optional.of(e), false))
1439+
(topicIdPartition, new LogAppendResult(LogAppendSummary.fromAppendInfo(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO),
1440+
Optional.of(e), false))
14391441
case rve: RecordValidationException =>
14401442
val logStartOffset = processFailedRecord(topicIdPartition, rve.invalidException)
14411443
val recordErrors = rve.recordErrors
1442-
(topicIdPartition, new LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithAdditionalInfo(logStartOffset, recordErrors),
1444+
(topicIdPartition, new LogAppendResult(
1445+
LogAppendSummary.fromAppendInfo(LogAppendInfo.unknownLogAppendInfoWithAdditionalInfo(logStartOffset, recordErrors)),
14431446
Optional.of(rve.invalidException), true))
14441447
case t: Throwable =>
14451448
val logStartOffset = processFailedRecord(topicIdPartition, t)
1446-
(topicIdPartition, new LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithLogStartOffset(logStartOffset),
1449+
(topicIdPartition, new LogAppendResult(
1450+
LogAppendSummary.fromAppendInfo(LogAppendInfo.unknownLogAppendInfoWithLogStartOffset(logStartOffset)),
14471451
Optional.of(t), false))
14481452
}
14491453
}

core/src/test/scala/unit/kafka/coordinator/group/CoordinatorPartitionWriterTest.scala

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import org.apache.kafka.common.protocol.{ApiKeys, Errors}
2525
import org.apache.kafka.common.record.internal.{CompressionType, ControlRecordType, EndTransactionMarker, MemoryRecords, RecordBatch, SimpleRecord}
2626
import org.apache.kafka.coordinator.common.runtime.PartitionWriter
2727
import org.apache.kafka.server.LogAppendResult
28+
import org.apache.kafka.server.LogAppendResult.LogAppendSummary
2829
import org.apache.kafka.server.common.TransactionVersion
2930
import org.apache.kafka.storage.internals.log.{AppendOrigin, LogAppendInfo, LogConfig, RecordValidationStats, VerificationGuard}
3031
import org.apache.kafka.test.TestUtils.assertFutureThrows
@@ -109,7 +110,7 @@ class CoordinatorPartitionWriterTest {
109110
ArgumentMatchers.eq(Map(tp -> VerificationGuard.SENTINEL)),
110111
ArgumentMatchers.eq(TransactionVersion.TV_UNKNOWN)
111112
)).thenReturn(Map(new TopicIdPartition(topicId, tp) -> new LogAppendResult(
112-
new LogAppendInfo(
113+
LogAppendSummary.fromAppendInfo(new LogAppendInfo(
113114
5L,
114115
10L,
115116
Optional.empty,
@@ -120,7 +121,7 @@ class CoordinatorPartitionWriterTest {
120121
CompressionType.NONE,
121122
100,
122123
10L
123-
),
124+
)),
124125
Optional.empty(),
125126
false
126127
)))
@@ -171,7 +172,7 @@ class CoordinatorPartitionWriterTest {
171172
ArgumentMatchers.eq(Map(tp -> VerificationGuard.SENTINEL)),
172173
ArgumentMatchers.eq(TransactionVersion.TV_2.featureLevel())
173174
)).thenReturn(Map(new TopicIdPartition(topicId, tp) -> new LogAppendResult(
174-
new LogAppendInfo(
175+
LogAppendSummary.fromAppendInfo(new LogAppendInfo(
175176
5L,
176177
10L,
177178
Optional.empty,
@@ -182,7 +183,7 @@ class CoordinatorPartitionWriterTest {
182183
CompressionType.NONE,
183184
100,
184185
10L
185-
),
186+
)),
186187
Optional.empty(),
187188
false
188189
)))
@@ -331,7 +332,7 @@ class CoordinatorPartitionWriterTest {
331332
ArgumentMatchers.eq(Map(tp -> VerificationGuard.SENTINEL)),
332333
ArgumentMatchers.eq(TransactionVersion.TV_UNKNOWN)
333334
)).thenReturn(Map(new TopicIdPartition(topicId, tp) -> new LogAppendResult(
334-
LogAppendInfo.UNKNOWN_LOG_APPEND_INFO,
335+
LogAppendSummary.fromAppendInfo(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO),
335336
Optional.of(Errors.NOT_LEADER_OR_FOLLOWER.exception),
336337
false
337338
)))

core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
5959
import org.apache.kafka.metadata.{LeaderRecoveryState, MetadataCache, PartitionRegistration}
6060
import org.apache.kafka.metadata.properties.{MetaProperties, MetaPropertiesEnsemble, MetaPropertiesVersion, PropertiesUtils}
6161
import org.apache.kafka.raft.KRaftConfigs
62+
import org.apache.kafka.server.LogAppendResult.LogAppendSummary
6263
import org.apache.kafka.server.common.{DirectoryEventHandler, KRaftVersion, MetadataVersion, OffsetAndEpoch, RequestLocal, StopPartition, TransactionVersion}
6364
import org.apache.kafka.server.config.{ReplicationConfigs, ServerLogConfigs}
6465
import org.apache.kafka.server.log.remote.TopicPartitionLog
@@ -6031,13 +6032,13 @@ class ReplicaManagerTest {
60316032

60326033
val fooResult = result(foo)
60336034
assertEquals(Errors.NONE, fooResult.error)
6034-
assertEquals(0, fooResult.info.logStartOffset)
6035-
assertEquals(0, fooResult.info.firstOffset)
6036-
assertEquals(0, fooResult.info.lastOffset)
6035+
assertEquals(0, fooResult.logAppendSummary.logStartOffset)
6036+
assertEquals(0, fooResult.logAppendSummary.firstOffset)
6037+
assertEquals(0, fooResult.logAppendSummary.lastOffset)
60376038

60386039
val barResult = result(bar)
60396040
assertEquals(Errors.UNKNOWN_TOPIC_OR_PARTITION, barResult.error)
6040-
assertEquals(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO, barResult.info)
6041+
assertEquals(LogAppendSummary.fromAppendInfo(LogAppendInfo.UNKNOWN_LOG_APPEND_INFO), barResult.logAppendSummary)
60416042
} finally {
60426043
replicaManager.shutdown(checkpointHW = false)
60436044
}

server/src/main/java/org/apache/kafka/server/LogAppendResult.java

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,19 +18,49 @@
1818
package org.apache.kafka.server;
1919

2020
import org.apache.kafka.common.protocol.Errors;
21+
import org.apache.kafka.common.requests.ProduceResponse;
22+
import org.apache.kafka.storage.internals.log.LeaderHwChange;
2123
import org.apache.kafka.storage.internals.log.LogAppendInfo;
24+
import org.apache.kafka.storage.internals.log.RecordValidationStats;
2225

26+
import java.util.List;
2327
import java.util.Optional;
2428

2529
/**
2630
* Result metadata of a log append operation on the log
2731
*/
2832
public record LogAppendResult(
29-
LogAppendInfo info,
33+
LogAppendSummary logAppendSummary,
3034
Optional<Throwable> exception,
3135
boolean hasCustomErrorMessage
3236
) {
3337

38+
public record LogAppendSummary(
39+
long firstOffset,
40+
long lastOffset,
41+
long logAppendTime,
42+
long logStartOffset,
43+
RecordValidationStats recordValidationStats,
44+
List<ProduceResponse.RecordError> recordErrors,
45+
LeaderHwChange leaderHwChange
46+
) {
47+
public LogAppendSummary {
48+
recordErrors = List.copyOf(recordErrors);
49+
}
50+
51+
public static LogAppendSummary fromAppendInfo(LogAppendInfo info) {
52+
return new LogAppendSummary(
53+
info.firstOffset(),
54+
info.lastOffset(),
55+
info.logAppendTime(),
56+
info.logStartOffset(),
57+
info.recordValidationStats(),
58+
info.recordErrors(),
59+
info.leaderHwChange()
60+
);
61+
}
62+
}
63+
3464
public Errors error() {
3565
return exception
3666
.map(Errors::forException)

0 commit comments

Comments
 (0)