Skip to content

Commit 2994ad5

Browse files
committed
anawareness runtime prefetch
1. add prefetchlist store prefetchlist storage service. 2. Modify the optimizer to publish the access file list as a prefetchlist to the storage service when obtaining it.
1 parent 47d4311 commit 2994ad5

File tree

3 files changed

+198
-2
lines changed

3 files changed

+198
-2
lines changed

cmd/optimizer-nri-plugin/main.go

+97-1
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,13 @@
77
package main
88

99
import (
10+
"bytes"
1011
"context"
12+
"encoding/json"
1113
"fmt"
1214
"io"
1315
"log/syslog"
16+
"net/http"
1417
"os"
1518
"path/filepath"
1619
"strings"
@@ -43,6 +46,8 @@ type PluginConfig struct {
4346
Readable bool `toml:"readable"`
4447
Timeout int `toml:"timeout"`
4548
Overwrite bool `toml:"overwrite"`
49+
Domain string `toml:"domain"`
50+
Port string `toml:"port"`
4651
}
4752

4853
type PluginArgs struct {
@@ -104,6 +109,16 @@ func buildFlags(args *PluginArgs) []cli.Flag {
104109
Usage: "whether to overwrite the existed persistent files",
105110
Destination: &args.Config.Overwrite,
106111
},
112+
&cli.StringFlag{
113+
Name: "domain",
114+
Usage: "domain of prefetchlist store service",
115+
Destination: &args.Config.Domain,
116+
},
117+
&cli.StringFlag{
118+
Name: "port",
119+
Usage: "port of prefetchlist store service",
120+
Destination: &args.Config.Port,
121+
},
107122
}
108123
}
109124

@@ -129,9 +144,12 @@ var (
129144
)
130145

131146
const (
132-
imageNameLabel = "io.kubernetes.cri.image-name"
147+
imageNameLabel = "io.kubernetes.cri.image-name"
148+
containerNameLabel = "io.kubernetes.cri.container-name"
133149
)
134150

