Skip to content

Commit c9c661c

Browse files
committed
refactor(dirpush): implement DirAware storage functionality with new sharedkv implementations and configuration.
1 parent afd055d commit c9c661c

File tree

11 files changed

+112
-34
lines changed

11 files changed

+112
-34
lines changed

conf/conf.go

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -44,8 +44,9 @@ type Server struct {
4444
}
4545

4646
type ServerPProf struct {
47-
Username string `json:"username" yaml:"username"`
48-
Password string `json:"password" yaml:"password"`
47+
AllowLocalRequest bool `json:"allow_local_request" yaml:"allow_local_request"`
48+
Username string `json:"username" yaml:"username"`
49+
Password string `json:"password" yaml:"password"`
4950
}
5051

5152
type ServerAccessLog struct {
@@ -76,9 +77,20 @@ type Storage struct {
7677
EvictionPolicy string `json:"eviction_policy" yaml:"eviction_policy"`
7778
SelectionPolicy string `json:"selection_policy" yaml:"selection_policy"`
7879
SliceSize uint64 `json:"slice_size" yaml:"slice_size"`
80+
DirAware *DirAware `json:"diraware" yaml:"diraware"`
7981
Buckets []*Bucket `json:"buckets" yaml:"buckets"`
8082
}
8183

84+
func (r *Storage) FillDefault() {
85+
if r.DirAware == nil {
86+
r.DirAware = &DirAware{
87+
Enabled: true,
88+
StorePath: "/tmp/.diraware",
89+
AutoClear: true,
90+
}
91+
}
92+
}
93+
8294
type Bucket struct {
8395
Path string `json:"path" yaml:"path"` // local path or ?
8496
Driver string `json:"driver" yaml:"driver"` // native, custom-driver
@@ -91,6 +103,12 @@ type Bucket struct {
91103
DBConfig map[string]any `json:"db_config" yaml:"db_config"` // custom db config
92104
}
93105

106+
type DirAware struct {
107+
Enabled bool `json:"enabled" yaml:"enabled"` // 目录推送标记删除功能开关
108+
StorePath string `json:"store_path" yaml:"store_path"` // 推送任务储存路径(建议SSD)
109+
AutoClear bool `json:"auto_clear" yaml:"auto_clear"` // 自动清理过期任务(凌晨2点左右执行)
110+
}
111+
94112
type Plugin struct {
95113
Name string `json:"name" yaml:"name"`
96114
Options map[string]any `json:"options" yaml:"options"`

config.example.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,10 @@ storage:
9393
eviction_policy: fifo # fifo, lru, lfu
9494
selection_policy: hashring # hashring, roundrobin
9595
slice_size: 1048576 # 1MB
96+
diraware:
97+
enabled: true # 默认 true
98+
store_path: /cache1/.diraware
99+
auto_clear: true
96100
buckets:
97101
- path: /cache1
98102
type: normal

main.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,6 @@ import (
3737
"github.com/omalloc/tavern/proxy"
3838
"github.com/omalloc/tavern/server"
3939
"github.com/omalloc/tavern/storage"
40-
"github.com/omalloc/tavern/storage/marked"
4140
)
4241

4342
var (
@@ -114,11 +113,10 @@ func newApp(bc *conf.Bootstrap, logger log.Logger) (*kratos.App, error) {
114113
}
115114

116115
// init storage
117-
store, err := storage.New(bc.Storage, log.GetLogger())
116+
store, err := storage.New(bc.Storage, logger)
118117
if err != nil {
119118
log.Fatalf("failed to initialize storage: %v", err)
120119
}
121-
store = marked.WrapStorage(store, marked.NewSharedKVChecker(store.SharedKV()))
122120
storage.SetDefault(store)
123121

124122
// init upstream

storage/bucket/disk/disk.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ func New(config *conf.Bucket, sharedkv storage.SharedKV) (storage.Bucket, error)
6969
db, err := indexdb.Create(config.DBType,
7070
indexdb.NewOption(config.DBPath, indexdb.WithType("pebble"), indexdb.WithDBConfig(config.DBConfig)))
7171
if err != nil {
72-
log.Errorf("failed to create %s indexdb %v", config.DBType, err)
72+
log.Errorf("failed to create %s(%s) indexdb %v", config.DBType, config.DBPath, err)
7373
return nil, err
7474
}
7575
bucket.indexdb = db
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package marked
1+
package diraware
22

33
import (
44
"context"
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package marked
1+
package diraware
22

33
import (
44
"context"
@@ -37,7 +37,7 @@ func WithAutoClear(clear bool) SharedKVOption {
3737
}
3838
}
3939

40-
func NewSharedKVChecker(kv storagev1.SharedKV, opts ...SharedKVOption) Checker {
40+
func NewChecker(kv storagev1.SharedKV, opts ...SharedKVOption) Checker {
4141
c := &checker{
4242
KV: kv,
4343
pathtrie: pathtrie.NewPathTrie[string, int64](),
Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
package marked
1+
package diraware
22

33
import (
44
"context"
@@ -14,9 +14,9 @@ type Checker interface {
1414
TrieAdd(ctx context.Context, storePath string)
1515
}
1616

17-
// WrapStorage wraps a storage with push-mark logic.
17+
// New wraps a storage with push-mark logic.
1818
// If checker is nil, returns the original storage.
19-
func WrapStorage(base storagev1.Storage, checker Checker) storagev1.Storage {
19+
func New(base storagev1.Storage, checker Checker) storagev1.Storage {
2020
if base == nil || checker == nil {
2121
return base
2222
}

storage/sharedkv/memkv.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package sharedkv
2+
3+
import (
4+
"github.com/cockroachdb/pebble/v2"
5+
"github.com/cockroachdb/pebble/v2/vfs"
6+
"github.com/omalloc/tavern/api/defined/v1/storage"
7+
"github.com/omalloc/tavern/contrib/log"
8+
)
9+
10+
// NewMemSharedKV create a new memory kv store
11+
func NewMemSharedKV() storage.SharedKV {
12+
opts := &pebble.Options{
13+
FS: vfs.NewMem(),
14+
DisableWAL: true,
15+
Logger: log.NewHelper(log.NewFilter(log.GetLogger(), log.FilterLevel(log.LevelWarn))),
16+
}
17+
18+
db, err := newNoneKV("", opts)
19+
if err != nil {
20+
panic(err)
21+
}
22+
23+
return db
24+
}
Lines changed: 17 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -6,22 +6,20 @@ import (
66
"errors"
77

88
"github.com/cockroachdb/pebble/v2"
9-
"github.com/cockroachdb/pebble/v2/vfs"
109
"github.com/omalloc/tavern/api/defined/v1/storage"
11-
"github.com/omalloc/tavern/contrib/log"
1210
)
1311

14-
var _ storage.SharedKV = (*memSharedKV)(nil)
12+
var _ storage.SharedKV = (*noneSharedKV)(nil)
1513

16-
type memSharedKV struct {
14+
type noneSharedKV struct {
1715
db *pebble.DB
1816
}
1917

20-
func (r *memSharedKV) Close() error {
18+
func (r *noneSharedKV) Close() error {
2119
return r.db.Close()
2220
}
2321

24-
func (r *memSharedKV) Get(_ context.Context, key []byte) ([]byte, error) {
22+
func (r *noneSharedKV) Get(_ context.Context, key []byte) ([]byte, error) {
2523
val, c, err := r.db.Get(key)
2624
if err != nil {
2725
if errors.Is(err, pebble.ErrNotFound) {
@@ -35,11 +33,11 @@ func (r *memSharedKV) Get(_ context.Context, key []byte) ([]byte, error) {
3533
return val, nil
3634
}
3735

38-
func (r *memSharedKV) Set(_ context.Context, key []byte, val []byte) error {
36+
func (r *noneSharedKV) Set(_ context.Context, key []byte, val []byte) error {
3937
return r.db.Set(key, val, pebble.NoSync)
4038
}
4139

42-
func (r *memSharedKV) Incr(_ context.Context, key []byte, delta uint32) (uint32, error) {
40+
func (r *noneSharedKV) Incr(_ context.Context, key []byte, delta uint32) (uint32, error) {
4341
batch := r.db.NewIndexedBatch()
4442
defer func() { _ = batch.Close() }()
4543

@@ -70,7 +68,7 @@ func (r *memSharedKV) Incr(_ context.Context, key []byte, delta uint32) (uint32,
7068
return counter, nil
7169
}
7270

73-
func (r *memSharedKV) Decr(_ context.Context, key []byte, delta uint32) (uint32, error) {
71+
func (r *noneSharedKV) Decr(_ context.Context, key []byte, delta uint32) (uint32, error) {
7472
batch := r.db.NewIndexedBatch()
7573
defer func() { _ = batch.Close() }()
7674

@@ -105,7 +103,7 @@ func (r *memSharedKV) Decr(_ context.Context, key []byte, delta uint32) (uint32,
105103
return counter, nil
106104
}
107105

108-
func (r *memSharedKV) GetCounter(_ context.Context, key []byte) (uint32, error) {
106+
func (r *noneSharedKV) GetCounter(_ context.Context, key []byte) (uint32, error) {
109107
val, closer, err := r.db.Get(key)
110108
if err != nil {
111109
return 0, err
@@ -115,19 +113,19 @@ func (r *memSharedKV) GetCounter(_ context.Context, key []byte) (uint32, error)
115113
return binary.BigEndian.Uint32(val), nil
116114
}
117115

118-
func (r *memSharedKV) Delete(_ context.Context, key []byte) error {
116+
func (r *noneSharedKV) Delete(_ context.Context, key []byte) error {
119117
return r.db.Delete(key, pebble.NoSync)
120118
}
121119

122-
func (r *memSharedKV) DropPrefix(ctx context.Context, prefix []byte) error {
120+
func (r *noneSharedKV) DropPrefix(ctx context.Context, prefix []byte) error {
123121
end := make([]byte, len(prefix))
124122
copy(end, prefix)
125123
end[len(end)-1]++
126124

127125
return r.db.DeleteRange(prefix, end, pebble.NoSync)
128126
}
129127

130-
func (r *memSharedKV) Iterate(ctx context.Context, f func(key []byte, val []byte) error) error {
128+
func (r *noneSharedKV) Iterate(ctx context.Context, f func(key []byte, val []byte) error) error {
131129
iter, err := r.db.NewIterWithContext(ctx, &pebble.IterOptions{})
132130
if err != nil {
133131
return err
@@ -148,7 +146,7 @@ func (r *memSharedKV) Iterate(ctx context.Context, f func(key []byte, val []byte
148146
return nil
149147
}
150148

151-
func (r *memSharedKV) IteratePrefix(ctx context.Context, prefix []byte, f func(key []byte, val []byte) error) error {
149+
func (r *noneSharedKV) IteratePrefix(ctx context.Context, prefix []byte, f func(key []byte, val []byte) error) error {
152150
iter, err := r.db.NewIterWithContext(ctx, &pebble.IterOptions{
153151
LowerBound: prefix,
154152
UpperBound: keyUpperBound(prefix),
@@ -184,17 +182,14 @@ func keyUpperBound(b []byte) []byte {
184182
return nil // no upper-bound
185183
}
186184

187-
func NewMemSharedKV() storage.SharedKV {
188-
db, err := pebble.Open("", &pebble.Options{
189-
FS: vfs.NewMem(),
190-
Logger: log.NewHelper(log.NewFilter(log.GetLogger(), log.FilterLevel(log.LevelWarn))),
191-
})
185+
func newNoneKV(storePath string, opts *pebble.Options) (storage.SharedKV, error) {
186+
db, err := pebble.Open(storePath, opts)
192187
if err != nil {
193-
panic(err)
188+
return nil, err
194189
}
195190

196-
r := &memSharedKV{
191+
r := &noneSharedKV{
197192
db: db,
198193
}
199-
return r
194+
return r, nil
200195
}

storage/sharedkv/storekv.go

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
package sharedkv
2+
3+
import (
4+
"github.com/cockroachdb/pebble/v2"
5+
"github.com/omalloc/tavern/api/defined/v1/storage"
6+
"github.com/omalloc/tavern/contrib/log"
7+
)
8+
9+
// NewStoreSharedKV create a new store kv store
10+
func NewStoreSharedKV(storePath string) storage.SharedKV {
11+
db, err := newNoneKV(storePath, &pebble.Options{
12+
DisableWAL: true,
13+
Logger: log.NewHelper(log.NewFilter(log.GetLogger(), log.FilterLevel(log.LevelWarn))),
14+
})
15+
if err != nil {
16+
panic(err)
17+
}
18+
19+
return db
20+
}

0 commit comments

Comments
 (0)