Skip to content

Commit b1b7d28

Browse files
fix(ws): pubsub parity (#407)
* fix(ws): pubsub parity Cross-checked every response and request shape in rpc/ws against agave/pubsub-client (anza-xyz/agave) and filled the gaps. Method-level parity was already complete, but option sets, response fields, and the BlockSubscribe/ParsedBlockSubscribe split had drifted in ways that produced the "unable to decode client response" error on jsonParsed block notifications (issue #291) and left several subscriptions without the options Agave exposes. BlockSubscribe unification (closes #291) ---------------------------------------- BlockSubscribe now accepts every UiTransactionEncoding the RPC supports — base58, base64, base64+zstd, jsonParsed — through a single method, matching Agave's shape. Go can't express BlockResult.Value is a Go-style union: BlockResultValue carries both Block (*rpc.GetBlockResult) and ParsedBlock (*rpc.GetParsedBlockResult). Exactly one is populated per frame, chosen by the Encoding captured at subscribe time and routed inside decodeBlockNotification(isParsed). base58 / base64 / base64+zstd -> Block jsonParsed -> ParsedBlock ParsedBlockSubscribe is marked Deprecated but continues to return *ParsedBlockResult so nothing breaks for existing callers. Request / config parity ----------------------- - AccountSubscribeConfig: new struct mirroring RpcAccountInfoConfig (commitment, encoding, dataSlice, minContextSlot). The existing AccountSubscribe / AccountSubscribeWithOpts are preserved as thin wrappers; AccountSubscribeWithConfig exposes the full surface. - ProgramSubscribeConfig: new struct mirroring RpcProgramAccountsConfig (commitment, encoding, dataSlice, filters, minContextSlot, withContext, sortResults). - SignatureSubscribeConfig: new struct mirroring RpcSignatureSubscribeConfig, adds enableReceivedNotification. - Each Config has an unexported (c *Config).params() helper that matches Agave's serde(skip_serializing_if="Option::is_none") behavior. Response parity --------------- - SignatureResult.Value now models the untagged ProcessedSignature | ReceivedSignature union via SignatureResultValue{IsReceived, Processed}. Custom UnmarshalJSON handles the object form ({"err":...}), the string form "receivedSignature", and null. The old Value.Err path is replaced by Value.Processed.Err. - VoteResult gains VotePubkey and Signature to match RpcVote. - SlotsUpdatesResult adds Err (Dead variant), documents which fields are populated per discriminator. - All Context fields across the subscription result types moved from anonymous struct{Slot uint64} to a shared RPCResponseContext (slot + apiVersion), matching RpcResponseContext. Consumer adaptation ------------------- sendAndConfirmTransaction.WaitForConfirmation adapted to the new SignatureResult shape and loops on IsReceived until a processed notification arrives — previously retried only once, so a second consecutive received-only frame would have passed a nil Processed through and wrongly reported (confirmed=true, err=nil). Examples -------- - rpc/ws/examples/blockSubscribe: demonstrates the Block/ParsedBlock union via a switch on which field is populated. - rpc/ws/examples/parsedBlockSubscribe: removed (redundant now). Tests (new subscriptions_test.go + parsedBlockSubscribe_test.go) ---------------------------------------------------------------- Fixture JSON mirrors the wire format produced by Agave's rpc_subscriptions.rs integration tests: - accountNotification: base64 and jsonParsed payloads - programNotification, logsNotification, voteNotification, slotNotification, rootNotification - signatureNotification: processed (null err), processed (with err), and the "receivedSignature" string variant - slotsUpdatesNotification: all seven SlotUpdate variants - blockNotification: separate ParsedRoute / BinaryRoute tests that exercise decodeBlockNotification for both branches; the binary fixture includes a real [blob, "base64"] 2-array transaction + meta so drift between the two shapes would fail the test - TestConfigParams_Wire verifies per-field emission and omit behavior on every *SubscribeConfig. * chore: apply linting * fix(message): use gojson
1 parent c445f76 commit b1b7d28

16 files changed

Lines changed: 1046 additions & 139 deletions

message.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import (
2323

2424
bin "github.com/gagliardetto/binary"
2525
"github.com/gagliardetto/treeout"
26-
jsoniter "github.com/json-iterator/go"
26+
gojson "github.com/goccy/go-json"
2727

2828
"github.com/gagliardetto/solana-go/text"
2929
)
@@ -242,7 +242,7 @@ func (mx *Message) UnmarshalJSON(data []byte) error {
242242
Header MessageHeader `json:"header"`
243243
RecentBlockhash Hash `json:"recentBlockhash"`
244244
Instructions []CompiledInstruction `json:"instructions"`
245-
AddressTableLookups *jsoniter.RawMessage `json:"addressTableLookups"`
245+
AddressTableLookups *gojson.RawMessage `json:"addressTableLookups"`
246246
}{}
247247
if err := json.Unmarshal(data, &aux); err != nil {
248248
return err

rpc/jsonrpc/jsonrpc.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,8 +12,8 @@ import (
1212
"sync/atomic"
1313

1414
"github.com/davecgh/go-spew/spew"
15-
stdjson "github.com/goccy/go-json"
1615
gojson "github.com/goccy/go-json"
16+
stdjson "github.com/goccy/go-json"
1717
"github.com/google/uuid"
1818
)
1919

rpc/sendAndConfirmTransaction/sendAndConfirmTransaction.go

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -123,15 +123,22 @@ func WaitForConfirmation(
123123
timeoutCtx, cancel := context.WithTimeout(ctx, *timeout)
124124
defer cancel()
125125

126-
got, err := sub.Recv(timeoutCtx)
127-
if err != nil {
128-
if timeoutCtx.Err() == context.DeadlineExceeded {
129-
return false, ErrTimeout
126+
// Drain any "receivedSignature" notifications (if the caller opted in
127+
// to enableReceivedNotification) until we get the processed one.
128+
for {
129+
got, err := sub.Recv(timeoutCtx)
130+
if err != nil {
131+
if timeoutCtx.Err() == context.DeadlineExceeded {
132+
return false, ErrTimeout
133+
}
134+
return false, err
130135
}
131-
return false, err
132-
}
133-
if got.Value.Err != nil {
134-
return true, fmt.Errorf("confirmed transaction with execution error: %v", got.Value.Err)
136+
if got.Value.IsReceived {
137+
continue
138+
}
139+
if got.Value.Processed != nil && got.Value.Processed.Err != nil {
140+
return true, fmt.Errorf("confirmed transaction with execution error: %v", got.Value.Processed.Err)
141+
}
142+
return true, nil
135143
}
136-
return true, nil
137144
}

rpc/util_test.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -11,19 +11,21 @@ import (
1111
// Layout references for the test data below:
1212
//
1313
// SPL Token (solana-program/token):
14-
// Mint::LEN = 82
15-
// Account::LEN = 165
14+
//
15+
// Mint::LEN = 82
16+
// Account::LEN = 165
1617
//
1718
// Token-2022 (solana-program/token-2022):
18-
// Extended records place a 1-byte AccountType discriminator at offset
19-
// Account::LEN (= 165). Mint base (82 bytes) is padded with 83 zeros so
20-
// Mint and Account share the discriminator offset.
2119
//
22-
// AccountType::Uninitialized = 0
23-
// AccountType::Mint = 1
24-
// AccountType::Account = 2
20+
// Extended records place a 1-byte AccountType discriminator at offset
21+
// Account::LEN (= 165). Mint base (82 bytes) is padded with 83 zeros so
22+
// Mint and Account share the discriminator offset.
23+
//
24+
// AccountType::Uninitialized = 0
25+
// AccountType::Mint = 1
26+
// AccountType::Account = 2
2527
//
26-
// Extensions follow as TLV: [u16 LE type][u16 LE length][value...].
28+
// Extensions follow as TLV: [u16 LE type][u16 LE length][value...].
2729
const (
2830
testAccountTypeUninitialized uint8 = 0
2931
testAccountTypeMint uint8 = 1

rpc/ws/accountSubscribe.go

Lines changed: 57 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -22,47 +22,78 @@ import (
2222
)
2323

2424
type AccountResult struct {
25-
Context struct {
26-
Slot uint64 `json:"slot"`
27-
} `json:"context"`
28-
Value *rpc.Account `json:"value"`
25+
Context RPCResponseContext `json:"context"`
26+
Value *rpc.Account `json:"value"`
27+
}
28+
29+
// AccountSubscribeConfig matches Agave's RpcAccountInfoConfig. It
30+
// supports every encoding the RPC does — base58, base64, base64+zstd,
31+
// and jsonParsed — via rpc.Account.Data, which is a union type
32+
// (rpc.DataBytesOrJSON) that decodes all of them transparently.
33+
type AccountSubscribeConfig struct {
34+
Commitment rpc.CommitmentType
35+
Encoding solana.EncodingType
36+
DataSlice *rpc.DataSlice
37+
MinContextSlot *uint64
38+
}
39+
40+
// params converts the config to the JSON-RPC params object the
41+
// accountSubscribe method expects. Missing options are omitted so the
42+
// wire format matches Agave's serde(skip_serializing_if) behavior.
43+
func (c *AccountSubscribeConfig) params() map[string]any {
44+
conf := map[string]any{"encoding": solana.EncodingBase64}
45+
if c == nil {
46+
return conf
47+
}
48+
if c.Commitment != "" {
49+
conf["commitment"] = c.Commitment
50+
}
51+
if c.Encoding != "" {
52+
conf["encoding"] = c.Encoding
53+
}
54+
if c.DataSlice != nil {
55+
conf["dataSlice"] = c.DataSlice
56+
}
57+
if c.MinContextSlot != nil {
58+
conf["minContextSlot"] = *c.MinContextSlot
59+
}
60+
return conf
2961
}
3062

3163
// AccountSubscribe subscribes to an account to receive notifications
32-
// when the lamports or data for a given account public key changes.
64+
// when its lamports or data change. Defaults to base64 encoding.
3365
func (cl *Client) AccountSubscribe(
3466
account solana.PublicKey,
3567
commitment rpc.CommitmentType,
3668
) (*AccountSubscription, error) {
37-
return cl.AccountSubscribeWithOpts(
38-
account,
39-
commitment,
40-
"",
41-
)
69+
return cl.AccountSubscribeWithOpts(account, commitment, "")
4270
}
4371

44-
// AccountSubscribe subscribes to an account to receive notifications
45-
// when the lamports or data for a given account public key changes.
72+
// AccountSubscribeWithOpts is the simple variant that accepts bare
73+
// commitment and encoding arguments.
74+
//
75+
// Deprecated: use AccountSubscribeWithConfig for the full option set
76+
// (dataSlice, minContextSlot) exposed by Agave's RpcAccountInfoConfig.
4677
func (cl *Client) AccountSubscribeWithOpts(
4778
account solana.PublicKey,
4879
commitment rpc.CommitmentType,
4980
encoding solana.EncodingType,
5081
) (*AccountSubscription, error) {
82+
return cl.AccountSubscribeWithConfig(account, &AccountSubscribeConfig{
83+
Commitment: commitment,
84+
Encoding: encoding,
85+
})
86+
}
5187

52-
params := []any{account.String()}
53-
conf := map[string]any{
54-
"encoding": "base64",
55-
}
56-
if commitment != "" {
57-
conf["commitment"] = commitment
58-
}
59-
if encoding != "" {
60-
conf["encoding"] = encoding
61-
}
62-
88+
// AccountSubscribeWithConfig mirrors the full RpcAccountInfoConfig
89+
// option surface of the underlying accountSubscribe RPC.
90+
func (cl *Client) AccountSubscribeWithConfig(
91+
account solana.PublicKey,
92+
config *AccountSubscribeConfig,
93+
) (*AccountSubscription, error) {
6394
genSub, err := cl.subscribe(
64-
params,
65-
conf,
95+
[]any{account.String()},
96+
config.params(),
6697
"accountSubscribe",
6798
"accountUnsubscribe",
6899
func(msg []byte) (any, error) {
@@ -74,9 +105,7 @@ func (cl *Client) AccountSubscribeWithOpts(
74105
if err != nil {
75106
return nil, err
76107
}
77-
return &AccountSubscription{
78-
sub: genSub,
79-
}, nil
108+
return &AccountSubscription{sub: genSub}, nil
80109
}
81110

82111
type AccountSubscription struct {

rpc/ws/blockSubscribe.go

Lines changed: 74 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,34 @@ import (
2222
"github.com/gagliardetto/solana-go/rpc"
2323
)
2424

25+
// BlockResult is the unified notification payload for blockSubscribe.
26+
// Exactly one of Value.Block or Value.ParsedBlock is populated per
27+
// frame, based on the Encoding the caller passed at subscribe time.
2528
type BlockResult struct {
26-
Context struct {
27-
Slot uint64
28-
} `json:"context"`
29-
Value struct {
30-
Slot uint64 `json:"slot"`
31-
Err any `json:"err,omitempty"`
32-
Block *rpc.GetBlockResult `json:"block,omitempty"`
33-
} `json:"value"`
29+
Context RPCResponseContext `json:"context"`
30+
Value BlockResultValue `json:"value"`
31+
}
32+
33+
// BlockResultValue mirrors Agave's RpcBlockUpdate with a Go-style union
34+
// over the two block shapes.
35+
//
36+
// Encoding Field populated
37+
// ----------------------------------------------------
38+
// base58 / base64 / base64+zstd Block (*rpc.GetBlockResult)
39+
// jsonParsed ParsedBlock (*rpc.GetParsedBlockResult)
40+
//
41+
// Err / Slot are always decoded regardless of which block field is set.
42+
type BlockResultValue struct {
43+
Slot uint64 `json:"slot"`
44+
Err any `json:"err,omitempty"`
45+
46+
// Block is populated for binary encodings.
47+
Block *rpc.GetBlockResult `json:"block,omitempty"`
48+
49+
// ParsedBlock is populated when Encoding=jsonParsed was requested.
50+
// BlockSubscribe sets this at decode time; it is not read by the
51+
// default json.Unmarshal path.
52+
ParsedBlock *rpc.GetParsedBlockResult `json:"-"`
3453
}
3554

3655
type BlockSubscribeFilter interface {
@@ -76,7 +95,11 @@ type BlockSubscribeOpts struct {
7695

7796
// NOTE: Unstable, disabled by default
7897
//
79-
// Subscribe to receive notification anytime a new block is Confirmed or Finalized.
98+
// BlockSubscribe subscribes to new blocks. Supports every encoding the
99+
// RPC does — base58, base64, base64+zstd, and jsonParsed. The
100+
// BlockResult.Value field is a union: the binary encodings populate
101+
// Block (*rpc.GetBlockResult); jsonParsed populates ParsedBlock
102+
// (*rpc.GetParsedBlockResult).
80103
//
81104
// **This subscription is unstable and only available if the validator was started
82105
// with the `--rpc-pubsub-enable-block-subscription` flag. The format of this
@@ -85,6 +108,8 @@ func (cl *Client) BlockSubscribe(
85108
filter BlockSubscribeFilter,
86109
opts *BlockSubscribeOpts,
87110
) (*BlockSubscription, error) {
111+
isParsed := opts != nil && opts.Encoding == solana.EncodingJSONParsed
112+
88113
var params []any
89114
if filter != nil {
90115
switch v := filter.(type) {
@@ -94,6 +119,7 @@ func (cl *Client) BlockSubscribe(
94119
params = append(params, rpc.M{"mentionsAccountOrProgram": v.Pubkey})
95120
}
96121
}
122+
97123
if opts != nil {
98124
obj := make(rpc.M)
99125
if opts.Commitment != "" {
@@ -102,12 +128,10 @@ func (cl *Client) BlockSubscribe(
102128
if opts.Encoding != "" {
103129
if !solana.IsAnyOfEncodingType(
104130
opts.Encoding,
105-
// Valid encodings:
106-
// solana.EncodingJSON, // TODO
107-
solana.EncodingJSONParsed, // TODO
108131
solana.EncodingBase58,
109132
solana.EncodingBase64,
110133
solana.EncodingBase64Zstd,
134+
solana.EncodingJSONParsed,
111135
) {
112136
return nil, fmt.Errorf("provided encoding is not supported: %s", opts.Encoding)
113137
}
@@ -126,16 +150,13 @@ func (cl *Client) BlockSubscribe(
126150
params = append(params, obj)
127151
}
128152
}
153+
129154
genSub, err := cl.subscribe(
130155
params,
131156
nil,
132157
"blockSubscribe",
133158
"blockUnsubscribe",
134-
func(msg []byte) (any, error) {
135-
var res BlockResult
136-
err := decodeResponseFromMessage(msg, &res)
137-
return &res, err
138-
},
159+
decodeBlockNotification(isParsed),
139160
)
140161
if err != nil {
141162
return nil, err
@@ -145,6 +166,42 @@ func (cl *Client) BlockSubscribe(
145166
}, nil
146167
}
147168

169+
// decodeBlockNotification returns a frame decoder that lands the block
170+
// either in BlockResultValue.Block (binary) or .ParsedBlock (jsonParsed).
171+
// Unexported but package-local so tests can exercise both branches
172+
// without a live server.
173+
func decodeBlockNotification(isParsed bool) func([]byte) (any, error) {
174+
return func(msg []byte) (any, error) {
175+
if isParsed {
176+
var tmp struct {
177+
Context RPCResponseContext `json:"context"`
178+
Value struct {
179+
Slot uint64 `json:"slot"`
180+
Err any `json:"err,omitempty"`
181+
Block *rpc.GetParsedBlockResult `json:"block,omitempty"`
182+
} `json:"value"`
183+
}
184+
if err := decodeResponseFromMessage(msg, &tmp); err != nil {
185+
return nil, err
186+
}
187+
return &BlockResult{
188+
Context: tmp.Context,
189+
Value: BlockResultValue{
190+
Slot: tmp.Value.Slot,
191+
Err: tmp.Value.Err,
192+
ParsedBlock: tmp.Value.Block,
193+
},
194+
}, nil
195+
}
196+
197+
var res BlockResult
198+
if err := decodeResponseFromMessage(msg, &res); err != nil {
199+
return nil, err
200+
}
201+
return &res, nil
202+
}
203+
}
204+
148205
type BlockSubscription struct {
149206
sub *Subscription
150207
}

0 commit comments

Comments
 (0)