Skip to content

Commit 7eae524

Browse files
authored
fix: clean up connection handler if connection fails (#3405)
Remove ConnectionHandler instances from the CONNECTION_HANDLERS map when the connection is closed, regardless how the connection would be closed. Before this change, connection handlers that failed at startup, for example connection attempts from simple TCP probers, would not be cleaned up, and would cause a memory leak if repated TCP probes are executed. Updates #3402
1 parent 92c7712 commit 7eae524

File tree

5 files changed

+59
-28
lines changed

5 files changed

+59
-28
lines changed

src/main/java/com/google/cloud/spanner/pgadapter/ConnectionHandler.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
package com.google.cloud.spanner.pgadapter;
1616

17+
import static com.google.cloud.spanner.pgadapter.ProxyServer.CONNECTION_HANDLERS;
18+
1719
import com.google.api.core.InternalApi;
1820
import com.google.auth.Credentials;
1921
import com.google.cloud.spanner.Database;
@@ -90,7 +92,6 @@
9092
import java.util.Properties;
9193
import java.util.Set;
9294
import java.util.UUID;
93-
import java.util.concurrent.ConcurrentHashMap;
9495
import java.util.concurrent.Future;
9596
import java.util.concurrent.atomic.AtomicInteger;
9697
import java.util.logging.Level;
@@ -123,8 +124,6 @@ public class ConnectionHandler implements Runnable {
123124
.concurrencyLevel(1)
124125
.build();
125126
private final Map<String, IntermediatePortalStatement> portalsMap = new HashMap<>();
126-
private static final Map<Integer, ConnectionHandler> CONNECTION_HANDLERS =
127-
new ConcurrentHashMap<>();
128127
private volatile ConnectionStatus status = ConnectionStatus.UNAUTHENTICATED;
129128
private Thread thread;
130129
private final int connectionId;
@@ -165,7 +164,6 @@ public class ConnectionHandler implements Runnable {
165164
this.socket = socket;
166165
this.secret = new SecureRandom().nextInt();
167166
this.connectionId = incrementingConnectionId.incrementAndGet();
168-
CONNECTION_HANDLERS.put(this.connectionId, this);
169167
this.spannerConnection = spannerConnection;
170168
}
171169

@@ -384,6 +382,7 @@ public void run() {
384382
String.format(
385383
"Connection handler with ID %s starting for client %s with thread %s",
386384
getName(), socket.getInetAddress().getHostAddress(), thread.toString())));
385+
CONNECTION_HANDLERS.put(this.connectionId, this);
387386
if (runConnection(false) == RunConnectionState.RESTART_WITH_SSL) {
388387
logger.log(
389388
Level.INFO,
@@ -522,6 +521,7 @@ && getServer().getOptions().getSslMode().isSslEnabled()
522521
String.format(
523522
"Exception while closing connection handler with ID %s", getName())));
524523
} finally {
524+
CONNECTION_HANDLERS.remove(this.connectionId);
525525
this.server.deregister(this);
526526
logger.log(
527527
Level.INFO,

src/main/java/com/google/cloud/spanner/pgadapter/ProxyServer.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.google.cloud.spanner.pgadapter.utils.Metrics;
2929
import com.google.cloud.spanner.pgadapter.wireprotocol.MessageReader;
3030
import com.google.cloud.spanner.pgadapter.wireprotocol.WireMessage;
31+
import com.google.common.annotations.VisibleForTesting;
3132
import com.google.common.collect.ImmutableList;
3233
import io.opentelemetry.api.OpenTelemetry;
3334
import io.opentelemetry.api.trace.Tracer;
@@ -43,6 +44,7 @@
4344
import java.util.List;
4445
import java.util.Map;
4546
import java.util.Properties;
47+
import java.util.concurrent.ConcurrentHashMap;
4648
import java.util.concurrent.ConcurrentLinkedQueue;
4749
import java.util.concurrent.CountDownLatch;
4850
import java.util.concurrent.ExecutorService;
@@ -65,6 +67,10 @@
6567
public class ProxyServer extends AbstractApiService {
6668

6769
private static final Logger logger = Logger.getLogger(ProxyServer.class.getName());
70+
71+
@VisibleForTesting
72+
static final Map<Integer, ConnectionHandler> CONNECTION_HANDLERS = new ConcurrentHashMap<>();
73+
6874
private final OptionsMetadata options;
6975
private final OpenTelemetry openTelemetry;
7076
private final Metrics metrics;

src/test/java/com/google/cloud/spanner/pgadapter/ConnectionHandlerTest.java

Lines changed: 29 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import static com.google.cloud.spanner.pgadapter.ConnectionHandler.buildConnectionURL;
1919
import static com.google.cloud.spanner.pgadapter.ConnectionHandler.listDatabasesOrInstances;
2020
import static com.google.cloud.spanner.pgadapter.EmulatedPsqlMockServerTest.newStatusResourceNotFoundException;
21+
import static com.google.cloud.spanner.pgadapter.ProxyServer.CONNECTION_HANDLERS;
2122
import static org.junit.Assert.assertArrayEquals;
2223
import static org.junit.Assert.assertEquals;
2324
import static org.junit.Assert.assertFalse;
@@ -329,30 +330,35 @@ public void testCancelActiveStatement() {
329330
ConnectionHandler connectionHandlerToCancel =
330331
new ConnectionHandler(server, socket, spannerConnection);
331332
connectionHandlerToCancel.setThread(mock(Thread.class));
333+
CONNECTION_HANDLERS.put(connectionHandlerToCancel.getConnectionId(), connectionHandlerToCancel);
332334

333-
// Cancelling yourself is not allowed.
334-
assertFalse(
335-
connectionHandler.cancelActiveStatement(
336-
connectionHandler.getConnectionId(), connectionHandler.getSecret()));
337-
// Cancelling a random non-existing connection should not work.
338-
assertFalse(connectionHandler.cancelActiveStatement(100, 100));
339-
// Cancelling another connecting using the wrong secret is not allowed.
340-
assertFalse(
341-
connectionHandler.cancelActiveStatement(
342-
connectionHandlerToCancel.getConnectionId(),
343-
connectionHandlerToCancel.getSecret() - 1));
344-
345-
assertTrue(
346-
connectionHandler.cancelActiveStatement(
347-
connectionHandlerToCancel.getConnectionId(), connectionHandlerToCancel.getSecret()));
348-
349-
// The method should just return false if an error occurs.
350-
doThrow(SpannerExceptionFactory.newSpannerException(ErrorCode.INTERNAL, "test error"))
351-
.when(spannerConnection)
352-
.cancel();
353-
assertFalse(
354-
connectionHandler.cancelActiveStatement(
355-
connectionHandlerToCancel.getConnectionId(), connectionHandlerToCancel.getSecret()));
335+
try {
336+
// Cancelling yourself is not allowed.
337+
assertFalse(
338+
connectionHandler.cancelActiveStatement(
339+
connectionHandler.getConnectionId(), connectionHandler.getSecret()));
340+
// Cancelling a random non-existing connection should not work.
341+
assertFalse(connectionHandler.cancelActiveStatement(100, 100));
342+
// Cancelling another connecting using the wrong secret is not allowed.
343+
assertFalse(
344+
connectionHandler.cancelActiveStatement(
345+
connectionHandlerToCancel.getConnectionId(),
346+
connectionHandlerToCancel.getSecret() - 1));
347+
348+
assertTrue(
349+
connectionHandler.cancelActiveStatement(
350+
connectionHandlerToCancel.getConnectionId(), connectionHandlerToCancel.getSecret()));
351+
352+
// The method should just return false if an error occurs.
353+
doThrow(SpannerExceptionFactory.newSpannerException(ErrorCode.INTERNAL, "test error"))
354+
.when(spannerConnection)
355+
.cancel();
356+
assertFalse(
357+
connectionHandler.cancelActiveStatement(
358+
connectionHandlerToCancel.getConnectionId(), connectionHandlerToCancel.getSecret()));
359+
} finally {
360+
CONNECTION_HANDLERS.remove(connectionHandlerToCancel.getConnectionId());
361+
}
356362
}
357363

358364
@Test

src/test/java/com/google/cloud/spanner/pgadapter/JdbcMockServerTest.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414

1515
package com.google.cloud.spanner.pgadapter;
1616

17+
import static com.google.cloud.spanner.pgadapter.ProxyServer.CONNECTION_HANDLERS;
1718
import static com.google.cloud.spanner.pgadapter.statements.BackendConnection.TRANSACTION_ABORTED_ERROR;
1819
import static com.google.cloud.spanner.pgadapter.statements.PgCatalog.PgNamespace.PG_NAMESPACE_CTE;
1920
import static org.junit.Assert.assertArrayEquals;
@@ -649,6 +650,12 @@ public void testSelectHelloWorld() throws SQLException {
649650
}
650651
}
651652
}
653+
Stopwatch stopwatch = Stopwatch.createStarted();
654+
// The cleanup of the connection is async, so it can take a few milliseconds to reach the state
655+
// that we expect.
656+
//noinspection StatementWithEmptyBody
657+
while (stopwatch.elapsed(TimeUnit.SECONDS) < 3 && !CONNECTION_HANDLERS.isEmpty()) {}
658+
assertEquals(0, CONNECTION_HANDLERS.size());
652659
}
653660

