Skip to content

Commit ac85801

Browse files
committed
scale-out: support for GQL scale-out with local metadata
Signed-off-by: Vishwas Rajashekar <[email protected]>
1 parent ef1d6f3 commit ac85801

14 files changed

+628
-126
lines changed
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
{
2+
"distSpecVersion": "1.1.0",
3+
"storage": {
4+
"rootDirectory": "./workspace/zot/data/mem1",
5+
"dedupe": false
6+
},
7+
"http": {
8+
"address": "127.0.0.1",
9+
"port": "9000"
10+
},
11+
"log": {
12+
"level": "debug"
13+
},
14+
"cluster": {
15+
"members": [
16+
"127.0.0.1:9000",
17+
"127.0.0.1:9001"
18+
],
19+
"hashKey": "loremipsumdolors"
20+
},
21+
"extensions": {
22+
"search": {
23+
"cve": {
24+
"updateInterval": "2h"
25+
}
26+
},
27+
"ui": {
28+
"enable": true
29+
}
30+
}
31+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
{
2+
"distSpecVersion": "1.1.0",
3+
"storage": {
4+
"rootDirectory": "./workspace/zot/data/mem2",
5+
"dedupe": false
6+
},
7+
"http": {
8+
"address": "127.0.0.1",
9+
"port": "9001"
10+
},
11+
"log": {
12+
"level": "debug"
13+
},
14+
"cluster": {
15+
"members": [
16+
"127.0.0.1:9000",
17+
"127.0.0.1:9001"
18+
],
19+
"hashKey": "loremipsumdolors"
20+
},
21+
"extensions": {
22+
"search": {
23+
"cve": {
24+
"updateInterval": "2h"
25+
}
26+
},
27+
"ui": {
28+
"enable": true
29+
}
30+
}
31+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
global
2+
log /tmp/log local0
3+
log /tmp/log local1 notice
4+
maxconn 2000
5+
stats timeout 30s
6+
daemon
7+
8+
defaults
9+
log global
10+
mode http
11+
option httplog
12+
option dontlognull
13+
timeout connect 5000
14+
timeout client 50000
15+
timeout server 50000
16+
17+
frontend zot
18+
bind *:8080
19+
default_backend zot-cluster
20+
21+
backend zot-cluster
22+
balance roundrobin
23+
cookie SERVER insert indirect nocache
24+
server zot0 127.0.0.1:9000 cookie zot0
25+
server zot1 127.0.0.1:9001 cookie zot1

pkg/api/cluster_proxy.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package api
2+
3+
import (
4+
"fmt"
5+
"io"
6+
"net/http"
7+
8+
"github.com/gorilla/mux"
9+
10+
"zotregistry.dev/zot/pkg/api/constants"
11+
"zotregistry.dev/zot/pkg/cluster"
12+
"zotregistry.dev/zot/pkg/proxy"
13+
)
14+
15+
// ClusterProxy wraps an http.HandlerFunc which requires proxying between zot instances to ensure
16+
// that a given repository only has a single writer and reader for dist-spec operations in a scale-out cluster.
17+
// based on the hash value of the repository name, the request will either be handled locally
18+
// or proxied to another zot member in the cluster to get the data before sending a response to the client.
19+
func ClusterProxy(ctrlr *Controller) func(http.HandlerFunc) http.HandlerFunc {
20+
return func(next http.HandlerFunc) http.HandlerFunc {
21+
return http.HandlerFunc(func(response http.ResponseWriter, request *http.Request) {
22+
config := ctrlr.Config
23+
logger := ctrlr.Log
24+
25+
// if no cluster or single-node cluster, handle locally.
26+
if config.Cluster == nil || len(config.Cluster.Members) == 1 {
27+
next.ServeHTTP(response, request)
28+
29+
return
30+
}
31+
32+
// since the handler has been wrapped, it should be possible to get the name
33+
// of the repository from the mux.
34+
vars := mux.Vars(request)
35+
name, ok := vars["name"]
36+
37+
if !ok || name == "" {
38+
response.WriteHeader(http.StatusNotFound)
39+
40+
return
41+
}
42+
43+
// the target member is the only one which should do read/write for the dist-spec APIs
44+
// for the given repository.
45+
targetMemberIndex, targetMember := cluster.ComputeTargetMember(config.Cluster.HashKey, config.Cluster.Members, name)
46+
logger.Debug().Str(constants.RepositoryLogKey, name).
47+
Msg(fmt.Sprintf("target member socket: %s index: %d", targetMember, targetMemberIndex))
48+
49+
// if the target member is the same as the local member, the current member should handle the request.
50+
// since the instances have the same config, a quick index lookup is sufficient
51+
if targetMemberIndex == config.Cluster.Proxy.LocalMemberClusterSocketIndex {
52+
logger.Debug().Str(constants.RepositoryLogKey, name).Msg("handling the request locally")
53+
next.ServeHTTP(response, request)
54+
55+
return
56+
}
57+
58+
// if the header contains a hop-count, return an error response as there should be no multi-hop
59+
if request.Header.Get(constants.ScaleOutHopCountHeader) != "" {
60+
logger.Fatal().Str("url", request.URL.String()).
61+
Msg("failed to process request - cannot proxy an already proxied request")
62+
63+
return
64+
}
65+
66+
logger.Debug().Str(constants.RepositoryLogKey, name).Msg("proxying the request")
67+
68+
proxyResponse, err := proxy.ProxyHTTPRequest(request.Context(), request, targetMember, ctrlr.Config)
69+
if err != nil {
70+
logger.Error().Err(err).Str(constants.RepositoryLogKey, name).Msg("failed to proxy the request")
71+
http.Error(response, err.Error(), http.StatusInternalServerError)
72+
73+
return
74+
}
75+
76+
defer func() {
77+
_ = proxyResponse.Body.Close()
78+
}()
79+
80+
proxy.CopyHeader(response.Header(), proxyResponse.Header)
81+
response.WriteHeader(proxyResponse.StatusCode)
82+
_, _ = io.Copy(response, proxyResponse.Body)
83+
})
84+
}
85+
}

pkg/api/config/config.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -531,3 +531,12 @@ func IsOauth2Supported(provider string) bool {
531531

532532
return false
533533
}
534+
535+
func (c *Config) IsClusteringEnabled() bool {
536+
return c.Cluster != nil
537+
}
538+
539+
func (c *Config) IsSharedStorageEnabled() bool {
540+
return c.Storage.RemoteCache &&
541+
c.Storage.StorageDriver != nil && c.Storage.CacheDriver != nil
542+
}

pkg/api/config/config_test.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,4 +129,42 @@ func TestConfig(t *testing.T) {
129129

130130
So(conf.IsRetentionEnabled(), ShouldBeTrue)
131131
})
132+
133+
Convey("Test IsClusteringEnabled()", t, func() {
134+
conf := config.New()
135+
So(conf.Cluster, ShouldBeNil)
136+
So(conf.IsClusteringEnabled(), ShouldBeFalse)
137+
138+
conf.Cluster = &config.ClusterConfig{
139+
Members: []string{
140+
"127.0.0.1:9090",
141+
"127.0.0.1:9001",
142+
},
143+
HashKey: "loremipsumdolors",
144+
}
145+
146+
So(conf.IsClusteringEnabled(), ShouldBeTrue)
147+
})
148+
149+
Convey("Test IsSharedStorageEnabled()", t, func() {
150+
conf := config.New()
151+
So(conf.Storage.RemoteCache, ShouldBeFalse)
152+
So(conf.Storage.CacheDriver, ShouldBeNil)
153+
So(conf.Storage.StorageDriver, ShouldBeNil)
154+
So(conf.IsSharedStorageEnabled(), ShouldBeFalse)
155+
156+
conf.Storage.RemoteCache = true
157+
So(conf.Storage.RemoteCache, ShouldBeTrue)
158+
So(conf.IsSharedStorageEnabled(), ShouldBeFalse)
159+
160+
storageDriver := map[string]interface{}{"name": "s3"}
161+
conf.Storage.StorageDriver = storageDriver
162+
So(conf.Storage.StorageDriver, ShouldNotBeNil)
163+
So(conf.IsSharedStorageEnabled(), ShouldBeFalse)
164+
165+
cacheDriver := map[string]interface{}{"name": "dynamodb"}
166+
conf.Storage.CacheDriver = cacheDriver
167+
So(conf.Storage.CacheDriver, ShouldNotBeNil)
168+
So(conf.IsSharedStorageEnabled(), ShouldBeTrue)
169+
})
132170
}

