Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix connection leaks after closing non-exhaustive EventSource. #91

Open
wants to merge 6 commits into
base: main
Choose a base branch
from

Conversation

LinZong
Copy link

@LinZong LinZong commented Jan 7, 2024

Affected version

4.1.1

Problem

Okhttp connection won't closed properly after closing EventSource when SSE stream is not fully consumed.

Reproduce code

package moe.nemesiss.playground;

import com.launchdarkly.eventsource.ConnectStrategy;
import com.launchdarkly.eventsource.EventSource;
import com.launchdarkly.eventsource.MessageEvent;
import okhttp3.ConnectionPool;
import okhttp3.HttpUrl;
import okhttp3.OkHttpClient;

import java.util.logging.Level;
import java.util.logging.Logger;

import static org.fest.reflect.core.Reflection.field;
import static org.fest.reflect.core.Reflection.method;

public class EventSourceLeakDemo {

    private static HttpUrl streamGeneratorUrlFor(int length, int interval) {
        // A self-hosted SSE stream generator.
        return HttpUrl.get(String.format("https://plugins.nemesiss.xyz/main/stream_generator?length=%s&intervalMs=%s", length, interval));
    }

    public static void main(String[] args) throws InterruptedException {
        EventSource closedEventSource;
        Logger.getLogger(OkHttpClient.class.getName()).setLevel(Level.FINE);

        try (EventSource source = new EventSource.Builder(
                // Obtain a stream with 10 MessageEvents
                ConnectStrategy.http(streamGeneratorUrlFor(10, 200))).build()) {

            // Actually consumes 8 message and gone.
            for (int i = 0; i < 8; i++) {
                try {
                    MessageEvent message = source.readMessage();
                    System.out.println(message.getData());
                } catch (Throwable t) {
                    t.printStackTrace();
                    break;
                }
            }
            closedEventSource = source;
        }

        // Here EventSource was closed since leaving try-with-resource block.

        // GC WeakReference hosted in connection.
        // see: okhttp3.internal.connection.RealConnection.getCalls
        System.gc();

        // Trigger connection pool cleanup manually.
        // these reflection calls are equivalent to statement:
        // ((com.launchdarkly.eventsource.HttpConnectStrategy.Client) closedEventSource).client.connectionPool().delegate.cleanup(System.nanoTime());
        ConnectStrategy.Client client = field("client").ofType(ConnectStrategy.Client.class).in(closedEventSource).get();
        OkHttpClient internalOkHttpClient = field("httpClient").ofType(OkHttpClient.class).in(client).get();
        ConnectionPool cp = internalOkHttpClient.connectionPool();
        method("cleanup").withReturnType(long.class).withParameterTypes(long.class).in(cp.getDelegate$okhttp()).invoke(System.nanoTime());
        // And connection leak warning should be shown in terminal like below:
        // index: 0, timestamp: 1704600618813
        // index: 1, timestamp: 1704600619016
        // index: 2, timestamp: 1704600619220
        // index: 3, timestamp: 1704600619424
        // index: 4, timestamp: 1704600619628
        // index: 5, timestamp: 1704600619833
        // index: 6, timestamp: 1704600620037
        // index: 7, timestamp: 1704600620240
        //Jan 07, 2024 12:10:20 PM okhttp3.internal.platform.Platform log
        //WARNING: A connection to https://plugins.nemesiss.xyz/ was leaked. Did you forget to close a response body?
        //java.lang.Throwable: response.body().close()
        //	at okhttp3.internal.platform.Platform.getStackTraceForCloseable(Platform.kt:145)
        //	at okhttp3.internal.connection.RealCall.callStart(RealCall.kt:170)
        //	at okhttp3.internal.connection.RealCall.execute(RealCall.kt:151)
        //	at com.launchdarkly.eventsource.HttpConnectStrategy$Client.connect(HttpConnectStrategy.java:452)
        //	at com.launchdarkly.eventsource.EventSource.tryStart(EventSource.java:292)
        //	at com.launchdarkly.eventsource.EventSource.requireEvent(EventSource.java:595)
        //	at com.launchdarkly.eventsource.EventSource.readAnyEvent(EventSource.java:390)
        //	at com.launchdarkly.eventsource.EventSource.readMessage(EventSource.java:359)
        //	at moe.nemesiss.playground.EventSourceLeakDemo.main(EventSourceLeakDemo.java:33)
        // Jan 07, 2024 12:10:20 PM okhttp3.internal.platform.Platform log
    }
}

