feat: unify pdp client and server on common interface #169

Merged
frrist merged 5 commits into main from frrist/feat/service-client-api-interface
Aug 11, 2025

Member

@frrist frrist commented Aug 2, 2025

Serves as an alternative to #166.

The crux of the change here is aligning the PDP HTTP Client (previously called curio client) and the PDP service on a common interface. The complexity of handling the various server request and response types needed to continue supporting a curio HTTP server has been pushed into the PDP HTTP Client.

Comment thread pkg/pdp/types/api.go
Comment on lines +81 to +94
type ProofSetAPI interface {
	CreateProofSet(ctx context.Context, recordKeeper common.Address) (common.Hash, error)
	GetProofSetStatus(ctx context.Context, txHash common.Hash) (*ProofSetStatus, error)
	GetProofSet(ctx context.Context, proofSetID uint64) (*ProofSet, error)
	AddRoots(ctx context.Context, proofSetID uint64, roots []RootAdd) (common.Hash, error)
	RemoveRoot(ctx context.Context, proofSetID uint64, rootID uint64) (common.Hash, error)
}

type PieceAPI interface {
	AllocatePiece(ctx context.Context, allocation PieceAllocation) (*AllocatedPiece, error)
	UploadPiece(ctx context.Context, upload PieceUpload) error
	FindPiece(ctx context.Context, piece Piece) (cid.Cid, bool, error)
	ReadPiece(ctx context.Context, piece cid.Cid) (*PieceReader, error)
}
Member Author

Interface implemented by both the PDP HTTP Client and the PDP Service, allowing them to be used interchangeably

Comment thread pkg/pdp/types/errors.go
@@ -0,0 +1,55 @@
package types
Member Author

Error package used for mapping errors from the PDP Service to HTTP error codes.

@frrist frrist marked this pull request as ready for review August 5, 2025 00:01
@frrist frrist requested a review from alanshaw as a code owner August 5, 2025 00:01
@frrist frrist force-pushed the frrist/feat/service-client-api-interface branch from f1dbf97 to 6aae9e1 Compare August 5, 2025 00:03
@frrist frrist self-assigned this Aug 5, 2025
@frrist frrist requested a review from hannahhoward August 5, 2025 00:04
@frrist frrist force-pushed the frrist/feat/service-client-api-interface branch from 6aae9e1 to 2453fd1 Compare August 5, 2025 18:15
Member

@hannahhoward hannahhoward left a comment

Generally very good.

I left a lot of comments, but the only blocking one, and the reason for the request-changes review, is the conditional in the storage service, which, unless I'm reading it wrong, has a bug. If I'm reading it wrong, let me know! :)


"github.com/storacha/piri/pkg/pdp/aggregator/aggregate"
"github.com/storacha/piri/pkg/pdp/curio"
types2 "github.com/storacha/piri/pkg/pdp/types"
Member

strong preference not to use numbers in package names -- either just types, or descriptive name/letter for package rename here.

Member

i.e. pdptypes if you have to rename

Member Author

whoop, will address here and everywhere else.

)

-func (p *PDPService) ProofSetCreate(ctx context.Context, recordKeeper common.Address) (common.Hash, error) {
+func (p *PDPService) CreateProofSet(ctx context.Context, recordKeeper common.Address) (common.Hash, error) {
Member

it seems odd to rename a file to an old function name while changing a function name to the previous name for the file?

Member

I guess it's ok

Member Author

Yeah, did this to make walking the code easier.

Functions were renamed with pattern: <Verb><Type>
Files were renamed with: <Type><Verb>

Files appear in the order of the type they operate on, i.e:

tree -L 1 service
service
├── contract
├── models
├── piece_allocate.go
├── piece_find.go
├── piece_read.go
├── piece_upload.go
├── proofset_create.go
├── proofset_get.go
├── proofset_list.go
├── proofset_status.go
├── root_remove.go
├── roots_add.go
├── service.go
├── types
└── util.go

Comment thread pkg/pdp/server.go
)

type Server struct {
pieceFinder piecefinder.PieceFinder
Member

were these unused?

Member Author

Yup they were never used anywhere, surprised the linter didn't catch this.

Comment thread pkg/pdp/service.go
receiptStore receiptstore.ReceiptStore,
) (*PDPService, error) {
aggregator, err := aggregator.NewLocal(ds, dbPath, client, proofSet, issuer, receiptStore)
func NewRemote(cfg *Config, id principal.Signer, receiptStore receiptstore.ReceiptStore) (*PDPService, error) {
Member

Ok, I must have missed this before, but having two types called "PDPService" is very confusing. How is this best described, I wonder?

If I understand, this is what is needed by the higher level code that consumes PDP, and in order to work it requires a dependency which matches types.API interface in order to construct the Aggregator/PieceAdder/PieceFinder. In a single process world, this will be service.PDPService. (being consumed here by pdp.PDPService, which is confusing)

Am I wrong that what we really have is a low-ish level API (types.API -- that could be either service.PDPService or an HTTP client to a remote server) consumed by a high level API (this service plus its dependencies pieceAdder/pieceFinder/aggregate)? And the high level API is the part that essentially ALWAYS lives in the UCAN server process?

I'm not saying fix this now, but have it in mind for the single process pr and maybe change then? I stared for at least 15 min before I realized there are two different PDPService structs

Member Author

> Am I wrong that what we really have is a low-ish level API (types.API -- that could be either service.PDPService or an HTTP client to a remote server) consumed by a high level API (this service plus its dependencies pieceAdder/pieceFinder/aggregate)? And the high level API is the part that essentially ALWAYS lives in the UCAN server process?

Nope, you are spot on!

The higher level PDP interface:

type PDP interface {
	PieceAdder() pieceadder.PieceAdder
	PieceFinder() piecefinder.PieceFinder
	Aggregator() aggregator.Aggregator
}

is used by the UCAN server/code and AWS bits.

The lower level PDP interface:

type API interface {
	ProofSetAPI
	PieceAPI
}

type ProofSetAPI interface {
	CreateProofSet(ctx context.Context, recordKeeper common.Address) (common.Hash, error)
	GetProofSetStatus(ctx context.Context, txHash common.Hash) (*ProofSetStatus, error)
	GetProofSet(ctx context.Context, proofSetID uint64) (*ProofSet, error)
	AddRoots(ctx context.Context, proofSetID uint64, roots []RootAdd) (common.Hash, error)
	RemoveRoot(ctx context.Context, proofSetID uint64, rootID uint64) (common.Hash, error)
}

type PieceAPI interface {
	AllocatePiece(ctx context.Context, allocation PieceAllocation) (*AllocatedPiece, error)
	UploadPiece(ctx context.Context, upload PieceUpload) error
	FindPiece(ctx context.Context, piece Piece) (cid.Cid, bool, error)
	ReadPiece(ctx context.Context, piece cid.Cid) (*PieceReader, error)
}

is wrapped and used by the PieceFinder, PieceAdder, and Aggregator. Under the hood this is either the HTTPClient (for curio) or the "actual" PDPServer that interacts with the chain.


> I'm not saying fix this now, but have it in mind for the single process pr and maybe change then? I stared for at least 15 min before I realized there are two different PDPService structs

In hindsight, I think this interface:

type PDP interface {
	PieceAdder() pieceadder.PieceAdder
	PieceFinder() piecefinder.PieceFinder
	Aggregator() aggregator.Aggregator
}

Needs a better name, and I'd really like it to align with the Blobs interface:

type Blobs interface {
	// Blobs is the storage interface for blobs.
	Store() blobstore.Blobstore
	// Allocations is a store for received blob allocations.
	Allocations() allocationstore.AllocationStore
	// Presigner provides an interface to allow signed request access to upload blobs.
	Presigner() presigner.RequestPresigner
	// Access provides an interface to allow public access to download blobs.
	Access() access.Access
}

So that we can remove conditionals like this throughout the ucan code:

if s.PDP() == nil {
	_, err = s.Blobs().Store().Get(ctx, req.Blob.Digest)
	if err != nil {
		if errors.Is(err, store.ErrNotFound) {
			return nil, fmt.Errorf("blob not found: %w", err)
		}
		log.Errorw("getting blob", "error", err)
		return nil, fmt.Errorf("getting blob: %w", err)
	}
	loc, err = s.Blobs().Access().GetDownloadURL(req.Blob.Digest)
	if err != nil {
		log.Errorw("creating retrieval URL for blob", "error", err)
		return nil, fmt.Errorf("creating retrieval URL for blob: %w", err)
	}
} else {
	// locate the piece from the pdp service
	pdpPiece, err := s.PDP().PieceFinder().FindPiece(ctx, req.Blob.Digest, req.Blob.Size)
	if err != nil {
		log.Errorw("finding piece for blob", "error", err)
		return nil, fmt.Errorf("finding piece for blob: %w", err)
	}
	// get a download url
	loc = s.PDP().PieceFinder().URLForPiece(pdpPiece)
	// submit the piece for aggregation
	err = s.PDP().Aggregator().AggregatePiece(ctx, pdpPiece)
	if err != nil {
		log.Errorw("submitting piece for aggregation", "error", err)
		return nil, fmt.Errorf("submitting piece for aggregation: %w", err)
	}
	// generate the invocation that will complete when aggregation is complete and the piece is accepted
	pieceAccept, err := pdp_cap.Accept.Invoke(
		s.ID(),
		s.ID(),
		s.ID().DID().String(),
		pdp_cap.AcceptCaveats{
			Piece: pdpPiece,
		}, delegation.WithNoExpiration())
	if err != nil {
		log.Error("creating piece accept invocation", "error", err)
		return nil, fmt.Errorf("creating piece accept invocation: %w", err)
	}
	pdpAcceptInv = pieceAccept
}

Comment thread pkg/service/storage/service.go Outdated

var pdpImpl pdp.PDP
if c.pdp == nil {
if c.pdpCfg == nil {
Member

Blocking: this conditional feels broken if c.pdpCfg & c.pdp are used exclusively.

cause won't c.pdpCfg be nil when c.pdp is set? and therefore, how does pdpImpl ever get assigned c.pdp?

Member Author

@frrist frrist Aug 6, 2025

whoops, yes this should probably be:

	if c.pdpCfg == nil && c.pdpImpl == nil {

Addressed in e783256
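
A minimal sketch of the selection logic under discussion, assuming an injected instance and a config are mutually exclusive options and that the injected instance wins. `config`, `pdpConfig`, `namedPDP`, and `selectPDP` are hypothetical stand-ins for the storage service's construction code, not the code in e783256.

```go
package main

import "fmt"

// PDP is a stand-in for the high level PDP interface.
type PDP interface{ Name() string }

// namedPDP is a trivial implementation used only for this sketch.
type namedPDP string

func (n namedPDP) Name() string { return string(n) }

// pdpConfig stands in for the settings needed to build a PDP implementation.
type pdpConfig struct{ Endpoint string }

// config mirrors the two options: an injected instance or a config to build from.
type config struct {
	pdp    PDP
	pdpCfg *pdpConfig
}

// selectPDP returns nil only when neither option is set. Checking pdpCfg alone
// would drop an injected instance, which is the bug raised in the review.
func selectPDP(c config) PDP {
	switch {
	case c.pdp != nil:
		return c.pdp
	case c.pdpCfg != nil:
		return namedPDP("remote:" + c.pdpCfg.Endpoint)
	default:
		return nil
	}
}

func main() {
	injected := selectPDP(config{pdp: namedPDP("injected")})
	fmt.Println(injected.Name())
	fmt.Println(selectPDP(config{}) == nil)
}
```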

@frrist frrist requested a review from hannahhoward August 6, 2025 18:55
Member

@hannahhoward hannahhoward left a comment

LGTM

@frrist frrist force-pushed the frrist/feat/service-client-api-interface branch from e783256 to 703bd79 Compare August 11, 2025 18:09
@frrist frrist merged commit f989445 into main Aug 11, 2025
10 checks passed
@frrist frrist deleted the frrist/feat/service-client-api-interface branch August 11, 2025 18:52
@frrist frrist mentioned this pull request Aug 14, 2025
frrist added a commit that referenced this pull request Aug 21, 2025
# Dependencies & Review
This PR builds on #190
I would recommend reviewing each commit of this PR on its own - I have
done my best to keep them well scoped.

# What
This PR makes life easier by combining two servers into one. Instead of
juggling `piri serve ucan` and `piri server pdp` in separate processes,
you can now just run `piri serve full` and call it a day.

The new command runs both UCAN and PDP servers in a single process.
Don't worry if you're using an external PDP implementation (like Curio)
- `piri serve ucan` still works standalone.

Here's what the config looks like - notice how both UCAN and PDP
settings live together:
```toml
[identity]
key_file = '/etc/piri/service.pem'

[pdp]
contract_address = '0x6170dE2b09b404776197485F3dc6c968Ef948505'
lotus_endpoint = 'wss://lotus.spicystorage.tech/rpc/v1'
owner_address = '0x7469B47e006D0660aB92AE560b27A1075EEcF97F'

[repo]
data_dir = '/data/piri'
temp_dir = '/tmp/storage'

[server]
host = 'localhost'
port = '3000'
public_url = 'https://piri.spicystorage.tech'

[ucan]
proof_set = '414'

[ucan.services]
[ucan.services.indexer]
did = 'did:web:staging.indexer.warm.storacha.network'
url = 'https://staging.indexer.warm.storacha.network/claims'
proof = '<proof>'

[ucan.services.principal_mapping]
'did:web:staging.indexer.warm.storacha.network' = 'did:key:z6Mkr4QkdinnXQmJ9JdnzwhcEjR8nMnuVPEwREyh9jp2Pb7k'
'did:web:staging.up.warm.storacha.network' = 'did:key:z6MkpR58oZpK7L3cdZZciKT25ynGro7RZm6boFouWQ7AzF7v'

[ucan.services.publisher]
ipni_announce_urls = ['https://cid.contact/announce', 'https://staging.ipni.warm.storacha.network']

[ucan.services.upload]
did = 'did:web:staging.up.warm.storacha.network'
url = 'https://staging.up.warm.storacha.network'
``` 

# How
This brings together work from several previous PRs (#187, #186, #169,
#161, #144) to let both UCAN and PDP modules run in the same Piri
process. Here's what changed under the hood:

**Architecture Changes:**
- Refactored the config system to be composable - configs are now split
into logical sections (identity, server, repo, ucan, pdp) that can be
mixed and matched
- Introduced a provider pattern for core components (wallet, stores,
engines, aggregators) so they can be shared between modules
- Created separate fx modules (`UCANModule` and `PDPModule`) that
encapsulate their respective dependencies

**The Magic:**
- The new `piri serve full` command uses fx (Uber's DI framework) to
inject both modules at startup
- Both modules share common dependencies through `CommonModules`
(identity, HTTP server, databases)
- Each module brings its own routes, services, and background tasks -
they play nice together in the same process

**Backwards Compatibility:**
- `piri serve ucan` still works for operators who want to run just the
UCAN server
- The old `piri server pdp` command is gone (replaced by the full
command)
- All existing config options and environment variables are preserved