Skip to content

Commit d5f672a

Browse files
authored
Merge pull request #478 from aerospike/stage
Go Client v8.2.0
2 parents 6400205 + 173f0cc commit d5f672a

22 files changed

+194
-96
lines changed

CHANGELOG.md

+11
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,16 @@
11
# Change History
22

3+
## March 18 2025: v8.2.0
4+
5+
- **Fixes**
6+
- [CLIENT-3379] Setting `TotalTimeout` to 0 causes Scan / Query to fail (timeout).
7+
- [CLIENT-3376] `BatchRead` on Non-Existing namespace with transaction policy does not throw expected error.
8+
- [CLIENT-3353] Insufficient Error Messages from MRT commit failure.
9+
- [CLIENT-3374] Fix panic when trying to commit a MRT with no read and write transaction.
10+
11+
- **Improvements**
12+
- [CLIENT-3361] Update documentation for `RecordsPerSecond` scan/query policy.
13+
314
## February 26 2025: v8.1.0
415

516
- **New Features**

admin_command.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,7 @@ func (acmd *AdminCommand) executeCommand(conn *Connection, policy *AdminPolicy)
396396
timeout = policy.Timeout
397397
}
398398

399-
if err := conn.SetTimeout(time.Now().Add(timeout), timeout); err != nil {
399+
if err := conn.SetTimeout(timeout, timeout); err != nil {
400400
return err
401401
}
402402

@@ -426,7 +426,7 @@ func (acmd *AdminCommand) readUsers(conn *Connection, policy *AdminPolicy) ([]*U
426426
timeout = policy.Timeout
427427
}
428428

429-
if err := conn.SetTimeout(time.Now().Add(timeout), timeout); err != nil {
429+
if err := conn.SetTimeout(timeout, timeout); err != nil {
430430
return nil, err
431431
}
432432

@@ -579,7 +579,7 @@ func (acmd *AdminCommand) readRoles(conn *Connection, policy *AdminPolicy) ([]*R
579579
timeout = policy.Timeout
580580
}
581581

582-
if err := conn.SetTimeout(time.Now().Add(timeout), timeout); err != nil {
582+
if err := conn.SetTimeout(timeout, timeout); err != nil {
583583
return nil, err
584584
}
585585

aerospike_suite_test.go

+1-2
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ var (
5959
tlsConfig *tls.Config
6060
clientPolicy *as.ClientPolicy
6161
client *as.Client
62+
dbHosts []*as.Host
6263
)
6364

6465
func initTestVars() {
@@ -94,8 +95,6 @@ func initTestVars() {
9495
clientPolicy.TlsConfig = tlsConfig
9596
clientPolicy.UseServicesAlternate = *UseServicesAlternate
9697

97-
var dbHosts []*as.Host
98-
9998
if len(strings.TrimSpace(*hosts)) > 0 {
10099
dbHosts, err = as.NewHosts(strings.Split(*hosts, ",")...)
101100
if err != nil {

batch_command.go

+2-6
Original file line numberDiff line numberDiff line change
@@ -14,17 +14,13 @@
1414

1515
package aerospike
1616

17-
import (
18-
"time"
19-
)
20-
2117
type batcher interface {
2218
command
2319

2420
cloneBatchCommand(batch *batchNode) batcher
2521
filteredOut() int
2622

27-
retryBatch(ifc batcher, cluster *Cluster, deadline time.Time, iteration int) (bool, Error)
23+
retryBatch(ifc batcher, cluster *Cluster, iteration int) (bool, Error)
2824
generateBatchNodes(*Cluster) ([]*batchNode, Error)
2925
setSequence(int, int)
3026

@@ -75,7 +71,7 @@ func (cmd *batchCommand) prepareRetry(ifc command, isTimeout bool) bool {
7571
return false
7672
}
7773

78-
func (cmd *batchCommand) retryBatch(ifc batcher, cluster *Cluster, deadline time.Time, iteration int) (bool, Error) {
74+
func (cmd *batchCommand) retryBatch(ifc batcher, cluster *Cluster, iteration int) (bool, Error) {
7975
// Retry requires keys for this node to be split among other nodes.
8076
// This is both recursive and exponential.
8177
batchNodes, err := ifc.generateBatchNodes(cluster)

batch_test.go

+15
Original file line numberDiff line numberDiff line change
@@ -427,6 +427,21 @@ var _ = gg.Describe("Aerospike", func() {
427427
// gm.Expect(err.Matches(types.BATCH_MAX_REQUESTS_EXCEEDED)).To(gm.BeTrue())
428428
})
429429

430+
gg.It("XXXShould return the error for invalid namespace", func() {
431+
var brs []as.BatchRecordIfc
432+
433+
for i := 0; i < 1; i++ {
434+
key, _ := as.NewKey("non_exist", "non_exist", i)
435+
brr := as.NewBatchReadOps(nil, key, []*as.Operation{as.GetBinOp("i_bin")}...)
436+
brs = append(brs, brr)
437+
}
438+
439+
bp := as.NewBatchPolicy()
440+
err := client.BatchOperate(bp, brs)
441+
gm.Expect(err).To(gm.HaveOccurred())
442+
gm.Expect(err.Matches(types.INVALID_NAMESPACE)).To(gm.BeTrue())
443+
})
444+
430445
gg.It("Overall command error should be reflected in API call error and not BatchRecord error", func() {
431446
var batchRecords []as.BatchRecordIfc
432447
key, _ := as.NewKey(*namespace, set, 0)

client.go

+4
Original file line numberDiff line numberDiff line change
@@ -858,6 +858,10 @@ func (clnt *Client) BatchOperate(policy *BatchPolicy, records []BatchRecordIfc)
858858
return err
859859
}
860860

861+
if len(batchNodes) == 0 {
862+
return newError(types.INVALID_NAMESPACE)
863+
}
864+
861865
cmd := newBatchCommandOperate(clnt, nil, policy, records)
862866
_, err = clnt.batchExecute(policy, batchNodes, &cmd)
863867
return err

command.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -3647,7 +3647,7 @@ func (cmd *baseCommand) executeAt(ifc command, policy *BasePolicy, deadline time
36473647
if !ifc.prepareRetry(ifc, isClientTimeout || (err != nil && err.Matches(types.SERVER_NOT_AVAILABLE))) {
36483648
if bc, ok := ifc.(batcher); ok {
36493649
// Batch may be retried in separate commands.
3650-
alreadyRetried, err := bc.retryBatch(bc, cmd.node.cluster, deadline, cmd.commandSentCounter)
3650+
alreadyRetried, err := bc.retryBatch(bc, cmd.node.cluster, cmd.commandSentCounter)
36513651
if alreadyRetried {
36523652
// Batch was retried in separate subcommands. Complete this command.
36533653
applyTransactionMetrics(cmd.node, ifc.commandType(), transStart)

connection.go

+29-16
Original file line numberDiff line numberDiff line change
@@ -63,8 +63,9 @@ type Connection struct {
6363
node *Node
6464

6565
// timeouts
66-
socketTimeout time.Duration
67-
deadline time.Time
66+
socketTimeout time.Duration
67+
deadline time.Time
68+
socketDeadline time.Time // this is not strictly required, but is used in testing
6869

6970
// duration after which connection is considered idle
7071
idleTimeout time.Duration
@@ -155,7 +156,7 @@ func newConnection(address string, timeout time.Duration) (*Connection, Error) {
155156
newConn.limitReader = &io.LimitedReader{R: conn, N: 0}
156157

157158
// set timeout at the last possible moment
158-
if err := newConn.SetTimeout(time.Now().Add(timeout), timeout); err != nil {
159+
if err := newConn.SetTimeout(timeout, timeout); err != nil {
159160
newConn.Close()
160161
return nil, err
161162
}
@@ -295,33 +296,33 @@ func (ctn *Connection) IsConnected() bool {
295296
// the function will return a TIMEOUT error.
296297
func (ctn *Connection) updateDeadline() Error {
297298
now := time.Now()
298-
var socketDeadline time.Time
299+
ctn.socketDeadline = now.Add(_DEFAULT_TIMEOUT)
299300
if ctn.deadline.IsZero() {
300301
if ctn.socketTimeout > 0 {
301-
socketDeadline = now.Add(ctn.socketTimeout)
302+
ctn.socketDeadline = now.Add(ctn.socketTimeout)
302303
}
303304
} else {
304-
if now.After(ctn.deadline) {
305+
if !ctn.deadline.IsZero() && now.After(ctn.deadline) {
305306
return newError(types.TIMEOUT)
306307
}
307-
if ctn.socketTimeout == 0 {
308-
socketDeadline = ctn.deadline
308+
if ctn.socketTimeout <= 0 {
309+
ctn.socketDeadline = ctn.deadline
309310
} else {
310311
tDeadline := now.Add(ctn.socketTimeout)
311312
if tDeadline.After(ctn.deadline) {
312-
socketDeadline = ctn.deadline
313+
ctn.socketDeadline = ctn.deadline
313314
} else {
314-
socketDeadline = tDeadline
315+
ctn.socketDeadline = tDeadline
315316
}
316317
}
317318

318319
// floor to a millisecond to avoid too short timeouts
319-
if socketDeadline.Sub(now) < time.Millisecond {
320-
socketDeadline = now.Add(time.Millisecond)
320+
if ctn.socketDeadline.Sub(now) < time.Millisecond {
321+
ctn.socketDeadline = now.Add(time.Millisecond)
321322
}
322323
}
323324

324-
if err := ctn.conn.SetDeadline(socketDeadline); err != nil {
325+
if err := ctn.conn.SetDeadline(ctn.socketDeadline); err != nil {
325326
if ctn.node != nil {
326327
ctn.node.stats.ConnectionsFailed.IncrementAndGet()
327328
}
@@ -332,10 +333,22 @@ func (ctn *Connection) updateDeadline() Error {
332333
}
333334

334335
// SetTimeout sets connection timeout for both read and write operations.
335-
func (ctn *Connection) SetTimeout(deadline time.Time, socketTimeout time.Duration) Error {
336-
ctn.deadline = deadline
337-
ctn.socketTimeout = socketTimeout
336+
func (ctn *Connection) SetTimeout(totalTimeout, socketTimeout time.Duration) Error {
337+
now := time.Now()
338+
ctn.socketTimeout = _DEFAULT_TIMEOUT
339+
ctn.deadline = time.Time{}
340+
341+
if socketTimeout > 0 {
342+
ctn.socketTimeout = socketTimeout
343+
}
338344

345+
// keep the deadline.IsZero() == true if totalTimeout is not set
346+
if totalTimeout > 0 {
347+
ctn.deadline = now.Add(totalTimeout)
348+
if socketTimeout <= 0 {
349+
ctn.socketTimeout = totalTimeout
350+
}
351+
}
339352
return nil
340353
}
341354

connection_heap.go

+1-3
Original file line numberDiff line numberDiff line change
@@ -165,9 +165,7 @@ func (h *singleConnectionHeap) RefreshIdleTail(tendInterval time.Duration) bool
165165

166166
// refresh in a goroutine asynchronously
167167
go func() {
168-
timeout := time.Second
169-
deadline := time.Now().Add(timeout)
170-
conn.SetTimeout(deadline, timeout)
168+
conn.SetTimeout(time.Second, time.Second)
171169
conn.refresh()
172170
if _, err := conn.RequestInfo("build"); err == nil {
173171
// return to the pool

connection_test.go

+69
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
// Copyright 2014-2022 Aerospike, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package aerospike_test
16+
17+
import (
18+
"fmt"
19+
"time"
20+
21+
as "github.com/aerospike/aerospike-client-go/v8"
22+
23+
gg "github.com/onsi/ginkgo/v2"
24+
gm "github.com/onsi/gomega"
25+
)
26+
27+
// ALL tests are isolated by SetName and Key, which are 50 random characters
28+
var _ = gg.Describe("Connection Test", func() {
29+
var conn *as.Connection
30+
31+
type testExpectations struct {
32+
totalTimeout, socketTimeout time.Duration
33+
expTotalDeadline time.Time
34+
expSocketDeadline time.Time
35+
expSocketTimeout time.Duration
36+
}
37+
38+
gg.BeforeEach(func() {
39+
var err as.Error
40+
conn, err = as.NewConnection(clientPolicy, dbHosts[0])
41+
gm.Expect(err).ToNot(gm.HaveOccurred())
42+
gm.Expect(conn).ToNot(gm.BeNil())
43+
})
44+
45+
gg.It("Dealines should be calculated correctly", func() {
46+
testMatrix := []testExpectations{
47+
{0, 0, time.Time{}, time.Now().Add(as.DefaultTimeout()), as.DefaultTimeout()},
48+
{0, time.Second, time.Time{}, time.Now().Add(time.Second), time.Second},
49+
{time.Second, 0, time.Now().Add(time.Second), time.Now().Add(time.Second), time.Second},
50+
{5 * time.Second, time.Second, time.Now().Add(5 * time.Second), time.Now().Add(time.Second), time.Second},
51+
}
52+
53+
for _, matrix := range testMatrix {
54+
gg.By(fmt.Sprintf("TotalTimeout: %v, SocketTimeout: %v", matrix.totalTimeout, matrix.socketTimeout))
55+
err := conn.SetTimeout(matrix.totalTimeout, matrix.socketTimeout)
56+
gm.Expect(err).ToNot(gm.HaveOccurred())
57+
58+
expTotalDeadline, expSocketDeadline, expSocketTimeout, err := conn.UpdateDeadline()
59+
gm.Expect(err).ToNot(gm.HaveOccurred())
60+
61+
gg.By(fmt.Sprintf("expTotalDeadline: %v, expSocketDeadline: %v, expSocketTimeout: %v", matrix.expTotalDeadline, matrix.expSocketDeadline, matrix.expSocketTimeout))
62+
63+
gm.Expect(expTotalDeadline).To(gm.BeTemporally("~", matrix.expTotalDeadline, time.Millisecond))
64+
gm.Expect(expSocketDeadline).To(gm.BeTemporally("~", matrix.expSocketDeadline, time.Millisecond))
65+
gm.Expect(expSocketTimeout).To(gm.Equal(matrix.expSocketTimeout))
66+
}
67+
})
68+
69+
})

helper_test.go

+11
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,12 @@
1414

1515
package aerospike
1616

17+
import "time"
18+
19+
func DefaultTimeout() time.Duration {
20+
return _DEFAULT_TIMEOUT
21+
}
22+
1723
func ParseInfoErrorCode(response string) Error {
1824
return parseInfoErrorCode(response)
1925
}
@@ -97,3 +103,8 @@ func (cmd *deleteCommand) WriteBuffer(ifc command) Error {
97103
func (cmd *deleteCommand) Buffer() []byte {
98104
return cmd.dataBuffer[:cmd.dataOffset]
99105
}
106+
107+
func (ctn *Connection) UpdateDeadline() (time.Time, time.Time, time.Duration, Error) {
108+
err := ctn.updateDeadline()
109+
return ctn.deadline, ctn.socketDeadline, ctn.socketTimeout, err
110+
}

login_command.go

+1-5
Original file line numberDiff line numberDiff line change
@@ -93,11 +93,7 @@ func (lcmd *loginCommand) login(policy *ClientPolicy, conn *Connection, hashedPa
9393

9494
lcmd.writeSize()
9595

96-
var deadline time.Time
97-
if policy.LoginTimeout > 0 {
98-
deadline = time.Now().Add(policy.Timeout)
99-
}
100-
conn.SetTimeout(deadline, policy.LoginTimeout)
96+
conn.SetTimeout(policy.LoginTimeout, policy.LoginTimeout)
10197

10298
if _, err := conn.Write(lcmd.dataBuffer[:lcmd.dataOffset]); err != nil {
10399
return err

multi_command.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ func (cmd *baseMultiCommand) prepareRetry(ifc command, isTimeout bool) bool {
113113
}
114114

115115
func (cmd *baseMultiCommand) getConnection(policy Policy) (*Connection, Error) {
116-
return cmd.node.getConnectionWithHint(policy.GetBasePolicy().deadline(), policy.GetBasePolicy().socketTimeout(), byte(rand.Int63()&0xff))
116+
return cmd.node.getConnectionWithHint(policy.GetBasePolicy().TotalTimeout, policy.GetBasePolicy().SocketTimeout, byte(rand.Int63()&0xff))
117117
}
118118

119119
func (cmd *baseMultiCommand) putConnection(conn *Connection) {

multi_policy.go

+5-2
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,11 @@ type MultiPolicy struct {
4040
MaxRecords int64
4141

4242
// RecordsPerSecond limits returned records per second (rps) rate for each server.
43-
// Will not apply rps limit if recordsPerSecond is zero (default).
44-
// Currently only applicable to a query without a defined filter.
43+
// It does not apply rps limit if RecordsPerSecond is zero (default).
44+
//
45+
// RecordsPerSecond is supported in all primary and secondary index
46+
// queries in server versions 6.0+. For background queries, RecordsPerSecond
47+
// is bounded by the server config background-query-max-rps.
4548
RecordsPerSecond int
4649

4750
// Number of records to place in queue before blocking.

0 commit comments

Comments
 (0)