Skip to content

Go: Batch #3938

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
May 27, 2025
Merged
431 changes: 412 additions & 19 deletions ffi/src/lib.rs

Large diffs are not rendered by default.

169 changes: 169 additions & 0 deletions go/base_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ import (
"github.com/valkey-io/valkey-glide/go/v2/internal/utils"
"github.com/valkey-io/valkey-glide/go/v2/models"
"github.com/valkey-io/valkey-glide/go/v2/options"
"github.com/valkey-io/valkey-glide/go/v2/pipeline"
"google.golang.org/protobuf/proto"
)

Expand Down Expand Up @@ -387,6 +388,174 @@ func toCStrings(args []string) ([]C.uintptr_t, []C.ulong) {
return cStrings, stringLengths
}

func (client *baseClient) executeBatch(
ctx context.Context,
batch pipeline.Batch,
raiseOnError bool,
options *pipeline.BatchOptions,
) ([]any, error) {
// Check if context is already done
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
// Continue with execution
}
// make the channel buffered, so that we don't need to acquire the client.mu in the successCallback and failureCallback.
resultChannel := make(chan payload, 1)
resultChannelPtr := unsafe.Pointer(&resultChannel)

pinner := pinner{}
pinnedChannelPtr := uintptr(pinner.Pin(resultChannelPtr))
defer pinner.Unpin()

client.mu.Lock()
if client.coreClient == nil {
client.mu.Unlock()
return nil, &errors.ClosingError{Msg: "ExecuteBatch failed. The client is closed."}
}
client.pending[resultChannelPtr] = struct{}{}

batchInfo := createBatchInfo(pinner, batch)
var optionsPtr *C.BatchOptionsInfo
if options != nil {
batchOptionsInfo := createBatchOptionsInfo(pinner, *options)
optionsPtr = &batchOptionsInfo
}

C.batch(
client.coreClient,
C.uintptr_t(pinnedChannelPtr),
&batchInfo,
C._Bool(raiseOnError),
optionsPtr,
)
client.mu.Unlock()

// Wait for result or context cancellation
var payload payload
select {
case <-ctx.Done():
client.mu.Lock()
if client.pending != nil {
delete(client.pending, resultChannelPtr)
}
client.mu.Unlock()
// TODO: Need to deal with cleaning up any allocated memory
return nil, ctx.Err()
case payload = <-resultChannel:
// Continue with normal processing
}

client.mu.Lock()
if client.pending != nil {
delete(client.pending, resultChannelPtr)
}
client.mu.Unlock()

if payload.error != nil {
return nil, payload.error
}
response, err := handleAnyArrayResponse(payload.value)
if err != nil {
return nil, err
}
return batch.Convert(response)
}

func createBatchOptionsInfo(pinner pinner, options pipeline.BatchOptions) C.BatchOptionsInfo {
info := C.BatchOptionsInfo{}
if options.RetryStrategy != nil {
info.retry_server_error = C._Bool(options.RetryStrategy.RetryServerError)
info.retry_connection_error = C._Bool(options.RetryStrategy.RetryConnectionError)
} else {
info.retry_server_error = C._Bool(false)
info.retry_connection_error = C._Bool(false)
}
if options.Timeout != nil {
info.has_timeout = C._Bool(true)
info.timeout = C.uint(*options.Timeout)
} else {
info.has_timeout = C._Bool(false)
}
if options.Route != nil {
info.route_info = (*C.RouteInfo)(pinner.Pin(unsafe.Pointer(createRouteInfo(pinner, *options.Route))))
} else {
info.route_info = nil
}
return info
}

