Skip to content

Commit 3a7cb98

Browse files
authored
Merge pull request #81 from cankush625/ankushchavan/multi-line-commands
feat(resp): support multi-line commands split across TCP packets
2 parents d2b63e4 + 9df535f commit 3a7cb98

3 files changed

Lines changed: 177 additions & 8 deletions

File tree

resp/handler.go

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,26 @@
11
package resp
22

33
import (
4+
"bufio"
5+
"io"
46
"net"
57
)
68

7-
// HandleConn does a read and write to the connection
9+
// HandleConn reads RESP commands from the connection and writes replies.
10+
// It uses a bufio.Reader to read one complete RESP message at a time,
11+
// so commands split across multiple TCP packets are handled correctly.
812
func HandleConn(conn net.Conn) {
9-
buffer := make([]byte, 1024)
13+
reader := bufio.NewReader(conn)
1014
for {
11-
n, err := conn.Read(buffer)
15+
msg, err := readMessage(reader)
1216
if err != nil {
17+
// io.EOF means the client closed the connection cleanly.
18+
if err == io.EOF {
19+
return
20+
}
1321
conn.Write([]byte("-Error processing\r\n"))
1422
return
1523
}
16-
if n == 0 {
17-
conn.Write([]byte("-Error processing\r\n"))
18-
return
19-
}
20-
PerformRequest(buffer[:n], conn)
24+
PerformRequest(msg, conn)
2125
}
2226
}

resp/reader.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
package resp
2+
3+
import (
4+
"bufio"
5+
"bytes"
6+
"fmt"
7+
"io"
8+
"strconv"
9+
"strings"
10+
)
11+
12+
// readMessage reads exactly one complete RESP message from the reader and
13+
// returns the raw bytes. It follows the RESP framing so it works correctly
14+
// regardless of how many TCP packets the message arrives in.
15+
//
16+
// Only Array messages are expected for incoming commands. Any other first byte
17+
// is read as a single line and returned as-is so ParseCommand can reject it.
18+
func readMessage(reader *bufio.Reader) ([]byte, error) {
19+
// Read the first line, e.g. "*3\r\n"
20+
line, err := reader.ReadString('\n')
21+
if err != nil {
22+
return nil, err
23+
}
24+
25+
var buf bytes.Buffer
26+
buf.WriteString(line)
27+
28+
if len(line) == 0 || line[0] != '*' {
29+
// Not an array — return the single line for ParseCommand to reject.
30+
return buf.Bytes(), nil
31+
}
32+
33+
// Parse element count from "*N\r\n"
34+
count, err := strconv.Atoi(strings.TrimRight(line[1:], "\r\n"))
35+
if err != nil {
36+
return nil, fmt.Errorf("invalid array length: %w", err)
37+
}
38+
39+
for i := 0; i < count; i++ {
40+
// Read the bulk string length line, e.g. "$5\r\n"
41+
lenLine, err := reader.ReadString('\n')
42+
if err != nil {
43+
return nil, err
44+
}
45+
buf.WriteString(lenLine)
46+
47+
if len(lenLine) == 0 || lenLine[0] != '$' {
48+
return nil, fmt.Errorf("expected bulk string, got %q", lenLine)
49+
}
50+
51+
n, err := strconv.Atoi(strings.TrimRight(lenLine[1:], "\r\n"))
52+
if err != nil {
53+
return nil, fmt.Errorf("invalid bulk string length: %w", err)
54+
}
55+
56+
// Read exactly n bytes + \r\n
57+
data := make([]byte, n+2)
58+
if _, err := io.ReadFull(reader, data); err != nil {
59+
return nil, err
60+
}
61+
buf.Write(data)
62+
}
63+
64+
return buf.Bytes(), nil
65+
}

