diff --git a/CHANGELOG.asciidoc b/CHANGELOG.asciidoc index 40d3eab415b..27f38ab048e 100644 --- a/CHANGELOG.asciidoc +++ b/CHANGELOG.asciidoc @@ -29,6 +29,7 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima * Added `closeSessionPostGraphOp` to the Gremlin Server settings to indicate that the `Session` should be closed on either a successful commit or rollback. * Added `SessionedChildClient` that borrows connections from a different `Client` for use with `Sessions`. * Added `reuseConnectionsForSessions` to Java GLV settings to decide whether to use `SessionedChildClient` for remote transactions. +* Added `MaxConnectionLifetime` setting to the Go GLV to allow periodic recycling of WebSocket connections; expired idle connections are closed immediately while busy connections drain in-flight requests before being replaced. [[release-3-7-5]] === TinkerPop 3.7.5 (Release Date: November 12, 2025) diff --git a/gremlin-go/driver/client.go b/gremlin-go/driver/client.go index c484914f9d3..20afb7320c7 100644 --- a/gremlin-go/driver/client.go +++ b/gremlin-go/driver/client.go @@ -51,6 +51,10 @@ type ClientSettings struct { // Initial amount of instantiated connections. Default: 1 InitialConcurrentConnections int EnableUserAgentOnConnect bool + // MaxConnectionLifetime is the maximum duration a connection can be reused before being + // drained and replaced. Expired connections finish in-flight requests before closing. + // Default: 0 (disabled — connections live forever). + MaxConnectionLifetime time.Duration } // Client is used to connect and interact with a Gremlin-supported server. @@ -111,7 +115,8 @@ func NewClient(url string, configurations ...func(settings *ClientSettings)) (*C settings.InitialConcurrentConnections = settings.MaximumConcurrentConnections } pool, err := newLoadBalancingPool(url, logHandler, connSettings, settings.NewConnectionThreshold, - settings.MaximumConcurrentConnections, settings.InitialConcurrentConnections) + settings.MaximumConcurrentConnections, settings.InitialConcurrentConnections, + settings.MaxConnectionLifetime) if err != nil { if err != nil { logHandler.logf(Error, logErrorGeneric, "NewClient", err.Error()) diff --git a/gremlin-go/driver/connection.go b/gremlin-go/driver/connection.go index a721044340f..833ad9c36b4 100644 --- a/gremlin-go/driver/connection.go +++ b/gremlin-go/driver/connection.go @@ -39,6 +39,7 @@ type connection struct { protocol protocol results *synchronizedMap state connectionState + createdAt time.Time } type connectionSettings struct { @@ -100,12 +101,20 @@ func (connection *connection) activeResults() int { // established: connection has established communication established with the server // closed: connection was closed by the user. // closedDueToError: connection was closed internally due to an error. +func (connection *connection) isExpired(maxLifetime time.Duration) bool { + if maxLifetime <= 0 { + return false + } + return time.Since(connection.createdAt) > maxLifetime +} + func createConnection(url string, logHandler *logHandler, connSettings *connectionSettings) (*connection, error) { conn := &connection{ - logHandler, - nil, - &synchronizedMap{map[string]ResultSet{}, sync.Mutex{}}, - initialized, + logHandler: logHandler, + protocol: nil, + results: &synchronizedMap{map[string]ResultSet{}, sync.Mutex{}}, + state: initialized, + createdAt: time.Now(), } logHandler.log(Info, connectConnection) protocol, err := newGremlinServerWSProtocol(logHandler, Gorilla, url, connSettings, conn.results, conn.errorCallback) diff --git a/gremlin-go/driver/connectionPool.go b/gremlin-go/driver/connectionPool.go index 7f800ef25fd..0390a1d204d 100644 --- a/gremlin-go/driver/connectionPool.go +++ b/gremlin-go/driver/connectionPool.go @@ -21,6 +21,7 @@ package gremlingo import ( "sync" + "time" ) type connectionPool interface { @@ -43,6 +44,7 @@ type loadBalancingPool struct { connSettings *connectionSettings newConnectionThreshold int + maxConnLifetime time.Duration connections []*connection loadBalanceLock sync.Mutex isClosed bool @@ -100,12 +102,24 @@ func (pool *loadBalancingPool) getLeastUsedConnection() (*connection, error) { validConnections := make([]*connection, 0, cap(pool.connections)) for _, connection := range pool.connections { if connection.state == established || connection.state == initialized { - validConnections = append(validConnections, connection) - } - if connection.state == established { - // Set the least used connection. - if leastUsed == nil || connection.activeResults() < leastUsed.activeResults() { - leastUsed = connection + if connection.isExpired(pool.maxConnLifetime) { + if connection.activeResults() == 0 { + // Expired and idle: close it now, exclude from pool. + pool.logHandler.log(Info, connectionExpired) + connection.close() + } else { + // Expired but has in-flight requests: keep alive so responses can arrive, + // but do not select for new work. + validConnections = append(validConnections, connection) + } + } else { + validConnections = append(validConnections, connection) + if connection.state == established { + // Set the least used connection. + if leastUsed == nil || connection.activeResults() < leastUsed.activeResults() { + leastUsed = connection + } + } } } } @@ -137,7 +151,8 @@ func (pool *loadBalancingPool) getLeastUsedConnection() (*connection, error) { } func newLoadBalancingPool(url string, logHandler *logHandler, connSettings *connectionSettings, - newConnectionThreshold int, maximumConcurrentConnections int, initialConcurrentConnections int) (connectionPool, error) { + newConnectionThreshold int, maximumConcurrentConnections int, initialConcurrentConnections int, + maxConnLifetime time.Duration) (connectionPool, error) { var wg sync.WaitGroup wg.Add(initialConcurrentConnections) var appendLock sync.Mutex @@ -170,6 +185,7 @@ func newLoadBalancingPool(url string, logHandler *logHandler, connSettings *conn logHandler: logHandler, connSettings: connSettings, newConnectionThreshold: newConnectionThreshold, + maxConnLifetime: maxConnLifetime, connections: pool, }, nil } diff --git a/gremlin-go/driver/connection_test.go b/gremlin-go/driver/connection_test.go index 863c264c090..0eb7e7e49c1 100644 --- a/gremlin-go/driver/connection_test.go +++ b/gremlin-go/driver/connection_test.go @@ -462,7 +462,7 @@ func TestConnection(t *testing.T) { skipTestsIfNotEnabled(t, integrationTestSuiteName, testNoAuthEnable) newPoolSize := 2 pool, err := newLoadBalancingPool(testNoAuthUrl, newLogHandler(&defaultLogger{}, Info, language.English), - newDefaultConnectionSettings(), 4, 4, newPoolSize) + newDefaultConnectionSettings(), 4, 4, newPoolSize, 0) assert.Nil(t, err) defer pool.close() assert.Len(t, pool.(*loadBalancingPool).connections, newPoolSize) @@ -472,7 +472,7 @@ func TestConnection(t *testing.T) { newPoolSize := 0 skipTestsIfNotEnabled(t, integrationTestSuiteName, testNoAuthEnable) pool, err := newLoadBalancingPool(testNoAuthUrl, newLogHandler(&defaultLogger{}, Info, language.English), - newDefaultConnectionSettings(), 4, 4, newPoolSize) + newDefaultConnectionSettings(), 4, 4, newPoolSize, 0) assert.Nil(t, err) defer pool.close() lhp := pool.(*loadBalancingPool) @@ -499,7 +499,7 @@ func TestConnection(t *testing.T) { t.Run("pool is empty", func(t *testing.T) { pool, err := newLoadBalancingPool(testNoAuthUrl, newLogHandler(&defaultLogger{}, Info, language.English), newDefaultConnectionSettings(), - newConnectionThreshold, maximumConcurrentConnections, 0) + newConnectionThreshold, maximumConcurrentConnections, 0, 0) assert.Nil(t, err) lbp := pool.(*loadBalancingPool) defer lbp.close() @@ -512,7 +512,7 @@ func TestConnection(t *testing.T) { t.Run("newConcurrentThreshold reached with capacity remaining", func(t *testing.T) { pool, err := newLoadBalancingPool(testNoAuthUrl, newLogHandler(&defaultLogger{}, Info, language.English), newDefaultConnectionSettings(), - newConnectionThreshold, maximumConcurrentConnections, 0) + newConnectionThreshold, maximumConcurrentConnections, 0, 0) assert.Nil(t, err) lbp := pool.(*loadBalancingPool) defer lbp.close() @@ -541,7 +541,7 @@ func TestConnection(t *testing.T) { t.Run("newConcurrentThreshold reached with no capacity remaining", func(t *testing.T) { capacityFullConnectionPool, err := newLoadBalancingPool(testNoAuthUrl, newLogHandler(&defaultLogger{}, Info, language.English), newDefaultConnectionSettings(), - 1, 1, 1) + 1, 1, 1, 0) assert.Nil(t, err) assert.NotNil(t, capacityFullConnectionPool) capacityFullLbp := capacityFullConnectionPool.(*loadBalancingPool) @@ -557,7 +557,7 @@ func TestConnection(t *testing.T) { t.Run("all connections in pool invalid", func(t *testing.T) { pool, err := newLoadBalancingPool(testNoAuthUrl, newLogHandler(&defaultLogger{}, Info, language.English), newDefaultConnectionSettings(), - newConnectionThreshold, maximumConcurrentConnections, 0) + newConnectionThreshold, maximumConcurrentConnections, 0, 0) assert.Nil(t, err) lbp := pool.(*loadBalancingPool) defer lbp.close() @@ -1322,3 +1322,124 @@ func TestConnection(t *testing.T) { } }) } + +// Unit tests for MaxConnectionLifetime feature + +func makeTestLogHandlerLifetime() *logHandler { + return newLogHandler(&defaultLogger{}, Warning, language.English) +} + +func makeTestConnection(state connectionState, createdAt time.Time) *connection { + return &connection{ + logHandler: makeTestLogHandlerLifetime(), + protocol: nil, + results: &synchronizedMap{internalMap: map[string]ResultSet{}, syncLock: sync.Mutex{}}, + state: state, + createdAt: createdAt, + } +} + +func makeTestConnectionWithResults(state connectionState, createdAt time.Time, numResults int) *connection { + results := &synchronizedMap{internalMap: map[string]ResultSet{}, syncLock: sync.Mutex{}} + for i := 0; i < numResults; i++ { + key := strconv.Itoa(i) + results.internalMap[key] = newChannelResultSet(key, results) + } + return &connection{ + logHandler: makeTestLogHandlerLifetime(), + protocol: nil, + results: results, + state: state, + createdAt: createdAt, + } +} + +func TestConnectionIsExpired(t *testing.T) { + t.Run("returns false when maxLifetime is 0 (disabled)", func(t *testing.T) { + conn := makeTestConnection(established, time.Now().Add(-24*time.Hour)) + assert.False(t, conn.isExpired(0)) + }) + + t.Run("returns false for fresh connection within lifetime", func(t *testing.T) { + conn := makeTestConnection(established, time.Now()) + assert.False(t, conn.isExpired(1*time.Hour)) + }) + + t.Run("returns true after lifetime has elapsed", func(t *testing.T) { + conn := makeTestConnection(established, time.Now().Add(-2*time.Millisecond)) + assert.True(t, conn.isExpired(1*time.Millisecond)) + }) +} + +func TestExpiredIdleConnectionClosed(t *testing.T) { + oldTime := time.Now().Add(-1 * time.Second) + expiredIdle := makeTestConnection(established, oldTime) + fresh := makeTestConnection(established, time.Now()) + + conns := make([]*connection, 0, 4) + conns = append(conns, expiredIdle, fresh) + + pool := &loadBalancingPool{ + url: "ws://localhost:8182/gremlin", + logHandler: makeTestLogHandlerLifetime(), + connSettings: &connectionSettings{}, + newConnectionThreshold: defaultNewConnectionThreshold, + maxConnLifetime: 500 * time.Millisecond, + connections: conns, + } + + result, err := pool.getLeastUsedConnection() + + assert.NoError(t, err) + assert.Equal(t, fresh, result, "should select the fresh connection") + assert.Equal(t, closed, expiredIdle.state, "expired idle connection should be closed") + assert.Equal(t, 1, len(pool.connections), "expired connection should be removed from pool") + assert.Equal(t, fresh, pool.connections[0], "remaining connection should be the fresh one") +} + +func TestExpiredBusyConnectionKeptButNotSelected(t *testing.T) { + oldTime := time.Now().Add(-1 * time.Second) + expiredBusy := makeTestConnectionWithResults(established, oldTime, 2) + fresh := makeTestConnection(established, time.Now()) + + conns := make([]*connection, 0, 4) + conns = append(conns, expiredBusy, fresh) + + pool := &loadBalancingPool{ + url: "ws://localhost:8182/gremlin", + logHandler: makeTestLogHandlerLifetime(), + connSettings: &connectionSettings{}, + newConnectionThreshold: defaultNewConnectionThreshold, + maxConnLifetime: 500 * time.Millisecond, + connections: conns, + } + + result, err := pool.getLeastUsedConnection() + + assert.NoError(t, err) + assert.Equal(t, fresh, result, "should select the fresh connection, not the expired+busy one") + assert.Equal(t, established, expiredBusy.state, "expired busy connection should remain established") + assert.Equal(t, 2, len(pool.connections), "expired busy connection should remain in pool") +} + +func TestMaxConnectionLifetimeClientSettingsWiring(t *testing.T) { + settings := &ClientSettings{MaxConnectionLifetime: 5 * time.Minute} + + // Verify the field flows from ClientSettings into loadBalancingPool + pool := &loadBalancingPool{maxConnLifetime: settings.MaxConnectionLifetime} + assert.Equal(t, 5*time.Minute, pool.maxConnLifetime) +} + +func TestMaxConnectionLifetimeDefaultIsZero(t *testing.T) { + // Default ClientSettings should have MaxConnectionLifetime == 0 (disabled) + settings := &ClientSettings{} + assert.Equal(t, time.Duration(0), settings.MaxConnectionLifetime) +} + +func TestMaxConnectionLifetimeDriverRemoteConnectionSettingsWiring(t *testing.T) { + settings := &DriverRemoteConnectionSettings{MaxConnectionLifetime: 3 * time.Minute} + + // Verify the field flows from DriverRemoteConnectionSettings into loadBalancingPool + pool := &loadBalancingPool{maxConnLifetime: settings.MaxConnectionLifetime} + assert.Equal(t, 3*time.Minute, pool.maxConnLifetime) +} diff --git a/gremlin-go/driver/driverRemoteConnection.go b/gremlin-go/driver/driverRemoteConnection.go index a0f155f4c4d..e66af5d691b 100644 --- a/gremlin-go/driver/driverRemoteConnection.go +++ b/gremlin-go/driver/driverRemoteConnection.go @@ -53,6 +53,10 @@ type DriverRemoteConnectionSettings struct { MaximumConcurrentConnections int // Initial amount of instantiated connections. Default: 1 InitialConcurrentConnections int + // MaxConnectionLifetime is the maximum duration a connection can be reused before being + // drained and replaced. Expired connections finish in-flight requests before closing. + // Default: 0 (disabled — connections live forever). + MaxConnectionLifetime time.Duration } // DriverRemoteConnection is a remote connection. @@ -120,7 +124,8 @@ func NewDriverRemoteConnection( settings.InitialConcurrentConnections = settings.MaximumConcurrentConnections } pool, err := newLoadBalancingPool(url, logHandler, connSettings, settings.NewConnectionThreshold, - settings.MaximumConcurrentConnections, settings.InitialConcurrentConnections) + settings.MaximumConcurrentConnections, settings.InitialConcurrentConnections, + settings.MaxConnectionLifetime) if err != nil { if err != nil { logHandler.logf(Error, logErrorGeneric, "NewDriverRemoteConnection", err.Error()) @@ -218,6 +223,7 @@ func (driver *DriverRemoteConnection) CreateSession(sessionId ...string) (*Drive settings.ReadBufferSize = driver.settings.ReadBufferSize settings.WriteBufferSize = driver.settings.WriteBufferSize settings.MaximumConcurrentConnections = driver.settings.MaximumConcurrentConnections + settings.MaxConnectionLifetime = driver.settings.MaxConnectionLifetime }) if err != nil { return nil, err diff --git a/gremlin-go/driver/logger.go b/gremlin-go/driver/logger.go index 3cc623cbb48..906c95eb686 100644 --- a/gremlin-go/driver/logger.go +++ b/gremlin-go/driver/logger.go @@ -127,4 +127,5 @@ const ( poolNewConnectionError errorKey = "POOL_NEW_CONNECTION_ERROR" sessionDetected errorKey = "SESSION_DETECTED" poolInitialExceedsMaximum errorKey = "POOL_INITIAL_EXCEEDS_MAXIMUM" + connectionExpired errorKey = "CONNECTION_EXPIRED" ) diff --git a/gremlin-go/driver/resources/logger-messages/en.json b/gremlin-go/driver/resources/logger-messages/en.json index 63287db1e73..5df7801dd72 100644 --- a/gremlin-go/driver/resources/logger-messages/en.json +++ b/gremlin-go/driver/resources/logger-messages/en.json @@ -27,5 +27,6 @@ "CREATE_CONNECTION_ERROR": "Error creating new connection for connection pool: %s", "POOL_NEW_CONNECTION_ERROR": "Falling back to least-used connection. Creating new connection due to least-used connection exceeding concurrent usage threshold failed: %s", "SESSION_DETECTED": "Session detected. Setting connection pool size maximum to 1.", - "POOL_INITIAL_EXCEEDS_MAXIMUM": "InitialConcurrentConnections setting %d exceeded MaximumConcurrentConnections setting %d - limiting InitialConcurrentConnections to %d." + "POOL_INITIAL_EXCEEDS_MAXIMUM": "InitialConcurrentConnections setting %d exceeded MaximumConcurrentConnections setting %d - limiting InitialConcurrentConnections to %d.", + "CONNECTION_EXPIRED": "Closing expired connection due to max connection lifetime." }