151+
const defaultEndpoint = "/api/v1/prefetch/upload"
152+
135153
func (p *plugin) Configure(config, runtime, version string) (stub.EventMask, error) {
136154
log.Infof("got configuration data: %q from runtime %s %s", config, runtime, version)
137155
if config == "" {
@@ -156,11 +174,26 @@ func (p *plugin) Configure(config, runtime, version string) (stub.EventMask, err
156174
return p.mask, nil
157175
}
158176

177+
type PrefetchFile struct {
178+
Path string
179+
}
180+
181+
type CacheItem struct {
182+
ImageName string
183+
ContainerName string
184+
PrefetchFiles []PrefetchFile
185+
}
186+
187+
type Cache struct {
188+
Items map[string]*CacheItem
189+
}
190+
159191
func (p *plugin) StartContainer(_ *api.PodSandbox, container *api.Container) error {
160192
dir, imageName, err := GetImageName(container.Annotations)
161193
if err != nil {
162194
return err
163195
}
196+
containerName := container.Annotations[containerNameLabel]
164197

165198
persistDir := filepath.Join(cfg.PersistDir, dir)
166199
if err := os.MkdirAll(persistDir, os.ModePerm); err != nil {
@@ -178,11 +211,74 @@ func (p *plugin) StartContainer(_ *api.PodSandbox, container *api.Container) err
178211
return err
179212
}
180213

214+
imageRepo := container.Annotations[imageNameLabel]
215+
serverURL := fmt.Sprintf("%s:%s", cfg.Domain, cfg.Port)
216+
217+
go func() {
218+
time.Sleep(1 * time.Minute)
219+
if err := sendToServer(imageRepo, persistFile, containerName, serverURL); err != nil {
220+
log.WithError(err).Error("failed to send prefetch to http server")
221+
}
222+
}()
223+
181224
globalFanotifyServer[imageName] = fanotifyServer
182225

183226
return nil
184227
}
185228

229+
func sendToServer(imageName, prefetchlistPath, containerName, serverURL string) error {
230+
data, err := os.ReadFile(prefetchlistPath)
231+
if err != nil {
232+
return fmt.Errorf("error reading file: %w", err)
233+
}
234+
235+
filePaths := strings.Split(string(data), "\n")
236+
237+
var prefetchFiles []PrefetchFile
238+
for _, path := range filePaths {
239+
if path != "" {
240+
prefetchFiles = append(prefetchFiles, PrefetchFile{Path: path})
241+
}
242+
}
243+
244+
item := CacheItem{
245+
ImageName: imageName,
246+
ContainerName: containerName,
247+
PrefetchFiles: prefetchFiles,
248+
}
249+
250+
url := fmt.Sprintf("http://%s%s", serverURL, defaultEndpoint)
251+
252+
err = postRequest(item, url)
253+
if err != nil {
254+
return fmt.Errorf("error uploading to server: %w", err)
255+
}
256+
257+
return nil
258+
}
259+
260+
func postRequest(item CacheItem, endpoint string) error {
261+
data, err := json.Marshal(item)
262+
if err != nil {
263+
return err
264+
}
265+
266+
resp, err := http.Post(endpoint, "application/json", bytes.NewBuffer(data))
267+
if err != nil {
268+
return err
269+
}
270+
defer resp.Body.Close()
271+
272+
body, err := io.ReadAll(resp.Body)
273+
if err != nil {
274+
return fmt.Errorf("failed to read response body: %w", err)
275+
}
276+
277+
fmt.Println("Server Response:", string(body))
278+
279+
return nil
280+
}
281+
186282
func (p *plugin) StopContainer(_ *api.PodSandbox, container *api.Container) ([]*api.ContainerUpdate, error) {
187283
var update = []*api.ContainerUpdate{}
188284
_, imageName, err := GetImageName(container.Annotations)

misc/example/optimizer-nri-plugin.conf

+5-1
Original file line numberDiff line numberDiff line change
@@ -10,4 +10,8 @@ timeout = 0
1010
overwrite = false
1111
# The events that containerd subscribes to.
1212
# Do not change this element.
13-
events = [ "StartContainer", "StopContainer" ]
13+
events = [ "StartContainer", "StopContainer" ]
14+
# The domain and port of prefetchlist store service
15+
# ip mappings should be established in advance
16+
domain = "prefetchlist.store.local"
17+
port = "1323"
+96
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Copyright (c) 2023. Nydus Developers. All rights reserved.
3+
*
4+
* SPDX-License-Identifier: Apache-2.0
5+
*/
6+
package main
7+
8+
import (
9+
"encoding/json"
10+
"fmt"
11+
"io"
12+
"net/http"
13+
"sync"
14+
)
15+
16+
type PrefetchFile struct {
17+
Path string
18+
}
19+
20+
type CacheItem struct {
21+
ImageName string
22+
ContainerName string
23+
PrefetchFiles []PrefetchFile
24+
}
25+
26+
type Cache struct {
27+
Items map[string]*CacheItem
28+
mutex sync.Mutex
29+
}
30+
31+
func (cache *Cache) key(imageName, containerName string) string {
32+
return fmt.Sprintf("%s,%s", imageName, containerName)
33+
}
34+
35+
func (cache *Cache) Get(imageName, containerName string) *CacheItem {
36+
cache.mutex.Lock()
37+
defer cache.mutex.Unlock()
38+
39+
return cache.Items[cache.key(imageName, containerName)]
40+
}
41+
42+
func (cache *Cache) Set(item CacheItem) {
43+
cache.mutex.Lock()
44+
defer cache.mutex.Unlock()
45+
46+
cache.Items[cache.key(item.ImageName, item.ContainerName)] = &item
47+
}
48+
49+
var serverCache Cache
50+
51+
func uploadHandler(w http.ResponseWriter, r *http.Request) {
52+
var item CacheItem
53+
body, err := io.ReadAll(r.Body)
54+
if err != nil {
55+
http.Error(w, "Failed to read request body", http.StatusBadRequest)
56+
return
57+
}
58+
err = json.Unmarshal(body, &item)
59+
if err != nil {
60+
http.Error(w, "Invalid request payload", http.StatusBadRequest)
61+
return
62+
}
63+
64+
serverCache.Set(item)
65+
fmt.Fprintf(w, "Uploaded CacheItem for %s, %s successfully", item.ImageName, item.ContainerName)
66+
}
67+
68+
func downloadHandler(w http.ResponseWriter, r *http.Request) {
69+
imageName := r.URL.Query().Get("imageName")
70+
containerName := r.URL.Query().Get("containerName")
71+
72+
item := serverCache.Get(imageName, containerName)
73+
if item == nil {
74+
http.Error(w, "CacheItem not found", http.StatusNotFound)
75+
return
76+
}
77+
78+
err := json.NewEncoder(w).Encode(item)
79+
if err != nil {
80+
http.Error(w, "Failed to encode CacheItem to JSON", http.StatusInternalServerError)
81+
return
82+
}
83+
}
84+
85+
func main() {
86+
serverCache = Cache{Items: make(map[string]*CacheItem)}
87+
88+
http.HandleFunc("/api/v1/prefetch/upload", uploadHandler)
89+
http.HandleFunc("/api/v1/prefetch/download", downloadHandler)
90+
91+
fmt.Println("Server started on :1323")
92+
err := http.ListenAndServe(":1323", nil)
93+
if err != nil {
94+
fmt.Printf("Failed to start server: %v\n", err)
95+
}
96+
}

0 commit comments

Comments
 (0)