Skip to content
This repository was archived by the owner on Feb 21, 2024. It is now read-only.

Commit 0a24ab6

Browse files
authored
Merge pull request #1787 from travisturner/translate-read-buffer-increase
allow translate log entry buffer to grow
2 parents 232d790 + 9e09366 commit 0a24ab6

File tree

5 files changed

+98
-12
lines changed

5 files changed

+98
-12
lines changed

ctl/import_test.go

+65
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,11 @@ import (
1818
"bufio"
1919
"bytes"
2020
"context"
21+
"fmt"
2122
"io"
2223
"io/ioutil"
2324
"net/http"
25+
"reflect"
2426
"strings"
2527
"testing"
2628

@@ -184,6 +186,69 @@ func TestImportCommand_RunKeys(t *testing.T) {
184186
}
185187
}
186188

189+
// Ensure that import with keys runs with key replication.
190+
func TestImportCommand_KeyReplication(t *testing.T) {
191+
buf := bytes.Buffer{}
192+
stdin, stdout, stderr := GetIO(buf)
193+
cm := NewImportCommand(stdin, stdout, stderr)
194+
file, err := ioutil.TempFile("", "import-key.csv")
195+
196+
// create a large import file in order to test the
197+
// translateStoreBufferSize growth logic.
198+
keyBytes := []byte{}
199+
for row := 0; row < 100; row++ {
200+
for col := 0; col < 100; col++ {
201+
x := fmt.Sprintf("foo%d,bar%d\n", row, col)
202+
keyBytes = append(keyBytes, x...)
203+
}
204+
}
205+
x := "fooEND,barEND"
206+
keyBytes = append(keyBytes, x...)
207+
208+
file.Write(keyBytes)
209+
ctx := context.Background()
210+
if err != nil {
211+
t.Fatal(err)
212+
}
213+
214+
c := test.MustRunCluster(t, 2)
215+
cmd0 := c[0]
216+
cmd1 := c[1]
217+
218+
host0 := cmd0.API.Node().URI.HostPort()
219+
host1 := cmd1.API.Node().URI.HostPort()
220+
221+
cm.Host = host0
222+
223+
http.DefaultClient.Do(MustNewHTTPRequest("POST", "http://"+cm.Host+"/index/i", strings.NewReader(`{"options":{"keys": true}}`)))
224+
http.DefaultClient.Do(MustNewHTTPRequest("POST", "http://"+cm.Host+"/index/i/field/f", strings.NewReader(`{"options":{"keys": true}}`)))
225+
226+
cm.Index = "i"
227+
cm.Field = "f"
228+
cm.Paths = []string{file.Name()}
229+
err = cm.Run(ctx)
230+
if err != nil {
231+
t.Fatalf("Import Run with key replication doesn't work: %s", err)
232+
}
233+
234+
// Verify that the data is available on both nodes.
235+
for _, host := range []string{host0, host1} {
236+
qry := "Count(Row(f=foo0))"
237+
resp, err := http.DefaultClient.Do(MustNewHTTPRequest("POST", "http://"+host+"/index/i/query", strings.NewReader(qry)))
238+
if err != nil {
239+
t.Fatalf("Querying data for validation: %s", err)
240+
}
241+
242+
// Read body and unmarshal response.
243+
exp := `{"results":[100]}` + "\n"
244+
if body, err := ioutil.ReadAll(resp.Body); err != nil {
245+
t.Fatalf("reading: %s", err)
246+
} else if !reflect.DeepEqual(body, []byte(exp)) {
247+
t.Fatalf("expected: %s, but got: %s", exp, body)
248+
}
249+
}
250+
}
251+
187252
// Ensure that integer import with keys runs.
188253
func TestImportCommand_RunValueKeys(t *testing.T) {
189254
buf := bytes.Buffer{}

http/handler.go

+20-2
Original file line numberDiff line numberDiff line change
@@ -1426,7 +1426,11 @@ func (h *Handler) handlePostClusterMessage(w http.ResponseWriter, r *http.Reques
14261426
type defaultClusterMessageResponse struct{}
14271427

14281428
// translateStoreBufferSize is the buffer size used for streaming data.
1429-
const translateStoreBufferSize = 65536
1429+
const translateStoreBufferSize = 1 << 16 // 64k
1430+
1431+
// translateStoreBufferSizeMax is the maximum size that the buffer is allowed
1432+
// to grow before raising an error.
1433+
const translateStoreBufferSizeMax = 1 << 22 // 4Mb
14301434

14311435
func (h *Handler) handleGetTranslateData(w http.ResponseWriter, r *http.Request) {
14321436
q := r.URL.Query()
@@ -1449,16 +1453,30 @@ func (h *Handler) handleGetTranslateData(w http.ResponseWriter, r *http.Request)
14491453
}
14501454

14511455
// Copy from reader to client until store or client disconnect.
1452-
buf := make([]byte, translateStoreBufferSize)
1456+
useBufferSize := translateStoreBufferSize
1457+
buf := make([]byte, useBufferSize)
14531458
for {
14541459
// Read from store.
14551460
n, err := rdr.Read(buf)
14561461
if err == io.EOF {
14571462
return
1463+
} else if err == pilosa.ErrTranslateReadTargetUndersized {
1464+
// Increase the buffer size and try to read again.
1465+
useBufferSize *= 2
1466+
// Prevent the buffer from growing without bound.
1467+
if useBufferSize > translateStoreBufferSizeMax {
1468+
h.logger.Printf("http: translate store buffer exceeded max size: %s", err)
1469+
return
1470+
}
1471+
buf = make([]byte, useBufferSize)
1472+
continue
14581473
} else if err != nil {
14591474
h.logger.Printf("http: translate store read error: %s", err)
14601475
return
14611476
} else if n == 0 {
1477+
// Reset the default buffer size.
1478+
useBufferSize = translateStoreBufferSize
1479+
buf = make([]byte, useBufferSize)
14621480
continue
14631481
}
14641482

http/handler_internal_test.go

-3
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@ func TestPostIndexRequestUnmarshalJSON(t *testing.T) {
5757
t.Errorf("expected: %v, but got: %v for JSON: %s", test.expected, *actual, test.json)
5858
}
5959
}
60-
6160
}
6261
}
6362

