Skip to content

Commit 8580877

Browse files
authored
Merge pull request #460 from aerospike/stage
Go Client v8.0.0-beta.2
2 parents f481016 + 8f77af4 commit 8580877

File tree

6 files changed

+424
-28
lines changed

6 files changed

+424
-28
lines changed

CHANGELOG.md

+8
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,13 @@
11
# Change History
22

3+
## December 20 2024: v8.0.0-beta.2
4+
5+
- **New Features**
6+
- [CLIENT-2820] Actively refresh pool connections that will be idle before the next tend.
7+
8+
- **Fixes**
9+
- [CLIENT-3218] Fix FilterExpression encoding in Batch commands.
10+
311
## December 13 2024: v8.0.0-beta.1
412

513
Major breaking release. This release supports Multi-Record Transactions. Please note that this is a beta release and will be subject to breaking changes.

batch_index_command_get.go

+161-9
Original file line numberDiff line numberDiff line change
@@ -16,12 +16,13 @@ package aerospike
1616

1717
import (
1818
"github.com/aerospike/aerospike-client-go/v8/types"
19+
Buffer "github.com/aerospike/aerospike-client-go/v8/utils/buffer"
1920
)
2021

2122
type batchIndexCommandGet struct {
2223
batchCommandOperate
2324

24-
indexRecords []*BatchRead
25+
records []*BatchRead
2526
}
2627

