Skip to content

Commit 4706a9f

Browse files
authored
Node: Reobserve with custom endpoint (#4260)
* Node: Reobserve with custom endpoint * Code review rework * Node/Solana: Code review rework * Code review rework * Verify that we are connecting to the correct chain
1 parent 8a69ffd commit 4706a9f

26 files changed

+1105
-578
lines changed

node/cmd/guardiand/adminclient.go

+55
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ import (
2323
"golang.org/x/crypto/sha3"
2424

2525
"github.com/certusone/wormhole/node/pkg/guardiansigner"
26+
"github.com/certusone/wormhole/node/pkg/node"
2627
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
2728
publicrpcv1 "github.com/certusone/wormhole/node/pkg/proto/publicrpc/v1"
2829
"github.com/wormhole-foundation/wormhole/sdk"
@@ -62,6 +63,7 @@ func init() {
6263
DumpVAAByMessageID.Flags().AddFlagSet(pf)
6364
DumpRPCs.Flags().AddFlagSet(pf)
6465
SendObservationRequest.Flags().AddFlagSet(pf)
66+
ReobserveWithEndpoint.Flags().AddFlagSet(pf)
6567
ClientChainGovernorStatusCmd.Flags().AddFlagSet(pf)
6668
ClientChainGovernorReloadCmd.Flags().AddFlagSet(pf)
6769
ClientChainGovernorDropPendingVAACmd.Flags().AddFlagSet(pf)
@@ -84,6 +86,7 @@ func init() {
8486
AdminCmd.AddCommand(DumpVAAByMessageID)
8587
AdminCmd.AddCommand(DumpRPCs)
8688
AdminCmd.AddCommand(SendObservationRequest)
89+
AdminCmd.AddCommand(ReobserveWithEndpoint)
8790
AdminCmd.AddCommand(ClientChainGovernorStatusCmd)
8891
AdminCmd.AddCommand(ClientChainGovernorReloadCmd)
8992
AdminCmd.AddCommand(ClientChainGovernorDropPendingVAACmd)
@@ -136,6 +139,13 @@ var SendObservationRequest = &cobra.Command{
136139
Args: cobra.ExactArgs(2),
137140
}
138141

142+
var ReobserveWithEndpoint = &cobra.Command{
143+
Use: "reobserve-with-endpoint [CHAIN_ID|CHAIN_NAME] [TX_HASH_HEX] [CUSTOM_URL]",
144+
Short: "Performs a local reobservation for the given chain ID and chain-specific tx_hash using the specified endpoint",
145+
Run: runReobserveWithEndpoint,
146+
Args: cobra.ExactArgs(3),
147+
}
148+
139149
var ClientChainGovernorStatusCmd = &cobra.Command{
140150
Use: "governor-status",
141151
Short: "Displays the status of the chain governor",
@@ -415,6 +425,51 @@ func runSendObservationRequest(cmd *cobra.Command, args []string) {
415425
}
416426
}
417427

428+
func runReobserveWithEndpoint(cmd *cobra.Command, args []string) {
429+
chainID, err := parseChainID(args[0])
430+
if err != nil {
431+
log.Fatalf("invalid chain ID: %v", err)
432+
}
433+
434+
// Support tx with or without leading 0x.
435+
txHash, err := hex.DecodeString(strings.TrimPrefix(args[1], "0x"))
436+
if err != nil {
437+
txHash, err = base58.Decode(args[1])
438+
if err != nil {
439+
log.Fatalf("invalid transaction hash (neither hex nor base58): %v", err)
440+
}
441+
}
442+
443+
url := args[2]
444+
if valid := node.ValidateURL(url, []string{"http", "https"}); !valid {
445+
log.Fatalf(`invalid url, must be "http" or "https"`)
446+
}
447+
448+
// Allow extra time since the watcher can block on the reobservation.
449+
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
450+
defer cancel()
451+
452+
conn, c, err := getAdminClient(ctx, *clientSocketPath)
453+
if err != nil {
454+
log.Fatalf("failed to get admin client: %v", err)
455+
}
456+
defer conn.Close()
457+
458+
resp, err := c.ReobserveWithEndpoint(ctx, &nodev1.ReobserveWithEndpointRequest{
459+
ChainId: uint32(chainID),
460+
TxHash: txHash,
461+
Url: url,
462+
})
463+
if err != nil {
464+
log.Fatalf("failed to send observation request with endpoint: %v", err)
465+
}
466+
if resp.NumObservations == 0 {
467+
fmt.Println("Did not reobserve anything")
468+
} else {
469+
fmt.Println("Reobserved", resp.NumObservations, "messages")
470+
}
471+
}
472+
418473
func runDumpRPCs(cmd *cobra.Command, args []string) {
419474
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
420475
defer cancel()

node/pkg/adminrpc/adminserver.go

+18
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020

2121
"github.com/certusone/wormhole/node/pkg/guardiansigner"
2222
"github.com/certusone/wormhole/node/pkg/watchers/evm/connectors"
23+
"github.com/certusone/wormhole/node/pkg/watchers/interfaces"
2324
"github.com/holiman/uint256"
2425
"github.com/prometheus/client_golang/prometheus"
2526
"github.com/prometheus/client_golang/prometheus/promauto"
@@ -65,6 +66,7 @@ type nodePrivilegedService struct {
6566
guardianSigner guardiansigner.GuardianSigner
6667
guardianAddress ethcommon.Address
6768
rpcMap map[string]string
69+
reobservers interfaces.Reobservers
6870
}
6971

7072
func NewPrivService(
@@ -78,6 +80,7 @@ func NewPrivService(
7880
guardianSigner guardiansigner.GuardianSigner,
7981
guardianAddress ethcommon.Address,
8082
rpcMap map[string]string,
83+
reobservers interfaces.Reobservers,
8184

8285
) *nodePrivilegedService {
8386
return &nodePrivilegedService{
@@ -91,6 +94,7 @@ func NewPrivService(
9194
guardianSigner: guardianSigner,
9295
guardianAddress: guardianAddress,
9396
rpcMap: rpcMap,
97+
reobservers: reobservers,
9498
}
9599
}
96100

@@ -962,6 +966,20 @@ func (s *nodePrivilegedService) SendObservationRequest(ctx context.Context, req
962966
return &nodev1.SendObservationRequestResponse{}, nil
963967
}
964968

969+
func (s *nodePrivilegedService) ReobserveWithEndpoint(ctx context.Context, req *nodev1.ReobserveWithEndpointRequest) (*nodev1.ReobserveWithEndpointResponse, error) {
970+
watcher := s.reobservers[vaa.ChainID(req.ChainId)]
971+
if watcher == nil {
972+
return nil, status.Errorf(codes.Internal, "chain %d does not support reobservation by endpoint", req.ChainId)
973+
}
974+
975+
numObservations, err := watcher.Reobserve(ctx, vaa.ChainID(req.ChainId), req.TxHash, req.Url)
976+
if err != nil {
977+
return nil, status.Errorf(codes.Internal, "reobservation failed: %v", err)
978+
}
979+
980+
return &nodev1.ReobserveWithEndpointResponse{NumObservations: numObservations}, nil
981+
}
982+
965983
func (s *nodePrivilegedService) ChainGovernorStatus(ctx context.Context, req *nodev1.ChainGovernorStatusRequest) (*nodev1.ChainGovernorStatusResponse, error) {
966984
if s.governor == nil {
967985
return nil, fmt.Errorf("chain governor is not enabled")

node/pkg/node/adminServiceRunnable.go

+3
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import (
1818
"github.com/certusone/wormhole/node/pkg/publicrpc"
1919
"github.com/certusone/wormhole/node/pkg/supervisor"
2020
"github.com/certusone/wormhole/node/pkg/watchers/evm/connectors"
21+
"github.com/certusone/wormhole/node/pkg/watchers/interfaces"
2122
"go.uber.org/zap"
2223

2324
ethcommon "github.com/ethereum/go-ethereum/common"
@@ -37,6 +38,7 @@ func adminServiceRunnable(
3738
ethRpc *string,
3839
ethContract *string,
3940
rpcMap map[string]string,
41+
reobservers interfaces.Reobservers,
4042
) (supervisor.Runnable, error) {
4143
// Delete existing UNIX socket, if present.
4244
fi, err := os.Stat(socketPath)
@@ -92,6 +94,7 @@ func adminServiceRunnable(
9294
guardianSigner,
9395
ethcrypto.PubkeyToAddress(guardianSigner.PublicKey(ctx)),
9496
rpcMap,
97+
reobservers,
9598
)
9699

97100
publicrpcService := publicrpc.NewPublicrpcServer(logger, db, gst, gov)

node/pkg/node/node.go

+3
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
1414
"github.com/certusone/wormhole/node/pkg/query"
1515
"github.com/certusone/wormhole/node/pkg/supervisor"
16+
"github.com/certusone/wormhole/node/pkg/watchers/interfaces"
1617
"github.com/wormhole-foundation/wormhole/sdk/vaa"
1718

1819
"go.uber.org/zap"
@@ -74,6 +75,7 @@ type G struct {
7475
// runnables
7576
runnablesWithScissors map[string]supervisor.Runnable
7677
runnables map[string]supervisor.Runnable
78+
reobservers interfaces.Reobservers
7779

7880
// various channels
7981
// Outbound gossip message queues (need to be read/write because p2p needs read/write)
@@ -140,6 +142,7 @@ func (g *G) initializeBasic(rootCtxCancel context.CancelFunc) {
140142
// allocate maps
141143
g.runnablesWithScissors = make(map[string]supervisor.Runnable)
142144
g.runnables = make(map[string]supervisor.Runnable)
145+
g.reobservers = make(interfaces.Reobservers)
143146
}
144147

145148
// applyOptions applies `options` to the GuardianNode.

node/pkg/node/options.go

+6-1
Original file line numberDiff line numberDiff line change
@@ -440,14 +440,18 @@ func GuardianOptionWatchers(watcherConfigs []watchers.WatcherConfig, ibcWatcherC
440440
wc.SetL1Finalizer(l1watcher)
441441
}
442442

443-
l1finalizer, runnable, err := wc.Create(chainMsgC[wc.GetChainID()], chainObsvReqC[wc.GetChainID()], g.chainQueryReqC[wc.GetChainID()], chainQueryResponseC[wc.GetChainID()], g.setC.writeC, g.env)
443+
l1finalizer, runnable, reobserver, err := wc.Create(chainMsgC[wc.GetChainID()], chainObsvReqC[wc.GetChainID()], g.chainQueryReqC[wc.GetChainID()], chainQueryResponseC[wc.GetChainID()], g.setC.writeC, g.env)
444444

445445
if err != nil {
446446
return fmt.Errorf("error creating watcher: %w", err)
447447
}
448448

449449
g.runnablesWithScissors[watcherName] = runnable
450450
watchers[wc.GetNetworkID()] = l1finalizer
451+
452+
if reobserver != nil {
453+
g.reobservers[wc.GetChainID()] = reobserver
454+
}
451455
}
452456

453457
if ibcWatcherConfig != nil {
@@ -509,6 +513,7 @@ func GuardianOptionAdminService(socketPath string, ethRpc *string, ethContract *
509513
ethRpc,
510514
ethContract,
511515
rpcMap,
516+
g.reobservers,
512517
)
513518
if err != nil {
514519
return fmt.Errorf("failed to create admin service: %w", err)

node/pkg/node/url_verification.go

+2-2
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ func hasKnownSchemePrefix(urlStr string) bool {
2020
return false
2121
}
2222

23-
func validateURL(urlStr string, validSchemes []string) bool {
23+
func ValidateURL(urlStr string, validSchemes []string) bool {
2424
// If no scheme is required, validate host:port format
2525
if len(validSchemes) == 1 && validSchemes[0] == "" {
2626
host, port, err := net.SplitHostPort(urlStr)
@@ -69,7 +69,7 @@ func RegisterFlagWithValidationOrFail(cmd *cobra.Command, name string, descripti
6969
return
7070
}
7171

72-
if valid := validateURL(*flagValue, expectedSchemes); !valid {
72+
if valid := ValidateURL(*flagValue, expectedSchemes); !valid {
7373
log.Fatalf("Invalid format for flag --%s. Expected format: %s. Example: '%s'", name, formatExample, example)
7474
}
7575
})

node/pkg/node/url_verification_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ func TestValidateURL(t *testing.T) {
2929
}
3030

3131
for _, test := range tests {
32-
result := validateURL(test.urlStr, test.validSchemes)
32+
result := ValidateURL(test.urlStr, test.validSchemes)
3333
assert.Equal(t, test.expected, result)
3434
}
3535
}

0 commit comments

Comments
 (0)