Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions packages/taiko-client/cmd/flags/prover.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,9 @@ var (
MaxZKProofProposalDistance = &cli.Uint64Flag{
Name: "prover.maxZKProofProposalDistance",
Usage: "The maximum proposal distance counted from lastFinalizedProposalID for requesting ZK proofs. " +
"When proposalID is greater than lastFinalizedProposalID + maxZKProofProposalDistance, " +
"the prover skips ZK proof and requests base proof instead. This flag only works for post Shasta fork. ",
"When proposalID exceeds lastFinalizedProposalID + maxZKProofProposalDistance, the prover stops " +
"requesting ZK proofs, clears the ZK backlog, and drains via the base (SGX) proof until the backlog " +
"is cleared and the ZK endpoint reports clean, then resumes ZK. Set to 0 to disable. Post Shasta fork only.",
Value: 30,
Category: proverCategory,
EnvVars: []string{"PROVER_MAX_ZK_PROOF_PROPOSAL_DISTANCE"},
Expand Down
2 changes: 2 additions & 0 deletions packages/taiko-client/internal/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ var (
ProverQueuedProofCounter = factory.NewCounter(prometheus.CounterOpts{Name: "prover_proof_all_queued"})
ProverSentProofCounter = factory.NewCounter(prometheus.CounterOpts{Name: "prover_proof_all_sent"})
ProverProofsAssigned = factory.NewCounter(prometheus.CounterOpts{Name: "prover_proof_assigned"})
ProverZKBacklogModeGauge = factory.NewGauge(prometheus.GaugeOpts{Name: "prover_zk_backlog_sgx_mode"})
ProverZKBacklogClearCounter = factory.NewCounter(prometheus.CounterOpts{Name: "prover_zk_backlog_clear"})
ProverReceivedProposedBlockGauge = factory.NewGauge(prometheus.GaugeOpts{Name: "prover_proposed_received"})
ProverReceivedProvenBlockGauge = factory.NewGauge(prometheus.GaugeOpts{Name: "prover_proven_received"})
ProverSubmissionAcceptedCounter = factory.NewCounter(prometheus.CounterOpts{
Expand Down
86 changes: 34 additions & 52 deletions packages/taiko-client/prover/proof_producer/common.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
package producer

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"time"

"github.com/ethereum/go-ethereum/log"
"github.com/go-resty/resty/v2"

"github.com/taikoxyz/taiko-mono/packages/taiko-client/internal/metrics"
)
Expand Down Expand Up @@ -67,73 +65,57 @@ type ProofDataV2 struct {
Quote string `json:"quote"`
}

// requestHTTPProof sends a POST request to the given URL with the given ApiKey and request body,
// to get a proof of the given type.
func requestHTTPProof[T, U any](ctx context.Context, url string, apiKey string, reqBody T) (*U, error) {
res, err := requestHTTPProofResponse(ctx, url, apiKey, reqBody)
if err != nil {
return nil, err
}
defer res.Body.Close()

resBytes, err := io.ReadAll(res.Body)
if err != nil {
return nil, err
}

log.Debug("Proof generation output", "url", url, "output", string(resBytes))
var output U
if err := json.Unmarshal(resBytes, &output); err != nil {
return nil, err
}

return &output, nil
}
// raikoHTTPClient is the shared resty client used for all raiko HTTP requests.
// Each request carries its own deadline via the per-call context, matching the
// previous net/http behavior.
var raikoHTTPClient = resty.New()

// requestHTTPProofResponse sends a POST request to the given URL with the given ApiKey and request body,
// and returns the raw HTTP response, the caller is responsible for closing the response body.
func requestHTTPProofResponse[T any](
// requestRaiko sends an HTTP request to a raiko endpoint with the shared resty
// client and unmarshals a successful (HTTP 200) JSON response into U. A nil body
// is sent without a request body; a non-nil body is JSON-encoded. The response is
// always parsed as JSON, regardless of the Content-Type the server returns.
func requestRaiko[U any](
ctx context.Context,
method string,
url string,
apiKey string,
reqBody T,
) (*http.Response, error) {
client := http.DefaultClient

jsonValue, err := json.Marshal(reqBody)
if err != nil {
return nil, err
}

log.Debug("Requesting proof", "url", url, "body", string(jsonValue))
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewBuffer(jsonValue))
if err != nil {
return nil, err
body any,
) (*U, error) {
var output U
req := raikoHTTPClient.R().
SetContext(ctx).
ForceContentType("application/json").
SetResult(&output)
if body != nil {
req = req.SetHeader("Content-Type", "application/json").SetBody(body)
}
req.Header.Set("Content-Type", "application/json")
if len(apiKey) > 0 {
req.Header.Set("X-API-KEY", apiKey)
req = req.SetHeader("X-API-KEY", apiKey)
}

res, err := client.Do(req)
log.Debug("Requesting raiko", "url", url, "method", method)
resp, err := req.Execute(method, url)
if err != nil {
return nil, err
}

if res.StatusCode != http.StatusOK {
if resp.StatusCode() != http.StatusOK {
// Check for rate limiting (429 Too Many Requests)
if res.StatusCode == http.StatusTooManyRequests {
if resp.StatusCode() == http.StatusTooManyRequests {
log.Error("Rate limit on L2 RPC has been reached. Using your own Taiko L2 node as RPC for Raiko is recommended")
}

return nil, fmt.Errorf(
"failed to request proof, url: %s, statusCode: %d",
url,
res.StatusCode,
)
return nil, fmt.Errorf("failed to request raiko, url: %s, statusCode: %d", url, resp.StatusCode())
}

return res, nil
log.Debug("Raiko response", "url", url, "body", string(resp.Body()))
return &output, nil
}

// requestHTTPProof sends a POST request with the given JSON body to a raiko proof
// endpoint and unmarshals the response into U.
func requestHTTPProof[T, U any](ctx context.Context, url string, apiKey string, reqBody T) (*U, error) {
return requestRaiko[U](ctx, http.MethodPost, url, apiKey, reqBody)
}

// updateProvingMetrics updates the metrics for the given proof type, including
Expand Down
72 changes: 72 additions & 0 deletions packages/taiko-client/prover/proof_producer/zk_backlog.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package producer

import (
"context"
"fmt"
"net/http"

"github.com/taikoxyz/taiko-mono/packages/taiko-client/pkg/rpc"
)

// ZKBacklogController is implemented by proof producers whose backend exposes the
// raiko2 control-plane endpoints for draining the ZK (`zk_any`) task backlog and
// reporting when the backend is idle. See raiko2 issue #93.
type ZKBacklogController interface {
// ClearBacklog discards all non-terminal `zk_any` tasks on the ZK backend
// (POST /v3/prover/clear).
ClearBacklog(ctx context.Context) error
// StatusClean reports whether the ZK backend is fully idle, i.e. the
// `data.clean` field of GET /v3/prover/status is true.
StatusClean(ctx context.Context) (bool, error)
}

// raikoProverStatusResponse is the body returned by GET /v3/prover/status. Only
// `data.clean` is consumed; the remaining fields (`status`, `tasks`, `network`)
// are intentionally ignored.
type raikoProverStatusResponse struct {
Data struct {
Clean bool `json:"clean"`
} `json:"data"`
}

// ClearBacklog implements the ZKBacklogController interface.
func (s *ComposeProofProducer) ClearBacklog(ctx context.Context) error {
if s.Dummy {
return nil
}
ctx, cancel := rpc.CtxWithTimeoutOrDefault(ctx, s.RaikoRequestTimeout)
defer cancel()

// The clear endpoint only needs to return HTTP 200; its response body is unused.
if _, err := requestRaiko[struct{}](
ctx,
http.MethodPost,
s.RaikoHostEndpoint+"/v3/prover/clear",
s.ApiKey,
nil,
); err != nil {
return fmt.Errorf("failed to clear ZK backlog: %w", err)
}
return nil
}

// StatusClean implements the ZKBacklogController interface.
func (s *ComposeProofProducer) StatusClean(ctx context.Context) (bool, error) {
if s.Dummy {
return true, nil
}
ctx, cancel := rpc.CtxWithTimeoutOrDefault(ctx, s.RaikoRequestTimeout)
defer cancel()

out, err := requestRaiko[raikoProverStatusResponse](
ctx,
http.MethodGet,
s.RaikoHostEndpoint+"/v3/prover/status",
s.ApiKey,
nil,
)
if err != nil {
return false, fmt.Errorf("failed to get ZK prover status: %w", err)
}
return out.Data.Clean, nil
}
97 changes: 97 additions & 0 deletions packages/taiko-client/prover/proof_producer/zk_backlog_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package producer

import (
"net/http"
"net/http/httptest"
"testing"
"time"

"github.com/stretchr/testify/suite"
)

// Compile-time assertion that the ZK compose producer satisfies the interface.
var _ ZKBacklogController = (*ComposeProofProducer)(nil)

type ZKBacklogTestSuite struct {
suite.Suite
}

func TestZKBacklogTestSuite(t *testing.T) {
suite.Run(t, new(ZKBacklogTestSuite))
}

func (s *ZKBacklogTestSuite) TestStatusCleanReturnsTrue() {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
s.Equal(http.MethodGet, r.Method)
s.Equal("/v3/prover/status", r.URL.Path)
_, _ = w.Write([]byte(
`{"status":"ok","data":{"clean":true,"tasks":{"pending":0},"network":{"risc0":{"inflight_orders":0}}}}`,
))
}))
defer server.Close()

p := &ComposeProofProducer{RaikoHostEndpoint: server.URL, RaikoRequestTimeout: time.Second}
clean, err := p.StatusClean(s.T().Context())
s.NoError(err)
s.True(clean)
}

func (s *ZKBacklogTestSuite) TestStatusCleanReturnsFalse() {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
_, _ = w.Write([]byte(`{"status":"ok","data":{"clean":false}}`))
}))
defer server.Close()

p := &ComposeProofProducer{RaikoHostEndpoint: server.URL, RaikoRequestTimeout: time.Second}
clean, err := p.StatusClean(s.T().Context())
s.NoError(err)
s.False(clean)
}

func (s *ZKBacklogTestSuite) TestStatusCleanErrorsOnNon200() {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusNotFound)
}))
defer server.Close()

