Skip to content

Single Process Piri Node#192

Merged
frrist merged 11 commits intomainfrom
frrist/one-piri
Aug 21, 2025
Merged

Single Process Piri Node#192
frrist merged 11 commits intomainfrom
frrist/one-piri

Conversation

@frrist
Copy link
Copy Markdown
Member

@frrist frrist commented Aug 13, 2025

Dependencies & Review

This PR builds on #190
I would recommend reviewing each commit of this PR on it's own - I have done my best to keep them well scoped.

What

This PR makes life easier by combining two servers into one. Instead of juggling piri serve ucan and piri server pdp in separate processes, you can now just run piri serve full and call it a day.

The new command runs both UCAN and PDP servers in a single process. Don't worry if you're using an external PDP implementation (like Curio) - piri serve ucan still works standalone.

Here's what the config looks like - notice how both UCAN and PDP settings live together:

[identity]
key_file = '/etc/piri/service.pem'

[pdp]
contract_address = '0x6170dE2b09b404776197485F3dc6c968Ef948505'
lotus_endpoint = 'wss://lotus.spicystorage.tech/rpc/v1'
owner_address = '0x7469B47e006D0660aB92AE560b27A1075EEcF97F'

[repo]
data_dir = '/data/piri'
temp_dir = '/tmp/storage'

[server]
host = 'localhost'
port = '3000'
public_url = 'https://piri.spicystorage.tech'

[ucan]
proof_set = '414'

[ucan.services]
[ucan.services.indexer]
did = 'did:web:staging.indexer.warm.storacha.network'
url = 'https://staging.indexer.warm.storacha.network/claims'
proof = '<proof>'

[ucan.services.principal_mapping]
'did:web:staging.indexer.warm.storacha.network' = 'did:key:z6Mkr4QkdinnXQmJ9JdnzwhcEjR8nMnuVPEwREyh9jp2Pb7k'
'did:web:staging.up.warm.storacha.network' = 'did:key:z6MkpR58oZpK7L3cdZZciKT25ynGro7RZm6boFouWQ7AzF7v'

[ucan.services.publisher]
ipni_announce_urls = ['https://cid.contact/announce', 'https://staging.ipni.warm.storacha.network']

[ucan.services.upload]
did = 'did:web:staging.up.warm.storacha.network'
url = 'https://staging.up.warm.storacha.network'

How

