Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion core/node/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,9 @@ func Storage(bcfg *BuildCfg, cfg *config.Config) fx.Option {

finalBstore := fx.Provide(GcBlockstoreCtor)
if cfg.Experimental.FilestoreEnabled || cfg.Experimental.UrlstoreEnabled {
finalBstore = fx.Provide(FilestoreBlockstoreCtor)
finalBstore = fx.Provide(FilestoreBlockstoreCtor(
cfg.Provide.Strategy.WithDefault(config.DefaultProvideStrategy),
))
}

return fx.Options(
Expand Down
28 changes: 20 additions & 8 deletions core/node/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"go.uber.org/fx"

"github.com/ipfs/boxo/filestore"
"github.com/ipfs/boxo/provider"
"github.com/ipfs/kubo/core/node/helpers"
"github.com/ipfs/kubo/repo"
"github.com/ipfs/kubo/thirdparty/verifbs"
Expand Down Expand Up @@ -77,14 +78,25 @@ func GcBlockstoreCtor(bb BaseBlocks) (gclocker blockstore.GCLocker, gcbs blockst
}

// FilestoreBlockstoreCtor wraps GcBlockstore and adds Filestore support
func FilestoreBlockstoreCtor(repo repo.Repo, bb BaseBlocks, prov DHTProvider) (gclocker blockstore.GCLocker, gcbs blockstore.GCBlockstore, bs blockstore.Blockstore, fstore *filestore.Filestore) {
gclocker = blockstore.NewGCLocker()
func FilestoreBlockstoreCtor(
providingStrategy string,
) func(repo repo.Repo, bb BaseBlocks, prov DHTProvider) (gclocker blockstore.GCLocker, gcbs blockstore.GCBlockstore, bs blockstore.Blockstore, fstore *filestore.Filestore) {
return func(repo repo.Repo, bb BaseBlocks, prov DHTProvider) (gclocker blockstore.GCLocker, gcbs blockstore.GCBlockstore, bs blockstore.Blockstore, fstore *filestore.Filestore) {
gclocker = blockstore.NewGCLocker()

// hash security
fstore = filestore.NewFilestore(bb, repo.FileManager(), prov)
gcbs = blockstore.NewGCBlockstore(fstore, gclocker)
gcbs = &verifbs.VerifBSGC{GCBlockstore: gcbs}
var fstoreProv provider.MultihashProvider
strategyFlag := config.ParseProvideStrategy(providingStrategy)
if strategyFlag&config.ProvideStrategyAll != 0 {
fstoreProv = prov
}

bs = gcbs
return
fstore = filestore.NewFilestore(bb, repo.FileManager(), fstoreProv)

// hash security
gcbs = blockstore.NewGCBlockstore(fstore, gclocker)
gcbs = &verifbs.VerifBSGC{GCBlockstore: gcbs}

bs = gcbs
return
}
}
5 changes: 5 additions & 0 deletions docs/changelogs/v0.41.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ This release was brought to you by the [Shipyard](https://ipshipyard.com/) team.
- [✨ New `ipfs cid inspect` command](#-new-ipfs-cid-inspect-command)
- [🖥️ WebUI Improvements](#-webui-improvements)
- [🔧 Correct provider addresses for custom HTTP routing](#-correct-provider-addresses-for-custom-http-routing)
- [🔧 Filestore now respects `Provide.Strategy`](#-filestore-now-respects-providestrategy)
- [`ipfs object patch` validates UnixFS node types](#ipfs-object-patch-validates-unixfs-node-types)
- [📦️ Dependency updates](#-dependency-updates)
- [📝 Changelog](#-changelog)
Expand Down Expand Up @@ -77,6 +78,10 @@ Peer locations load faster thanks to UX optimizations in the underlying ipfs-geo

Nodes using custom routing (`Routing.Type=custom`) with [IPIP-526](https://github.com/ipfs/specs/pull/526) could end up publishing unresolved `0.0.0.0` addresses in provider records. Addresses are now resolved at provide-time, and when AutoNAT V2 has confirmed publicly reachable addresses, those are preferred automatically. See [#11213](https://github.com/ipfs/kubo/issues/11213).

#### 🔧 Filestore now respects `Provide.Strategy`

Previously, blocks added via the [filestore](https://github.com/ipfs/kubo/blob/master/docs/experimental-features.md#ipfs-filestore) or [urlstore](https://github.com/ipfs/kubo/blob/master/docs/experimental-features.md#ipfs-urlstore) were always provided to the DHT, regardless of the configured `Provide.Strategy`. The filestore now checks the strategy the same way the regular blockstore does, and only provides blocks at add time when the strategy includes `all`.

#### `ipfs object patch` validates UnixFS node types

As part of the ongoing deprecation of the legacy `ipfs object` API (which
Expand Down
230 changes: 203 additions & 27 deletions test/cli/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,51 +20,157 @@ import (
)

const (
timeStep = 20 * time.Millisecond
timeout = time.Second
providerPollInterval = 200 * time.Millisecond
providerTimeout = 45 * time.Second
)

type cfgApplier func(*harness.Node)

func runProviderSuite(t *testing.T, reprovide bool, apply cfgApplier) {
t.Helper()

expectNoProviders := func(t *testing.T, cid string, nodes ...*harness.Node) {
t.Helper()
// Poll for providerTimeout to be confident that provide records
// have not propagated. Uses RunIPFS (not IPFS) to avoid panics
// if the node is cleaned up while require.Never is still polling.
require.Never(t, func() bool {
for _, node := range nodes {
res := node.RunIPFS("routing", "findprovs", "-n=1", cid)
if res.Err == nil && res.Stdout.Trimmed() != "" {
return true
}
}
return false
}, providerTimeout, providerPollInterval, "expected no providers for %s", cid)
}

// expectNoneProvided is like expectNoProviders but checks multiple CIDs
// in parallel subtests so they share a single providerTimeout window
// instead of accumulating sequentially.
expectNoneProvided := func(t *testing.T, cids []string, nodes ...*harness.Node) {
t.Helper()
t.Run("expect-no-providers", func(t *testing.T) {
for _, c := range cids {
t.Run(c[:16], func(t *testing.T) {
t.Parallel()
require.Never(t, func() bool {
for _, node := range nodes {
res := node.RunIPFS("routing", "findprovs", "-n=1", c)
if res.Err == nil && res.Stdout.Trimmed() != "" {
return true
}
}
return false
}, providerTimeout, providerPollInterval, "expected no providers for %s", c)
})
}
})
}

expectProviders := func(t *testing.T, cid, expectedProvider string, nodes ...*harness.Node) {
t.Helper()
for _, node := range nodes {
// Uses RunIPFS (not IPFS) to avoid panics if the node is
// cleaned up while require.Eventually is still polling.
require.Eventually(t, func() bool {
res := node.RunIPFS("routing", "findprovs", "-n=1", cid)
return res.Err == nil && res.Stdout.Trimmed() == expectedProvider
}, providerTimeout, providerPollInterval, "expected a provider for %s", cid)
}
}

// provideStatResult holds the subset of 'ipfs provide stat --enc=json'
// fields needed to determine provider readiness.
type provideStatResult struct {
Sweep *struct {
Connectivity struct {
Status string `json:"status"`
} `json:"connectivity"`
} `json:"Sweep"`
Legacy *json.RawMessage `json:"Legacy"`
}

// isProviderOnline parses 'ipfs provide stat --enc=json' output and
// returns true when the provider is ready to accept provide requests.
//
// - LegacyProvider: ready as soon as stats are available (Legacy != nil).
// - SweepingProvider: ready when connectivity.status == "online",
// meaning approxPrefixLen completed and StartProviding won't
// silently drop requests.
isProviderOnline := func(output string) bool {
var s provideStatResult
if err := json.Unmarshal([]byte(output), &s); err != nil {
return false
}
if s.Legacy != nil {
return true
}
return s.Sweep != nil && s.Sweep.Connectivity.Status == "online"
}

// waitForProviderReady blocks until every node's provider system is
// initialized and the DHT provide+find pipeline works end-to-end.
//
// It polls 'ipfs provide stat' (WAN then LAN, since test networks
// use private addresses where only the LAN DHT has peers) and then
// runs a canary provide+findprovs round-trip. The canary uses manual
// 'ipfs routing provide' which bypasses Strategy and Interval checks,
// so it works for every configuration where Provide.Enabled is true.
waitForProviderReady := func(t *testing.T, nodes harness.Nodes) {
t.Helper()
providerActive := false
for _, node := range nodes {
require.Eventually(t, func() bool {
res := node.RunIPFS("provide", "stat", "--enc=json")
if res.Err != nil {
return true // providing disabled (NoopProvider)
}
if isProviderOnline(res.Stdout.Trimmed()) {
providerActive = true
return true
}
// WAN DHT stays offline in test networks (private addrs
// only). Fall back to LAN DHT stats.
res = node.RunIPFS("provide", "stat", "--lan", "--enc=json")
if res.Err != nil {
return false
}
if isProviderOnline(res.Stdout.Trimmed()) {
providerActive = true
return true
}
return false
}, providerTimeout, providerPollInterval,
"timed out waiting for provider to come online")
}

if providerActive && len(nodes) >= 2 {
canary := nodes[0].IPFSAddStr(time.Now().String(), "--pin=false")
nodes[0].IPFS("routing", "provide", canary)
expectProviders(t, canary, nodes[0].PeerID().String(), nodes[1:]...)
}
}

initNodes := func(t *testing.T, n int, fn func(n *harness.Node)) harness.Nodes {
t.Helper()
nodes := harness.NewT(t).NewNodes(n).Init()
nodes.ForEachPar(apply)
nodes.ForEachPar(fn)
nodes = nodes.StartDaemons().Connect()
time.Sleep(500 * time.Millisecond) // wait for DHT clients to be bootstrapped
time.Sleep(500 * time.Millisecond) // baseline delay for DHT routing tables to settle
waitForProviderReady(t, nodes)
return nodes
}

initNodesWithoutStart := func(t *testing.T, n int, fn func(n *harness.Node)) harness.Nodes {
t.Helper()
nodes := harness.NewT(t).NewNodes(n).Init()
nodes.ForEachPar(apply)
nodes.ForEachPar(fn)
return nodes
}

expectNoProviders := func(t *testing.T, cid string, nodes ...*harness.Node) {
for _, node := range nodes {
res := node.IPFS("routing", "findprovs", "-n=1", cid)
require.Empty(t, res.Stdout.String())
}
}

expectProviders := func(t *testing.T, cid, expectedProvider string, nodes ...*harness.Node) {
outerLoop:
for _, node := range nodes {
for i := time.Duration(0); i*timeStep < timeout; i++ {
res := node.IPFS("routing", "findprovs", "-n=1", cid)
if res.Stdout.Trimmed() == expectedProvider {
continue outerLoop
}
}
require.FailNowf(t, "found no providers", "expected a provider for %s", cid)
}
}

t.Run("Provide.Enabled=true announces new CIDs created by ipfs add", func(t *testing.T) {
t.Parallel()

Expand Down Expand Up @@ -355,6 +461,72 @@ func runProviderSuite(t *testing.T, reprovide bool, apply cfgApplier) {
expectProviders(t, cid, nodes[0].PeerID().String(), nodes[1:]...)
})

// The filestore (--nocopy) uses a separate code path (FilestoreBlockstoreCtor)
// that passes a provider to filestore.NewFilestore. These two tests work as
// a pair: the positive test proves filestore content is findable when the
// strategy says it should be (control), and the negative test verifies the
// provider is NOT passed when the strategy does not include "all".
//
// Both tests use a 2MiB file with 1MiB chunks to produce 2 leaf blocks
// plus a root, so we can verify behavior for all block types while
// keeping the provide queue workload small for slow CI runners.

initFilestoreNodesWithProvideStrategy := func(t *testing.T, strategy string) (provider *harness.Node, querier *harness.Node) {
t.Helper()
nodes := initNodes(t, 2, func(n *harness.Node) {
n.SetIPFSConfig("Experimental.FilestoreEnabled", true)
n.SetIPFSConfig("Provide.Strategy", strategy)
// Use 1MiB chunks so the 2MiB test file produces only 2 leaf
// blocks, keeping the provide queue workload small on slow CI.
n.SetIPFSConfig("Import.UnixFSChunker", "size-1048576")
})
t.Cleanup(func() { nodes.StopDaemons() })
return nodes[0], nodes[1]
}

addNocopyFile := func(t *testing.T, node *harness.Node, addFlags ...string) (rootCID string, leafCIDs []string) {
t.Helper()
filePath := filepath.Join(node.Dir, "testfile.bin")
require.NoError(t, os.WriteFile(filePath, random.Bytes(2*1024*1024), 0o644))

args := append([]string{"add", "-q", "--nocopy"}, addFlags...)
args = append(args, filePath)
rootCID = strings.TrimSpace(node.IPFS(args...).Stdout.String())

leafCIDs = strings.Fields(node.IPFS("refs", rootCID).Stdout.Trimmed())
require.NotEmpty(t, leafCIDs)
return rootCID, leafCIDs
}

t.Run("Filestore with 'all' strategy provides --nocopy blocks", func(t *testing.T) {
t.Parallel()
provider, querier := initFilestoreNodesWithProvideStrategy(t, "all")

rootCID, leafCIDs := addNocopyFile(t, provider)

// Check root and at least one leaf CID. The root alone is not
// sufficient because --fast-provide-root (on by default) provides
// it via a separate mechanism. Leaf blocks are only provided by
// the filestore provider, so finding one proves it works.
pid := provider.PeerID().String()
expectProviders(t, rootCID, pid, querier)
expectProviders(t, leafCIDs[0], pid, querier)
})

t.Run("Filestore with non-all strategy does not provide --nocopy blocks", func(t *testing.T) {
t.Parallel()
provider, querier := initFilestoreNodesWithProvideStrategy(t, "roots")

// Use --pin=false to avoid recursive pins that would trigger
// pin-based providing, and --fast-provide-root=false to prevent
// the root from being immediately provided via the fast path.
// This isolates the test to the filestore add-time provide path.
rootCID, leafCIDs := addNocopyFile(t, provider, "--pin=false", "--fast-provide-root=false")

// Before the fix, the filestore always provided on add regardless of strategy.
expectNoneProvided(t, []string{rootCID, leafCIDs[0]}, querier)
})

if reprovide {

t.Run("Reprovides with 'all' strategy when strategy is '' (empty)", func(t *testing.T) {
Expand Down Expand Up @@ -413,11 +585,15 @@ func runProviderSuite(t *testing.T, reprovide bool, apply cfgApplier) {
cidFoo := nodes[0].IPFSAdd(bytes.NewReader(foo), "--pin=false")
cidBar := nodes[0].IPFSAdd(bytes.NewReader(bar), "--pin=false")

// Nothing should have been provided. The pin was offline, and
// the others should not be provided per the strategy.
// Only cidBarDir is pinned (added offline with default --pin=true).
// cidBar is unpinned but shares raw blocks with cidBarDir (same
// bar bytes), so it becomes findable once cidBarDir is reprovided.
// cidFoo is completely independent unpinned content.
//
// We only assert cidFoo here because cidBar and cidBarDir may
// already be provided by the LegacyProvider's auto-reprovide
// (1 min initial delay) before this point on slow CI.
expectNoProviders(t, cidFoo, nodes[1:]...)
expectNoProviders(t, cidBar, nodes[1:]...)
expectNoProviders(t, cidBarDir, nodes[1:]...)

nodes[0].IPFS("routing", "reprovide")

Expand Down
Loading