p := &ComposeProofProducer{RaikoHostEndpoint: server.URL, RaikoRequestTimeout: time.Second}
_, err := p.StatusClean(s.T().Context())
s.Error(err)
}

func (s *ZKBacklogTestSuite) TestStatusCleanDummyShortCircuits() {
p := &ComposeProofProducer{Dummy: true}
clean, err := p.StatusClean(s.T().Context())
s.NoError(err)
s.True(clean)
}

func (s *ZKBacklogTestSuite) TestClearBacklogPostsToEndpoint() {
var called bool
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
called = true
s.Equal(http.MethodPost, r.Method)
s.Equal("/v3/prover/clear", r.URL.Path)
_, _ = w.Write([]byte(`{"status":"ok"}`))
}))
defer server.Close()

p := &ComposeProofProducer{RaikoHostEndpoint: server.URL, RaikoRequestTimeout: time.Second}
s.NoError(p.ClearBacklog(s.T().Context()))
s.True(called)
}

func (s *ZKBacklogTestSuite) TestClearBacklogErrorsOnNon200() {
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
}))
defer server.Close()

p := &ComposeProofProducer{RaikoHostEndpoint: server.URL, RaikoRequestTimeout: time.Second}
s.Error(p.ClearBacklog(s.T().Context()))
}