Reason

  1. com.launchdarkly.eventsource.EventSource#close will close underlying okhttp3.Call and package-private com.launchdarkly.eventsource.HttpConnectStrategy.Client#httpClient if there is no specified httpClient from outside.

  2. Canceling a okhttp3.Call will:

    • close underlying Socket (for http1.1) or Stream (for http2) will call stack below:
      java.lang.Thread.State: RUNNABLE
    	  at okhttp3.internal.connection.Exchange.cancel(Exchange.kt:153)
    	  at okhttp3.internal.connection.RealCall.cancel(RealCall.kt:139)
    	  at com.launchdarkly.eventsource.HttpConnectStrategy$RequestCloser.close(HttpConnectStrategy.java:554)
    	  at com.launchdarkly.eventsource.EventSource.closeCurrentStream(EventSource.java:677)
    	  - locked <0x916> (a java.lang.Object)
    	  at com.launchdarkly.eventsource.EventSource.close(EventSource.java:532)
    
  3. Condition for logging connection leak warning: connection.calls is not empty, but referring 'RealCall' was gone by GC, indicating RealCall was not detached with RealConnection properly.

    internal class CallReference(
        referent: RealCall,
        /**
         * Captures the stack trace at the time the Call is executed or enqueued. This is helpful for
         * identifying the origin of connection leaks.
         */
        val callStackTrace: Any?
      ) : WeakReference<RealCall>(referent)
    
    private fun pruneAndGetAllocationCount(connection: RealConnection, now: Long): Int {
        connection.assertThreadHoldsLock()
    
        val references: MutableList<Reference<RealCall>> = connection.calls
        var i = 0
        // connection.calls is not empty
        while (i < references.size) {
          val reference: Reference<RealCall> = references[i]
    
          if (reference.get() != null) {
            i++
            continue
          }
          // But referring 'RealCall' was gone by GC.
          // Indicates RealCall was not detached with RealConnection properly.
          
          // We've discovered a leaked call. This is an application bug.
          val callReference = reference as CallReference
          val message = "A connection to ${connection.route().address.url} was leaked. " +
              "Did you forget to close a response body?"
          Platform.get().logCloseableLeak(message, callReference.callStackTrace)
    
          references.removeAt(i)
          connection.noNewExchanges = true
    
          // If this was the last allocation, the connection is eligible for immediate eviction.
          if (references.isEmpty()) {
            connection.idleAtNs = now - keepAliveDurationNs
            return 0
          }
        }
    
        return references.size
      }
  4. The only way to initiative detach RealCall with RealConnection properly is calling releaseConnectionNoEvents method, which entrance is in messageDone method.

    • Called from stream exhausted:

      "main@1" prio=5 tid=0x1 nid=NA runnable
        java.lang.Thread.State: RUNNABLE
      	  at okhttp3.internal.connection.RealCall.releaseConnectionNoEvents$okhttp(RealCall.kt:381)
      	  at okhttp3.internal.connection.RealCall.callDone(RealCall.kt:350)
      	  - locked <0x8b2> (a okhttp3.internal.connection.RealConnection)
      	  at okhttp3.internal.connection.RealCall.messageDone$okhttp(RealCall.kt:309)
      	  at okhttp3.internal.connection.Exchange.bodyComplete(Exchange.kt:193)
      	  at okhttp3.internal.connection.Exchange$ResponseBodySource.complete(Exchange.kt:324)
      	  at okhttp3.internal.connection.Exchange$ResponseBodySource.read(Exchange.kt:284)
      	  at okio.RealBufferedSource$inputStream$1.read(RealBufferedSource.kt:158)
      	  at com.launchdarkly.eventsource.BufferedLineParser.readMoreIntoBuffer(BufferedLineParser.java:138)
      	  at com.launchdarkly.eventsource.BufferedLineParser.read(BufferedLineParser.java:63)
      	  at com.launchdarkly.eventsource.EventParser.getNextChunk(EventParser.java:267)
      	  at com.launchdarkly.eventsource.EventParser.tryNextEvent(EventParser.java:130)
      	  at com.launchdarkly.eventsource.EventParser.nextEvent(EventParser.java:109)
      	  at com.launchdarkly.eventsource.EventSource.requireEvent(EventSource.java:600)
      	  at com.launchdarkly.eventsource.EventSource.readAnyEvent(EventSource.java:392)
      	  at com.launchdarkly.eventsource.EventSource.readMessage(EventSource.java:361)
      	  at moe.nemesiss.playground.EventSourceLeakDemo.main(EventSourceLeakDemo.java:34)
      
    • Called from closing Response.

      "main@1" prio=5 tid=0x1 nid=NA runnable
        java.lang.Thread.State: RUNNABLE
      	  at okhttp3.internal.connection.RealCall.releaseConnectionNoEvents$okhttp(RealCall.kt:381)
      	  at okhttp3.internal.connection.RealCall.callDone(RealCall.kt:350)
      	  - locked <0x8b3> (a okhttp3.internal.connection.RealConnection)
      	  at okhttp3.internal.connection.RealCall.messageDone$okhttp(RealCall.kt:309)
      	  at okhttp3.internal.connection.Exchange.bodyComplete(Exchange.kt:193)
      	  at okhttp3.internal.connection.Exchange$ResponseBodySource.complete(Exchange.kt:324)
      	  at okhttp3.internal.connection.Exchange$ResponseBodySource.close(Exchange.kt:310)
      	  at okio.RealBufferedSource.close(RealBufferedSource.kt:392)
      	  at okhttp3.internal.Util.closeQuietly(Util.kt:495)
      	  at okhttp3.ResponseBody.close(ResponseBody.kt:192)
      	  at okhttp3.Response.close(Response.kt:302)
      	  at com.launchdarkly.eventsource.HttpConnectStrategy$ResponseCloser.close(HttpConnectStrategy.java:570)
      	  at com.launchdarkly.eventsource.EventSource.closeCurrentStream(EventSource.java:694)
      	  - locked <0x923> (a java.lang.Object)
      	  at com.launchdarkly.eventsource.EventSource.close(EventSource.java:534)
      	  at moe.nemesiss.playground.EventSourceLeakDemo.main(EventSourceLeakDemo.java:42)
      
  5. But such method cannot be called either closing Call or closing connection pool. Closing Call is obvious, let's focus on closing conneciton pool.

    // com.launchdarkly.eventsource.HttpConnectStrategy.Client#close
    public void close() {
      // We need to shut down the HTTP client *if* it is one that we created, and not
      // one that the application provided to us.
      OkHttpClient preconfiguredClient = HttpConnectStrategy.this.httpClient;      
      if (preconfiguredClient == null) {
        // COVERAGE: these null guards are here for safety but in practice the values are never null and there
        // is no way to cause them to be null in unit tests
        if (httpClient.connectionPool() != null) {
          httpClient.connectionPool().evictAll(); // evict calls from connection pool.
        }
        if (httpClient.dispatcher() != null) {
          httpClient.dispatcher().cancelAll();
          if (httpClient.dispatcher().executorService() != null) {
            httpClient.dispatcher().executorService().shutdownNow();
          }
        }
      }
    }
    
    // okhttp3.internal.connection.RealConnectionPool#evictAll
    fun evictAll() {
      val i = connections.iterator()
      while (i.hasNext()) {
        val connection = i.next()
        val socketToClose = synchronized(connection) {
          if (connection.calls.isEmpty()) { // false, closing 'Call' will not detach call with connection.
            i.remove() 
            connection.noNewExchanges = true
            return@synchronized connection.socket() 
          } else {
            return@synchronized null
          }
        }
        socketToClose?.closeQuietly() // close socket (equivalent to closing 'Call').
      }
      if (connections.isEmpty()) cleanupQueue.cancelAll()
    }

Possible Fixup

Close underlying Response in closeCurrentStream along with closing Call at reading thread.

image

@tanderson-ld
Copy link
Contributor

tanderson-ld commented Jan 10, 2024

Hello and thank you for bringing this to our attention and opening this PR. We will review when able to do so. Thank you!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants