Skip to content

Commit ee36f0c

Browse files
fix(ws): pubsub parity
Cross-checked every response and request shape in rpc/ws against agave/pubsub-client (anza-xyz/agave) and filled the gaps. Method-level parity was already complete, but option sets, response fields, and the BlockSubscribe/ParsedBlockSubscribe split had drifted in ways that produced the "unable to decode client response" error on jsonParsed block notifications (issue #291) and left several subscriptions without the options Agave exposes. BlockSubscribe unification (closes #291) ---------------------------------------- BlockSubscribe now accepts every UiTransactionEncoding the RPC supports — base58, base64, base64+zstd, jsonParsed — through a single method, matching Agave's shape. Go can't express #[serde(untagged)] the way Rust's EncodedTransaction does, so BlockResult.Value is a Go-style union: BlockResultValue carries both Block (*rpc.GetBlockResult) and ParsedBlock (*rpc.GetParsedBlockResult). Exactly one is populated per frame, chosen by the Encoding captured at subscribe time and routed inside decodeBlockNotification(isParsed). base58 / base64 / base64+zstd -> Block jsonParsed -> ParsedBlock ParsedBlockSubscribe is marked Deprecated but continues to return *ParsedBlockResult so nothing breaks for existing callers. Request / config parity ----------------------- - AccountSubscribeConfig: new struct mirroring RpcAccountInfoConfig (commitment, encoding, dataSlice, minContextSlot). The existing AccountSubscribe / AccountSubscribeWithOpts are preserved as thin wrappers; AccountSubscribeWithConfig exposes the full surface. - ProgramSubscribeConfig: new struct mirroring RpcProgramAccountsConfig (commitment, encoding, dataSlice, filters, minContextSlot, withContext, sortResults). - SignatureSubscribeConfig: new struct mirroring RpcSignatureSubscribeConfig, adds enableReceivedNotification. - Each Config has an unexported (c *Config).params() helper that matches Agave's serde(skip_serializing_if="Option::is_none") behavior. Response parity --------------- - SignatureResult.Value now models the untagged ProcessedSignature | ReceivedSignature union via SignatureResultValue{IsReceived, Processed}. Custom UnmarshalJSON handles the object form ({"err":...}), the string form "receivedSignature", and null. The old Value.Err path is replaced by Value.Processed.Err. - VoteResult gains VotePubkey and Signature to match RpcVote. - SlotsUpdatesResult adds Err (Dead variant), documents which fields are populated per discriminator. - All Context fields across the subscription result types moved from anonymous struct{Slot uint64} to a shared RPCResponseContext (slot + apiVersion), matching RpcResponseContext. Consumer adaptation ------------------- sendAndConfirmTransaction.WaitForConfirmation adapted to the new SignatureResult shape and loops on IsReceived until a processed notification arrives — previously retried only once, so a second consecutive received-only frame would have passed a nil Processed through and wrongly reported (confirmed=true, err=nil). Examples -------- - rpc/ws/examples/blockSubscribe: demonstrates the Block/ParsedBlock union via a switch on which field is populated. - rpc/ws/examples/parsedBlockSubscribe: removed (redundant now). Tests (new subscriptions_test.go + parsedBlockSubscribe_test.go) ---------------------------------------------------------------- Fixture JSON mirrors the wire format produced by Agave's rpc_subscriptions.rs integration tests: - accountNotification: base64 and jsonParsed payloads - programNotification, logsNotification, voteNotification, slotNotification, rootNotification - signatureNotification: processed (null err), processed (with err), and the "receivedSignature" string variant - slotsUpdatesNotification: all seven SlotUpdate variants - blockNotification: separate ParsedRoute / BinaryRoute tests that exercise decodeBlockNotification for both branches; the binary fixture includes a real [blob, "base64"] 2-array transaction + meta so drift between the two shapes would fail the test - TestConfigParams_Wire verifies per-field emission and omit behavior on every *SubscribeConfig.
1 parent 828ac78 commit ee36f0c

12 files changed

Lines changed: 1028 additions & 124 deletions

rpc/sendAndConfirmTransaction/sendAndConfirmTransaction.go

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -123,15 +123,22 @@ func WaitForConfirmation(
123123
timeoutCtx, cancel := context.WithTimeout(ctx, *timeout)
124124
defer cancel()
125125

126-
got, err := sub.Recv(timeoutCtx)
127-
if err != nil {
128-
if timeoutCtx.Err() == context.DeadlineExceeded {
129-
return false, ErrTimeout
126+
// Drain any "receivedSignature" notifications (if the caller opted in
127+
// to enableReceivedNotification) until we get the processed one.
128+
for {
129+
got, err := sub.Recv(timeoutCtx)
130+
if err != nil {
131+
if timeoutCtx.Err() == context.DeadlineExceeded {
132+
return false, ErrTimeout
133+
}
134+
return false, err
130135
}
131-
return false, err
132-
}
133-
if got.Value.Err != nil {
134-
return true, fmt.Errorf("confirmed transaction with execution error: %v", got.Value.Err)
136+
if got.Value.IsReceived {
137+
continue
138+
}
139+
if got.Value.Processed != nil && got.Value.Processed.Err != nil {
140+
return true, fmt.Errorf("confirmed transaction with execution error: %v", got.Value.Processed.Err)
141+
}
142+
return true, nil
135143
}
136-
return true, nil
137144
}

rpc/ws/accountSubscribe.go

Lines changed: 57 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -22,47 +22,78 @@ import (
2222
)
2323

2424
type AccountResult struct {
25-
Context struct {
26-
Slot uint64 `json:"slot"`
27-
} `json:"context"`
28-
Value *rpc.Account `json:"value"`
25+
Context RPCResponseContext `json:"context"`
26+
Value *rpc.Account `json:"value"`
27+
}
28+
29+
// AccountSubscribeConfig matches Agave's RpcAccountInfoConfig. It
30+
// supports every encoding the RPC does — base58, base64, base64+zstd,
31+
// and jsonParsed — via rpc.Account.Data, which is a union type
32+
// (rpc.DataBytesOrJSON) that decodes all of them transparently.
33+
type AccountSubscribeConfig struct {
34+
Commitment rpc.CommitmentType
35+
Encoding solana.EncodingType
36+
DataSlice *rpc.DataSlice
37+
MinContextSlot *uint64
38+
}
39+
40+
// params converts the config to the JSON-RPC params object the
41+
// accountSubscribe method expects. Missing options are omitted so the
42+
// wire format matches Agave's serde(skip_serializing_if) behavior.
43+
func (c *AccountSubscribeConfig) params() map[string]any {
44+
conf := map[string]any{"encoding": solana.EncodingBase64}
45+
if c == nil {
46+
return conf
47+
}
48+
if c.Commitment != "" {
49+
conf["commitment"] = c.Commitment
50+
}
51+
if c.Encoding != "" {
52+
conf["encoding"] = c.Encoding
53+
}
54+
if c.DataSlice != nil {
55+
conf["dataSlice"] = c.DataSlice
56+
}
57+
if c.MinContextSlot != nil {
58+
conf["minContextSlot"] = *c.MinContextSlot
59+
}
60+
return conf
2961
}
3062

3163
// AccountSubscribe subscribes to an account to receive notifications
32-
// when the lamports or data for a given account public key changes.
64+
// when its lamports or data change. Defaults to base64 encoding.
3365
func (cl *Client) AccountSubscribe(
3466
account solana.PublicKey,
3567
commitment rpc.CommitmentType,
3668
) (*AccountSubscription, error) {
37-
return cl.AccountSubscribeWithOpts(
38-
account,
39-
commitment,
40-
"",
41-
)
69+
return cl.AccountSubscribeWithOpts(account, commitment, "")
4270
}
4371

44-
// AccountSubscribe subscribes to an account to receive notifications
45-
// when the lamports or data for a given account public key changes.
72+
// AccountSubscribeWithOpts is the simple variant that accepts bare
73+
// commitment and encoding arguments.
74+
//
75+
// Deprecated: use AccountSubscribeWithConfig for the full option set
76+
// (dataSlice, minContextSlot) exposed by Agave's RpcAccountInfoConfig.
4677
func (cl *Client) AccountSubscribeWithOpts(
4778
account solana.PublicKey,
4879
commitment rpc.CommitmentType,
4980
encoding solana.EncodingType,
5081
) (*AccountSubscription, error) {
82+
return cl.AccountSubscribeWithConfig(account, &AccountSubscribeConfig{
83+
Commitment: commitment,
84+
Encoding: encoding,
85+
})
86+
}
5187

52-
params := []any{account.String()}
53-
conf := map[string]any{
54-
"encoding": "base64",
55-
}
56-
if commitment != "" {
57-
conf["commitment"] = commitment
58-
}
59-
if encoding != "" {
60-
conf["encoding"] = encoding
61-
}
62-
88+
// AccountSubscribeWithConfig mirrors the full RpcAccountInfoConfig
89+
// option surface of the underlying accountSubscribe RPC.
90+
func (cl *Client) AccountSubscribeWithConfig(
91+
account solana.PublicKey,
92+
config *AccountSubscribeConfig,
93+
) (*AccountSubscription, error) {
6394
genSub, err := cl.subscribe(
64-
params,
65-
conf,
95+
[]any{account.String()},
96+
config.params(),
6697
"accountSubscribe",
6798
"accountUnsubscribe",
6899
func(msg []byte) (any, error) {
@@ -74,9 +105,7 @@ func (cl *Client) AccountSubscribeWithOpts(
74105
if err != nil {
75106
return nil, err
76107
}
77-
return &AccountSubscription{
78-
sub: genSub,
79-
}, nil
108+
return &AccountSubscription{sub: genSub}, nil
80109
}
81110

82111
type AccountSubscription struct {

rpc/ws/blockSubscribe.go

Lines changed: 74 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -22,15 +22,34 @@ import (
2222
"github.com/gagliardetto/solana-go/rpc"
2323
)
2424

25+
// BlockResult is the unified notification payload for blockSubscribe.
26+
// Exactly one of Value.Block or Value.ParsedBlock is populated per
27+
// frame, based on the Encoding the caller passed at subscribe time.
2528
type BlockResult struct {
26-
Context struct {
27-
Slot uint64
28-
} `json:"context"`
29-
Value struct {
30-
Slot uint64 `json:"slot"`
31-
Err any `json:"err,omitempty"`
32-
Block *rpc.GetBlockResult `json:"block,omitempty"`
33-
} `json:"value"`
29+
Context RPCResponseContext `json:"context"`
30+
Value BlockResultValue `json:"value"`
31+
}
32+
33+
// BlockResultValue mirrors Agave's RpcBlockUpdate with a Go-style union
34+
// over the two block shapes.
35+
//
36+
// Encoding Field populated
37+
// ----------------------------------------------------
38+
// base58 / base64 / base64+zstd Block (*rpc.GetBlockResult)
39+
// jsonParsed ParsedBlock (*rpc.GetParsedBlockResult)
40+
//
41+
// Err / Slot are always decoded regardless of which block field is set.
42+
type BlockResultValue struct {
43+
Slot uint64 `json:"slot"`
44+
Err any `json:"err,omitempty"`
45+
46+
// Block is populated for binary encodings.
47+
Block *rpc.GetBlockResult `json:"block,omitempty"`
48+
49+
// ParsedBlock is populated when Encoding=jsonParsed was requested.
50+
// BlockSubscribe sets this at decode time; it is not read by the
51+
// default json.Unmarshal path.
52+
ParsedBlock *rpc.GetParsedBlockResult `json:"-"`
3453
}
3554

3655
type BlockSubscribeFilter interface {
@@ -76,7 +95,11 @@ type BlockSubscribeOpts struct {
7695

7796
// NOTE: Unstable, disabled by default
7897
//
79-
// Subscribe to receive notification anytime a new block is Confirmed or Finalized.
98+
// BlockSubscribe subscribes to new blocks. Supports every encoding the
99+
// RPC does — base58, base64, base64+zstd, and jsonParsed. The
100+
// BlockResult.Value field is a union: the binary encodings populate
101+
// Block (*rpc.GetBlockResult); jsonParsed populates ParsedBlock
102+
// (*rpc.GetParsedBlockResult).
80103
//
81104
// **This subscription is unstable and only available if the validator was started
82105
// with the `--rpc-pubsub-enable-block-subscription` flag. The format of this
@@ -85,6 +108,8 @@ func (cl *Client) BlockSubscribe(
85108
filter BlockSubscribeFilter,
86109
opts *BlockSubscribeOpts,
87110
) (*BlockSubscription, error) {
111+
isParsed := opts != nil && opts.Encoding == solana.EncodingJSONParsed
112+
88113
var params []any
89114
if filter != nil {
90115
switch v := filter.(type) {
@@ -94,6 +119,7 @@ func (cl *Client) BlockSubscribe(
94119
params = append(params, rpc.M{"mentionsAccountOrProgram": v.Pubkey})
95120
}
96121
}
122+
97123
if opts != nil {
98124
obj := make(rpc.M)
99125
if opts.Commitment != "" {
@@ -102,12 +128,10 @@ func (cl *Client) BlockSubscribe(
102128
if opts.Encoding != "" {
103129
if !solana.IsAnyOfEncodingType(
104130
opts.Encoding,
105-
// Valid encodings:
106-
// solana.EncodingJSON, // TODO
107-
solana.EncodingJSONParsed, // TODO
108131
solana.EncodingBase58,
109132
solana.EncodingBase64,
110133
solana.EncodingBase64Zstd,
134+
solana.EncodingJSONParsed,
111135
) {
112136
return nil, fmt.Errorf("provided encoding is not supported: %s", opts.Encoding)
113137
}
@@ -126,16 +150,13 @@ func (cl *Client) BlockSubscribe(
126150
params = append(params, obj)
127151
}
128152
}
153+
129154
genSub, err := cl.subscribe(
130155
params,
131156
nil,
132157
"blockSubscribe",
133158
"blockUnsubscribe",
134-
func(msg []byte) (any, error) {
135-
var res BlockResult
136-
err := decodeResponseFromMessage(msg, &res)
137-
return &res, err
138-
},
159+
decodeBlockNotification(isParsed),
139160
)
140161
if err != nil {
141162
return nil, err
@@ -145,6 +166,42 @@ func (cl *Client) BlockSubscribe(
145166
}, nil
146167
}
147168

169+
// decodeBlockNotification returns a frame decoder that lands the block
170+
// either in BlockResultValue.Block (binary) or .ParsedBlock (jsonParsed).
171+
// Unexported but package-local so tests can exercise both branches
172+
// without a live server.
173+
func decodeBlockNotification(isParsed bool) func([]byte) (any, error) {
174+
return func(msg []byte) (any, error) {
175+
if isParsed {
176+
var tmp struct {
177+
Context RPCResponseContext `json:"context"`
178+
Value struct {
179+
Slot uint64 `json:"slot"`
180+
Err any `json:"err,omitempty"`
181+
Block *rpc.GetParsedBlockResult `json:"block,omitempty"`
182+
} `json:"value"`
183+
}
184+
if err := decodeResponseFromMessage(msg, &tmp); err != nil {
185+
return nil, err
186+
}
187+
return &BlockResult{
188+
Context: tmp.Context,
189+
Value: BlockResultValue{
190+
Slot: tmp.Value.Slot,
191+
Err: tmp.Value.Err,
192+
ParsedBlock: tmp.Value.Block,
193+
},
194+
}, nil
195+
}
196+
197+
var res BlockResult
198+
if err := decodeResponseFromMessage(msg, &res); err != nil {
199+
return nil, err
200+
}
201+
return &res, nil
202+
}
203+
}
204+
148205
type BlockSubscription struct {
149206
sub *Subscription
150207
}
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
// Copyright 2022 github.com/gagliardetto
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
// blockSubscribe streams whole blocks. BlockSubscribe accepts every
16+
// encoding the RPC does — base58, base64, base64+zstd, and jsonParsed.
17+
// BlockResult.Value is a union: binary encodings populate Block
18+
// (*rpc.GetBlockResult); jsonParsed populates ParsedBlock
19+
// (*rpc.GetParsedBlockResult).
20+
//
21+
// Public Solana RPC endpoints do NOT expose block subscriptions by
22+
// default — the validator must be started with
23+
// `--rpc-pubsub-enable-block-subscription`. Point this at your own
24+
// endpoint, or at a provider that enables it (e.g. Helius, Triton).
25+
package main
26+
27+
import (
28+
"context"
29+
"fmt"
30+
31+
"github.com/gagliardetto/solana-go"
32+
"github.com/gagliardetto/solana-go/rpc"
33+
"github.com/gagliardetto/solana-go/rpc/ws"
34+
)
35+
36+
func main() {
37+
ctx := context.Background()
38+
39+
client, err := ws.Connect(ctx, rpc.MainNetBeta_WS)
40+
if err != nil {
41+
panic(err)
42+
}
43+
defer client.Close()
44+
45+
maxVersion := uint64(0)
46+
rewards := false
47+
48+
// Switch to solana.EncodingJSONParsed to receive human-readable,
49+
// parsed transactions via BlockResult.Value.ParsedBlock instead.
50+
sub, err := client.BlockSubscribe(
51+
ws.NewBlockSubscribeFilterAll(),
52+
&ws.BlockSubscribeOpts{
53+
Commitment: rpc.CommitmentConfirmed,
54+
Encoding: solana.EncodingBase64,
55+
TransactionDetails: rpc.TransactionDetailsFull,
56+
Rewards: &rewards,
57+
MaxSupportedTransactionVersion: &maxVersion,
58+
},
59+
)
60+
if err != nil {
61+
panic(fmt.Errorf("subscribe: %w", err))
62+
}
63+
defer sub.Unsubscribe()
64+
65+
for {
66+
got, err := sub.Recv(ctx)
67+
if err != nil {
68+
panic(err)
69+
}
70+
71+
// Exactly one of Block / ParsedBlock is set, depending on the
72+
// Encoding requested above.
73+
switch {
74+
case got.Value.Block != nil:
75+
fmt.Printf("slot=%d txs=%d (binary)\n",
76+
got.Value.Slot,
77+
len(got.Value.Block.Transactions),
78+
)
79+
case got.Value.ParsedBlock != nil:
80+
fmt.Printf("slot=%d txs=%d (parsed)\n",
81+
got.Value.Slot,
82+
len(got.Value.ParsedBlock.Transactions),
83+
)
84+
default:
85+
fmt.Printf("slot=%d no block payload\n", got.Value.Slot)
86+
}
87+
}
88+
}

rpc/ws/logsSubscribe.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,8 @@ import (
2222
)
2323

2424
type LogResult struct {
25-
Context struct {
26-
Slot uint64
27-
} `json:"context"`
28-
Value struct {
25+
Context RPCResponseContext `json:"context"`
26+
Value struct {
2927
// The transaction signature.
3028
Signature solana.Signature `json:"signature"`
3129
// Error if transaction failed, null if transaction succeeded.

0 commit comments

Comments
 (0)