resp/reader_test.go

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
package resp
2+
3+
import (
4+
"bufio"
5+
"bytes"
6+
"io"
7+
"strings"
8+
"testing"
9+
)
10+
11+
// TestReadMessage tests the readMessage function for complete and partial reads.
12+
func TestReadMessage(t *testing.T) {
13+
tests := []struct {
14+
input string
15+
want string
16+
wantErr bool
17+
}{
18+
// Single-element array (PING)
19+
{"*1\r\n$4\r\nPING\r\n", "*1\r\n$4\r\nPING\r\n", false},
20+
// Two-element array (ECHO hello)
21+
{"*2\r\n$4\r\nECHO\r\n$5\r\nhello\r\n", "*2\r\n$4\r\nECHO\r\n$5\r\nhello\r\n", false},
22+
// Three-element array (SET key value)
23+
{"*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n", "*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n", false},
24+
// Non-array first byte — returned as-is for ParseCommand to reject
25+
{"+OK\r\n", "+OK\r\n", false},
26+
}
27+
for _, test := range tests {
28+
reader := bufio.NewReader(strings.NewReader(test.input))
29+
got, err := readMessage(reader)
30+
if test.wantErr && err == nil {
31+
t.Errorf("readMessage(%q) expected error, got nil", test.input)
32+
continue
33+
}
34+
if !test.wantErr && err != nil {
35+
t.Errorf("readMessage(%q) unexpected error: %v", test.input, err)
36+
continue
37+
}
38+
if string(got) != test.want {
39+
t.Errorf("readMessage(%q) = %q; want %q", test.input, got, test.want)
40+
}
41+
}
42+
}
43+
44+
// TestReadMessage_MultipleCommands tests that readMessage reads one command at
45+
// a time from a stream containing multiple pipelined commands.
46+
func TestReadMessage_MultipleCommands(t *testing.T) {
47+
// Two commands sent back-to-back in the same stream (pipelining)
48+
input := "*1\r\n$4\r\nPING\r\n" +
49+
"*2\r\n$4\r\nECHO\r\n$5\r\nhello\r\n"
50+
51+
reader := bufio.NewReader(strings.NewReader(input))
52+
53+
first, err := readMessage(reader)
54+
if err != nil {
55+
t.Fatalf("first readMessage error: %v", err)
56+
}
57+
if string(first) != "*1\r\n$4\r\nPING\r\n" {
58+
t.Errorf("first message = %q; want %q", first, "*1\r\n$4\r\nPING\r\n")
59+
}
60+
61+
second, err := readMessage(reader)
62+
if err != nil {
63+
t.Fatalf("second readMessage error: %v", err)
64+
}
65+
if string(second) != "*2\r\n$4\r\nECHO\r\n$5\r\nhello\r\n" {
66+
t.Errorf("second message = %q; want %q", second, "*2\r\n$4\r\nECHO\r\n$5\r\nhello\r\n")
67+
}
68+
69+
// Third read should return EOF
70+
_, err = readMessage(reader)
71+
if err != io.EOF {
72+
t.Errorf("expected io.EOF after last command, got %v", err)
73+
}
74+
}
75+
76+
// TestReadMessage_SplitAcrossReads simulates a command arriving in two separate
77+
// TCP chunks by writing to a pipe with a delay between writes.
78+
func TestReadMessage_SplitAcrossReads(t *testing.T) {
79+
// Simulate split delivery: write two parts with a pipe
80+
pr, pw := io.Pipe()
81+
reader := bufio.NewReader(pr)
82+
83+
done := make(chan []byte, 1)
84+
go func() {
85+
msg, _ := readMessage(reader)
86+
done <- msg
87+
}()
88+
89+
// Write the first half
90+
pw.Write([]byte("*3\r\n$3\r\nSET\r\n"))
91+
// Write the second half
92+
pw.Write([]byte("$3\r\nkey\r\n$5\r\nvalue\r\n"))
93+
pw.Close()
94+
95+
got := <-done
96+
want := "*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n"
97+
if !bytes.Equal(got, []byte(want)) {
98+
t.Errorf("split read = %q; want %q", got, want)
99+
}
100+
}

0 commit comments

Comments
 (0)