func (s *ZKBacklogTestSuite) TestClearBacklogDummyShortCircuits() {
p := &ComposeProofProducer{Dummy: true}
s.NoError(p.ClearBacklog(s.T().Context()))
}
34 changes: 24 additions & 10 deletions packages/taiko-client/prover/proof_submitter/proof_submitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@ type ProofSubmitter struct {
proofPollingInterval time.Duration
proposalWindowSize *big.Int
maxZKProofProposalDistance *big.Int
// ZK backlog drain/resume state machine (see zk_fallback.go).
zkBacklog proofProducer.ZKBacklogController
zkFallback zkFallback
// ctx is the prover's long-lived context, used by background goroutines
// (e.g. the ZK backlog clear) that must outlive a single RequestProof call.
ctx context.Context
}

// NewProofSubmitter creates a new ProofSubmitter instance.
Expand Down Expand Up @@ -100,6 +106,17 @@ func NewProofSubmitter(
flushCacheNotify: flushCacheNotify,
proposalWindowSize: proposalWindowSize,
maxZKProofProposalDistance: maxZKProofProposalDistance,
ctx: ctx,
}

// Wire the raiko2 control-plane client (ClearBacklog/StatusClean) when a ZK
// producer is configured. This is a compile-time capability check, not a probe
// of the remote host: with no ZK endpoint set, zkvmProofProducer is nil and
// zkBacklog stays nil, so decideUseZK bypasses the drain/resume machine. When a
// ZK endpoint IS set but its host predates raiko2 #93, the control-plane calls
// return 404 and the machine degrades by design (see canResumeZK).
if zkBacklog, ok := zkvmProofProducer.(proofProducer.ZKBacklogController); ok {
Comment thread
davidtaikocha marked this conversation as resolved.
proofSubmitter.zkBacklog = zkBacklog
}

proofSubmitter.startBackgroundWorkers(ctx)
Expand Down Expand Up @@ -206,18 +223,15 @@ func (s *ProofSubmitter) RequestProof(ctx context.Context, meta metadata.TaikoPr
)
return ErrProposalOutOfAllowedRange
}
if !s.shouldUseZKProof(proposalID, lastFinalizedProposalID) {
log.Info(
"Proposal too far from the last finalized proposal, skipping ZK proof",
"proposalID", proposalID,
"lastFinalizedProposalID", lastFinalizedProposalID,
"maxZKProofProposalDistance", s.maxZKProofProposalDistance,
)
useZK = false
}
// backlogAllowsZK is the drain/resume state machine's verdict for this proposal
// (it may latch SGX mode + fire a one-off backlog clear, or resume ZK; see
// zk_fallback.go). useZK is the per-call fallback (zk_any_not_drawn / timeout)
// that sticks for the rest of this call. ZK is used only when both agree; once
// either goes false the ZK path stays off for the remaining retries.
backlogAllowsZK := s.decideUseZK(ctx, proposalID, lastFinalizedProposalID)

// If zk proof is enabled, request zk proof first, and check if ZK proof is drawn.
if s.zkvmProofProducer != nil && useZK {
if s.zkvmProofProducer != nil && useZK && backlogAllowsZK {
if proofResponse, err = s.zkvmProofProducer.RequestProof(
ctx,
opts,
Expand Down
Loading
Loading