2728
func newBatchIndexCommandGet(
@@ -31,19 +32,24 @@ func newBatchIndexCommandGet(
3132
records []*BatchRead,
3233
isOperation bool,
3334
) batchIndexCommandGet {
34-
recIfcs := make([]BatchRecordIfc, len(records))
35-
for i := range records {
36-
recIfcs[i] = records[i]
37-
}
38-
3935
res := batchIndexCommandGet{
40-
batchCommandOperate: newBatchCommandOperate(client, batch, policy, recIfcs),
41-
indexRecords: records,
36+
batchCommandOperate: newBatchCommandOperate(client, batch, policy, nil),
37+
records: records,
4238
}
4339
res.txn = policy.Txn
4440
return res
4541
}
4642

43+
func (cmd *batchIndexCommandGet) writeBuffer(ifc command) Error {
44+
attr, err := cmd.setBatchOperateRead(cmd.client, cmd.policy, cmd.records, cmd.batch)
45+
cmd.attr = attr
46+
return err
47+
}
48+
49+
func (cmd *batchIndexCommandGet) isRead() bool {
50+
return true
51+
}
52+
4753
func (cmd *batchIndexCommandGet) cloneBatchCommand(batch *batchNode) batcher {
4854
res := *cmd
4955
res.batch = batch
@@ -60,7 +66,7 @@ func (cmd *batchIndexCommandGet) Execute() Error {
6066
}
6167

6268
func (cmd *batchIndexCommandGet) executeSingle(client *Client) Error {
63-
for _, br := range cmd.indexRecords {
69+
for _, br := range cmd.records {
6470
var ops []*Operation
6571
if br.headerOnly() {
6672
ops = []*Operation{GetHeaderOp()}
@@ -96,3 +102,149 @@ func (cmd *batchIndexCommandGet) executeSingle(client *Client) Error {
96102
}
97103
return nil
98104
}
105+
106+
// Parse all results in the batch. Add records to shared list.
107+
// If the record was not found, the bins will be nil.
108+
func (cmd *batchIndexCommandGet) parseRecordResults(ifc command, receiveSize int) (bool, Error) {
109+
//Parse each message response and add it to the result array
110+
cmd.dataOffset = 0
111+
for cmd.dataOffset < receiveSize {
112+
if err := cmd.readBytes(int(_MSG_REMAINING_HEADER_SIZE)); err != nil {
113+
return false, err
114+
}
115+
resultCode := types.ResultCode(cmd.dataBuffer[5] & 0xFF)
116+
117+
info3 := int(cmd.dataBuffer[3])
118+
119+
// If cmd is the end marker of the response, do not proceed further
120+
if resultCode == 0 && (info3&_INFO3_LAST) == _INFO3_LAST {
121+
return false, nil
122+
}
123+
124+
generation := Buffer.BytesToUint32(cmd.dataBuffer, 6)
125+
expiration := types.TTL(Buffer.BytesToUint32(cmd.dataBuffer, 10))
126+
batchIndex := int(Buffer.BytesToUint32(cmd.dataBuffer, 14))
127+
fieldCount := int(Buffer.BytesToUint16(cmd.dataBuffer, 18))
128+
opCount := int(Buffer.BytesToUint16(cmd.dataBuffer, 20))
129+
130+
err := cmd.parseFieldsBatch(resultCode, fieldCount, cmd.records[batchIndex])
131+
if err != nil {
132+
return false, err
133+
}
134+
135+
if resultCode != 0 {
136+
if resultCode == types.FILTERED_OUT {
137+
cmd.filteredOutCnt++
138+
}
139+
140+
// If it looks like the error is on the first record and the message is marked as last part,
141+
// the error is for the whole command and not just for the first batchIndex
142+
lastMessage := (info3 & _INFO3_LAST) == _INFO3_LAST
143+
if resultCode != 0 && lastMessage && receiveSize == int(_MSG_REMAINING_HEADER_SIZE) {
144+
return false, newError(resultCode).setNode(cmd.node)
145+
}
146+
147+
if resultCode == types.UDF_BAD_RESPONSE {
148+
rec, err := cmd.parseRecord(cmd.records[batchIndex].key(), opCount, generation, expiration)
149+
if err != nil {
150+
cmd.records[batchIndex].setError(cmd.node, resultCode, cmd.batchInDoubt(cmd.attr.hasWrite, cmd.commandSentCounter))
151+
return false, err
152+
}
153+
154+
// for UDF failures
155+
var msg any
156+
if rec != nil {
157+
msg = rec.Bins["FAILURE"]
158+
}
159+
160+
// Need to store record because failure bin contains an error message.
161+
cmd.records[batchIndex].setRecord(rec)
162+
if msg, ok := msg.(string); ok && len(msg) > 0 {
163+
cmd.records[batchIndex].setErrorWithMsg(cmd.node, resultCode, msg, cmd.batchInDoubt(cmd.attr.hasWrite, cmd.commandSentCounter))
164+
} else {
165+
cmd.records[batchIndex].setError(cmd.node, resultCode, cmd.batchInDoubt(cmd.attr.hasWrite, cmd.commandSentCounter))
166+
}
167+
168+
// If cmd is the end marker of the response, do not proceed further
169+
// if (info3 & _INFO3_LAST) == _INFO3_LAST {
170+
if lastMessage {
171+
return false, nil
172+
}
173+
continue
174+
}
175+
176+
cmd.records[batchIndex].setError(cmd.node, resultCode, cmd.batchInDoubt(cmd.attr.hasWrite, cmd.commandSentCounter))
177+
178+
// If cmd is the end marker of the response, do not proceed further
179+
if (info3 & _INFO3_LAST) == _INFO3_LAST {
180+
return false, nil
181+
}
182+
continue
183+
}
184+
185+
if resultCode == 0 {
186+
if cmd.objects == nil {
187+
rec, err := cmd.parseRecord(cmd.records[batchIndex].key(), opCount, generation, expiration)
188+
if err != nil {
189+
cmd.records[batchIndex].setError(cmd.node, resultCode, cmd.batchInDoubt(cmd.attr.hasWrite, cmd.commandSentCounter))
190+
return false, err
191+
}
192+
cmd.records[batchIndex].setRecord(rec)
193+
} else if batchObjectParser != nil {
194+
// mark it as found
195+
cmd.objectsFound[batchIndex] = true
196+
if err := batchObjectParser(cmd, batchIndex, opCount, fieldCount, generation, expiration); err != nil {
197+
return false, err
198+
199+
}
200+
}
201+
}
202+
}
203+
204+
return true, nil
205+
}
206+
207+
// Parses the given byte buffer and populate the result object.
208+
// Returns the number of bytes that were parsed from the given buffer.
209+
func (cmd *batchIndexCommandGet) parseRecord(key *Key, opCount int, generation, expiration uint32) (*Record, Error) {
210+
bins := make(BinMap, opCount)
211+
212+
for i := 0; i < opCount; i++ {
213+
if err := cmd.readBytes(8); err != nil {
214+
return nil, err
215+
}
216+
opSize := int(Buffer.BytesToUint32(cmd.dataBuffer, 0))
217+
particleType := int(cmd.dataBuffer[5])
218+
nameSize := int(cmd.dataBuffer[7])
219+
220+
if err := cmd.readBytes(nameSize); err != nil {
221+
return nil, err
222+
}
223+
name := string(cmd.dataBuffer[:nameSize])
224+
225+
particleBytesSize := opSize - (4 + nameSize)
226+
if err := cmd.readBytes(particleBytesSize); err != nil {
227+
return nil, err
228+
}
229+
value, err := bytesToParticle(particleType, cmd.dataBuffer, 0, particleBytesSize)
230+
if err != nil {
231+
return nil, err
232+
}
233+
234+
if cmd.isOperation {
235+
if prev, ok := bins[name]; ok {
236+
if prev2, ok := prev.(OpResults); ok {
237+
bins[name] = append(prev2, value)
238+
} else {
239+
bins[name] = OpResults{prev, value}
240+
}
241+
} else {
242+
bins[name] = value
243+
}
244+
} else {
245+
bins[name] = value
246+
}
247+
}
248+
249+
return newRecord(cmd.node, key, bins, generation, expiration), nil
250+
}

0 commit comments

Comments
 (0)