pkg/api/controller.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"zotregistry.dev/zot/pkg/log"
2626
"zotregistry.dev/zot/pkg/meta"
2727
mTypes "zotregistry.dev/zot/pkg/meta/types"
28+
"zotregistry.dev/zot/pkg/proxy"
2829
"zotregistry.dev/zot/pkg/scheduler"
2930
"zotregistry.dev/zot/pkg/storage"
3031
"zotregistry.dev/zot/pkg/storage/gc"
@@ -72,7 +73,7 @@ func NewController(appConfig *config.Config) *Controller {
7273

7374
// memberSocket is the local member's socket
7475
// the index is also fetched for quick lookups during proxying
75-
memberSocketIdx, memberSocket, err := GetLocalMemberClusterSocket(appConfig.Cluster.Members, localSockets)
76+
memberSocketIdx, memberSocket, err := proxy.GetLocalMemberClusterSocket(appConfig.Cluster.Members, localSockets)
7677
if err != nil {
7778
logger.Error().Err(err).Msg("failed to get member socket")
7879
panic("failed to get member socket")

pkg/extensions/extension_search.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"zotregistry.dev/zot/pkg/extensions/search"
1717
cveinfo "zotregistry.dev/zot/pkg/extensions/search/cve"
1818
"zotregistry.dev/zot/pkg/extensions/search/gql_generated"
19+
gqlproxy "zotregistry.dev/zot/pkg/extensions/search/gql_proxy"
1920
"zotregistry.dev/zot/pkg/log"
2021
mTypes "zotregistry.dev/zot/pkg/meta/types"
2122
"zotregistry.dev/zot/pkg/scheduler"
@@ -91,15 +92,18 @@ func SetupSearchRoutes(conf *config.Config, router *mux.Router, storeController
9192
}
9293

9394
resConfig := search.GetResolverConfig(log, storeController, metaDB, cveInfo)
95+
executableSchema := gql_generated.NewExecutableSchema(resConfig)
9496

9597
allowedMethods := zcommon.AllowedMethods(http.MethodGet, http.MethodPost)
9698

99+
gqlProxy := gqlproxy.GqlProxyRequestHandler(conf, log, executableSchema.Schema())
100+
97101
extRouter := router.PathPrefix(constants.ExtSearchPrefix).Subrouter()
98102
extRouter.Use(zcommon.CORSHeadersMiddleware(conf.HTTP.AllowOrigin))
99103
extRouter.Use(zcommon.ACHeadersMiddleware(conf, allowedMethods...))
100104
extRouter.Use(zcommon.AddExtensionSecurityHeaders())
101105
extRouter.Methods(allowedMethods...).
102-
Handler(gqlHandler.NewDefaultServer(gql_generated.NewExecutableSchema(resConfig))) //nolint: staticcheck
106+
Handler(gqlProxy(gqlHandler.NewDefaultServer(executableSchema)))
103107

104108
log.Info().Msg("finished setting up search routes")
105109
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
package gqlproxy
2+
3+
import (
4+
"encoding/json"
5+
"io"
6+
"net/http"
7+
8+
"zotregistry.dev/zot/pkg/api/config"
9+
"zotregistry.dev/zot/pkg/proxy"
10+
)
11+
12+
func fanOutGqlHandler(config *config.Config, response http.ResponseWriter, request *http.Request) {
13+
// Proxy to all members including self in order to get the data as calling next() won't return the
14+
// aggregated data to this handler.
15+
finalMap := map[string]any{}
16+
17+
for _, targetMember := range config.Cluster.Members {
18+
proxyResponse, err := proxy.ProxyHTTPRequest(request.Context(), request, targetMember, config)
19+
if err != nil {
20+
http.Error(response, "failed to process GQL request", http.StatusInternalServerError)
21+
22+
return
23+
}
24+
25+
proxyBody, err := io.ReadAll(proxyResponse.Body)
26+
_ = proxyResponse.Body.Close()
27+
28+
if err != nil {
29+
http.Error(response, "failed to process GQL request", http.StatusInternalServerError)
30+
31+
return
32+
}
33+
34+
responseResult := map[string]any{}
35+
36+
err = json.Unmarshal(proxyBody, &responseResult)
37+
if err != nil {
38+
http.Error(response, err.Error(), http.StatusInternalServerError)
39+
40+
return
41+
}
42+
// perform merge of fields
43+
finalMap = deepMergeMaps(finalMap, responseResult)
44+
}
45+
46+
prepareAndWriteResponse(finalMap, response)
47+
}

0 commit comments

Comments
 (0)