1515@ Slf4j
1616public class OnSuccess extends Sinker {
1717 public static void main (String [] args ) throws Exception {
18- Server server = new Server (new SimpleSink ());
18+ Server server = new Server (new OnSuccess ());
1919
2020 // Start the server
2121 server .start ();
@@ -41,13 +41,20 @@ public ResponseList processMessages(DatumIterator datumIterator) {
4141 }
4242 try {
4343 String msg = new String (datum .getValue ());
44- log .info ("Received message: {}, headers - {}" , msg , datum .getHeaders ());
44+ log .info ("Received message: {}, id: {}, headers - {}" , msg , datum . getId () , datum .getHeaders ());
4545 if (writeToPrimarySink ()) {
46- responseListBuilder .addResponse (Response .responseOnSuccess (datum .getId (), (OnSuccessMessage ) null ));
46+ log .info ("Writing to onSuccess sink: {}" , datum .getId ());
47+ responseListBuilder .addResponse (Response .responseOnSuccess (datum .getId (),
48+ OnSuccessMessage .builder ()
49+ .value (String .format ("Successfully wrote message with ID: %s" ,
50+ datum .getId ()).getBytes ())
51+ .build ()));
4752 } else {
53+ log .info ("Writing to fallback sink: {}" , datum .getId ());
4854 responseListBuilder .addResponse (Response .responseFallback (datum .getId ()));
4955 }
5056 } catch (Exception e ) {
57+ log .warn ("Error while writing to any sink: " , e );
5158 responseListBuilder .addResponse (Response .responseFailure (
5259 datum .getId (),
5360 e .getMessage ()));
@@ -59,7 +66,7 @@ public ResponseList processMessages(DatumIterator datumIterator) {
5966 /**
6067 * Example method to simulate write failures/success to primary sink.
6168 * Based on whether this returns true/false, we write to fallback sink / onSuccess sink
62- * @return
69+ * @return true if simulated write to primary sink is successful, false otherwise
6370 */
6471 public boolean writeToPrimarySink () {
6572 Random random = new Random ();
0 commit comments