Skip to content

Commit e0c3c62

Browse files
committed
fix(txmgr): ErrOffsetsLoadInProgress is retriable
Also update the errors.go message to match Errors.COORDINATOR_LOAD_IN_PROGRESS from Kafka Signed-off-by: Dominic Evans <[email protected]>
1 parent 2e077cf commit e0c3c62

4 files changed

+182
-96
lines changed

errors.go

+93-93
Original file line numberDiff line numberDiff line change
@@ -173,98 +173,98 @@ type KError int16
173173

174174
// Numeric error codes returned by the Kafka server.
175175
const (
176-
ErrNoError KError = 0
177-
ErrUnknown KError = -1
178-
ErrOffsetOutOfRange KError = 1
179-
ErrInvalidMessage KError = 2
180-
ErrUnknownTopicOrPartition KError = 3
181-
ErrInvalidMessageSize KError = 4
182-
ErrLeaderNotAvailable KError = 5
183-
ErrNotLeaderForPartition KError = 6
184-
ErrRequestTimedOut KError = 7
185-
ErrBrokerNotAvailable KError = 8
186-
ErrReplicaNotAvailable KError = 9
187-
ErrMessageSizeTooLarge KError = 10
188-
ErrStaleControllerEpochCode KError = 11
189-
ErrOffsetMetadataTooLarge KError = 12
190-
ErrNetworkException KError = 13
191-
ErrOffsetsLoadInProgress KError = 14
192-
ErrConsumerCoordinatorNotAvailable KError = 15
193-
ErrNotCoordinatorForConsumer KError = 16
194-
ErrInvalidTopic KError = 17
195-
ErrMessageSetSizeTooLarge KError = 18
196-
ErrNotEnoughReplicas KError = 19
197-
ErrNotEnoughReplicasAfterAppend KError = 20
198-
ErrInvalidRequiredAcks KError = 21
199-
ErrIllegalGeneration KError = 22
200-
ErrInconsistentGroupProtocol KError = 23
201-
ErrInvalidGroupId KError = 24
202-
ErrUnknownMemberId KError = 25
203-
ErrInvalidSessionTimeout KError = 26
204-
ErrRebalanceInProgress KError = 27
205-
ErrInvalidCommitOffsetSize KError = 28
206-
ErrTopicAuthorizationFailed KError = 29
207-
ErrGroupAuthorizationFailed KError = 30
208-
ErrClusterAuthorizationFailed KError = 31
209-
ErrInvalidTimestamp KError = 32
210-
ErrUnsupportedSASLMechanism KError = 33
211-
ErrIllegalSASLState KError = 34
212-
ErrUnsupportedVersion KError = 35
213-
ErrTopicAlreadyExists KError = 36
214-
ErrInvalidPartitions KError = 37
215-
ErrInvalidReplicationFactor KError = 38
216-
ErrInvalidReplicaAssignment KError = 39
217-
ErrInvalidConfig KError = 40
218-
ErrNotController KError = 41
219-
ErrInvalidRequest KError = 42
220-
ErrUnsupportedForMessageFormat KError = 43
221-
ErrPolicyViolation KError = 44
222-
ErrOutOfOrderSequenceNumber KError = 45
223-
ErrDuplicateSequenceNumber KError = 46
224-
ErrInvalidProducerEpoch KError = 47
225-
ErrInvalidTxnState KError = 48
226-
ErrInvalidProducerIDMapping KError = 49
227-
ErrInvalidTransactionTimeout KError = 50
228-
ErrConcurrentTransactions KError = 51
229-
ErrTransactionCoordinatorFenced KError = 52
230-
ErrTransactionalIDAuthorizationFailed KError = 53
231-
ErrSecurityDisabled KError = 54
232-
ErrOperationNotAttempted KError = 55
233-
ErrKafkaStorageError KError = 56
234-
ErrLogDirNotFound KError = 57
235-
ErrSASLAuthenticationFailed KError = 58
236-
ErrUnknownProducerID KError = 59
237-
ErrReassignmentInProgress KError = 60
238-
ErrDelegationTokenAuthDisabled KError = 61
239-
ErrDelegationTokenNotFound KError = 62
240-
ErrDelegationTokenOwnerMismatch KError = 63
241-
ErrDelegationTokenRequestNotAllowed KError = 64
242-
ErrDelegationTokenAuthorizationFailed KError = 65
243-
ErrDelegationTokenExpired KError = 66
244-
ErrInvalidPrincipalType KError = 67
245-
ErrNonEmptyGroup KError = 68
246-
ErrGroupIDNotFound KError = 69
247-
ErrFetchSessionIDNotFound KError = 70
248-
ErrInvalidFetchSessionEpoch KError = 71
249-
ErrListenerNotFound KError = 72
250-
ErrTopicDeletionDisabled KError = 73
251-
ErrFencedLeaderEpoch KError = 74
252-
ErrUnknownLeaderEpoch KError = 75
253-
ErrUnsupportedCompressionType KError = 76
254-
ErrStaleBrokerEpoch KError = 77
255-
ErrOffsetNotAvailable KError = 78
256-
ErrMemberIdRequired KError = 79
257-
ErrPreferredLeaderNotAvailable KError = 80
258-
ErrGroupMaxSizeReached KError = 81
259-
ErrFencedInstancedId KError = 82
260-
ErrEligibleLeadersNotAvailable KError = 83
261-
ErrElectionNotNeeded KError = 84
262-
ErrNoReassignmentInProgress KError = 85
263-
ErrGroupSubscribedToTopic KError = 86
264-
ErrInvalidRecord KError = 87
265-
ErrUnstableOffsetCommit KError = 88
266-
ErrThrottlingQuotaExceeded KError = 89
267-
ErrProducerFenced KError = 90
176+
ErrUnknown KError = -1 // Errors.UNKNOWN_SERVER_ERROR
177+
ErrNoError KError = 0 // Errors.NONE
178+
ErrOffsetOutOfRange KError = 1 // Errors.OFFSET_OUT_OF_RANGE
179+
ErrInvalidMessage KError = 2 // Errors.CORRUPT_MESSAGE
180+
ErrUnknownTopicOrPartition KError = 3 // Errors.UNKNOWN_TOPIC_OR_PARTITION
181+
ErrInvalidMessageSize KError = 4 // Errors.INVALID_FETCH_SIZE
182+
ErrLeaderNotAvailable KError = 5 // Errors.LEADER_NOT_AVAILABLE
183+
ErrNotLeaderForPartition KError = 6 // Errors.NOT_LEADER_OR_FOLLOWER
184+
ErrRequestTimedOut KError = 7 // Errors.REQUEST_TIMED_OUT
185+
ErrBrokerNotAvailable KError = 8 // Errors.BROKER_NOT_AVAILABLE
186+
ErrReplicaNotAvailable KError = 9 // Errors.REPLICA_NOT_AVAILABLE
187+
ErrMessageSizeTooLarge KError = 10 // Errors.MESSAGE_TOO_LARGE
188+
ErrStaleControllerEpochCode KError = 11 // Errors.STALE_CONTROLLER_EPOCH
189+
ErrOffsetMetadataTooLarge KError = 12 // Errors.OFFSET_METADATA_TOO_LARGE
190+
ErrNetworkException KError = 13 // Errors.NETWORK_EXCEPTION
191+
ErrOffsetsLoadInProgress KError = 14 // Errors.COORDINATOR_LOAD_IN_PROGRESS
192+
ErrConsumerCoordinatorNotAvailable KError = 15 // Errors.COORDINATOR_NOT_AVAILABLE
193+
ErrNotCoordinatorForConsumer KError = 16 // Errors.NOT_COORDINATOR
194+
ErrInvalidTopic KError = 17 // Errors.INVALID_TOPIC_EXCEPTION
195+
ErrMessageSetSizeTooLarge KError = 18 // Errors.RECORD_LIST_TOO_LARGE
196+
ErrNotEnoughReplicas KError = 19 // Errors.NOT_ENOUGH_REPLICAS
197+
ErrNotEnoughReplicasAfterAppend KError = 20 // Errors.NOT_ENOUGH_REPLICAS_AFTER_APPEND
198+
ErrInvalidRequiredAcks KError = 21 // Errors.INVALID_REQUIRED_ACKS
199+
ErrIllegalGeneration KError = 22 // Errors.ILLEGAL_GENERATION
200+
ErrInconsistentGroupProtocol KError = 23 // Errors.INCONSISTENT_GROUP_PROTOCOL
201+
ErrInvalidGroupId KError = 24 // Errors.INVALID_GROUP_ID
202+
ErrUnknownMemberId KError = 25 // Errors.UNKNOWN_MEMBER_ID
203+
ErrInvalidSessionTimeout KError = 26 // Errors.INVALID_SESSION_TIMEOUT
204+
ErrRebalanceInProgress KError = 27 // Errors.REBALANCE_IN_PROGRESS
205+
ErrInvalidCommitOffsetSize KError = 28 // Errors.INVALID_COMMIT_OFFSET_SIZE
206+
ErrTopicAuthorizationFailed KError = 29 // Errors.TOPIC_AUTHORIZATION_FAILED
207+
ErrGroupAuthorizationFailed KError = 30 // Errors.GROUP_AUTHORIZATION_FAILED
208+
ErrClusterAuthorizationFailed KError = 31 // Errors.CLUSTER_AUTHORIZATION_FAILED
209+
ErrInvalidTimestamp KError = 32 // Errors.INVALID_TIMESTAMP
210+
ErrUnsupportedSASLMechanism KError = 33 // Errors.UNSUPPORTED_SASL_MECHANISM
211+
ErrIllegalSASLState KError = 34 // Errors.ILLEGAL_SASL_STATE
212+
ErrUnsupportedVersion KError = 35 // Errors.UNSUPPORTED_VERSION
213+
ErrTopicAlreadyExists KError = 36 // Errors.TOPIC_ALREADY_EXISTS
214+
ErrInvalidPartitions KError = 37 // Errors.INVALID_PARTITIONS
215+
ErrInvalidReplicationFactor KError = 38 // Errors.INVALID_REPLICATION_FACTOR
216+
ErrInvalidReplicaAssignment KError = 39 // Errors.INVALID_REPLICA_ASSIGNMENT
217+
ErrInvalidConfig KError = 40 // Errors.INVALID_CONFIG
218+
ErrNotController KError = 41 // Errors.NOT_CONTROLLER
219+
ErrInvalidRequest KError = 42 // Errors.INVALID_REQUEST
220+
ErrUnsupportedForMessageFormat KError = 43 // Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT
221+
ErrPolicyViolation KError = 44 // Errors.POLICY_VIOLATION
222+
ErrOutOfOrderSequenceNumber KError = 45 // Errors.OUT_OF_ORDER_SEQUENCE_NUMBER
223+
ErrDuplicateSequenceNumber KError = 46 // Errors.DUPLICATE_SEQUENCE_NUMBER
224+
ErrInvalidProducerEpoch KError = 47 // Errors.INVALID_PRODUCER_EPOCH
225+
ErrInvalidTxnState KError = 48 // Errors.INVALID_TXN_STATE
226+
ErrInvalidProducerIDMapping KError = 49 // Errors.INVALID_PRODUCER_ID_MAPPING
227+
ErrInvalidTransactionTimeout KError = 50 // Errors.INVALID_TRANSACTION_TIMEOUT
228+
ErrConcurrentTransactions KError = 51 // Errors.CONCURRENT_TRANSACTIONS
229+
ErrTransactionCoordinatorFenced KError = 52 // Errors.TRANSACTION_COORDINATOR_FENCED
230+
ErrTransactionalIDAuthorizationFailed KError = 53 // Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED
231+
ErrSecurityDisabled KError = 54 // Errors.SECURITY_DISABLED
232+
ErrOperationNotAttempted KError = 55 // Errors.OPERATION_NOT_ATTEMPTED
233+
ErrKafkaStorageError KError = 56 // Errors.KAFKA_STORAGE_ERROR
234+
ErrLogDirNotFound KError = 57 // Errors.LOG_DIR_NOT_FOUND
235+
ErrSASLAuthenticationFailed KError = 58 // Errors.SASL_AUTHENTICATION_FAILED
236+
ErrUnknownProducerID KError = 59 // Errors.UNKNOWN_PRODUCER_ID
237+
ErrReassignmentInProgress KError = 60 // Errors.REASSIGNMENT_IN_PROGRESS
238+
ErrDelegationTokenAuthDisabled KError = 61 // Errors.DELEGATION_TOKEN_AUTH_DISABLED
239+
ErrDelegationTokenNotFound KError = 62 // Errors.DELEGATION_TOKEN_NOT_FOUND
240+
ErrDelegationTokenOwnerMismatch KError = 63 // Errors.DELEGATION_TOKEN_OWNER_MISMATCH
241+
ErrDelegationTokenRequestNotAllowed KError = 64 // Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED
242+
ErrDelegationTokenAuthorizationFailed KError = 65 // Errors.DELEGATION_TOKEN_AUTHORIZATION_FAILED
243+
ErrDelegationTokenExpired KError = 66 // Errors.DELEGATION_TOKEN_EXPIRED
244+
ErrInvalidPrincipalType KError = 67 // Errors.INVALID_PRINCIPAL_TYPE
245+
ErrNonEmptyGroup KError = 68 // Errors.NON_EMPTY_GROUP
246+
ErrGroupIDNotFound KError = 69 // Errors.GROUP_ID_NOT_FOUND
247+
ErrFetchSessionIDNotFound KError = 70 // Errors.FETCH_SESSION_ID_NOT_FOUND
248+
ErrInvalidFetchSessionEpoch KError = 71 // Errors.INVALID_FETCH_SESSION_EPOCH
249+
ErrListenerNotFound KError = 72 // Errors.LISTENER_NOT_FOUND
250+
ErrTopicDeletionDisabled KError = 73 // Errors.TOPIC_DELETION_DISABLED
251+
ErrFencedLeaderEpoch KError = 74 // Errors.FENCED_LEADER_EPOCH
252+
ErrUnknownLeaderEpoch KError = 75 // Errors.UNKNOWN_LEADER_EPOCH
253+
ErrUnsupportedCompressionType KError = 76 // Errors.UNSUPPORTED_COMPRESSION_TYPE
254+
ErrStaleBrokerEpoch KError = 77 // Errors.STALE_BROKER_EPOCH
255+
ErrOffsetNotAvailable KError = 78 // Errors.OFFSET_NOT_AVAILABLE
256+
ErrMemberIdRequired KError = 79 // Errors.MEMBER_ID_REQUIRED
257+
ErrPreferredLeaderNotAvailable KError = 80 // Errors.PREFERRED_LEADER_NOT_AVAILABLE
258+
ErrGroupMaxSizeReached KError = 81 // Errors.GROUP_MAX_SIZE_REACHED
259+
ErrFencedInstancedId KError = 82 // Errors.FENCED_INSTANCE_ID
260+
ErrEligibleLeadersNotAvailable KError = 83 // Errors.ELIGIBLE_LEADERS_NOT_AVAILABLE
261+
ErrElectionNotNeeded KError = 84 // Errors.ELECTION_NOT_NEEDED
262+
ErrNoReassignmentInProgress KError = 85 // Errors.NO_REASSIGNMENT_IN_PROGRESS
263+
ErrGroupSubscribedToTopic KError = 86 // Errors.GROUP_SUBSCRIBED_TO_TOPIC
264+
ErrInvalidRecord KError = 87 // Errors.INVALID_RECORD
265+
ErrUnstableOffsetCommit KError = 88 // Errors.UNSTABLE_OFFSET_COMMIT
266+
ErrThrottlingQuotaExceeded KError = 89 // Errors.THROTTLING_QUOTA_EXCEEDED
267+
ErrProducerFenced KError = 90 // Errors.PRODUCER_FENCED
268268
)
269269

