Skip to content

Commit bde58fe

Browse files
jerrinotclaude
andcommitted
feat(qwp): add WithQwpDumpWriter for capturing wire bytes
Records outgoing TCP bytes (HTTP upgrade + WebSocket frames) from a QWP sender to an io.Writer, replayable via `cat dump.bin | nc host port`. When the dump writer is set and no server is configured, an in-process net.Pipe() is used with a minimal fake WebSocket acceptor that completes the RFC 6455 handshake and replies to binary frames with QWP OK ACKs — so the dump captures the full on-the-wire exchange without requiring a real server. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 8997371 commit bde58fe

File tree

7 files changed

+239
-32
lines changed

7 files changed

+239
-32
lines changed

qwp_integration_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ func qwpSkipIfNoServer(t *testing.T) {
6060
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
6161
defer cancel()
6262

63-
s, err := newQwpLineSender(ctx, "ws://"+qwpTestAddr, qwpTransportOpts{}, 0, 0, 0)
63+
s, err := newQwpLineSender(ctx, "ws://"+qwpTestAddr, qwpTransportOpts{}, 0, 0, 0, nil)
6464
if err != nil {
6565
t.Skipf("QuestDB not available at %s: %v", qwpTestAddr, err)
6666
}
@@ -136,7 +136,7 @@ func TestQwpIntegrationBasicTypes(t *testing.T) {
136136
qwpDropTable(t, tableName)
137137
defer qwpDropTable(t, tableName)
138138

139-
s, err := newQwpLineSender(ctx, "ws://"+qwpTestAddr, qwpTransportOpts{}, time.Second, 0, 0)
139+
s, err := newQwpLineSender(ctx, "ws://"+qwpTestAddr, qwpTransportOpts{}, time.Second, 0, 0, nil)
140140
if err != nil {
141141
t.Fatal(err)
142142
}
@@ -207,7 +207,7 @@ func TestQwpIntegrationMultipleFlushes(t *testing.T) {
207207
qwpDropTable(t, tableName)
208208
defer qwpDropTable(t, tableName)
209209

210-
s, err := newQwpLineSender(ctx, "ws://"+qwpTestAddr, qwpTransportOpts{}, time.Second, 0, 0)
210+
s, err := newQwpLineSender(ctx, "ws://"+qwpTestAddr, qwpTransportOpts{}, time.Second, 0, 0, nil)
211211
if err != nil {
212212
t.Fatal(err)
213213
}
@@ -261,7 +261,7 @@ func TestQwpIntegrationSymbolDedup(t *testing.T) {
261261
qwpDropTable(t, tableName)
262262
defer qwpDropTable(t, tableName)
263263

264-
s, err := newQwpLineSender(ctx, "ws://"+qwpTestAddr, qwpTransportOpts{}, time.Second, 0, 0)
264+
s, err := newQwpLineSender(ctx, "ws://"+qwpTestAddr, qwpTransportOpts{}, time.Second, 0, 0, nil)
265265
if err != nil {
266266
t.Fatal(err)
267267
}
@@ -300,7 +300,7 @@ func TestQwpIntegrationMultiTable(t *testing.T) {
300300
defer qwpDropTable(t, table1)
301301
defer qwpDropTable(t, table2)
302302

303-
s, err := newQwpLineSender(ctx, "ws://"+qwpTestAddr, qwpTransportOpts{}, time.Second, 0, 0)
303+
s, err := newQwpLineSender(ctx, "ws://"+qwpTestAddr, qwpTransportOpts{}, time.Second, 0, 0, nil)
304304
if err != nil {
305305
t.Fatal(err)
306306
}
@@ -340,7 +340,7 @@ func TestQwpIntegrationLargeBatch(t *testing.T) {
340340
qwpDropTable(t, tableName)
341341
defer qwpDropTable(t, tableName)
342342

343-
s, err := newQwpLineSender(ctx, "ws://"+qwpTestAddr, qwpTransportOpts{}, 5*time.Second, 0, 0)
343+
s, err := newQwpLineSender(ctx, "ws://"+qwpTestAddr, qwpTransportOpts{}, 5*time.Second, 0, 0, nil)
344344
if err != nil {
345345
t.Fatal(err)
346346
}
@@ -422,7 +422,7 @@ func TestQwpIntegrationAsyncMode(t *testing.T) {
422422
defer qwpDropTable(t, tableName)
423423

424424
// Create sender with in-flight window = 4.
425-
s, err := newQwpLineSender(ctx, "ws://"+qwpTestAddr, qwpTransportOpts{}, 5*time.Second, 0, 0, 4)
425+
s, err := newQwpLineSender(ctx, "ws://"+qwpTestAddr, qwpTransportOpts{}, 5*time.Second, 0, 0, nil, 4)
426426
if err != nil {
427427
t.Fatal(err)
428428
}
@@ -508,7 +508,7 @@ func TestQwpIntegrationAutoFlush(t *testing.T) {
508508
defer qwpDropTable(t, tableName)
509509

510510
// auto-flush every 3 rows.
511-
s, err := newQwpLineSender(ctx, "ws://"+qwpTestAddr, qwpTransportOpts{}, time.Second, 3, 0)
511+
s, err := newQwpLineSender(ctx, "ws://"+qwpTestAddr, qwpTransportOpts{}, time.Second, 3, 0, nil)
512512
if err != nil {
513513
t.Fatal(err)
514514
}
@@ -556,7 +556,7 @@ func TestQwpIntegrationNullableColumns(t *testing.T) {
556556
qwpDropTable(t, tableName)
557557
defer qwpDropTable(t, tableName)
558558

559-
s, err := newQwpLineSender(ctx, "ws://"+qwpTestAddr, qwpTransportOpts{}, time.Second, 0, 0)
559+
s, err := newQwpLineSender(ctx, "ws://"+qwpTestAddr, qwpTransportOpts{}, time.Second, 0, 0, nil)
560560
if err != nil {
561561
t.Fatal(err)
562562
}

qwp_sender.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ package questdb
2727
import (
2828
"context"
2929
"fmt"
30+
"io"
3031
"math/big"
3132
"time"
3233
)
@@ -158,8 +159,9 @@ type qwpLineSender struct {
158159

159160
// newQwpLineSender creates a new QWP sender and establishes a
160161
// WebSocket connection to the server. If inFlightWindow > 1, async
161-
// mode is enabled with a dedicated I/O goroutine.
162-
func newQwpLineSender(ctx context.Context, address string, opts qwpTransportOpts, retryTimeout time.Duration, autoFlushRows int, autoFlushInterval time.Duration, inFlightWindow ...int) (*qwpLineSender, error) {
162+
// mode is enabled with a dedicated I/O goroutine. If dumpWriter is
163+
// non-nil, outgoing TCP bytes are recorded (see WithQwpDumpWriter).
164+
func newQwpLineSender(ctx context.Context, address string, opts qwpTransportOpts, retryTimeout time.Duration, autoFlushRows int, autoFlushInterval time.Duration, dumpWriter io.Writer, inFlightWindow ...int) (*qwpLineSender, error) {
163165
window := 1
164166
if len(inFlightWindow) > 0 && inFlightWindow[0] > 1 {
165167
window = inFlightWindow[0]
@@ -178,6 +180,7 @@ func newQwpLineSender(ctx context.Context, address string, opts qwpTransportOpts
178180
closeTimeout: 5 * time.Second,
179181
}
180182

183+
s.transport.dumpWriter = dumpWriter
181184
if err := s.transport.connect(ctx, address, opts); err != nil {
182185
return nil, err
183186
}

qwp_sender_async_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -487,7 +487,7 @@ func TestQwpAsyncCloseAfterError(t *testing.T) {
487487
defer srv.Close()
488488

489489
wsURL := "ws" + strings.TrimPrefix(srv.URL, "http")
490-
s, err := newQwpLineSender(context.Background(), wsURL, qwpTransportOpts{}, 0, 0, 0, 2)
490+
s, err := newQwpLineSender(context.Background(), wsURL, qwpTransportOpts{}, 0, 0, 0, nil, 2)
491491
if err != nil {
492492
t.Fatal(err)
493493
}
@@ -544,7 +544,7 @@ func TestQwpAsyncCloseUnresponsiveServer(t *testing.T) {
544544
defer srv.Close()
545545

546546
wsURL := "ws" + strings.TrimPrefix(srv.URL, "http")
547-
s, err := newQwpLineSender(context.Background(), wsURL, qwpTransportOpts{}, 0, 0, 0, 2)
547+
s, err := newQwpLineSender(context.Background(), wsURL, qwpTransportOpts{}, 0, 0, 0, nil, 2)
548548
if err != nil {
549549
t.Fatal(err)
550550
}

qwp_sender_test.go

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ func newQwpTestServer(t *testing.T) *httptest.Server {
7070
func newQwpSenderForTest(t *testing.T, serverURL string) *qwpLineSender {
7171
t.Helper()
7272
wsURL := "ws" + strings.TrimPrefix(serverURL, "http")
73-
s, err := newQwpLineSender(context.Background(), wsURL, qwpTransportOpts{}, 0, 0, 0)
73+
s, err := newQwpLineSender(context.Background(), wsURL, qwpTransportOpts{}, 0, 0, 0, nil)
7474
if err != nil {
7575
t.Fatalf("newQwpLineSender: %v", err)
7676
}
@@ -353,7 +353,7 @@ func TestQwpSenderAutoFlushRows(t *testing.T) {
353353
defer srv.Close()
354354

355355
wsURL := "ws" + strings.TrimPrefix(srv.URL, "http")
356-
s, err := newQwpLineSender(context.Background(), wsURL, qwpTransportOpts{}, 0, 3, 0)
356+
s, err := newQwpLineSender(context.Background(), wsURL, qwpTransportOpts{}, 0, 3, 0, nil)
357357
if err != nil {
358358
t.Fatal(err)
359359
}
@@ -404,7 +404,7 @@ func TestQwpSenderAutoFlushTimeInterval(t *testing.T) {
404404

405405
wsURL := "ws" + strings.TrimPrefix(srv.URL, "http")
406406
// autoFlushRows=0 (disabled), autoFlushInterval=10ms.
407-
s, err := newQwpLineSender(context.Background(), wsURL, qwpTransportOpts{}, 0, 0, 10*time.Millisecond)
407+
s, err := newQwpLineSender(context.Background(), wsURL, qwpTransportOpts{}, 0, 0, 10*time.Millisecond, nil)
408408
if err != nil {
409409
t.Fatal(err)
410410
}
@@ -444,7 +444,7 @@ func TestQwpSenderAutoFlushDisabled(t *testing.T) {
444444

445445
wsURL := "ws" + strings.TrimPrefix(srv.URL, "http")
446446
// Both autoFlushRows=0 and autoFlushInterval=0 (disabled).
447-
s, err := newQwpLineSender(context.Background(), wsURL, qwpTransportOpts{}, 0, 0, 0)
447+
s, err := newQwpLineSender(context.Background(), wsURL, qwpTransportOpts{}, 0, 0, 0, nil)
448448
if err != nil {
449449
t.Fatal(err)
450450
}
@@ -1007,7 +1007,7 @@ func TestQwpSenderMethodChaining(t *testing.T) {
10071007

10081008
func TestQwpSenderIntegration(t *testing.T) {
10091009
ctx := context.Background()
1010-
s, err := newQwpLineSender(ctx, "ws://localhost:9000", qwpTransportOpts{}, time.Second, 0, 0)
1010+
s, err := newQwpLineSender(ctx, "ws://localhost:9000", qwpTransportOpts{}, time.Second, 0, 0, nil)
10111011
if err != nil {
10121012
t.Skipf("QuestDB not available: %v", err)
10131013
}
@@ -1107,7 +1107,7 @@ func TestQwpSenderSymbolDictAcrossFlushes(t *testing.T) {
11071107
defer srv.Close()
11081108

11091109
wsURL := "ws" + strings.TrimPrefix(srv.URL, "http")
1110-
s, err := newQwpLineSender(context.Background(), wsURL, qwpTransportOpts{}, 0, 0, 0)
1110+
s, err := newQwpLineSender(context.Background(), wsURL, qwpTransportOpts{}, 0, 0, 0, nil)
11111111
if err != nil {
11121112
t.Fatal(err)
11131113
}
@@ -1203,7 +1203,7 @@ func TestQwpSenderServerError(t *testing.T) {
12031203
defer srv.Close()
12041204

12051205
wsURL := "ws" + strings.TrimPrefix(srv.URL, "http")
1206-
s, err := newQwpLineSender(context.Background(), wsURL, qwpTransportOpts{}, 0, 0, 0)
1206+
s, err := newQwpLineSender(context.Background(), wsURL, qwpTransportOpts{}, 0, 0, 0, nil)
12071207
if err != nil {
12081208
t.Fatal(err)
12091209
}
@@ -1254,7 +1254,7 @@ func TestQwpSenderAsyncBasic(t *testing.T) {
12541254
defer srv.Close()
12551255

12561256
wsURL := "ws" + strings.TrimPrefix(srv.URL, "http")
1257-
s, err := newQwpLineSender(context.Background(), wsURL, qwpTransportOpts{}, 0, 0, 0, 2)
1257+
s, err := newQwpLineSender(context.Background(), wsURL, qwpTransportOpts{}, 0, 0, 0, nil, 2)
12581258
if err != nil {
12591259
t.Fatal(err)
12601260
}
@@ -1320,7 +1320,7 @@ func TestQwpSenderAsyncMultipleFlushes(t *testing.T) {
13201320
defer srv.Close()
13211321

13221322
wsURL := "ws" + strings.TrimPrefix(srv.URL, "http")
1323-
s, err := newQwpLineSender(context.Background(), wsURL, qwpTransportOpts{}, 0, 0, 0, 3)
1323+
s, err := newQwpLineSender(context.Background(), wsURL, qwpTransportOpts{}, 0, 0, 0, nil, 3)
13241324
if err != nil {
13251325
t.Fatal(err)
13261326
}
@@ -1354,7 +1354,7 @@ func TestQwpSenderAsyncCloseAutoFlush(t *testing.T) {
13541354
defer srv.Close()
13551355

13561356
wsURL := "ws" + strings.TrimPrefix(srv.URL, "http")
1357-
s, err := newQwpLineSender(context.Background(), wsURL, qwpTransportOpts{}, 0, 0, 0, 2)
1357+
s, err := newQwpLineSender(context.Background(), wsURL, qwpTransportOpts{}, 0, 0, 0, nil, 2)
13581358
if err != nil {
13591359
t.Fatal(err)
13601360
}
@@ -1417,7 +1417,7 @@ func TestQwpSenderSchemaKeyPerTable(t *testing.T) {
14171417
defer srv.Close()
14181418

14191419
wsURL := "ws" + strings.TrimPrefix(srv.URL, "http")
1420-
s, err := newQwpLineSender(context.Background(), wsURL, qwpTransportOpts{}, 0, 0, 0)
1420+
s, err := newQwpLineSender(context.Background(), wsURL, qwpTransportOpts{}, 0, 0, 0, nil)
14211421
if err != nil {
14221422
t.Fatal(err)
14231423
}
@@ -1632,7 +1632,7 @@ func TestQwpAsyncNoCacheOnFlushFailure(t *testing.T) {
16321632
defer srv.Close()
16331633

16341634
wsURL := "ws" + strings.TrimPrefix(srv.URL, "http")
1635-
s, err := newQwpLineSender(context.Background(), wsURL, qwpTransportOpts{}, 0, 0, 0, 2)
1635+
s, err := newQwpLineSender(context.Background(), wsURL, qwpTransportOpts{}, 0, 0, 0, nil, 2)
16361636
if err != nil {
16371637
t.Fatal(err)
16381638
}
@@ -1695,7 +1695,7 @@ func TestQwpAsyncAutoFlushNonBlocking(t *testing.T) {
16951695

16961696
wsURL := "ws" + strings.TrimPrefix(srv.URL, "http")
16971697
// window=4, autoFlushRows=10
1698-
s, err := newQwpLineSender(context.Background(), wsURL, qwpTransportOpts{}, 0, 10, 0, 4)
1698+
s, err := newQwpLineSender(context.Background(), wsURL, qwpTransportOpts{}, 0, 10, 0, nil, 4)
16991699
if err != nil {
17001700
t.Fatal(err)
17011701
}
@@ -1797,7 +1797,7 @@ func TestQwpAuthHeaderFormat(t *testing.T) {
17971797
opts := qwpTransportOpts{
17981798
authorization: "Bearer my_token",
17991799
}
1800-
s, err := newQwpLineSender(context.Background(), wsURL, opts, 0, 0, 0)
1800+
s, err := newQwpLineSender(context.Background(), wsURL, opts, 0, 0, 0, nil)
18011801
if err != nil {
18021802
t.Fatal(err)
18031803
}
@@ -1835,7 +1835,7 @@ func TestQwpAuthHeaderFormat(t *testing.T) {
18351835
opts := qwpTransportOpts{
18361836
authorization: "Basic YWRtaW46cXVlc3Q=", // base64("admin:quest")
18371837
}
1838-
s, err := newQwpLineSender(context.Background(), wsURL, opts, 0, 0, 0)
1838+
s, err := newQwpLineSender(context.Background(), wsURL, opts, 0, 0, 0, nil)
18391839
if err != nil {
18401840
t.Fatal(err)
18411841
}
@@ -1948,7 +1948,7 @@ func TestQwpMaxBufSizeTriggersFlush(t *testing.T) {
19481948
defer srv.Close()
19491949

19501950
wsURL := "ws" + strings.TrimPrefix(srv.URL, "http")
1951-
s, err := newQwpLineSender(context.Background(), wsURL, qwpTransportOpts{}, 0, 0, 0)
1951+
s, err := newQwpLineSender(context.Background(), wsURL, qwpTransportOpts{}, 0, 0, 0, nil)
19521952
if err != nil {
19531953
t.Fatal(err)
19541954
}

0 commit comments

Comments
 (0)