@@ -2,8 +2,8 @@ package e2e
2
2
3
3
import (
4
4
"context"
5
+ "errors"
5
6
"fmt"
6
- "github.com/pkg/errors"
7
7
"math"
8
8
"time"
9
9
@@ -14,12 +14,14 @@ import (
14
14
15
15
// Check our end-to-end test topic and adapt accordingly if something does not match our expectations.
16
16
// - does it exist?
17
+ //
17
18
// - is it configured correctly?
18
- // - does it have enough partitions?
19
- // - is the replicationFactor correct?
19
+ // - does it have enough partitions?
20
+ // - is the replicationFactor correct?
21
+ //
20
22
// - are assignments good?
21
- // - is each broker leading at least one partition?
22
- // - are replicas distributed correctly?
23
+ // - is each broker leading at least one partition?
24
+ // - are replicas distributed correctly?
23
25
func (s * Service ) validateManagementTopic (ctx context.Context ) error {
24
26
s .logger .Debug ("validating end-to-end topic..." )
25
27
@@ -30,10 +32,10 @@ func (s *Service) validateManagementTopic(ctx context.Context) error {
30
32
31
33
typedErr := kerr .TypedErrorForCode (meta .Topics [0 ].ErrorCode )
32
34
topicExists := false
33
- switch typedErr {
34
- case nil :
35
+ switch {
36
+ case typedErr == nil :
35
37
topicExists = true
36
- case kerr .UnknownTopicOrPartition :
38
+ case errors . Is ( typedErr , kerr .UnknownTopicOrPartition ) :
37
39
// UnknownTopicOrPartition (Error code 3) means that the topic does not exist.
38
40
// When the topic doesn't exist, continue to create it further down in the code.
39
41
topicExists = false
@@ -72,8 +74,10 @@ func (s *Service) validateManagementTopic(ctx context.Context) error {
72
74
return s .updatePartitionCount (ctx )
73
75
}
74
76
75
- // The partition count must be updated after topic validation because the validation process may lead to the
76
- // creation of new partitions. This can occur when new brokers are added to the cluster.
77
+ // updatePartitionCount retrieves metadata to inform kminion about the updated
78
+ // partition count of its e2e topic. It must be updated after topic validation
79
+ // because the validation process may lead to the creation of new partitions.
80
+ // This can occur when new brokers are added to the cluster.
77
81
func (s * Service ) updatePartitionCount (ctx context.Context ) error {
78
82
retryTicker := time .NewTicker (1 * time .Second )
79
83
defer retryTicker .Stop ()
@@ -98,9 +102,9 @@ func (s *Service) updatePartitionCount(ctx context.Context) error {
98
102
return fmt .Errorf ("unexpected error while updating partition count: %w" , typedErr )
99
103
}
100
104
s .logger .Warn ("updatePartitionCount: received UNKNOWN_TOPIC_OR_PARTITION error, possibly due to timing issue. Retrying..." )
101
- // The UNKNOWN_TOPIC_OR_PARTITION error occurs occasionally even though the topic is created
102
- // in the validateManagementTopic function. It appears to be a timing issue where the topic metadata
103
- // is not immediately available after creation. In practice, waiting for a short period and then retrying
105
+ // The UNKNOWN_TOPIC_OR_PARTITION error occurs occasionally even though the topic is created
106
+ // in the validateManagementTopic function. It appears to be a timing issue where the topic metadata
107
+ // is not immediately available after creation. In practice, waiting for a short period and then retrying
104
108
// the operation resolves the issue.
105
109
}
106
110
}
0 commit comments