Skip to content

Commit cd995ac

Browse files
walldissclaude
andcommitted
feat(talis): add dedicated encoder instances for fibre-txsim
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 93efbdf commit cd995ac

12 files changed

Lines changed: 401 additions & 37 deletions

File tree

tools/talis/add.go

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,17 @@ func addCmd() *cobra.Command {
4141
return fmt.Errorf("unknown provider %q (supported: digitalocean, googlecloud)", provider)
4242
}
4343
}
44+
case "encoder":
45+
for i := 0; i < count; i++ {
46+
switch provider {
47+
case "digitalocean":
48+
cfg = cfg.WithDigitalOceanEncoder(region)
49+
case "googlecloud":
50+
cfg = cfg.WithGoogleCloudEncoder(region)
51+
default:
52+
return fmt.Errorf("unknown provider %q (supported: digitalocean, googlecloud)", provider)
53+
}
54+
}
4455
case "bridge":
4556
log.Println("bridges are not yet supported")
4657
return nil
@@ -58,7 +69,7 @@ func addCmd() *cobra.Command {
5869
cmd.Flags().StringVarP(&rootDir, "directory", "d", ".", "root directory in which to initialize")
5970
cmd.Flags().IntVarP(&count, "count", "c", 0, "Number of nodes to deploy")
6071
_ = cmd.MarkFlagRequired("count")
61-
cmd.Flags().StringVarP(&nodeType, "type", "t", "", "Type of the node (validator, bridge, light)")
72+
cmd.Flags().StringVarP(&nodeType, "type", "t", "", "Type of the node (validator, encoder, bridge, light)")
6273
_ = cmd.MarkFlagRequired("type")
6374
cmd.Flags().StringVarP(&provider, "provider", "p", "digitalocean", "Provider for the node (digitalocean, googlecloud)")
6475
cmd.Flags().StringVarP(&region, "region", "r", "random", "the region to deploy the instance in (random if blank)")

tools/talis/client.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,8 @@ func NewDOClient(cfg Config) (*DOClient, error) {
7777

7878
func (c *DOClient) Up(ctx context.Context, workers int) error {
7979
insts := make([]Instance, 0)
80-
for _, v := range append(c.cfg.Validators, c.cfg.Observability...) {
80+
allInstances := append(append(c.cfg.Validators, c.cfg.Observability...), c.cfg.Encoders...)
81+
for _, v := range allInstances {
8182
if v.Provider != DigitalOcean {
8283
log.Println("unexpectedly skipping instance since only DO is supported", v.Name, "in region", v.Region)
8384
continue
@@ -149,7 +150,8 @@ func (c *DOClient) countRunningDroplets(ctx context.Context) (int, error) {
149150

150151
func (c *DOClient) Down(ctx context.Context, workers int) error {
151152
insts := make([]Instance, 0)
152-
for _, v := range append(c.cfg.Validators, c.cfg.Observability...) {
153+
allInstances := append(append(c.cfg.Validators, c.cfg.Observability...), c.cfg.Encoders...)
154+
for _, v := range allInstances {
153155
if v.Provider != DigitalOcean {
154156
log.Println("unexpectedly skipping instance since only DO is supported", v.Name, "in region", v.Region)
155157
continue

tools/talis/config.go

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,16 @@ const (
2121
Light NodeType = "light"
2222
// Observability represents a observability monitoring node for Prometheus/Grafana.
2323
Observability NodeType = "observability"
24+
// Encoder represents a dedicated fibre-txsim encoder node.
25+
Encoder NodeType = "encoder"
2426
)
2527

2628
var (
2729
valCount = atomic.Uint32{}
2830
nodeCount = atomic.Uint32{}
2931
lightCount = atomic.Uint32{}
3032
observabilityCount = atomic.Uint32{}
33+
encoderCount = atomic.Uint32{}
3134
)
3235

3336
// NodeName returns the name of the node based on its type and index. The
@@ -44,6 +47,8 @@ func NodeName(nodeType NodeType) string {
4447
index = int(lightCount.Add(1)) - 1
4548
case Observability:
4649
index = int(observabilityCount.Add(1)) - 1
50+
case Encoder:
51+
index = int(encoderCount.Add(1)) - 1
4752
default:
4853
panic(fmt.Sprintf("unknown node type: %s", nodeType))
4954
}
@@ -120,7 +125,7 @@ func ExperimentTag(nodeType NodeType, index int, experimentID, chainID string) s
120125

121126
func GetExperimentTag(tags []string) string {
122127
for _, tag := range tags {
123-
if strings.HasPrefix(tag, "validator-") || strings.HasPrefix(tag, "bridge-") || strings.HasPrefix(tag, "light-") || strings.HasPrefix(tag, "observability-") {
128+
if strings.HasPrefix(tag, "validator-") || strings.HasPrefix(tag, "bridge-") || strings.HasPrefix(tag, "light-") || strings.HasPrefix(tag, "observability-") || strings.HasPrefix(tag, "encoder-") {
124129
return tag
125130
}
126131
}
@@ -133,6 +138,7 @@ type Config struct {
133138
Bridges []Instance `json:"bridges,omitempty"`
134139
Lights []Instance `json:"lights,omitempty"`
135140
Observability []Instance `json:"observability,omitempty"`
141+
Encoders []Instance `json:"encoders,omitempty"`
136142

137143
// ChainID is the chain ID of the network. This is used to identify the
138144
// network and is also used as the chain ID of the network. It is
@@ -164,6 +170,7 @@ func NewConfig(experiment, chainID string) Config {
164170
Bridges: []Instance{},
165171
Lights: []Instance{},
166172
Observability: []Instance{},
173+
Encoders: []Instance{},
167174
Experiment: experiment,
168175
ChainID: TalisChainID(chainID),
169176
S3Config: S3Config{
@@ -230,6 +237,18 @@ func (cfg Config) WithGoogleCloudObservability(region string) Config {
230237
return cfg
231238
}
232239

240+
func (cfg Config) WithDigitalOceanEncoder(region string) Config {
241+
i := NewDigitalOceanEncoder(region).WithExperiment(cfg.Experiment, cfg.ChainID)
242+
cfg.Encoders = append(cfg.Encoders, i)
243+
return cfg
244+
}
245+
246+
func (cfg Config) WithGoogleCloudEncoder(region string) Config {
247+
i := NewGoogleCloudEncoder(region).WithExperiment(cfg.Experiment, cfg.ChainID)
248+
cfg.Encoders = append(cfg.Encoders, i)
249+
return cfg
250+
}
251+
233252
func (cfg Config) WithChainID(chainID string) Config {
234253
cfg.ChainID = TalisChainID(chainID)
235254
return cfg
@@ -306,5 +325,12 @@ func (cfg Config) UpdateInstance(name, publicIP, privateIP string) (Config, erro
306325
return cfg, nil
307326
}
308327
}
328+
for i := range cfg.Encoders {
329+
if cfg.Encoders[i].Name == name {
330+
cfg.Encoders[i].PublicIP = publicIP
331+
cfg.Encoders[i].PrivateIP = privateIP
332+
return cfg, nil
333+
}
334+
}
309335
return cfg, fmt.Errorf("instance %s not found", name)
310336
}

tools/talis/deployment.go

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -127,15 +127,21 @@ func deployCmd() *cobra.Command {
127127
}
128128
log.Printf("continuing despite validator deployment errors: %v", err)
129129
}
130-
return deployObservabilityIfConfigured(cmd.Context(), cfg, rootDir, SSHKeyPath, directUpload)
130+
if err := deployObservabilityIfConfigured(cmd.Context(), cfg, rootDir, SSHKeyPath, directUpload); err != nil {
131+
return err
132+
}
133+
return deployEncodersIfConfigured(cmd.Context(), cfg, rootDir, SSHKeyPath, workers)
131134
}
132135
if err := deployPayloadViaS3(cmd.Context(), rootDir, cfg.Validators, tarPath, SSHKeyPath, "/root", "payload/validator_init.sh", 7*time.Minute, cfg.S3Config, workers); err != nil {
133136
if !ignoreFailed {
134137
return err
135138
}
136139
log.Printf("continuing despite validator deployment errors: %v", err)
137140
}
138-
return deployObservabilityIfConfigured(cmd.Context(), cfg, rootDir, SSHKeyPath, directUpload)
141+
if err := deployObservabilityIfConfigured(cmd.Context(), cfg, rootDir, SSHKeyPath, directUpload); err != nil {
142+
return err
143+
}
144+
return deployEncodersIfConfigured(cmd.Context(), cfg, rootDir, SSHKeyPath, workers)
139145
},
140146
}
141147

@@ -185,6 +191,35 @@ func deployObservabilityIfConfigured(ctx context.Context, cfg Config, rootDir, s
185191
return nil
186192
}
187193

194+
// deployEncodersIfConfigured creates a lightweight encoder-payload tar and deploys
195+
// it to all configured encoder instances via S3.
196+
func deployEncodersIfConfigured(ctx context.Context, cfg Config, rootDir, sshKeyPath string, workers int) error {
197+
if len(cfg.Encoders) == 0 {
198+
return nil
199+
}
200+
201+
encoderPayloadDir := filepath.Join(rootDir, "encoder-payload")
202+
if _, err := os.Stat(encoderPayloadDir); os.IsNotExist(err) {
203+
return fmt.Errorf("encoder-payload directory not found — run 'talis genesis' first")
204+
}
205+
206+
encoderTarPath := filepath.Join(rootDir, "encoder-payload.tar.gz")
207+
log.Printf("Compressing encoder payload to %s\n", encoderTarPath)
208+
tarCmd := exec.Command("tar", "-czf", encoderTarPath, "-C", rootDir, "encoder-payload")
209+
tarCmd.Env = append(os.Environ(), "COPYFILE_DISABLE=1")
210+
if output, err := tarCmd.CombinedOutput(); err != nil {
211+
return fmt.Errorf("failed to compress encoder payload: %w, output: %s", err, string(output))
212+
}
213+
log.Printf("Sending encoder payload to %d encoder(s)...\n", len(cfg.Encoders))
214+
215+
if err := deployPayloadViaS3(ctx, rootDir, cfg.Encoders, encoderTarPath, sshKeyPath, "/root", "encoder-payload/encoder_init.sh", 7*time.Minute, cfg.S3Config, workers); err != nil {
216+
return fmt.Errorf("encoder deployment: %w", err)
217+
}
218+
219+
log.Printf("Encoder deployment complete\n")
220+
return nil
221+
}
222+
188223
// printGrafanaInfo prints the Grafana URL and credentials after successful observability deployment.
189224
func printGrafanaInfo(node Instance, rootDir string) {
190225
password := readGrafanaPassword(rootDir)

tools/talis/digital_ocean.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616

1717
const (
1818
DODefaultValidatorSlug = "c2-16vcpu-32gb"
19+
DODefaultEncoderSlug = "c2-8vcpu-16gb"
1920
DODefaultObservabilitySlug = "s-2vcpu-4gb"
2021
DODefaultImage = "ubuntu-22-04-x64"
2122
RandomRegion = "random"
@@ -50,6 +51,17 @@ func NewDigitalOceanValidator(region string) Instance {
5051
return i
5152
}
5253

54+
func NewDigitalOceanEncoder(region string) Instance {
55+
if region == "" || region == RandomRegion {
56+
region = RandomDORegion()
57+
}
58+
i := NewBaseInstance(Encoder)
59+
i.Provider = DigitalOcean
60+
i.Slug = DODefaultEncoderSlug
61+
i.Region = region
62+
return i
63+
}
64+
5365
func NewDigitalOceanObservability(region string) Instance {
5466
if region == "" || region == RandomRegion {
5567
region = RandomDORegion()
@@ -473,7 +485,7 @@ func checkForRunningDOExperiments(ctx context.Context, client *godo.Client, expe
473485

474486
func hasExperimentTag(tags []string, experimentID, chainID string) bool {
475487
for _, tag := range tags {
476-
if (strings.HasPrefix(tag, "validator-") || strings.HasPrefix(tag, "bridge-") || strings.HasPrefix(tag, "light-")) &&
488+
if (strings.HasPrefix(tag, "validator-") || strings.HasPrefix(tag, "bridge-") || strings.HasPrefix(tag, "light-") || strings.HasPrefix(tag, "encoder-")) &&
477489
strings.Contains(tag, experimentID) && strings.Contains(tag, chainID) {
478490
return true
479491
}

tools/talis/fibre_setup.go

Lines changed: 50 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,14 @@ const SetupFibreSessionName = "setup-fibre"
1414

1515
func setupFibreCmd() *cobra.Command {
1616
var (
17-
rootDir string
18-
SSHKeyPath string
19-
escrowAmount string
20-
fibrePort int
21-
fees string
22-
workers int
23-
fibreAccounts int
17+
rootDir string
18+
SSHKeyPath string
19+
escrowAmount string
20+
fibrePort int
21+
fees string
22+
workers int
23+
fibreAccounts int
24+
encoderFibreAccounts int
2425
)
2526

2627
cmd := &cobra.Command{
@@ -100,7 +101,47 @@ func setupFibreCmd() *cobra.Command {
100101
if err := waitForTmuxSessions(cfg.Validators, resolvedSSHKeyPath, SetupFibreSessionName, 10*time.Minute); err != nil {
101102
return fmt.Errorf("waiting for setup-fibre sessions: %w", err)
102103
}
103-
fmt.Println("Done!")
104+
fmt.Println("Validator setup done!")
105+
106+
// Deposit escrow for encoder accounts.
107+
// Each encoder runs deposit-to-escrow from its own machine using its
108+
// own keyring, broadcasting via the first validator's RPC endpoint.
109+
if len(cfg.Encoders) > 0 && len(cfg.Validators) > 0 {
110+
rpcNode := fmt.Sprintf("tcp://%s:26657", cfg.Validators[0].PublicIP)
111+
fmt.Printf("Setting up escrow for %d encoder(s) via %s...\n", len(cfg.Encoders), rpcNode)
112+
113+
for _, enc := range cfg.Encoders {
114+
encIndex := extractIndexFromName(enc.Name)
115+
keyPrefix := fmt.Sprintf("enc%d", encIndex)
116+
nAccounts := encoderFibreAccounts
117+
118+
var sb strings.Builder
119+
for i := 0; i < nAccounts; i++ {
120+
keyName := fmt.Sprintf("%s-%d", keyPrefix, i)
121+
sb.WriteString(fmt.Sprintf(
122+
"celestia-appd tx fibre deposit-to-escrow %s "+
123+
"--from %s --keyring-backend=test --home .celestia-app "+
124+
"--chain-id %s --fees %s --node %s --yes\n",
125+
escrowAmount,
126+
keyName,
127+
cfg.ChainID, fees, rpcNode,
128+
))
129+
}
130+
131+
script := sb.String()
132+
fmt.Printf("Running escrow deposits on encoder %s (%s) — %d accounts\n", enc.Name, enc.PublicIP, nAccounts)
133+
if err := runScriptInTMux([]Instance{enc}, resolvedSSHKeyPath, script, SetupFibreSessionName, 30*time.Minute); err != nil {
134+
return fmt.Errorf("encoder %s escrow setup: %w", enc.Name, err)
135+
}
136+
}
137+
138+
fmt.Printf("Waiting for encoder escrow deposits to complete...\n")
139+
if err := waitForTmuxSessions(cfg.Encoders, resolvedSSHKeyPath, SetupFibreSessionName, 15*time.Minute); err != nil {
140+
return fmt.Errorf("waiting for encoder setup-fibre sessions: %w", err)
141+
}
142+
fmt.Println("Encoder escrow setup done!")
143+
}
144+
104145
return nil
105146
},
106147
}
@@ -112,6 +153,7 @@ func setupFibreCmd() *cobra.Command {
112153
cmd.Flags().StringVar(&fees, "fees", "5000utia", "transaction fees")
113154
cmd.Flags().IntVarP(&workers, "workers", "w", 10, "number of validators to set up in parallel")
114155
cmd.Flags().IntVar(&fibreAccounts, "fibre-accounts", 100, "number of fibre worker accounts to deposit escrow for")
156+
cmd.Flags().IntVar(&encoderFibreAccounts, "encoder-fibre-accounts", 100, "number of fibre worker accounts per encoder instance")
115157

116158
return cmd
117159
}

0 commit comments

Comments
 (0)