654661
@Test

src/test/java/com/google/cloud/spanner/pgadapter/ProxyServerTest.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,17 @@
1414

1515
package com.google.cloud.spanner.pgadapter;
1616

17+
import static com.google.cloud.spanner.pgadapter.ProxyServer.CONNECTION_HANDLERS;
18+
import static org.junit.Assert.assertEquals;
1719
import static org.junit.Assert.assertThrows;
1820
import static org.junit.Assert.assertTrue;
1921

2022
import com.google.cloud.spanner.pgadapter.metadata.OptionsMetadata;
2123
import com.google.cloud.spanner.pgadapter.metadata.TestOptionsMetadataBuilder;
24+
import com.google.common.base.Stopwatch;
2225
import java.net.Socket;
2326
import java.time.Duration;
27+
import java.util.concurrent.TimeUnit;
2428
import org.junit.Test;
2529
import org.junit.runner.RunWith;
2630
import org.junit.runners.JUnit4;
@@ -30,16 +34,24 @@ public class ProxyServerTest {
3034

3135
@Test
3236
public void testProbeConnection() throws Exception {
37+
assertEquals(0, CONNECTION_HANDLERS.size());
3338
// This test verifies that doing a simple TCP aliveness probe to check whether PGAdapter is
3439
// running can be done without any errors.
3540
ProxyServer server = new ProxyServer(OptionsMetadata.newBuilder().setPort(0).build());
3641
server.startServer();
3742
server.awaitRunning();
3843

39-
//noinspection EmptyTryBlock
4044
try (Socket ignore = new Socket("localhost", server.getLocalPort())) {
4145
// Do nothing, just verify that we can connect without any errors.
46+
Stopwatch stopwatch = Stopwatch.createStarted();
47+
//noinspection StatementWithEmptyBody
48+
while (stopwatch.elapsed(TimeUnit.SECONDS) < 3 && CONNECTION_HANDLERS.isEmpty()) {}
49+
assertEquals(1, CONNECTION_HANDLERS.size());
4250
}
51+
Stopwatch stopwatch = Stopwatch.createStarted();
52+
//noinspection StatementWithEmptyBody
53+
while (stopwatch.elapsed(TimeUnit.SECONDS) < 3 && !CONNECTION_HANDLERS.isEmpty()) {}
54+
assertEquals(0, CONNECTION_HANDLERS.size());
4355
server.stopServer();
4456
server.awaitTerminated();
4557
}

0 commit comments

Comments
 (0)