Skip to content

Commit d7e9eab

Browse files
committed
api: add support of a batch insert request
Add support the IPROTO_INSERT_ARROW request and message pack type MP_ARROW. Closes #399
1 parent 59aa1a3 commit d7e9eab

11 files changed

+534
-20
lines changed

CHANGELOG.md

+4-3
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,13 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
99
## [Unreleased]
1010

1111
### Added
12-
- Add err log to `ConnectionPool.Add()` in case, when unable to establish
13-
connection and ctx is not canceled;
14-
also added logs for error case of `ConnectionPool.tryConnect()` calls in
12+
- Add err log to `ConnectionPool.Add()` in case, when unable to establish
13+
connection and ctx is not canceled;
14+
also added logs for error case of `ConnectionPool.tryConnect()` calls in
1515
`ConnectionPool.controller()` and `ConnectionPool.reconnect()`
1616
- Methods that are implemented but not included in the pooler interface (#395).
1717
- Implemented stringer methods for pool.Role (#405).
18+
- Support the IPROTO_INSERT_ARROW request (#399).
1819

1920
### Changed
2021

arrow/arrow.go

+59
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package arrow
2+
3+
import (
4+
"fmt"
5+
"reflect"
6+
7+
"github.com/vmihailenco/msgpack/v5"
8+
)
9+
10+
// Arrow MessagePack extension type.
11+
const arrowExtId = 8
12+
13+
// Arrow struct wraps a raw arrow data buffer.
14+
type Arrow struct {
15+
data []byte
16+
}
17+
18+
// MakeArrow returns a new arrow.Arrow object that contains
19+
// wrapped a raw arrow data buffer.
20+
func MakeArrow(arrow []byte) (Arrow, error) {
21+
if len(arrow) == 0 {
22+
return Arrow{}, fmt.Errorf("no Arrow data")
23+
}
24+
return Arrow{arrow}, nil
25+
}
26+
27+
// Raw returns a []byte that contains Arrow raw data.
28+
func (a Arrow) Raw() []byte {
29+
return a.data
30+
}
31+
32+
func arrowDecoder(d *msgpack.Decoder, v reflect.Value, extLen int) error {
33+
arrow := Arrow{
34+
data: make([]byte, 0, extLen),
35+
}
36+
n, err := d.Buffered().Read(arrow.data)
37+
if err != nil {
38+
return fmt.Errorf("msgpack: can't read bytes on Arrow decode: %w", err)
39+
}
40+
if n < extLen || n != len(arrow.data) {
41+
return fmt.Errorf("msgpack: unexpected end of stream after %d Arrow bytes", n)
42+
}
43+
44+
v.Set(reflect.ValueOf(arrow))
45+
return nil
46+
}
47+
48+
func arrowEncoder(e *msgpack.Encoder, v reflect.Value) ([]byte, error) {
49+
if v.IsValid() {
50+
return v.Interface().(Arrow).data, nil
51+
}
52+
53+
return []byte{}, fmt.Errorf("msgpack: not valid Arrow value")
54+
}
55+
56+
func init() {
57+
msgpack.RegisterExtDecoder(arrowExtId, Arrow{}, arrowDecoder)
58+
msgpack.RegisterExtEncoder(arrowExtId, Arrow{}, arrowEncoder)
59+
}

arrow/config.lua

+35
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
-- Do not set listen for now so connector won't be
2+
-- able to send requests until everything is configured.
3+
box.cfg {
4+
work_dir = os.getenv("TEST_TNT_WORK_DIR")
5+
}
6+
7+
box.schema.user.create('test', {
8+
password = 'test',
9+
if_not_exists = true
10+
})
11+
box.schema.user.grant('test', 'execute', 'universe', nil, {
12+
if_not_exists = true
13+
})
14+
15+
local s = box.schema.space.create('testArrow', {
16+
if_not_exists = true
17+
})
18+
s:create_index('primary', {
19+
type = 'tree',
20+
parts = {{
21+
field = 1,
22+
type = 'integer'
23+
}},
24+
if_not_exists = true
25+
})
26+
s:truncate()
27+
28+
box.schema.user.grant('test', 'read,write', 'space', 'testArrow', {
29+
if_not_exists = true
30+
})
31+
32+
-- Set listen only when every other thing is configured.
33+
box.cfg {
34+
listen = os.getenv("TEST_TNT_LISTEN")
35+
}

arrow/example_test.go

+58
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
// Run Tarantool instance before example execution:
2+
//
3+
// Terminal 1:
4+
// $ cd arrow
5+
// $ TEST_TNT_LISTEN=3013 TEST_TNT_WORK_DIR=$(mktemp -d -t 'tarantool.XXX') tarantool config.lua
6+
//
7+
// Terminal 2:
8+
// $ go test -v example_test.go
9+
package arrow_test
10+
11+
import (
12+
"context"
13+
"encoding/hex"
14+
"fmt"
15+
"log"
16+
"strings"
17+
"time"
18+
19+
"github.com/tarantool/go-tarantool/v2"
20+
"github.com/tarantool/go-tarantool/v2/arrow"
21+
)
22+
23+
var arrowBinData, _ = hex.DecodeString("ffffffff70000000040000009effffff0400010004000000" +
24+
"b6ffffff0c00000004000000000000000100000004000000daffffff140000000202" +
25+
"000004000000f0ffffff4000000001000000610000000600080004000c0010000400" +
26+
"080009000c000c000c0000000400000008000a000c00040006000800ffffffff8800" +
27+
"0000040000008affffff0400030010000000080000000000000000000000acffffff" +
28+
"01000000000000003400000008000000000000000200000000000000000000000000" +
29+
"00000000000000000000000000000800000000000000000000000100000001000000" +
30+
"0000000000000000000000000a00140004000c0010000c0014000400060008000c00" +
31+
"00000000000000000000")
32+
33+
func Example() {
34+
dialer := tarantool.NetDialer{
35+
Address: "127.0.0.1:3013",
36+
User: "test",
37+
Password: "test",
38+
}
39+
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
40+
client, err := tarantool.Connect(ctx, dialer, tarantool.Opts{})
41+
cancel()
42+
if err != nil {
43+
log.Fatalf("Failed to connect: %s", err)
44+
}
45+
46+
arr, err := arrow.MakeArrow(arrowBinData)
47+
if err != nil {
48+
log.Fatalf("Failed prepare Arrow data: %s", err)
49+
}
50+
51+
req := arrow.NewInsertRequest("spaceMemcs", arr)
52+
53+
_, err = client.Do(req).Get()
54+
if err != nil {
55+
msg := strings.Split(err.Error(), "(")[0]
56+
fmt.Printf("Failed insert Arrow: %s\n", msg)
57+
}
58+
}

arrow/request.go

+93
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
package arrow
2+
3+
import (
4+
"context"
5+
"io"
6+
7+
"github.com/tarantool/go-iproto"
8+
"github.com/tarantool/go-tarantool/v2"
9+
"github.com/vmihailenco/msgpack/v5"
10+
)
11+
12+
// INSERT Arrow request.
13+
//
14+
// FIXME: replace with iproto.IPROTO_INSERT_ARROW when iproto will released.
15+
// https://github.com/tarantool/go-tarantool/issues/412
16+
const iprotoInsertArrowType = iproto.Type(17)
17+
18+
// The data in Arrow format.
19+
//
20+
// FIXME: replace with iproto.IPROTO_ARROW when iproto will released.
21+
// https://github.com/tarantool/go-tarantool/issues/412
22+
const iprotoArrowKey = iproto.Key(0x36)
23+
24+
// InsertRequest helps you to create an insert request object for execution
25+
// by a Connection.
26+
type InsertRequest struct {
27+
arrow Arrow
28+
space interface{}
29+
ctx context.Context
30+
}
31+
32+
// NewInsertRequest returns a new empty InsertRequest.
33+
func NewInsertRequest(space interface{}, arrow Arrow) *InsertRequest {
34+
return &InsertRequest{
35+
arrow: arrow,
36+
space: space,
37+
}
38+
}
39+
40+
// Type returns a IPROTO_INSERT_ARROW type for the request.
41+
func (r *InsertRequest) Type() iproto.Type {
42+
return iprotoInsertArrowType
43+
}
44+
45+
// Async returns false to the request return a response.
46+
func (r *InsertRequest) Async() bool {
47+
return false
48+
}
49+
50+
// Ctx returns a context of the request.
51+
func (r *InsertRequest) Ctx() context.Context {
52+
return r.ctx
53+
}
54+
55+
// Context sets a passed context to the request.
56+
//
57+
// Pay attention that when using context with request objects,
58+
// the timeout option for Connection does not affect the lifetime
59+
// of the request. For those purposes use context.WithTimeout() as
60+
// the root context.
61+
func (r *InsertRequest) Context(ctx context.Context) *InsertRequest {
62+
r.ctx = ctx
63+
return r
64+
}
65+
66+
// Arrow sets the arrow for insertion the insert arrow request.
67+
// Note: default value is nil.
68+
func (r *InsertRequest) Arrow(arrow Arrow) *InsertRequest {
69+
r.arrow = arrow
70+
return r
71+
}
72+
73+
// Body fills an msgpack.Encoder with the insert arrow request body.
74+
func (r *InsertRequest) Body(res tarantool.SchemaResolver, enc *msgpack.Encoder) error {
75+
if err := enc.EncodeMapLen(2); err != nil {
76+
return err
77+
}
78+
if err := tarantool.EncodeSpace(res, enc, r.space); err != nil {
79+
return err
80+
}
81+
if err := enc.EncodeUint(uint64(iprotoArrowKey)); err != nil {
82+
return err
83+
}
84+
return enc.Encode(r.arrow)
85+
}
86+
87+
// Response creates a response for the InsertRequest.
88+
func (r *InsertRequest) Response(
89+
header tarantool.Header,
90+
body io.Reader,
91+
) (tarantool.Response, error) {
92+
return tarantool.DecodeBaseResponse(header, body)
93+
}

arrow/tarantool_test.go

+120
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
package arrow_test
2+
3+
import (
4+
"encoding/hex"
5+
"log"
6+
"os"
7+
"strconv"
8+
"testing"
9+
"time"
10+
11+
"github.com/stretchr/testify/require"
12+
"github.com/tarantool/go-tarantool/v2"
13+
"github.com/tarantool/go-tarantool/v2/arrow"
14+
"github.com/tarantool/go-tarantool/v2/test_helpers"
15+
)
16+
17+
var isArrowSupported = false
18+
19+
var server = "127.0.0.1:3013"
20+
var dialer = tarantool.NetDialer{
21+
Address: server,
22+
User: "test",
23+
Password: "test",
24+
}
25+
var space = "testArrow"
26+
27+
var opts = tarantool.Opts{
28+
Timeout: 5 * time.Second,
29+
}
30+
31+
// TestInsert uses Arrow sequence from Tarantool's test .
32+
// See: https://github.com/tarantool/tarantool/blob/master/test/box-luatest/gh_10508_iproto_insert_arrow_test.lua
33+
func TestInsert_invalid(t *testing.T) {
34+
arrows := []struct {
35+
arrow string
36+
expected string
37+
}{
38+
{
39+
"",
40+
"no Arrow data",
41+
},
42+
{
43+
"00",
44+
"Failed to decode Arrow IPC data",
45+
},
46+
{
47+
"ffffffff70000000040000009effffff0400010004000000" +
48+
"b6ffffff0c00000004000000000000000100000004000000daffffff140000000202" +
49+
"000004000000f0ffffff4000000001000000610000000600080004000c0010000400" +
50+
"080009000c000c000c0000000400000008000a000c00040006000800ffffffff8800" +
51+
"0000040000008affffff0400030010000000080000000000000000000000acffffff" +
52+
"01000000000000003400000008000000000000000200000000000000000000000000" +
53+
"00000000000000000000000000000800000000000000000000000100000001000000" +
54+
"0000000000000000000000000a00140004000c0010000c0014000400060008000c00" +
55+
"00000000000000000000",
56+
"memtx does not support arrow format",
57+
},
58+
}
59+
60+
conn := test_helpers.ConnectWithValidation(t, dialer, opts)
61+
defer conn.Close()
62+
63+
for i, a := range arrows {
64+
t.Run(strconv.Itoa(i), func(t *testing.T) {
65+
data, err := hex.DecodeString(a.arrow)
66+
require.NoError(t, err)
67+
68+
arr, err := arrow.MakeArrow(data)
69+
if err != nil {
70+
require.ErrorContains(t, err, a.expected)
71+
return
72+
}
73+
req := arrow.NewInsertRequest(space, arr)
74+
75+
_, err = conn.Do(req).Get()
76+
require.ErrorContains(t, err, a.expected)
77+
})
78+
}
79+
80+
}
81+
82+
// runTestMain is a body of TestMain function
83+
// (see https://pkg.go.dev/testing#hdr-Main).
84+
// Using defer + os.Exit is not works so TestMain body
85+
// is a separate function, see
86+
// https://stackoverflow.com/questions/27629380/how-to-exit-a-go-program-honoring-deferred-calls
87+
func runTestMain(m *testing.M) int {
88+
isLess, err := test_helpers.IsTarantoolVersionLess(3, 3, 0)
89+
if err != nil {
90+
log.Fatalf("Failed to extract Tarantool version: %s", err)
91+
}
92+
isArrowSupported = !isLess
93+
94+
if !isArrowSupported {
95+
log.Println("Skipping insert Arrow tests...")
96+
return m.Run()
97+
}
98+
99+
instance, err := test_helpers.StartTarantool(test_helpers.StartOpts{
100+
Dialer: dialer,
101+
InitScript: "config.lua",
102+
Listen: server,
103+
WaitStart: 100 * time.Millisecond,
104+
ConnectRetry: 10,
105+
RetryTimeout: 500 * time.Millisecond,
106+
})
107+
defer test_helpers.StopTarantoolWithCleanup(instance)
108+
109+
if err != nil {
110+
log.Printf("Failed to prepare test Tarantool: %s", err)
111+
return 1
112+
}
113+
114+
return m.Run()
115+
}
116+
117+
func TestMain(m *testing.M) {
118+
code := runTestMain(m)
119+
os.Exit(code)
120+
}

0 commit comments

Comments
 (0)