// TODO align with others to return struct, not a pointer
func createRouteInfo(pinner pinner, route config.Route) *C.RouteInfo {
if route != nil {
routeInfo := C.RouteInfo{}
switch r := route.(type) {
case config.SimpleNodeRoute:
// enum variants have the same ordinals
routeInfo.route_type = (uint32)(r)
case *config.SlotIdRoute:
routeInfo.route_type = C.SlotId
routeInfo.slot_id = C.int(r.SlotID)
// enum variants have the same ordinals
routeInfo.slot_type = uint32(r.SlotType)
case *config.SlotKeyRoute:
routeInfo.route_type = C.SlotKey
// when converting string to []byte, it is converted to an UTF8 string (not a binary string)
routeInfo.hostname = (*C.char)(pinner.Pin(unsafe.Pointer(&[]byte(r.SlotKey)[0])))
// enum variants have the same ordinals
routeInfo.slot_type = uint32(r.SlotType)
case *config.ByAddressRoute:
routeInfo.route_type = C.ByAddress
// when converting string to []byte, it is converted to an UTF8 string (not a binary string)
routeInfo.hostname = (*C.char)(pinner.Pin(unsafe.Pointer(&[]byte(r.Host)[0])))
routeInfo.port = C.int(r.Port)
}
return &routeInfo
}
return nil
}

func createBatchInfo(pinner pinner, batch pipeline.Batch) C.BatchInfo {
numCommands := len(batch.Commands)
info := C.BatchInfo{}
info.is_atomic = C._Bool(batch.IsAtomic)
info.cmd_count = C.ulong(numCommands)

cmdPtrs := make([]*C.CmdInfo, numCommands)

for i, cmd := range batch.Commands {
cmdInfo := createCmdInfo(pinner, cmd)
cmdPtrs[i] = (*C.CmdInfo)(pinner.Pin(unsafe.Pointer(&cmdInfo)))
}

if numCommands > 0 {
info.cmds = (**C.CmdInfo)(pinner.Pin(unsafe.Pointer(&cmdPtrs[0])))
}

return info
}

func createCmdInfo(pinner pinner, cmd pipeline.Cmd) C.CmdInfo {
numArgs := len(cmd.Args)
info := C.CmdInfo{}
info.request_type = uint32(cmd.RequestType)
cArgsPtr := make([]*C.uchar, numArgs)
argLengthsPtr := make([]C.ulong, numArgs)
for i, str := range cmd.Args {
// TODO do we need to pin there too?
// cArgsPtr[i] = (*C.uchar)(pinner.Pin(unsafe.Pointer(unsafe.StringData((str)))))
cArgsPtr[i] = (*C.uchar)(unsafe.Pointer(unsafe.StringData((str))))
argLengthsPtr[i] = C.size_t(len(str))
}
info.arg_count = C.ulong(numArgs)
if numArgs > 0 {
info.args = (**C.uchar)(pinner.Pin(unsafe.Pointer(&cArgsPtr[0])))
info.args_len = (*C.ulong)(pinner.Pin(unsafe.Pointer(&argLengthsPtr[0])))
}
return info
}

