@@ -71,29 +71,36 @@ public void initialize(PulsarClient client) {
71
71
this .executor = EventLoopUtil .newEventLoopGroup (1 , false ,
72
72
new ExecutorProvider .ExtendedThreadFactory ("broker-service-url-check" ));
73
73
scheduledCheckTask = executor .scheduleAtFixedRate (() -> {
74
- if (closed ) {
75
- return ;
76
- }
77
- checkPulsarServices ();
78
- int firstHealthyPulsarService = firstHealthyPulsarService ();
79
- if (firstHealthyPulsarService == currentPulsarServiceIndex ) {
80
- return ;
81
- }
82
- if (firstHealthyPulsarService < 0 ) {
83
- int failoverTo = findFailoverTo ();
84
- if (failoverTo < 0 ) {
85
- // No healthy pulsar service to connect.
86
- log .error ("Failed to choose a pulsar service to connect, no one pulsar service is healthy. Current"
87
- + " pulsar service: [{}] {}. States: {}, Counters: {}" , currentPulsarServiceIndex ,
88
- pulsarServiceUrlArray [currentPulsarServiceIndex ], Arrays .toString (pulsarServiceStateArray ),
89
- Arrays .toString (checkCounterArray ));
74
+ try {
75
+ if (closed ) {
76
+ return ;
77
+ }
78
+ checkPulsarServices ();
79
+ int firstHealthyPulsarService = firstHealthyPulsarService ();
80
+ if (firstHealthyPulsarService == currentPulsarServiceIndex ) {
81
+ return ;
82
+ }
83
+ if (firstHealthyPulsarService < 0 ) {
84
+ int failoverTo = findFailoverTo ();
85
+ if (failoverTo < 0 ) {
86
+ // No healthy pulsar service to connect.
87
+ log .error (
88
+ "Failed to choose a pulsar service to connect, no one pulsar service is healthy."
89
+ + " Current pulsar service: [{}] {}. States: {}, Counters: {}" ,
90
+ currentPulsarServiceIndex ,
91
+ pulsarServiceUrlArray [currentPulsarServiceIndex ],
92
+ Arrays .toString (pulsarServiceStateArray ),
93
+ Arrays .toString (checkCounterArray ));
94
+ } else {
95
+ // Failover to low priority pulsar service.
96
+ updateServiceUrl (failoverTo );
97
+ }
90
98
} else {
91
- // Failover to low priority pulsar service.
92
- updateServiceUrl (failoverTo );
99
+ // Back to high priority pulsar service.
100
+ updateServiceUrl (firstHealthyPulsarService );
93
101
}
94
- } else {
95
- // Back to high priority pulsar service.
96
- updateServiceUrl (firstHealthyPulsarService );
102
+ } catch (Exception ex ) {
103
+ log .error ("Failed to re-check cluster status" , ex );
97
104
}
98
105
}, checkHealthyIntervalMs , checkHealthyIntervalMs , TimeUnit .MILLISECONDS );
99
106
}
@@ -123,7 +130,7 @@ private int firstHealthyPulsarService() {
123
130
}
124
131
125
132
private int findFailoverTo () {
126
- for (int i = currentPulsarServiceIndex + 1 ; i <= pulsarServiceUrlArray .length ; i ++) {
133
+ for (int i = currentPulsarServiceIndex + 1 ; i < pulsarServiceUrlArray .length ; i ++) {
127
134
if (probeAvailable (i )) {
128
135
return i ;
129
136
}
0 commit comments