@@ -156,8 +156,8 @@ private void startProducer() {
156
156
State stateTransient = state ;
157
157
log .info ("[{}] Closing the new producer because the synchronizer state is {}" , prod ,
158
158
stateTransient );
159
- CompletableFuture closeProducer = new CompletableFuture <>();
160
- closeResource (() -> prod . closeAsync () , closeProducer );
159
+ CompletableFuture < Void > closeProducer = new CompletableFuture <>();
160
+ closeResource (prod :: closeAsync , closeProducer );
161
161
closeProducer .thenRun (() -> {
162
162
log .info ("[{}] Closed the new producer because the synchronizer state is {}" , prod ,
163
163
stateTransient );
@@ -221,11 +221,13 @@ private void startConsumer() {
221
221
log .info ("successfully created consumer {}" , topicName );
222
222
} else {
223
223
State stateTransient = state ;
224
- log .info ("[{}] Closing the new consumer because the synchronizer state is {}" , stateTransient );
225
- CompletableFuture closeConsumer = new CompletableFuture <>();
226
- closeResource (() -> consumer .closeAsync (), closeConsumer );
224
+ log .info ("[{}] Closing the new consumer because the synchronizer state is {}" , topicName ,
225
+ stateTransient );
226
+ CompletableFuture <Void > closeConsumer = new CompletableFuture <>();
227
+ closeResource (consumer ::closeAsync , closeConsumer );
227
228
closeConsumer .thenRun (() -> {
228
- log .info ("[{}] Closed the new consumer because the synchronizer state is {}" , stateTransient );
229
+ log .info ("[{}] Closed the new consumer because the synchronizer state is {}" , topicName ,
230
+ stateTransient );
229
231
});
230
232
}
231
233
}).exceptionally (ex -> {
0 commit comments