This brings together work from several previous PRs (#187, #186, #169, #161, #144) to let both UCAN and PDP modules run in the same Piri process. Here's what changed under the hood:

Architecture Changes:

  • Refactored the config system to be composable - configs are now split into logical sections (identity, server, repo, ucan, pdp) that can be mixed and matched
  • Introduced a provider pattern for core components (wallet, stores, engines, aggregators) so they can be shared between modules
  • Created separate fx modules (UCANModule and PDPModule) that encapsulate their respective dependencies

The Magic:

  • The new piri serve full command uses fx (Uber's DI framework) to inject both modules at startup
  • Both modules share common dependencies through CommonModules (identity, HTTP server, databases)
  • Each module brings its own routes, services, and background tasks - they play nice together in the same process

Backwards Compatibility:

  • piri serve ucan still works for operators who want to run just the UCAN server
  • The old piri server pdp command is gone (replaced by the full command)
  • All existing config options and environment variables are preserved

@frrist frrist requested a review from alanshaw as a code owner August 13, 2025 00:01
This was referenced Aug 13, 2025
@frrist frrist self-assigned this Aug 13, 2025
@frrist frrist requested review from hannahhoward and volmedo August 14, 2025 00:30
@frrist frrist force-pushed the frrist/one-piri branch 6 times, most recently from 66fa621 to bcef559 Compare August 14, 2025 17:24
@frrist frrist changed the title Frrist/one piri Single Process Piri Node Aug 14, 2025
Copy link
Copy Markdown
Member

@hannahhoward hannahhoward left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left various comments, none blocking. Overall, really really nicely done. I have only minor quibbles.

Not marking any of my changes blocking since I'm headed out. But honestly, I would like to see the swear words removed. Sorry for being that bitch. :)

aggregateSubmitterQueue := aws.NewSQSAggregateQueue(cfg.Config, cfg.SQSPDPPieceAggregatorURL)
aggregateSubmitter := aggregator.NewAggregateSubmitteer(cfg.PDPProofSet, aggregateStore, apiClient, aggregateSubmitterQueue)
aggregateSubmitter := aggregator.NewAggregateSubmitteer(
&aggregator.ConfiguredProofSetProvider{ID: cfg.PDPProofSet},
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not just pass the ID direct?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had ideas around a different implementation of ProofSetProvider (ConfiguredProofSetProvider implements this interface) which lazily creates a proof set if one doesn't exist when it comes time to submit aggregates.

Comment thread cmd/cli/root.go
}
}

func initLogging() {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

switch {
case err != nil && g.level >= logger.Error:
// Always log SQL errors with call stack
// Skip logging "record not found" errors for pdp_piece_mh_to_commp table
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unrelated change correct?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it not useful altogether? Why not relegate to warning?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The pdp_piece_mh_to_commp table is a mapping of (mostly) sha256 multihash to commp multihash. It's the table we frequently use to determine if a piece has been stored with us before. Therefore, we query this table a lot for existence checks, and it frequently doesn't find the cid when there is a new allocation - as expected. Returning an error or a warning here isn't appropriate since there isn't technically an error or a warning that the user needs to be made aware of - it's an in-actionable log message.

This log became spamy during my local testing/dev here, so I modified it to be less spamy.


// Create SQLite database connection
db, err := sqlitedb.New(cfg.Replicator.DBPath)
db, err := sqlitedb.New(cfg.Replicator.DBPath,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

again related to change or no?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

related to single process I mean

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The new options? Mostly yes.

Comment thread pkg/fx/pdp/provider.go Outdated
return s.aggregator
}

func ProvidePoorlyNamedPDPInterface(service types.API, agg aggregator.Aggregator, cfg app.AppConfig) (*Shit, error) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OMG. man this is painful though I get it. look I don't want to make a big deal but sir this is a wendys. can we not use the actual S-word in the code? Just in case we forget to change it?

Copy link
Copy Markdown
Member

@alanshaw alanshaw Aug 19, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for no swears

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🙈 sorry!!!! I'll fix this, didn't mean to push it.

func (p *PDPService) FindPiece(ctx context.Context, piece types.Piece) (cid.Cid, bool, error) {
func (p *PDPService) FindPiece(ctx context.Context, piece types.Piece) (_ cid.Cid, _ bool, retErr error) {
log.Infow("finding piece", "request", piece)
defer func() {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same viewpoint as above

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, retErr is not assigned to...I assume this is what @hannahhoward is also referring to? Maybe I don't know Go good enough...?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

start := time.Now()
log.Infow("uploading piece", "request", pieceUpload)
defer func() {
if retErr != nil {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this will be a thing then :P

Question: all these logs are unrelated to the single process refactor yes? generally I would put those in a seperate PR. Not a big deal though.

Comment thread pkg/pdp/types/zap.go
)

// MarshalLogObject implements zapcore.ObjectMarshaler for ProofSetStatus
func (p ProofSetStatus) MarshalLogObject(enc zapcore.ObjectEncoder) error {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

again nice refactor! again for the future I would leave this sort of thing out of an already large PR. but nb

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes sorry, did this for debugging. Will try to avoid things like this on big changes in the future.

Copy link
Copy Markdown
Member

@alanshaw alanshaw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think I have any hard blocks here, but I do think it would be good to move the logging changes to a seperate PR since they're unrelated (also because I'm not sure they will actually do any logging).

I would really like piri serve full to just be piri serve though..."full" just screams legacy baggage IMHO.

Comment thread cmd/cli/root.go Outdated
logging.SetLogLevel("auth", "error")
logging.SetLogLevel("rpcenc", "error")
logging.SetLogLevel("storage", "info")
logging.SetLogLevel("resources", "error")
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The default is error right? Why not expicitly set all loggers to error and then just change the ones you want to change?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ohh, yeah good point that would be a simpler change and easier to reason about - will address.

Comment thread cmd/cli/serve/full.go
Use: "full",
Short: "Start the full piri server!",
Args: cobra.NoArgs,
RunE: fullServer,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can it just be piri serve?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While I don't disagree - what becomes of piri serve ucan? Perhaps enabling the PDP mode ought to be a configuration value or flag on the serve command itself?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

or the other way around, as in piri serve --ucan-only

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are we okay with merging this as is, and addressing this comment(s) in a follow on?

Comment thread cmd/cli/serve/full.go Outdated

FullCmd.Flags().String(
"contract-address",
"0x6170dE2b09b404776197485F3dc6c968Ef948505", // NB(forrest): default to calibration contract addrese
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you think we should move this to presets?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

humm, yeah that would probably be a better place for this, will address.

switch {
case err != nil && g.level >= logger.Error:
// Always log SQL errors with call stack
// Skip logging "record not found" errors for pdp_piece_mh_to_commp table
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it not useful altogether? Why not relegate to warning?

Comment thread pkg/fx/aggregator/provider.go Outdated
fx.As(new(aggregator.Aggregator)),
),
aggregator.NewPieceAccepter,
aggregator.NewAggregateSubmitteer,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
aggregator.NewAggregateSubmitteer,
aggregator.NewAggregateSubmitter,

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Submitteer predates my changes, and I had kinda just assumed Submitteer was the british-english spelling. Will change!


type PieceAccepter struct {
issuer ucan.Signer
issuer principal.Signer
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the switch here and other places?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the identity is provided and injected as a principal.Signer here: https://github.com/storacha/piri/blob/main/pkg/fx/identity/provider.go#L15
we could modify this fx code I linked to such that:

var Module = fx.Module("identity",
	fx.Provide(
        ProvideIdentity,
        fx.Annotate(
            fx.As(fx.Self()),
            fx.As(new(ucan.Signer)),
        ),
    ),
)

which will provide the identity as both interfaces - ucan.Signer and principal.Signer, allowing other methods to depend on either interface, but sticking with one here - principal.Signer because that feels simpler - fewer interfaces.

LogStatus: true,
LogContentLength: true,
LogResponseSize: true,
LogHeaders: []string{"X-Agent-Message"},
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

😮 wow are you skating to where the puck will be? ❤️

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⛸️ 🕺 Yup! Trying to align with storacha/go-ucanto#48 here

} else {
log.Infow("allocated piece", "request", allocation, "response", res)
}
}()
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this work? I don't think the function assigns to retErr anywhere?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes because of the change in the function signature using named result parameters:
(res *types.AllocatedPiece, retErr error)
retErr will contain an error if this function returns an error, same for res containing *types.AllocatedPiece if the function executes successfully.

func (p *PDPService) FindPiece(ctx context.Context, piece types.Piece) (cid.Cid, bool, error) {
func (p *PDPService) FindPiece(ctx context.Context, piece types.Piece) (_ cid.Cid, _ bool, retErr error) {
log.Infow("finding piece", "request", piece)
defer func() {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, retErr is not assigned to...I assume this is what @hannahhoward is also referring to? Maybe I don't know Go good enough...?

log.Infow("uploading piece", "request", pieceUpload)
defer func() {
if retErr != nil {
log.Errorw("failed to upload piece", "request", pieceUpload, "duration", time.Since(start), "error", retErr)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the duration useful in the log? Can it be added to telemetry instead? Maybe this is a seperate PR?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes these should be metrics emitted from the node. I added in these logs while verifying that all APIs and their corresponding services were registered w/ fx. Should have been a separate change, but landed in here for my own convenience.

Comment thread pkg/fx/claims/provider.go
func(svc *claims.ClaimService) claims.Claims {
return svc
},
NewService,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we now get rid of the NewService -> claims.NewV2 thing now?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like to do that in a follow on, where we also move a bunch of the fx provider codes closer to the types they are providing. Also, NewService is still used for the AWS setup iirc.

func(svc *publisher.PublisherService) publisher.Publisher {
return svc
},
NewService,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same here, I'd expect the logic in NewService to be part of publisher.New in the service/publisher package

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fully agree, mentioned this here: https://github.com/storacha/piri/pull/192/files#r2285609301 would prefer to wait and do this in a follow on.

Comment thread pkg/fx/root/provider.go
var Module = fx.Module("root-handler",
fx.Provide(
fx.Annotate(
NewRootHandler,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe move this to the pkg/server package?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comment thread pkg/pdp/aggregator/steps.go Outdated

"github.com/ipld/go-ipld-prime/datamodel"
"github.com/storacha/go-libstoracha/capabilities/types"
libtypes "github.com/storacha/go-libstoracha/capabilities/types"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is usually aliased as captypes in other repos (just a convention)

Comment thread pkg/pdp/aggregator/steps.go Outdated
"github.com/storacha/piri/pkg/pdp/aggregator/aggregate"
"github.com/storacha/piri/pkg/pdp/aggregator/fns"
types2 "github.com/storacha/piri/pkg/pdp/types"
types "github.com/storacha/piri/pkg/pdp/types"
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is redundant now

Suggested change
types "github.com/storacha/piri/pkg/pdp/types"
"github.com/storacha/piri/pkg/pdp/types"

Comment thread pkg/fx/database/provider.go Outdated
return db, nil
}

func ProviderAggregatorDB(lc fx.Lifecycle, cfg app.StorageConfig) (*sql.DB, error) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func ProviderAggregatorDB(lc fx.Lifecycle, cfg app.StorageConfig) (*sql.DB, error) {
func ProvideAggregatorDB(lc fx.Lifecycle, cfg app.StorageConfig) (*sql.DB, error) {

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

Comment thread pkg/pdp/server.go
}
blobStore := blobstore.NewTODO_DsBlobstore(namespace.Wrap(ds, datastore.NewKey("blobs")))
stashStore, err := store.NewStashStore(path.Join(dataDir))
stashStore, err := stashstore.NewStashStore(path.Join(dataDir))
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

now that the store package is renamed to stashstore, stashstore.NewStashStore could be renamed to stashstore.New

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agreed, saving this for a follow on where we do more mechanical cleanups.

Comment thread cmd/cli/serve/full.go
Use: "full",
Short: "Start the full piri server!",
Args: cobra.NoArgs,
RunE: fullServer,
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Comment thread pkg/fx/ucan/provider.go
Comment on lines +53 to +57
// AsRouteRegistrar provides the Handler as a RouteRegistrar
func AsRouteRegistrar(h *Handler) echofx.RouteRegistrar {
return h
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought this was what fx.As was for?

@frrist frrist changed the base branch from frrist/refactor/simplify-config to main August 19, 2025 21:58
@frrist frrist merged commit 021feb4 into main Aug 21, 2025
8 checks passed
@frrist frrist added this to the v0.0.13 release milestone Aug 26, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants