Skip to content

Commit d947c94

Browse files
committed
feat: adopt new store pattern
git commit too much commitment, this will get rebased, or broken up.
1 parent bc0d913 commit d947c94

86 files changed

Lines changed: 2829 additions & 1988 deletions

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

cmd/cli/serve/ucan.go

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import (
2828
"github.com/storacha/piri/pkg/service/retrieval"
2929
"github.com/storacha/piri/pkg/service/storage"
3030
"github.com/storacha/piri/pkg/store/blobstore"
31+
"github.com/storacha/piri/pkg/store/objectstore/flatfs"
3132
)
3233

3334
var (
@@ -126,16 +127,12 @@ func startServer(cmd *cobra.Command, _ []string) error {
126127
if err := os.MkdirAll(cfg.Repo.DataDir, 0755); err != nil {
127128
return fmt.Errorf("creating directory: %s: %w", cfg.Repo.DataDir, err)
128129
}
129-
if err := os.MkdirAll(cfg.Repo.TempDir, 0755); err != nil {
130-
return fmt.Errorf("creating directory: %s: %w", cfg.Repo.TempDir, err)
131-
}
132-
blobStore, err := blobstore.NewFsBlobstore(
133-
filepath.Join(cfg.Repo.DataDir, "blobs"),
134-
filepath.Join(cfg.Repo.TempDir, "blobs"),
135-
)
130+
blobDir := filepath.Join(cfg.Repo.DataDir, "blobs")
131+
blobObjStore, err := flatfs.New(blobDir, flatfs.NextToLast(2), false)
136132
if err != nil {
137133
return fmt.Errorf("creating blob storage: %w", err)
138134
}
135+
blobStore := blobstore.NewFlatfsStore(blobObjStore)
139136

140137
allocsDir, err := cliutil.Mkdirp(cfg.Repo.DataDir, "allocation")
141138
if err != nil {

cmd/cli/setup/register.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ import (
3131
"github.com/storacha/piri/pkg/pdp/tasks"
3232
"github.com/storacha/piri/pkg/pdp/types"
3333

34-
"github.com/storacha/piri/pkg/store/keystore"
34+
"github.com/storacha/piri/pkg/store/local/keystore"
3535
"github.com/storacha/piri/pkg/wallet"
3636

3737
delgclient "github.com/storacha/delegator/client"

cmd/cli/wallet/root.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ import (
1414
"github.com/spf13/cobra"
1515

1616
"github.com/storacha/piri/pkg/config"
17-
"github.com/storacha/piri/pkg/store/keystore"
17+
"github.com/storacha/piri/pkg/store/local/keystore"
1818
"github.com/storacha/piri/pkg/wallet"
1919
)
2020

pkg/aws/dynamoacceptancestore.go

Lines changed: 47 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -59,53 +59,67 @@ func (d *DynamoAcceptanceStore) Get(ctx context.Context, mh multihash.Multihash,
5959
return acc, nil
6060
}
6161

62-
// List implements acceptancestore.AcceptanceStore.
63-
func (d *DynamoAcceptanceStore) List(ctx context.Context, mh multihash.Multihash, options ...acceptancestore.ListOption) ([]acceptance.Acceptance, error) {
64-
cfg := acceptancestore.ListConfig{}
65-
for _, opt := range options {
66-
opt(&cfg)
67-
}
68-
62+
// GetAny retrieves any acceptance for a blob (digest), regardless of space.
63+
func (d *DynamoAcceptanceStore) GetAny(ctx context.Context, mh multihash.Multihash) (acceptance.Acceptance, error) {
6964
keyEx := expression.Key("hash").Equal(expression.Value(digestutil.Format(mh)))
7065
expr, err := expression.NewBuilder().WithKeyCondition(keyEx).Build()
7166
if err != nil {
72-
return nil, fmt.Errorf("building query: %w", err)
67+
return acceptance.Acceptance{}, fmt.Errorf("building query: %w", err)
68+
}
69+
70+
// Query for just one item
71+
response, err := d.dynamoDbClient.Query(ctx, &dynamodb.QueryInput{
72+
TableName: aws.String(d.tableName),
73+
ExpressionAttributeNames: expr.Names(),
74+
ExpressionAttributeValues: expr.Values(),
75+
KeyConditionExpression: expr.KeyCondition(),
76+
ConsistentRead: aws.Bool(true),
77+
Limit: aws.Int32(1),
78+
})
79+
if err != nil {
80+
return acceptance.Acceptance{}, fmt.Errorf("querying acceptances: %w", err)
81+
}
82+
83+
if len(response.Items) == 0 {
84+
return acceptance.Acceptance{}, store.ErrNotFound
85+
}
86+
87+
var item acceptanceItem
88+
err = attributevalue.UnmarshalMap(response.Items[0], &item)
89+
if err != nil {
90+
return acceptance.Acceptance{}, fmt.Errorf("parsing query response: %w", err)
91+
}
92+
93+
acc, err := acceptance.Decode(item.Acceptance, dagcbor.Decode)
94+
if err != nil {
95+
return acceptance.Acceptance{}, fmt.Errorf("decoding data: %w", err)
7396
}
97+
return acc, nil
98+
}
7499

75-
var limit *int32
76-
if cfg.Limit > 0 {
77-
limit = aws.Int32(int32(cfg.Limit))
100+
// Exists checks if any acceptance exists for a blob (digest).
101+
func (d *DynamoAcceptanceStore) Exists(ctx context.Context, mh multihash.Multihash) (bool, error) {
102+
keyEx := expression.Key("hash").Equal(expression.Value(digestutil.Format(mh)))
103+
proj := expression.NamesList(expression.Name("hash"))
104+
expr, err := expression.NewBuilder().WithKeyCondition(keyEx).WithProjection(proj).Build()
105+
if err != nil {
106+
return false, fmt.Errorf("building query: %w", err)
78107
}
79108

80-
var acceptances []acceptance.Acceptance
81-
queryPaginator := dynamodb.NewQueryPaginator(d.dynamoDbClient, &dynamodb.QueryInput{
109+
response, err := d.dynamoDbClient.Query(ctx, &dynamodb.QueryInput{
82110
TableName: aws.String(d.tableName),
83111
ExpressionAttributeNames: expr.Names(),
84112
ExpressionAttributeValues: expr.Values(),
85113
KeyConditionExpression: expr.KeyCondition(),
114+
ProjectionExpression: expr.Projection(),
86115
ConsistentRead: aws.Bool(true),
87-
Limit: limit,
116+
Limit: aws.Int32(1),
88117
})
89-
for queryPaginator.HasMorePages() {
90-
response, err := queryPaginator.NextPage(ctx)
91-
if err != nil {
92-
return nil, fmt.Errorf("querying acceptances: %w", err)
93-
}
94-
var acceptancePage []acceptanceItem
95-
err = attributevalue.UnmarshalListOfMaps(response.Items, &acceptancePage)
96-
if err != nil {
97-
return nil, fmt.Errorf("parsing query responses: %w", err)
98-
}
99-
100-
for _, item := range acceptancePage {
101-
a, err := acceptance.Decode(item.Acceptance, dagcbor.Decode)
102-
if err != nil {
103-
return nil, fmt.Errorf("decoding data: %w", err)
104-
}
105-
acceptances = append(acceptances, a)
106-
}
118+
if err != nil {
119+
return false, fmt.Errorf("querying acceptances: %w", err)
107120
}
108-
return acceptances, nil
121+
122+
return len(response.Items) > 0, nil
109123
}
110124

111125
// Put implements acceptancestore.AcceptanceStore.

pkg/aws/dynamoallocationstore.go

Lines changed: 43 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -43,23 +43,6 @@ func (d *DynamoAllocationStore) Get(ctx context.Context, mh multihash.Multihash,
4343
if err != nil {
4444
return allocation.Allocation{}, fmt.Errorf("getting item: %w", err)
4545
}
46-
47-
// HACK: (ash) Temporary hack to allow allocation to be found if it was
48-
// stored with the old style key ("<digest>/<cause>") not the new key
49-
// ("<digest>/<space>"). This works because listing works on digest
50-
// prefix i.e. "<digest>/*".
51-
if res.Item == nil {
52-
allocs, listErr := d.List(ctx, mh)
53-
if listErr != nil {
54-
return allocation.Allocation{}, fmt.Errorf("listing items: %w", listErr)
55-
}
56-
for _, a := range allocs {
57-
if a.Space == space {
58-
return a, nil
59-
}
60-
}
61-
}
62-
6346
if res.Item == nil {
6447
return allocation.Allocation{}, store.ErrNotFound
6548
}
@@ -75,53 +58,63 @@ func (d *DynamoAllocationStore) Get(ctx context.Context, mh multihash.Multihash,
7558
return alloc, nil
7659
}
7760

78-
// List implements allocationstore.AllocationStore.
79-
func (d *DynamoAllocationStore) List(ctx context.Context, mh multihash.Multihash, options ...allocationstore.ListOption) ([]allocation.Allocation, error) {
80-
cfg := allocationstore.ListConfig{}
81-
for _, opt := range options {
82-
opt(&cfg)
83-
}
84-
61+
// GetAny retrieves any allocation for a blob (digest), regardless of space.
62+
func (d *DynamoAllocationStore) GetAny(ctx context.Context, mh multihash.Multihash) (allocation.Allocation, error) {
8563
keyEx := expression.Key("hash").Equal(expression.Value(digestutil.Format(mh)))
8664
expr, err := expression.NewBuilder().WithKeyCondition(keyEx).Build()
8765
if err != nil {
88-
return nil, fmt.Errorf("building query: %w", err)
66+
return allocation.Allocation{}, fmt.Errorf("building query: %w", err)
67+
}
68+
69+
res, err := d.dynamoDbClient.Query(ctx, &dynamodb.QueryInput{
70+
TableName: aws.String(d.tableName),
71+
ExpressionAttributeNames: expr.Names(),
72+
ExpressionAttributeValues: expr.Values(),
73+
KeyConditionExpression: expr.KeyCondition(),
74+
ConsistentRead: aws.Bool(true),
75+
Limit: aws.Int32(1),
76+
})
77+
if err != nil {
78+
return allocation.Allocation{}, fmt.Errorf("querying allocation: %w", err)
79+
}
80+
if len(res.Items) == 0 {
81+
return allocation.Allocation{}, store.ErrNotFound
82+
}
83+
84+
var item allocationItem
85+
err = attributevalue.UnmarshalMap(res.Items[0], &item)
86+
if err != nil {
87+
return allocation.Allocation{}, fmt.Errorf("unmarshalling allocation item: %w", err)
88+
}
89+
alloc, err := allocation.Decode(item.Allocation, dagcbor.Decode)
90+
if err != nil {
91+
return allocation.Allocation{}, fmt.Errorf("decoding allocation: %w", err)
8992
}
93+
return alloc, nil
94+
}
9095

91-
var limit *int32
92-
if cfg.Limit > 0 {
93-
limit = aws.Int32(int32(cfg.Limit))
96+
// Exists checks if any allocation exists for a blob (digest).
97+
func (d *DynamoAllocationStore) Exists(ctx context.Context, mh multihash.Multihash) (bool, error) {
98+
keyEx := expression.Key("hash").Equal(expression.Value(digestutil.Format(mh)))
99+
proj := expression.NamesList(expression.Name("hash"))
100+
expr, err := expression.NewBuilder().WithKeyCondition(keyEx).WithProjection(proj).Build()
101+
if err != nil {
102+
return false, fmt.Errorf("building query: %w", err)
94103
}
95104

96-
var allocations []allocation.Allocation
97-
queryPaginator := dynamodb.NewQueryPaginator(d.dynamoDbClient, &dynamodb.QueryInput{
105+
res, err := d.dynamoDbClient.Query(ctx, &dynamodb.QueryInput{
98106
TableName: aws.String(d.tableName),
99107
ExpressionAttributeNames: expr.Names(),
100108
ExpressionAttributeValues: expr.Values(),
101109
KeyConditionExpression: expr.KeyCondition(),
110+
ProjectionExpression: expr.Projection(),
102111
ConsistentRead: aws.Bool(true),
103-
Limit: limit,
112+
Limit: aws.Int32(1),
104113
})
105-
for queryPaginator.HasMorePages() {
106-
response, err := queryPaginator.NextPage(ctx)
107-
if err != nil {
108-
return nil, fmt.Errorf("querying allocations: %w", err)
109-
}
110-
var allocationPage []allocationItem
111-
err = attributevalue.UnmarshalListOfMaps(response.Items, &allocationPage)
112-
if err != nil {
113-
return nil, fmt.Errorf("parsing query responses: %w", err)
114-
}
115-
116-
for _, item := range allocationPage {
117-
a, err := allocation.Decode(item.Allocation, dagcbor.Decode)
118-
if err != nil {
119-
return nil, fmt.Errorf("decoding data: %w", err)
120-
}
121-
allocations = append(allocations, a)
122-
}
114+
if err != nil {
115+
return false, fmt.Errorf("querying allocation: %w", err)
123116
}
124-
return allocations, nil
117+
return len(res.Items) > 0, nil
125118
}
126119

127120
// Put implements allocationstore.AllocationStore.

pkg/aws/s3blobstore.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,3 +180,12 @@ func (s *s3BlobObject) Body() io.ReadCloser {
180180
func (s *s3BlobObject) Size() int64 {
181181
return *s.outPut.ContentLength
182182
}
183+
184+
// Delete implements blobstore.Blobstore.
185+
func (s *S3BlobStore) Delete(ctx context.Context, digest multihash.Multihash) error {
186+
_, err := s.s3Client.DeleteObject(ctx, &s3.DeleteObjectInput{
187+
Bucket: aws.String(s.bucket),
188+
Key: aws.String(s.formatKey(digest)),
189+
})
190+
return err
191+
}

pkg/aws/service.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -277,10 +277,11 @@ func Construct(cfg Config) (storage.Service, error) {
277277
blobStore := NewS3BlobStore(cfg.Config, cfg.BlobStoreBucket, formatKey, blobStoreOpts...)
278278
allocationStore := NewDynamoAllocationStore(cfg.Config, cfg.AllocationsTableName, cfg.DynamoOptions...)
279279
acceptanceStore := NewDynamoAcceptanceStore(cfg.Config, cfg.AcceptanceTableName, cfg.DynamoOptions...)
280-
claimStore, err := delegationstore.NewDelegationStore(NewS3Store(cfg.Config, cfg.ClaimStoreBucket, cfg.ClaimStorePrefix, cfg.S3Options...))
281-
if err != nil {
282-
return nil, fmt.Errorf("constructing claim store: %w", err)
283-
}
280+
claimStore := delegationstore.New(
281+
NewSimpleStoreObjectAdapter(NewS3Store(cfg.Config, cfg.ClaimStoreBucket, cfg.ClaimStorePrefix, cfg.S3Options...)),
282+
"",
283+
delegationstore.S3KeyEncoder{},
284+
)
284285
ipniStore := NewS3Store(cfg.Config, cfg.IPNIStoreBucket, cfg.IPNIStorePrefix, cfg.S3Options...)
285286
chunkLinksTable := NewDynamoProviderContextTable(cfg.Config, cfg.ChunkLinksTableName, cfg.DynamoOptions...)
286287
metadataTable := NewDynamoProviderContextTable(cfg.Config, cfg.MetadataTableName, cfg.DynamoOptions...)
@@ -312,10 +313,12 @@ func Construct(cfg Config) (storage.Service, error) {
312313
}
313314
ranLinkIndex := NewDynamoRanLinkIndex(cfg.Config, cfg.RanLinkIndexTableName, cfg.DynamoOptions...)
314315
s3ReceiptStore := NewS3Store(cfg.Config, cfg.ReceiptStoreBucket, cfg.ReceiptStorePrefix, cfg.S3Options...)
315-
receiptStore, err := receiptstore.NewReceiptStore(s3ReceiptStore, ranLinkIndex)
316-
if err != nil {
317-
return nil, fmt.Errorf("setting up receipt store: %w", err)
318-
}
316+
receiptStore := receiptstore.New(
317+
NewSimpleStoreObjectAdapter(s3ReceiptStore),
318+
"",
319+
receiptstore.S3KeyEncoder{},
320+
ranLinkIndex,
321+
)
319322

320323
publishingQueue := awspublisherqueue.NewSQSPublishingQueue(cfg.Config, cfg.SQSPublishingQueueID, cfg.PublishingBucket)
321324
queuePublisher := publisherqueue.NewQueuePublisher(publishingQueue)
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package aws
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"io"
7+
"iter"
8+
9+
"github.com/storacha/go-libstoracha/ipnipublisher/store"
10+
11+
"github.com/storacha/piri/pkg/store/objectstore"
12+
)
13+
14+
// simpleStoreObjectAdapter adapts a store.SimpleStore to objectstore.ListableStore.
15+
// This enables use of SimpleStore backends (like AWS S3Store) with stores that
16+
// require objectstore.ListableStore (like delegationstore).
17+
type simpleStoreObjectAdapter struct {
18+
store store.SimpleStore
19+
}
20+
21+
var _ objectstore.ListableStore = (*simpleStoreObjectAdapter)(nil)
22+
23+
// NewSimpleStoreObjectAdapter creates an objectstore.ListableStore adapter for a SimpleStore.
24+
func NewSimpleStoreObjectAdapter(s store.SimpleStore) objectstore.ListableStore {
25+
return &simpleStoreObjectAdapter{store: s}
26+
}
27+
28+
func (a *simpleStoreObjectAdapter) Put(ctx context.Context, key string, size uint64, data io.Reader) error {
29+
return a.store.Put(ctx, key, size, data)
30+
}
31+
32+
func (a *simpleStoreObjectAdapter) Get(ctx context.Context, key string, opts ...objectstore.GetOption) (objectstore.Object, error) {
33+
// Process options but SimpleStore doesn't support range requests
34+
cfg := objectstore.NewGetConfig()
35+
cfg.ProcessOptions(opts)
36+
r := cfg.Range()
37+
if r.Start != 0 || r.End != nil {
38+
return nil, fmt.Errorf("SimpleStore adapter does not support range requests")
39+
}
40+
41+
body, err := a.store.Get(ctx, key)
42+
if err != nil {
43+
if store.IsNotFound(err) {
44+
return nil, objectstore.ErrNotExist
45+
}
46+
return nil, err
47+
}
48+
49+
return &simpleStoreObject{body: body}, nil
50+
}
51+
52+
func (a *simpleStoreObjectAdapter) Delete(ctx context.Context, key string) error {
53+
return fmt.Errorf("SimpleStore adapter does not support Delete")
54+
}
55+
56+
func (a *simpleStoreObjectAdapter) Exists(ctx context.Context, key string) (bool, error) {
57+
return false, fmt.Errorf("SimpleStore adapter does not support Exists")
58+
}
59+
60+
func (a *simpleStoreObjectAdapter) ListPrefix(ctx context.Context, prefix string) iter.Seq2[string, error] {
61+
return func(yield func(string, error) bool) {
62+
yield("", fmt.Errorf("SimpleStore adapter does not support ListPrefix"))
63+
}
64+
}
65+
66+
// simpleStoreObject wraps an io.ReadCloser as objectstore.Object.
67+
type simpleStoreObject struct {
68+
body io.ReadCloser
69+
}
70+
71+
func (o *simpleStoreObject) Size() int64 {
72+
// SimpleStore doesn't provide size information
73+
return -1
74+
}
75+
76+
func (o *simpleStoreObject) Body() io.ReadCloser {
77+
return o.body
78+
}

0 commit comments

Comments
 (0)