4747import java .util .concurrent .TimeoutException ;
4848import java .util .concurrent .atomic .AtomicLong ;
4949import java .util .concurrent .atomic .AtomicReference ;
50+ import java .util .concurrent .locks .ReentrantLock ;
5051import java .util .function .LongConsumer ;
5152import java .util .logging .Level ;
5253import java .util .stream .Collectors ;
@@ -196,8 +197,8 @@ public abstract class AbstractSession extends SessionHelper {
196197 protected int decoderState ;
197198 protected int decoderLength ;
198199 protected SshException discarding ;
199- protected final Object encodeLock = new Object ();
200- protected final Object decodeLock = new Object ();
200+ protected final ReentrantLock encodeLock = new ReentrantLock ();
201+ protected final ReentrantLock decodeLock = new ReentrantLock ();
201202 protected final Object requestLock = new Object ();
202203
203204 /**
@@ -501,7 +502,8 @@ public MacInformation getMacInformation(boolean incoming) {
501502 * @throws Exception if an error occurs while decoding or handling the data
502503 */
503504 public void messageReceived (Readable buffer ) throws Exception {
504- synchronized (decodeLock ) {
505+ decodeLock .lock ();
506+ try {
505507 decoderBuffer .putBuffer (buffer );
506508 // One of those properties will be set by the constructor and the other
507509 // one should be set by the readIdentification method
@@ -513,6 +515,8 @@ public void messageReceived(Readable buffer) throws Exception {
513515 }
514516 }
515517 decode ();
518+ } finally {
519+ decodeLock .unlock ();
516520 }
517521 }
518522
@@ -718,13 +722,16 @@ protected IoWriteFuture sendNewKeys() throws Exception {
718722 prepareNewKeys ();
719723 Buffer buffer = createBuffer (SshConstants .SSH_MSG_NEWKEYS , Byte .SIZE );
720724 IoWriteFuture future ;
721- synchronized (encodeLock ) {
725+ encodeLock .lock ();
726+ try {
722727 // writePacket() would also work since it would never try to queue the packet, and would never try to
723728 // initiate a new KEX, and thus would never try to get the kexLock monitor. If it did, we might get a
724729 // deadlock due to lock inversion. It seems safer to push this out directly, though.
725730 future = doWritePacket (buffer );
726731 // Use the new settings from now on for any outgoing packet
727732 setOutputEncoding ();
733+ } finally {
734+ encodeLock .unlock ();
728735 }
729736 kexHandler .updateState (() -> kexState .set (KexState .KEYS ));
730737
@@ -1169,11 +1176,14 @@ protected IoWriteFuture doWritePacket(Buffer buffer) throws IOException {
11691176 // Synchronize all write requests as needed by the encoding algorithm
11701177 // and also queue the write request in this synchronized block to ensure
11711178 // packets are sent in the correct order
1172- synchronized (encodeLock ) {
1179+ encodeLock .lock ();
1180+ try {
11731181 Buffer packet = resolveOutputPacket (buffer );
11741182 IoSession networkSession = getIoSession ();
11751183 IoWriteFuture future = networkSession .writeBuffer (packet );
11761184 return future ;
1185+ } finally {
1186+ encodeLock .unlock ();
11771187 }
11781188 }
11791189
0 commit comments