Skip to content

Commit b74e55d

Browse files
committed
Abstract data fetching layer with DataFetcher interface to support bob backend
Introduce a DataFetcher interface that decouples the processing pipeline from the go-node-connector P2P library, enabling both Qubic core nodes and bob (JSON-RPC indexer) as data sources. - Add DataFetcher interface with TickStatus and SystemMetadata types - Add NodeDataFetcher wrapping existing pool with Main/Alt client pattern - Add BobDataFetcher fetching from bob's REST and JSON-RPC API - Refactor processor and validator to use DataFetcher instead of QubicClient - Add skipScoreCheck to quorum validation for backends without TargetTickVoteSignature - Add bob-path for computor fetching (once per epoch, skip signature-change detection) - Skip arbitrator signature validation for bob-sourced computors - Skip tx status (moneyFlew) for bob with TODO for log-based recomputation - Add backend selection via config (BACKEND_TYPE=node|bob, BACKEND_BOBURL) - Add BOB_GAPS.md documenting missing features for future sprints
1 parent 4dcfe11 commit b74e55d

18 files changed

Lines changed: 1405 additions & 398 deletions

.github/workflows/push-docker-custom.yaml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ name: Deploy prod images to GHCR
33
on:
44
push:
55
branches:
6-
- 'fix/tx-status-validation'
6+
- 'feature/bob-connector'
77

88
jobs:
99
push-store-image:
@@ -21,5 +21,5 @@ jobs:
2121

2222
- name: 'Build Inventory Image'
2323
run: |
24-
docker build . --tag ghcr.io/qubic/qubic-archiver-v2:snapshot
25-
docker push ghcr.io/qubic/qubic-archiver-v2:snapshot
24+
docker build . --tag ghcr.io/qubic/qubic-archiver-v2:bob-connector
25+
docker push ghcr.io/qubic/qubic-archiver-v2:bob-connector

BOB_GAPS.md

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
# Bob Connector - Missing Features & Simplifications
2+
3+
This document tracks what's missing or simplified when using bob as the data source
4+
for the archiver, compared to the direct Qubic node connector. These are candidates
5+
for future sprint work.
6+
7+
## 1. TargetTickVoteSignature (quorum vote score filtering)
8+
9+
**What**: Bob doesn't expose `SystemInfo.TargetTickVoteSignature`.
10+
11+
**Current behavior**: The archiver skips the vote signature score check
12+
(`skipScoreCheck=true`) when using bob. Cryptographic signature verification
13+
of individual votes still runs.
14+
15+
**Impact**: Minor. The score check is an optimization/filtering heuristic, not a
16+
security-critical validation. Bob has already validated the votes.
17+
18+
**Future fix**: Expose `TargetTickVoteSignature` in bob's `/status` or `/tick` endpoint.
19+
20+
---
21+
22+
## 2. ComputorPacketSignature (mid-epoch computor change detection)
23+
24+
**What**: Bob doesn't expose `SystemInfo.ComputorPacketSignature`.
25+
26+
**Current behavior**: The archiver fetches the computor list once per epoch and
27+
caches it. It does NOT detect mid-epoch computor list changes.
28+
29+
**Impact**: Low. Computor list changes mid-epoch are rare. If they occur, the
30+
archiver may validate ticks against stale computor keys until the next epoch.
31+
32+
**Future fix**: Expose `ComputorPacketSignature` in bob's status endpoint, OR
33+
add a computor-change notification mechanism.
34+
35+
---
36+
37+
## 3. Arbitrator signature on computor list
38+
39+
**What**: Bob returns computor identities (60-char strings) via `qubic_getComputors`,
40+
but does NOT return the raw 64-byte arbitrator signature on the computor list.
41+
42+
**Current behavior**: The archiver skips arbitrator signature validation for
43+
bob-sourced computors and marks them as pre-validated (`Validated: true`).
44+
45+
**Impact**: Low. Bob has already validated the computor list. The arbitrator
46+
signature is a trust chain verification that's redundant when trusting bob.
47+
48+
**Future fix**: Expose the raw computor list signature in bob's `qubic_getComputors`
49+
response.
50+
51+
---
52+
53+
## 4. Transaction status (moneyFlew)
54+
55+
**What**: Bob has per-transaction `executed` status from log events, but does NOT
56+
expose the `MoneyFlew` bit array that the node connector provides.
57+
58+
**Current behavior**: The archiver returns empty transaction status (all moneyFlew
59+
bits set to false) when using bob. The `txstatus` validation is effectively skipped.
60+
61+
**Impact**: Medium. The `moneyFlew` field is used by API consumers to determine if
62+
a transaction's side effects were executed. Currently all transactions appear as
63+
"not executed" when using bob.
64+
65+
**Future fix**: Use `qubic_getTransactionReceipt` to fetch per-transaction execution
66+
status, then reconstruct the MoneyFlew bit array from the `executed` field.
67+
68+
---
69+
70+
## 5. Transaction fetching efficiency
71+
72+
**What**: Bob's `GET /tick/{tick}` returns transaction digests but not full transaction
73+
bodies. The archiver must make an additional RPC call (`qubic_getTickByNumber` with
74+
`includeTransactions=true`) to get full transactions with signatures.
75+
76+
**Impact**: Performance. Two HTTP calls per tick instead of one.
77+
78+
**Future fix**: Add full transaction bodies (with signatures) to the REST
79+
`GET /tick/{tick}` response, or add a query parameter to include them.
80+
81+
---
82+
83+
## 6. SystemInfo.InitialTick in GetSystemMetadata
84+
85+
**What**: `GetSystemMetadata` for bob currently returns `InitialTick: 0`. The
86+
`InitialTick` is instead obtained from `GetTickStatus` which reads `/status`.
87+
88+
**Current behavior**: The validator gets `InitialTick` from `GetTickStatus` and
89+
passes it to computor validation. However, `GetSystemMetadata` returns 0 for
90+
`InitialTick` which means the computor's initial `TickNumber` will be set to 0
91+
on first fetch.
92+
93+
**Impact**: Very low. The computor TickNumber is metadata only, not used in
94+
validation logic.
95+
96+
**Future fix**: Populate `InitialTick` from the cached status response in
97+
`GetSystemMetadata`.

