diff --git a/packages/taiko-client/cmd/flags/prover.go b/packages/taiko-client/cmd/flags/prover.go index 1a3e655e2a..191440b467 100644 --- a/packages/taiko-client/cmd/flags/prover.go +++ b/packages/taiko-client/cmd/flags/prover.go @@ -74,8 +74,10 @@ var ( MaxZKProofProposalDistance = &cli.Uint64Flag{ Name: "prover.maxZKProofProposalDistance", Usage: "The maximum proposal distance counted from lastFinalizedProposalID for requesting ZK proofs. " + - "When proposalID is greater than lastFinalizedProposalID + maxZKProofProposalDistance, " + - "the prover skips ZK proof and requests base proof instead. This flag only works for post Shasta fork. ", + "When proposalID exceeds lastFinalizedProposalID + maxZKProofProposalDistance, the prover stops " + + "requesting ZK proofs, clears the ZK backlog, and drains via the base (SGX) proof until the backlog " + + "is cleared and the ZK endpoint reports clean, then resumes ZK. Set to 0 to disable ZK proving " + + "(always use the base/SGX proof). Post Shasta fork only.", Value: 30, Category: proverCategory, EnvVars: []string{"PROVER_MAX_ZK_PROOF_PROPOSAL_DISTANCE"}, diff --git a/packages/taiko-client/internal/metrics/metrics.go b/packages/taiko-client/internal/metrics/metrics.go index b80a3dd545..db06e9a0d1 100644 --- a/packages/taiko-client/internal/metrics/metrics.go +++ b/packages/taiko-client/internal/metrics/metrics.go @@ -96,6 +96,8 @@ var ( ProverQueuedProofCounter = factory.NewCounter(prometheus.CounterOpts{Name: "prover_proof_all_queued"}) ProverSentProofCounter = factory.NewCounter(prometheus.CounterOpts{Name: "prover_proof_all_sent"}) ProverProofsAssigned = factory.NewCounter(prometheus.CounterOpts{Name: "prover_proof_assigned"}) + ProverZKBacklogModeGauge = factory.NewGauge(prometheus.GaugeOpts{Name: "prover_zk_backlog_sgx_mode"}) + ProverZKBacklogClearCounter = factory.NewCounter(prometheus.CounterOpts{Name: "prover_zk_backlog_clear"}) ProverReceivedProposedBlockGauge = factory.NewGauge(prometheus.GaugeOpts{Name: "prover_proposed_received"}) ProverReceivedProvenBlockGauge = factory.NewGauge(prometheus.GaugeOpts{Name: "prover_proven_received"}) ProverSubmissionAcceptedCounter = factory.NewCounter(prometheus.CounterOpts{ diff --git a/packages/taiko-client/prover/proof_producer/common.go b/packages/taiko-client/prover/proof_producer/common.go index 08cc8da191..c1c137ba5b 100644 --- a/packages/taiko-client/prover/proof_producer/common.go +++ b/packages/taiko-client/prover/proof_producer/common.go @@ -1,15 +1,13 @@ package producer import ( - "bytes" "context" - "encoding/json" "fmt" - "io" "net/http" "time" "github.com/ethereum/go-ethereum/log" + "github.com/go-resty/resty/v2" "github.com/taikoxyz/taiko-mono/packages/taiko-client/internal/metrics" ) @@ -67,73 +65,57 @@ type ProofDataV2 struct { Quote string `json:"quote"` } -// requestHTTPProof sends a POST request to the given URL with the given ApiKey and request body, -// to get a proof of the given type. -func requestHTTPProof[T, U any](ctx context.Context, url string, apiKey string, reqBody T) (*U, error) { - res, err := requestHTTPProofResponse(ctx, url, apiKey, reqBody) - if err != nil { - return nil, err - } - defer res.Body.Close() - - resBytes, err := io.ReadAll(res.Body) - if err != nil { - return nil, err - } - - log.Debug("Proof generation output", "url", url, "output", string(resBytes)) - var output U - if err := json.Unmarshal(resBytes, &output); err != nil { - return nil, err - } - - return &output, nil -} +// raikoHTTPClient is the shared resty client used for all raiko HTTP requests. +// Each request carries its own deadline via the per-call context, matching the +// previous net/http behavior. +var raikoHTTPClient = resty.New() -// requestHTTPProofResponse sends a POST request to the given URL with the given ApiKey and request body, -// and returns the raw HTTP response, the caller is responsible for closing the response body. -func requestHTTPProofResponse[T any]( +// requestRaiko sends an HTTP request to a raiko endpoint with the shared resty +// client and unmarshals a successful (HTTP 200) JSON response into U. A nil body +// is sent without a request body; a non-nil body is JSON-encoded. The response is +// always parsed as JSON, regardless of the Content-Type the server returns. +func requestRaiko[U any]( ctx context.Context, + method string, url string, apiKey string, - reqBody T, -) (*http.Response, error) { - client := http.DefaultClient - - jsonValue, err := json.Marshal(reqBody) - if err != nil { - return nil, err - } - - log.Debug("Requesting proof", "url", url, "body", string(jsonValue)) - req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewBuffer(jsonValue)) - if err != nil { - return nil, err + body any, +) (*U, error) { + var output U + req := raikoHTTPClient.R(). + SetContext(ctx). + ForceContentType("application/json"). + SetResult(&output) + if body != nil { + req = req.SetHeader("Content-Type", "application/json").SetBody(body) } - req.Header.Set("Content-Type", "application/json") if len(apiKey) > 0 { - req.Header.Set("X-API-KEY", apiKey) + req = req.SetHeader("X-API-KEY", apiKey) } - res, err := client.Do(req) + log.Debug("Requesting raiko", "url", url, "method", method, "body", body) + resp, err := req.Execute(method, url) if err != nil { return nil, err } - if res.StatusCode != http.StatusOK { + if resp.StatusCode() != http.StatusOK { // Check for rate limiting (429 Too Many Requests) - if res.StatusCode == http.StatusTooManyRequests { + if resp.StatusCode() == http.StatusTooManyRequests { log.Error("Rate limit on L2 RPC has been reached. Using your own Taiko L2 node as RPC for Raiko is recommended") } - return nil, fmt.Errorf( - "failed to request proof, url: %s, statusCode: %d", - url, - res.StatusCode, - ) + return nil, fmt.Errorf("failed to request raiko, url: %s, statusCode: %d", url, resp.StatusCode()) } - return res, nil + log.Debug("Raiko response", "url", url, "body", string(resp.Body())) + return &output, nil +} + +// requestHTTPProof sends a POST request with the given JSON body to a raiko proof +// endpoint and unmarshals the response into U. +func requestHTTPProof[T, U any](ctx context.Context, url string, apiKey string, reqBody T) (*U, error) { + return requestRaiko[U](ctx, http.MethodPost, url, apiKey, reqBody) } // updateProvingMetrics updates the metrics for the given proof type, including diff --git a/packages/taiko-client/prover/proof_producer/zk_backlog.go b/packages/taiko-client/prover/proof_producer/zk_backlog.go new file mode 100644 index 0000000000..c6d73e0d38 --- /dev/null +++ b/packages/taiko-client/prover/proof_producer/zk_backlog.go @@ -0,0 +1,72 @@ +package producer + +import ( + "context" + "fmt" + "net/http" + + "github.com/taikoxyz/taiko-mono/packages/taiko-client/pkg/rpc" +) + +// ZKBacklogController is implemented by proof producers whose backend exposes the +// raiko2 control-plane endpoints for draining the ZK (`zk_any`) task backlog and +// reporting when the backend is idle. See raiko2 issue #93. +type ZKBacklogController interface { + // ClearBacklog discards all non-terminal `zk_any` tasks on the ZK backend + // (POST /v3/prover/clear). + ClearBacklog(ctx context.Context) error + // StatusClean reports whether the ZK backend is fully idle, i.e. the + // `data.clean` field of GET /v3/prover/status is true. + StatusClean(ctx context.Context) (bool, error) +} + +// raikoProverStatusResponse is the body returned by GET /v3/prover/status. Only +// `data.clean` is consumed; the remaining fields (`status`, `tasks`, `network`) +// are intentionally ignored. +type raikoProverStatusResponse struct { + Data struct { + Clean bool `json:"clean"` + } `json:"data"` +} + +// ClearBacklog implements the ZKBacklogController interface. +func (s *ComposeProofProducer) ClearBacklog(ctx context.Context) error { + if s.Dummy { + return nil + } + ctx, cancel := rpc.CtxWithTimeoutOrDefault(ctx, s.RaikoRequestTimeout) + defer cancel() + + // The clear endpoint only needs to return HTTP 200; its response body is unused. + if _, err := requestRaiko[struct{}]( + ctx, + http.MethodPost, + s.RaikoHostEndpoint+"/v3/prover/clear", + s.ApiKey, + nil, + ); err != nil { + return fmt.Errorf("failed to clear ZK backlog: %w", err) + } + return nil +} + +// StatusClean implements the ZKBacklogController interface. +func (s *ComposeProofProducer) StatusClean(ctx context.Context) (bool, error) { + if s.Dummy { + return true, nil + } + ctx, cancel := rpc.CtxWithTimeoutOrDefault(ctx, s.RaikoRequestTimeout) + defer cancel() + + out, err := requestRaiko[raikoProverStatusResponse]( + ctx, + http.MethodGet, + s.RaikoHostEndpoint+"/v3/prover/status", + s.ApiKey, + nil, + ) + if err != nil { + return false, fmt.Errorf("failed to get ZK prover status: %w", err) + } + return out.Data.Clean, nil +} diff --git a/packages/taiko-client/prover/proof_producer/zk_backlog_test.go b/packages/taiko-client/prover/proof_producer/zk_backlog_test.go new file mode 100644 index 0000000000..14bcc407a6 --- /dev/null +++ b/packages/taiko-client/prover/proof_producer/zk_backlog_test.go @@ -0,0 +1,97 @@ +package producer + +import ( + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/stretchr/testify/suite" +) + +// Compile-time assertion that the ZK compose producer satisfies the interface. +var _ ZKBacklogController = (*ComposeProofProducer)(nil) + +type ZKBacklogTestSuite struct { + suite.Suite +} + +func TestZKBacklogTestSuite(t *testing.T) { + suite.Run(t, new(ZKBacklogTestSuite)) +} + +func (s *ZKBacklogTestSuite) TestStatusCleanReturnsTrue() { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + s.Equal(http.MethodGet, r.Method) + s.Equal("/v3/prover/status", r.URL.Path) + _, _ = w.Write([]byte( + `{"status":"ok","data":{"clean":true,"tasks":{"pending":0},"network":{"risc0":{"inflight_orders":0}}}}`, + )) + })) + defer server.Close() + + p := &ComposeProofProducer{RaikoHostEndpoint: server.URL, RaikoRequestTimeout: time.Second} + clean, err := p.StatusClean(s.T().Context()) + s.NoError(err) + s.True(clean) +} + +func (s *ZKBacklogTestSuite) TestStatusCleanReturnsFalse() { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + _, _ = w.Write([]byte(`{"status":"ok","data":{"clean":false}}`)) + })) + defer server.Close() + + p := &ComposeProofProducer{RaikoHostEndpoint: server.URL, RaikoRequestTimeout: time.Second} + clean, err := p.StatusClean(s.T().Context()) + s.NoError(err) + s.False(clean) +} + +func (s *ZKBacklogTestSuite) TestStatusCleanErrorsOnNon200() { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusNotFound) + })) + defer server.Close() + + p := &ComposeProofProducer{RaikoHostEndpoint: server.URL, RaikoRequestTimeout: time.Second} + _, err := p.StatusClean(s.T().Context()) + s.Error(err) +} + +func (s *ZKBacklogTestSuite) TestStatusCleanDummyShortCircuits() { + p := &ComposeProofProducer{Dummy: true} + clean, err := p.StatusClean(s.T().Context()) + s.NoError(err) + s.True(clean) +} + +func (s *ZKBacklogTestSuite) TestClearBacklogPostsToEndpoint() { + var called bool + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + called = true + s.Equal(http.MethodPost, r.Method) + s.Equal("/v3/prover/clear", r.URL.Path) + _, _ = w.Write([]byte(`{"status":"ok"}`)) + })) + defer server.Close() + + p := &ComposeProofProducer{RaikoHostEndpoint: server.URL, RaikoRequestTimeout: time.Second} + s.NoError(p.ClearBacklog(s.T().Context())) + s.True(called) +} + +func (s *ZKBacklogTestSuite) TestClearBacklogErrorsOnNon200() { + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.WriteHeader(http.StatusInternalServerError) + })) + defer server.Close() + + p := &ComposeProofProducer{RaikoHostEndpoint: server.URL, RaikoRequestTimeout: time.Second} + s.Error(p.ClearBacklog(s.T().Context())) +} + +func (s *ZKBacklogTestSuite) TestClearBacklogDummyShortCircuits() { + p := &ComposeProofProducer{Dummy: true} + s.NoError(p.ClearBacklog(s.T().Context())) +} diff --git a/packages/taiko-client/prover/proof_submitter/proof_submitter.go b/packages/taiko-client/prover/proof_submitter/proof_submitter.go index e0d8321ad4..c936180f56 100644 --- a/packages/taiko-client/prover/proof_submitter/proof_submitter.go +++ b/packages/taiko-client/prover/proof_submitter/proof_submitter.go @@ -58,6 +58,12 @@ type ProofSubmitter struct { proofPollingInterval time.Duration proposalWindowSize *big.Int maxZKProofProposalDistance *big.Int + // ZK backlog drain/resume state machine (see zk_fallback.go). + zkBacklog proofProducer.ZKBacklogController + zkFallback zkFallback + // ctx is the prover's long-lived context, used by background goroutines + // (e.g. the ZK backlog clear) that must outlive a single RequestProof call. + ctx context.Context } // NewProofSubmitter creates a new ProofSubmitter instance. @@ -100,6 +106,17 @@ func NewProofSubmitter( flushCacheNotify: flushCacheNotify, proposalWindowSize: proposalWindowSize, maxZKProofProposalDistance: maxZKProofProposalDistance, + ctx: ctx, + } + + // Wire the raiko2 control-plane client (ClearBacklog/StatusClean) when a ZK + // producer is configured. This is a compile-time capability check, not a probe + // of the remote host: with no ZK endpoint set, zkvmProofProducer is nil and + // zkBacklog stays nil, so decideUseZK bypasses the drain/resume machine. When a + // ZK endpoint IS set but its host predates raiko2 #93, the control-plane calls + // return 404 and the machine degrades by design (see canResumeZK). + if zkBacklog, ok := zkvmProofProducer.(proofProducer.ZKBacklogController); ok { + proofSubmitter.zkBacklog = zkBacklog } proofSubmitter.startBackgroundWorkers(ctx) @@ -206,18 +223,15 @@ func (s *ProofSubmitter) RequestProof(ctx context.Context, meta metadata.TaikoPr ) return ErrProposalOutOfAllowedRange } - if !s.shouldUseZKProof(proposalID, lastFinalizedProposalID) { - log.Info( - "Proposal too far from the last finalized proposal, skipping ZK proof", - "proposalID", proposalID, - "lastFinalizedProposalID", lastFinalizedProposalID, - "maxZKProofProposalDistance", s.maxZKProofProposalDistance, - ) - useZK = false - } + // backlogAllowsZK is the drain/resume state machine's verdict for this proposal + // (it may latch SGX mode + fire a one-off backlog clear, or resume ZK; see + // zk_fallback.go). useZK is the per-call fallback (zk_any_not_drawn / timeout) + // that sticks for the rest of this call. ZK is used only when both agree; once + // either goes false the ZK path stays off for the remaining retries. + backlogAllowsZK := s.decideUseZK(ctx, proposalID, lastFinalizedProposalID) // If zk proof is enabled, request zk proof first, and check if ZK proof is drawn. - if s.zkvmProofProducer != nil && useZK { + if s.zkvmProofProducer != nil && useZK && backlogAllowsZK { if proofResponse, err = s.zkvmProofProducer.RequestProof( ctx, opts, diff --git a/packages/taiko-client/prover/proof_submitter/zk_fallback.go b/packages/taiko-client/prover/proof_submitter/zk_fallback.go new file mode 100644 index 0000000000..c65409c6b7 --- /dev/null +++ b/packages/taiko-client/prover/proof_submitter/zk_fallback.go @@ -0,0 +1,229 @@ +package submitter + +import ( + "context" + "math/big" + "sync" + + "github.com/cenkalti/backoff/v4" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/log" + + "github.com/taikoxyz/taiko-mono/packages/taiko-client/internal/metrics" + proofProducer "github.com/taikoxyz/taiko-mono/packages/taiko-client/prover/proof_producer" +) + +// clearBackoffMaxRetries bounds the best-effort retries of POST /v3/prover/clear +// when entering SGX-draining mode. +const clearBackoffMaxRetries uint64 = 5 + +// zkFallback tracks whether the submitter is draining the ZK backlog via SGX. +// It is shared across the concurrent RequestProof goroutines, so all access is +// guarded by mu. +type zkFallback struct { + mu sync.Mutex + inSGX bool +} + +// markSGXFallback latches into SGX-draining mode. It returns true only for the +// first caller that performs the transition; that caller is responsible for +// clearing the ZK backlog exactly once. +func (s *ProofSubmitter) markSGXFallback() bool { + s.zkFallback.mu.Lock() + defer s.zkFallback.mu.Unlock() + if s.zkFallback.inSGX { + return false + } + s.zkFallback.inSGX = true + metrics.ProverZKBacklogModeGauge.Set(1) + return true +} + +// inSGXFallback reports whether the submitter is currently draining via SGX. +func (s *ProofSubmitter) inSGXFallback() bool { + s.zkFallback.mu.Lock() + defer s.zkFallback.mu.Unlock() + return s.zkFallback.inSGX +} + +// resumeZK unlatches SGX-draining mode so subsequent proposals use ZK again. +// It returns true only for the caller that performed the transition. +func (s *ProofSubmitter) resumeZK() bool { + s.zkFallback.mu.Lock() + defer s.zkFallback.mu.Unlock() + if !s.zkFallback.inSGX { + return false + } + s.zkFallback.inSGX = false + metrics.ProverZKBacklogModeGauge.Set(0) + return true +} + +// decideUseZK applies the ZK backlog drain/resume state machine and reports +// whether this proposal should be proven via ZK. It has side effects: it latches +// into SGX-draining mode (and fires a one-off backlog clear) on the first +// distance breach, and unlatches when the backlog is drained. +func (s *ProofSubmitter) decideUseZK( + ctx context.Context, + proposalID *big.Int, + lastFinalizedProposalID *big.Int, +) bool { + // Machine inactive: no positive distance configured, or no control-plane client. + // Preserve the stateless behavior from #21782 (nil = use ZK, 0 = skip ZK, + // N = within N proposals). When zkBacklog is nil this also guarantees the machine + // paths below never dereference it (canResumeZK/fireClearAsync). + if s.maxZKProofProposalDistance == nil || + s.maxZKProofProposalDistance.Sign() <= 0 || + s.zkBacklog == nil { + return s.shouldUseZKProof(proposalID, lastFinalizedProposalID) + } + + if s.inSGXFallback() { + if s.canResumeZK(ctx, proposalID, lastFinalizedProposalID) { + if s.resumeZK() { + log.Info( + "ZK backlog drained, resuming ZK proofs", + "proposalID", proposalID, + "lastFinalizedProposalID", lastFinalizedProposalID, + ) + } + return true + } + return false + } + + if !s.shouldUseZKProof(proposalID, lastFinalizedProposalID) { + if s.markSGXFallback() { + log.Warn( + "ZK proof backlog detected, clearing ZK backlog and draining via SGX", + "proposalID", proposalID, + "lastFinalizedProposalID", lastFinalizedProposalID, + "maxZKProofProposalDistance", s.maxZKProofProposalDistance, + ) + s.clearZKProofBuffersAndResend() + s.fireClearAsync() + } + return false + } + return true +} + +// canResumeZK reports whether SGX-draining mode can switch back to ZK. It checks +// the cheap local "backlog drained" condition first and only queries the ZK +// backend status when that holds. A status error (e.g. the endpoint is absent) +// degrades to resuming on the backlog-drained condition alone. +func (s *ProofSubmitter) canResumeZK( + ctx context.Context, + proposalID *big.Int, + lastFinalizedProposalID *big.Int, +) bool { + // (A) backlog drained: proposalID <= lastFinalizedProposalID + 1. + if proposalID.Cmp(new(big.Int).Add(lastFinalizedProposalID, common.Big1)) > 0 { + return false + } + // (B) ZK backend idle. + clean, err := s.zkBacklog.StatusClean(ctx) + if err != nil { + log.Warn( + "ZK prover status unavailable, resuming ZK on backlog-drained condition alone", + "proposalID", proposalID, + "error", err, + ) + return true + } + return clean +} + +// fireClearAsync clears the ZK backlog in the background with bounded retries. +// It is best-effort: clearing only accelerates the drain, so a final failure is +// logged and otherwise ignored. It uses the submitter's long-lived context +// (s.ctx), so the goroutine outlives the triggering proposal's RequestProof call. +func (s *ProofSubmitter) fireClearAsync() { + // Defensive: decideUseZK already guards against a nil zkBacklog. + if s.zkBacklog == nil { + return + } + metrics.ProverZKBacklogClearCounter.Add(1) + go func() { + bo := backoff.WithContext( + backoff.WithMaxRetries( + backoff.NewConstantBackOff(s.proofPollingInterval), + clearBackoffMaxRetries, + ), + s.ctx, + ) + if err := backoff.Retry(func() error { return s.zkBacklog.ClearBacklog(s.ctx) }, bo); err != nil { + log.Warn("Failed to clear ZK backlog after retries", "error", err) + return + } + log.Info("Cleared ZK backlog after entering SGX-draining mode") + }() +} + +// zkProofTypes are the ZK proof types whose local buffers and caches are flushed +// when entering SGX-draining mode. +var zkProofTypes = []proofProducer.ProofType{ + proofProducer.ProofTypeZKR0, + proofProducer.ProofTypeZKSP1, +} + +// clearZKProofBuffersAndResend discards any buffered or cached ZK proofs and +// re-enqueues their proposals so they are re-proven via SGX while draining. This +// prevents a partially-filled ZK proof batch from stranding once new ZK requests +// stop. +func (s *ProofSubmitter) clearZKProofBuffersAndResend() { + for _, proofType := range zkProofTypes { + s.clearProofBufferAndResend(proofType) + } +} + +// clearProofBufferAndResend flushes the buffer and cache for the given proof type +// and resends each cleared proposal to the submission channel. +func (s *ProofSubmitter) clearProofBufferAndResend(proofType proofProducer.ProofType) { + proofBuffer, ok := s.proofBuffers[proofType] + if !ok { + return + } + cacheMap, ok := s.proofCacheMaps[proofType] + if !ok { + return + } + + buffered, err := proofBuffer.ReadAll() + if err != nil { + log.Warn("Failed to read ZK proof buffer for resend", "proofType", proofType, "error", err) + return + } + + resend := make([]*proofProducer.ProofResponse, 0, len(buffered)+cacheMap.Count()) + batchIDs := make([]uint64, 0, len(buffered)) + for _, proof := range buffered { + if proof == nil || proof.BatchID == nil { + continue + } + resend = append(resend, proof) + batchIDs = append(batchIDs, proof.BatchID.Uint64()) + } + proofBuffer.ClearItems(batchIDs...) + + for item := range cacheMap.IterBuffered() { + if item.Val != nil { + resend = append(resend, item.Val) + } + cacheMap.Remove(item.Key) + } + + for _, proof := range resend { + if proof.Meta == nil { + continue + } + s.proofSubmissionCh <- &proofProducer.ProofRequestBody{Meta: proof.Meta} + } + if len(resend) > 0 { + log.Info( + "Cleared ZK proof buffer and resent proposals for SGX draining", + "proofType", proofType, + "count", len(resend), + ) + } +} diff --git a/packages/taiko-client/prover/proof_submitter/zk_fallback_test.go b/packages/taiko-client/prover/proof_submitter/zk_fallback_test.go new file mode 100644 index 0000000000..882a0dcfe1 --- /dev/null +++ b/packages/taiko-client/prover/proof_submitter/zk_fallback_test.go @@ -0,0 +1,272 @@ +package submitter + +import ( + "context" + "errors" + "math/big" + "sync" + "sync/atomic" + "testing" + "time" + + cmap "github.com/orcaman/concurrent-map/v2" + "github.com/stretchr/testify/suite" + + proofProducer "github.com/taikoxyz/taiko-mono/packages/taiko-client/prover/proof_producer" +) + +// fakeZKBacklog is a programmable ZKBacklogController for unit tests. +type fakeZKBacklog struct { + clearCalls atomic.Int32 + clearErr error + clean bool + statusErr error + statusCalls atomic.Int32 + cleared chan struct{} +} + +func (f *fakeZKBacklog) ClearBacklog(_ context.Context) error { + f.clearCalls.Add(1) + if f.cleared != nil { + select { + case f.cleared <- struct{}{}: + default: + } + } + return f.clearErr +} + +func (f *fakeZKBacklog) StatusClean(_ context.Context) (bool, error) { + f.statusCalls.Add(1) + return f.clean, f.statusErr +} + +func newZKFallbackSubmitter(backlog proofProducer.ZKBacklogController) *ProofSubmitter { + return &ProofSubmitter{ + maxZKProofProposalDistance: big.NewInt(30), + zkBacklog: backlog, + proofPollingInterval: time.Millisecond, + // fireClearAsync reads s.ctx; the breach tests trigger it indirectly. + ctx: context.Background(), + } +} + +type ZKFallbackTestSuite struct { + suite.Suite +} + +func TestZKFallbackTestSuite(t *testing.T) { + suite.Run(t, new(ZKFallbackTestSuite)) +} + +func (s *ZKFallbackTestSuite) TestMarkSGXFallbackOnlyFirstCallerWins() { + sub := &ProofSubmitter{} + s.False(sub.inSGXFallback()) + s.True(sub.markSGXFallback()) // first caller latches + s.False(sub.markSGXFallback()) // already latched + s.True(sub.inSGXFallback()) + + sub.resumeZK() + s.False(sub.inSGXFallback()) + s.True(sub.markSGXFallback()) // can latch again after a resume +} + +func (s *ZKFallbackTestSuite) TestMarkSGXFallbackConcurrentSingleWinner() { + sub := &ProofSubmitter{} + const n = 50 + var ( + wg sync.WaitGroup + winners atomic.Int32 + ) + wg.Add(n) + for i := 0; i < n; i++ { + go func() { + defer wg.Done() + if sub.markSGXFallback() { + winners.Add(1) + } + }() + } + wg.Wait() + s.Equal(int32(1), winners.Load()) + s.True(sub.inSGXFallback()) +} + +func (s *ZKFallbackTestSuite) TestDecideUseZKDistanceZeroSkipsZK() { + // distance 0 preserves the pre-#21795 stateless behavior: never use ZK, and the + // drain/resume machine stays inactive (no latch, no clear). + sub := &ProofSubmitter{maxZKProofProposalDistance: big.NewInt(0), zkBacklog: &fakeZKBacklog{}} + s.False(sub.decideUseZK(context.Background(), big.NewInt(1000), big.NewInt(1))) + s.False(sub.inSGXFallback()) +} + +func (s *ZKFallbackTestSuite) TestDecideUseZKNilBacklogFallsBackToStateless() { + sub := &ProofSubmitter{maxZKProofProposalDistance: big.NewInt(30)} // zkBacklog nil + // 40 <= 10+30 stays ZK; 41 > 10+30 skips ZK; neither latches without a control-plane client. + s.True(sub.decideUseZK(context.Background(), big.NewInt(40), big.NewInt(10))) + s.False(sub.decideUseZK(context.Background(), big.NewInt(41), big.NewInt(10))) + s.False(sub.inSGXFallback()) +} + +func (s *ZKFallbackTestSuite) TestDecideUseZKWithinDistanceStaysZK() { + sub := newZKFallbackSubmitter(&fakeZKBacklog{}) + s.True(sub.decideUseZK(context.Background(), big.NewInt(40), big.NewInt(10))) + s.False(sub.inSGXFallback()) +} + +func (s *ZKFallbackTestSuite) TestDecideUseZKBreachLatchesAndClearsOnce() { + fake := &fakeZKBacklog{cleared: make(chan struct{}, 1)} + sub := newZKFallbackSubmitter(fake) + + s.False(sub.decideUseZK(context.Background(), big.NewInt(41), big.NewInt(10))) // breach + s.True(sub.inSGXFallback()) + + select { + case <-fake.cleared: + case <-time.After(time.Second): + s.FailNow("clear was not called") + } + + // A second breach while latched must not clear again. + s.False(sub.decideUseZK(context.Background(), big.NewInt(50), big.NewInt(10))) + s.Equal(int32(1), fake.clearCalls.Load()) +} + +func (s *ZKFallbackTestSuite) TestFireClearAsyncRetriesThenGivesUp() { + fake := &fakeZKBacklog{clearErr: errors.New("clear failed")} + sub := newZKFallbackSubmitter(fake) + + // Distance breach: 41 > 10 + 30 -> latch + fireClearAsync (which will keep failing). + s.False(sub.decideUseZK(context.Background(), big.NewInt(41), big.NewInt(10))) + s.True(sub.inSGXFallback()) + + // fireClearAsync retries 1 + clearBackoffMaxRetries times, then gives up. + s.Eventually(func() bool { + return fake.clearCalls.Load() == int32(clearBackoffMaxRetries)+1 + }, 2*time.Second, 5*time.Millisecond) + + // Still latched after clear ultimately failed (best-effort; resume is gated elsewhere). + s.True(sub.inSGXFallback()) +} + +func (s *ZKFallbackTestSuite) TestDecideUseZKResumeWhenDrainedAndClean() { + fake := &fakeZKBacklog{clean: true} + sub := newZKFallbackSubmitter(fake) + s.True(sub.markSGXFallback()) + + // (A) 11 <= 10+1 holds and status is clean -> resume ZK. + s.True(sub.decideUseZK(context.Background(), big.NewInt(11), big.NewInt(10))) + s.False(sub.inSGXFallback()) + s.Equal(int32(1), fake.statusCalls.Load()) +} + +func (s *ZKFallbackTestSuite) TestDecideUseZKStaysSGXWhenNotDrained() { + fake := &fakeZKBacklog{clean: true} + sub := newZKFallbackSubmitter(fake) + s.True(sub.markSGXFallback()) + + // (A) fails (20 > 10+1) -> stay SGX; status must not be queried until (A) holds. + s.False(sub.decideUseZK(context.Background(), big.NewInt(20), big.NewInt(10))) + s.True(sub.inSGXFallback()) + s.Equal(int32(0), fake.statusCalls.Load()) +} + +func (s *ZKFallbackTestSuite) TestDecideUseZKStaysSGXWhenNotClean() { + fake := &fakeZKBacklog{clean: false} + sub := newZKFallbackSubmitter(fake) + s.True(sub.markSGXFallback()) + + s.False(sub.decideUseZK(context.Background(), big.NewInt(11), big.NewInt(10))) + s.True(sub.inSGXFallback()) + s.Equal(int32(1), fake.statusCalls.Load()) +} + +func (s *ZKFallbackTestSuite) TestDecideUseZKDegradesOnStatusError() { + fake := &fakeZKBacklog{statusErr: errors.New("status endpoint absent")} + sub := newZKFallbackSubmitter(fake) + s.True(sub.markSGXFallback()) + + // (A) holds but status errors -> degrade -> resume on backlog-drained alone. + s.True(sub.decideUseZK(context.Background(), big.NewInt(11), big.NewInt(10))) + s.False(sub.inSGXFallback()) +} + +func (s *ZKFallbackTestSuite) TestDecideUseZKConcurrentBreachClearsOnce() { + fake := &fakeZKBacklog{cleared: make(chan struct{}, 1)} + sub := newZKFallbackSubmitter(fake) + + const n = 50 + var ( + wg sync.WaitGroup + nonBreach atomic.Int32 + ) + wg.Add(n) + for i := 0; i < n; i++ { + go func() { + defer wg.Done() + // 41 > 10 + 30 -> every caller sees a breach and must not use ZK. + if sub.decideUseZK(context.Background(), big.NewInt(41), big.NewInt(10)) { + nonBreach.Add(1) + } + }() + } + wg.Wait() + + s.Equal(int32(0), nonBreach.Load()) // every caller saw the breach + s.True(sub.inSGXFallback()) + select { + case <-fake.cleared: + case <-time.After(time.Second): + s.FailNow("clear was not called") + } + s.Equal(int32(1), fake.clearCalls.Load()) +} + +func (s *ZKFallbackTestSuite) TestDecideUseZKBreachClearsLocalZKBuffers() { + const zkType = proofProducer.ProofTypeZKR0 + buffer := proofProducer.NewProofBuffer(8) + cache := cmap.New[*proofProducer.ProofResponse]() + + // One buffered proof and one cached proof of the ZK type. + _, err := buffer.Write(&proofProducer.ProofResponse{BatchID: big.NewInt(5), Meta: newShastaMetaForTest(5)}) + s.NoError(err) + cache.Set("6", &proofProducer.ProofResponse{BatchID: big.NewInt(6), Meta: newShastaMetaForTest(6)}) + + fake := &fakeZKBacklog{cleared: make(chan struct{}, 1)} + ch := make(chan *proofProducer.ProofRequestBody, 8) + sub := &ProofSubmitter{ + maxZKProofProposalDistance: big.NewInt(30), + zkBacklog: fake, + proofPollingInterval: time.Millisecond, + ctx: context.Background(), + proofBuffers: map[proofProducer.ProofType]*proofProducer.ProofBuffer{zkType: buffer}, + proofCacheMaps: map[proofProducer.ProofType]cmap.ConcurrentMap[string, *proofProducer.ProofResponse]{ + zkType: cache, + }, + proofSubmissionCh: ch, + } + + // Breach (41 > 10+30): latch, flush local ZK buffer/cache, resend, clear raiko. + s.False(sub.decideUseZK(context.Background(), big.NewInt(41), big.NewInt(10))) + s.True(sub.inSGXFallback()) + s.Equal(0, buffer.Len()) + s.Equal(0, cache.Count()) + + resent := map[uint64]bool{} + for i := 0; i < 2; i++ { + select { + case req := <-ch: + resent[req.Meta.GetProposalID().Uint64()] = true + case <-time.After(time.Second): + s.FailNow("expected resend on proofSubmissionCh") + } + } + s.True(resent[5]) + s.True(resent[6]) + + select { + case <-fake.cleared: + case <-time.After(time.Second): + s.FailNow("expected raiko backlog clear") + } +}