-
Notifications
You must be signed in to change notification settings - Fork 17
Expand file tree
/
Copy pathblobs.go
More file actions
651 lines (557 loc) · 19.7 KB
/
blobs.go
File metadata and controls
651 lines (557 loc) · 19.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
package blobs
import (
"context"
"encoding/binary"
"fmt"
"io"
"runtime"
"sync"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/storacha/go-libstoracha/blobindex"
"github.com/storacha/go-ucanto/did"
"github.com/storacha/guppy/internal/ctxutil"
"github.com/storacha/guppy/pkg/preparation/blobs/model"
dagsmodel "github.com/storacha/guppy/pkg/preparation/dags/model"
"github.com/storacha/guppy/pkg/preparation/dags/nodereader"
spacesmodel "github.com/storacha/guppy/pkg/preparation/spaces/model"
"github.com/storacha/guppy/pkg/preparation/storacha"
"github.com/storacha/guppy/pkg/preparation/types"
"github.com/storacha/guppy/pkg/preparation/types/id"
"github.com/storacha/guppy/pkg/preparation/uploads"
)
// log is the package-level structured logger.
// NOTE(review): the subsystem name is "preparation/shards" although this
// package is "blobs" — presumably a holdover from a rename. Confirm before
// changing it, as log-level configuration may key off this name.
var log = logging.Logger("preparation/shards")
// OpenNodeReaderFunc opens a new nodereader.NodeReader used to fetch raw
// block data for nodes. Callers are responsible for closing the reader.
type OpenNodeReaderFunc func() (nodereader.NodeReader, error)
// API provides methods to interact with the Shards in the repository.
type API struct {
	// Repo is the persistence layer for shards, indexes, nodes and spaces.
	Repo Repo
	// OpenNodeReader opens a NodeReader for fetching node block data.
	OpenNodeReader OpenNodeReaderFunc
	// MaxNodesPerIndex caps how many slices a single index may hold; values
	// <= 0 disable the limit (see roomInShard/roomInIndex).
	MaxNodesPerIndex int
	// ShardEncoder encodes shard headers and node entries into the output
	// shard format.
	ShardEncoder ShardEncoder
}
// Compile-time assertions that API's methods satisfy the function types
// consumed by the uploads and storacha packages.
var _ uploads.AddNodeToUploadShardsFunc = API{}.AddNodeToUploadShards
var _ uploads.AddShardsToUploadIndexesFunc = API{}.AddShardsToUploadIndexes
var _ uploads.CloseUploadShardsFunc = API{}.CloseUploadShards
var _ storacha.ReaderForShardFunc = API{}.ReaderForShard
var _ storacha.ReaderForIndexFunc = API{}.ReaderForIndex
// AddNodeToUploadShards assigns one node to a shard of the given upload. It
// reuses an open shard with room for the node, closing (and reporting via
// shardCB, when non-nil) open shards that are full, or creates a new shard
// when none has room. When data is non-nil, the node's bytes are folded into
// the shard's incremental digest state; when nil, digest state is computed
// later by re-reading the shard (see AddNodesToUploadShards).
func (a API) AddNodeToUploadShards(ctx context.Context, uploadID id.UploadID, spaceDID did.DID, nodeCID cid.Cid, data []byte, shardCB func(shard *model.Shard) error) error {
	// The space supplies the configured shard byte-size limit.
	space, err := a.Repo.GetSpaceByDID(ctx, spaceDID)
	if err != nil {
		return fmt.Errorf("failed to get space %s: %w", spaceDID, err)
	}
	openShards, err := a.Repo.ShardsForUploadByState(ctx, uploadID, model.BlobStateOpen)
	if err != nil {
		return fmt.Errorf("failed to get open shards for upload %s: %w", uploadID, err)
	}
	node, err := a.Repo.FindNodeByCIDAndSpaceDID(ctx, nodeCID, spaceDID)
	if err != nil {
		return fmt.Errorf("failed to find node %s: %w", nodeCID, err)
	}
	if node == nil {
		return fmt.Errorf("node %s not found", nodeCID)
	}
	var shard *model.Shard
	// Look for an open shard that has room for the node, and close any that don't
	// have room. (There should only be at most one open shard, but there's no
	// harm handling multiple if they exist.)
	for _, s := range openShards {
		hasRoom, err := a.roomInShard(a.ShardEncoder, s, node, space)
		if err != nil {
			return fmt.Errorf("failed to check room in shard %s for node %s: %w", s.ID(), node.CID(), err)
		}
		if hasRoom {
			shard = s
			break
		}
		// Full: finalize the shard's digests, mark it closed, and notify the
		// callback.
		if err := a.closeShard(ctx, s); err != nil {
			return fmt.Errorf("closing shard %s: %w", s.ID(), err)
		}
		if shardCB != nil {
			err = shardCB(s)
			if err != nil {
				return fmt.Errorf("calling shardCB for closed shard %s: %w", s.ID(), err)
			}
		}
	}
	// If no such shard exists, create a new one
	if shard == nil {
		shard, err = a.Repo.CreateShard(ctx,
			uploadID,
			a.ShardEncoder.HeaderEncodingLength(),
			a.ShardEncoder.HeaderDigestState(),
			a.ShardEncoder.HeaderPieceCIDState())
		if err != nil {
			return fmt.Errorf("failed to create new shard for upload %s: %w", uploadID, err)
		}
		// Even a fresh shard can be too small for an oversized node; that is
		// unrecoverable here, so fail loudly rather than loop forever.
		hasRoom, err := a.roomInShard(a.ShardEncoder, shard, node, space)
		if err != nil {
			return fmt.Errorf("failed to check room in new shard for node %s: %w", node.CID(), err)
		}
		if !hasRoom {
			return fmt.Errorf("node %s (%d bytes) too large to fit in new shard for upload %s (shard size %d bytes)", node.CID(), node.Size(), uploadID, space.ShardSize())
		}
	}
	var addNodeOptions []AddNodeToShardOption
	if data != nil {
		// Fold the node's bytes into the shard's running digest/piece-CID
		// state so closing the shard doesn't have to re-read this data.
		digestStateUpdate, err := a.addNodeToDigestState(ctx, shard, node, data)
		if err != nil {
			return fmt.Errorf("failed to add node %s to shard %s digest state: %w", node.CID(), shard.ID(), err)
		}
		addNodeOptions = append(addNodeOptions, WithDigestStateUpdate(digestStateUpdate.digestStateUpTo, digestStateUpdate.digestState, digestStateUpdate.pieceCIDState))
	}
	// NodeEncodingLength(node)-node.Size() is the per-node encoding overhead
	// beyond the raw block bytes.
	if err := a.Repo.AddNodeToShard(ctx, shard.ID(), node.CID(), spaceDID, uploadID, a.ShardEncoder.NodeEncodingLength(node)-node.Size(), addNodeOptions...); err != nil {
		return fmt.Errorf("failed to add node %s to shard %s for upload %s: %w", node.CID(), shard.ID(), uploadID, err)
	}
	return nil
}
// AddNodesToUploadShards assigns every node of an upload that is not yet in a
// shard to one. It fetches the CIDs of all unsharded nodes and feeds each one
// through AddNodeToUploadShards, which finds or creates a suitable shard.
func (a API) AddNodesToUploadShards(ctx context.Context, uploadID id.UploadID, spaceDID did.DID, shardCB func(shard *model.Shard) error) error {
	unsharded, err := a.Repo.NodesNotInShards(ctx, uploadID, spaceDID)
	if err != nil {
		return fmt.Errorf("failed to get nodes not in shards for upload %s: %w", uploadID, err)
	}
	if len(unsharded) == 0 {
		// Nothing left to assign.
		return nil
	}
	for _, c := range unsharded {
		// Data is passed as nil because it isn't available in this flow; the
		// digest state will be calculated later when reading the shard.
		if err := a.AddNodeToUploadShards(ctx, uploadID, spaceDID, c, nil, shardCB); err != nil {
			return fmt.Errorf("failed to add node %s to shard for upload %s: %w", c, uploadID, err)
		}
	}
	return nil
}
var _ uploads.AddNodesToUploadShardsFunc = API{}.AddNodesToUploadShards
// roomInShard reports whether node can be appended to shard without exceeding
// either the space's configured shard byte size or the per-index slice limit.
func (a API) roomInShard(encoder ShardEncoder, shard *model.Shard, node dagsmodel.Node, space *spacesmodel.Space) (bool, error) {
	// Byte budget: the node's full encoded length must fit.
	if space.ShardSize() < shard.Size()+encoder.NodeEncodingLength(node) {
		return false, nil
	}
	// Slice budget: entire shards go into indexes, so the slice count is
	// capped at what one index can hold (<= 0 means unlimited).
	if a.MaxNodesPerIndex > 0 && shard.SliceCount() >= a.MaxNodesPerIndex {
		return false, nil
	}
	return true, nil
}
// digestStateUpdate captures a shard's incremental hashing state after a node
// has been folded in, so the state can be persisted with the node assignment.
type digestStateUpdate struct {
	// digestStateUpTo is the shard byte offset the states below cover.
	digestStateUpTo uint64
	// digestState is the marshaled running shard digest state.
	digestState []byte
	// pieceCIDState is the marshaled running piece-CID computation state.
	pieceCIDState []byte
}
// addNodeToDigestState folds a node's encoded bytes into shard's running
// digest/piece-CID state and returns the marshaled state, which then covers
// the shard up to and including this node. data must be exactly node.Size()
// bytes.
func (a API) addNodeToDigestState(ctx context.Context, shard *model.Shard, node dagsmodel.Node, data []byte) (digestStateUpdate, error) {
	if uint64(len(data)) != node.Size() {
		return digestStateUpdate{}, fmt.Errorf("expected %d bytes for node %s, got %d", node.Size(), node.CID(), len(data))
	}
	// Bring the hasher up to date with any shard bytes not yet digested.
	hasher, err := a.updatedShardHashState(ctx, shard)
	if err != nil {
		return digestStateUpdate{}, fmt.Errorf("getting updated shard %s hasher: %w", shard.ID(), err)
	}
	// Writing through the encoder digests the node's full encoded form, not
	// just the raw block bytes.
	err = a.ShardEncoder.WriteNode(ctx, node, data, hasher)
	if err != nil {
		return digestStateUpdate{}, fmt.Errorf("writing node %s to shard %s digest state: %w", node.CID(), shard.ID(), err)
	}
	digestState, pieceCIDState, err := hasher.marshal()
	if err != nil {
		return digestStateUpdate{}, fmt.Errorf("marshaling shard %s digest state: %w", shard.ID(), err)
	}
	return digestStateUpdate{
		// The state now covers the previous shard size plus this node's
		// encoded length.
		digestStateUpTo: shard.Size() + a.ShardEncoder.NodeEncodingLength(node),
		digestState:     digestState,
		pieceCIDState:   pieceCIDState,
	}, nil
}
// updatedShardHashState restores the shard's persisted hash state and, if that
// state lags behind the shard's current size, re-reads and hashes the
// remaining shard bytes so the returned hasher covers the whole shard.
func (a API) updatedShardHashState(ctx context.Context, shard *model.Shard) (*shardHashState, error) {
	h, err := fromShard(shard)
	if err != nil {
		return nil, fmt.Errorf("getting shard %s hasher: %w", shard.ID(), err)
	}
	if shard.DigestStateUpTo() < shard.Size() {
		// Stream the not-yet-digested tail of the shard into the hasher.
		err := a.fastWriteShard(ctx, shard.ID(), shard.DigestStateUpTo(), h)
		if err != nil {
			return nil, fmt.Errorf("hashing remaining data for shard %s: %w", shard.ID(), err)
		}
	}
	return h, nil
}
// closeShard finalizes the shard's digest and piece CID, marks the shard
// closed, and persists the update to the repository.
func (a API) closeShard(ctx context.Context, shard *model.Shard) error {
	h, err := a.updatedShardHashState(ctx, shard)
	if err != nil {
		return fmt.Errorf("getting updated shard %s hasher: %w", shard.ID(), err)
	}
	shardDigest, pieceCID, err := h.finalize(shard.Size())
	if err != nil {
		return fmt.Errorf("finalizing digests for shard %s: %w", shard.ID(), err)
	}
	if err := shard.Close(shardDigest, pieceCID); err != nil {
		return err
	}
	return a.Repo.UpdateShard(ctx, shard)
}
// CloseUploadShards closes every open shard for the given upload, invoking
// shardCB (when non-nil) for each shard after it has been closed.
func (a API) CloseUploadShards(ctx context.Context, uploadID id.UploadID, shardCB func(shard *model.Shard) error) error {
	open, err := a.Repo.ShardsForUploadByState(ctx, uploadID, model.BlobStateOpen)
	if err != nil {
		return fmt.Errorf("failed to get open shards for upload %s: %w", uploadID, err)
	}
	for _, shard := range open {
		if err := a.closeShard(ctx, shard); err != nil {
			return fmt.Errorf("updating shard %s for upload %s: %w", shard.ID(), uploadID, err)
		}
		if shardCB == nil {
			continue
		}
		if err := shardCB(shard); err != nil {
			return fmt.Errorf("calling shardCB for closed shard %s: %w", shard.ID(), err)
		}
	}
	return nil
}
// ReaderForShard uses fastWriteShard connected to a pipe to provide an
// io.Reader for shard data. The returned ReadCloser streams the shard's full
// encoded contents; a write error is surfaced to the reading side of the pipe.
func (a API) ReaderForShard(ctx context.Context, shardID id.ShardID) (io.ReadCloser, error) {
	ctx, cancel := context.WithCancel(ctx)
	// cancel when we're done writing or when we hit an error so that worker
	// goroutines exit promptly.
	// NOTE: cancel is invoked in the writer goroutine.
	pr, pw := io.Pipe()
	go func() {
		defer cancel()
		defer pw.Close()
		err := a.fastWriteShard(ctx, shardID, 0, pw)
		if err != nil {
			// Hand the failure to readers; subsequent Reads return err.
			_ = pw.CloseWithError(err)
		}
	}()
	return pr, nil
}
// fastWriteShard streams a shard's encoded contents to the given io.Writer,
// starting at the given byte offset, using multiple workers to fetch block
// data in parallel. It works by:
//  1. Spawning workers to fetch block data and enqueue indexed results.
//  2. Writing the shard header (only when starting at offset 0), then
//     draining results in index order, preserving ordering via a small
//     pending map.
//  3. Propagating errors/cancellations through the pipe while draining to let
//     workers exit cleanly and bound memory via buffered channels.
func (a API) fastWriteShard(ctx context.Context, shardID id.ShardID, offset uint64, pw io.Writer) error {
	type (
		// job asks a worker to fetch one node's data; idx records the node's
		// position in the shard so output order can be restored.
		job struct {
			idx  int
			node dagsmodel.Node
		}
		// result carries a fetched block — or the fetch error — back to the
		// writer loop.
		result struct {
			idx  int
			node dagsmodel.Node
			data []byte
			err  error
		}
	)
	var (
		workerCount = runtime.NumCPU()
		jobs        chan job
		results     chan result
	)
	if workerCount < 1 {
		workerCount = 1
	}
	// must have a NodeReader to fetch block data
	if a.OpenNodeReader == nil {
		return fmt.Errorf("no NodeReader configured")
	}
	jobWindow := workerCount * 4
	// Each result carries a whole block (~1MiB), so give results extra headroom
	// to keep workers busy if the writer is momentarily slow.
	resultWindow := jobWindow * 4
	jobs = make(chan job, jobWindow)
	results = make(chan result, resultWindow)
	var workers sync.WaitGroup
	workers.Add(workerCount)
	// writeBlock encodes a single fetched block onto the output writer.
	writeBlock := func(res result) error {
		if res.err != nil {
			return res.err
		}
		if res.data == nil {
			return fmt.Errorf("missing data for node %s", res.node.CID())
		}
		if err := a.ShardEncoder.WriteNode(ctx, res.node, res.data, pw); err != nil {
			return err
		}
		return nil
	}
	nodeReader, err := a.OpenNodeReader()
	if err != nil {
		return fmt.Errorf("failed to open node reader for shard %s: %w", shardID, err)
	}
	defer nodeReader.Close()
	// Only a stream starting at the beginning of the shard carries the header.
	if offset == 0 {
		if err := a.ShardEncoder.WriteHeader(ctx, pw); err != nil {
			return fmt.Errorf("failed to write shard header: %w", err)
		}
	}
	// Workers: fetch the data for each job and send an indexed result.
	for w := 0; w < workerCount; w++ {
		go func() {
			defer workers.Done()
			for j := range jobs {
				data, err := nodeReader.GetData(ctx, j.node)
				if err != nil {
					// Expand to all bad nodes in the shard to match previous behavior.
					err = a.makeErrBadNodes(ctx, shardID, nodeReader)
				}
				select {
				case <-ctx.Done():
					return
				case results <- result{idx: j.idx, node: j.node, data: data, err: err}:
				}
			}
		}()
	}
	// Producer: enumerate the shard's nodes from offset and enqueue jobs.
	go func() {
		defer close(jobs)
		idx := 0
		for nis, err := range a.Repo.ForEachNodeInShard(ctx, shardID, offset) {
			if err != nil {
				// If we fail to iterate nodes, send the error and stop producing jobs.
				select {
				case <-ctx.Done():
					return
				case results <- result{err: fmt.Errorf("failed to iterate nodes in shard %s: %w", shardID, err)}:
				}
				return
			}
			select {
			case <-ctx.Done():
				return
			case jobs <- job{idx: idx, node: nis.Node}:
			}
			idx++
		}
	}()
	// Close results once all workers have drained the (closed) jobs channel.
	go func() {
		workers.Wait()
		close(results)
	}()
	// Writer loop: restore shard order via `pending`, writing the contiguous
	// run that starts at `next` as it becomes available.
	pending := make(map[int]result)
	next := 0
	resultsClosed := false
	drainOnly := false
	var closeErr error
	for {
		if resultsClosed && (len(pending) == 0 || drainOnly) {
			return closeErr
		}
		var (
			res result
			ok  bool
		)
		select {
		case <-ctx.Done():
			// Stop writing, but continue draining results so workers can exit.
			drainOnly = true
			closeErr = ctxutil.Cause(ctx)
			continue
		case res, ok = <-results:
			if !ok {
				resultsClosed = true
				continue
			}
		}
		if drainOnly {
			continue
		}
		if res.err != nil {
			// NOTE(review): this returns without draining results; it relies on
			// the caller canceling ctx afterwards (as ReaderForShard does) or on
			// the buffered results channel absorbing in-flight sends — confirm
			// all callers cancel on error, else workers could block.
			return fmt.Errorf("failed to write node %s: %w", res.node.CID(), res.err)
		}
		pending[res.idx] = res
		for {
			pendingRes, ok := pending[next]
			if !ok {
				break
			}
			if err := writeBlock(pendingRes); err != nil {
				return fmt.Errorf("failed to write node %s: %w", pendingRes.node.CID(), err)
			}
			delete(pending, next)
			next++
		}
	}
}
// makeErrBadNodes attempts to read data from all nodes in the given shard, and
// returns an [ErrBadNodes] for every one that fails. This is a relatively
// expensive check, so it should only be used once we know that we're failing.
// By communicating upstream all failing nodes from the shard, we can handle
// them all at once, and avoid having to restart the upload again for each bad
// node.
func (a API) makeErrBadNodes(ctx context.Context, shardID id.ShardID, nodeReader nodereader.NodeReader) error {
	// Collect the nodes first, because we can't read data for each node while
	// holding the lock on the database that ForEachNodeInShard has. This means
	// holding a bunch of nodes in memory, but it's limited to the size of a
	// shard.
	var nodes []dagsmodel.Node
	for nis, err := range a.Repo.ForEachNodeInShard(ctx, shardID, 0) {
		if err != nil {
			return fmt.Errorf("failed to iterate over nodes in shard %s: %w", shardID, err)
		}
		nodes = append(nodes, nis.Node)
	}
	// Probe every node, partitioning into readable (good) and failing (bad).
	var errs []types.BadNodeError
	goodCIDs := cid.NewSet()
	for _, node := range nodes {
		_, err := nodeReader.GetData(ctx, node)
		if err != nil {
			errs = append(errs, types.NewBadNodeError(node.CID(), err))
		} else {
			goodCIDs.Add(node.CID())
		}
	}
	return types.NewBadNodesError(errs, shardID, goodCIDs)
}
// lengthVarint encodes size as an unsigned varint and returns only the bytes
// that were written.
func lengthVarint(size uint64) []byte {
	// binary.PutUvarint panics if the buffer is too small. A uint64 can need
	// up to binary.MaxVarintLen64 (10) bytes; the previous 8-byte buffer
	// panicked for values >= 2^56.
	buf := make([]byte, binary.MaxVarintLen64)
	n := binary.PutUvarint(buf, size)
	return buf[:n]
}
// ReaderForIndex builds the sharded DAG index identified by indexID from the
// repository and returns a reader over its serialized (archived) form. The
// upload's root CID must already be set, or an error is returned.
func (a API) ReaderForIndex(ctx context.Context, indexID id.IndexID) (io.ReadCloser, error) {
	index, err := a.Repo.GetIndexByID(ctx, indexID)
	if err != nil {
		return nil, fmt.Errorf("getting index %s: %w", indexID, err)
	}
	// Get the upload to retrieve the root CID
	upload, err := a.Repo.GetUploadByID(ctx, index.UploadID())
	if err != nil {
		return nil, fmt.Errorf("getting upload for index %s: %w", indexID, err)
	}
	if upload.RootCID() == cid.Undef {
		return nil, fmt.Errorf("no root CID set yet for upload %s (index %s)", upload.ID(), indexID)
	}
	// Build the index by reading shards from the database.
	// (-1 is presumably a "no size hint" sentinel — confirm against the
	// blobindex.NewShardedDagIndexView documentation.)
	indexView := blobindex.NewShardedDagIndexView(cidlink.Link{Cid: upload.RootCID()}, -1)
	// Query all nodes across all shards in this index in a single batch query
	nodeCount := 0
	log.Infow("building index", "index", indexID)
	for nii, err := range a.Repo.ForEachNodeInIndex(ctx, indexID) {
		if err != nil {
			return nil, fmt.Errorf("iterating nodes in index %s: %w", indexID, err)
		}
		nodeCount++
		// Log progress periodically; indexes may contain very many nodes.
		if nodeCount%10000 == 0 {
			log.Infow("building index", "index", indexID, "nodes", nodeCount)
		}
		// Record where this node's bytes live: which shard (by digest) and the
		// offset/length within that shard.
		indexView.SetSlice(nii.ShardDigest, nii.NodeCID.Hash(), blobindex.Position{Offset: nii.ShardOffset, Length: nii.NodeSize})
	}
	log.Infow("built index", "index", indexID, "nodes", nodeCount)
	archReader, err := blobindex.Archive(indexView)
	if err != nil {
		return nil, fmt.Errorf("archiving index %s: %w", indexID, err)
	}
	return io.NopCloser(archReader), nil
}
// roomInIndex reports whether shard's slices can be added to index without
// exceeding the configured per-index node limit (<= 0 disables the limit).
func (a API) roomInIndex(index *model.Index, shard *model.Shard) (bool, error) {
	if a.MaxNodesPerIndex <= 0 {
		return true, nil
	}
	fits := index.SliceCount()+shard.SliceCount() <= a.MaxNodesPerIndex
	return fits, nil
}
// closeIndex marks the index as closed and persists the change.
func (a API) closeIndex(ctx context.Context, index *model.Index) error {
	err := index.Close()
	if err != nil {
		return err
	}
	return a.Repo.UpdateIndex(ctx, index)
}
// AddShardsToUploadIndexes assigns every shard of an upload that is not yet in
// an index to one, delegating to AddShardToUploadIndexes per shard.
func (a API) AddShardsToUploadIndexes(ctx context.Context, uploadID id.UploadID, indexCB func(index *model.Index) error) error {
	unindexed, err := a.Repo.ShardsNotInIndexes(ctx, uploadID)
	if err != nil {
		return fmt.Errorf("failed to get shards not in indexes for upload %s: %w", uploadID, err)
	}
	if len(unindexed) == 0 {
		// Nothing left to assign.
		return nil
	}
	for _, sid := range unindexed {
		// AddShardToUploadIndexes finds or creates an index with room and
		// records the assignment.
		if err := a.AddShardToUploadIndexes(ctx, uploadID, sid, indexCB); err != nil {
			return fmt.Errorf("failed to add shard %s to index for upload %s: %w", sid, uploadID, err)
		}
	}
	return nil
}
// AddShardToUploadIndexes assigns a single shard to an index of the given
// upload. It reuses an open index with room for the shard's slices, closing
// (and reporting via indexCB, when non-nil) open indexes that are full, or
// creates a new index when none has room.
func (a API) AddShardToUploadIndexes(ctx context.Context, uploadID id.UploadID, shardID id.ShardID, indexCB func(index *model.Index) error) error {
	openIndexes, err := a.Repo.IndexesForUploadByState(ctx, uploadID, model.BlobStateOpen)
	if err != nil {
		return fmt.Errorf("failed to get open indexes for upload %s: %w", uploadID, err)
	}
	shard, err := a.Repo.GetShardByID(ctx, shardID)
	if err != nil {
		return fmt.Errorf("failed to find shard %s: %w", shardID, err)
	}
	if shard == nil {
		return fmt.Errorf("shard %s not found", shardID)
	}
	var index *model.Index
	// Look for an open index that has room for the shard.
	// (There should only be at most one open index, but there's no harm handling multiple if they exist.)
	for _, idx := range openIndexes {
		hasRoom, err := a.roomInIndex(idx, shard)
		if err != nil {
			return fmt.Errorf("failed to check room in index %s for shard %s: %w", idx.ID(), shard.CID(), err)
		}
		if hasRoom {
			index = idx
			break
		}
		// Full: close it and notify the callback.
		if err := a.closeIndex(ctx, idx); err != nil {
			return fmt.Errorf("closing index %s: %w", idx.ID(), err)
		}
		if indexCB != nil {
			err = indexCB(idx)
			if err != nil {
				return fmt.Errorf("calling indexCB for closed index %s: %w", idx.ID(), err)
			}
		}
	}
	// If no such index exists, create a new one
	if index == nil {
		index, err = a.Repo.CreateIndex(ctx, uploadID)
		if err != nil {
			return fmt.Errorf("failed to create new index for upload %s: %w", uploadID, err)
		}
		// Even a fresh index may be too small for a shard with more slices
		// than MaxNodesPerIndex; that's a configuration error, so fail loudly.
		hasRoom, err := a.roomInIndex(index, shard)
		if err != nil {
			return fmt.Errorf("failed to check room in new index for shard %s: %w", shard.CID(), err)
		}
		if !hasRoom {
			return fmt.Errorf("shard %s (%d slices) too large to fit in new index for upload %s (MaxNodesPerIndex: %d)", shard.CID(), shard.SliceCount(), uploadID, a.MaxNodesPerIndex)
		}
	}
	if err := a.Repo.AddShardToIndex(ctx, index.ID(), shard.ID()); err != nil {
		return fmt.Errorf("failed to add shard %s to index %s for upload %s: %w", shard.ID(), index.ID(), uploadID, err)
	}
	return nil
}
// CloseUploadIndexes closes every open index for the given upload, invoking
// indexCB (when non-nil) for each index after it has been closed.
func (a API) CloseUploadIndexes(ctx context.Context, uploadID id.UploadID, indexCB func(index *model.Index) error) error {
	open, err := a.Repo.IndexesForUploadByState(ctx, uploadID, model.BlobStateOpen)
	if err != nil {
		return fmt.Errorf("failed to get open indexes for upload %s: %w", uploadID, err)
	}
	for _, index := range open {
		if err := a.closeIndex(ctx, index); err != nil {
			return fmt.Errorf("updating index %s for upload %s: %w", index.ID(), uploadID, err)
		}
		if indexCB == nil {
			continue
		}
		if err := indexCB(index); err != nil {
			return fmt.Errorf("calling indexCB for closed index %s: %w", index.ID(), err)
		}
	}
	return nil
}
// RemoveShard deletes the shard with the given ID from the repository.
func (a API) RemoveShard(ctx context.Context, shardID id.ShardID) error {
	return a.Repo.DeleteShard(ctx, shardID)
}