Skip to content

Commit 4020856

Browse files
committed
unawareness prefetch implementation on snapshotter side
1. send post request to http server 2. store prefetchlist 3. add prefetchlist in nydusd
1 parent dc68149 commit 4020856

File tree

11 files changed

+138
-69
lines changed

11 files changed

+138
-69
lines changed

config/config.go

+7
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,11 @@ type SystemControllerConfig struct {
209209
DebugConfig DebugConfig `toml:"debug"`
210210
}
211211

212+
type PrefetchControllerConfig struct {
213+
Enable bool `toml:"enable"`
214+
PrefetchConfig string `toml:"distribution_pull_endpoint"`
215+
}
216+
212217
type SnapshotterConfig struct {
213218
// Configuration format version
214219
Version int `toml:"version"`
@@ -229,6 +234,8 @@ type SnapshotterConfig struct {
229234
LoggingConfig LoggingConfig `toml:"log"`
230235
CgroupConfig CgroupConfig `toml:"cgroup"`
231236
Experimental Experimental `toml:"experimental"`
237+
// Get prefetch list from http server
238+
PrefetchControllerConfig PrefetchControllerConfig `toml:"prefetch"`
232239
}
233240

234241
func LoadSnapshotterConfig(path string) (*SnapshotterConfig, error) {

config/global.go

+14
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ type GlobalConfig struct {
3838
DaemonThreadsNum int
3939
CacheGCPeriod time.Duration
4040
MirrorsConfig MirrorsConfig
41+
PrefetchRoot string
4142
}
4243

4344
func IsFusedevSharedModeEnabled() bool {
@@ -64,6 +65,18 @@ func GetConfigRoot() string {
6465
return globalConfig.ConfigRoot
6566
}
6667

68+
func GetPrefetchRoot() string {
69+
return globalConfig.PrefetchRoot
70+
}
71+
72+
func GetPrefetchEndpoint() string {
73+
return globalConfig.origin.PrefetchControllerConfig.PrefetchConfig
74+
}
75+
76+
func IsPrefetchEnabled() bool {
77+
return globalConfig.origin.PrefetchControllerConfig.Enable
78+
}
79+
6780
func GetMirrorsConfigDir() string {
6881
return globalConfig.MirrorsConfig.Dir
6982
}
@@ -181,6 +194,7 @@ func ProcessConfigurations(c *SnapshotterConfig) error {
181194
globalConfig.ConfigRoot = filepath.Join(c.Root, "config")
182195
globalConfig.SocketRoot = filepath.Join(c.Root, "socket")
183196
globalConfig.RootMountpoint = filepath.Join(c.Root, "mnt")
197+
globalConfig.PrefetchRoot = filepath.Join(c.Root, "prefetch")
184198

185199
globalConfig.MirrorsConfig = c.RemoteConfig.MirrorsConfig
186200

go.mod

+3-6
Original file line numberDiff line numberDiff line change
@@ -56,12 +56,6 @@ require (
5656
k8s.io/utils v0.0.0-20230726121419-3b25d923346b
5757
)
5858

59-
require (
60-
github.com/labstack/gommon v0.4.2 // indirect
61-
github.com/valyala/bytebufferpool v1.0.0 // indirect
62-
github.com/valyala/fasttemplate v1.2.2 // indirect
63-
)
64-
6559
require (
6660
github.com/AdamKorcz/go-118-fuzz-build v0.0.0-20221215162035-5330a85ea652 // indirect
6761
github.com/Microsoft/go-winio v0.6.0 // indirect
@@ -117,6 +111,7 @@ require (
117111
github.com/jmespath/go-jmespath v0.4.0 // indirect
118112
github.com/josharian/intern v1.0.0 // indirect
119113
github.com/json-iterator/go v1.1.12 // indirect
114+
github.com/labstack/gommon v0.4.2 // indirect
120115
github.com/mailru/easyjson v0.7.7 // indirect
121116
github.com/mattn/go-colorable v0.1.13 // indirect
122117
github.com/mattn/go-isatty v0.0.20 // indirect
@@ -138,6 +133,8 @@ require (
138133
github.com/russross/blackfriday/v2 v2.1.0 // indirect
139134
github.com/spf13/pflag v1.0.5 // indirect
140135
github.com/stefanberger/go-pkcs11uri v0.0.0-20201008174630-78d3cae3a980 // indirect
136+
github.com/valyala/bytebufferpool v1.0.0 // indirect
137+
github.com/valyala/fasttemplate v1.2.2 // indirect
141138
github.com/vbatts/tar-split v0.11.2 // indirect
142139
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
143140
go.mozilla.org/pkcs7 v0.0.0-20200128120323-432b2356ecb1 // indirect

misc/snapshotter/config.toml

+4
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,10 @@ daemon_mode = "dedicated"
99
# Whether snapshotter should try to clean up resources when it is closed
1010
cleanup_on_close = false
1111

12+
[prefetch]
13+
enable = false
14+
get_prefetch_endpoint = "http://localhost:1323/api/v1/prefetch/download"
15+
1216
[system]
1317
# Snapshotter's debug and trace HTTP server interface
1418
enable = true

pkg/daemon/config.go

+15
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616

1717
"github.com/containerd/nydus-snapshotter/config"
1818
"github.com/containerd/nydus-snapshotter/internal/constant"
19+
"github.com/containerd/nydus-snapshotter/pkg/prefetch"
1920
)
2021

2122
// Build runtime nydusd daemon object, which might be persisted later
@@ -31,6 +32,20 @@ func WithSocketDir(dir string) NewDaemonOpt {
3132
}
3233
}
3334

35+
func WithPrefetchDir(dir, imageID string) NewDaemonOpt {
36+
return func(d *Daemon) error {
37+
s := filepath.Join(dir, d.ID())
38+
prefetchDir, err := prefetch.GetPrefetchList(s, imageID)
39+
if err != nil {
40+
return errors.Wrapf(err, "failed to get prefetchList for image %s in path %s", imageID, s)
41+
}
42+
if prefetchDir != "" {
43+
d.States.PrefetchDir = prefetchDir
44+
}
45+
return nil
46+
}
47+
}
48+
3449
func WithRef(ref int32) NewDaemonOpt {
3550
return func(d *Daemon) error {
3651
d.ref = ref

pkg/daemon/daemon.go

+2-1
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@ type ConfigState struct {
5454
SupervisorPath string
5555
ThreadNum int
5656
// Where the configuration file resides, all rafs instances share the same configuration template
57-
ConfigDir string
57+
ConfigDir string
58+
PrefetchDir string
5859
}
5960

6061
// TODO: Record queried nydusd state

pkg/filesystem/fs.go

+7-3
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@ func (fs *Filesystem) Mount(ctx context.Context, snapshotID string, labels map[s
285285
if err != nil {
286286
return err
287287
}
288-
d, err = fs.createDaemon(fsManager, config.DaemonModeDedicated, mp, 0)
288+
d, err = fs.createDaemon(fsManager, config.DaemonModeDedicated, mp, 0, imageID)
289289
// if daemon already exists for snapshotID, just return
290290
if err != nil && !errdefs.IsAlreadyExists(err) {
291291
return err
@@ -578,7 +578,7 @@ func (fs *Filesystem) initSharedDaemon(fsManager *manager.Manager) (err error) {
578578
return errors.Errorf("got null mountpoint for fsDriver %s", fsManager.FsDriver)
579579
}
580580

581-
d, err := fs.createDaemon(fsManager, daemonMode, mp, 0)
581+
d, err := fs.createDaemon(fsManager, daemonMode, mp, 0, "")
582582
if err != nil {
583583
return errors.Wrap(err, "initialize shared daemon")
584584
}
@@ -612,7 +612,7 @@ func (fs *Filesystem) initSharedDaemon(fsManager *manager.Manager) (err error) {
612612

613613
// createDaemon create new nydus daemon by snapshotID and imageID
614614
func (fs *Filesystem) createDaemon(fsManager *manager.Manager, daemonMode config.DaemonMode,
615-
mountpoint string, ref int32) (d *daemon.Daemon, err error) {
615+
mountpoint string, ref int32, imageID string) (d *daemon.Daemon, err error) {
616616
opts := []daemon.NewDaemonOpt{
617617
daemon.WithRef(ref),
618618
daemon.WithSocketDir(config.GetSocketRoot()),
@@ -631,6 +631,10 @@ func (fs *Filesystem) createDaemon(fsManager *manager.Manager, daemonMode config
631631
opts = append(opts, daemon.WithMountpoint(mountpoint))
632632
}
633633

634+
if imageID != "" {
635+
opts = append(opts, daemon.WithPrefetchDir(config.GetPrefetchRoot(), imageID))
636+
}
637+
634638
d, err = daemon.NewDaemon(opts...)
635639
if err != nil {
636640
return nil, errors.Wrapf(err, "new daemon")

pkg/manager/daemon_adaptor.go

+2-10
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ import (
2323
"github.com/containerd/nydus-snapshotter/pkg/errdefs"
2424
"github.com/containerd/nydus-snapshotter/pkg/metrics/collector"
2525
metrics "github.com/containerd/nydus-snapshotter/pkg/metrics/tool"
26-
"github.com/containerd/nydus-snapshotter/pkg/prefetch"
2726
)
2827

2928
const endpointGetBackend string = "/api/v1/daemons/%s/backend"
@@ -122,7 +121,6 @@ func (m *Manager) StartDaemon(d *daemon.Daemon) error {
122121
// Build commandline according to nydusd daemon configuration.
123122
func (m *Manager) BuildDaemonCommand(d *daemon.Daemon, bin string, upgrade bool) (*exec.Cmd, error) {
124123
var cmdOpts []command.Opt
125-
var imageReference string
126124

127125
nydusdThreadNum := d.NydusdThreadNum()
128126

@@ -148,8 +146,6 @@ func (m *Manager) BuildDaemonCommand(d *daemon.Daemon, bin string, upgrade bool)
148146
return nil, errors.Wrapf(errdefs.ErrNotFound, "daemon %s no rafs instance associated", d.ID())
149147
}
150148

151-
imageReference = rafs.ImageID
152-
153149
bootstrap, err := rafs.BootstrapFile()
154150
if err != nil {
155151
return nil, errors.Wrapf(err, "locate bootstrap %s", bootstrap)
@@ -176,12 +172,8 @@ func (m *Manager) BuildDaemonCommand(d *daemon.Daemon, bin string, upgrade bool)
176172
command.WithID(d.ID()))
177173
}
178174

179-
if imageReference != "" {
180-
prefetchfiles := prefetch.Pm.GetPrefetchInfo(imageReference)
181-
if prefetchfiles != "" {
182-
cmdOpts = append(cmdOpts, command.WithPrefetchFiles(prefetchfiles))
183-
prefetch.Pm.DeleteFromPrefetchMap(imageReference)
184-
}
175+
if d.States.PrefetchDir != "" {
176+
cmdOpts = append(cmdOpts, command.WithPrefetchFiles(d.States.PrefetchDir))
185177
}
186178

187179
cmdOpts = append(cmdOpts,

pkg/manager/manager.go

+4
Original file line numberDiff line numberDiff line change
@@ -287,6 +287,10 @@ func (m *Manager) cleanUpDaemonResources(d *daemon.Daemon) {
287287
resource := []string{d.States.ConfigDir, d.States.LogDir}
288288
if !d.IsSharedDaemon() {
289289
socketDir := path.Dir(d.GetAPISock())
290+
if d.States.PrefetchDir != "" {
291+
prefetchDir := path.Dir(d.States.PrefetchDir)
292+
resource = append(resource, prefetchDir)
293+
}
290294
resource = append(resource, socketDir)
291295
}
292296

pkg/prefetch/prefetch.go

+80-31
Original file line numberDiff line numberDiff line change
@@ -8,53 +8,102 @@ package prefetch
88

99
import (
1010
"encoding/json"
11-
"sync"
11+
"fmt"
12+
"io"
13+
"net/http"
14+
"os"
15+
"path/filepath"
16+
"strings"
1217

1318
"github.com/containerd/containerd/log"
19+
"github.com/pkg/errors"
20+
21+
"github.com/containerd/nydus-snapshotter/config"
1422
)
1523

16-
type prefetchInfo struct {
17-
prefetchMap map[string]string
18-
prefetchMutex sync.Mutex
24+
type prefetchlist struct {
25+
FilePaths []string `json:"files"`
1926
}
2027

21-
var Pm prefetchInfo
28+
func GetPrefetchList(prefetchDir, imageRepo string) (string, error) {
29+
if config.IsPrefetchEnabled() {
30+
url := config.GetPrefetchEndpoint()
31+
getURL := fmt.Sprintf("%s?imageName=%s", url, imageRepo)
32+
33+
resp, err := http.Get(getURL)
34+
if err != nil {
35+
return "", err
36+
}
37+
defer resp.Body.Close()
38+
39+
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusNotFound {
40+
return "", fmt.Errorf("get from server returned a non-OK status code: %d, HTTP Status Error", resp.StatusCode)
41+
}
42+
43+
body, err := io.ReadAll(resp.Body)
44+
if err != nil {
45+
return "", err
46+
}
2247

23-
func (p *prefetchInfo) SetPrefetchFiles(body []byte) error {
24-
p.prefetchMutex.Lock()
25-
defer p.prefetchMutex.Unlock()
48+
if strings.Contains(string(body), "CacheItem not found") {
49+
log.L.Infof("Cache item not found for image: %s\n", imageRepo)
50+
return "", nil
51+
}
52+
53+
prefetchfilePath, err := storePrefetchList(prefetchDir, body)
54+
if err != nil {
55+
return "", err
56+
}
57+
return prefetchfilePath, nil
58+
}
59+
return "", nil
60+
}
2661

27-
var prefetchMsg []map[string]string
28-
if err := json.Unmarshal(body, &prefetchMsg); err != nil {
29-
return err
62+
func storePrefetchList(prefetchDir string, list []byte) (string, error) {
63+
if err := os.MkdirAll(prefetchDir, 0755); err != nil {
64+
return "", errors.Wrapf(err, "create prefetch dir %s", prefetchDir)
3065
}
3166

32-
if p.prefetchMap == nil {
33-
p.prefetchMap = make(map[string]string)
67+
filePath := filepath.Join(prefetchDir, "list")
68+
jsonfilePath := filepath.Join(prefetchDir, "list.json")
69+
70+
file, err := os.OpenFile(filePath, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0644)
71+
if err != nil {
72+
fmt.Println("Error opening file:", err)
73+
return "", errors.Wrap(err, "error opening prefetch file")
3474
}
35-
for _, item := range prefetchMsg {
36-
image := item["image"]
37-
prefetchfiles := item["prefetch"]
38-
p.prefetchMap[image] = prefetchfiles
75+
defer file.Close()
76+
77+
var prefetchSlice []string
78+
err = json.Unmarshal(list, &prefetchSlice)
79+
if err != nil {
80+
return "", errors.Wrap(err, "failed to parse prefetch list")
3981
}
4082

41-
log.L.Infof("received prefetch list from nri plugin: %v ", p.prefetchMap)
42-
return nil
43-
}
83+
for _, path := range prefetchSlice {
84+
content := path + "\n"
85+
_, err := file.WriteString(content)
86+
if err != nil {
87+
return "", errors.Wrap(err, "error writing to prefetch file")
88+
}
89+
}
4490

45-
func (p *prefetchInfo) GetPrefetchInfo(image string) string {
46-
p.prefetchMutex.Lock()
47-
defer p.prefetchMutex.Unlock()
91+
prefetchStruct := prefetchlist{FilePaths: prefetchSlice}
92+
jsonByte, err := json.Marshal(prefetchStruct)
93+
if err != nil {
94+
return "", errors.Wrap(err, "failed to marshal to JSON")
95+
}
4896

49-
if prefetchfiles, ok := p.prefetchMap[image]; ok {
50-
return prefetchfiles
97+
jsonfile, err := os.Create(jsonfilePath)
98+
if err != nil {
99+
return "", errors.Wrapf(err, "failed to create file %s", jsonfilePath)
51100
}
52-
return ""
53-
}
101+
defer jsonfile.Close()
54102

55-
func (p *prefetchInfo) DeleteFromPrefetchMap(image string) {
56-
p.prefetchMutex.Lock()
57-
defer p.prefetchMutex.Unlock()
103+
_, err = jsonfile.Write(jsonByte)
104+
if err != nil {
105+
return "", errors.Wrap(err, "error writing JSON to file")
106+
}
58107

59-
delete(p.prefetchMap, image)
108+
return filePath, nil
60109
}

0 commit comments

Comments
 (0)