11package org .logstash .beats ;
22
3+ import org .apache .logging .log4j .Level ;
4+ import org .apache .logging .log4j .LogManager ;
5+ import org .apache .logging .log4j .Logger ;
6+
37import io .netty .channel .ChannelHandlerContext ;
48import io .netty .channel .SimpleChannelInboundHandler ;
59import io .netty .util .AttributeKey ;
1014import javax .net .ssl .SSLHandshakeException ;
1115
1216public class BeatsHandler extends SimpleChannelInboundHandler <Batch > {
13- private final static Logger logger = LogManager .getLogger (BeatsHandler .class );
17+
18+ private final static Logger logger = LogManager .getLogger ();
1419 private final IMessageListener messageListener ;
1520 private ChannelHandlerContext context ;
1621
@@ -22,33 +27,25 @@ public BeatsHandler(IMessageListener listener) {
2227 @ Override
2328 public void channelActive (final ChannelHandlerContext ctx ) throws Exception {
2429 context = ctx ;
25- if (logger .isTraceEnabled ()){
26- logger .trace (format ("Channel Active" ));
27- }
30+ logger .trace ("{}" , () -> format ("Channel Active" ));
2831 super .channelActive (ctx );
2932 messageListener .onNewConnection (ctx );
3033 }
3134
3235 @ Override
3336 public void channelInactive (ChannelHandlerContext ctx ) throws Exception {
3437 super .channelInactive (ctx );
35- if (logger .isTraceEnabled ()){
36- logger .trace (format ("Channel Inactive" ));
37- }
38+ logger .trace ("{}" , () -> format ("Channel Inactive" ));
3839 messageListener .onConnectionClose (ctx );
3940 }
4041
4142
4243 @ Override
4344 public void channelRead0 (ChannelHandlerContext ctx , Batch batch ) throws Exception {
44- if (logger .isDebugEnabled ()) {
45- logger .debug (format ("Received a new payload" ));
46- }
45+ logger .debug ("{}" , () -> format ("Received a new payload" ));
4746 try {
4847 for (Message message : batch ) {
49- if (logger .isDebugEnabled ()) {
50- logger .debug (format ("Sending a new message for the listener, sequence: " + message .getSequence ()));
51- }
48+ logger .debug ("{}" , () -> format ("Sending a new message for the listener, sequence: " + message .getSequence ()));
5249 messageListener .onNewMessage (ctx , message );
5350
5451 if (needAck (message )) {
@@ -58,9 +55,9 @@ public void channelRead0(ChannelHandlerContext ctx, Batch batch) throws Exceptio
5855 }finally {
5956 //this channel is done processing this payload, instruct the connection handler to stop sending TCP keep alive
6057 ctx .channel ().attr (ConnectionHandler .CHANNEL_SEND_KEEP_ALIVE ).get ().set (false );
61- if ( logger .isDebugEnabled ()) {
62- logger . debug ( "{}: batches pending: {}" , ctx .channel ().id ().asShortText (),ctx . channel (). attr ( ConnectionHandler . CHANNEL_SEND_KEEP_ALIVE ). get (). get ());
63- }
58+ logger .debug ( "{}: batches pending: {}" ,
59+ () -> ctx .channel ().id ().asShortText (),
60+ () -> ctx . channel (). attr ( ConnectionHandler . CHANNEL_SEND_KEEP_ALIVE ). get (). get ());
6461 batch .release ();
6562 ctx .flush ();
6663 }
@@ -83,11 +80,9 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
8380 }
8481 String causeMessage = cause .getMessage () == null ? cause .getClass ().toString () : cause .getMessage ();
8582
86- if (logger .isDebugEnabled ()){
87- logger .debug (format ("Handling exception: " + causeMessage ), cause );
88- }
89- logger .info (format ("Handling exception: " + causeMessage ));
90- } finally {
83+ logger .info ("{}" , () -> format ("Handling exception: " + causeMessage ));
84+ logger .catching (Level .DEBUG , cause );
85+ } finally {
9186 super .exceptionCaught (ctx , cause );
9287 ctx .flush ();
9388 ctx .close ();
@@ -99,9 +94,7 @@ private boolean needAck(Message message) {
9994 }
10095
10196 private void ack (ChannelHandlerContext ctx , Message message ) {
102- if (logger .isTraceEnabled ()){
103- logger .trace (format ("Acking message number " + message .getSequence ()));
104- }
97+ logger .trace ("{}" , () -> format ("Acking message number " + message .getSequence ()));
10598 writeAck (ctx , message .getBatch ().getProtocol (), message .getSequence ());
10699 }
107100
0 commit comments