-
Notifications
You must be signed in to change notification settings - Fork 17
Expand file tree
/
Copy pathrepo.go
More file actions
127 lines (112 loc) · 4.89 KB
/
repo.go
File metadata and controls
127 lines (112 loc) · 4.89 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package blobs
import (
"context"
"io"
"iter"
"github.com/ipfs/go-cid"
"github.com/multiformats/go-multihash"
"github.com/storacha/go-ucanto/did"
"github.com/storacha/guppy/pkg/preparation/blobs/model"
dagsmodel "github.com/storacha/guppy/pkg/preparation/dags/model"
spacesmodel "github.com/storacha/guppy/pkg/preparation/spaces/model"
"github.com/storacha/guppy/pkg/preparation/types/id"
uploadsmodel "github.com/storacha/guppy/pkg/preparation/uploads/model"
)
// Repo defines the interface for interacting with DAG scans, nodes, and links in the repository.
type Repo interface {
CreateShard(ctx context.Context, uploadID id.UploadID, size uint64, digestState, pieceCidState []byte) (*model.Shard, error)
UpdateShard(ctx context.Context, shard *model.Shard) error
ShardsForUploadByState(ctx context.Context, uploadID id.UploadID, state model.BlobState) ([]*model.Shard, error)
GetShardByID(ctx context.Context, shardID id.ShardID) (*model.Shard, error)
AddNodeToShard(ctx context.Context, shardID id.ShardID, nodeCID cid.Cid, spaceDID did.DID, uploadID id.UploadID, offset uint64, options ...AddNodeToShardOption) error
// NodesNotInShards returns CIDs of nodes for the given upload that are not yet assigned to shards.
NodesNotInShards(ctx context.Context, uploadID id.UploadID, spaceDID did.DID) ([]cid.Cid, error)
FindNodeByCIDAndSpaceDID(ctx context.Context, c cid.Cid, spaceDID did.DID) (dagsmodel.Node, error)
// ForEachNodeInShard iterates over all the nodes for a given shard, in the
// order they should appear in the shard.
ForEachNodeInShard(ctx context.Context, shardID id.ShardID, startOffset uint64) iter.Seq2[NodeInShard, error]
GetSpaceByDID(ctx context.Context, spaceDID did.DID) (*spacesmodel.Space, error)
DeleteShard(ctx context.Context, shardID id.ShardID) error
// Index methods
CreateIndex(ctx context.Context, uploadID id.UploadID) (*model.Index, error)
UpdateIndex(ctx context.Context, index *model.Index) error
IndexesForUploadByState(ctx context.Context, uploadID id.UploadID, state model.BlobState) ([]*model.Index, error)
GetIndexByID(ctx context.Context, indexID id.IndexID) (*model.Index, error)
AddShardToIndex(ctx context.Context, indexID id.IndexID, shardID id.ShardID) error
ShardsNotInIndexes(ctx context.Context, uploadID id.UploadID) ([]id.ShardID, error)
ShardsForIndex(ctx context.Context, indexID id.IndexID) ([]*model.Shard, error)
// ForEachNodeInIndex iterates over all nodes across all shards in an index,
// ordered by shard. This is a batch query that avoids per-shard round trips.
ForEachNodeInIndex(ctx context.Context, indexID id.IndexID) iter.Seq2[NodeInIndex, error]
// Upload methods
GetUploadByID(ctx context.Context, uploadID id.UploadID) (*uploadsmodel.Upload, error)
}
type NodeInShard struct {
Node dagsmodel.Node
ShardOffset uint64
}
type NodeInIndex struct {
NodeCID cid.Cid
NodeSize uint64
ShardDigest multihash.Multihash
ShardOffset uint64
}
// ShardEncoder is the interface for shard implementations.
type ShardEncoder interface {
// WriteHeader writes a header to the provided writer.
WriteHeader(ctx context.Context, w io.Writer) error
// WriteNode writes a node to the provided writer.
WriteNode(ctx context.Context, node dagsmodel.Node, data []byte, w io.Writer) error
// NodeEncodingLength determines the number of bytes a node will occupy when
// encoded in a shard.
NodeEncodingLength(node dagsmodel.Node) uint64
// HeaderEncodingLength returns the number of bytes of preamble that will be
// written when a shard is encoded. Note: this may be 0.
HeaderEncodingLength() uint64
// HeaderDigestState returns the digest state bytes used to jumpstart digest calculation
HeaderDigestState() []byte
// HeaderPieceCIDState returns the piece CID state bytes used to jumpstart piece cid calculation
HeaderPieceCIDState() []byte
}
type (
DigestStateUpdate struct {
digestStateUpTo uint64
digestState []byte
pieceCIDState []byte
}
AddNodeConfig struct {
digestStateUpdate *DigestStateUpdate
}
)
func (d *DigestStateUpdate) DigestStateUpTo() uint64 {
return d.digestStateUpTo
}
func (d *DigestStateUpdate) DigestState() []byte {
return d.digestState
}
func (d *DigestStateUpdate) PieceCIDState() []byte {
return d.pieceCIDState
}
func (a *AddNodeConfig) HasDigestStateUpdate() bool {
return a.digestStateUpdate != nil
}
func (a *AddNodeConfig) DigestStateUpdate() *DigestStateUpdate {
return a.digestStateUpdate
}
type AddNodeToShardOption func(cfg *AddNodeConfig)
func WithDigestStateUpdate(digestStateUpTo uint64, digestState []byte, pieceCIDState []byte) AddNodeToShardOption {
return func(cfg *AddNodeConfig) {
cfg.digestStateUpdate = &DigestStateUpdate{
digestStateUpTo: digestStateUpTo,
digestState: digestState,
pieceCIDState: pieceCIDState,
}
}
}
func NewAddNodeConfig(options ...AddNodeToShardOption) *AddNodeConfig {
cfg := &AddNodeConfig{}
for _, option := range options {
option(cfg)
}
return cfg
}