Skip to content

Commit 23602ce

Browse files
committed
feat: Add pathSplit configuration to filestore:local
Splitting the path into sub-directories is crucial for keeping the number of entries in any single directory within sane limits.
1 parent 3468bc6 commit 23602ce

5 files changed

Lines changed: 207 additions & 26 deletions

File tree

filestore/config.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ type Config struct {
1818
type LocalConfig struct {
	// BasePath is the filesystem directory where files are stored.
	BasePath string

	// PathSplit lists the lengths of the consecutive file name slices that
	// are used as nested subdirectory names. For example, [2, 1] stores a
	// file named "abcdef" under "ab/c/". Every length must be positive; an
	// empty list keeps files directly in their directory.
	PathSplit []int
}
2224

2325
type S3Config struct {
@@ -38,7 +40,9 @@ type S3Config struct {
3840
func MakeFilestore(cfg Config) (Interface, error) {
3941
switch cfg.Type {
4042
case "local":
41-
return NewLocal(cfg.Local.BasePath)
43+
return NewLocal(cfg.Local.BasePath,
44+
WithPathSplit(cfg.Local.PathSplit...),
45+
)
4246
case "s3":
4347
return NewS3(cfg.S3.BucketName,
4448
WithEndpoint(cfg.S3.Endpoint),

filestore/filestore_test.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,46 @@ func TestLocal(t *testing.T) {
106106
})
107107
}
108108

109+
func TestLocalWithPathSplit(t *testing.T) {
110+
carDir := t.TempDir()
111+
112+
fileStore, err := filestore.NewLocal(carDir, filestore.WithPathSplit(2, 1))
113+
require.NoError(t, err)
114+
require.Equal(t, "local", fileStore.Type())
115+
116+
t.Run("test-Local-Put", func(t *testing.T) {
117+
testPut(t, fileStore)
118+
})
119+
120+
require.FileExists(t, filepath.Join(carDir, fileName[:2], fileName[2:3], fileName))
121+
122+
t.Run("test-Local-Head", func(t *testing.T) {
123+
testHead(t, fileStore)
124+
})
125+
126+
t.Run("test-Local-Get", func(t *testing.T) {
127+
testGet(t, fileStore)
128+
})
129+
130+
// Some extra bogus filesystem entries that should be skipped
131+
for data, fName := range map[string]string{
132+
"no-path-prefix": filepath.Join(carDir, fileName),
133+
"wrong-path-prefix": filepath.Join(carDir, fileName[:2], fileName),
134+
"prefix-not-on-path-boundary": filepath.Join(carDir, "corner-"+fileName[:2], fileName[2:3], fileName),
135+
} {
136+
require.NoError(t, os.MkdirAll(filepath.Dir(fName), 0700))
137+
require.NoError(t, os.WriteFile(fName, []byte(data), 0600))
138+
}
139+
140+
t.Run("test-Local-List", func(t *testing.T) {
141+
testList(t, fileStore)
142+
})
143+
144+
t.Run("test-Local-Delete", func(t *testing.T) {
145+
testDelete(t, fileStore)
146+
})
147+
}
148+
109149
func TestMakeFilestore(t *testing.T) {
110150
cfg := filestore.Config{
111151
Type: "none",
@@ -130,6 +170,15 @@ func TestMakeFilestore(t *testing.T) {
130170
fs, err = filestore.MakeFilestore(cfg)
131171
require.NoError(t, err)
132172
require.NotNil(t, fs)
173+
174+
cfg.Local.PathSplit = []int{0, -1}
175+
fs, err = filestore.MakeFilestore(cfg)
176+
require.ErrorContains(t, err, "invalid path split")
177+
178+
cfg.Local.PathSplit = []int{7, 5}
179+
fs, err = filestore.MakeFilestore(cfg)
180+
require.NoError(t, err)
181+
require.NotNil(t, fs)
133182
}
134183

135184
func testPut(t *testing.T, fileStore filestore.Interface) {

filestore/local.go

Lines changed: 115 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -7,40 +7,82 @@ import (
77
"io/fs"
88
"os"
99
"path/filepath"
10+
"strings"
1011

1112
"github.com/ipni/storetheindex/fsutil"
1213
)
1314

1415
// Local is a file store that keeps its files on the local file system.
type Local struct {
	// basePath is the absolute directory under which all files are stored.
	basePath string
	// pathSplit holds the lengths of the file name slices that are turned
	// into nested sub-directories; empty means no splitting.
	pathSplit []int
}
1820

19-
func NewLocal(basePath string) (*Local, error) {
21+
func NewLocal(basePath string, options ...LocalOption) (*Local, error) {
2022
if !filepath.IsAbs(basePath) {
2123
return nil, errors.New("base path must be absolute")
2224
}
25+
2326
err := fsutil.DirWritable(basePath)
2427
if err != nil {
2528
return nil, err
2629
}
30+
31+
opts, err := getLocalOpts(options)
32+
if err != nil {
33+
return nil, err
34+
}
35+
2736
return &Local{
28-
basePath: basePath,
37+
basePath: basePath,
38+
pathSplit: opts.pathSplit,
2939
}, nil
3040
}
3141

42+
// fsPath returns the filesystem path of a given object path
43+
// based on a given basePath with path splitting criteria applied.
44+
func (l *Local) fsPath(basePath, relPath string) string {
45+
fsDir, fsName := filepath.Split(filepath.FromSlash(relPath))
46+
47+
pathSegments := make([]string, 0, len(l.pathSplit)+3)
48+
pathSegments = append(pathSegments, basePath, fsDir)
49+
50+
pathSegments = l.appendFnamePathSegments(fsName, pathSegments)
51+
52+
return filepath.Join(pathSegments...)
53+
}
54+
55+
func (l *Local) appendFnamePathSegments(fileName string, pathSegments []string) []string {
56+
fsNameNoExt := fileName
57+
if dotPos := strings.IndexByte(fileName, '.'); dotPos > 0 {
58+
fsNameNoExt = fileName[:dotPos]
59+
}
60+
61+
splitPos := 0
62+
for _, split := range l.pathSplit {
63+
splitPos += split
64+
if splitPos > len(fsNameNoExt) {
65+
break
66+
}
67+
68+
pathSegments = append(pathSegments, fsNameNoExt[splitPos-split:splitPos])
69+
}
70+
71+
pathSegments = append(pathSegments, fileName)
72+
73+
return pathSegments
74+
}
75+
3276
// Delete removes the file stored for relPath. Deleting a file that does not
// exist is not an error. The ctx argument is not used by this implementation.
func (l *Local) Delete(ctx context.Context, relPath string) error {
	target := l.fsPath(l.basePath, relPath)

	switch err := os.Remove(target); {
	case err == nil, errors.Is(err, os.ErrNotExist):
		return nil
	default:
		return err
	}
}
3983

4084
func (l *Local) Get(ctx context.Context, relPath string) (*File, io.ReadCloser, error) {
41-
absPath := filepath.Join(l.basePath, filepath.FromSlash(relPath))
42-
43-
f, err := os.Open(absPath)
85+
f, err := os.Open(l.fsPath(l.basePath, relPath))
4486
if err != nil {
4587
if os.IsNotExist(err) {
4688
return nil, nil, fs.ErrNotExist
@@ -67,8 +109,7 @@ func (l *Local) Get(ctx context.Context, relPath string) (*File, io.ReadCloser,
67109
}
68110

69111
func (l *Local) Head(ctx context.Context, relPath string) (*File, error) {
70-
absPath := filepath.Join(l.basePath, filepath.FromSlash(relPath))
71-
fi, err := os.Stat(absPath)
112+
fi, err := os.Stat(l.fsPath(l.basePath, relPath))
72113
if err != nil {
73114
if errors.Is(err, os.ErrNotExist) {
74115
return nil, fs.ErrNotExist
@@ -95,7 +136,23 @@ func (l *Local) List(ctx context.Context, relPath string, recursive bool) (<-cha
95136
defer close(e)
96137
defer close(c)
97138

98-
absPath := filepath.Join(l.basePath, filepath.FromSlash(relPath))
139+
// The relPath may either be a path to a file or a path prefix,
140+
// those cases must be handled separately due to pathSplit that only
141+
// considers the filename as a splittable segment.
142+
if stat, err := os.Stat(l.fsPath(l.basePath, relPath)); err == nil && stat.Mode().IsRegular() {
143+
// relPath points to a file - emit that one and exit
144+
c <- &File{
145+
Modified: stat.ModTime(),
146+
Path: relPath,
147+
Size: stat.Size(),
148+
}
149+
150+
return
151+
}
152+
153+
// relPath must be treated as a directory prefix
154+
absPath := filepath.Join(l.basePath, relPath)
155+
99156
e <- filepath.WalkDir(absPath, func(path string, d fs.DirEntry, err error) error {
100157
if err != nil {
101158
if errors.Is(err, os.ErrNotExist) {
@@ -106,9 +163,18 @@ func (l *Local) List(ctx context.Context, relPath string, recursive bool) (<-cha
106163
}
107164

108165
if d.IsDir() {
109-
if !recursive && path != absPath {
166+
if recursive || len(l.pathSplit) > 0 {
167+
// For both recursive scan and filepath split, keep descending
168+
// to find files in sub-directories
169+
return nil
170+
}
171+
172+
if path != absPath {
173+
// Without path split only a flat structure is allowed,
174+
// skip any sub-directories for faster iteration
110175
return fs.SkipDir
111176
}
177+
112178
return nil
113179
}
114180

@@ -122,15 +188,43 @@ func (l *Local) List(ctx context.Context, relPath string, recursive bool) (<-cha
122188
return err
123189
}
124190

125-
relFilePath, err := filepath.Rel(l.basePath, path)
126-
if err != nil {
127-
return err
128-
}
129-
130191
f := &File{
131192
Modified: fi.ModTime(),
132-
Path: filepath.ToSlash(relFilePath),
133-
Size: fi.Size(),
193+
// Path: filepath.ToSlash(relFilePath),
194+
Size: fi.Size(),
195+
}
196+
197+
if len(l.pathSplit) > 0 {
198+
// Before emitting file entry, verify that the path is correct according
199+
// to path split rules
200+
fileName := filepath.Base(path)
201+
expectedFileSubPath := l.fsPath("", fileName)
202+
203+
relAbsPath, err := filepath.Rel(absPath, path)
204+
if err != nil {
205+
return err
206+
}
207+
208+
if prefix, found := strings.CutSuffix(relAbsPath, expectedFileSubPath); !found {
209+
// Skip the file, path structure does not match
210+
return nil
211+
} else if !recursive && prefix != "" {
212+
// Structure matches but is in sub-folder and not recursive mode, skip that one
213+
return nil
214+
} else if prefix != "" && prefix[len(prefix)-1] != filepath.Separator {
215+
// Corner case - the prefix path must align with the path separator boundary
216+
return nil
217+
} else {
218+
// All looks good; flatten the final path by removing the sub-folder structure
219+
f.Path = filepath.ToSlash(filepath.Join(relPath, prefix, fileName))
220+
}
221+
} else {
222+
relFilePath, err := filepath.Rel(l.basePath, path)
223+
if err != nil {
224+
return err
225+
}
226+
227+
f.Path = filepath.ToSlash(relFilePath)
134228
}
135229

136230
select {
@@ -146,11 +240,10 @@ func (l *Local) List(ctx context.Context, relPath string, recursive bool) (<-cha
146240
}
147241

148242
func (l *Local) Put(ctx context.Context, relPath string, r io.Reader) (*File, error) {
149-
absPath := filepath.Join(l.basePath, filepath.FromSlash(relPath))
243+
absPath := l.fsPath(l.basePath, relPath)
150244

151-
dir, _ := filepath.Split(relPath)
152-
if dir != "" {
153-
err := os.MkdirAll(filepath.Dir(absPath), 0755)
245+
if dir := filepath.Dir(absPath); dir != "" {
246+
err := os.MkdirAll(dir, 0755)
154247
if err != nil {
155248
return nil, err
156249
}

filestore/option.go

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,11 @@ type s3Config struct {
1313

1414
type S3Option func(*s3Config) error
1515

16-
func getOpts(opts []S3Option) (s3Config, error) {
16+
func getS3Opts(opts []S3Option) (s3Config, error) {
1717
var cfg s3Config
1818
for i, opt := range opts {
1919
if err := opt(&cfg); err != nil {
20-
return s3Config{}, fmt.Errorf("option %d error: %s", i, err)
20+
return s3Config{}, fmt.Errorf("option %d error: %w", i, err)
2121
}
2222
}
2323
return cfg, nil
@@ -44,3 +44,38 @@ func WithKeys(accessKey, secretKey string) S3Option {
4444
return nil
4545
}
4646
}
47+
48+
// localConfig collects the settings produced by LocalOption values.
type localConfig struct {
	basePath  string
	pathSplit []int
}

// LocalOption configures a local file store. It returns an error when the
// requested setting is invalid.
type LocalOption func(*localConfig) error

// getLocalOpts applies opts in order to an empty localConfig and returns the
// result. The first failing option stops processing; its error is wrapped
// with the option's index and an empty config is returned.
func getLocalOpts(opts []LocalOption) (localConfig, error) {
	var cfg localConfig
	for i, opt := range opts {
		if err := opt(&cfg); err != nil {
			return localConfig{}, fmt.Errorf("option %d error: %w", i, err)
		}
	}
	return cfg, nil
}

// WithPathSplit sets the lengths of the consecutive file name slices that are
// used as nested sub-directories. Every length must be a positive integer;
// otherwise the returned option fails when it is applied.
func WithPathSplit(pathSplit ...int) LocalOption {
	// Copy the lengths: a caller passing an existing slice with "..." shares
	// its backing array, and later changes to that slice must not alter the
	// layout of an already configured store.
	segments := append([]int(nil), pathSplit...)

	return func(lc *localConfig) error {
		for i, size := range segments {
			if size <= 0 {
				return fmt.Errorf(
					"invalid path split config, segment %d size %d must be a positive integer",
					i, size,
				)
			}
		}

		lc.pathSplit = segments
		return nil
	}
}

filestore/s3.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ func NewS3(bucketName string, options ...S3Option) (*S3, error) {
3636
return nil, errors.New("s3 filestore requires bucket name")
3737
}
3838

39-
opts, err := getOpts(options)
39+
opts, err := getS3Opts(options)
4040
if err != nil {
4141
return nil, err
4242
}

0 commit comments

Comments
 (0)