Skip to content

Commit 9f67db4

Browse files
authored
SinkClient fix logging on unsubscribe (#144)
* Add logging to SinkClient to track uncleaned connections * Only count resubmission from worker failures * SinkClient fix logging on unsubscribe Co-authored-by: Calvin Cheung <ccheung@netflix.com>
1 parent de88e88 commit 9f67db4

File tree

1 file changed

+3
-3
lines changed

1 file changed

+3
-3
lines changed

mantis-client/src/main/java/io/mantisrx/client/SinkClientImpl.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -144,10 +144,10 @@ public Observable<T> call(EndpointChange endpointChange) {
144144
})
145145
.doOnUnsubscribe(() -> {
146146
try {
147-
logger.warn("Closing connections to sink of job " + jobId);
147+
logger.warn("Closing connections to sink of job {}", jobId);
148148
closeAllConnections();
149149
} catch (Exception e) {
150-
Observable.error(e);
150+
logger.warn("Error closing all connections to sink of job {}", jobId, e);
151151
}
152152
})
153153
.share()
@@ -212,7 +212,7 @@ public void call(Boolean flag) {
212212
}
213213
}
214214
return ((SinkConnection<T>) sinkConnection).call()
215-
//.flatMap(o -> o)
215+
.takeWhile(e -> !nowClosed.get())
216216
;
217217
}
218218

0 commit comments

Comments
 (0)