270270
func (err KError) Error() string {
@@ -302,7 +302,7 @@ func (err KError) Error() string {
302302
case ErrNetworkException:
303303
return "kafka server: The server disconnected before a response was received"
304304
case ErrOffsetsLoadInProgress:
305-
return "kafka server: The broker is still loading offsets after a leader change for that offset's topic partition"
305+
return "kafka server: The coordinator is still loading offsets and cannot currently process requests"
306306
case ErrConsumerCoordinatorNotAvailable:
307307
return "kafka server: Offset's topic has not yet been created"
308308
case ErrNotCoordinatorForConsumer:

mockresponses.go

+40
Original file line numberDiff line numberDiff line change
@@ -1467,3 +1467,43 @@ func (m *MockApiVersionsResponse) For(reqBody versionedDecoder) encoderWithHeade
14671467
}
14681468
return res
14691469
}
1470+
1471+
// MockInitProducerIDResponse is an `InitPorducerIDResponse` builder.
1472+
type MockInitProducerIDResponse struct {
1473+
producerID int64
1474+
producerEpoch int16
1475+
err KError
1476+
t TestReporter
1477+
}
1478+
1479+
func NewMockInitProducerIDResponse(t TestReporter) *MockInitProducerIDResponse {
1480+
return &MockInitProducerIDResponse{
1481+
t: t,
1482+
}
1483+
}
1484+
1485+
func (m *MockInitProducerIDResponse) SetProducerID(id int) *MockInitProducerIDResponse {
1486+
m.producerID = int64(id)
1487+
return m
1488+
}
1489+
1490+
func (m *MockInitProducerIDResponse) SetProducerEpoch(epoch int) *MockInitProducerIDResponse {
1491+
m.producerEpoch = int16(epoch)
1492+
return m
1493+
}
1494+
1495+
func (m *MockInitProducerIDResponse) SetError(err KError) *MockInitProducerIDResponse {
1496+
m.err = err
1497+
return m
1498+
}
1499+
1500+
func (m *MockInitProducerIDResponse) For(reqBody versionedDecoder) encoderWithHeader {
1501+
req := reqBody.(*InitProducerIDRequest)
1502+
res := &InitProducerIDResponse{
1503+
Version: req.Version,
1504+
Err: m.err,
1505+
ProducerID: m.producerID,
1506+
ProducerEpoch: m.producerEpoch,
1507+
}
1508+
return res
1509+
}

