From 6457e97384c25f26080eb80e2a1dacd2bab2de34 Mon Sep 17 00:00:00 2001 From: frrist Date: Thu, 31 Jul 2025 17:56:58 -0700 Subject: [PATCH] feat: create PDP API and client with matching interface - client remains unchanged from Curio implementation - client and api share a type system and staify same PDP interface - allows client and api to be used interchangable, setting things up for single process where we optionally replace the client with the API direclty --- go.mod | 2 +- go.sum | 4 +- pkg/pdp/api/middleware/errors.go | 11 + pkg/pdp/apiv2/api.go | 241 ++++++++++++++ pkg/pdp/apiv2/client/client.go | 219 +++++++++++++ pkg/pdp/apiv2/errors.go | 57 ++++ pkg/pdp/apiv2/server/middleware/errors.go | 119 +++++++ pkg/pdp/apiv2/server/middleware/logger.go | 137 ++++++++ pkg/pdp/apiv2/server/server.go | 378 ++++++++++++++++++++++ pkg/pdp/apiv2/server/util.go | 50 +++ pkg/pdp/apiv2/types.go | 128 ++++++++ pkg/pdp/server.go | 9 +- pkg/pdp/service/errors.go | 7 + pkg/pdp/service/get_proofset.go | 2 +- 14 files changed, 1356 insertions(+), 8 deletions(-) create mode 100644 pkg/pdp/apiv2/api.go create mode 100644 pkg/pdp/apiv2/client/client.go create mode 100644 pkg/pdp/apiv2/errors.go create mode 100644 pkg/pdp/apiv2/server/middleware/errors.go create mode 100644 pkg/pdp/apiv2/server/middleware/logger.go create mode 100644 pkg/pdp/apiv2/server/server.go create mode 100644 pkg/pdp/apiv2/server/util.go create mode 100644 pkg/pdp/apiv2/types.go create mode 100644 pkg/pdp/service/errors.go diff --git a/go.mod b/go.mod index aa285a01..4135c367 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( github.com/getsentry/sentry-go v0.31.1 github.com/glebarez/go-sqlite v1.21.2 github.com/glebarez/sqlite v1.11.0 - github.com/go-playground/validator/v10 v10.14.0 + github.com/go-playground/validator/v10 v10.14.1 github.com/golang-jwt/jwt/v4 v4.5.2 github.com/ipfs/go-cid v0.5.0 github.com/ipfs/go-datastore v0.8.2 diff --git a/go.sum b/go.sum index 22831376..0e13f8f7 100644 --- a/go.sum +++ b/go.sum @@ -702,8 +702,8 @@ github.com/go-playground/universal-translator v0.17.0/go.mod h1:UkSxE5sNxxRwHyU+ github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJnYK9S473LQFuzCbDbfSFY= github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY= github.com/go-playground/validator/v10 v10.4.1/go.mod h1:nlOn6nFhuKACm19sB/8EGNn9GlaMV7XkbRSipzJ0Ii4= -github.com/go-playground/validator/v10 v10.14.0 h1:vgvQWe3XCz3gIeFDm/HnTIbj6UGmg/+t63MyGU2n5js= -github.com/go-playground/validator/v10 v10.14.0/go.mod h1:9iXMNT7sEkjXb0I+enO7QXmzG6QCsPWY4zveKFVRSyU= +github.com/go-playground/validator/v10 v10.14.1 h1:9c50NUPC30zyuKprjL3vNZ0m5oG+jU0zvx4AqHGnv4k= +github.com/go-playground/validator/v10 v10.14.1/go.mod h1:9iXMNT7sEkjXb0I+enO7QXmzG6QCsPWY4zveKFVRSyU= github.com/go-sql-driver/mysql v1.7.0/go.mod h1:OXbVy3sEdcQ2Doequ6Z5BW6fXNQTmx+9S1MCJN5yJMI= github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= diff --git a/pkg/pdp/api/middleware/errors.go b/pkg/pdp/api/middleware/errors.go index fc269e21..237ab0be 100644 --- a/pkg/pdp/api/middleware/errors.go +++ b/pkg/pdp/api/middleware/errors.go @@ -6,6 +6,8 @@ import ( "net/http" "github.com/labstack/echo/v4" + + "github.com/storacha/piri/pkg/pdp/apiv2" ) // ContextualError is a richer error interface that provides additional context @@ -80,6 +82,15 @@ func NewError(operation string, message string, err error, code int) *PDPError { } } +func FromAPIError(operation string, err error) *PDPError { + code, msg := apiv2.GetAPIError(err) + return &PDPError{ + Operation: operation, + Message: msg, + Code: code, + } +} + // WithContext adds context information to the error func (e *PDPError) WithContext(key string, value interface{}) *PDPError { e.Context[key] = value diff --git a/pkg/pdp/apiv2/api.go b/pkg/pdp/apiv2/api.go new file mode 100644 index 00000000..adeecf39 --- /dev/null +++ b/pkg/pdp/apiv2/api.go @@ -0,0 +1,241 @@ +package apiv2 + +import ( + "context" + "encoding/hex" + "errors" + "fmt" + "io" + "net/http" + "net/url" + "path" + "strings" + + "github.com/ethereum/go-ethereum/common" + "github.com/google/uuid" + "github.com/ipfs/go-cid" + + "github.com/storacha/piri/pkg/pdp/service" + "github.com/storacha/piri/pkg/pdp/service/types" + "github.com/storacha/piri/pkg/store" +) + +var _ PDP = (*API)(nil) + +// API implements core PDP operations (transport-agnostic) +type API struct { + service *service.PDPService + endpoint *url.URL +} + +// endpoint is the URL the service backing this api is avaliable at +func New(endpoint *url.URL, s *service.PDPService) *API { + return &API{service: s, endpoint: endpoint} +} + +func (h *API) CreateProofSet(ctx context.Context, req CreateProofSet) (StatusRef, error) { + if !common.IsHexAddress(req.RecordKeeper) { + return StatusRef{}, NewError(http.StatusBadRequest, "record keeper address is not a valid address") + } + recordKeeperAddr := common.HexToAddress(req.RecordKeeper) + resp, err := h.service.ProofSetCreate(ctx, recordKeeperAddr) + if err != nil { + return StatusRef{}, WrapError(err, http.StatusInternalServerError, "failed to create proof set") + } + return StatusRef{URL: path.Join("/pdp/proof-sets/created", resp.String())}, nil +} + +func (h *API) ProofSetCreationStatus(ctx context.Context, ref StatusRef) (ProofSetStatus, error) { + // Clean txHash (ensure it starts with '0x' and is lowercase) + txHash := path.Base(ref.URL) + if !strings.HasPrefix(txHash, "0x") { + txHash = "0x" + txHash + } + txHash = strings.ToLower(txHash) + + // Validate txHash is a valid hash + if len(txHash) != 66 { // '0x' + 64 hex chars + return ProofSetStatus{}, NewError(http.StatusBadRequest, "invalid tx hash length: %s", txHash) + } + if _, err := hex.DecodeString(txHash[2:]); err != nil { + return ProofSetStatus{}, WrapError(err, http.StatusBadRequest, "invalid tx hash: %s", txHash) + } + txh := common.HexToHash(txHash) + status, err := h.service.ProofSetStatus(ctx, txh) + if err != nil { + return ProofSetStatus{}, WrapError(err, http.StatusInternalServerError, "failed to set proof set status") + } + psID := uint64(status.ProofSetId) + return ProofSetStatus{ + CreateMessageHash: status.CreateMessageHash, + ProofsetCreated: status.ProofsetCreated, + Service: status.Service, + TxStatus: status.TxStatus, + OK: &status.OK, + ProofSetId: &psID, + }, nil +} + +func (h *API) GetProofSet(ctx context.Context, id uint64) (ProofSet, error) { + ps, err := h.service.ProofSet(ctx, int64(id)) + if err != nil { + if errors.Is(err, service.ErrProofSetNotFound) { + return ProofSet{}, NewError(http.StatusNotFound, "proof set not found") + } + return ProofSet{}, WrapError(err, http.StatusInternalServerError, "failed to get proof set") + } + + resp := ProofSet{ + ID: uint64(ps.ID), + NextChallengeEpoch: &ps.NextChallengeEpoch, + } + for _, root := range ps.Roots { + resp.Roots = append(resp.Roots, RootEntry{ + RootID: root.RootID, + RootCID: root.RootCID, + SubrootCID: root.SubrootCID, + SubrootOffset: root.SubrootOffset, + }) + } + return resp, nil +} + +func (h *API) DeleteProofSetRoot(ctx context.Context, proofSetID uint64, rootID uint64) error { + return h.service.RemoveRoot(ctx, proofSetID, rootID) +} + +func (h *API) DeleteProofSet(ctx context.Context, id uint64) error { + return NewError(http.StatusNotImplemented, "delete proofSet not implemented") +} + +func (h *API) AddRootsToProofSet(ctx context.Context, id uint64, roots []AddRootRequest) error { + serviceRequests := make([]service.AddRootRequest, 0, len(roots)) + for _, r := range roots { + subroots := make([]string, 0, len(r.Subroots)) + for _, s := range r.Subroots { + subroots = append(subroots, s.SubrootCID) + } + serviceRequests = append(serviceRequests, service.AddRootRequest{ + RootCID: r.RootCID, + SubrootCIDs: subroots, + }) + } + + // TODO return the tx hash of the proof set create message + todoHash, err := h.service.ProofSetAddRoot(ctx, int64(id), serviceRequests) + _ = todoHash + return err +} + +func (h *API) AddPiece(ctx context.Context, piece AddPiece) (*UploadRef, error) { + // Validate input + if piece.Check.Hash == "" { + return nil, NewError(http.StatusBadRequest, "piece hash is required") + } + if piece.Check.Name == "" { + return nil, NewError(http.StatusBadRequest, "piece name is required") + } + + resp, err := h.service.PreparePiece(ctx, service.PiecePrepareRequest{ + Check: types.PieceHash{ + Name: piece.Check.Name, + Hash: piece.Check.Hash, + Size: piece.Check.Size, + }, + Notify: piece.Notify, + }) + if err != nil { + return nil, WrapError(err, http.StatusInternalServerError, "failed to add piece") + } + // piece already exists + // TODO do better, we should return a more complete response + if !resp.Created { + return nil, nil + } + return &UploadRef{URL: resp.Location}, nil +} + +func (h *API) UploadPiece(ctx context.Context, ref UploadRef, data io.Reader) error { + pieceUUID := path.Base(ref.URL) + uploadID, err := uuid.Parse(pieceUUID) + if err != nil { + return WrapError(err, http.StatusBadRequest, "invalid upload uuid") + } + _, err = h.service.UploadPiece(ctx, uploadID, data) + if err != nil { + return WrapError(err, http.StatusInternalServerError, "failed to upload piece") + } + return nil +} + +func (h *API) FindPiece(ctx context.Context, piece PieceHash) (FoundPiece, error) { + // Validate input + if piece.Hash == "" { + return FoundPiece{}, NewError(http.StatusBadRequest, "piece hash is required") + } + if piece.Name == "" { + return FoundPiece{}, NewError(http.StatusBadRequest, "piece name is required") + } + + p, found, err := h.service.FindPiece(ctx, piece.Name, piece.Hash, piece.Size) + if err != nil { + return FoundPiece{}, WrapError(err, http.StatusInternalServerError, "failed to find piece") + } + if !found { + return FoundPiece{}, NewError(http.StatusNotFound, "piece not found") + } + return FoundPiece{PieceCID: p.String()}, nil +} + +func (h *API) GetPiece(ctx context.Context, pieceCid string) (PieceReader, error) { + pCID, err := cid.Parse(pieceCid) + if err != nil { + return PieceReader{}, WrapError(err, http.StatusBadRequest, "invalid piece cid") + } + obj, err := h.service.Storage().Get(ctx, pCID.Hash()) + if err != nil { + if errors.Is(err, store.ErrNotFound) { + return PieceReader{}, WrapError(err, http.StatusNotFound, "piece not found") + } + return PieceReader{}, WrapError(err, http.StatusInternalServerError, "failed to read piece") + } + // TODO we should return an io.ReadCloser, which the object store now supports. + return PieceReader{ + Data: &wrapper{obj: obj.Body()}, + Size: obj.Size(), + }, nil +} + +const piecePath = "/piece" + +func (h *API) GetPieceURL(pieceCid string) url.URL { + return *h.endpoint.JoinPath(piecePath, "/", pieceCid) +} + +func (h *API) Ping(_ context.Context) error { + return nil +} + +type wrapper struct { + obj io.Reader +} + +func (w *wrapper) Read(p []byte) (n int, err error) { + return w.obj.Read(p) +} + +func (w *wrapper) Close() error { + return fmt.Errorf("close not implemented") +} + +// Helper to check if an error from the service layer indicates "not found" +func isNotFoundError(err error) bool { + // This depends on how your service layer reports not found errors + // Examples: + // - return err.Error() == "not found" + // - return errors.Is(err, service.ErrNotFound) + // - return strings.Contains(err.Error(), "not found") + + // For now, a simple check: + return err != nil && strings.Contains(strings.ToLower(err.Error()), "not found") +} diff --git a/pkg/pdp/apiv2/client/client.go b/pkg/pdp/apiv2/client/client.go new file mode 100644 index 00000000..381db4b8 --- /dev/null +++ b/pkg/pdp/apiv2/client/client.go @@ -0,0 +1,219 @@ +package client + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "strconv" + + "github.com/storacha/piri/pkg/pdp/apiv2" +) + +var _ apiv2.PDP = (*PDPClient)(nil) + +const pdpRoutePath = "/pdp" +const proofSetsPath = "/proof-sets" +const piecePath = "/piece" +const pingPath = "/ping" +const rootsPath = "/roots" + +// PDPClient implements PDP interface using HTTP calls +type PDPClient struct { + endpoint *url.URL + authHeader string + client *http.Client +} + +func New(client *http.Client, endpoint *url.URL, authHeader string) *PDPClient { + return &PDPClient{ + endpoint: endpoint, + authHeader: authHeader, + client: client, + } +} + +func (c *PDPClient) Ping(ctx context.Context) error { + u := c.endpoint.JoinPath(pdpRoutePath, pingPath).String() + return c.verifySuccess(c.sendRequest(ctx, http.MethodGet, u, nil)) +} + +func (c *PDPClient) CreateProofSet(ctx context.Context, request apiv2.CreateProofSet) (apiv2.StatusRef, error) { + u := c.endpoint.JoinPath(pdpRoutePath, proofSetsPath).String() + // send request + res, err := c.postJson(ctx, u, request) + if err != nil { + return apiv2.StatusRef{}, err + } + // all successful responses are 201 + if res.StatusCode != http.StatusCreated { + return apiv2.StatusRef{}, errFromResponse(res) + } + + return apiv2.StatusRef{URL: res.Header.Get("Location")}, nil +} + +func (c *PDPClient) ProofSetCreationStatus(ctx context.Context, ref apiv2.StatusRef) (apiv2.ProofSetStatus, error) { + // we could do this in a number of ways, including having StatusRef actually + // just be the TXHash, extracted from the location header. But ultimately + // it makes the most sense as an opaque reference from the standpoint of anyone + // using the client + // generate request + u := c.endpoint.JoinPath(ref.URL).String() + var proofSetStatus apiv2.ProofSetStatus + err := c.getJsonResponse(ctx, u, &proofSetStatus) + return proofSetStatus, err +} + +func (c *PDPClient) GetProofSet(ctx context.Context, id uint64) (apiv2.ProofSet, error) { + u := c.endpoint.JoinPath(pdpRoutePath, proofSetsPath, "/", strconv.FormatUint(id, 10)).String() + var proofSet apiv2.ProofSet + err := c.getJsonResponse(ctx, u, &proofSet) + return proofSet, err +} + +func (c *PDPClient) DeleteProofSet(ctx context.Context, id uint64) error { + u := c.endpoint.JoinPath(pdpRoutePath, proofSetsPath, strconv.FormatUint(id, 10)).String() + return c.verifySuccess(c.sendRequest(ctx, http.MethodDelete, u, nil)) +} + +func (c *PDPClient) AddRootsToProofSet(ctx context.Context, id uint64, roots []apiv2.AddRootRequest) error { + u := c.endpoint.JoinPath(pdpRoutePath, proofSetsPath, "/", strconv.FormatUint(id, 10), rootsPath).String() + payload := apiv2.AddRootsPayload{Roots: roots} + return c.verifySuccess(c.postJson(ctx, u, payload)) +} + +func (c *PDPClient) AddPiece(ctx context.Context, addPiece apiv2.AddPiece) (*apiv2.UploadRef, error) { + u := c.endpoint.JoinPath(pdpRoutePath, piecePath).String() + res, err := c.postJson(ctx, u, addPiece) + if err != nil { + return nil, err + } + if res.StatusCode == http.StatusNoContent { + return nil, nil + } + if res.StatusCode == http.StatusCreated { + return &apiv2.UploadRef{ + URL: c.endpoint.JoinPath(res.Header.Get("Location")).String(), + }, nil + } + return nil, errFromResponse(res) +} + +func (c *PDPClient) UploadPiece(ctx context.Context, ref apiv2.UploadRef, data io.Reader) error { + return c.verifySuccess(c.sendRequest(ctx, http.MethodPut, ref.URL, data)) +} + +func (c *PDPClient) FindPiece(ctx context.Context, piece apiv2.PieceHash) (apiv2.FoundPiece, error) { + u := c.endpoint.JoinPath(pdpRoutePath, piecePath) + query := u.Query() + query.Add("size", strconv.FormatInt(piece.Size, 10)) + query.Add("name", piece.Name) + query.Add("hash", piece.Hash) + u.RawQuery = query.Encode() + var foundPiece apiv2.FoundPiece + err := c.getJsonResponse(ctx, u.String(), &foundPiece) + return foundPiece, err +} + +func (c *PDPClient) GetPiece(ctx context.Context, pieceCid string) (apiv2.PieceReader, error) { + // piece gets are not at the pdp path but rather the raw /piece path + u := c.endpoint.JoinPath(piecePath, "/", pieceCid).String() + res, err := c.sendRequest(ctx, http.MethodGet, u, nil) + if err != nil { + return apiv2.PieceReader{}, err + } + if res.StatusCode < 200 || res.StatusCode >= 300 { + return apiv2.PieceReader{}, errFromResponse(res) + } + return apiv2.PieceReader{ + Data: res.Body, + Size: res.ContentLength, + }, nil +} + +func (c *PDPClient) GetPieceURL(pieceCid string) url.URL { + return *c.endpoint.JoinPath(piecePath, "/", pieceCid) +} + +func (c *PDPClient) sendRequest(ctx context.Context, method string, url string, body io.Reader) (*http.Response, error) { + req, err := http.NewRequestWithContext(ctx, method, url, body) + if err != nil { + return nil, fmt.Errorf("generating http request: %w", err) + } + // add authorization header + req.Header.Add("Authorization", c.authHeader) + req.Header.Add("Content-Type", "application/json") + // send request + res, err := c.client.Do(req) + if err != nil { + return nil, fmt.Errorf("sending request to curio: %w", err) + } + return res, nil +} + +func (c *PDPClient) postJson(ctx context.Context, url string, params interface{}) (*http.Response, error) { + var body io.Reader + if params != nil { + asBytes, err := json.Marshal(params) + if err != nil { + return nil, fmt.Errorf("encoding request parameters: %w", err) + } + body = bytes.NewReader(asBytes) + } + + return c.sendRequest(ctx, http.MethodPost, url, body) +} + +func (c *PDPClient) getJsonResponse(ctx context.Context, url string, target interface{}) error { + res, err := c.sendRequest(ctx, http.MethodGet, url, nil) + if err != nil { + return err + } + if res.StatusCode < 200 || res.StatusCode >= 300 { + return errFromResponse(res) + } + data, err := io.ReadAll(res.Body) + if err != nil { + return fmt.Errorf("reading response body: %w", err) + } + err = json.Unmarshal(data, target) + if err != nil { + return fmt.Errorf("unmarshalling JSON response to target: %w", err) + } + return nil +} + +func (c *PDPClient) verifySuccess(res *http.Response, err error) error { + if err != nil { + return err + } + if res.StatusCode < 200 || res.StatusCode >= 300 { + return errFromResponse(res) + } + return nil +} + +type ErrFailedResponse struct { + StatusCode int + Body string +} + +func errFromResponse(res *http.Response) ErrFailedResponse { + err := ErrFailedResponse{StatusCode: res.StatusCode} + + message, merr := io.ReadAll(res.Body) + if merr != nil { + err.Body = merr.Error() + } else { + err.Body = string(message) + } + return err +} + +func (e ErrFailedResponse) Error() string { + return fmt.Sprintf("http request failed, status: %d %s, message: %s", e.StatusCode, http.StatusText(e.StatusCode), e.Body) +} diff --git a/pkg/pdp/apiv2/errors.go b/pkg/pdp/apiv2/errors.go new file mode 100644 index 00000000..e6669d6b --- /dev/null +++ b/pkg/pdp/apiv2/errors.go @@ -0,0 +1,57 @@ +package apiv2 + +import ( + "errors" + "fmt" + "net/http" +) + +// APIError represents an error with an associated HTTP status code +type APIError struct { + StatusCode int + Message string + Err error // Underlying error +} + +func (e *APIError) Error() string { + if e.Err != nil { + return fmt.Sprintf("%s: %v", e.Message, e.Err) + } + return e.Message +} + +func (e *APIError) Unwrap() error { + return e.Err +} + +// WrapError wraps an existing error with HTTP status information +func WrapError(err error, statusCode int, message string, args ...interface{}) *APIError { + return &APIError{ + StatusCode: statusCode, + Message: fmt.Sprintf(message, args...), + Err: err, + } +} + +func NewError(statusCode int, message string, args ...interface{}) *APIError { + return &APIError{ + StatusCode: statusCode, + Message: fmt.Sprintf(message, args...), + } +} + +// GetAPIError extracts API error information from any error +func GetAPIError(err error) (int, string) { + if err == nil { + return http.StatusOK, "" + } + + // Check if it's already an APIError + var httpErr *APIError + if errors.As(err, &httpErr) { + return httpErr.StatusCode, httpErr.Message + } + + // Default to internal server error + return http.StatusInternalServerError, err.Error() +} diff --git a/pkg/pdp/apiv2/server/middleware/errors.go b/pkg/pdp/apiv2/server/middleware/errors.go new file mode 100644 index 00000000..fc269e21 --- /dev/null +++ b/pkg/pdp/apiv2/server/middleware/errors.go @@ -0,0 +1,119 @@ +package middleware + +import ( + "errors" + "fmt" + "net/http" + + "github.com/labstack/echo/v4" +) + +// ContextualError is a richer error interface that provides additional context +// about an error that occurred during request handling. +type ContextualError interface { + error + // StatusCode returns the HTTP status code that should be returned to the client + StatusCode() int + // LogContext returns a map of additional context for logging + LogContext() map[string]interface{} + // PublicMessage returns a message safe to return to the client + PublicMessage() string + // OriginalError returns the underlying error, if any + OriginalError() error +} + +// PDPError implements the ContextualError interface +type PDPError struct { + Operation string // The operation that failed (e.g., "ProofSetAddRoot") + Message string // Internal error message (for logs) + ClientMessage string // Message safe to return to clients + Code int // HTTP status code + Err error // Original error, if any + Context map[string]interface{} // Additional context for logging +} + +// Error satisfies the error interface +func (e *PDPError) Error() string { + if e.Err != nil { + return fmt.Sprintf("%s: %s: %v", e.Operation, e.Message, e.Err) + } + return fmt.Sprintf("%s: %s", e.Operation, e.Message) +} + +// StatusCode returns the HTTP status code +func (e *PDPError) StatusCode() int { + return e.Code +} + +// LogContext returns context information for logging +func (e *PDPError) LogContext() map[string]interface{} { + ctx := make(map[string]interface{}) + for k, v := range e.Context { + ctx[k] = v + } + ctx["operation"] = e.Operation + return ctx +} + +// PublicMessage returns a message safe for client consumption +func (e *PDPError) PublicMessage() string { + if e.ClientMessage != "" { + return e.ClientMessage + } + return http.StatusText(e.Code) +} + +// OriginalError returns the underlying error +func (e *PDPError) OriginalError() error { + return e.Err +} + +// NewError creates a new PDPError +func NewError(operation string, message string, err error, code int) *PDPError { + return &PDPError{ + Operation: operation, + Message: message, + ClientMessage: message, // By default, use the same message (override for sensitive errors) + Code: code, + Err: err, + Context: make(map[string]interface{}), + } +} + +// WithContext adds context information to the error +func (e *PDPError) WithContext(key string, value interface{}) *PDPError { + e.Context[key] = value + return e +} + +// WithPublicMessage sets a client-safe message +func (e *PDPError) WithPublicMessage(message string) *PDPError { + e.ClientMessage = message + return e +} + +// HandleError converts any error to an HTTP response +// It's especially helpful for handling our custom ContextualError +func HandleError(err error, c echo.Context) { + if err == nil { + return + } + + // Check if it's our custom error type + var cErr ContextualError + if errors.As(err, &cErr) { + // Return the appropriate status code and message + _ = c.String(cErr.StatusCode(), cErr.PublicMessage()) + return + } + + // Handle echo's HTTPError + var he *echo.HTTPError + if errors.As(err, &he) { + _ = c.String(he.Code, fmt.Sprintf("%v", he.Message)) + return + } + + // Generic error handling + _ = c.String(http.StatusInternalServerError, "Internal server error") +} diff --git a/pkg/pdp/apiv2/server/middleware/logger.go b/pkg/pdp/apiv2/server/middleware/logger.go new file mode 100644 index 00000000..361dc3ed --- /dev/null +++ b/pkg/pdp/apiv2/server/middleware/logger.go @@ -0,0 +1,137 @@ +package middleware + +import ( + "errors" + "fmt" + "net/http" + "time" + + logging "github.com/ipfs/go-log/v2" + "github.com/labstack/echo/v4" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" +) + +// LogMiddleware returns a middleware that logs requests using the IPFS go-log logger +func LogMiddleware(logger *logging.ZapEventLogger) echo.MiddlewareFunc { + return func(next echo.HandlerFunc) echo.HandlerFunc { + return func(c echo.Context) error { + req := c.Request() + res := c.Response() + start := time.Now() + + // Execute the next handler + err := next(c) + if err != nil { + c.Error(err) + } + + // Calculate latency + stop := time.Now() + latency := stop.Sub(start) + + // Get request ID + id := req.Header.Get(echo.HeaderXRequestID) + if id == "" { + id = res.Header().Get(echo.HeaderXRequestID) + } + + // Normalize path + path := req.URL.Path + if path == "" { + path = "/" + } + + // Log request based on status code + statusCode := res.Status + logMsg := fmt.Sprintf("[%d:%s] %s %s %s ", statusCode, http.StatusText(statusCode), req.Method, path, req.URL.RawQuery) + // Choose fields based on log level + logFields := buildLogFields(c, req, res, id, latency, err, logger.Level()) + + // Log with appropriate level + switch { + case statusCode >= 500: + logger.Errorw(logMsg, logFields...) + case statusCode >= 400: + logger.Warnw(logMsg, logFields...) + default: // Info for status <= 400 && >= 200 + logger.Infow(logMsg, logFields...) + } + + return err + } + } +} + +// buildLogFields constructs log fields appropriate for the current log level +func buildLogFields(c echo.Context, req *http.Request, res *echo.Response, id string, latency time.Duration, err error, level zapcore.Level) []interface{} { + // Base fields - always included regardless of level + fields := []interface{}{ + "id", id, + "latency", latency.String(), + } + + if level == zap.DebugLevel { + fields = append(fields, + "remote_ip", c.RealIP(), + "host", req.Host, + "referer", req.Referer(), + "user_agent", req.UserAgent(), + "bytes_in", req.ContentLength, + "bytes_out", res.Size, + "content_type", res.Header().Get("Content-Type"), + ) + } + + // Error information + if err != nil { + fields = append(fields, "error", err.Error(), "error_type", getErrorType(err)) + + // Enhanced logging for ContextualError + if contextErr, ok := err.(ContextualError); ok { + // At info level, only include operation + if level <= zap.InfoLevel { + if operation, ok := contextErr.LogContext()["operation"]; ok { + fields = append(fields, "operation", operation) + } + if origErr := contextErr.OriginalError(); origErr != nil { + fields = append(fields, "cause", origErr.Error()) + } + } else { + // For warn and error levels, include all context fields + for k, v := range contextErr.LogContext() { + fields = append(fields, k, v) + } + if origErr := contextErr.OriginalError(); origErr != nil { + fields = append(fields, "cause", origErr.Error()) + } + } + } + } + + return fields +} + +// getErrorType extracts the type name of the error +func getErrorType(err error) string { + if err == nil { + return "" + } + + // Get the type of the error as a string + var HTTPError *echo.HTTPError + switch { + case errors.As(err, &HTTPError): + return "echo.HTTPError" + case isContextualError(err): + return "ContextualError" + default: + return "error" + } +} + +// isContextualError checks if the error implements our ContextualError interface +func isContextualError(err error) bool { + _, ok := err.(ContextualError) + return ok +} diff --git a/pkg/pdp/apiv2/server/server.go b/pkg/pdp/apiv2/server/server.go new file mode 100644 index 00000000..19e1fe2f --- /dev/null +++ b/pkg/pdp/apiv2/server/server.go @@ -0,0 +1,378 @@ +package server + +import ( + "context" + "fmt" + "net" + "net/http" + "path" + "strconv" + "strings" + "time" + + "github.com/filecoin-project/go-state-types/abi" + "github.com/ipfs/go-cid" + logging "github.com/ipfs/go-log/v2" + "github.com/labstack/echo/v4" + echomiddleware "github.com/labstack/echo/v4/middleware" + + "github.com/storacha/piri/pkg/pdp/api/middleware" + "github.com/storacha/piri/pkg/pdp/apiv2" +) + +var log = logging.Logger("pdp/server") + +// customErrorHandler provides enhanced error handling for ContextualError types +func customErrorHandler(err error, c echo.Context) { + // Let our middleware handle the error type and logging + middleware.HandleError(err, c) +} + +type Server struct { + e *echo.Echo + h *apiv2.API +} + +func NewServer(h *apiv2.API) *Server { + e := echo.New() + // don't print echo stuff when we start, our logs cover this. + e.HideBanner = true + e.HidePort = true + + // handle panics + e.Use(echomiddleware.Recover()) + // log requests with our logging system + e.Use(middleware.LogMiddleware(log)) + + // Custom error handler for our ContextualError type + e.HTTPErrorHandler = customErrorHandler + + s := &Server{e: e, h: h} + registerRoutes(e, s) + return s +} + +const ( + PDPRoutePath = "/pdp" + PRoofSetRoutPath = "/proof-sets" + PiecePrefix = "/piece" +) + +func registerRoutes(e *echo.Echo, s *Server) { + // /pdp/proof-sets + proofSets := e.Group(path.Join(PDPRoutePath, PRoofSetRoutPath)) + proofSets.POST("", s.createProofSet) + proofSets.GET("/created/:txHash", s.getProofSetCreationStatus) + + // /pdp/proof-sets/:proofSetID + proofSets.GET("/:proofSetID", s.getProofSet) + proofSets.DELETE("/:proofSetID", s.deleteProofSet) + + // /pdp/proof-sets/:proofSetID/roots + roots := proofSets.Group("/:proofSetID/roots") + roots.POST("", s.addRootToProofSet) + roots.GET("/:rootID", s.getProofSetRoot) + roots.DELETE("/:rootID", s.deleteProofSetRoot) + + // /pdp/ping + e.GET("/pdp/ping", s.ping) + + // /pdp/piece + e.POST(path.Join(PDPRoutePath, piecePrefix), s.addPiece) + e.PUT(path.Join(PDPRoutePath, piecePrefix, "/upload/:uploadUUID"), s.uploadPiece) + e.GET(path.Join(PDPRoutePath, piecePrefix), s.findPiece) + + // retrival + e.GET(path.Join(PiecePrefix, ":cid"), s.getPiece) +} + +func (s *Server) Start(ctx context.Context, addr string) error { + errCh := make(chan error) + go func() { + errCh <- s.e.Start(addr) + }() + // wait up to one second for the server to start + // gripe: wish `Start` behaved like a normal start method and didn't block, Run would be a better name. shakes fist at clouds. + return waitForServerStart(ctx, s.e, errCh, time.Second) +} + +func (s *Server) Shutdown(ctx context.Context) error { + return s.e.Shutdown(ctx) +} + +func waitForServerStart(ctx context.Context, e *echo.Echo, errChan <-chan error, timeout time.Duration) error { + ctx, cancel := context.WithTimeout(ctx, timeout) + defer cancel() + + ticker := time.NewTicker(5 * time.Millisecond) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-ticker.C: + var addr net.Addr + addr = e.ListenerAddr() + if addr != nil && strings.Contains(addr.String(), ":") { + return nil // was started + } + case err := <-errChan: + return err + } + } +} + +func (s *Server) ping(c echo.Context) error { + operation := "Ping" + if err := s.h.Ping(c.Request().Context()); err != nil { + // Extract HTTP status code from the error + return middleware.FromAPIError(operation, err) + } + return c.NoContent(http.StatusOK) +} + +func (s *Server) createProofSet(c echo.Context) error { + ctx := c.Request().Context() + operation := "CreateProofSet" + + var req apiv2.CreateProofSet + if err := c.Bind(&req); err != nil { + return middleware.NewError(operation, "Invalid request body", err, http.StatusBadRequest) + } + + log.Debugw("Processing CreateProofSet request", "recordKeeper", req.RecordKeeper) + + ref, err := s.h.CreateProofSet(ctx, req) + if err != nil { + // Extract HTTP status code from the error + return middleware.FromAPIError(operation, err) + } + + // The ref.URL contains the transaction hash + location := path.Join("/pdp/proof-sets/created", ref.URL) + c.Response().Header().Set("Location", location) + + log.Infow("Successfully initiated proof set creation", "txHash", ref.URL, "location", location) + return c.JSON(http.StatusCreated, ref) +} + +func (s *Server) deleteProofSet(c echo.Context) error { + return c.NoContent(http.StatusNotImplemented) +} + +// TODO do better, parse it using standard practice way +const piecePrefix = "/piece/" + +func (s *Server) getPiece(c echo.Context) error { + ctx := c.Request().Context() + operation := "GetPiece" + + // TODO do this instead is it should be equivalent + //pieceCidStr := path.Base(c.Request().URL.Path) + + // Remove the path up to the piece cid + prefixLen := len(piecePrefix) + if len(c.Request().URL.Path) <= prefixLen { + return middleware.NewError(operation, "path is missing piece CID", fmt.Errorf("invalid request URL"), http.StatusBadRequest) + } + + pieceCidStr := c.Request().URL.Path[prefixLen:] + pieceCid, err := cid.Parse(pieceCidStr) + if err != nil { + return middleware.NewError(operation, "failed to parse pieceCid", err, http.StatusBadRequest) + } + + obj, err := s.h.GetPiece(ctx, pieceCidStr) + if err != nil { + return middleware.FromAPIError(operation, err) + } + + bodyReadSeeker, err := makeReadSeeker(obj.Data) + if err != nil { + return middleware.NewError(operation, "failed to make body readSeeker", err, http.StatusInternalServerError) + } + setHeaders(c.Response(), pieceCid) + serveContent(c.Response(), c.Request(), abi.UnpaddedPieceSize(obj.Size), bodyReadSeeker) + return nil +} + +func (s *Server) findPiece(c echo.Context) error { + ctx := c.Request().Context() + operation := "FindPiece" + + sizeStr := c.QueryParam("size") + if sizeStr == "" { + return middleware.NewError(operation, "piece size required", fmt.Errorf("missing size"), http.StatusBadRequest) + } + name := c.QueryParam("name") + if name == "" { + return middleware.NewError(operation, "piece name required", fmt.Errorf("missing name"), http.StatusBadRequest) + } + hash := c.QueryParam("hash") + if hash == "" { + return middleware.NewError(operation, "piece hash required", fmt.Errorf("missing hash"), http.StatusBadRequest) + } + + size, err := strconv.ParseInt(sizeStr, 10, 64) + if err != nil { + return middleware.NewError(operation, "failed to parse piece size", err, http.StatusBadRequest) + } + + // Verify that a 'parked_pieces' entry exists for the given 'piece_cid' + resp, err := s.h.FindPiece(ctx, apiv2.PieceHash{ + Name: name, + Hash: hash, + Size: size, + }) + if err != nil { + return middleware.FromAPIError(operation, err) + } + + return c.JSON(http.StatusOK, resp) +} + +func (s *Server) getProofSet(c echo.Context) error { + ctx := c.Request().Context() + operation := "GetProofSet" + proofSetIDStr := c.Param("proofSetID") + + if proofSetIDStr == "" { + return middleware.NewError(operation, "proofSetID required", fmt.Errorf("missing proofSetID"), http.StatusBadRequest) + } + + id, err := strconv.ParseUint(proofSetIDStr, 10, 64) + if err != nil { + return middleware.NewError(operation, "failed to parse proofSetID", err, http.StatusBadRequest) + } + + resp, err := s.h.GetProofSet(ctx, id) + if err != nil { + return middleware.FromAPIError(operation, err) + } + + return c.JSON(http.StatusOK, resp) +} + +func (s *Server) getProofSetRoot(c echo.Context) error { + return c.NoContent(http.StatusNotImplemented) +} + +func (s *Server) addPiece(c echo.Context) error { + ctx := c.Request().Context() + operation := "PreparePiece" + + var req apiv2.AddPiece + if err := c.Bind(&req); err != nil { + return middleware.NewError(operation, "Invalid request body", err, http.StatusBadRequest) + } + + log.Debugw("Processing prepare piece request", + "name", req.Check, + "hash", req.Check.Hash, + "size", req.Check.Size) + + resp, err := s.h.AddPiece(ctx, req) + if err != nil { + return middleware.FromAPIError(operation, err) + } + + if resp == nil { + return c.NoContent(http.StatusNoContent) + } + + c.Response().Header().Set(echo.HeaderLocation, resp.URL) + return c.JSON(http.StatusCreated, resp) +} + +func (s *Server) deleteProofSetRoot(c echo.Context) error { + ctx := c.Request().Context() + operation := "DeleteProofSetRoot" + // Step 2: Extract parameters from the URL + proofSetIdStr := c.Param("proofSetID") + if proofSetIdStr == "" { + return middleware.NewError(operation, "proofSetID required", fmt.Errorf("missing proofSetID"), http.StatusBadRequest) + } + rootIdStr := c.Param("rootID") + if rootIdStr == "" { + return middleware.NewError(operation, "rootID required", fmt.Errorf("missing rootID"), http.StatusBadRequest) + } + + proofSetID, err := strconv.ParseUint(proofSetIdStr, 10, 64) + if err != nil { + return middleware.NewError(operation, "failed to parse proofSetID", err, http.StatusBadRequest) + } + rootID, err := strconv.ParseUint(rootIdStr, 10, 64) + if err != nil { + return middleware.NewError(operation, "failed to parse rootID", err, http.StatusBadRequest) + } + + // check if the proofset belongs to the service in pdp_proof_sets + if err := s.h.DeleteProofSetRoot(ctx, proofSetID, rootID); err != nil { + return middleware.FromAPIError(operation, err) + } + return c.NoContent(http.StatusNoContent) +} + +func (s *Server) getProofSetCreationStatus(c echo.Context) error { + ctx := c.Request().Context() + operation := "GetProofSetCreationStatus" + txHash := c.Param("txHash") + if txHash == "" { + return middleware.NewError(operation, "txHash required", fmt.Errorf("missing txHash"), http.StatusBadRequest) + } + + resp, err := s.h.ProofSetCreationStatus(ctx, apiv2.StatusRef{URL: txHash}) + if err != nil { + return middleware.FromAPIError(operation, err) + } + + return c.JSON(http.StatusOK, resp) +} + +func (s *Server) uploadPiece(c echo.Context) error { + ctx := c.Request().Context() + operation := "UploadPiece" + uploadRef := c.Param("uploadUUID") + if uploadRef == "" { + return middleware.NewError(operation, "uploadUUID required", fmt.Errorf("missing uploadUUID"), http.StatusBadRequest) + } + + log.Debugw("Processing prepare piece request", "uploadRef", uploadRef) + if err := s.h.UploadPiece(ctx, apiv2.UploadRef{URL: uploadRef}, c.Request().Body); err != nil { + return middleware.FromAPIError(operation, err) + } + + return c.NoContent(http.StatusNoContent) +} + +func (s *Server) addRootToProofSet(c echo.Context) error { + ctx := c.Request().Context() + operation := "AddRootToProofSet" + + proofSetIDStr := c.Param("proofSetID") + if proofSetIDStr == "" { + return middleware.NewError(operation, "missing proofSetID", nil, http.StatusBadRequest) + } + + id, err := strconv.ParseUint(proofSetIDStr, 10, 64) + if err != nil { + return middleware.NewError(operation, "invalid proofSetID format", err, http.StatusBadRequest). + WithContext("proofSetID", proofSetIDStr) + } + + var req apiv2.AddRootsPayload + if err := c.Bind(&req); err != nil { + return middleware.NewError(operation, "failed to parse request body", err, http.StatusBadRequest). + WithContext("proofSetID", id) + } + + if err := s.h.AddRootsToProofSet(ctx, id, req.Roots); err != nil { + return middleware.FromAPIError(operation, err) + } + + log.Infow("Successfully added roots to proofSet", + "proofSetID", id, + "rootCount", len(req.Roots)) + return c.NoContent(http.StatusCreated) +} diff --git a/pkg/pdp/apiv2/server/util.go b/pkg/pdp/apiv2/server/util.go new file mode 100644 index 00000000..f650c147 --- /dev/null +++ b/pkg/pdp/apiv2/server/util.go @@ -0,0 +1,50 @@ +package server + +import ( + "bytes" + "fmt" + "io" + "net/http" + "time" + + "github.com/filecoin-project/go-state-types/abi" + "github.com/ipfs/go-cid" +) + +func setHeaders(w http.ResponseWriter, pieceCid cid.Cid) { + w.Header().Set("Vary", "Accept-Encoding") + etag := `"` + pieceCid.String() + `.gz"` // must be quoted + w.Header().Set("Etag", etag) + w.Header().Set("Content-Type", "application/piece") + w.Header().Set("Cache-Control", "public, max-age=29030400, immutable") +} + +// For data served by the endpoints in the HTTP server that never changes +// (eg pieces identified by a piece CID) send a cache header with a constant, +// non-zero last modified time. +var lastModified = time.UnixMilli(1) + +// TODO: since the blobstore interface doesn't return a read seeker, we make one, this won't work long term +// and requires changes to the interface, or a new one. +func makeReadSeeker(r io.Reader) (io.ReadSeeker, error) { + data, err := io.ReadAll(r) + if err != nil { + return nil, err + } + return bytes.NewReader(data), nil +} + +func serveContent(res http.ResponseWriter, req *http.Request, size abi.UnpaddedPieceSize, content io.ReadSeeker) { + // Note that the last modified time is a constant value because the data + // in a piece identified by a cid will never change. + + if req.Method == http.MethodHead { + // For an HTTP HEAD request ServeContent doesn't send any data (just headers) + http.ServeContent(res, req, "", time.Time{}, nil) + return + } + + // Send the content + res.Header().Set("Content-Length", fmt.Sprintf("%d", size)) + http.ServeContent(res, req, "", lastModified, content) +} diff --git a/pkg/pdp/apiv2/types.go b/pkg/pdp/apiv2/types.go new file mode 100644 index 00000000..9a26c71f --- /dev/null +++ b/pkg/pdp/apiv2/types.go @@ -0,0 +1,128 @@ +package apiv2 + +import ( + "context" + "io" + "net/url" +) + +/* +The architecture is: + + ┌─────────────────┐ + │ HTTP Client │ + └─────────────────┘ + │ + │ HTTP + ↓ + ┌─────────────────┐ ┌─────────────────┐ + │ HTTP Server │ │ API │ ← shared business logic, can be used in process + └─────────────────┘ └─────────────────┘ + │ uses │ + └──────────┬───────────┘ + ↓ + ┌─────────────────┐ + │ PDPService │ + └─────────────────┘ + +This allow alternative implementations to simply take a dependency on the API and run everything in one process. +The server simply wraps the API, interactions then go through the client. Since the Client and API both +implement the PDP interface they may be used interchangeably. +TODO: GetPieceURL is complicated as the method assumes there is an endpoint to join the piece on. +- when operating as two process this works as it expected, the client uses the endpoint it's connected + to to create a valid URL reference +- when running as a single process this is a bit complicated, implementation now provides the API with an endpoint URL + corresponding to the service its operating over +*/ + +// PDP defines the contract for all PDP operations +type PDP interface { + Ping(ctx context.Context) error + CreateProofSet(ctx context.Context, request CreateProofSet) (StatusRef, error) + ProofSetCreationStatus(ctx context.Context, ref StatusRef) (ProofSetStatus, error) + GetProofSet(ctx context.Context, id uint64) (ProofSet, error) + DeleteProofSet(ctx context.Context, id uint64) error + AddRootsToProofSet(ctx context.Context, id uint64, addRoots []AddRootRequest) error + AddPiece(ctx context.Context, addPiece AddPiece) (*UploadRef, error) + UploadPiece(ctx context.Context, ref UploadRef, data io.Reader) error + FindPiece(ctx context.Context, piece PieceHash) (FoundPiece, error) + GetPiece(ctx context.Context, pieceCid string) (PieceReader, error) + GetPieceURL(pieceCid string) url.URL +} + +// Shared types used by both client and server + +type AddRootsPayload struct { + Roots []AddRootRequest `json:"roots"` + ExtraData *string `json:"extraData,omitempty"` +} + +type AddRootRequest struct { + RootCID string `json:"rootCid"` + Subroots []SubrootEntry `json:"subroots"` +} + +type SubrootEntry struct { + SubrootCID string `json:"subrootCid"` +} + +type CreateProofSet struct { + RecordKeeper string `json:"recordKeeper"` +} + +type StatusRef struct { + URL string +} + +type ProofSetStatus struct { + CreateMessageHash string `json:"createMessageHash"` + ProofsetCreated bool `json:"proofsetCreated"` + Service string `json:"service"` + TxStatus string `json:"txStatus"` + OK *bool `json:"ok"` + ProofSetId *uint64 `json:"proofSetId,omitempty"` +} + +type ProofSet struct { + ID uint64 `json:"id"` + NextChallengeEpoch *int64 `json:"nextChallengeEpoch"` + Roots []RootEntry `json:"roots"` +} + +type RootEntry struct { + RootID uint64 `json:"rootId"` + RootCID string `json:"rootCid"` + SubrootCID string `json:"subrootCid"` + SubrootOffset int64 `json:"subrootOffset"` +} + +type AddPiece struct { + Check PieceHash `json:"check"` + Notify string `json:"notify,omitempty"` +} + +type PieceHash struct { + // Name of the hash function used + // sha2-256-trunc254-padded - CommP + // sha2-256 - Blob sha256 + Name string `json:"name"` + + // hex encoded hash + Hash string `json:"hash"` + + // Size of the piece in bytes + Size int64 `json:"size"` +} + +type UploadRef struct { + URL string +} + +type FoundPiece struct { + PieceCID string `json:"piece_cid"` +} + +type PieceReader struct { + Data io.ReadCloser + Size int64 +} diff --git a/pkg/pdp/server.go b/pkg/pdp/server.go index bf318646..5e4bcaea 100644 --- a/pkg/pdp/server.go +++ b/pkg/pdp/server.go @@ -20,7 +20,8 @@ import ( "github.com/storacha/piri/pkg/database" "github.com/storacha/piri/pkg/database/gormdb" - "github.com/storacha/piri/pkg/pdp/api" + "github.com/storacha/piri/pkg/pdp/apiv2" + "github.com/storacha/piri/pkg/pdp/apiv2/server" "github.com/storacha/piri/pkg/pdp/curio" "github.com/storacha/piri/pkg/pdp/pieceadder" "github.com/storacha/piri/pkg/pdp/piecefinder" @@ -120,14 +121,14 @@ func NewServer( return nil, fmt.Errorf("creating pdp service: %w", err) } - pdpAPI := &api.PDP{Service: pdpService} - svr := api.NewServer(pdpAPI) + pdpAPI := apiv2.New(endpoint, pdpService) + svr := server.NewServer(pdpAPI) return &Server{ pieceFinder: piecefinder.NewCurioFinder(localPDPClient), pieceAdder: pieceadder.NewCurioAdder(localPDPClient), startFuncs: []func(ctx context.Context) error{ func(ctx context.Context) error { - if err := svr.Start(fmt.Sprintf(":%s", endpoint.Port())); err != nil { + if err := svr.Start(ctx, fmt.Sprintf(":%s", endpoint.Port())); err != nil { return fmt.Errorf("starting local pdp server: %w", err) } if err := pdpService.Start(ctx); err != nil { diff --git a/pkg/pdp/service/errors.go b/pkg/pdp/service/errors.go new file mode 100644 index 00000000..0515a84a --- /dev/null +++ b/pkg/pdp/service/errors.go @@ -0,0 +1,7 @@ +package service + +import ( + "errors" +) + +var ErrProofSetNotFound = errors.New("proof set not found") diff --git a/pkg/pdp/service/get_proofset.go b/pkg/pdp/service/get_proofset.go index 4121303f..93b57b25 100644 --- a/pkg/pdp/service/get_proofset.go +++ b/pkg/pdp/service/get_proofset.go @@ -28,7 +28,7 @@ func (p *PDPService) ProofSet(ctx context.Context, id int64) (*ProofSet, error) var proofSet models.PDPProofSet if err := p.db.WithContext(ctx).First(&proofSet, id).Error; err != nil { if errors.Is(err, gorm.ErrRecordNotFound) { - return nil, fmt.Errorf("proof set not found") + return nil, ErrProofSetNotFound } return nil, fmt.Errorf("failed to retrieve proof set: %w", err) }