Skip to content

Commit 6549d53

Browse files
committed
more tests
1 parent ae060da commit 6549d53

File tree

5 files changed

+548
-0
lines changed

5 files changed

+548
-0
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
.DS_Store
2+
.worktrees/

io_test.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"fmt"
66
"io"
77
"net"
8+
"testing"
89
)
910

1011
type mockMRW struct {
@@ -83,3 +84,42 @@ func (rw *mockMRW) WriteMessageMP(m MPMessage) error {
8384
bufs.WriteTo(&rw.b)
8485
return nil
8586
}
87+
88+
func TestMessage_Size(t *testing.T) {
89+
tests := []struct {
90+
name string
91+
msg Message
92+
want int
93+
}{
94+
{"empty", Message{}, 0},
95+
{"normal", Message("hello"), 5},
96+
{"binary", Message{0x00, 0xff, 0x01}, 3},
97+
}
98+
for _, tt := range tests {
99+
t.Run(tt.name, func(t *testing.T) {
100+
if got := tt.msg.Size(); got != tt.want {
101+
t.Errorf("Message.Size() = %v, want %v", got, tt.want)
102+
}
103+
})
104+
}
105+
}
106+
107+
func TestMPMessage_Size(t *testing.T) {
108+
tests := []struct {
109+
name string
110+
msg MPMessage
111+
want int
112+
}{
113+
{"empty", MPMessage{}, 0},
114+
{"single", MPMessage{[]byte("hello")}, 5},
115+
{"multiple", MPMessage{[]byte("hello"), []byte("world")}, 10},
116+
{"with empty part", MPMessage{[]byte("a"), {}, []byte("b")}, 2},
117+
}
118+
for _, tt := range tests {
119+
t.Run(tt.name, func(t *testing.T) {
120+
if got := tt.msg.Size(); got != tt.want {
121+
t.Errorf("MPMessage.Size() = %v, want %v", got, tt.want)
122+
}
123+
})
124+
}
125+
}

msgpeer/handler_test.go

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
package msgpeer
2+
3+
import (
4+
"context"
5+
"sync/atomic"
6+
"testing"
7+
"time"
8+
)
9+
10+
func TestParallelHandler_Process(t *testing.T) {
11+
var processed int32
12+
13+
h := &mockPeerHandler{
14+
processC: make(chan Request, 10),
15+
}
16+
17+
ph := ParallelHandler(h, 100*time.Millisecond, nil)
18+
19+
ph.Process(context.Background(), []byte("req1"), func(ctx context.Context, resp Response) error {
20+
atomic.AddInt32(&processed, 1)
21+
return nil
22+
})
23+
24+
select {
25+
case req := <-h.processC:
26+
if string(req) != "req1" {
27+
t.Errorf("received request = %q, want %q", req, "req1")
28+
}
29+
case <-time.After(200 * time.Millisecond):
30+
t.Fatal("request not processed")
31+
}
32+
}
33+
34+
func TestParallelHandler_OnNotify(t *testing.T) {
35+
h := &mockPeerHandler{
36+
notifyC: make(chan Notify, 10),
37+
}
38+
39+
ph := ParallelHandler(h, 100*time.Millisecond, nil)
40+
41+
ph.OnNotify(context.Background(), []byte("notify1"))
42+
43+
select {
44+
case n := <-h.notifyC:
45+
if string(n) != "notify1" {
46+
t.Errorf("received notify = %q, want %q", n, "notify1")
47+
}
48+
case <-time.After(200 * time.Millisecond):
49+
t.Fatal("notify not processed")
50+
}
51+
}
52+
53+
func TestParallelHandler_PanicRecovery(t *testing.T) {
54+
var panicValue interface{}
55+
panicCalled := make(chan bool, 1)
56+
57+
panicHandler := &panicMockHandler{}
58+
59+
ph := ParallelHandler(panicHandler, 100*time.Millisecond, func(v interface{}) {
60+
panicValue = v
61+
select {
62+
case panicCalled <- true:
63+
default:
64+
}
65+
})
66+
67+
ph.Process(context.Background(), []byte("req"), func(ctx context.Context, resp Response) error {
68+
return nil
69+
})
70+
71+
select {
72+
case <-panicCalled:
73+
case <-time.After(200 * time.Millisecond):
74+
t.Fatal("panic log not called")
75+
}
76+
77+
if panicValue != "handler panic" {
78+
t.Errorf("panic value = %v, want 'handler panic'", panicValue)
79+
}
80+
}
81+
82+
type panicMockHandler struct{}
83+
84+
func (h *panicMockHandler) Process(ctx context.Context, r Request, w ResponseWriter) {
85+
panic("handler panic")
86+
}
87+
88+
func (h *panicMockHandler) OnNotify(ctx context.Context, n Notify) {
89+
panic("handler panic")
90+
}
91+
92+
func TestParallelHandler_ContextCanceled(t *testing.T) {
93+
h := &mockPeerHandler{
94+
processC: make(chan Request, 10),
95+
}
96+
97+
ph := ParallelHandler(h, 100*time.Millisecond, nil)
98+
99+
ctx, cancel := context.WithCancel(context.Background())
100+
cancel() // Cancel immediately
101+
102+
ph.Process(ctx, []byte("req"), func(ctx context.Context, resp Response) error {
103+
return nil
104+
})
105+
106+
// Wait a short time to confirm the request is not processed
107+
time.Sleep(50 * time.Millisecond)
108+
109+
select {
110+
case <-h.processC:
111+
t.Error("request should not be processed when context is canceled")
112+
default:
113+
// Expected: request is not processed
114+
}
115+
}

msgpeer/msgpeer_test.go

Lines changed: 223 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,223 @@
1+
package msgpeer
2+
3+
import (
4+
"context"
5+
"sync"
6+
"testing"
7+
"time"
8+
9+
"github.com/someonegg/msgpump/v2"
10+
)
11+
12+
type mockPeerMRW struct {
13+
mu sync.Mutex
14+
readC chan msgpump.Message
15+
writeC chan msgpump.Message
16+
closed bool
17+
closedC chan struct{}
18+
}
19+
20+
func newMockPeerMRW() *mockPeerMRW {
21+
return &mockPeerMRW{
22+
readC: make(chan msgpump.Message, 10),
23+
writeC: make(chan msgpump.Message, 10),
24+
closedC: make(chan struct{}),
25+
}
26+
}
27+
28+
func (m *mockPeerMRW) ReadMessage() (msgpump.Message, error) {
29+
select {
30+
case msg := <-m.readC:
31+
return msg, nil
32+
case <-m.closedC:
33+
return nil, context.Canceled
34+
}
35+
}
36+
37+
func (m *mockPeerMRW) WriteMessage(msg msgpump.Message) error {
38+
m.mu.Lock()
39+
defer m.mu.Unlock()
40+
if m.closed {
41+
return context.Canceled
42+
}
43+
m.writeC <- msg
44+
return nil
45+
}
46+
47+
func (m *mockPeerMRW) WriteMessageMP(msg msgpump.MPMessage) error {
48+
m.mu.Lock()
49+
defer m.mu.Unlock()
50+
if m.closed {
51+
return context.Canceled
52+
}
53+
// Merge into a single message
54+
var combined []byte
55+
for _, part := range msg {
56+
combined = append(combined, part...)
57+
}
58+
m.writeC <- combined
59+
return nil
60+
}
61+
62+
func (m *mockPeerMRW) OnStop() {
63+
m.mu.Lock()
64+
defer m.mu.Unlock()
65+
if !m.closed {
66+
m.closed = true
67+
close(m.closedC)
68+
}
69+
}
70+
71+
type mockPeerHandler struct {
72+
processC chan Request
73+
notifyC chan Notify
74+
}
75+
76+
func newMockPeerHandler() *mockPeerHandler {
77+
return &mockPeerHandler{
78+
processC: make(chan Request, 10),
79+
notifyC: make(chan Notify, 10),
80+
}
81+
}
82+
83+
func (h *mockPeerHandler) Process(ctx context.Context, r Request, w ResponseWriter) {
84+
h.processC <- r
85+
w(ctx, []byte("response"))
86+
}
87+
88+
func (h *mockPeerHandler) OnNotify(ctx context.Context, n Notify) {
89+
h.notifyC <- n
90+
}
91+
92+
func TestPeer_Notify(t *testing.T) {
93+
mrw := newMockPeerMRW()
94+
h := newMockPeerHandler()
95+
peer := NewPeer(mrw, h, 10)
96+
peer.Start(nil)
97+
98+
err := peer.Notify(context.Background(), []byte("hello"))
99+
if err != nil {
100+
t.Fatalf("Notify failed: %v", err)
101+
}
102+
103+
select {
104+
case msg := <-mrw.writeC:
105+
// Verify message format: N\n + body
106+
expected := "N\nhello"
107+
if string(msg) != expected {
108+
t.Errorf("Notify message = %q, want %q", msg, expected)
109+
}
110+
case <-time.After(100 * time.Millisecond):
111+
t.Fatal("Notify message not written")
112+
}
113+
114+
peer.Stop()
115+
<-peer.StopD()
116+
}
117+
118+
func TestPeer_Do(t *testing.T) {
119+
mrw := newMockPeerMRW()
120+
h := newMockPeerHandler()
121+
peer := NewPeer(mrw, h, 10)
122+
peer.Start(nil)
123+
124+
// Start goroutine to simulate response
125+
go func() {
126+
select {
127+
case msg := <-mrw.writeC:
128+
// Parse request ID: R,<rid>\n<body>
129+
s := string(msg)
130+
if len(s) > 2 && s[0] == 'R' && s[1] == ',' {
131+
var rid string
132+
for i := 2; i < len(s); i++ {
133+
if s[i] == '\n' {
134+
rid = s[2:i]
135+
break
136+
}
137+
}
138+
// Send response: P,<rid>\n<response>
139+
resp := []byte("P," + rid + "\nresponse-data")
140+
mrw.readC <- resp
141+
}
142+
case <-time.After(100 * time.Millisecond):
143+
}
144+
}()
145+
146+
resp, err := peer.Do(context.Background(), []byte("request-data"))
147+
if err != nil {
148+
t.Fatalf("Do failed: %v", err)
149+
}
150+
151+
if string(resp) != "response-data" {
152+
t.Errorf("Do response = %q, want %q", resp, "response-data")
153+
}
154+
155+
peer.Stop()
156+
<-peer.StopD()
157+
}
158+
159+
func TestPeer_Do_Timeout(t *testing.T) {
160+
mrw := newMockPeerMRW()
161+
h := newMockPeerHandler()
162+
peer := NewPeer(mrw, h, 10)
163+
peer.Start(nil)
164+
165+
ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
166+
defer cancel()
167+
168+
_, err := peer.Do(ctx, []byte("request"))
169+
if err != context.DeadlineExceeded {
170+
t.Errorf("Do with timeout: got %v, want DeadlineExceeded", err)
171+
}
172+
173+
peer.Stop()
174+
<-peer.StopD()
175+
}
176+
177+
func TestPeer_Process_EdgeCases(t *testing.T) {
178+
tests := []struct {
179+
name string
180+
message []byte
181+
wantReq bool
182+
wantNot bool
183+
}{
184+
{"empty", []byte{}, false, false},
185+
{"no newline", []byte("R,123"), false, false},
186+
{"notify", []byte("N\nhello"), false, true},
187+
{"request", []byte("R,1\nbody"), true, false},
188+
{"unknown type", []byte("X,1\nbody"), false, false},
189+
}
190+
191+
for _, tt := range tests {
192+
t.Run(tt.name, func(t *testing.T) {
193+
mrw := newMockPeerMRW()
194+
h := newMockPeerHandler()
195+
peer := NewPeer(mrw, h, 10)
196+
197+
// Call Process directly
198+
peer.Process(context.Background(), tt.message)
199+
200+
gotReq := false
201+
gotNot := false
202+
203+
select {
204+
case <-h.processC:
205+
gotReq = true
206+
default:
207+
}
208+
209+
select {
210+
case <-h.notifyC:
211+
gotNot = true
212+
default:
213+
}
214+
215+
if gotReq != tt.wantReq {
216+
t.Errorf("request received = %v, want %v", gotReq, tt.wantReq)
217+
}
218+
if gotNot != tt.wantNot {
219+
t.Errorf("notify received = %v, want %v", gotNot, tt.wantNot)
220+
}
221+
})
222+
}
223+
}

0 commit comments

Comments
 (0)