transaction_manager.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -569,9 +569,8 @@ func (t *transactionManager) initProducerId() (int64, int16, error) {
569569
return response.ProducerID, response.ProducerEpoch, false, nil
570570
}
571571
switch response.Err {
572-
case ErrConsumerCoordinatorNotAvailable:
573-
fallthrough
574-
case ErrNotCoordinatorForConsumer:
572+
// Retriable errors
573+
case ErrConsumerCoordinatorNotAvailable, ErrNotCoordinatorForConsumer, ErrOffsetsLoadInProgress:
575574
if t.isTransactional() {
576575
_ = coordinator.Close()
577576
_ = t.client.RefreshTransactionCoordinator(t.transactionalID)

transaction_manager_test.go

+47
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,53 @@ func TestTxnmgrInitProducerIdTxn(t *testing.T) {
125125
require.Equal(t, ProducerTxnFlagReady, txmng.status)
126126
}
127127

128+
// TestTxnmgrInitProducerIdTxnCoordinatorLoading ensure we retry initProducerId when either FindCoordinator or InitProducerID returns ErrOffsetsLoadInProgress
129+
func TestTxnmgrInitProducerIdTxnCoordinatorLoading(t *testing.T) {
130+
config := NewTestConfig()
131+
config.Producer.Idempotent = true
132+
config.Producer.Transaction.ID = "txid-group"
133+
config.Version = V0_11_0_0
134+
config.Producer.RequiredAcks = WaitForAll
135+
config.Net.MaxOpenRequests = 1
136+
137+
broker := NewMockBroker(t, 1)
138+
defer broker.Close()
139+
140+
broker.SetHandlerByMap(map[string]MockResponse{
141+
"MetadataRequest": NewMockMetadataResponse(t).
142+
SetController(broker.BrokerID()).
143+
SetBroker(broker.Addr(), broker.BrokerID()),
144+
"FindCoordinatorRequest": NewMockSequence(
145+
NewMockFindCoordinatorResponse(t).
146+
SetError(CoordinatorTransaction, "txid-group", ErrOffsetsLoadInProgress),
147+
NewMockFindCoordinatorResponse(t).
148+
SetError(CoordinatorTransaction, "txid-group", ErrOffsetsLoadInProgress),
149+
NewMockFindCoordinatorResponse(t).
150+
SetCoordinator(CoordinatorTransaction, "txid-group", broker),
151+
),
152+
"InitProducerIDRequest": NewMockSequence(
153+
NewMockInitProducerIDResponse(t).
154+
SetError(ErrOffsetsLoadInProgress),
155+
NewMockInitProducerIDResponse(t).
156+
SetError(ErrOffsetsLoadInProgress),
157+
NewMockInitProducerIDResponse(t).
158+
SetProducerID(1).
159+
SetProducerEpoch(0),
160+
),
161+
})
162+
163+
client, err := NewClient([]string{broker.Addr()}, config)
164+
require.NoError(t, err)
165+
defer client.Close()
166+
167+
txmng, err := newTransactionManager(config, client)
168+
require.NoError(t, err)
169+
170+
require.Equal(t, int64(1), txmng.producerID)
171+
require.Equal(t, int16(0), txmng.producerEpoch)
172+
require.Equal(t, ProducerTxnFlagReady, txmng.status)
173+
}
174+
128175
func TestMaybeAddPartitionToCurrentTxn(t *testing.T) {
129176
type testCase struct {
130177
initialFlags ProducerTxnStatusFlag

0 commit comments

Comments
 (0)