Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 26 additions & 1 deletion rpc/ws/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,13 +186,38 @@ func getUint64WithOk(data []byte, path ...string) (uint64, bool) {
return 0, false
}

func getRPCError(data []byte) error {
rawErr, dataType, _, err := jsonparser.Get(data, "error")
if err != nil || dataType == jsonparser.Null {
return nil
}

if dataType != jsonparser.Object {
return fmt.Errorf("rpc error: %s", string(rawErr))
}

parsed := new(json2.Error)
if err := json.Unmarshal(rawErr, parsed); err != nil {
return fmt.Errorf("rpc error: %s", string(rawErr))
}
return parsed
}

func (c *Client) handleMessage(message []byte) {
// when receiving message with id. the result will be a subscription number.
// that number will be associated to all future message destine to this request

requestID, ok := getUint64WithOk(message, "id")
if ok {
subID, _ := getUint64WithOk(message, "result")
if rpcErr := getRPCError(message); rpcErr != nil {
c.closeSubscription(requestID, rpcErr)
return
}
subID, ok := getUint64WithOk(message, "result")
if !ok {
c.closeSubscription(requestID, fmt.Errorf("subscription response missing result for request_id=%d", requestID))
return
}
c.handleNewSubscriptionMessage(requestID, subID)
return
}
Expand Down
51 changes: 51 additions & 0 deletions rpc/ws/fixes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,57 @@ func TestConnectionDrop_AllSubscriptionsNotified(t *testing.T) {
}
}

func TestSubscribe_ErrorResponseClosesSubscription(t *testing.T) {
m := newMockWSServer(t)
defer m.stop()

c := connectClient(t, m)
defer c.Close()

type subResult struct {
sub *Subscription
err error
}
ch := make(chan subResult, 1)
go func() {
sub, err := c.subscribe(
[]any{"test"},
nil,
"testSubscribe",
"testUnsubscribe",
func(msg []byte) (any, error) {
var res SlotResult
err := decodeResponseFromMessage(msg, &res)
return &res, err
},
)
ch <- subResult{sub, err}
}()

var reqID uint64
select {
case msg := <-m.incoming:
id, ok := getUint64WithOk(msg, "id")
require.True(t, ok, "could not parse request ID from %s", string(msg))
reqID = id
case <-time.After(2 * time.Second):
t.Fatal("timed out waiting for subscription request")
}

resp := fmt.Sprintf(`{"jsonrpc":"2.0","error":{"code":-32601,"message":"Method not found"},"id":%d}`, reqID)
m.send(t, resp)

r := <-ch
require.NoError(t, r.err)
require.NotNil(t, r.sub)

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
_, err := r.sub.Recv(ctx)
require.Error(t, err)
require.Contains(t, err.Error(), "Method not found")
}

func TestConcurrentUnsubscribeAndConnectionDrop_NoPanic(t *testing.T) {
m := newMockWSServer(t)
defer m.stop()
Expand Down
Loading