func (client *baseClient) submitConnectionPasswordUpdate(
ctx context.Context,
password string,
Expand Down
158 changes: 158 additions & 0 deletions go/batch_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
// Copyright Valkey GLIDE Project Contributors - SPDX Identifier: Apache-2.0

package glide

import (
"context"
"fmt"

"github.com/valkey-io/valkey-glide/go/v2/config"
"github.com/valkey-io/valkey-glide/go/v2/pipeline"
)

// TODO replace CustomCommand

func ExampleClient_Exec_transaction() {
var client *Client = getExampleClient() // example helper function
// Example 1: Atomic Batch (Transaction)
batch := pipeline.NewStandaloneBatch(true)
batch.Set("key", "1").CustomCommand([]string{"incr", "key"}).Get("key")

result, err := client.Exec(context.Background(), *batch, true)
if err != nil {
fmt.Println("Glide example failed with an error: ", err)
}
fmt.Println(result)

// Output: [OK 2 2]
}

func ExampleClient_Exec_pipeline() {
var client *Client = getExampleClient() // example helper function
// Example 2: Non-Atomic Batch (Pipeline)
batch := pipeline.NewStandaloneBatch(false)
batch.Set("key1", "value1").Set("key2", "value2").Get("key1").Get("key2")

result, err := client.Exec(context.Background(), *batch, true)
if err != nil {
fmt.Println("Glide example failed with an error: ", err)
}
fmt.Println(result)

// Output: [OK OK value1 value2]
}

func ExampleClient_ExecWithOptions_transaction() {
var client *Client = getExampleClient() // example helper function
// Example 1: Atomic Batch (Transaction)
batch := pipeline.NewStandaloneBatch(true)
batch.Set("key", "1").CustomCommand([]string{"incr", "key"}).Get("key")
// Set a timeout of 1000 milliseconds
options := pipeline.NewStandaloneBatchOptions().WithTimeout(1000)

result, err := client.ExecWithOptions(context.Background(), *batch, false, *options)
if err != nil {
fmt.Println("Glide example failed with an error: ", err)
}
fmt.Println(result)

// Output: [OK 2 2]
}

func ExampleClient_ExecWithOptions_pipeline() {
var client *Client = getExampleClient() // example helper function
// Example 2: Non-Atomic Batch (Pipeline)
batch := pipeline.NewStandaloneBatch(false)
batch.Set("key1", "value1").Set("key2", "value2").Get("key1").Get("key2")
// Set a timeout of 1000 milliseconds
options := pipeline.NewStandaloneBatchOptions().WithTimeout(1000)

result, err := client.ExecWithOptions(context.Background(), *batch, false, *options)
if err != nil {
fmt.Println("Glide example failed with an error: ", err)
}
fmt.Println(result)

// Output: [OK OK value1 value2]
}

func ExampleClusterClient_Exec_transaction() {
var client *ClusterClient = getExampleClusterClient() // example helper function
// Example 1: Atomic Batch (Transaction)
batch := pipeline.NewClusterBatch(true)
batch.Set("key", "1").CustomCommand([]string{"incr", "key"}).Get("key")

result, err := client.Exec(context.Background(), *batch, false)
if err != nil {
fmt.Println("Glide example failed with an error: ", err)
}
fmt.Println(result)

// Output: [OK 2 2]
}

func ExampleClusterClient_Exec_pipeline() {
var client *ClusterClient = getExampleClusterClient() // example helper function
// Example 2: Non-Atomic Batch (Pipeline)
batch := pipeline.NewClusterBatch(false)
batch.Set("key1", "value1").Set("key2", "value2").Get("key1").Get("key2")

result, err := client.Exec(context.Background(), *batch, false)
if err != nil {
fmt.Println("Glide example failed with an error: ", err)
}
fmt.Println(result)

// Output: [OK OK value1 value2]
}

func ExampleClusterClient_ExecWithOptions_transaction() {
var client *ClusterClient = getExampleClusterClient() // example helper function
// Example 1: Atomic Batch (Transaction)
batch := pipeline.NewClusterBatch(true)
batch.Set("key", "1").CustomCommand([]string{"incr", "key"}).Get("key")
// Set a timeout of 1000 milliseconds
options := pipeline.NewClusterBatchOptions().WithTimeout(1000)

result, err := client.ExecWithOptions(context.Background(), *batch, false, *options)
if err != nil {
fmt.Println("Glide example failed with an error: ", err)
}
fmt.Println(result)

// Output: [OK 2 2]
}

func ExampleClusterClient_ExecWithOptions_pipeline() {
var client *ClusterClient = getExampleClusterClient() // example helper function
// Example 2: Non-Atomic Batch (Pipeline)
batch := pipeline.NewClusterBatch(false)
batch.Set("key1", "value1").Set("key2", "value2").Get("key1").Get("key2")
// Set command retry parameters
retryStrategy := pipeline.NewClusterBatchRetryStrategy().WithRetryServerError(true).WithRetryConnectionError(true)
options := pipeline.NewClusterBatchOptions().WithRetryStrategy(*retryStrategy)

result, err := client.ExecWithOptions(context.Background(), *batch, false, *options)
if err != nil {
fmt.Println("Glide example failed with an error: ", err)
}
fmt.Println(result)

// Output: [OK OK value1 value2]
}

func ExampleClusterClient_ExecWithOptions_route() {
var client *ClusterClient = getExampleClusterClient() // example helper function
// Example 2: Non-Atomic Batch (Pipeline)
batch := pipeline.NewClusterBatch(true)
batch.CustomCommand([]string{"info", "server"})
// Set batch route
route := config.NewSlotKeyRoute(config.SlotTypePrimary, "abc")
options := pipeline.NewClusterBatchOptions().WithRoute(route)

_, err := client.ExecWithOptions(context.Background(), *batch, false, *options)
if err != nil {
fmt.Println("Glide example failed with an error: ", err)
}
// Output:
}
Loading
Loading