From 35c1d100b2065f61e4395fdedf5a2e0a8b196f0b Mon Sep 17 00:00:00 2001 From: David Date: Tue, 16 Jun 2026 09:54:31 +0800 Subject: [PATCH 01/13] feat(taiko-client): add raiko2 ZK backlog control-plane client Add ZKBacklogController (ClearBacklog / StatusClean) implemented by the ZK ComposeProofProducer, wrapping raiko2 POST /v3/prover/clear and GET /v3/prover/status (data.clean). Dummy mode short-circuits. Co-Authored-By: Claude Opus 4.8 --- .../prover/proof_producer/zk_backlog.go | 115 ++++++++++++++++++ .../prover/proof_producer/zk_backlog_test.go | 93 ++++++++++++++ 2 files changed, 208 insertions(+) create mode 100644 packages/taiko-client/prover/proof_producer/zk_backlog.go create mode 100644 packages/taiko-client/prover/proof_producer/zk_backlog_test.go diff --git a/packages/taiko-client/prover/proof_producer/zk_backlog.go b/packages/taiko-client/prover/proof_producer/zk_backlog.go new file mode 100644 index 0000000000..be754c8a7d --- /dev/null +++ b/packages/taiko-client/prover/proof_producer/zk_backlog.go @@ -0,0 +1,115 @@ +package producer + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + + "github.com/taikoxyz/taiko-mono/packages/taiko-client/pkg/rpc" +) + +// ZKBacklogController is implemented by proof producers whose backend exposes the +// raiko2 control-plane endpoints for draining the ZK (`zk_any`) task backlog and +// reporting when the backend is idle. See raiko2 issue #93. +type ZKBacklogController interface { + // ClearBacklog discards all non-terminal `zk_any` tasks on the ZK backend + // (POST /v3/prover/clear). + ClearBacklog(ctx context.Context) error + // StatusClean reports whether the ZK backend is fully idle, i.e. the + // `data.clean` field of GET /v3/prover/status is true. + StatusClean(ctx context.Context) (bool, error) +} + +// raikoControlPlaneResponse is the minimal body returned by POST /v3/prover/clear. +type raikoControlPlaneResponse struct { + Status string `json:"status"` +} + +// RaikoProverStatusResponse is the body returned by GET /v3/prover/status. Only +// `data.clean` is consumed; `tasks` and `network` are intentionally ignored. +type RaikoProverStatusResponse struct { + Status string `json:"status"` + Data struct { + Clean bool `json:"clean"` + } `json:"data"` +} + +// requestRaikoControlPlane sends a bodyless request (GET or POST) to a raiko2 +// control-plane endpoint and unmarshals the JSON response into U. A non-200 +// status code (including 404 when the endpoint is absent) returns an error. +func requestRaikoControlPlane[U any]( + ctx context.Context, + method string, + url string, + apiKey string, +) (*U, error) { + req, err := http.NewRequestWithContext(ctx, method, url, nil) + if err != nil { + return nil, err + } + if len(apiKey) > 0 { + req.Header.Set("X-API-KEY", apiKey) + } + + res, err := http.DefaultClient.Do(req) + if err != nil { + return nil, err + } + defer res.Body.Close() + + if res.StatusCode != http.StatusOK { + return nil, fmt.Errorf("unexpected status code from %s: %d", url, res.StatusCode) + } + + resBytes, err := io.ReadAll(res.Body) + if err != nil { + return nil, err + } + + var output U + if err := json.Unmarshal(resBytes, &output); err != nil { + return nil, err + } + return &output, nil +} + +// ClearBacklog implements the ZKBacklogController interface. +func (s *ComposeProofProducer) ClearBacklog(ctx context.Context) error { + if s.Dummy { + return nil + } + ctx, cancel := rpc.CtxWithTimeoutOrDefault(ctx, s.RaikoRequestTimeout) + defer cancel() + + if _, err := requestRaikoControlPlane[raikoControlPlaneResponse]( + ctx, + http.MethodPost, + s.RaikoHostEndpoint+"/v3/prover/clear", + s.ApiKey, + ); err != nil { + return fmt.Errorf("failed to clear ZK backlog: %w", err) + } + return nil +} + +// StatusClean implements the ZKBacklogController interface. +func (s *ComposeProofProducer) StatusClean(ctx context.Context) (bool, error) { + if s.Dummy { + return true, nil + } + ctx, cancel := rpc.CtxWithTimeoutOrDefault(ctx, s.RaikoRequestTimeout) + defer cancel() + + out, err := requestRaikoControlPlane[RaikoProverStatusResponse]( + ctx, + http.MethodGet, + s.RaikoHostEndpoint+"/v3/prover/status", + s.ApiKey, + ) + if err != nil { + return false, fmt.Errorf("failed to get ZK prover status: %w", err) + } + return out.Data.Clean, nil +} diff --git a/packages/taiko-client/prover/proof_producer/zk_backlog_test.go b/packages/taiko-client/prover/proof_producer/zk_backlog_test.go new file mode 100644 index 0000000000..9267f0293a --- /dev/null +++ b/packages/taiko-client/prover/proof_producer/zk_backlog_test.go @@ -0,0 +1,93 @@ +package producer + +import ( + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +// Compile-time assertion that the ZK compose producer satisfies the interface. +var _ ZKBacklogController = (*ComposeProofProducer)(nil) + +func TestComposeProofProducerStatusClean(t *testing.T) { + t.Run("data.clean true", func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + require.Equal(t, http.MethodGet, r.Method) + require.Equal(t, "/v3/prover/status", r.URL.Path) + _, _ = w.Write([]byte( + `{"status":"ok","data":{"clean":true,"tasks":{"pending":0},"network":{"risc0":{"inflight_orders":0}}}}`, + )) + })) + defer server.Close() + + p := &ComposeProofProducer{RaikoHostEndpoint: server.URL, RaikoRequestTimeout: time.Second} + clean, err := p.StatusClean(t.Context()) + require.NoError(t, err) + require.True(t, clean) + }) + + t.Run("data.clean false", func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + _, _ = w.Write([]byte(`{"status":"ok","data":{"clean":false}}`)) + })) + defer server.Close() + + p := &ComposeProofProducer{RaikoHostEndpoint: server.URL, RaikoRequestTimeout: time.Second} + clean, err := p.StatusClean(t.Context()) + require.NoError(t, err) + require.False(t, clean) + }) + + t.Run("non-200 errors", func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusNotFound) + })) + defer server.Close() + + p := &ComposeProofProducer{RaikoHostEndpoint: server.URL, RaikoRequestTimeout: time.Second} + _, err := p.StatusClean(t.Context()) + require.Error(t, err) + }) + + t.Run("dummy short-circuits to clean", func(t *testing.T) { + p := &ComposeProofProducer{Dummy: true} + clean, err := p.StatusClean(t.Context()) + require.NoError(t, err) + require.True(t, clean) + }) +} + +func TestComposeProofProducerClearBacklog(t *testing.T) { + t.Run("posts to clear endpoint", func(t *testing.T) { + var called bool + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + called = true + require.Equal(t, http.MethodPost, r.Method) + require.Equal(t, "/v3/prover/clear", r.URL.Path) + _, _ = w.Write([]byte(`{"status":"ok"}`)) + })) + defer server.Close() + + p := &ComposeProofProducer{RaikoHostEndpoint: server.URL, RaikoRequestTimeout: time.Second} + require.NoError(t, p.ClearBacklog(t.Context())) + require.True(t, called) + }) + + t.Run("non-200 errors", func(t *testing.T) { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer server.Close() + + p := &ComposeProofProducer{RaikoHostEndpoint: server.URL, RaikoRequestTimeout: time.Second} + require.Error(t, p.ClearBacklog(t.Context())) + }) + + t.Run("dummy short-circuits to nil", func(t *testing.T) { + p := &ComposeProofProducer{Dummy: true} + require.NoError(t, p.ClearBacklog(t.Context())) + }) +} From 3549aa2e15436d4de8edd821bee7885a9d9153b9 Mon Sep 17 00:00:00 2001 From: David Date: Tue, 16 Jun 2026 10:02:25 +0800 Subject: [PATCH 02/13] feat(taiko-client): add ZK-fallback latch state to proof submitter Add the mutex-guarded SGX-draining latch (markSGXFallback / inSGXFallback / resumeZK), wire the ZK control-plane client into NewProofSubmitter, and add prover_zk_backlog_sgx_mode and prover_zk_backlog_clear metrics. Co-Authored-By: Claude Opus 4.8 --- .../taiko-client/internal/metrics/metrics.go | 2 + .../prover/proof_submitter/proof_submitter.go | 9 ++++ .../prover/proof_submitter/zk_fallback.go | 51 +++++++++++++++++++ .../proof_submitter/zk_fallback_test.go | 42 +++++++++++++++ 4 files changed, 104 insertions(+) create mode 100644 packages/taiko-client/prover/proof_submitter/zk_fallback.go create mode 100644 packages/taiko-client/prover/proof_submitter/zk_fallback_test.go diff --git a/packages/taiko-client/internal/metrics/metrics.go b/packages/taiko-client/internal/metrics/metrics.go index b80a3dd545..db06e9a0d1 100644 --- a/packages/taiko-client/internal/metrics/metrics.go +++ b/packages/taiko-client/internal/metrics/metrics.go @@ -96,6 +96,8 @@ var ( ProverQueuedProofCounter = factory.NewCounter(prometheus.CounterOpts{Name: "prover_proof_all_queued"}) ProverSentProofCounter = factory.NewCounter(prometheus.CounterOpts{Name: "prover_proof_all_sent"}) ProverProofsAssigned = factory.NewCounter(prometheus.CounterOpts{Name: "prover_proof_assigned"}) + ProverZKBacklogModeGauge = factory.NewGauge(prometheus.GaugeOpts{Name: "prover_zk_backlog_sgx_mode"}) + ProverZKBacklogClearCounter = factory.NewCounter(prometheus.CounterOpts{Name: "prover_zk_backlog_clear"}) ProverReceivedProposedBlockGauge = factory.NewGauge(prometheus.GaugeOpts{Name: "prover_proposed_received"}) ProverReceivedProvenBlockGauge = factory.NewGauge(prometheus.GaugeOpts{Name: "prover_proven_received"}) ProverSubmissionAcceptedCounter = factory.NewCounter(prometheus.CounterOpts{ diff --git a/packages/taiko-client/prover/proof_submitter/proof_submitter.go b/packages/taiko-client/prover/proof_submitter/proof_submitter.go index e0d8321ad4..f9f2d22a82 100644 --- a/packages/taiko-client/prover/proof_submitter/proof_submitter.go +++ b/packages/taiko-client/prover/proof_submitter/proof_submitter.go @@ -58,6 +58,9 @@ type ProofSubmitter struct { proofPollingInterval time.Duration proposalWindowSize *big.Int maxZKProofProposalDistance *big.Int + // ZK backlog drain/resume state machine (see zk_fallback.go). + zkBacklog proofProducer.ZKBacklogController + zkFallback zkFallback } // NewProofSubmitter creates a new ProofSubmitter instance. @@ -102,6 +105,12 @@ func NewProofSubmitter( maxZKProofProposalDistance: maxZKProofProposalDistance, } + // Use the ZK producer's raiko2 control-plane API when available; otherwise + // the state machine falls back to the stateless distance check. + if zkBacklog, ok := zkvmProofProducer.(proofProducer.ZKBacklogController); ok { + proofSubmitter.zkBacklog = zkBacklog + } + proofSubmitter.startBackgroundWorkers(ctx) return proofSubmitter, nil } diff --git a/packages/taiko-client/prover/proof_submitter/zk_fallback.go b/packages/taiko-client/prover/proof_submitter/zk_fallback.go new file mode 100644 index 0000000000..7ae38384e7 --- /dev/null +++ b/packages/taiko-client/prover/proof_submitter/zk_fallback.go @@ -0,0 +1,51 @@ +package submitter + +import ( + "sync" + + "github.com/taikoxyz/taiko-mono/packages/taiko-client/internal/metrics" +) + +// clearBackoffMaxRetries bounds the best-effort retries of POST /v3/prover/clear +// when entering SGX-draining mode. +const clearBackoffMaxRetries uint64 = 5 + +// zkFallback tracks whether the submitter is draining the ZK backlog via SGX. +// It is shared across the concurrent RequestProof goroutines, so all access is +// guarded by mu. +type zkFallback struct { + mu sync.Mutex + inSGX bool +} + +// markSGXFallback latches into SGX-draining mode. It returns true only for the +// first caller that performs the transition; that caller is responsible for +// clearing the ZK backlog exactly once. +func (s *ProofSubmitter) markSGXFallback() bool { + s.zkFallback.mu.Lock() + defer s.zkFallback.mu.Unlock() + if s.zkFallback.inSGX { + return false + } + s.zkFallback.inSGX = true + metrics.ProverZKBacklogModeGauge.Set(1) + return true +} + +// inSGXFallback reports whether the submitter is currently draining via SGX. +func (s *ProofSubmitter) inSGXFallback() bool { + s.zkFallback.mu.Lock() + defer s.zkFallback.mu.Unlock() + return s.zkFallback.inSGX +} + +// resumeZK unlatches SGX-draining mode so subsequent proposals use ZK again. +func (s *ProofSubmitter) resumeZK() { + s.zkFallback.mu.Lock() + defer s.zkFallback.mu.Unlock() + if !s.zkFallback.inSGX { + return + } + s.zkFallback.inSGX = false + metrics.ProverZKBacklogModeGauge.Set(0) +} diff --git a/packages/taiko-client/prover/proof_submitter/zk_fallback_test.go b/packages/taiko-client/prover/proof_submitter/zk_fallback_test.go new file mode 100644 index 0000000000..9f1f2d3f1a --- /dev/null +++ b/packages/taiko-client/prover/proof_submitter/zk_fallback_test.go @@ -0,0 +1,42 @@ +package submitter + +import ( + "sync" + "sync/atomic" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestMarkSGXFallbackOnlyFirstCallerWins(t *testing.T) { + s := &ProofSubmitter{} + require.False(t, s.inSGXFallback()) + require.True(t, s.markSGXFallback()) // first caller latches + require.False(t, s.markSGXFallback()) // already latched + require.True(t, s.inSGXFallback()) + + s.resumeZK() + require.False(t, s.inSGXFallback()) + require.True(t, s.markSGXFallback()) // can latch again after a resume +} + +func TestMarkSGXFallbackConcurrentSingleWinner(t *testing.T) { + s := &ProofSubmitter{} + const n = 50 + var ( + wg sync.WaitGroup + winners atomic.Int32 + ) + wg.Add(n) + for i := 0; i < n; i++ { + go func() { + defer wg.Done() + if s.markSGXFallback() { + winners.Add(1) + } + }() + } + wg.Wait() + require.Equal(t, int32(1), winners.Load()) + require.True(t, s.inSGXFallback()) +} From 16a839e9479c7a1c0dd0a5caf5b302c131013d72 Mon Sep 17 00:00:00 2001 From: David Date: Tue, 16 Jun 2026 10:11:09 +0800 Subject: [PATCH 03/13] feat(taiko-client): add ZK drain/resume decision logic decideUseZK latches into SGX on the first distance breach (firing a one-off backlog clear) and resumes ZK once the backlog is drained and the ZK status is clean, degrading to drained-alone when /status is unavailable. Co-Authored-By: Claude Opus 4.8 --- .../prover/proof_submitter/zk_fallback.go | 111 ++++++++++++- .../proof_submitter/zk_fallback_test.go | 147 ++++++++++++++++++ 2 files changed, 256 insertions(+), 2 deletions(-) diff --git a/packages/taiko-client/prover/proof_submitter/zk_fallback.go b/packages/taiko-client/prover/proof_submitter/zk_fallback.go index 7ae38384e7..a28777c8e0 100644 --- a/packages/taiko-client/prover/proof_submitter/zk_fallback.go +++ b/packages/taiko-client/prover/proof_submitter/zk_fallback.go @@ -1,8 +1,14 @@ package submitter import ( + "context" + "math/big" "sync" + "github.com/cenkalti/backoff/v4" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" + "github.com/taikoxyz/taiko-mono/packages/taiko-client/internal/metrics" ) @@ -40,12 +46,113 @@ func (s *ProofSubmitter) inSGXFallback() bool { } // resumeZK unlatches SGX-draining mode so subsequent proposals use ZK again. -func (s *ProofSubmitter) resumeZK() { +// It returns true only for the caller that performed the transition. +func (s *ProofSubmitter) resumeZK() bool { s.zkFallback.mu.Lock() defer s.zkFallback.mu.Unlock() if !s.zkFallback.inSGX { - return + return false } s.zkFallback.inSGX = false metrics.ProverZKBacklogModeGauge.Set(0) + return true +} + +// decideUseZK applies the ZK backlog drain/resume state machine and reports +// whether this proposal should be proven via ZK. It has side effects: it latches +// into SGX-draining mode (and fires a one-off backlog clear) on the first +// distance breach, and unlatches when the backlog is drained. +func (s *ProofSubmitter) decideUseZK( + ctx context.Context, + proposalID *big.Int, + lastFinalizedProposalID *big.Int, +) bool { + // Machine disabled when no distance is configured. + if s.maxZKProofProposalDistance == nil || s.maxZKProofProposalDistance.Sign() <= 0 { + return true + } + // Without control-plane support, preserve the stateless distance behavior. + if s.zkBacklog == nil { + return s.shouldUseZKProof(proposalID, lastFinalizedProposalID) + } + + if s.inSGXFallback() { + if s.canResumeZK(ctx, proposalID, lastFinalizedProposalID) { + if s.resumeZK() { + log.Info( + "ZK backlog drained, resuming ZK proofs", + "proposalID", proposalID, + "lastFinalizedProposalID", lastFinalizedProposalID, + ) + } + return true + } + return false + } + + if !s.shouldUseZKProof(proposalID, lastFinalizedProposalID) { + if s.markSGXFallback() { + log.Warn( + "ZK proof backlog detected, clearing ZK backlog and draining via SGX", + "proposalID", proposalID, + "lastFinalizedProposalID", lastFinalizedProposalID, + "maxZKProofProposalDistance", s.maxZKProofProposalDistance, + ) + s.fireClearAsync(ctx) + } + return false + } + return true +} + +// canResumeZK reports whether SGX-draining mode can switch back to ZK. It checks +// the cheap local "backlog drained" condition first and only queries the ZK +// backend status when that holds. A status error (e.g. the endpoint is absent) +// degrades to resuming on the backlog-drained condition alone. +func (s *ProofSubmitter) canResumeZK( + ctx context.Context, + proposalID *big.Int, + lastFinalizedProposalID *big.Int, +) bool { + // (A) backlog drained: proposalID <= lastFinalizedProposalID + 1. + if proposalID.Cmp(new(big.Int).Add(lastFinalizedProposalID, common.Big1)) > 0 { + return false + } + // (B) ZK backend idle. + clean, err := s.zkBacklog.StatusClean(ctx) + if err != nil { + log.Warn( + "ZK prover status unavailable, resuming ZK on backlog-drained condition alone", + "proposalID", proposalID, + "error", err, + ) + return true + } + return clean +} + +// fireClearAsync clears the ZK backlog in the background with bounded retries. +// It is best-effort: clearing only accelerates the drain, so a final failure is +// logged and otherwise ignored. ctx is the prover's long-lived context, so the +// goroutine outlives the triggering proposal's RequestProof call. +func (s *ProofSubmitter) fireClearAsync(ctx context.Context) { + // Defensive: decideUseZK already guards against a nil zkBacklog. + if s.zkBacklog == nil { + return + } + metrics.ProverZKBacklogClearCounter.Add(1) + go func() { + bo := backoff.WithContext( + backoff.WithMaxRetries( + backoff.NewConstantBackOff(s.proofPollingInterval), + clearBackoffMaxRetries, + ), + ctx, + ) + if err := backoff.Retry(func() error { return s.zkBacklog.ClearBacklog(ctx) }, bo); err != nil { + log.Warn("Failed to clear ZK backlog after retries", "error", err) + return + } + log.Info("Cleared ZK backlog after entering SGX-draining mode") + }() } diff --git a/packages/taiko-client/prover/proof_submitter/zk_fallback_test.go b/packages/taiko-client/prover/proof_submitter/zk_fallback_test.go index 9f1f2d3f1a..9ff8f4ff9a 100644 --- a/packages/taiko-client/prover/proof_submitter/zk_fallback_test.go +++ b/packages/taiko-client/prover/proof_submitter/zk_fallback_test.go @@ -1,11 +1,17 @@ package submitter import ( + "context" + "errors" + "math/big" "sync" "sync/atomic" "testing" + "time" "github.com/stretchr/testify/require" + + proofProducer "github.com/taikoxyz/taiko-mono/packages/taiko-client/prover/proof_producer" ) func TestMarkSGXFallbackOnlyFirstCallerWins(t *testing.T) { @@ -40,3 +46,144 @@ func TestMarkSGXFallbackConcurrentSingleWinner(t *testing.T) { require.Equal(t, int32(1), winners.Load()) require.True(t, s.inSGXFallback()) } + +// fakeZKBacklog is a programmable ZKBacklogController for unit tests. +type fakeZKBacklog struct { + clearCalls atomic.Int32 + clearErr error + clean bool + statusErr error + statusCalls atomic.Int32 + cleared chan struct{} +} + +func (f *fakeZKBacklog) ClearBacklog(_ context.Context) error { + f.clearCalls.Add(1) + if f.cleared != nil { + select { + case f.cleared <- struct{}{}: + default: + } + } + return f.clearErr +} + +func (f *fakeZKBacklog) StatusClean(_ context.Context) (bool, error) { + f.statusCalls.Add(1) + return f.clean, f.statusErr +} + +func newZKFallbackSubmitter(backlog proofProducer.ZKBacklogController) *ProofSubmitter { + return &ProofSubmitter{ + maxZKProofProposalDistance: big.NewInt(30), + zkBacklog: backlog, + proofPollingInterval: time.Millisecond, + } +} + +func TestDecideUseZKMachineDisabled(t *testing.T) { + s := &ProofSubmitter{maxZKProofProposalDistance: big.NewInt(0), zkBacklog: &fakeZKBacklog{}} + require.True(t, s.decideUseZK(t.Context(), big.NewInt(1000), big.NewInt(1))) + require.False(t, s.inSGXFallback()) +} + +func TestDecideUseZKNilBacklogFallsBackToStateless(t *testing.T) { + s := &ProofSubmitter{maxZKProofProposalDistance: big.NewInt(30)} // zkBacklog nil + require.True(t, s.decideUseZK(t.Context(), big.NewInt(40), big.NewInt(10))) // 40 <= 10+30 + require.False(t, s.decideUseZK(t.Context(), big.NewInt(41), big.NewInt(10))) // 41 > 10+30 + require.False(t, s.inSGXFallback()) // never latches without control plane +} + +func TestDecideUseZKWithinDistanceStaysZK(t *testing.T) { + s := newZKFallbackSubmitter(&fakeZKBacklog{}) + require.True(t, s.decideUseZK(t.Context(), big.NewInt(40), big.NewInt(10))) + require.False(t, s.inSGXFallback()) +} + +func TestDecideUseZKBreachLatchesAndClearsOnce(t *testing.T) { + fake := &fakeZKBacklog{cleared: make(chan struct{}, 1)} + s := newZKFallbackSubmitter(fake) + + require.False(t, s.decideUseZK(t.Context(), big.NewInt(41), big.NewInt(10))) // breach + require.True(t, s.inSGXFallback()) + + select { + case <-fake.cleared: + case <-time.After(time.Second): + t.Fatal("clear was not called") + } + + // A second breach while latched must not clear again. + require.False(t, s.decideUseZK(t.Context(), big.NewInt(50), big.NewInt(10))) + require.Equal(t, int32(1), fake.clearCalls.Load()) +} + +func TestDecideUseZKResumeWhenDrainedAndClean(t *testing.T) { + fake := &fakeZKBacklog{clean: true} + s := newZKFallbackSubmitter(fake) + require.True(t, s.markSGXFallback()) + + require.True(t, s.decideUseZK(t.Context(), big.NewInt(11), big.NewInt(10))) // (A) 11<=10+1, clean + require.False(t, s.inSGXFallback()) + require.Equal(t, int32(1), fake.statusCalls.Load()) +} + +func TestDecideUseZKStaysSGXWhenNotDrained(t *testing.T) { + fake := &fakeZKBacklog{clean: true} + s := newZKFallbackSubmitter(fake) + require.True(t, s.markSGXFallback()) + + require.False(t, s.decideUseZK(t.Context(), big.NewInt(20), big.NewInt(10))) // (A) fails + require.True(t, s.inSGXFallback()) + require.Equal(t, int32(0), fake.statusCalls.Load()) // status not queried until (A) holds +} + +func TestDecideUseZKStaysSGXWhenNotClean(t *testing.T) { + fake := &fakeZKBacklog{clean: false} + s := newZKFallbackSubmitter(fake) + require.True(t, s.markSGXFallback()) + + require.False(t, s.decideUseZK(t.Context(), big.NewInt(11), big.NewInt(10))) + require.True(t, s.inSGXFallback()) + require.Equal(t, int32(1), fake.statusCalls.Load()) +} + +func TestDecideUseZKDegradesOnStatusError(t *testing.T) { + fake := &fakeZKBacklog{statusErr: errors.New("status endpoint absent")} + s := newZKFallbackSubmitter(fake) + require.True(t, s.markSGXFallback()) + + require.True(t, s.decideUseZK(t.Context(), big.NewInt(11), big.NewInt(10))) // (A) holds, status errors -> degrade -> resume + require.False(t, s.inSGXFallback()) +} + +func TestDecideUseZKConcurrentBreachClearsOnce(t *testing.T) { + fake := &fakeZKBacklog{cleared: make(chan struct{}, 1)} + s := newZKFallbackSubmitter(fake) + + const n = 50 + var ( + wg sync.WaitGroup + nonBreach atomic.Int32 + ) + wg.Add(n) + for i := 0; i < n; i++ { + go func() { + defer wg.Done() + // 41 > 10 + 30 -> every caller sees a breach and must not use ZK. + if s.decideUseZK(t.Context(), big.NewInt(41), big.NewInt(10)) { + nonBreach.Add(1) + } + }() + } + wg.Wait() + + require.Equal(t, int32(0), nonBreach.Load()) // every caller saw the breach + require.True(t, s.inSGXFallback()) + select { + case <-fake.cleared: + case <-time.After(time.Second): + t.Fatal("clear was not called") + } + require.Equal(t, int32(1), fake.clearCalls.Load()) +} From 8950bd761ec02d02de3963a5b8012d2875bc4b1b Mon Sep 17 00:00:00 2001 From: David Date: Tue, 16 Jun 2026 10:33:30 +0800 Subject: [PATCH 04/13] feat(taiko-client): drive ZK/SGX selection via drain/resume machine RequestProof now consults decideUseZK each retry instead of the stateless distance check, so a backlog breach latches into SGX draining until caught up. Per-call zk_any_not_drawn / timeout fallbacks are unchanged. Co-Authored-By: Claude Opus 4.8 --- .../prover/proof_submitter/proof_submitter.go | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/packages/taiko-client/prover/proof_submitter/proof_submitter.go b/packages/taiko-client/prover/proof_submitter/proof_submitter.go index f9f2d22a82..0c7720b71c 100644 --- a/packages/taiko-client/prover/proof_submitter/proof_submitter.go +++ b/packages/taiko-client/prover/proof_submitter/proof_submitter.go @@ -215,18 +215,15 @@ func (s *ProofSubmitter) RequestProof(ctx context.Context, meta metadata.TaikoPr ) return ErrProposalOutOfAllowedRange } - if !s.shouldUseZKProof(proposalID, lastFinalizedProposalID) { - log.Info( - "Proposal too far from the last finalized proposal, skipping ZK proof", - "proposalID", proposalID, - "lastFinalizedProposalID", lastFinalizedProposalID, - "maxZKProofProposalDistance", s.maxZKProofProposalDistance, - ) - useZK = false - } + // machineSaysZK is the drain/resume state machine's verdict for this proposal + // (it may latch SGX mode + fire a one-off backlog clear, or resume ZK; see + // zk_fallback.go). useZK is the per-call fallback (zk_any_not_drawn / timeout) + // that sticks for the rest of this call. ZK is used only when both agree; once + // either goes false the ZK path stays off for the remaining retries. + machineSaysZK := s.decideUseZK(ctx, proposalID, lastFinalizedProposalID) // If zk proof is enabled, request zk proof first, and check if ZK proof is drawn. - if s.zkvmProofProducer != nil && useZK { + if s.zkvmProofProducer != nil && useZK && machineSaysZK { if proofResponse, err = s.zkvmProofProducer.RequestProof( ctx, opts, From 94ca3b0e699f814dd99f9e4bf33431e9dd135170 Mon Sep 17 00:00:00 2001 From: David Date: Tue, 16 Jun 2026 10:49:08 +0800 Subject: [PATCH 05/13] refactor(taiko-client): address ZK drain/resume review follow-ups - Store the prover's long-lived context on ProofSubmitter and use it for the background backlog-clear goroutine (so it can't inherit a request-scoped ctx). - Add a test for the ClearBacklog bounded-retry failure posture. - Update maxZKProofProposalDistance flag help to describe the latched drain/resume. Co-Authored-By: Claude Opus 4.8 --- packages/taiko-client/cmd/flags/prover.go | 5 +++-- .../prover/proof_submitter/proof_submitter.go | 7 +++++++ .../prover/proof_submitter/zk_fallback.go | 12 +++++------ .../proof_submitter/zk_fallback_test.go | 20 +++++++++++++++++++ 4 files changed, 36 insertions(+), 8 deletions(-) diff --git a/packages/taiko-client/cmd/flags/prover.go b/packages/taiko-client/cmd/flags/prover.go index 1a3e655e2a..327bcadd90 100644 --- a/packages/taiko-client/cmd/flags/prover.go +++ b/packages/taiko-client/cmd/flags/prover.go @@ -74,8 +74,9 @@ var ( MaxZKProofProposalDistance = &cli.Uint64Flag{ Name: "prover.maxZKProofProposalDistance", Usage: "The maximum proposal distance counted from lastFinalizedProposalID for requesting ZK proofs. " + - "When proposalID is greater than lastFinalizedProposalID + maxZKProofProposalDistance, " + - "the prover skips ZK proof and requests base proof instead. This flag only works for post Shasta fork. ", + "When proposalID exceeds lastFinalizedProposalID + maxZKProofProposalDistance, the prover stops " + + "requesting ZK proofs, clears the ZK backlog, and drains via the base (SGX) proof until the backlog " + + "is cleared and the ZK endpoint reports clean, then resumes ZK. Set to 0 to disable. Post Shasta fork only.", Value: 30, Category: proverCategory, EnvVars: []string{"PROVER_MAX_ZK_PROOF_PROPOSAL_DISTANCE"}, diff --git a/packages/taiko-client/prover/proof_submitter/proof_submitter.go b/packages/taiko-client/prover/proof_submitter/proof_submitter.go index 0c7720b71c..a09cafad42 100644 --- a/packages/taiko-client/prover/proof_submitter/proof_submitter.go +++ b/packages/taiko-client/prover/proof_submitter/proof_submitter.go @@ -61,6 +61,9 @@ type ProofSubmitter struct { // ZK backlog drain/resume state machine (see zk_fallback.go). zkBacklog proofProducer.ZKBacklogController zkFallback zkFallback + // ctx is the prover's long-lived context, used by background goroutines + // (e.g. the ZK backlog clear) that must outlive a single RequestProof call. + ctx context.Context } // NewProofSubmitter creates a new ProofSubmitter instance. @@ -111,6 +114,10 @@ func NewProofSubmitter( proofSubmitter.zkBacklog = zkBacklog } + // Store the prover's long-lived context for background goroutines (e.g. the ZK + // backlog clear) that must outlive a single RequestProof call. + proofSubmitter.ctx = ctx + proofSubmitter.startBackgroundWorkers(ctx) return proofSubmitter, nil } diff --git a/packages/taiko-client/prover/proof_submitter/zk_fallback.go b/packages/taiko-client/prover/proof_submitter/zk_fallback.go index a28777c8e0..d9c17bc02d 100644 --- a/packages/taiko-client/prover/proof_submitter/zk_fallback.go +++ b/packages/taiko-client/prover/proof_submitter/zk_fallback.go @@ -98,7 +98,7 @@ func (s *ProofSubmitter) decideUseZK( "lastFinalizedProposalID", lastFinalizedProposalID, "maxZKProofProposalDistance", s.maxZKProofProposalDistance, ) - s.fireClearAsync(ctx) + s.fireClearAsync() } return false } @@ -133,9 +133,9 @@ func (s *ProofSubmitter) canResumeZK( // fireClearAsync clears the ZK backlog in the background with bounded retries. // It is best-effort: clearing only accelerates the drain, so a final failure is -// logged and otherwise ignored. ctx is the prover's long-lived context, so the -// goroutine outlives the triggering proposal's RequestProof call. -func (s *ProofSubmitter) fireClearAsync(ctx context.Context) { +// logged and otherwise ignored. It uses the submitter's long-lived context +// (s.ctx), so the goroutine outlives the triggering proposal's RequestProof call. +func (s *ProofSubmitter) fireClearAsync() { // Defensive: decideUseZK already guards against a nil zkBacklog. if s.zkBacklog == nil { return @@ -147,9 +147,9 @@ func (s *ProofSubmitter) fireClearAsync(ctx context.Context) { backoff.NewConstantBackOff(s.proofPollingInterval), clearBackoffMaxRetries, ), - ctx, + s.ctx, ) - if err := backoff.Retry(func() error { return s.zkBacklog.ClearBacklog(ctx) }, bo); err != nil { + if err := backoff.Retry(func() error { return s.zkBacklog.ClearBacklog(s.ctx) }, bo); err != nil { log.Warn("Failed to clear ZK backlog after retries", "error", err) return } diff --git a/packages/taiko-client/prover/proof_submitter/zk_fallback_test.go b/packages/taiko-client/prover/proof_submitter/zk_fallback_test.go index 9ff8f4ff9a..5492f8c7b5 100644 --- a/packages/taiko-client/prover/proof_submitter/zk_fallback_test.go +++ b/packages/taiko-client/prover/proof_submitter/zk_fallback_test.go @@ -78,6 +78,8 @@ func newZKFallbackSubmitter(backlog proofProducer.ZKBacklogController) *ProofSub maxZKProofProposalDistance: big.NewInt(30), zkBacklog: backlog, proofPollingInterval: time.Millisecond, + // fireClearAsync now reads s.ctx; the breach tests trigger it indirectly. + ctx: context.Background(), } } @@ -118,6 +120,24 @@ func TestDecideUseZKBreachLatchesAndClearsOnce(t *testing.T) { require.Equal(t, int32(1), fake.clearCalls.Load()) } +func TestFireClearAsyncRetriesThenGivesUp(t *testing.T) { + fake := &fakeZKBacklog{clearErr: errors.New("clear failed")} + s := newZKFallbackSubmitter(fake) + s.ctx = t.Context() + + // Distance breach: 41 > 10 + 30 -> latch + fireClearAsync. + require.False(t, s.decideUseZK(t.Context(), big.NewInt(41), big.NewInt(10))) + require.True(t, s.inSGXFallback()) // latched even though clear will fail + + // fireClearAsync retries 1 + clearBackoffMaxRetries times, then gives up. + require.Eventually(t, func() bool { + return fake.clearCalls.Load() == int32(clearBackoffMaxRetries)+1 + }, 2*time.Second, 5*time.Millisecond) + + // Still latched after clear ultimately failed (best-effort; resume is gated elsewhere). + require.True(t, s.inSGXFallback()) +} + func TestDecideUseZKResumeWhenDrainedAndClean(t *testing.T) { fake := &fakeZKBacklog{clean: true} s := newZKFallbackSubmitter(fake) From 053c4f0eddd324539c891d1d5f72334d7cb3aa3e Mon Sep 17 00:00:00 2001 From: David Date: Tue, 16 Jun 2026 10:57:33 +0800 Subject: [PATCH 06/13] refactor(taiko-client): simplify ZK backlog control-plane types Consolidate the raiko2 control-plane response types: drop the clear-only response struct (the body is unused), unexport the status response, and remove the unused status fields. Fold the submitter ctx into the struct literal. Co-Authored-By: Claude Opus 4.8 --- .../prover/proof_producer/zk_backlog.go | 20 ++++++++----------- .../prover/proof_submitter/proof_submitter.go | 5 +---- .../proof_submitter/zk_fallback_test.go | 2 +- 3 files changed, 10 insertions(+), 17 deletions(-) diff --git a/packages/taiko-client/prover/proof_producer/zk_backlog.go b/packages/taiko-client/prover/proof_producer/zk_backlog.go index be754c8a7d..20c95df604 100644 --- a/packages/taiko-client/prover/proof_producer/zk_backlog.go +++ b/packages/taiko-client/prover/proof_producer/zk_backlog.go @@ -22,16 +22,11 @@ type ZKBacklogController interface { StatusClean(ctx context.Context) (bool, error) } -// raikoControlPlaneResponse is the minimal body returned by POST /v3/prover/clear. -type raikoControlPlaneResponse struct { - Status string `json:"status"` -} - -// RaikoProverStatusResponse is the body returned by GET /v3/prover/status. Only -// `data.clean` is consumed; `tasks` and `network` are intentionally ignored. -type RaikoProverStatusResponse struct { - Status string `json:"status"` - Data struct { +// raikoProverStatusResponse is the body returned by GET /v3/prover/status. Only +// `data.clean` is consumed; the remaining fields (`status`, `tasks`, `network`) +// are intentionally ignored. +type raikoProverStatusResponse struct { + Data struct { Clean bool `json:"clean"` } `json:"data"` } @@ -83,7 +78,8 @@ func (s *ComposeProofProducer) ClearBacklog(ctx context.Context) error { ctx, cancel := rpc.CtxWithTimeoutOrDefault(ctx, s.RaikoRequestTimeout) defer cancel() - if _, err := requestRaikoControlPlane[raikoControlPlaneResponse]( + // The clear endpoint only needs to return HTTP 200; its response body is unused. + if _, err := requestRaikoControlPlane[struct{}]( ctx, http.MethodPost, s.RaikoHostEndpoint+"/v3/prover/clear", @@ -102,7 +98,7 @@ func (s *ComposeProofProducer) StatusClean(ctx context.Context) (bool, error) { ctx, cancel := rpc.CtxWithTimeoutOrDefault(ctx, s.RaikoRequestTimeout) defer cancel() - out, err := requestRaikoControlPlane[RaikoProverStatusResponse]( + out, err := requestRaikoControlPlane[raikoProverStatusResponse]( ctx, http.MethodGet, s.RaikoHostEndpoint+"/v3/prover/status", diff --git a/packages/taiko-client/prover/proof_submitter/proof_submitter.go b/packages/taiko-client/prover/proof_submitter/proof_submitter.go index a09cafad42..fb00b6cf85 100644 --- a/packages/taiko-client/prover/proof_submitter/proof_submitter.go +++ b/packages/taiko-client/prover/proof_submitter/proof_submitter.go @@ -106,6 +106,7 @@ func NewProofSubmitter( flushCacheNotify: flushCacheNotify, proposalWindowSize: proposalWindowSize, maxZKProofProposalDistance: maxZKProofProposalDistance, + ctx: ctx, } // Use the ZK producer's raiko2 control-plane API when available; otherwise @@ -114,10 +115,6 @@ func NewProofSubmitter( proofSubmitter.zkBacklog = zkBacklog } - // Store the prover's long-lived context for background goroutines (e.g. the ZK - // backlog clear) that must outlive a single RequestProof call. - proofSubmitter.ctx = ctx - proofSubmitter.startBackgroundWorkers(ctx) return proofSubmitter, nil } diff --git a/packages/taiko-client/prover/proof_submitter/zk_fallback_test.go b/packages/taiko-client/prover/proof_submitter/zk_fallback_test.go index 5492f8c7b5..4210d1cb94 100644 --- a/packages/taiko-client/prover/proof_submitter/zk_fallback_test.go +++ b/packages/taiko-client/prover/proof_submitter/zk_fallback_test.go @@ -78,7 +78,7 @@ func newZKFallbackSubmitter(backlog proofProducer.ZKBacklogController) *ProofSub maxZKProofProposalDistance: big.NewInt(30), zkBacklog: backlog, proofPollingInterval: time.Millisecond, - // fireClearAsync now reads s.ctx; the breach tests trigger it indirectly. + // fireClearAsync reads s.ctx; the breach tests trigger it indirectly. ctx: context.Background(), } } From 46369b7027725bc1b927578afde4a20bc6547025 Mon Sep 17 00:00:00 2001 From: David Date: Tue, 16 Jun 2026 11:12:19 +0800 Subject: [PATCH 07/13] docs(taiko-client): clarify ZK control-plane wiring is compile-time The zkvmProofProducer.(ZKBacklogController) assertion is a compile-time capability check, not a runtime probe of the Raiko host. Clarify that the zkBacklog==nil branch covers the no-ZK-endpoint config (and guards canResumeZK from a nil deref); a ZK host predating raiko2 #93 returns 404 and the machine degrades by design. Co-Authored-By: Claude Opus 4.8 --- .../prover/proof_submitter/proof_submitter.go | 8 ++++++-- .../taiko-client/prover/proof_submitter/zk_fallback.go | 4 +++- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/packages/taiko-client/prover/proof_submitter/proof_submitter.go b/packages/taiko-client/prover/proof_submitter/proof_submitter.go index fb00b6cf85..94ea23e670 100644 --- a/packages/taiko-client/prover/proof_submitter/proof_submitter.go +++ b/packages/taiko-client/prover/proof_submitter/proof_submitter.go @@ -109,8 +109,12 @@ func NewProofSubmitter( ctx: ctx, } - // Use the ZK producer's raiko2 control-plane API when available; otherwise - // the state machine falls back to the stateless distance check. + // Wire the raiko2 control-plane client (ClearBacklog/StatusClean) when a ZK + // producer is configured. This is a compile-time capability check, not a probe + // of the remote host: with no ZK endpoint set, zkvmProofProducer is nil and + // zkBacklog stays nil, so decideUseZK bypasses the drain/resume machine. When a + // ZK endpoint IS set but its host predates raiko2 #93, the control-plane calls + // return 404 and the machine degrades by design (see canResumeZK). if zkBacklog, ok := zkvmProofProducer.(proofProducer.ZKBacklogController); ok { proofSubmitter.zkBacklog = zkBacklog } diff --git a/packages/taiko-client/prover/proof_submitter/zk_fallback.go b/packages/taiko-client/prover/proof_submitter/zk_fallback.go index d9c17bc02d..03809fa3bb 100644 --- a/packages/taiko-client/prover/proof_submitter/zk_fallback.go +++ b/packages/taiko-client/prover/proof_submitter/zk_fallback.go @@ -71,7 +71,9 @@ func (s *ProofSubmitter) decideUseZK( if s.maxZKProofProposalDistance == nil || s.maxZKProofProposalDistance.Sign() <= 0 { return true } - // Without control-plane support, preserve the stateless distance behavior. + // No ZK endpoint configured (no control-plane client): bypass the drain/resume + // machine and keep the stateless distance check. This also guarantees zkBacklog + // is non-nil below, so canResumeZK/fireClearAsync can dereference it safely. if s.zkBacklog == nil { return s.shouldUseZKProof(proposalID, lastFinalizedProposalID) } From 0abeaf17c9e7b77fdcb3107b5fa5c145ada29eee Mon Sep 17 00:00:00 2001 From: David Date: Tue, 16 Jun 2026 11:20:20 +0800 Subject: [PATCH 08/13] test(taiko-client): convert ZK drain/resume tests to testify suite Restructure the new zk_backlog and zk_fallback tests as suite.Suite (matching the rest of the prover package), and wrap the long assertion line that tripped the lll linter. Co-Authored-By: Claude Opus 4.8 --- .../prover/proof_producer/zk_backlog_test.go | 72 ++++--- .../proof_submitter/zk_fallback_test.go | 195 +++++++++--------- 2 files changed, 143 insertions(+), 124 deletions(-) diff --git a/packages/taiko-client/prover/proof_producer/zk_backlog_test.go b/packages/taiko-client/prover/proof_producer/zk_backlog_test.go index 9267f0293a..e90c1ceb6e 100644 --- a/packages/taiko-client/prover/proof_producer/zk_backlog_test.go +++ b/packages/taiko-client/prover/proof_producer/zk_backlog_test.go @@ -6,17 +6,25 @@ import ( "testing" "time" - "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" ) // Compile-time assertion that the ZK compose producer satisfies the interface. var _ ZKBacklogController = (*ComposeProofProducer)(nil) -func TestComposeProofProducerStatusClean(t *testing.T) { - t.Run("data.clean true", func(t *testing.T) { +type ZKBacklogTestSuite struct { + suite.Suite +} + +func TestZKBacklogTestSuite(t *testing.T) { + suite.Run(t, new(ZKBacklogTestSuite)) +} + +func (s *ZKBacklogTestSuite) TestStatusClean() { + s.Run("data.clean true", func() { server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - require.Equal(t, http.MethodGet, r.Method) - require.Equal(t, "/v3/prover/status", r.URL.Path) + s.Equal(http.MethodGet, r.Method) + s.Equal("/v3/prover/status", r.URL.Path) _, _ = w.Write([]byte( `{"status":"ok","data":{"clean":true,"tasks":{"pending":0},"network":{"risc0":{"inflight_orders":0}}}}`, )) @@ -24,70 +32,70 @@ func TestComposeProofProducerStatusClean(t *testing.T) { defer server.Close() p := &ComposeProofProducer{RaikoHostEndpoint: server.URL, RaikoRequestTimeout: time.Second} - clean, err := p.StatusClean(t.Context()) - require.NoError(t, err) - require.True(t, clean) + clean, err := p.StatusClean(s.T().Context()) + s.NoError(err) + s.True(clean) }) - t.Run("data.clean false", func(t *testing.T) { - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + s.Run("data.clean false", func() { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { _, _ = w.Write([]byte(`{"status":"ok","data":{"clean":false}}`)) })) defer server.Close() p := &ComposeProofProducer{RaikoHostEndpoint: server.URL, RaikoRequestTimeout: time.Second} - clean, err := p.StatusClean(t.Context()) - require.NoError(t, err) - require.False(t, clean) + clean, err := p.StatusClean(s.T().Context()) + s.NoError(err) + s.False(clean) }) - t.Run("non-200 errors", func(t *testing.T) { - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + s.Run("non-200 errors", func() { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusNotFound) })) defer server.Close() p := &ComposeProofProducer{RaikoHostEndpoint: server.URL, RaikoRequestTimeout: time.Second} - _, err := p.StatusClean(t.Context()) - require.Error(t, err) + _, err := p.StatusClean(s.T().Context()) + s.Error(err) }) - t.Run("dummy short-circuits to clean", func(t *testing.T) { + s.Run("dummy short-circuits to clean", func() { p := &ComposeProofProducer{Dummy: true} - clean, err := p.StatusClean(t.Context()) - require.NoError(t, err) - require.True(t, clean) + clean, err := p.StatusClean(s.T().Context()) + s.NoError(err) + s.True(clean) }) } -func TestComposeProofProducerClearBacklog(t *testing.T) { - t.Run("posts to clear endpoint", func(t *testing.T) { +func (s *ZKBacklogTestSuite) TestClearBacklog() { + s.Run("posts to clear endpoint", func() { var called bool server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { called = true - require.Equal(t, http.MethodPost, r.Method) - require.Equal(t, "/v3/prover/clear", r.URL.Path) + s.Equal(http.MethodPost, r.Method) + s.Equal("/v3/prover/clear", r.URL.Path) _, _ = w.Write([]byte(`{"status":"ok"}`)) })) defer server.Close() p := &ComposeProofProducer{RaikoHostEndpoint: server.URL, RaikoRequestTimeout: time.Second} - require.NoError(t, p.ClearBacklog(t.Context())) - require.True(t, called) + s.NoError(p.ClearBacklog(s.T().Context())) + s.True(called) }) - t.Run("non-200 errors", func(t *testing.T) { - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + s.Run("non-200 errors", func() { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { w.WriteHeader(http.StatusInternalServerError) })) defer server.Close() p := &ComposeProofProducer{RaikoHostEndpoint: server.URL, RaikoRequestTimeout: time.Second} - require.Error(t, p.ClearBacklog(t.Context())) + s.Error(p.ClearBacklog(s.T().Context())) }) - t.Run("dummy short-circuits to nil", func(t *testing.T) { + s.Run("dummy short-circuits to nil", func() { p := &ComposeProofProducer{Dummy: true} - require.NoError(t, p.ClearBacklog(t.Context())) + s.NoError(p.ClearBacklog(s.T().Context())) }) } diff --git a/packages/taiko-client/prover/proof_submitter/zk_fallback_test.go b/packages/taiko-client/prover/proof_submitter/zk_fallback_test.go index 4210d1cb94..04fe0b101b 100644 --- a/packages/taiko-client/prover/proof_submitter/zk_fallback_test.go +++ b/packages/taiko-client/prover/proof_submitter/zk_fallback_test.go @@ -9,44 +9,11 @@ import ( "testing" "time" - "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" proofProducer "github.com/taikoxyz/taiko-mono/packages/taiko-client/prover/proof_producer" ) -func TestMarkSGXFallbackOnlyFirstCallerWins(t *testing.T) { - s := &ProofSubmitter{} - require.False(t, s.inSGXFallback()) - require.True(t, s.markSGXFallback()) // first caller latches - require.False(t, s.markSGXFallback()) // already latched - require.True(t, s.inSGXFallback()) - - s.resumeZK() - require.False(t, s.inSGXFallback()) - require.True(t, s.markSGXFallback()) // can latch again after a resume -} - -func TestMarkSGXFallbackConcurrentSingleWinner(t *testing.T) { - s := &ProofSubmitter{} - const n = 50 - var ( - wg sync.WaitGroup - winners atomic.Int32 - ) - wg.Add(n) - for i := 0; i < n; i++ { - go func() { - defer wg.Done() - if s.markSGXFallback() { - winners.Add(1) - } - }() - } - wg.Wait() - require.Equal(t, int32(1), winners.Load()) - require.True(t, s.inSGXFallback()) -} - // fakeZKBacklog is a programmable ZKBacklogController for unit tests. type fakeZKBacklog struct { clearCalls atomic.Int32 @@ -83,103 +50,147 @@ func newZKFallbackSubmitter(backlog proofProducer.ZKBacklogController) *ProofSub } } -func TestDecideUseZKMachineDisabled(t *testing.T) { - s := &ProofSubmitter{maxZKProofProposalDistance: big.NewInt(0), zkBacklog: &fakeZKBacklog{}} - require.True(t, s.decideUseZK(t.Context(), big.NewInt(1000), big.NewInt(1))) - require.False(t, s.inSGXFallback()) +type ZKFallbackTestSuite struct { + suite.Suite +} + +func TestZKFallbackTestSuite(t *testing.T) { + suite.Run(t, new(ZKFallbackTestSuite)) +} + +func (s *ZKFallbackTestSuite) TestMarkSGXFallbackOnlyFirstCallerWins() { + sub := &ProofSubmitter{} + s.False(sub.inSGXFallback()) + s.True(sub.markSGXFallback()) // first caller latches + s.False(sub.markSGXFallback()) // already latched + s.True(sub.inSGXFallback()) + + sub.resumeZK() + s.False(sub.inSGXFallback()) + s.True(sub.markSGXFallback()) // can latch again after a resume +} + +func (s *ZKFallbackTestSuite) TestMarkSGXFallbackConcurrentSingleWinner() { + sub := &ProofSubmitter{} + const n = 50 + var ( + wg sync.WaitGroup + winners atomic.Int32 + ) + wg.Add(n) + for i := 0; i < n; i++ { + go func() { + defer wg.Done() + if sub.markSGXFallback() { + winners.Add(1) + } + }() + } + wg.Wait() + s.Equal(int32(1), winners.Load()) + s.True(sub.inSGXFallback()) +} + +func (s *ZKFallbackTestSuite) TestDecideUseZKMachineDisabled() { + sub := &ProofSubmitter{maxZKProofProposalDistance: big.NewInt(0), zkBacklog: &fakeZKBacklog{}} + s.True(sub.decideUseZK(context.Background(), big.NewInt(1000), big.NewInt(1))) + s.False(sub.inSGXFallback()) } -func TestDecideUseZKNilBacklogFallsBackToStateless(t *testing.T) { - s := &ProofSubmitter{maxZKProofProposalDistance: big.NewInt(30)} // zkBacklog nil - require.True(t, s.decideUseZK(t.Context(), big.NewInt(40), big.NewInt(10))) // 40 <= 10+30 - require.False(t, s.decideUseZK(t.Context(), big.NewInt(41), big.NewInt(10))) // 41 > 10+30 - require.False(t, s.inSGXFallback()) // never latches without control plane +func (s *ZKFallbackTestSuite) TestDecideUseZKNilBacklogFallsBackToStateless() { + sub := &ProofSubmitter{maxZKProofProposalDistance: big.NewInt(30)} // zkBacklog nil + // 40 <= 10+30 stays ZK; 41 > 10+30 skips ZK; neither latches without a control-plane client. + s.True(sub.decideUseZK(context.Background(), big.NewInt(40), big.NewInt(10))) + s.False(sub.decideUseZK(context.Background(), big.NewInt(41), big.NewInt(10))) + s.False(sub.inSGXFallback()) } -func TestDecideUseZKWithinDistanceStaysZK(t *testing.T) { - s := newZKFallbackSubmitter(&fakeZKBacklog{}) - require.True(t, s.decideUseZK(t.Context(), big.NewInt(40), big.NewInt(10))) - require.False(t, s.inSGXFallback()) +func (s *ZKFallbackTestSuite) TestDecideUseZKWithinDistanceStaysZK() { + sub := newZKFallbackSubmitter(&fakeZKBacklog{}) + s.True(sub.decideUseZK(context.Background(), big.NewInt(40), big.NewInt(10))) + s.False(sub.inSGXFallback()) } -func TestDecideUseZKBreachLatchesAndClearsOnce(t *testing.T) { +func (s *ZKFallbackTestSuite) TestDecideUseZKBreachLatchesAndClearsOnce() { fake := &fakeZKBacklog{cleared: make(chan struct{}, 1)} - s := newZKFallbackSubmitter(fake) + sub := newZKFallbackSubmitter(fake) - require.False(t, s.decideUseZK(t.Context(), big.NewInt(41), big.NewInt(10))) // breach - require.True(t, s.inSGXFallback()) + s.False(sub.decideUseZK(context.Background(), big.NewInt(41), big.NewInt(10))) // breach + s.True(sub.inSGXFallback()) select { case <-fake.cleared: case <-time.After(time.Second): - t.Fatal("clear was not called") + s.FailNow("clear was not called") } // A second breach while latched must not clear again. - require.False(t, s.decideUseZK(t.Context(), big.NewInt(50), big.NewInt(10))) - require.Equal(t, int32(1), fake.clearCalls.Load()) + s.False(sub.decideUseZK(context.Background(), big.NewInt(50), big.NewInt(10))) + s.Equal(int32(1), fake.clearCalls.Load()) } -func TestFireClearAsyncRetriesThenGivesUp(t *testing.T) { +func (s *ZKFallbackTestSuite) TestFireClearAsyncRetriesThenGivesUp() { fake := &fakeZKBacklog{clearErr: errors.New("clear failed")} - s := newZKFallbackSubmitter(fake) - s.ctx = t.Context() + sub := newZKFallbackSubmitter(fake) - // Distance breach: 41 > 10 + 30 -> latch + fireClearAsync. - require.False(t, s.decideUseZK(t.Context(), big.NewInt(41), big.NewInt(10))) - require.True(t, s.inSGXFallback()) // latched even though clear will fail + // Distance breach: 41 > 10 + 30 -> latch + fireClearAsync (which will keep failing). + s.False(sub.decideUseZK(context.Background(), big.NewInt(41), big.NewInt(10))) + s.True(sub.inSGXFallback()) // fireClearAsync retries 1 + clearBackoffMaxRetries times, then gives up. - require.Eventually(t, func() bool { + s.Eventually(func() bool { return fake.clearCalls.Load() == int32(clearBackoffMaxRetries)+1 }, 2*time.Second, 5*time.Millisecond) // Still latched after clear ultimately failed (best-effort; resume is gated elsewhere). - require.True(t, s.inSGXFallback()) + s.True(sub.inSGXFallback()) } -func TestDecideUseZKResumeWhenDrainedAndClean(t *testing.T) { +func (s *ZKFallbackTestSuite) TestDecideUseZKResumeWhenDrainedAndClean() { fake := &fakeZKBacklog{clean: true} - s := newZKFallbackSubmitter(fake) - require.True(t, s.markSGXFallback()) + sub := newZKFallbackSubmitter(fake) + s.True(sub.markSGXFallback()) - require.True(t, s.decideUseZK(t.Context(), big.NewInt(11), big.NewInt(10))) // (A) 11<=10+1, clean - require.False(t, s.inSGXFallback()) - require.Equal(t, int32(1), fake.statusCalls.Load()) + // (A) 11 <= 10+1 holds and status is clean -> resume ZK. + s.True(sub.decideUseZK(context.Background(), big.NewInt(11), big.NewInt(10))) + s.False(sub.inSGXFallback()) + s.Equal(int32(1), fake.statusCalls.Load()) } -func TestDecideUseZKStaysSGXWhenNotDrained(t *testing.T) { +func (s *ZKFallbackTestSuite) TestDecideUseZKStaysSGXWhenNotDrained() { fake := &fakeZKBacklog{clean: true} - s := newZKFallbackSubmitter(fake) - require.True(t, s.markSGXFallback()) + sub := newZKFallbackSubmitter(fake) + s.True(sub.markSGXFallback()) - require.False(t, s.decideUseZK(t.Context(), big.NewInt(20), big.NewInt(10))) // (A) fails - require.True(t, s.inSGXFallback()) - require.Equal(t, int32(0), fake.statusCalls.Load()) // status not queried until (A) holds + // (A) fails (20 > 10+1) -> stay SGX; status must not be queried until (A) holds. + s.False(sub.decideUseZK(context.Background(), big.NewInt(20), big.NewInt(10))) + s.True(sub.inSGXFallback()) + s.Equal(int32(0), fake.statusCalls.Load()) } -func TestDecideUseZKStaysSGXWhenNotClean(t *testing.T) { +func (s *ZKFallbackTestSuite) TestDecideUseZKStaysSGXWhenNotClean() { fake := &fakeZKBacklog{clean: false} - s := newZKFallbackSubmitter(fake) - require.True(t, s.markSGXFallback()) + sub := newZKFallbackSubmitter(fake) + s.True(sub.markSGXFallback()) - require.False(t, s.decideUseZK(t.Context(), big.NewInt(11), big.NewInt(10))) - require.True(t, s.inSGXFallback()) - require.Equal(t, int32(1), fake.statusCalls.Load()) + s.False(sub.decideUseZK(context.Background(), big.NewInt(11), big.NewInt(10))) + s.True(sub.inSGXFallback()) + s.Equal(int32(1), fake.statusCalls.Load()) } -func TestDecideUseZKDegradesOnStatusError(t *testing.T) { +func (s *ZKFallbackTestSuite) TestDecideUseZKDegradesOnStatusError() { fake := &fakeZKBacklog{statusErr: errors.New("status endpoint absent")} - s := newZKFallbackSubmitter(fake) - require.True(t, s.markSGXFallback()) + sub := newZKFallbackSubmitter(fake) + s.True(sub.markSGXFallback()) - require.True(t, s.decideUseZK(t.Context(), big.NewInt(11), big.NewInt(10))) // (A) holds, status errors -> degrade -> resume - require.False(t, s.inSGXFallback()) + // (A) holds but status errors -> degrade -> resume on backlog-drained alone. + s.True(sub.decideUseZK(context.Background(), big.NewInt(11), big.NewInt(10))) + s.False(sub.inSGXFallback()) } -func TestDecideUseZKConcurrentBreachClearsOnce(t *testing.T) { +func (s *ZKFallbackTestSuite) TestDecideUseZKConcurrentBreachClearsOnce() { fake := &fakeZKBacklog{cleared: make(chan struct{}, 1)} - s := newZKFallbackSubmitter(fake) + sub := newZKFallbackSubmitter(fake) const n = 50 var ( @@ -191,19 +202,19 @@ func TestDecideUseZKConcurrentBreachClearsOnce(t *testing.T) { go func() { defer wg.Done() // 41 > 10 + 30 -> every caller sees a breach and must not use ZK. - if s.decideUseZK(t.Context(), big.NewInt(41), big.NewInt(10)) { + if sub.decideUseZK(context.Background(), big.NewInt(41), big.NewInt(10)) { nonBreach.Add(1) } }() } wg.Wait() - require.Equal(t, int32(0), nonBreach.Load()) // every caller saw the breach - require.True(t, s.inSGXFallback()) + s.Equal(int32(0), nonBreach.Load()) // every caller saw the breach + s.True(sub.inSGXFallback()) select { case <-fake.cleared: case <-time.After(time.Second): - t.Fatal("clear was not called") + s.FailNow("clear was not called") } - require.Equal(t, int32(1), fake.clearCalls.Load()) + s.Equal(int32(1), fake.clearCalls.Load()) } From ccd20d407e8dfebfa1aee3c923ae54ff646a9f03 Mon Sep 17 00:00:00 2001 From: David Date: Tue, 16 Jun 2026 11:22:37 +0800 Subject: [PATCH 09/13] refactor(taiko-client): rename machineSaysZK to backlogAllowsZK Co-Authored-By: Claude Opus 4.8 --- .../taiko-client/prover/proof_submitter/proof_submitter.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/taiko-client/prover/proof_submitter/proof_submitter.go b/packages/taiko-client/prover/proof_submitter/proof_submitter.go index 94ea23e670..c936180f56 100644 --- a/packages/taiko-client/prover/proof_submitter/proof_submitter.go +++ b/packages/taiko-client/prover/proof_submitter/proof_submitter.go @@ -223,15 +223,15 @@ func (s *ProofSubmitter) RequestProof(ctx context.Context, meta metadata.TaikoPr ) return ErrProposalOutOfAllowedRange } - // machineSaysZK is the drain/resume state machine's verdict for this proposal + // backlogAllowsZK is the drain/resume state machine's verdict for this proposal // (it may latch SGX mode + fire a one-off backlog clear, or resume ZK; see // zk_fallback.go). useZK is the per-call fallback (zk_any_not_drawn / timeout) // that sticks for the rest of this call. ZK is used only when both agree; once // either goes false the ZK path stays off for the remaining retries. - machineSaysZK := s.decideUseZK(ctx, proposalID, lastFinalizedProposalID) + backlogAllowsZK := s.decideUseZK(ctx, proposalID, lastFinalizedProposalID) // If zk proof is enabled, request zk proof first, and check if ZK proof is drawn. - if s.zkvmProofProducer != nil && useZK && machineSaysZK { + if s.zkvmProofProducer != nil && useZK && backlogAllowsZK { if proofResponse, err = s.zkvmProofProducer.RequestProof( ctx, opts, From 9434749b8668651ecfc492ea12ddd9c2c045921f Mon Sep 17 00:00:00 2001 From: David Date: Tue, 16 Jun 2026 13:13:42 +0800 Subject: [PATCH 10/13] test(taiko-client): split zk_backlog suite subtests into methods Replace the s.Run subtests in TestStatusClean/TestClearBacklog with one suite method per case. Co-Authored-By: Claude Opus 4.8 --- .../prover/proof_producer/zk_backlog_test.go | 148 +++++++++--------- 1 file changed, 72 insertions(+), 76 deletions(-) diff --git a/packages/taiko-client/prover/proof_producer/zk_backlog_test.go b/packages/taiko-client/prover/proof_producer/zk_backlog_test.go index e90c1ceb6e..14bcc407a6 100644 --- a/packages/taiko-client/prover/proof_producer/zk_backlog_test.go +++ b/packages/taiko-client/prover/proof_producer/zk_backlog_test.go @@ -20,82 +20,78 @@ func TestZKBacklogTestSuite(t *testing.T) { suite.Run(t, new(ZKBacklogTestSuite)) } -func (s *ZKBacklogTestSuite) TestStatusClean() { - s.Run("data.clean true", func() { - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - s.Equal(http.MethodGet, r.Method) - s.Equal("/v3/prover/status", r.URL.Path) - _, _ = w.Write([]byte( - `{"status":"ok","data":{"clean":true,"tasks":{"pending":0},"network":{"risc0":{"inflight_orders":0}}}}`, - )) - })) - defer server.Close() - - p := &ComposeProofProducer{RaikoHostEndpoint: server.URL, RaikoRequestTimeout: time.Second} - clean, err := p.StatusClean(s.T().Context()) - s.NoError(err) - s.True(clean) - }) - - s.Run("data.clean false", func() { - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { - _, _ = w.Write([]byte(`{"status":"ok","data":{"clean":false}}`)) - })) - defer server.Close() - - p := &ComposeProofProducer{RaikoHostEndpoint: server.URL, RaikoRequestTimeout: time.Second} - clean, err := p.StatusClean(s.T().Context()) - s.NoError(err) - s.False(clean) - }) - - s.Run("non-200 errors", func() { - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { - w.WriteHeader(http.StatusNotFound) - })) - defer server.Close() - - p := &ComposeProofProducer{RaikoHostEndpoint: server.URL, RaikoRequestTimeout: time.Second} - _, err := p.StatusClean(s.T().Context()) - s.Error(err) - }) - - s.Run("dummy short-circuits to clean", func() { - p := &ComposeProofProducer{Dummy: true} - clean, err := p.StatusClean(s.T().Context()) - s.NoError(err) - s.True(clean) - }) +func (s *ZKBacklogTestSuite) TestStatusCleanReturnsTrue() { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + s.Equal(http.MethodGet, r.Method) + s.Equal("/v3/prover/status", r.URL.Path) + _, _ = w.Write([]byte( + `{"status":"ok","data":{"clean":true,"tasks":{"pending":0},"network":{"risc0":{"inflight_orders":0}}}}`, + )) + })) + defer server.Close() + + p := &ComposeProofProducer{RaikoHostEndpoint: server.URL, RaikoRequestTimeout: time.Second} + clean, err := p.StatusClean(s.T().Context()) + s.NoError(err) + s.True(clean) } -func (s *ZKBacklogTestSuite) TestClearBacklog() { - s.Run("posts to clear endpoint", func() { - var called bool - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - called = true - s.Equal(http.MethodPost, r.Method) - s.Equal("/v3/prover/clear", r.URL.Path) - _, _ = w.Write([]byte(`{"status":"ok"}`)) - })) - defer server.Close() - - p := &ComposeProofProducer{RaikoHostEndpoint: server.URL, RaikoRequestTimeout: time.Second} - s.NoError(p.ClearBacklog(s.T().Context())) - s.True(called) - }) - - s.Run("non-200 errors", func() { - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { - w.WriteHeader(http.StatusInternalServerError) - })) - defer server.Close() - - p := &ComposeProofProducer{RaikoHostEndpoint: server.URL, RaikoRequestTimeout: time.Second} - s.Error(p.ClearBacklog(s.T().Context())) - }) - - s.Run("dummy short-circuits to nil", func() { - p := &ComposeProofProducer{Dummy: true} - s.NoError(p.ClearBacklog(s.T().Context())) - }) +func (s *ZKBacklogTestSuite) TestStatusCleanReturnsFalse() { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + _, _ = w.Write([]byte(`{"status":"ok","data":{"clean":false}}`)) + })) + defer server.Close() + + p := &ComposeProofProducer{RaikoHostEndpoint: server.URL, RaikoRequestTimeout: time.Second} + clean, err := p.StatusClean(s.T().Context()) + s.NoError(err) + s.False(clean) +} + +func (s *ZKBacklogTestSuite) TestStatusCleanErrorsOnNon200() { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusNotFound) + })) + defer server.Close() + + p := &ComposeProofProducer{RaikoHostEndpoint: server.URL, RaikoRequestTimeout: time.Second} + _, err := p.StatusClean(s.T().Context()) + s.Error(err) +} + +func (s *ZKBacklogTestSuite) TestStatusCleanDummyShortCircuits() { + p := &ComposeProofProducer{Dummy: true} + clean, err := p.StatusClean(s.T().Context()) + s.NoError(err) + s.True(clean) +} + +func (s *ZKBacklogTestSuite) TestClearBacklogPostsToEndpoint() { + var called bool + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + called = true + s.Equal(http.MethodPost, r.Method) + s.Equal("/v3/prover/clear", r.URL.Path) + _, _ = w.Write([]byte(`{"status":"ok"}`)) + })) + defer server.Close() + + p := &ComposeProofProducer{RaikoHostEndpoint: server.URL, RaikoRequestTimeout: time.Second} + s.NoError(p.ClearBacklog(s.T().Context())) + s.True(called) +} + +func (s *ZKBacklogTestSuite) TestClearBacklogErrorsOnNon200() { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer server.Close() + + p := &ComposeProofProducer{RaikoHostEndpoint: server.URL, RaikoRequestTimeout: time.Second} + s.Error(p.ClearBacklog(s.T().Context())) +} + +func (s *ZKBacklogTestSuite) TestClearBacklogDummyShortCircuits() { + p := &ComposeProofProducer{Dummy: true} + s.NoError(p.ClearBacklog(s.T().Context())) } From f180065d1397f10bcb591cae0f2655bfc36a829a Mon Sep 17 00:00:00 2001 From: David Date: Tue, 16 Jun 2026 13:17:30 +0800 Subject: [PATCH 11/13] refactor(taiko-client): use resty for raiko HTTP requests (#21796) Co-authored-by: Claude Opus 4.8 --- .../prover/proof_producer/common.go | 86 ++++++++----------- .../prover/proof_producer/zk_backlog.go | 47 +--------- 2 files changed, 38 insertions(+), 95 deletions(-) diff --git a/packages/taiko-client/prover/proof_producer/common.go b/packages/taiko-client/prover/proof_producer/common.go index 08cc8da191..6e0daaa670 100644 --- a/packages/taiko-client/prover/proof_producer/common.go +++ b/packages/taiko-client/prover/proof_producer/common.go @@ -1,15 +1,13 @@ package producer import ( - "bytes" "context" - "encoding/json" "fmt" - "io" "net/http" "time" "github.com/ethereum/go-ethereum/log" + "github.com/go-resty/resty/v2" "github.com/taikoxyz/taiko-mono/packages/taiko-client/internal/metrics" ) @@ -67,73 +65,57 @@ type ProofDataV2 struct { Quote string `json:"quote"` } -// requestHTTPProof sends a POST request to the given URL with the given ApiKey and request body, -// to get a proof of the given type. -func requestHTTPProof[T, U any](ctx context.Context, url string, apiKey string, reqBody T) (*U, error) { - res, err := requestHTTPProofResponse(ctx, url, apiKey, reqBody) - if err != nil { - return nil, err - } - defer res.Body.Close() - - resBytes, err := io.ReadAll(res.Body) - if err != nil { - return nil, err - } - - log.Debug("Proof generation output", "url", url, "output", string(resBytes)) - var output U - if err := json.Unmarshal(resBytes, &output); err != nil { - return nil, err - } - - return &output, nil -} +// raikoHTTPClient is the shared resty client used for all raiko HTTP requests. +// Each request carries its own deadline via the per-call context, matching the +// previous net/http behavior. +var raikoHTTPClient = resty.New() -// requestHTTPProofResponse sends a POST request to the given URL with the given ApiKey and request body, -// and returns the raw HTTP response, the caller is responsible for closing the response body. -func requestHTTPProofResponse[T any]( +// requestRaiko sends an HTTP request to a raiko endpoint with the shared resty +// client and unmarshals a successful (HTTP 200) JSON response into U. A nil body +// is sent without a request body; a non-nil body is JSON-encoded. The response is +// always parsed as JSON, regardless of the Content-Type the server returns. +func requestRaiko[U any]( ctx context.Context, + method string, url string, apiKey string, - reqBody T, -) (*http.Response, error) { - client := http.DefaultClient - - jsonValue, err := json.Marshal(reqBody) - if err != nil { - return nil, err - } - - log.Debug("Requesting proof", "url", url, "body", string(jsonValue)) - req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewBuffer(jsonValue)) - if err != nil { - return nil, err + body any, +) (*U, error) { + var output U + req := raikoHTTPClient.R(). + SetContext(ctx). + ForceContentType("application/json"). + SetResult(&output) + if body != nil { + req = req.SetHeader("Content-Type", "application/json").SetBody(body) } - req.Header.Set("Content-Type", "application/json") if len(apiKey) > 0 { - req.Header.Set("X-API-KEY", apiKey) + req = req.SetHeader("X-API-KEY", apiKey) } - res, err := client.Do(req) + log.Debug("Requesting raiko", "url", url, "method", method) + resp, err := req.Execute(method, url) if err != nil { return nil, err } - if res.StatusCode != http.StatusOK { + if resp.StatusCode() != http.StatusOK { // Check for rate limiting (429 Too Many Requests) - if res.StatusCode == http.StatusTooManyRequests { + if resp.StatusCode() == http.StatusTooManyRequests { log.Error("Rate limit on L2 RPC has been reached. Using your own Taiko L2 node as RPC for Raiko is recommended") } - return nil, fmt.Errorf( - "failed to request proof, url: %s, statusCode: %d", - url, - res.StatusCode, - ) + return nil, fmt.Errorf("failed to request raiko, url: %s, statusCode: %d", url, resp.StatusCode()) } - return res, nil + log.Debug("Raiko response", "url", url, "body", string(resp.Body())) + return &output, nil +} + +// requestHTTPProof sends a POST request with the given JSON body to a raiko proof +// endpoint and unmarshals the response into U. +func requestHTTPProof[T, U any](ctx context.Context, url string, apiKey string, reqBody T) (*U, error) { + return requestRaiko[U](ctx, http.MethodPost, url, apiKey, reqBody) } // updateProvingMetrics updates the metrics for the given proof type, including diff --git a/packages/taiko-client/prover/proof_producer/zk_backlog.go b/packages/taiko-client/prover/proof_producer/zk_backlog.go index 20c95df604..c6d73e0d38 100644 --- a/packages/taiko-client/prover/proof_producer/zk_backlog.go +++ b/packages/taiko-client/prover/proof_producer/zk_backlog.go @@ -2,9 +2,7 @@ package producer import ( "context" - "encoding/json" "fmt" - "io" "net/http" "github.com/taikoxyz/taiko-mono/packages/taiko-client/pkg/rpc" @@ -31,45 +29,6 @@ type raikoProverStatusResponse struct { } `json:"data"` } -// requestRaikoControlPlane sends a bodyless request (GET or POST) to a raiko2 -// control-plane endpoint and unmarshals the JSON response into U. A non-200 -// status code (including 404 when the endpoint is absent) returns an error. -func requestRaikoControlPlane[U any]( - ctx context.Context, - method string, - url string, - apiKey string, -) (*U, error) { - req, err := http.NewRequestWithContext(ctx, method, url, nil) - if err != nil { - return nil, err - } - if len(apiKey) > 0 { - req.Header.Set("X-API-KEY", apiKey) - } - - res, err := http.DefaultClient.Do(req) - if err != nil { - return nil, err - } - defer res.Body.Close() - - if res.StatusCode != http.StatusOK { - return nil, fmt.Errorf("unexpected status code from %s: %d", url, res.StatusCode) - } - - resBytes, err := io.ReadAll(res.Body) - if err != nil { - return nil, err - } - - var output U - if err := json.Unmarshal(resBytes, &output); err != nil { - return nil, err - } - return &output, nil -} - // ClearBacklog implements the ZKBacklogController interface. func (s *ComposeProofProducer) ClearBacklog(ctx context.Context) error { if s.Dummy { @@ -79,11 +38,12 @@ func (s *ComposeProofProducer) ClearBacklog(ctx context.Context) error { defer cancel() // The clear endpoint only needs to return HTTP 200; its response body is unused. - if _, err := requestRaikoControlPlane[struct{}]( + if _, err := requestRaiko[struct{}]( ctx, http.MethodPost, s.RaikoHostEndpoint+"/v3/prover/clear", s.ApiKey, + nil, ); err != nil { return fmt.Errorf("failed to clear ZK backlog: %w", err) } @@ -98,11 +58,12 @@ func (s *ComposeProofProducer) StatusClean(ctx context.Context) (bool, error) { ctx, cancel := rpc.CtxWithTimeoutOrDefault(ctx, s.RaikoRequestTimeout) defer cancel() - out, err := requestRaikoControlPlane[raikoProverStatusResponse]( + out, err := requestRaiko[raikoProverStatusResponse]( ctx, http.MethodGet, s.RaikoHostEndpoint+"/v3/prover/status", s.ApiKey, + nil, ) if err != nil { return false, fmt.Errorf("failed to get ZK prover status: %w", err) From 8d8d742b0a95e2869e7c81a39d7d4fb93799d598 Mon Sep 17 00:00:00 2001 From: David Date: Tue, 16 Jun 2026 13:34:25 +0800 Subject: [PATCH 12/13] fix(taiko-client): preserve skip-ZK semantics for maxZKProofProposalDistance=0 decideUseZK short-circuited a non-positive distance to "always use ZK", flipping the pre-#21795 behavior where 0 meant "skip ZK" (proposalID > lastFinalized+0 is always true). Delegate the inactive-machine guard to the stateless shouldUseZKProof so 0/nil/N all match the prior behavior, and clarify the flag help. Co-Authored-By: Claude Opus 4.8 --- packages/taiko-client/cmd/flags/prover.go | 3 ++- .../prover/proof_submitter/zk_fallback.go | 15 +++++++-------- .../prover/proof_submitter/zk_fallback_test.go | 6 ++++-- 3 files changed, 13 insertions(+), 11 deletions(-) diff --git a/packages/taiko-client/cmd/flags/prover.go b/packages/taiko-client/cmd/flags/prover.go index 327bcadd90..191440b467 100644 --- a/packages/taiko-client/cmd/flags/prover.go +++ b/packages/taiko-client/cmd/flags/prover.go @@ -76,7 +76,8 @@ var ( Usage: "The maximum proposal distance counted from lastFinalizedProposalID for requesting ZK proofs. " + "When proposalID exceeds lastFinalizedProposalID + maxZKProofProposalDistance, the prover stops " + "requesting ZK proofs, clears the ZK backlog, and drains via the base (SGX) proof until the backlog " + - "is cleared and the ZK endpoint reports clean, then resumes ZK. Set to 0 to disable. Post Shasta fork only.", + "is cleared and the ZK endpoint reports clean, then resumes ZK. Set to 0 to disable ZK proving " + + "(always use the base/SGX proof). Post Shasta fork only.", Value: 30, Category: proverCategory, EnvVars: []string{"PROVER_MAX_ZK_PROOF_PROPOSAL_DISTANCE"}, diff --git a/packages/taiko-client/prover/proof_submitter/zk_fallback.go b/packages/taiko-client/prover/proof_submitter/zk_fallback.go index 03809fa3bb..a671a366f6 100644 --- a/packages/taiko-client/prover/proof_submitter/zk_fallback.go +++ b/packages/taiko-client/prover/proof_submitter/zk_fallback.go @@ -67,14 +67,13 @@ func (s *ProofSubmitter) decideUseZK( proposalID *big.Int, lastFinalizedProposalID *big.Int, ) bool { - // Machine disabled when no distance is configured. - if s.maxZKProofProposalDistance == nil || s.maxZKProofProposalDistance.Sign() <= 0 { - return true - } - // No ZK endpoint configured (no control-plane client): bypass the drain/resume - // machine and keep the stateless distance check. This also guarantees zkBacklog - // is non-nil below, so canResumeZK/fireClearAsync can dereference it safely. - if s.zkBacklog == nil { + // Machine inactive: no positive distance configured, or no control-plane client. + // Preserve the stateless behavior from #21782 (nil = use ZK, 0 = skip ZK, + // N = within N proposals). When zkBacklog is nil this also guarantees the machine + // paths below never dereference it (canResumeZK/fireClearAsync). + if s.maxZKProofProposalDistance == nil || + s.maxZKProofProposalDistance.Sign() <= 0 || + s.zkBacklog == nil { return s.shouldUseZKProof(proposalID, lastFinalizedProposalID) } diff --git a/packages/taiko-client/prover/proof_submitter/zk_fallback_test.go b/packages/taiko-client/prover/proof_submitter/zk_fallback_test.go index 04fe0b101b..769afba058 100644 --- a/packages/taiko-client/prover/proof_submitter/zk_fallback_test.go +++ b/packages/taiko-client/prover/proof_submitter/zk_fallback_test.go @@ -91,9 +91,11 @@ func (s *ZKFallbackTestSuite) TestMarkSGXFallbackConcurrentSingleWinner() { s.True(sub.inSGXFallback()) } -func (s *ZKFallbackTestSuite) TestDecideUseZKMachineDisabled() { +func (s *ZKFallbackTestSuite) TestDecideUseZKDistanceZeroSkipsZK() { + // distance 0 preserves the pre-#21795 stateless behavior: never use ZK, and the + // drain/resume machine stays inactive (no latch, no clear). sub := &ProofSubmitter{maxZKProofProposalDistance: big.NewInt(0), zkBacklog: &fakeZKBacklog{}} - s.True(sub.decideUseZK(context.Background(), big.NewInt(1000), big.NewInt(1))) + s.False(sub.decideUseZK(context.Background(), big.NewInt(1000), big.NewInt(1))) s.False(sub.inSGXFallback()) } From a816c92f42c39ac6ed464f567fa1ded59cef0926 Mon Sep 17 00:00:00 2001 From: David Date: Tue, 16 Jun 2026 13:42:33 +0800 Subject: [PATCH 13/13] feat(taiko-client): flush local ZK proof buffers on fallback + debug logs On entering SGX-draining mode, also clear the local ZKR0/ZKSP1 proof buffers and caches and resend those proposals to the submission channel so they are re-proven via SGX (avoids stranding a partially-filled ZK batch); see taikoxyz#21794. Also log the raiko request body alongside the response at debug level. Co-Authored-By: Claude Opus 4.8 --- .../prover/proof_producer/common.go | 2 +- .../prover/proof_submitter/zk_fallback.go | 70 +++++++++++++++++++ .../proof_submitter/zk_fallback_test.go | 50 +++++++++++++ 3 files changed, 121 insertions(+), 1 deletion(-) diff --git a/packages/taiko-client/prover/proof_producer/common.go b/packages/taiko-client/prover/proof_producer/common.go index 6e0daaa670..c1c137ba5b 100644 --- a/packages/taiko-client/prover/proof_producer/common.go +++ b/packages/taiko-client/prover/proof_producer/common.go @@ -93,7 +93,7 @@ func requestRaiko[U any]( req = req.SetHeader("X-API-KEY", apiKey) } - log.Debug("Requesting raiko", "url", url, "method", method) + log.Debug("Requesting raiko", "url", url, "method", method, "body", body) resp, err := req.Execute(method, url) if err != nil { return nil, err diff --git a/packages/taiko-client/prover/proof_submitter/zk_fallback.go b/packages/taiko-client/prover/proof_submitter/zk_fallback.go index a671a366f6..c65409c6b7 100644 --- a/packages/taiko-client/prover/proof_submitter/zk_fallback.go +++ b/packages/taiko-client/prover/proof_submitter/zk_fallback.go @@ -10,6 +10,7 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/taikoxyz/taiko-mono/packages/taiko-client/internal/metrics" + proofProducer "github.com/taikoxyz/taiko-mono/packages/taiko-client/prover/proof_producer" ) // clearBackoffMaxRetries bounds the best-effort retries of POST /v3/prover/clear @@ -99,6 +100,7 @@ func (s *ProofSubmitter) decideUseZK( "lastFinalizedProposalID", lastFinalizedProposalID, "maxZKProofProposalDistance", s.maxZKProofProposalDistance, ) + s.clearZKProofBuffersAndResend() s.fireClearAsync() } return false @@ -157,3 +159,71 @@ func (s *ProofSubmitter) fireClearAsync() { log.Info("Cleared ZK backlog after entering SGX-draining mode") }() } + +// zkProofTypes are the ZK proof types whose local buffers and caches are flushed +// when entering SGX-draining mode. +var zkProofTypes = []proofProducer.ProofType{ + proofProducer.ProofTypeZKR0, + proofProducer.ProofTypeZKSP1, +} + +// clearZKProofBuffersAndResend discards any buffered or cached ZK proofs and +// re-enqueues their proposals so they are re-proven via SGX while draining. This +// prevents a partially-filled ZK proof batch from stranding once new ZK requests +// stop. +func (s *ProofSubmitter) clearZKProofBuffersAndResend() { + for _, proofType := range zkProofTypes { + s.clearProofBufferAndResend(proofType) + } +} + +// clearProofBufferAndResend flushes the buffer and cache for the given proof type +// and resends each cleared proposal to the submission channel. +func (s *ProofSubmitter) clearProofBufferAndResend(proofType proofProducer.ProofType) { + proofBuffer, ok := s.proofBuffers[proofType] + if !ok { + return + } + cacheMap, ok := s.proofCacheMaps[proofType] + if !ok { + return + } + + buffered, err := proofBuffer.ReadAll() + if err != nil { + log.Warn("Failed to read ZK proof buffer for resend", "proofType", proofType, "error", err) + return + } + + resend := make([]*proofProducer.ProofResponse, 0, len(buffered)+cacheMap.Count()) + batchIDs := make([]uint64, 0, len(buffered)) + for _, proof := range buffered { + if proof == nil || proof.BatchID == nil { + continue + } + resend = append(resend, proof) + batchIDs = append(batchIDs, proof.BatchID.Uint64()) + } + proofBuffer.ClearItems(batchIDs...) + + for item := range cacheMap.IterBuffered() { + if item.Val != nil { + resend = append(resend, item.Val) + } + cacheMap.Remove(item.Key) + } + + for _, proof := range resend { + if proof.Meta == nil { + continue + } + s.proofSubmissionCh <- &proofProducer.ProofRequestBody{Meta: proof.Meta} + } + if len(resend) > 0 { + log.Info( + "Cleared ZK proof buffer and resent proposals for SGX draining", + "proofType", proofType, + "count", len(resend), + ) + } +} diff --git a/packages/taiko-client/prover/proof_submitter/zk_fallback_test.go b/packages/taiko-client/prover/proof_submitter/zk_fallback_test.go index 769afba058..882a0dcfe1 100644 --- a/packages/taiko-client/prover/proof_submitter/zk_fallback_test.go +++ b/packages/taiko-client/prover/proof_submitter/zk_fallback_test.go @@ -9,6 +9,7 @@ import ( "testing" "time" + cmap "github.com/orcaman/concurrent-map/v2" "github.com/stretchr/testify/suite" proofProducer "github.com/taikoxyz/taiko-mono/packages/taiko-client/prover/proof_producer" @@ -220,3 +221,52 @@ func (s *ZKFallbackTestSuite) TestDecideUseZKConcurrentBreachClearsOnce() { } s.Equal(int32(1), fake.clearCalls.Load()) } + +func (s *ZKFallbackTestSuite) TestDecideUseZKBreachClearsLocalZKBuffers() { + const zkType = proofProducer.ProofTypeZKR0 + buffer := proofProducer.NewProofBuffer(8) + cache := cmap.New[*proofProducer.ProofResponse]() + + // One buffered proof and one cached proof of the ZK type. + _, err := buffer.Write(&proofProducer.ProofResponse{BatchID: big.NewInt(5), Meta: newShastaMetaForTest(5)}) + s.NoError(err) + cache.Set("6", &proofProducer.ProofResponse{BatchID: big.NewInt(6), Meta: newShastaMetaForTest(6)}) + + fake := &fakeZKBacklog{cleared: make(chan struct{}, 1)} + ch := make(chan *proofProducer.ProofRequestBody, 8) + sub := &ProofSubmitter{ + maxZKProofProposalDistance: big.NewInt(30), + zkBacklog: fake, + proofPollingInterval: time.Millisecond, + ctx: context.Background(), + proofBuffers: map[proofProducer.ProofType]*proofProducer.ProofBuffer{zkType: buffer}, + proofCacheMaps: map[proofProducer.ProofType]cmap.ConcurrentMap[string, *proofProducer.ProofResponse]{ + zkType: cache, + }, + proofSubmissionCh: ch, + } + + // Breach (41 > 10+30): latch, flush local ZK buffer/cache, resend, clear raiko. + s.False(sub.decideUseZK(context.Background(), big.NewInt(41), big.NewInt(10))) + s.True(sub.inSGXFallback()) + s.Equal(0, buffer.Len()) + s.Equal(0, cache.Count()) + + resent := map[uint64]bool{} + for i := 0; i < 2; i++ { + select { + case req := <-ch: + resent[req.Meta.GetProposalID().Uint64()] = true + case <-time.After(time.Second): + s.FailNow("expected resend on proofSubmissionCh") + } + } + s.True(resent[5]) + s.True(resent[6]) + + select { + case <-fake.cleared: + case <-time.After(time.Second): + s.FailNow("expected raiko backlog clear") + } +}