@@ -93,7 +92,6 @@ func TestPostFieldRequestUnmarshalJSON(t *testing.T) {
9392
t.Errorf("test %d: expected: %v, but got: %v", i, test.expected, *actual)
9493
}
9594
}
96-
9795
}
9896
}
9997

@@ -178,6 +176,5 @@ func TestFieldOptionValidation(t *testing.T) {
178176
t.Errorf("test %d: expected: %v, but got: %v", i, test.expected, *actual)
179177
}
180178
}
181-
182179
}
183180
}

test/pilosa.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ func newCommand(opts ...server.CommandOption) *Command {
6969
m.Config.DataDir = path
7070
m.Config.Bind = "http://localhost:0"
7171
m.Config.Cluster.Disabled = true
72-
m.Config.Translation.MapSize = 100000
72+
m.Config.Translation.MapSize = 140000
7373

7474
if testing.Verbose() {
7575
m.Command.Stdout = os.Stdout

translate.go

+12-6
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,11 @@ const (
2929
)
3030

3131
var (
32-
ErrTranslateStoreClosed = errors.New("pilosa: translate store closed")
33-
ErrTranslateStoreReaderClosed = errors.New("pilosa: translate store reader closed")
34-
ErrReplicationNotSupported = errors.New("pilosa: replication not supported")
35-
ErrTranslateStoreReadOnly = errors.New("pilosa: translate store could not find or create key, translate store read only")
32+
ErrTranslateStoreClosed = errors.New("pilosa: translate store closed")
33+
ErrTranslateStoreReaderClosed = errors.New("pilosa: translate store reader closed")
34+
ErrReplicationNotSupported = errors.New("pilosa: replication not supported")
35+
ErrTranslateStoreReadOnly = errors.New("pilosa: translate store could not find or create key, translate store read only")
36+
ErrTranslateReadTargetUndersized = errors.New("pilosa: translate read target is undersized")
3637
)
3738

3839
// TranslateStore is the storage for translation string-to-uint64 values.
@@ -1089,8 +1090,13 @@ func (r *translateFileReader) read(p []byte) (n int, err error) {
10891090
return 0, nil
10901091
}
10911092

1092-
// Shorten buffer to maximum read size.
1093-
if max := sz - r.offset; int64(len(p)) > max {
1093+
if max := sz - r.offset; max > int64(len(p)) {
1094+
// If p is not large enough to hold a single entry,
1095+
// return an error so the client can increase the
1096+
// size of p and try again.
1097+
return 0, ErrTranslateReadTargetUndersized
1098+
} else if int64(len(p)) > max {
1099+
// Shorten buffer to maximum read size.
10941100
p = p[:max]
10951101
}
10961102

0 commit comments

Comments
 (0)