Skip to content

Commit b6ad2cc

Browse files
committed
api: add support of a batch insert request
Draft changes: add support the IPROTO_INSERT_ARROW request and message pack type MP_ARROW . Closes #399
1 parent 592db69 commit b6ad2cc

12 files changed

+560
-23
lines changed

CHANGELOG.md

+4-3
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,13 @@ Versioning](http://semver.org/spec/v2.0.0.html) except to the first release.
99
## [Unreleased]
1010

1111
### Added
12-
- Add err log to `ConnectionPool.Add()` in case, when unable to establish
13-
connection and ctx is not canceled;
14-
also added logs for error case of `ConnectionPool.tryConnect()` calls in
12+
- Add err log to `ConnectionPool.Add()` in case, when unable to establish
13+
connection and ctx is not canceled;
14+
also added logs for error case of `ConnectionPool.tryConnect()` calls in
1515
`ConnectionPool.controller()` and `ConnectionPool.reconnect()`
1616
- Methods that are implemented but not included in the pooler interface (#395).
1717
- Implemented stringer methods for pool.Role (#405).
18+
- Support the IPROTO_INSERT_ARROW request (#399).
1819

1920
### Changed
2021

arrow/arrow.go

+59
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
package arrow
2+
3+
import (
4+
"fmt"
5+
"reflect"
6+
7+
"github.com/vmihailenco/msgpack/v5"
8+
)
9+
10+
// Arrow MessagePack extension type
11+
const arrowExtId = 8
12+
13+
// Arrow struct wraps a raw arrow data buffer.
14+
type Arrow struct {
15+
data []byte
16+
}
17+
18+
// MakeArrow returns a new arrow.Arrow object that contains
19+
// wrapped a raw arrow data buffer.
20+
func MakeArrow(arrow []byte) (Arrow, error) {
21+
if len(arrow) == 0 {
22+
return Arrow{}, fmt.Errorf("no Arrow data")
23+
}
24+
return Arrow{arrow}, nil
25+
}
26+
27+
// ToArrow returns a []byte that contains Arrow raw data.
28+
func (a *Arrow) ToArrow() []byte {
29+
return a.data
30+
}
31+
32+
func arrowDecoder(d *msgpack.Decoder, v reflect.Value, extLen int) error {
33+
arrow := Arrow{
34+
data: make([]byte, 0, extLen),
35+
}
36+
n, err := d.Buffered().Read(arrow.data)
37+
if err != nil {
38+
return fmt.Errorf("msgpack: can't read bytes on Arrow decode: %w", err)
39+
}
40+
if n < extLen || n != len(arrow.data) {
41+
return fmt.Errorf("msgpack: unexpected end of stream after %d Arrow bytes", n)
42+
}
43+
44+
v.Set(reflect.ValueOf(arrow))
45+
return nil
46+
}
47+
48+
func arrowEncoder(e *msgpack.Encoder, v reflect.Value) ([]byte, error) {
49+
if v.IsValid() {
50+
return v.Interface().(Arrow).data, nil
51+
}
52+
53+
return []byte{}, fmt.Errorf("msgpack: not valid Arrow value")
54+
}
55+
56+
func init() {
57+
msgpack.RegisterExtDecoder(arrowExtId, Arrow{}, arrowDecoder)
58+
msgpack.RegisterExtEncoder(arrowExtId, Arrow{}, arrowEncoder)
59+
}

arrow/arrow_test.go

+130
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,130 @@
1+
package arrow_test
2+
3+
import (
4+
"encoding/hex"
5+
"log"
6+
"os"
7+
"strconv"
8+
"testing"
9+
"time"
10+
11+
"github.com/stretchr/testify/require"
12+
"github.com/tarantool/go-tarantool/v2"
13+
"github.com/tarantool/go-tarantool/v2/arrow"
14+
"github.com/tarantool/go-tarantool/v2/test_helpers"
15+
)
16+
17+
var isArrowSupported = false
18+
19+
var server = "127.0.0.1:3013"
20+
var dialer = tarantool.NetDialer{
21+
Address: server,
22+
User: "test",
23+
Password: "test",
24+
}
25+
var space = "testArrow"
26+
27+
var opts = tarantool.Opts{
28+
Timeout: 5 * time.Second,
29+
}
30+
31+
func skipIfArrowUnsupported(t *testing.T) {
32+
t.Helper()
33+
if !isArrowSupported {
34+
t.Skip("Skipping test for Tarantool without Arrow support in msgpack")
35+
}
36+
}
37+
38+
// TestInsert uses Arrow sequence from Tarantool's test .
39+
// nolint:lll
40+
// See: https://github.com/tarantool/tarantool/blob/master/test/box-luatest/gh_10508_iproto_insert_arrow_test.lua
41+
func TestInsert_invalid(t *testing.T) {
42+
skipIfArrowUnsupported(t)
43+
44+
arrows := []struct {
45+
arrow string
46+
expected string
47+
}{
48+
{
49+
"",
50+
"no Arrow data",
51+
},
52+
{
53+
"00",
54+
"Failed to decode Arrow IPC data",
55+
},
56+
{
57+
"ffffffff70000000040000009effffff0400010004000000" +
58+
"b6ffffff0c00000004000000000000000100000004000000daffffff140000000202" +
59+
"000004000000f0ffffff4000000001000000610000000600080004000c0010000400" +
60+
"080009000c000c000c0000000400000008000a000c00040006000800ffffffff8800" +
61+
"0000040000008affffff0400030010000000080000000000000000000000acffffff" +
62+
"01000000000000003400000008000000000000000200000000000000000000000000" +
63+
"00000000000000000000000000000800000000000000000000000100000001000000" +
64+
"0000000000000000000000000a00140004000c0010000c0014000400060008000c00" +
65+
"00000000000000000000",
66+
"memtx does not support arrow format",
67+
},
68+
}
69+
70+
conn := test_helpers.ConnectWithValidation(t, dialer, opts)
71+
defer conn.Close()
72+
73+
for i, a := range arrows {
74+
t.Run(strconv.Itoa(i), func(t *testing.T) {
75+
data, err := hex.DecodeString(a.arrow)
76+
require.NoError(t, err)
77+
78+
arr, err := arrow.MakeArrow(data)
79+
if err != nil {
80+
require.ErrorContains(t, err, a.expected)
81+
return
82+
}
83+
req := arrow.NewInsertRequest(space, arr)
84+
85+
_, err = conn.Do(req).Get()
86+
require.ErrorContains(t, err, a.expected)
87+
})
88+
}
89+
90+
}
91+
92+
// runTestMain is a body of TestMain function
93+
// (see https://pkg.go.dev/testing#hdr-Main).
94+
// Using defer + os.Exit is not works so TestMain body
95+
// is a separate function, see
96+
// https://stackoverflow.com/questions/27629380/how-to-exit-a-go-program-honoring-deferred-calls
97+
func runTestMain(m *testing.M) int {
98+
isLess, err := test_helpers.IsTarantoolVersionLess(3, 3, 0)
99+
if err != nil {
100+
log.Fatalf("Failed to extract Tarantool version: %s", err)
101+
}
102+
isArrowSupported = !isLess
103+
104+
if !isArrowSupported {
105+
log.Println("Skipping insert Arrow tests...")
106+
return m.Run()
107+
}
108+
109+
instance, err := test_helpers.StartTarantool(test_helpers.StartOpts{
110+
Dialer: dialer,
111+
InitScript: "config.lua",
112+
Listen: server,
113+
WaitStart: 100 * time.Millisecond,
114+
ConnectRetry: 10,
115+
RetryTimeout: 500 * time.Millisecond,
116+
})
117+
defer test_helpers.StopTarantoolWithCleanup(instance)
118+
119+
if err != nil {
120+
log.Printf("Failed to prepare test Tarantool: %s", err)
121+
return 1
122+
}
123+
124+
return m.Run()
125+
}
126+
127+
func TestMain(m *testing.M) {
128+
code := runTestMain(m)
129+
os.Exit(code)
130+
}

arrow/config.lua

+36
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
-- Do not set listen for now so connector won't be
2+
-- able to send requests until everything is configured.
3+
box.cfg {
4+
work_dir = os.getenv("TEST_TNT_WORK_DIR")
5+
}
6+
7+
box.schema.user.create('test', {
8+
password = 'test',
9+
if_not_exists = true
10+
})
11+
box.schema.user.grant('test', 'execute', 'universe', nil, {
12+
if_not_exists = true
13+
})
14+
15+
local s = box.schema.space.create('testArrow', {
16+
id = 524,
17+
if_not_exists = true
18+
})
19+
s:create_index('primary', {
20+
type = 'tree',
21+
parts = {{
22+
field = 1,
23+
type = 'integer'
24+
}},
25+
if_not_exists = true
26+
})
27+
s:truncate()
28+
29+
box.schema.user.grant('test', 'read,write', 'space', 'testArrow', {
30+
if_not_exists = true
31+
})
32+
33+
-- Set listen only when every other thing is configured.
34+
box.cfg {
35+
listen = os.getenv("TEST_TNT_LISTEN")
36+
}

arrow/example_test.go

+62
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
// Run Tarantool instance before example execution:
2+
//
3+
// Terminal 1:
4+
// $ cd arrow
5+
// $ TEST_TNT_LISTEN=3013 TEST_TNT_WORK_DIR=$(mktemp -d -t 'tarantool.XXX') tarantool config.lua
6+
//
7+
// Terminal 2:
8+
// $ go test -v example_test.go
9+
package arrow_test
10+
11+
import (
12+
"context"
13+
"encoding/hex"
14+
"fmt"
15+
"log"
16+
"strings"
17+
"time"
18+
19+
"github.com/tarantool/go-tarantool/v2"
20+
"github.com/tarantool/go-tarantool/v2/arrow"
21+
)
22+
23+
var arrowBinData, _ = hex.DecodeString("ffffffff70000000040000009effffff0400010004000000" +
24+
"b6ffffff0c00000004000000000000000100000004000000daffffff140000000202" +
25+
"000004000000f0ffffff4000000001000000610000000600080004000c0010000400" +
26+
"080009000c000c000c0000000400000008000a000c00040006000800ffffffff8800" +
27+
"0000040000008affffff0400030010000000080000000000000000000000acffffff" +
28+
"01000000000000003400000008000000000000000200000000000000000000000000" +
29+
"00000000000000000000000000000800000000000000000000000100000001000000" +
30+
"0000000000000000000000000a00140004000c0010000c0014000400060008000c00" +
31+
"00000000000000000000")
32+
33+
func Example() {
34+
dialer := tarantool.NetDialer{
35+
Address: "127.0.0.1:3013",
36+
User: "test",
37+
Password: "test",
38+
}
39+
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
40+
client, err := tarantool.Connect(ctx, dialer, tarantool.Opts{})
41+
cancel()
42+
if err != nil {
43+
log.Fatalf("Failed to connect: %s", err)
44+
}
45+
46+
arr, err := arrow.MakeArrow(arrowBinData)
47+
if err != nil {
48+
log.Fatalf("Failed prepare Arrow data: %s", err)
49+
}
50+
51+
spaceNo := uint32(524)
52+
req := arrow.NewInsertRequest(spaceNo, arr)
53+
54+
_, err = client.Do(req).Get()
55+
if err != nil {
56+
msg := strings.Split(err.Error(), "(")[0]
57+
fmt.Printf("Failed insert Arrow: %s\n", msg)
58+
}
59+
60+
//! Output:
61+
// Failed insert Arrow: memtx does not support arrow format
62+
}

0 commit comments

Comments
 (0)