Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions cmd/cli/serve/full.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,27 +75,27 @@ func init() {
cobra.CheckErr(viper.BindEnv("ucan.services.indexer.url", "PIRI_INDEXING_SERVICE_URL"))

FullCmd.Flags().String(
"egress-tracking-service-proof",
"egress-tracker-service-proof",
"",
"A delegation that allows the node to track egress with the egress tracking service",
"A delegation that allows the node to track egress with the egress tracker service",
)
cobra.CheckErr(viper.BindPFlag("ucan.services.etracker.proof", FullCmd.Flags().Lookup("egress-tracking-service-proof")))
cobra.CheckErr(viper.BindPFlag("ucan.services.etracker.proof", FullCmd.Flags().Lookup("egress-tracker-service-proof")))

FullCmd.Flags().String(
"egress-tracking-service-did",
presets.EgressTrackingServiceDID.String(),
"DID of the egress tracking service",
"egress-tracker-service-did",
presets.EgressTrackerServiceDID.String(),
"DID of the egress tracker service",
)
cobra.CheckErr(FullCmd.Flags().MarkHidden("egress-tracking-service-did"))
cobra.CheckErr(viper.BindPFlag("ucan.services.etracker.did", FullCmd.Flags().Lookup("egress-tracking-service-did")))
cobra.CheckErr(FullCmd.Flags().MarkHidden("egress-tracker-service-did"))
cobra.CheckErr(viper.BindPFlag("ucan.services.etracker.did", FullCmd.Flags().Lookup("egress-tracker-service-did")))

FullCmd.Flags().String(
"egress-tracking-service-url",
presets.EgressTrackingServiceURL.String(),
"URL of the egress tracking service",
"egress-tracker-service-url",
presets.EgressTrackerServiceURL.String(),
"URL of the egress tracker service",
)
cobra.CheckErr(FullCmd.Flags().MarkHidden("egress-tracking-service-url"))
cobra.CheckErr(viper.BindPFlag("ucan.services.etracker.url", FullCmd.Flags().Lookup("egress-tracking-service-url")))
cobra.CheckErr(FullCmd.Flags().MarkHidden("egress-tracker-service-url"))
cobra.CheckErr(viper.BindPFlag("ucan.services.etracker.url", FullCmd.Flags().Lookup("egress-tracker-service-url")))

FullCmd.Flags().String(
"upload-service-did",
Expand Down
2 changes: 1 addition & 1 deletion cmd/cli/setup/register.go
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ func generateConfig(cfg *appcfg.AppConfig, flags *initFlags, ownerAddress common
Indexer: config.IndexingServiceConfig{
Proof: indexerProof,
},
EgressTracker: config.EgressTrackingServiceConfig{
EgressTracker: config.EgressTrackerServiceConfig{
Proof: egressTrackerProof,
},
},
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,8 @@ require (
github.com/spf13/cobra v1.9.1
github.com/spf13/viper v1.20.1
github.com/storacha/delegator v0.0.2-0.20250917082246-b34dc3785c92
github.com/storacha/go-libstoracha v0.2.6
github.com/storacha/go-ucanto v0.6.3
github.com/storacha/go-libstoracha v0.2.7
github.com/storacha/go-ucanto v0.6.5
github.com/stretchr/testify v1.10.0
github.com/testcontainers/testcontainers-go v0.37.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.37.0
Expand Down Expand Up @@ -273,7 +273,7 @@ require (
github.com/ipfs/go-merkledag v0.11.0 // indirect
github.com/ipfs/go-metrics-interface v0.0.1 // indirect
github.com/ipfs/go-verifcid v0.0.3 // indirect
github.com/ipld/go-car v0.6.2 // indirect
github.com/ipld/go-car v0.6.2
github.com/ipld/go-codec-dagpb v1.6.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/klauspost/cpuid/v2 v2.2.10 // indirect
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1678,10 +1678,10 @@ github.com/spf13/viper v1.20.1/go.mod h1:P9Mdzt1zoHIG8m2eZQinpiBjo6kCmZSKBClNNqj
github.com/stoewer/go-strcase v1.3.0/go.mod h1:fAH5hQ5pehh+j3nZfvwdk2RgEgQjAoM8wodgtPmh1xo=
github.com/storacha/delegator v0.0.2-0.20250917082246-b34dc3785c92 h1:Xe1z064lEQPgxl5nkzlUQAXnIr9u7MAoBvUePhXWAts=
github.com/storacha/delegator v0.0.2-0.20250917082246-b34dc3785c92/go.mod h1:1xN1tz2TkpwShpImD2Bf6DZ0VLeYI3c04c84BrNMuhc=
github.com/storacha/go-libstoracha v0.2.6 h1:UlRr6OqdiRFYf6WsvVEsWQvWOHj2zKvTdTeyDn7PK0g=
github.com/storacha/go-libstoracha v0.2.6/go.mod h1:zzeqIZhBBuWR2dkGygYqv4Bhg3JsvHuuvDCcdXCwGhg=
github.com/storacha/go-ucanto v0.6.3 h1:chxFAU6HyoS2x3yoLlSmOhJDvZmbj5KadGe3DV8ULg0=
github.com/storacha/go-ucanto v0.6.3/go.mod h1:O35Ze4x18EWtz3ftRXXd/mTZ+b8OQVjYYrnadJ/xNjg=
github.com/storacha/go-libstoracha v0.2.7 h1:IlsffQq3mr8hDMqXIFyRLKVTHhq7qelyq8if/eivNC0=
github.com/storacha/go-libstoracha v0.2.7/go.mod h1:zzeqIZhBBuWR2dkGygYqv4Bhg3JsvHuuvDCcdXCwGhg=
github.com/storacha/go-ucanto v0.6.5 h1:mxy1UkJDqszAGe6SkoT0N2SG9YJ62YX7fzU1Pg9lxnA=
github.com/storacha/go-ucanto v0.6.5/go.mod h1:O35Ze4x18EWtz3ftRXXd/mTZ+b8OQVjYYrnadJ/xNjg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
Expand Down
4 changes: 2 additions & 2 deletions pkg/config/app/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ type ExternalServicesConfig struct {
PrincipalMapping map[string]string

Indexer IndexingServiceConfig
EgressTracker EgressTrackingServiceConfig
EgressTracker EgressTrackerServiceConfig
Upload UploadServiceConfig
Publisher PublisherServiceConfig
}
Expand All @@ -24,7 +24,7 @@ type IndexingServiceConfig struct {
Proofs delegation.Proofs
}

type EgressTrackingServiceConfig struct {
type EgressTrackerServiceConfig struct {
Connection client.Connection
Proofs delegation.Proofs
}
Expand Down
8 changes: 8 additions & 0 deletions pkg/config/app/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ type StorageConfig struct {
Claims ClaimStorageConfig
Publisher PublisherStorageConfig
Receipts ReceiptStorageConfig
EgressTracker EgressTrackerStorageConfig
Allocations AllocationStorageConfig
Replicator ReplicatorStorageConfig
KeyStore KeyStoreConfig
Expand Down Expand Up @@ -47,6 +48,13 @@ type ReceiptStorageConfig struct {
Dir string
}

// EgressTrackerStorageConfig contains egress tracker store-specific storage paths
type EgressTrackerStorageConfig struct {
Dir string
MaxBatchSize int64
DBPath string
}

// AllocationStorageConfig contains allocation-specific storage paths
type AllocationStorageConfig struct {
Dir string
Expand Down
5 changes: 5 additions & 0 deletions pkg/config/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ func (r RepoConfig) ToAppConfig() (app.StorageConfig, error) {
Receipts: app.ReceiptStorageConfig{
Dir: filepath.Join(r.DataDir, "receipt"),
},
EgressTracker: app.EgressTrackerStorageConfig{
Dir: filepath.Join(r.DataDir, "egress_tracker", "journal"),
DBPath: filepath.Join(r.DataDir, "egress_tracker", "jobqueue", "jobqueue.db"),
MaxBatchSize: 0,
},
Allocations: app.AllocationStorageConfig{
Dir: filepath.Join(r.DataDir, "allocation"),
},
Expand Down
44 changes: 22 additions & 22 deletions pkg/config/services.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@ import (
type ServicesConfig struct {
ServicePrincipalMapping map[string]string `mapstructure:"principal_mapping" flag:"service-principal-mapping" toml:"principal_mapping,omitempty"`

Indexer IndexingServiceConfig `mapstructure:"indexer" validate:"required" toml:"indexer,omitempty"`
EgressTracker EgressTrackingServiceConfig `mapstructure:"etracker" toml:"etracker,omitempty"`
Upload UploadServiceConfig `mapstructure:"upload" validate:"required" toml:"upload,omitempty"`
Publisher PublisherServiceConfig `mapstructure:"publisher" validate:"required" toml:"publisher,omitempty"`
Indexer IndexingServiceConfig `mapstructure:"indexer" validate:"required" toml:"indexer,omitempty"`
EgressTracker EgressTrackerServiceConfig `mapstructure:"etracker" toml:"etracker,omitempty"`
Upload UploadServiceConfig `mapstructure:"upload" validate:"required" toml:"upload,omitempty"`
Publisher PublisherServiceConfig `mapstructure:"publisher" validate:"required" toml:"publisher,omitempty"`
}

func (s ServicesConfig) Validate() error {
Expand All @@ -43,7 +43,7 @@ func (s ServicesConfig) ToAppConfig(publicURL url.URL) (app.ExternalServicesConf
}
out.EgressTracker, err = s.EgressTracker.ToAppConfig()
if err != nil {
return app.ExternalServicesConfig{}, fmt.Errorf("creating egress tracking service app config: %w", err)
return app.ExternalServicesConfig{}, fmt.Errorf("creating egress tracker service app config: %w", err)
}

out.Publisher, err = s.Publisher.ToAppConfig(publicURL)
Expand Down Expand Up @@ -106,56 +106,56 @@ func (s *IndexingServiceConfig) ToAppConfig() (app.IndexingServiceConfig, error)
return out, nil
}

type EgressTrackingServiceConfig struct {
DID string `mapstructure:"did" flag:"egress-tracking-service-did" toml:"did,omitempty"`
URL string `mapstructure:"url" flag:"egress-tracking-service-url" toml:"url,omitempty"`
Proof string `mapstructure:"proof" flag:"egress-tracking-service-proof" toml:"proof,omitempty"`
type EgressTrackerServiceConfig struct {
DID string `mapstructure:"did" flag:"egress-tracker-service-did" toml:"did,omitempty"`
URL string `mapstructure:"url" flag:"egress-tracker-service-url" toml:"url,omitempty"`
Proof string `mapstructure:"proof" flag:"egress-tracker-service-proof" toml:"proof,omitempty"`
}

func (c *EgressTrackingServiceConfig) Validate() error {
func (c *EgressTrackerServiceConfig) Validate() error {
return validateConfig(c)
}

func (c *EgressTrackingServiceConfig) ToAppConfig() (app.EgressTrackingServiceConfig, error) {
func (c *EgressTrackerServiceConfig) ToAppConfig() (app.EgressTrackerServiceConfig, error) {
if c.DID == "" {
log.Warn("no egress tracking service DID provided, egress tracking is disabled")
return app.EgressTrackingServiceConfig{}, nil
log.Warn("no egress tracker service DID provided, egress tracker is disabled")
return app.EgressTrackerServiceConfig{}, nil
}

if c.URL == "" {
log.Warn("no egress tracking service URL provided, egress tracking is disabled")
return app.EgressTrackingServiceConfig{}, nil
log.Warn("no egress tracker service URL provided, egress tracker is disabled")
return app.EgressTrackerServiceConfig{}, nil
}

sdid, err := did.Parse(c.DID)
if err != nil {
return app.EgressTrackingServiceConfig{}, fmt.Errorf("parsing egress tracking service DID: %w", err)
return app.EgressTrackerServiceConfig{}, fmt.Errorf("parsing egress tracker service DID: %w", err)
}

surl, err := url.Parse(c.URL)
if err != nil {
return app.EgressTrackingServiceConfig{}, fmt.Errorf("parsing egress tracking service URL: %w", err)
return app.EgressTrackerServiceConfig{}, fmt.Errorf("parsing egress tracker service URL: %w", err)
}

schannel := ucanhttp.NewChannel(surl)
sconn, err := client.NewConnection(sdid, schannel)
if err != nil {
return app.EgressTrackingServiceConfig{}, fmt.Errorf("creating egress tracking service connection: %w", err)
return app.EgressTrackerServiceConfig{}, fmt.Errorf("creating egress tracker service connection: %w", err)
}

out := app.EgressTrackingServiceConfig{
out := app.EgressTrackerServiceConfig{
Connection: sconn,
}

// Parse egress tracking service proofs if provided
// Parse egress tracker service proofs if provided
if c.Proof != "" {
dlg, err := delegation.Parse(c.Proof)
if err != nil {
return app.EgressTrackingServiceConfig{}, fmt.Errorf("parsing egress tracking service proof: %w", err)
return app.EgressTrackerServiceConfig{}, fmt.Errorf("parsing egress tracker service proof: %w", err)
}
out.Proofs = delegation.Proofs{delegation.FromDelegation(dlg)}
} else {
log.Warn("no egress tracking service proof provided, egress tracking will likely fail, please provide egress tracking proof")
log.Warn("no egress tracker service proof provided, egress tracking is disabled")
}

return out, nil
Expand Down
2 changes: 2 additions & 0 deletions pkg/fx/app/ucan.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/storacha/piri/pkg/fx/root"
"github.com/storacha/piri/pkg/fx/storage"
storageucan "github.com/storacha/piri/pkg/fx/storage/ucan"
"github.com/storacha/piri/pkg/service/egresstracker"
)

var UCANModule = fx.Module("ucan",
Expand All @@ -22,6 +23,7 @@ var UCANModule = fx.Module("ucan",
blobs.Module, // Provides blob service and handler
claims.Module, // Provides claims service and handler
publisher.Module, // Provides publisher service and handler
egresstracker.Module, // Provides egress tracker service
replicator.Module, // Provides replicator service (works with or without PDP)
storage.Module, // Provides storage service wrapper
retrieval.Module, // Provides retrieval service wrapper
Expand Down
44 changes: 44 additions & 0 deletions pkg/fx/database/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ var Module = fx.Module("database",
ProvideAggregatorDB,
fx.ResultTags(`name:"aggregator_db"`),
),
fx.Annotate(
ProvideEgressTrackerDB,
fx.ResultTags(`name:"egress_tracker_db"`),
),
),
)

Expand Down Expand Up @@ -160,6 +164,46 @@ func ProvideTaskEngineDB(lc fx.Lifecycle, cfg app.StorageConfig) (*gorm.DB, erro
return db, nil
}

// ProvideEgressTrackerDB provides the SQLite database for the egress tracker job queue
func ProvideEgressTrackerDB(lc fx.Lifecycle, cfg app.StorageConfig) (*sql.DB, error) {
// If no path is provided, use in-memory database
if cfg.EgressTracker.DBPath == "" {
db, err := sqlitedb.NewMemory()
if err != nil {
return nil, fmt.Errorf("creating in-memory egress tracker database: %w", err)
}
return db, nil
}

// Ensure directory exists for file-based database
dir := filepath.Dir(cfg.EgressTracker.DBPath)
if err := os.MkdirAll(dir, 0755); err != nil {
return nil, fmt.Errorf("creating egress tracker database directory: %w", err)
}

// Create SQLite database connection
db, err := sqlitedb.New(cfg.EgressTracker.DBPath,
database.WithJournalMode(database.JournalModeWAL),
database.WithTimeout(5*time.Second),
database.WithSyncMode(database.SyncModeNORMAL),
)
if err != nil {
return nil, fmt.Errorf("creating egress tracker database: %w", err)
}
configureDatabaseConnection(db)

lc.Append(fx.Hook{
OnStart: func(ctx context.Context) error {
return db.PingContext(ctx)
},
OnStop: func(ctx context.Context) error {
return db.Close()
},
})

return db, nil
}

func configureDatabaseConnection(db *sql.DB) {
// there can only be ONE connection or sqlite throws a massive tantrum about the
// database being locked...sobs...wipes tears with mouse pad...
Expand Down
62 changes: 62 additions & 0 deletions pkg/fx/retrieval/ucan/handlers/provider.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,78 @@
package handlers

import (
"context"

"go.uber.org/fx"

logging "github.com/ipfs/go-log/v2"
"github.com/storacha/go-libstoracha/capabilities/space/content"
"github.com/storacha/go-ucanto/core/invocation"
"github.com/storacha/go-ucanto/core/receipt"
fdm "github.com/storacha/go-ucanto/core/result/failure/datamodel"
ucanserver "github.com/storacha/go-ucanto/server"
ucanretrieval "github.com/storacha/go-ucanto/server/retrieval"

"github.com/storacha/piri/pkg/service/egresstracker"
"github.com/storacha/piri/pkg/service/retrieval/ucan"
)

var log = logging.Logger("retrieval/ucan")

var Module = fx.Module("retrieval/ucan/handlers",
fx.Provide(
fx.Annotate(
ucan.SpaceContentRetrieve,
fx.ResultTags(`group:"ucan_retrieval_options"`),
),
fx.Annotate(
withErrorHandler,
fx.ResultTags(`group:"ucan_retrieval_options"`),
),
fx.Annotate(
withReceiptLogger,
fx.ResultTags(`group:"ucan_retrieval_options"`),
),
),
)

func withErrorHandler() ucanretrieval.Option {
return ucanretrieval.WithErrorHandler(func(err ucanserver.HandlerExecutionError[any]) {
l := log.With("error", err.Error())
if s := err.Stack(); s != "" {
l = l.With("stack", s)
}
l.Error("ucan retrieval handler execution error")
})
}

func withReceiptLogger(ets *egresstracker.Service) ucanretrieval.Option {
return ucanretrieval.WithReceiptLogger(func(_ context.Context, rcpt receipt.AnyReceipt, inv invocation.Invocation) error {
// Egress tracking is optional, the service will be nil if it is disabled
if ets == nil {
log.Warn("Egress tracking is not configured")
return nil
}

// Make sure the receipt is self-contained, i.e. it also has invocation blocks
fullRcpt, err := rcpt.Clone()
if err != nil {
return err
}

if err := fullRcpt.AttachInvocation(inv); err != nil {
return err
}

retrievalRcpt, err := receipt.Rebind[content.RetrieveOk, fdm.FailureModel](fullRcpt, content.RetrieveOkType(), fdm.FailureType())
if err != nil {
return err
}

if err := ets.AddReceipt(context.Background(), retrievalRcpt); err != nil {
return err
}

return nil
})
}
Loading