Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ image::https://raw.githubusercontent.com/apache/tinkerpop/master/docs/static/ima
* Added `closeSessionPostGraphOp` to the Gremlin Server settings to indicate that the `Session` should be closed on either a successful commit or rollback.
* Added `SessionedChildClient` that borrows connections from a different `Client` for use with `Sessions`.
* Added `reuseConnectionsForSessions` to Java GLV settings to decide whether to use `SessionedChildClient` for remote transactions.
* Added `MaxConnectionLifetime` setting to the Go GLV to allow periodic recycling of WebSocket connections; expired idle connections are closed immediately while busy connections drain in-flight requests before being replaced.

[[release-3-7-5]]
=== TinkerPop 3.7.5 (Release Date: November 12, 2025)
Expand Down
7 changes: 6 additions & 1 deletion gremlin-go/driver/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,10 @@ type ClientSettings struct {
// Initial amount of instantiated connections. Default: 1
InitialConcurrentConnections int
EnableUserAgentOnConnect bool
// MaxConnectionLifetime is the maximum duration a connection can be reused before being
// drained and replaced. Expired connections finish in-flight requests before closing.
// Default: 0 (disabled — connections live forever).
MaxConnectionLifetime time.Duration
}

// Client is used to connect and interact with a Gremlin-supported server.
Expand Down Expand Up @@ -111,7 +115,8 @@ func NewClient(url string, configurations ...func(settings *ClientSettings)) (*C
settings.InitialConcurrentConnections = settings.MaximumConcurrentConnections
}
pool, err := newLoadBalancingPool(url, logHandler, connSettings, settings.NewConnectionThreshold,
settings.MaximumConcurrentConnections, settings.InitialConcurrentConnections)
settings.MaximumConcurrentConnections, settings.InitialConcurrentConnections,
settings.MaxConnectionLifetime)
if err != nil {
if err != nil {
logHandler.logf(Error, logErrorGeneric, "NewClient", err.Error())
Expand Down
17 changes: 13 additions & 4 deletions gremlin-go/driver/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type connection struct {
protocol protocol
results *synchronizedMap
state connectionState
createdAt time.Time
}

type connectionSettings struct {
Expand Down Expand Up @@ -100,12 +101,20 @@ func (connection *connection) activeResults() int {
// established: connection has established communication established with the server
// closed: connection was closed by the user.
// closedDueToError: connection was closed internally due to an error.
func (connection *connection) isExpired(maxLifetime time.Duration) bool {
if maxLifetime <= 0 {
return false
}
return time.Since(connection.createdAt) > maxLifetime
}

func createConnection(url string, logHandler *logHandler, connSettings *connectionSettings) (*connection, error) {
conn := &connection{
logHandler,
nil,
&synchronizedMap{map[string]ResultSet{}, sync.Mutex{}},
initialized,
logHandler: logHandler,
protocol: nil,
results: &synchronizedMap{map[string]ResultSet{}, sync.Mutex{}},
state: initialized,
createdAt: time.Now(),
}
logHandler.log(Info, connectConnection)
protocol, err := newGremlinServerWSProtocol(logHandler, Gorilla, url, connSettings, conn.results, conn.errorCallback)
Expand Down
30 changes: 23 additions & 7 deletions gremlin-go/driver/connectionPool.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package gremlingo

import (
"sync"
"time"
)

type connectionPool interface {
Expand All @@ -43,6 +44,7 @@ type loadBalancingPool struct {
connSettings *connectionSettings

newConnectionThreshold int
maxConnLifetime time.Duration
connections []*connection
loadBalanceLock sync.Mutex
isClosed bool
Expand Down Expand Up @@ -100,12 +102,24 @@ func (pool *loadBalancingPool) getLeastUsedConnection() (*connection, error) {
validConnections := make([]*connection, 0, cap(pool.connections))
for _, connection := range pool.connections {
if connection.state == established || connection.state == initialized {
validConnections = append(validConnections, connection)
}
if connection.state == established {
// Set the least used connection.
if leastUsed == nil || connection.activeResults() < leastUsed.activeResults() {
leastUsed = connection
if connection.isExpired(pool.maxConnLifetime) {
if connection.activeResults() == 0 {
// Expired and idle: close it now, exclude from pool.
pool.logHandler.log(Info, connectionExpired)
connection.close()
} else {
// Expired but has in-flight requests: keep alive so responses can arrive,
// but do not select for new work.
validConnections = append(validConnections, connection)
}
} else {
validConnections = append(validConnections, connection)
if connection.state == established {
// Set the least used connection.
if leastUsed == nil || connection.activeResults() < leastUsed.activeResults() {
leastUsed = connection
}
}
}
}
}
Expand Down Expand Up @@ -137,7 +151,8 @@ func (pool *loadBalancingPool) getLeastUsedConnection() (*connection, error) {
}

func newLoadBalancingPool(url string, logHandler *logHandler, connSettings *connectionSettings,
newConnectionThreshold int, maximumConcurrentConnections int, initialConcurrentConnections int) (connectionPool, error) {
newConnectionThreshold int, maximumConcurrentConnections int, initialConcurrentConnections int,
maxConnLifetime time.Duration) (connectionPool, error) {
var wg sync.WaitGroup
wg.Add(initialConcurrentConnections)
var appendLock sync.Mutex
Expand Down Expand Up @@ -170,6 +185,7 @@ func newLoadBalancingPool(url string, logHandler *logHandler, connSettings *conn
logHandler: logHandler,
connSettings: connSettings,
newConnectionThreshold: newConnectionThreshold,
maxConnLifetime: maxConnLifetime,
connections: pool,
}, nil
}
133 changes: 127 additions & 6 deletions gremlin-go/driver/connection_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -462,7 +462,7 @@ func TestConnection(t *testing.T) {
skipTestsIfNotEnabled(t, integrationTestSuiteName, testNoAuthEnable)
newPoolSize := 2
pool, err := newLoadBalancingPool(testNoAuthUrl, newLogHandler(&defaultLogger{}, Info, language.English),
newDefaultConnectionSettings(), 4, 4, newPoolSize)
newDefaultConnectionSettings(), 4, 4, newPoolSize, 0)
assert.Nil(t, err)
defer pool.close()
assert.Len(t, pool.(*loadBalancingPool).connections, newPoolSize)
Expand All @@ -472,7 +472,7 @@ func TestConnection(t *testing.T) {
newPoolSize := 0
skipTestsIfNotEnabled(t, integrationTestSuiteName, testNoAuthEnable)
pool, err := newLoadBalancingPool(testNoAuthUrl, newLogHandler(&defaultLogger{}, Info, language.English),
newDefaultConnectionSettings(), 4, 4, newPoolSize)
newDefaultConnectionSettings(), 4, 4, newPoolSize, 0)
assert.Nil(t, err)
defer pool.close()
lhp := pool.(*loadBalancingPool)
Expand All @@ -499,7 +499,7 @@ func TestConnection(t *testing.T) {
t.Run("pool is empty", func(t *testing.T) {
pool, err := newLoadBalancingPool(testNoAuthUrl, newLogHandler(&defaultLogger{}, Info, language.English),
newDefaultConnectionSettings(),
newConnectionThreshold, maximumConcurrentConnections, 0)
newConnectionThreshold, maximumConcurrentConnections, 0, 0)
assert.Nil(t, err)
lbp := pool.(*loadBalancingPool)
defer lbp.close()
Expand All @@ -512,7 +512,7 @@ func TestConnection(t *testing.T) {
t.Run("newConcurrentThreshold reached with capacity remaining", func(t *testing.T) {
pool, err := newLoadBalancingPool(testNoAuthUrl, newLogHandler(&defaultLogger{}, Info, language.English),
newDefaultConnectionSettings(),
newConnectionThreshold, maximumConcurrentConnections, 0)
newConnectionThreshold, maximumConcurrentConnections, 0, 0)
assert.Nil(t, err)
lbp := pool.(*loadBalancingPool)
defer lbp.close()
Expand Down Expand Up @@ -541,7 +541,7 @@ func TestConnection(t *testing.T) {
t.Run("newConcurrentThreshold reached with no capacity remaining", func(t *testing.T) {
capacityFullConnectionPool, err := newLoadBalancingPool(testNoAuthUrl, newLogHandler(&defaultLogger{}, Info,
language.English), newDefaultConnectionSettings(),
1, 1, 1)
1, 1, 1, 0)
assert.Nil(t, err)
assert.NotNil(t, capacityFullConnectionPool)
capacityFullLbp := capacityFullConnectionPool.(*loadBalancingPool)
Expand All @@ -557,7 +557,7 @@ func TestConnection(t *testing.T) {
t.Run("all connections in pool invalid", func(t *testing.T) {
pool, err := newLoadBalancingPool(testNoAuthUrl, newLogHandler(&defaultLogger{}, Info, language.English),
newDefaultConnectionSettings(),
newConnectionThreshold, maximumConcurrentConnections, 0)
newConnectionThreshold, maximumConcurrentConnections, 0, 0)
assert.Nil(t, err)
lbp := pool.(*loadBalancingPool)
defer lbp.close()
Expand Down Expand Up @@ -1322,3 +1322,124 @@ func TestConnection(t *testing.T) {
}
})
}

// Unit tests for MaxConnectionLifetime feature

func makeTestLogHandlerLifetime() *logHandler {
return newLogHandler(&defaultLogger{}, Warning, language.English)
}

func makeTestConnection(state connectionState, createdAt time.Time) *connection {
return &connection{
logHandler: makeTestLogHandlerLifetime(),
protocol: nil,
results: &synchronizedMap{internalMap: map[string]ResultSet{}, syncLock: sync.Mutex{}},
state: state,
createdAt: createdAt,
}
}

func makeTestConnectionWithResults(state connectionState, createdAt time.Time, numResults int) *connection {
results := &synchronizedMap{internalMap: map[string]ResultSet{}, syncLock: sync.Mutex{}}
for i := 0; i < numResults; i++ {
key := strconv.Itoa(i)
results.internalMap[key] = newChannelResultSet(key, results)
}
return &connection{
logHandler: makeTestLogHandlerLifetime(),
protocol: nil,
results: results,
state: state,
createdAt: createdAt,
}
}

func TestConnectionIsExpired(t *testing.T) {
t.Run("returns false when maxLifetime is 0 (disabled)", func(t *testing.T) {
conn := makeTestConnection(established, time.Now().Add(-24*time.Hour))
assert.False(t, conn.isExpired(0))
})

t.Run("returns false for fresh connection within lifetime", func(t *testing.T) {
conn := makeTestConnection(established, time.Now())
assert.False(t, conn.isExpired(1*time.Hour))
})

t.Run("returns true after lifetime has elapsed", func(t *testing.T) {
conn := makeTestConnection(established, time.Now().Add(-2*time.Millisecond))
assert.True(t, conn.isExpired(1*time.Millisecond))
})
}

func TestExpiredIdleConnectionClosed(t *testing.T) {
oldTime := time.Now().Add(-1 * time.Second)
expiredIdle := makeTestConnection(established, oldTime)
fresh := makeTestConnection(established, time.Now())

conns := make([]*connection, 0, 4)
conns = append(conns, expiredIdle, fresh)

pool := &loadBalancingPool{
url: "ws://localhost:8182/gremlin",
logHandler: makeTestLogHandlerLifetime(),
connSettings: &connectionSettings{},
newConnectionThreshold: defaultNewConnectionThreshold,
maxConnLifetime: 500 * time.Millisecond,
connections: conns,
}

result, err := pool.getLeastUsedConnection()

assert.NoError(t, err)
assert.Equal(t, fresh, result, "should select the fresh connection")
assert.Equal(t, closed, expiredIdle.state, "expired idle connection should be closed")
assert.Equal(t, 1, len(pool.connections), "expired connection should be removed from pool")
assert.Equal(t, fresh, pool.connections[0], "remaining connection should be the fresh one")
}

func TestExpiredBusyConnectionKeptButNotSelected(t *testing.T) {
oldTime := time.Now().Add(-1 * time.Second)
expiredBusy := makeTestConnectionWithResults(established, oldTime, 2)
fresh := makeTestConnection(established, time.Now())

conns := make([]*connection, 0, 4)
conns = append(conns, expiredBusy, fresh)

pool := &loadBalancingPool{
url: "ws://localhost:8182/gremlin",
logHandler: makeTestLogHandlerLifetime(),
connSettings: &connectionSettings{},
newConnectionThreshold: defaultNewConnectionThreshold,
maxConnLifetime: 500 * time.Millisecond,
connections: conns,
}

result, err := pool.getLeastUsedConnection()

assert.NoError(t, err)
assert.Equal(t, fresh, result, "should select the fresh connection, not the expired+busy one")
assert.Equal(t, established, expiredBusy.state, "expired busy connection should remain established")
assert.Equal(t, 2, len(pool.connections), "expired busy connection should remain in pool")
}

func TestMaxConnectionLifetimeClientSettingsWiring(t *testing.T) {
settings := &ClientSettings{MaxConnectionLifetime: 5 * time.Minute}

// Verify the field flows from ClientSettings into loadBalancingPool
pool := &loadBalancingPool{maxConnLifetime: settings.MaxConnectionLifetime}
assert.Equal(t, 5*time.Minute, pool.maxConnLifetime)
}

func TestMaxConnectionLifetimeDefaultIsZero(t *testing.T) {
// Default ClientSettings should have MaxConnectionLifetime == 0 (disabled)
settings := &ClientSettings{}
assert.Equal(t, time.Duration(0), settings.MaxConnectionLifetime)
}

func TestMaxConnectionLifetimeDriverRemoteConnectionSettingsWiring(t *testing.T) {
settings := &DriverRemoteConnectionSettings{MaxConnectionLifetime: 3 * time.Minute}

// Verify the field flows from DriverRemoteConnectionSettings into loadBalancingPool
pool := &loadBalancingPool{maxConnLifetime: settings.MaxConnectionLifetime}
assert.Equal(t, 3*time.Minute, pool.maxConnLifetime)
}
8 changes: 7 additions & 1 deletion gremlin-go/driver/driverRemoteConnection.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@ type DriverRemoteConnectionSettings struct {
MaximumConcurrentConnections int
// Initial amount of instantiated connections. Default: 1
InitialConcurrentConnections int
// MaxConnectionLifetime is the maximum duration a connection can be reused before being
// drained and replaced. Expired connections finish in-flight requests before closing.
// Default: 0 (disabled — connections live forever).
MaxConnectionLifetime time.Duration
}

// DriverRemoteConnection is a remote connection.
Expand Down Expand Up @@ -120,7 +124,8 @@ func NewDriverRemoteConnection(
settings.InitialConcurrentConnections = settings.MaximumConcurrentConnections
}
pool, err := newLoadBalancingPool(url, logHandler, connSettings, settings.NewConnectionThreshold,
settings.MaximumConcurrentConnections, settings.InitialConcurrentConnections)
settings.MaximumConcurrentConnections, settings.InitialConcurrentConnections,
settings.MaxConnectionLifetime)
if err != nil {
if err != nil {
logHandler.logf(Error, logErrorGeneric, "NewDriverRemoteConnection", err.Error())
Expand Down Expand Up @@ -218,6 +223,7 @@ func (driver *DriverRemoteConnection) CreateSession(sessionId ...string) (*Drive
settings.ReadBufferSize = driver.settings.ReadBufferSize
settings.WriteBufferSize = driver.settings.WriteBufferSize
settings.MaximumConcurrentConnections = driver.settings.MaximumConcurrentConnections
settings.MaxConnectionLifetime = driver.settings.MaxConnectionLifetime
})
if err != nil {
return nil, err
Expand Down
1 change: 1 addition & 0 deletions gremlin-go/driver/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,4 +127,5 @@ const (
poolNewConnectionError errorKey = "POOL_NEW_CONNECTION_ERROR"
sessionDetected errorKey = "SESSION_DETECTED"
poolInitialExceedsMaximum errorKey = "POOL_INITIAL_EXCEEDS_MAXIMUM"
connectionExpired errorKey = "CONNECTION_EXPIRED"
)
3 changes: 2 additions & 1 deletion gremlin-go/driver/resources/logger-messages/en.json
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,6 @@
"CREATE_CONNECTION_ERROR": "Error creating new connection for connection pool: %s",
"POOL_NEW_CONNECTION_ERROR": "Falling back to least-used connection. Creating new connection due to least-used connection exceeding concurrent usage threshold failed: %s",
"SESSION_DETECTED": "Session detected. Setting connection pool size maximum to 1.",
"POOL_INITIAL_EXCEEDS_MAXIMUM": "InitialConcurrentConnections setting %d exceeded MaximumConcurrentConnections setting %d - limiting InitialConcurrentConnections to %d."
"POOL_INITIAL_EXCEEDS_MAXIMUM": "InitialConcurrentConnections setting %d exceeded MaximumConcurrentConnections setting %d - limiting InitialConcurrentConnections to %d.",
"CONNECTION_EXPIRED": "Closing expired connection due to max connection lifetime."
}