main.go

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/qubic/go-archiver-v2/db"
2222
metrics "github.com/qubic/go-archiver-v2/metrics"
2323
"github.com/qubic/go-archiver-v2/network"
24+
"github.com/qubic/go-archiver-v2/network/bob"
2425
"github.com/qubic/go-archiver-v2/processor"
2526
"github.com/qubic/go-archiver-v2/protobuf"
2627
"github.com/qubic/go-archiver-v2/validator"
@@ -70,6 +71,10 @@ func run() error {
7071
Metrics struct {
7172
Namespace string `conf:"default:qubic_archiver_v2"`
7273
}
74+
Backend struct {
75+
Type string `conf:"default:node"` // "node" or "bob"
76+
BobURL string `conf:"default:http://127.0.0.1:40420"`
77+
}
7378
}
7479

7580
help, err := conf.Parse(prefix, &cfg)
@@ -115,17 +120,34 @@ func run() error {
115120
}
116121
defer dbPool.Close()
117122

118-
clientPool, err := network.NewNodeConnectorPool(qubic.PoolConfig{
119-
InitialCap: cfg.Pool.InitialCap,
120-
MaxCap: cfg.Pool.MaxCap,
121-
MaxIdle: cfg.Pool.MaxIdle,
122-
IdleTimeout: cfg.Pool.IdleTimeout,
123-
NodeFetcherUrl: cfg.Pool.NodeFetcherUrl,
124-
NodeFetcherTimeout: cfg.Pool.NodeFetcherTimeout,
125-
NodePort: cfg.Pool.NodePort,
126-
})
127-
if err != nil {
128-
return fmt.Errorf("creating node pool: %w", err)
123+
// create data fetcher factory based on backend type
124+
var fetcherFactory func() (network.DataFetcher, error)
125+
126+
switch cfg.Backend.Type {
127+
case "node":
128+
clientPool, err := network.NewNodeConnectorPool(qubic.PoolConfig{
129+
InitialCap: cfg.Pool.InitialCap,
130+
MaxCap: cfg.Pool.MaxCap,
131+
MaxIdle: cfg.Pool.MaxIdle,
132+
IdleTimeout: cfg.Pool.IdleTimeout,
133+
NodeFetcherUrl: cfg.Pool.NodeFetcherUrl,
134+
NodeFetcherTimeout: cfg.Pool.NodeFetcherTimeout,
135+
NodePort: cfg.Pool.NodePort,
136+
})
137+
if err != nil {
138+
return fmt.Errorf("creating node pool: %w", err)
139+
}
140+
fetcherFactory = func() (network.DataFetcher, error) {
141+
return network.NewNodeDataFetcher(clientPool)
142+
}
143+
case "bob":
144+
bobFetcher := bob.NewBobDataFetcher(cfg.Backend.BobURL)
145+
fetcherFactory = func() (network.DataFetcher, error) {
146+
return bobFetcher, nil
147+
}
148+
log.Printf("main: Using bob backend at [%s]", cfg.Backend.BobURL)
149+
default:
150+
return fmt.Errorf("unknown backend type: %s", cfg.Backend.Type)
129151
}
130152

131153
// start processor
@@ -135,7 +157,7 @@ func run() error {
135157
return fmt.Errorf("calculating arbitrator public key from [%s]: %w", cfg.Qubic.ArbitratorIdentity, err)
136158
}
137159
tickValidator := validator.NewValidator(arbitratorPubKey, cfg.Qubic.EnableTxStatusAddon)
138-
proc := processor.NewProcessor(clientPool, dbPool, tickValidator, processor.Config{
160+
proc := processor.NewProcessor(fetcherFactory, dbPool, tickValidator, processor.Config{
139161
ProcessTickTimeout: cfg.Qubic.ProcessTickTimeout,
140162
}, m)
141163

network/bob/client.go

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
package bob
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"encoding/json"
7+
"fmt"
8+
"io"
9+
"net/http"
10+
"time"
11+
)
12+
13+
// Client is an HTTP client for bob's REST and JSON-RPC 2.0 API.
14+
type Client struct {
15+
baseURL string
16+
httpClient *http.Client
17+
}
18+
19+
// NewClient creates a new bob HTTP client.
20+
func NewClient(baseURL string) *Client {
21+
return &Client{
22+
baseURL: baseURL,
23+
httpClient: &http.Client{
24+
Timeout: 30 * time.Second,
25+
},
26+
}
27+
}
28+
29+
type jsonRPCRequest struct {
30+
JSONRPC string `json:"jsonrpc"`
31+
Method string `json:"method"`
32+
Params interface{} `json:"params"`
33+
ID int `json:"id"`
34+
}
35+
36+
type jsonRPCResponse struct {
37+
JSONRPC string `json:"jsonrpc"`
38+
Result json.RawMessage `json:"result"`
39+
Error *jsonRPCError `json:"error,omitempty"`
40+
ID int `json:"id"`
41+
}
42+
43+
type jsonRPCError struct {
44+
Code int `json:"code"`
45+
Message string `json:"message"`
46+
}
47+
48+
func (e *jsonRPCError) Error() string {
49+
return fmt.Sprintf("RPC error %d: %s", e.Code, e.Message)
50+
}
51+
52+
// RPCCall makes a JSON-RPC 2.0 call to bob.
53+
func (c *Client) RPCCall(ctx context.Context, method string, params interface{}) (json.RawMessage, error) {
54+
reqBody := jsonRPCRequest{
55+
JSONRPC: "2.0",
56+
Method: method,
57+
Params: params,
58+
ID: 1,
59+
}
60+
61+
body, err := json.Marshal(reqBody)
62+
if err != nil {
63+
return nil, fmt.Errorf("marshalling RPC request: %w", err)
64+
}
65+
66+
req, err := http.NewRequestWithContext(ctx, "POST", c.baseURL+"/qubic", bytes.NewReader(body))
67+
if err != nil {
68+
return nil, fmt.Errorf("creating RPC request: %w", err)
69+
}
70+
req.Header.Set("Content-Type", "application/json")
71+
72+
resp, err := c.httpClient.Do(req)
73+
if err != nil {
74+
return nil, fmt.Errorf("executing RPC request: %w", err)
75+
}
76+
defer resp.Body.Close()
77+
78+
respBody, err := io.ReadAll(resp.Body)
79+
if err != nil {
80+
return nil, fmt.Errorf("reading RPC response: %w", err)
81+
}
82+
83+
var rpcResp jsonRPCResponse
84+
if err := json.Unmarshal(respBody, &rpcResp); err != nil {
85+
return nil, fmt.Errorf("unmarshalling RPC response: %w", err)
86+
}
87+
88+
if rpcResp.Error != nil {
89+
return nil, rpcResp.Error
90+
}
91+
92+
return rpcResp.Result, nil
93+
}
94+
95+
// RESTGet makes a GET request to a bob REST endpoint.
96+
func (c *Client) RESTGet(ctx context.Context, path string) ([]byte, error) {
97+
req, err := http.NewRequestWithContext(ctx, "GET", c.baseURL+path, nil)
98+
if err != nil {
99+
return nil, fmt.Errorf("creating REST request: %w", err)
100+
}
101+
102+
resp, err := c.httpClient.Do(req)
103+
if err != nil {
104+
return nil, fmt.Errorf("executing REST request: %w", err)
105+
}
106+
defer resp.Body.Close()
107+
108+
body, err := io.ReadAll(resp.Body)
109+
if err != nil {
110+
return nil, fmt.Errorf("reading REST response: %w", err)
111+
}
112+
113+
if resp.StatusCode != http.StatusOK {
114+
return nil, fmt.Errorf("REST request failed with status %d: %s", resp.StatusCode, string(body))
115+
}
116+
117+
return body, nil
118+
}

0 commit comments

Comments
 (0)