Skip to content

Commit 92fa9b3

Browse files
committed
feat(rpc): add error handling for RPC responses and corresponding test
1 parent aec058e commit 92fa9b3

2 files changed

Lines changed: 77 additions & 1 deletion

File tree

rpc/ws/client.go

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -186,13 +186,38 @@ func getUint64WithOk(data []byte, path ...string) (uint64, bool) {
186186
return 0, false
187187
}
188188

189+
func getRPCError(data []byte) error {
190+
rawErr, dataType, _, err := jsonparser.Get(data, "error")
191+
if err != nil || dataType == jsonparser.Null {
192+
return nil
193+
}
194+
195+
if dataType != jsonparser.Object {
196+
return fmt.Errorf("rpc error: %s", string(rawErr))
197+
}
198+
199+
parsed := new(json2.Error)
200+
if err := json.Unmarshal(rawErr, parsed); err != nil {
201+
return fmt.Errorf("rpc error: %s", string(rawErr))
202+
}
203+
return parsed
204+
}
205+
189206
func (c *Client) handleMessage(message []byte) {
190207
// when receiving message with id. the result will be a subscription number.
191208
// that number will be associated to all future message destine to this request
192209

193210
requestID, ok := getUint64WithOk(message, "id")
194211
if ok {
195-
subID, _ := getUint64WithOk(message, "result")
212+
if rpcErr := getRPCError(message); rpcErr != nil {
213+
c.closeSubscription(requestID, rpcErr)
214+
return
215+
}
216+
subID, ok := getUint64WithOk(message, "result")
217+
if !ok {
218+
c.closeSubscription(requestID, fmt.Errorf("subscription response missing result for request_id=%d", requestID))
219+
return
220+
}
196221
c.handleNewSubscriptionMessage(requestID, subID)
197222
return
198223
}

rpc/ws/fixes_test.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -411,6 +411,57 @@ func TestConnectionDrop_AllSubscriptionsNotified(t *testing.T) {
411411
}
412412
}
413413

414+
func TestSubscribe_ErrorResponseClosesSubscription(t *testing.T) {
415+
m := newMockWSServer(t)
416+
defer m.stop()
417+
418+
c := connectClient(t, m)
419+
defer c.Close()
420+
421+
type subResult struct {
422+
sub *Subscription
423+
err error
424+
}
425+
ch := make(chan subResult, 1)
426+
go func() {
427+
sub, err := c.subscribe(
428+
[]any{"test"},
429+
nil,
430+
"testSubscribe",
431+
"testUnsubscribe",
432+
func(msg []byte) (any, error) {
433+
var res SlotResult
434+
err := decodeResponseFromMessage(msg, &res)
435+
return &res, err
436+
},
437+
)
438+
ch <- subResult{sub, err}
439+
}()
440+
441+
var reqID uint64
442+
select {
443+
case msg := <-m.incoming:
444+
id, ok := getUint64WithOk(msg, "id")
445+
require.True(t, ok, "could not parse request ID from %s", string(msg))
446+
reqID = id
447+
case <-time.After(2 * time.Second):
448+
t.Fatal("timed out waiting for subscription request")
449+
}
450+
451+
resp := fmt.Sprintf(`{"jsonrpc":"2.0","error":{"code":-32601,"message":"Method not found"},"id":%d}`, reqID)
452+
m.send(t, resp)
453+
454+
r := <-ch
455+
require.NoError(t, r.err)
456+
require.NotNil(t, r.sub)
457+
458+
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
459+
defer cancel()
460+
_, err := r.sub.Recv(ctx)
461+
require.Error(t, err)
462+
require.Contains(t, err.Error(), "Method not found")
463+
}
464+
414465
func TestConcurrentUnsubscribeAndConnectionDrop_NoPanic(t *testing.T) {
415466
m := newMockWSServer(t)
416467
defer m.stop()

0 commit comments

Comments
 (0)