Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 0 additions & 8 deletions pkg/aws/dynamoallocationstable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,14 +122,6 @@ func createAllocationsTable(t *testing.T, c *dynamodb.Client, tableName string)
AttributeName: aws.String("multihash"),
AttributeType: types.ScalarAttributeTypeS,
},
{
AttributeName: aws.String("size"),
AttributeType: types.ScalarAttributeTypeN,
},
{
AttributeName: aws.String("invocation"),
AttributeType: types.ScalarAttributeTypeS,
},
{
AttributeName: aws.String("insertedAt"),
AttributeType: types.ScalarAttributeTypeS,
Expand Down
29 changes: 23 additions & 6 deletions pkg/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,10 @@ func (is *IndexingService) jobHandler(mhCtx context.Context, j job, spawn func(j
}

s.AddEvent(fmt.Sprintf("processing %d results", len(results)))

var indexFetchSucceeded bool
var lastIndexFetchErr error

for _, result := range results {
// unmarshall metadata for this provider
md := metadata.MetadataContext.New()
Expand Down Expand Up @@ -211,9 +215,10 @@ func (is *IndexingService) jobHandler(mhCtx context.Context, j job, spawn func(j
case *metadata.LocationCommitmentMetadata:
s.AddEvent("processing location claim")

// for a location claim, we just store it, unless its for an index CID, in which case get the full idnex
// for a location claim, we just store it, unless its for an index CID, in which case get the full index
if j.indexForMh != nil {
// fetch (from URL or cache) the full index
// Try to fetch the index from this provider result
// If it fails, we'll continue to the next result instead of failing the entire query
shard := typedProtocol.Shard
if shard == nil {
c := cid.NewCidV1(cid.Raw, j.mh)
Expand All @@ -222,14 +227,18 @@ func (is *IndexingService) jobHandler(mhCtx context.Context, j job, spawn func(j
url, err := fetchRetrievalURL(*result.Provider, *shard)
if err != nil {
telemetry.Error(s, err, "fetching index retrieval URL")
return fmt.Errorf("fetching retrieval URL for index %q: %w", shard, err)
log.Warnw("failed to fetch retrieval URL, will try next provider result if available", "shard", shard, "provider", result.Provider.ID, "err", err)
lastIndexFetchErr = fmt.Errorf("fetching retrieval URL for index %q from provider %s: %w", shard, result.Provider.ID, err)
continue // Try next provider result
}

s.AddEvent("fetching index")
var auth *types.RetrievalAuth
match, err := assert.Location.Match(validator.NewSource(claim.Capabilities()[0], claim))
if err != nil {
return fmt.Errorf("failed to match claim to location commitment: %w", err)
log.Warnw("failed to match claim to location commitment, will try next provider result if available", "err", err)
lastIndexFetchErr = fmt.Errorf("failed to match claim to location commitment: %w", err)
continue
}
lcCaveats := match.Value().Nb()
space := lcCaveats.Space
Expand Down Expand Up @@ -261,10 +270,13 @@ func (is *IndexingService) jobHandler(mhCtx context.Context, j job, spawn func(j
index, err := is.blobIndexLookup.Find(mhCtx, result.ContextID, *j.indexProviderRecord, req)
if err != nil {
telemetry.Error(s, err, "fetching index blob")
return err
log.Warnw("failed to fetch index blob, will try next provider result if available", "provider", result.Provider.ID, "err", err)
lastIndexFetchErr = fmt.Errorf("fetching index blob from provider %s: %w", result.Provider.ID, err)
continue // Try next provider result
}

// Add the index to the query results, if we don't already have it
// Success! Add the index to the query results, if we don't already have it
indexFetchSucceeded = true
state.CmpSwap(
func(qs queryState) bool {
return !qs.qr.Indexes.Has(result.ContextID)
Expand All @@ -289,6 +301,11 @@ func (is *IndexingService) jobHandler(mhCtx context.Context, j job, spawn func(j
}
}
}

// If we attempted to fetch an index but all attempts failed, return the last error
if lastIndexFetchErr != nil && !indexFetchSucceeded {
return fmt.Errorf("failed to fetch index from all provider results: %w", lastIndexFetchErr)
}
return nil
}

Expand Down
185 changes: 184 additions & 1 deletion pkg/service/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -422,9 +422,192 @@ func TestQuery(t *testing.T) {

require.Error(t, err)
})

t.Run("succeeds when at least one provider result is valid despite others having incomplete addresses", func(t *testing.T) {
mockBlobIndexLookup := blobindexlookup.NewMockBlobIndexLookup(t)
mockClaimsService := contentclaims.NewMockContentClaimsService(t)
mockProviderIndex := providerindex.NewMockProviderIndex(t)

// First provider has incomplete addresses (only claim endpoint, missing blob endpoint)
// This simulates the real-world issue from cid.contact
badProviderAddr := &peer.AddrInfo{
ID: testutil.RandomPeer(t),
Addrs: []ma.Multiaddr{
testutil.Must(ma.NewMultiaddr("/dns/indexer.storacha.network/https/http-path/claim%2F%7Bclaim%7D"))(t),
},
}

// Second provider has valid addresses but will return wrong claim type
wrongClaimProviderAddr := &peer.AddrInfo{
ID: testutil.RandomPeer(t),
Addrs: []ma.Multiaddr{
testutil.Must(ma.NewMultiaddr("/dns/wrong.storacha.network/tls/http/http-path/%2Fclaims%2F%7Bclaim%7D"))(t),
testutil.Must(ma.NewMultiaddr("/dns/wrong.storacha.network/tls/http/http-path/%2Fblobs%2F%7Bblob%7D"))(t),
},
}

// Third provider has valid addresses (both claim and blob endpoints)
goodProviderAddr := &peer.AddrInfo{
ID: testutil.RandomPeer(t),
Addrs: []ma.Multiaddr{
testutil.Must(ma.NewMultiaddr("/dns/storacha.network/tls/http/http-path/%2Fclaims%2F%7Bclaim%7D"))(t),
testutil.Must(ma.NewMultiaddr("/dns/storacha.network/tls/http/http-path/%2Fblobs%2F%7Bblob%7D"))(t),
},
}

contentLink := testutil.RandomCID(t)
contentHash := contentLink.(cidlink.Link).Hash()
space := testutil.RandomDID(t)

// content will have a location claim and an index claim
locationDelegationCid, locationDelegation, locationProviderResult := buildTestLocationClaim(t, contentLink.(cidlink.Link), goodProviderAddr, space, rand.Uint64N(5000))
indexDelegationCid, indexDelegation, indexResult, indexCid, index := buildTestIndexClaim(t, contentLink.(cidlink.Link), goodProviderAddr)

contentResults := []model.ProviderResult{locationProviderResult, indexResult}

// expect a call to find records for content
mockProviderIndex.EXPECT().Find(extmocks.AnyContext, providerindex.QueryKey{
Hash: contentHash,
TargetClaims: []multicodec.Code{metadata.EqualsClaimID, metadata.IndexClaimID, metadata.LocationCommitmentID},
}).Return(contentResults, nil)

// the results for content should make the IndexingService ask for both claims
locationClaimUrl := testutil.Must(url.Parse(fmt.Sprintf("https://storacha.network/claims/%s", locationDelegationCid.String())))(t)
mockClaimsService.EXPECT().Find(extmocks.AnyContext, locationDelegationCid, locationClaimUrl).Return(locationDelegation, nil)
indexClaimUrl := testutil.Must(url.Parse(fmt.Sprintf("https://storacha.network/claims/%s", indexDelegationCid.String())))(t)
mockClaimsService.EXPECT().Find(extmocks.AnyContext, indexDelegationCid, indexClaimUrl).Return(indexDelegation, nil)

// then attempt to find records for the index referenced in the index claim
// This returns THREE provider results:
// 1. First one with bad addresses and nil shard (fails at fetchRetrievalURL)
// 2. Second one with valid addresses but returns wrong claim type (fails at assert.Location.Match)
// 3. Third one with good addresses and valid claim (succeeds)
indexSize := rand.Uint64N(5000)
// First provider has nil shard to test the shard == nil code path, fails at fetchRetrievalURL
badIndexLocationDelegationCid, badIndexLocationDelegation, badIndexLocationProviderResult := buildTestLocationClaimWithShard(t, indexCid, badProviderAddr, space, indexSize, nil)
// Second provider has valid addresses but we'll return wrong claim type (index delegation instead of location)
wrongClaimLocationDelegationCid, _, wrongClaimProviderResult := buildTestLocationClaim(t, indexCid, wrongClaimProviderAddr, space, indexSize)
// Third provider is valid
goodIndexLocationDelegationCid, goodIndexLocationDelegation, goodIndexLocationProviderResult := buildTestLocationClaim(t, indexCid, goodProviderAddr, space, indexSize)

mockProviderIndex.EXPECT().Find(extmocks.AnyContext, providerindex.QueryKey{
Hash: indexCid.Hash(),
TargetClaims: []multicodec.Code{metadata.LocationCommitmentID},
}).Return([]model.ProviderResult{badIndexLocationProviderResult, wrongClaimProviderResult, goodIndexLocationProviderResult}, nil)

// fetch the first index's location claim (bad provider - fails at fetchRetrievalURL)
// Note: http-path encoding produces paths without leading slash
badIndexLocationClaimUrl := testutil.Must(url.Parse(fmt.Sprintf("https://indexer.storacha.network/claim/%s", badIndexLocationDelegationCid.String())))(t)
badIndexLocationClaimUrl.Path = fmt.Sprintf("claim/%s", badIndexLocationDelegationCid.String()) // Remove leading slash
mockClaimsService.EXPECT().Find(extmocks.AnyContext, badIndexLocationDelegationCid, badIndexLocationClaimUrl).Return(badIndexLocationDelegation, nil)

// fetch the second index's location claim (wrong claim provider - returns index delegation instead of location)
// This will cause assert.Location.Match to fail, covering lines 239-240
wrongClaimLocationClaimUrl := testutil.Must(url.Parse(fmt.Sprintf("https://wrong.storacha.network/claims/%s", wrongClaimLocationDelegationCid.String())))(t)
mockClaimsService.EXPECT().Find(extmocks.AnyContext, wrongClaimLocationDelegationCid, wrongClaimLocationClaimUrl).Return(indexDelegation, nil) // Return index delegation instead of location

// fetch the third index's location claim (good provider)
goodIndexLocationClaimUrl := testutil.Must(url.Parse(fmt.Sprintf("https://storacha.network/claims/%s", goodIndexLocationDelegationCid.String())))(t)
mockClaimsService.EXPECT().Find(extmocks.AnyContext, goodIndexLocationDelegationCid, goodIndexLocationClaimUrl).Return(goodIndexLocationDelegation, nil)

// The first provider result will fail to build a retrieval URL (no blob endpoint)
// The second provider result will fail at assert.Location.Match (wrong claim type)
// The third provider result should succeed
goodIndexBlobUrl := testutil.Must(url.Parse(fmt.Sprintf("https://storacha.network/blobs/%s", digestutil.Format(indexCid.Hash()))))(t)
retrievalReq := types.NewRetrievalRequest(goodIndexBlobUrl, &metadata.Range{Length: &indexSize}, nil)
mockBlobIndexLookup.EXPECT().Find(extmocks.AnyContext, types.EncodedContextID(goodIndexLocationProviderResult.ContextID), indexResult, retrievalReq).Return(index, nil)

service := NewIndexingService(testutil.Service, mockBlobIndexLookup, mockClaimsService, peer.AddrInfo{ID: testutil.RandomPeer(t)}, mockProviderIndex)

result, err := service.Query(t.Context(), types.Query{Hashes: []mh.Multihash{contentHash}})

// Should succeed despite the first two provider results failing
require.NoError(t, err)
require.NotNil(t, result)
require.Len(t, result.Indexes(), 1, "should have successfully fetched the index from the third provider result")
})

t.Run("returns error when all provider results are invalid (have incomplete addresses)", func(t *testing.T) {
mockBlobIndexLookup := blobindexlookup.NewMockBlobIndexLookup(t)
mockClaimsService := contentclaims.NewMockContentClaimsService(t)
mockProviderIndex := providerindex.NewMockProviderIndex(t)

// Both providers have incomplete addresses (only claim endpoint, missing blob endpoint)
badProviderAddr1 := &peer.AddrInfo{
ID: testutil.RandomPeer(t),
Addrs: []ma.Multiaddr{
testutil.Must(ma.NewMultiaddr("/dns/indexer.storacha.network/https/http-path/claim%2F%7Bclaim%7D"))(t),
},
}

badProviderAddr2 := &peer.AddrInfo{
ID: testutil.RandomPeer(t),
Addrs: []ma.Multiaddr{
testutil.Must(ma.NewMultiaddr("/dns/other.storacha.network/https/http-path/claim%2F%7Bclaim%7D"))(t),
},
}

contentLink := testutil.RandomCID(t)
contentHash := contentLink.(cidlink.Link).Hash()
space := testutil.RandomDID(t)

// content will have a location claim and an index claim
locationDelegationCid, locationDelegation, locationProviderResult := buildTestLocationClaim(t, contentLink.(cidlink.Link), badProviderAddr1, space, rand.Uint64N(5000))
indexDelegationCid, indexDelegation, indexResult, indexCid, _ := buildTestIndexClaim(t, contentLink.(cidlink.Link), badProviderAddr1)

contentResults := []model.ProviderResult{locationProviderResult, indexResult}

// expect a call to find records for content
mockProviderIndex.EXPECT().Find(extmocks.AnyContext, providerindex.QueryKey{
Hash: contentHash,
TargetClaims: []multicodec.Code{metadata.EqualsClaimID, metadata.IndexClaimID, metadata.LocationCommitmentID},
}).Return(contentResults, nil)

// the results for content should make the IndexingService ask for both claims
locationClaimUrl := testutil.Must(url.Parse(fmt.Sprintf("https://indexer.storacha.network/claim/%s", locationDelegationCid.String())))(t)
locationClaimUrl.Path = fmt.Sprintf("claim/%s", locationDelegationCid.String())
mockClaimsService.EXPECT().Find(extmocks.AnyContext, locationDelegationCid, locationClaimUrl).Return(locationDelegation, nil)
indexClaimUrl := testutil.Must(url.Parse(fmt.Sprintf("https://indexer.storacha.network/claim/%s", indexDelegationCid.String())))(t)
indexClaimUrl.Path = fmt.Sprintf("claim/%s", indexDelegationCid.String())
mockClaimsService.EXPECT().Find(extmocks.AnyContext, indexDelegationCid, indexClaimUrl).Return(indexDelegation, nil)

// then attempt to find records for the index - both have bad addresses
indexSize := rand.Uint64N(5000)
badIndexLocationDelegationCid1, badIndexLocationDelegation1, badIndexLocationProviderResult1 := buildTestLocationClaim(t, indexCid, badProviderAddr1, space, indexSize)
badIndexLocationDelegationCid2, badIndexLocationDelegation2, badIndexLocationProviderResult2 := buildTestLocationClaim(t, indexCid, badProviderAddr2, space, indexSize)

mockProviderIndex.EXPECT().Find(extmocks.AnyContext, providerindex.QueryKey{
Hash: indexCid.Hash(),
TargetClaims: []multicodec.Code{metadata.LocationCommitmentID},
}).Return([]model.ProviderResult{badIndexLocationProviderResult1, badIndexLocationProviderResult2}, nil)

// fetch both location claims (both bad providers)
badIndexLocationClaimUrl1 := testutil.Must(url.Parse(fmt.Sprintf("https://indexer.storacha.network/claim/%s", badIndexLocationDelegationCid1.String())))(t)
badIndexLocationClaimUrl1.Path = fmt.Sprintf("claim/%s", badIndexLocationDelegationCid1.String())
mockClaimsService.EXPECT().Find(extmocks.AnyContext, badIndexLocationDelegationCid1, badIndexLocationClaimUrl1).Return(badIndexLocationDelegation1, nil)

badIndexLocationClaimUrl2 := testutil.Must(url.Parse(fmt.Sprintf("https://other.storacha.network/claim/%s", badIndexLocationDelegationCid2.String())))(t)
badIndexLocationClaimUrl2.Path = fmt.Sprintf("claim/%s", badIndexLocationDelegationCid2.String())
mockClaimsService.EXPECT().Find(extmocks.AnyContext, badIndexLocationDelegationCid2, badIndexLocationClaimUrl2).Return(badIndexLocationDelegation2, nil)

// Both provider results will fail to build a retrieval URL (no blob endpoint)
// No mockBlobIndexLookup expectations because we never get that far

service := NewIndexingService(testutil.Service, mockBlobIndexLookup, mockClaimsService, peer.AddrInfo{ID: testutil.RandomPeer(t)}, mockProviderIndex)

_, err := service.Query(t.Context(), types.Query{Hashes: []mh.Multihash{contentHash}})

// Should fail because all provider results have incomplete addresses
require.Error(t, err)
require.Contains(t, err.Error(), "failed to fetch index from all provider results")
})
}

func buildTestLocationClaim(t *testing.T, contentLink cidlink.Link, providerAddr *peer.AddrInfo, space did.DID, size uint64) (cidlink.Link, delegation.Delegation, model.ProviderResult) {
return buildTestLocationClaimWithShard(t, contentLink, providerAddr, space, size, &contentLink.Cid)
}

func buildTestLocationClaimWithShard(t *testing.T, contentLink cidlink.Link, providerAddr *peer.AddrInfo, space did.DID, size uint64, shard *cid.Cid) (cidlink.Link, delegation.Delegation, model.ProviderResult) {
locationClaim := cassert.Location.New(testutil.Alice.DID().String(), cassert.LocationCaveats{
Content: ctypes.FromHash(contentLink.Hash()),
Location: []url.URL{*testutil.Must(url.Parse("https://storacha.network"))(t)},
Expand All @@ -441,7 +624,7 @@ func buildTestLocationClaim(t *testing.T, contentLink cidlink.Link, providerAddr
}.Sum(testutil.Must(io.ReadAll(delegation.Archive(locationDelegation)))(t)))(t)

locationMetadata := metadata.LocationCommitmentMetadata{
Shard: &contentLink.Cid,
Shard: shard,
Claim: locationDelegationCid,
Range: &metadata.Range{Length: &size},
Expiration: time.Now().Add(time.Hour).Unix(),
Expand Down