
Commit 6704730

chore: integrate Fibre into Talis (#194)
## Overview

Closes #185

<details>
<summary>Claude-generated PR. The initial implementation plan is below; the implementation diverged from it somewhat over time.</summary>

# Plan: Add Fibre Client Support to Talis

## Context

Talis (`./tools/talis`) needs fibre DA tooling equivalent to its existing `txsim` command. Three new talis commands plus a standalone binary:

1. `setup-fibre` -- post-deploy step: SSH into each validator to register its fibre host address + fund its escrow account
2. `fibre-txsim` -- start the standalone `fibre-txsim` binary on remote validators via SSH + tmux
3. `fibre-throughput` -- run locally: monitor blocks in real-time, decode `MsgPayForFibre` and `MsgPayForBlobs` txs, print throughput per block, optionally write JSONL traces

Additionally, a standalone load-generation binary:

4. `tools/fibre-txsim/` -- connects to a validator's gRPC endpoint and submits blobs via the Fibre protocol

No proto changes, no module changes, no genesis modifiers. Registration and escrow funding happen at runtime via SSH after deploy.

## Port Layout

- App gRPC: `0.0.0.0:9091` (fibre server + tx submission + valaddr queries)
- CometBFT RPC: `0.0.0.0:26657` (block queries)

---

## New Files

### 1. `tools/talis/fibre_setup.go` -- Post-deploy Fibre Setup

New `setup-fibre` command. SSHes into each validator and runs two txs: register fibre host + fund escrow. Uses `runScriptInTMux` with parallel workers.
**Flags:**

- `--directory` (string, default `.`)
- `--ssh-key-path` (string)
- `--escrow-amount` (string, default `200000000000000utia`)
- `--fibre-port` (int, default `9091`)
- `--fees` (string, default `5000utia`)
- `--workers` (int, default `10`)

**Script per validator:**

```bash
celestia-appd tx valaddr set-host dns:///<public_ip>:<fibre_port> \
  --from validator --keyring-backend=test --home .celestia-app \
  --chain-id <chain_id> --fees <fees> --yes; \
sleep 20; \
celestia-appd tx fibre deposit-to-escrow <escrow_amount> \
  --from validator --keyring-backend=test --home .celestia-app \
  --chain-id <chain_id> --fees <fees> --yes
```

Each validator runs its own script via `runScriptInTMux` with session name `"setup-fibre"`. Workers run in parallel via a semaphore + WaitGroup. Waits ~40 seconds after all workers complete for transactions to finalize.

### 2. `tools/talis/fibre_txsim.go` -- Remote Fibre Blob Spammer Launcher

New `fibre-txsim` command. SSHes into validators and starts the `fibre-txsim` binary (already deployed via `make build-talis-bins`) inside tmux sessions.

**Flags:**

- `--directory` (string, default `.`)
- `--ssh-key-path` (string)
- `--instances` (int, default `1`) -- number of validators to start on
- `--concurrency` (int, default `1`) -- concurrent blob submissions per instance
- `--blob-size` (int, default `1000000`)
- `--interval` (duration, default `0` = no delay)
- `--duration` (duration, default `0` = until killed)
- `--key-name` (string, default `validator`)

**Logic:**

1. Load config, select first N validators based on `--instances`
2. Build the remote command string with all flags
3. Start tmux sessions via `runScriptInTMux` with session name `"fibre-txsim"`
4. Print summary with session name, log file path, and validator list

### 3. `tools/talis/fibre_throughput.go` -- Real-time Throughput Monitor

New `fibre-throughput` command. Runs locally, polls blocks from a validator's RPC.
Tracks both `MsgPayForFibre` (PFF) and `MsgPayForBlobs` (PFB) transactions. Optionally writes structured JSONL traces.

**Flags:**

- `--directory` (string, default `.`)
- `--rpc-endpoint` (string) -- defaults to first validator IP:26657
- `--duration` (duration, default `0` = until Ctrl+C)
- `--start-height` (int64, default `0` = latest + 1) -- block height to start from
- `--with-traces` (bool, default `false`) -- enable JSONL trace file output
- `--traces-dir` (string, default `traces/throughput`) -- directory for trace files

**Trace record struct:**

```go
type blockTrace struct {
	Height           int64   `json:"height"`
	Timestamp        string  `json:"timestamp"`
	BlockTimeSec     float64 `json:"block_time_sec"`
	PFFCount         int     `json:"pff_count"`
	PFBCount         int     `json:"pfb_count"`
	TotalPFFBytes    int64   `json:"total_pff_bytes"`
	TotalPFBBytes    int64   `json:"total_pfb_bytes"`
	PFFThroughputMBs float64 `json:"pff_throughput_mbs"`
	PFBThroughputMBs float64 `json:"pfb_throughput_mbs"`
}
```

**Logic:**

1. Load config, resolve RPC endpoint (first validator IP:26657)
2. Create CometBFT HTTP client + tx decoder via `encCfg.TxConfig.TxDecoder()`
3. If `--start-height > 0`, start from that height; otherwise query latest height + 1
4. If `--with-traces`, create traces directory via `os.MkdirAll`, open timestamped JSONL file (e.g. `throughput_2026-02-18T20:59:35Z.jsonl`), create `json.NewEncoder`
5. Poll for new blocks (1s interval)
6. For each block: decode all txs, type-assert for `*fibretypes.MsgPayForFibre` (sum `PaymentPromise.BlobSize`) and `*blobtypes.MsgPayForBlobs` (sum `BlobSizes`)
7. Compute per-block throughput: `bytes / blockTimeDelta / (1024 * 1024)` for both PFF and PFB
8. Print one line per block: `height=N pff_txs=N pfb_txs=N pff_bytes=NMB pfb_bytes=NMB block_time=Ns pff_throughput=NMB/s pfb_throughput=NMB/s`
9. If traces enabled, encode `blockTrace` struct via `encoder.Encode(trace)`
10. On exit: print summary (avg throughput, total bytes, total blocks), close trace file
### 4. `tools/fibre-txsim/main.go` -- Standalone Fibre Blob Spammer

Standalone binary that runs on validator nodes. Connects to a validator's gRPC endpoint, creates random blobs, and submits them via `fibre.Client.Put()`.

**Flags:**

- `--chain-id` (string, required)
- `--grpc-endpoint` (string, default `localhost:9091`)
- `--keyring-dir` (string, default `.celestia-app`)
- `--key-name` (string, default `validator`)
- `--blob-size` (int, default `1000000`)
- `--concurrency` (int, default `1`)
- `--interval` (duration, default `0` = no delay)
- `--duration` (duration, default `0` = until killed)

**Logic:**

1. Set up fibre client:
   - `encoding.MakeConfig(app.ModuleEncodingRegisters...)`
   - `keyring.New(app.Name, keyring.BackendTest, keyringDir, nil, encCfg.Codec)`
   - `grpc.NewClient(endpoint, insecure, maxMsgSize)`
   - `fibregrpc.NewSetGetter(coregrpc.NewBlockAPIClient(grpcConn))`
   - `fibregrpc.NewHostRegistry(valaddrtypes.NewQueryClient(grpcConn))`
   - `user.SetupTxClient(ctx, kr, grpcConn, encCfg, user.WithDefaultAccount(keyName))`
   - `fibre.NewClient(txClient, kr, valGet, hostReg, clientCfg)`
2. Spam loop: bounded concurrency via semaphore, optional ticker-based pacing via `--interval`, random namespace + random blob data
3. Print each result to stdout as it completes (height, tx hash, latency, error)
4. On Ctrl+C / timeout: print summary (total sent, successes, failures, avg latency)

Built for Linux via `make build-talis-bins` and deployed to validators in the payload.

---

## Modified Files

### 5. `tools/talis/main.go`

Add three new commands:

```go
setupFibreCmd(),
fibreTxsimCmd(),
fibreThroughputCmd(),
```

---

## Documentation

### 6. `tools/fibre-txsim/README.md`

Documents the standalone `fibre-txsim` binary: what it does, how to build, all flags, how it works, and typical deployment via `talis fibre-txsim`.

### 7. `tools/talis/fibre.md`

End-to-end guide for fibre experiments. References the main `README.md` for network setup, then documents the three fibre-specific steps:
1. `talis setup-fibre` -- with flag table
2. `talis fibre-txsim` -- with flag table
3. `talis fibre-throughput` -- with flag table, trace file format documentation (JSONL schema), and `--start-height` for replaying past blocks

---

## Key Reference Files

- `tools/fibre-txsim/main.go` -- fibre client setup pattern (grpc, keyring, fibre.NewClient, Put)
- `tools/talis/txsim.go` -- cobra command pattern + `runScriptInTMux` usage
- `tools/talis/execution.go` -- `runScriptInTMux()` implementation
- `tools/talis/status.go` -- CometBFT RPC client pattern
- `scripts/single-node-fibre.sh` -- `set-host` + sleep pattern
- `fibre/client.go` + `fibre/client_put.go` -- `NewClient()` and `Put()` API
- `x/fibre/types/tx.pb.go` -- `MsgPayForFibre` with `PaymentPromise.BlobSize`
- `x/blob/types/tx.pb.go` -- `MsgPayForBlobs` with `BlobSizes []uint32`

## Workflow

```
1.  talis init
2.  talis add
3.  talis up
4.  talis genesis
5.  talis deploy
6.  talis setup-fibre       -- registers hosts + funds escrow
7.  talis fibre-txsim       -- starts fibre-txsim on remote validators
8.  talis fibre-throughput  -- monitors throughput (with optional JSONL traces)
9.  talis download / talis upload-data
10. talis down
```

## Verification

1. Build talis: `go build ./tools/talis/`
2. Build fibre-txsim: `go build ./tools/fibre-txsim/`
3. Run throughput without traces: `talis fibre-throughput --directory <dir>` -- prints PFF + PFB stats per block
4. Run throughput with traces: `talis fibre-throughput --directory <dir> --with-traces` -- creates `traces/throughput/throughput_<timestamp>.jsonl`
5. Run throughput from specific height: `talis fibre-throughput --directory <dir> --start-height 100`
6. Verify JSONL: each line is valid JSON with all `blockTrace` fields

</details>
1 parent a748d29 commit 6704730

10 files changed: +905 −1 lines changed


Makefile

Lines changed: 2 additions & 1 deletion
```diff
@@ -360,12 +360,13 @@ txsim-build-docker:
 	docker build -t ghcr.io/celestiaorg/txsim -f docker/txsim/Dockerfile .
 .PHONY: txsim-build-docker
 
-## build-talis-bins: Build celestia-appd, txsim, and latency-monitor binaries for talis VMs (ubuntu 22.04 LTS)
+## build-talis-bins: Build celestia-appd, txsim, latency-monitor, and fibre-txsim binaries for talis VMs (ubuntu 22.04 LTS)
 build-talis-bins:
 	mkdir -p build
 	GOOS=linux GOARCH=amd64 go build -ldflags "$(LDFLAGS_STANDALONE)" -o build/celestia-appd ./cmd/celestia-appd
 	GOOS=linux GOARCH=amd64 go build -o build/txsim ./test/cmd/txsim
 	GOOS=linux GOARCH=amd64 go build -o build/latency-monitor ./tools/latency-monitor 2>/dev/null || echo "latency-monitor not found, skipping"
+	GOOS=linux GOARCH=amd64 go build -o build/fibre-txsim ./tools/fibre-txsim
 .PHONY: build-talis-bins
```

tools/fibre-txsim/README.md

Lines changed: 64 additions & 0 deletions
````markdown
# fibre-txsim

A load-generation tool that submits blobs to a Celestia network through the Fibre protocol. It connects to a validator's gRPC endpoint, creates random blobs, and sends them via `MsgPayForFibre` as fast as possible (or at a configured interval).

This binary is built for Linux and deployed to validator nodes by `make build-talis-bins`. It is started remotely via the `talis fibre-txsim` command.

## Build

```sh
# Cross-compile for talis VMs (Linux amd64)
make build-talis-bins

# Build for your local machine (useful for local testing)
go build -o fibre-txsim ./tools/fibre-txsim/
```

## Usage

```sh
fibre-txsim \
  --chain-id <chain-id> \
  --grpc-endpoint localhost:9091 \
  --keyring-dir .celestia-app \
  --key-name validator \
  --blob-size 1000000 \
  --concurrency 4 \
  --interval 0s
```

## Flags

| Flag              | Default          | Description                                                                 |
|-------------------|------------------|-----------------------------------------------------------------------------|
| `--chain-id`      | *(required)*     | Chain ID of the network                                                     |
| `--grpc-endpoint` | `localhost:9091` | gRPC endpoint of the validator                                              |
| `--keyring-dir`   | `.celestia-app`  | Path to the keyring directory                                               |
| `--key-name`      | `validator`      | Key name in the keyring                                                     |
| `--blob-size`     | `1000000`        | Size of each blob in bytes                                                  |
| `--concurrency`   | `1`              | Number of concurrent blob submissions                                       |
| `--interval`      | `0`              | Delay between blob submissions (`0` = no delay, submit as fast as possible) |
| `--duration`      | `0`              | How long to run (`0` = until killed with Ctrl+C)                            |

## How it works

1. Connects to a validator via gRPC and initializes a Fibre client.
2. Spawns up to `--concurrency` goroutines that each:
   - Generate a random namespace and random blob data of `--blob-size` bytes.
   - Call `fibreClient.Put()` to submit the blob through the Fibre protocol.
   - Log the resulting block height, tx hash, and submission latency.
3. On shutdown (Ctrl+C or `--duration` elapsed), prints a summary with total sent, successes, failures, and average latency.

## Typical deployment

You don't normally run `fibre-txsim` directly. Instead, use `talis fibre-txsim`, which SSHes into validators and starts it inside a tmux session:

```sh
talis fibre-txsim --directory <experiment-dir> \
  --instances 4 \
  --concurrency 2 \
  --blob-size 500000 \
  --duration 10m
```

See `tools/talis/fibre.md` for the full experiment workflow.
````

tools/fibre-txsim/main.go

Lines changed: 231 additions & 0 deletions
```go
package main

import (
	"context"
	"crypto/rand"
	"flag"
	"fmt"
	"math"
	"os"
	"os/signal"
	"sync"
	"sync/atomic"
	"time"

	"github.com/celestiaorg/celestia-app-fibre/v6/app"
	"github.com/celestiaorg/celestia-app-fibre/v6/app/encoding"
	"github.com/celestiaorg/celestia-app-fibre/v6/fibre"
	fibregrpc "github.com/celestiaorg/celestia-app-fibre/v6/fibre/grpc"
	"github.com/celestiaorg/celestia-app-fibre/v6/pkg/user"
	valaddrtypes "github.com/celestiaorg/celestia-app-fibre/v6/x/valaddr/types"
	"github.com/celestiaorg/go-square/v4/share"
	coregrpc "github.com/cometbft/cometbft/rpc/grpc"
	"github.com/cosmos/cosmos-sdk/crypto/keyring"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	var (
		chainID      string
		grpcEndpoint string
		keyringDir   string
		keyName      string
		blobSize     int
		concurrency  int
		interval     time.Duration
		duration     time.Duration
	)

	flag.StringVar(&chainID, "chain-id", "", "chain ID (required)")
	flag.StringVar(&grpcEndpoint, "grpc-endpoint", "localhost:9091", "gRPC endpoint")
	flag.StringVar(&keyringDir, "keyring-dir", ".celestia-app", "keyring directory")
	flag.StringVar(&keyName, "key-name", "validator", "key name in keyring")
	flag.IntVar(&blobSize, "blob-size", 1000000, "size of each blob in bytes")
	flag.IntVar(&concurrency, "concurrency", 1, "number of concurrent blob submissions")
	flag.DurationVar(&interval, "interval", 0, "delay between blob submissions (0 = no delay)")
	flag.DurationVar(&duration, "duration", 0, "how long to run (0 = until killed)")
	flag.Parse()

	if chainID == "" {
		fmt.Fprintln(os.Stderr, "error: --chain-id is required")
		flag.Usage()
		os.Exit(1)
	}

	if err := run(chainID, grpcEndpoint, keyringDir, keyName, blobSize, concurrency, interval, duration); err != nil {
		fmt.Fprintf(os.Stderr, "error: %v\n", err)
		os.Exit(1)
	}
}

func run(chainID, grpcEndpoint, keyringDir, keyName string, blobSize, concurrency int, interval, duration time.Duration) error {
	encCfg := encoding.MakeConfig(app.ModuleEncodingRegisters...)

	kr, err := keyring.New(app.Name, keyring.BackendTest, keyringDir, nil, encCfg.Codec)
	if err != nil {
		return fmt.Errorf("failed to initialize keyring: %w", err)
	}

	grpcConn, err := grpc.NewClient(
		grpcEndpoint,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithDefaultCallOptions(
			grpc.MaxCallSendMsgSize(math.MaxInt32),
			grpc.MaxCallRecvMsgSize(math.MaxInt32),
		),
	)
	if err != nil {
		return fmt.Errorf("failed to create gRPC connection: %w", err)
	}
	defer grpcConn.Close()

	valGet := fibregrpc.NewSetGetter(coregrpc.NewBlockAPIClient(grpcConn))
	hostReg := fibregrpc.NewHostRegistry(valaddrtypes.NewQueryClient(grpcConn))

	clientCfg := fibre.DefaultClientConfig()
	clientCfg.ChainID = chainID
	clientCfg.DefaultKeyName = keyName

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	txClient, err := user.SetupTxClient(ctx, kr, grpcConn, encCfg, user.WithDefaultAccount(keyName))
	if err != nil {
		return fmt.Errorf("failed to set up tx client: %w", err)
	}

	fibreClient, err := fibre.NewClient(txClient, kr, valGet, hostReg, clientCfg)
	if err != nil {
		return fmt.Errorf("failed to create fibre client: %w", err)
	}
	defer fibreClient.Close()

	// Handle signals
	sigCh := make(chan os.Signal, 1)
	signal.Notify(sigCh, os.Interrupt)
	go func() {
		<-sigCh
		fmt.Println("\nReceived interrupt, shutting down...")
		cancel()
	}()

	// Apply duration limit if set
	if duration > 0 {
		ctx, cancel = context.WithTimeout(ctx, duration)
		defer cancel()
	}

	// Stats
	var (
		totalSent  atomic.Int64
		successes  atomic.Int64
		failures   atomic.Int64
		totalLatNs atomic.Int64
	)
	startTime := time.Now()

	// Semaphore for bounded concurrency
	sem := make(chan struct{}, concurrency)
	var wg sync.WaitGroup

	fmt.Println("\nStarting fibre blob spam...")

	// If interval is set, use a ticker to pace blob submissions.
	// Otherwise, fire as fast as the semaphore allows.
	var tick <-chan time.Time
	if interval > 0 {
		t := time.NewTicker(interval)
		defer t.Stop()
		tick = t.C
	}

	for ctx.Err() == nil {
		// Wait for the interval tick (if configured)
		if tick != nil {
			select {
			case <-ctx.Done():
				continue
			case <-tick:
			}
		}

		// Acquire semaphore slot
		select {
		case <-ctx.Done():
			continue
		case sem <- struct{}{}:
		}

		wg.Add(1)
		go func() {
			defer wg.Done()
			defer func() { <-sem }()

			// Generate random namespace
			nsID := make([]byte, share.NamespaceVersionZeroIDSize)
			if _, err := rand.Read(nsID); err != nil {
				fmt.Printf("error generating namespace: %v\n", err)
				failures.Add(1)
				totalSent.Add(1)
				return
			}
			id := make([]byte, 0, share.NamespaceIDSize)
			id = append(id, share.NamespaceVersionZeroPrefix...)
			id = append(id, nsID...)
			ns, err := share.NewNamespace(share.NamespaceVersionZero, id)
			if err != nil {
				fmt.Printf("error creating namespace: %v\n", err)
				failures.Add(1)
				totalSent.Add(1)
				return
			}

			// Generate random blob data
			data := make([]byte, blobSize)
			if _, err := rand.Read(data); err != nil {
				fmt.Printf("error generating blob data: %v\n", err)
				failures.Add(1)
				totalSent.Add(1)
				return
			}

			t := time.Now()
			result, err := fibreClient.Put(ctx, ns, data)
			lat := time.Since(t)

			totalSent.Add(1)
			if err != nil {
				if ctx.Err() != nil {
					return
				}
				failures.Add(1)
				fmt.Printf("error: %v (latency=%s)\n", err, lat)
				return
			}

			successes.Add(1)
			totalLatNs.Add(lat.Nanoseconds())
			fmt.Printf("height=%d tx=%s latency=%s\n", result.Height, result.TxHash, lat)
		}()
	}

	wg.Wait()

	elapsed := time.Since(startTime)
	s := successes.Load()
	f := failures.Load()
	var avgLat time.Duration
	if s > 0 {
		avgLat = time.Duration(totalLatNs.Load() / s)
	}

	fmt.Printf("\n--- Summary ---\n")
	fmt.Printf("Duration: %s\n", elapsed.Truncate(time.Second))
	fmt.Printf("Total sent: %d\n", totalSent.Load())
	fmt.Printf("Successes: %d\n", s)
	fmt.Printf("Failures: %d\n", f)
	fmt.Printf("Avg latency (success): %s\n", avgLat)

	return nil
}
```
