Skip to content

feat/gql-proxy: new approach for GraphQL local scale out #3074

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions examples/scale-out-cluster-local/config-cluster-member0.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
{
"distSpecVersion": "1.1.0",
"storage": {
"rootDirectory": "./workspace/zot/data/mem1",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have to figure out a scheme to append a member path.
Would like to have a single zot configuration that folks don't have to tweak.

Copy link
Contributor Author

@vrajashkr vrajashkr Apr 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. The reason for this is mostly because I was starting 2 binaries on the same host for development (so I had to change the path and port). For an actual deployment, the config files would be identical.

"dedupe": false
},
"http": {
"address": "127.0.0.1",
"port": "9000"
},
"log": {
"level": "debug"
},
"cluster": {
"members": [
"127.0.0.1:9000",
"127.0.0.1:9001"
],
"hashKey": "loremipsumdolors"
},
"extensions": {
"search": {
"cve": {
"updateInterval": "2h"
}
},
"ui": {
"enable": true
}
}
}
31 changes: 31 additions & 0 deletions examples/scale-out-cluster-local/config-cluster-member1.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
{
"distSpecVersion": "1.1.0",
"storage": {
"rootDirectory": "./workspace/zot/data/mem2",
"dedupe": false
},
"http": {
"address": "127.0.0.1",
"port": "9001"
},
"log": {
"level": "debug"
},
"cluster": {
"members": [
"127.0.0.1:9000",
"127.0.0.1:9001"
],
"hashKey": "loremipsumdolors"
},
"extensions": {
"search": {
"cve": {
"updateInterval": "2h"
}
},
"ui": {
"enable": true
}
}
}
25 changes: 25 additions & 0 deletions examples/scale-out-cluster-local/haproxy.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
global
log /tmp/log local0
log /tmp/log local1 notice
maxconn 2000
stats timeout 30s
daemon

defaults
log global
mode http
option httplog
option dontlognull
timeout connect 5000
timeout client 50000
timeout server 50000

frontend zot
bind *:8080
default_backend zot-cluster

backend zot-cluster
balance roundrobin
cookie SERVER insert indirect nocache
server zot0 127.0.0.1:9000 cookie zot0
server zot1 127.0.0.1:9001 cookie zot1
85 changes: 85 additions & 0 deletions pkg/api/cluster_proxy.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package api

import (
"fmt"
"io"
"net/http"

"github.com/gorilla/mux"

"zotregistry.dev/zot/pkg/api/constants"
"zotregistry.dev/zot/pkg/cluster"
"zotregistry.dev/zot/pkg/proxy"
)

// ClusterProxy wraps an http.HandlerFunc which requires proxying between zot instances to ensure
// that a given repository only has a single writer and reader for dist-spec operations in a scale-out cluster.
// based on the hash value of the repository name, the request will either be handled locally
// or proxied to another zot member in the cluster to get the data before sending a response to the client.
func ClusterProxy(ctrlr *Controller) func(http.HandlerFunc) http.HandlerFunc {
return func(next http.HandlerFunc) http.HandlerFunc {
return http.HandlerFunc(func(response http.ResponseWriter, request *http.Request) {
config := ctrlr.Config
logger := ctrlr.Log

// if no cluster or single-node cluster, handle locally.
if config.Cluster == nil || len(config.Cluster.Members) == 1 {
next.ServeHTTP(response, request)

return
}

// since the handler has been wrapped, it should be possible to get the name
// of the repository from the mux.
vars := mux.Vars(request)
name, ok := vars["name"]

if !ok || name == "" {
response.WriteHeader(http.StatusNotFound)

return
}

Check warning on line 41 in pkg/api/cluster_proxy.go

View check run for this annotation

Codecov / codecov/patch

pkg/api/cluster_proxy.go#L38-L41

Added lines #L38 - L41 were not covered by tests

// the target member is the only one which should do read/write for the dist-spec APIs
// for the given repository.
targetMemberIndex, targetMember := cluster.ComputeTargetMember(config.Cluster.HashKey, config.Cluster.Members, name)
logger.Debug().Str(constants.RepositoryLogKey, name).
Msg(fmt.Sprintf("target member socket: %s index: %d", targetMember, targetMemberIndex))

// if the target member is the same as the local member, the current member should handle the request.
// since the instances have the same config, a quick index lookup is sufficient
if targetMemberIndex == config.Cluster.Proxy.LocalMemberClusterSocketIndex {
logger.Debug().Str(constants.RepositoryLogKey, name).Msg("handling the request locally")
next.ServeHTTP(response, request)

return
}

// if the header contains a hop-count, return an error response as there should be no multi-hop
if request.Header.Get(constants.ScaleOutHopCountHeader) != "" {
logger.Fatal().Str("url", request.URL.String()).
Msg("failed to process request - cannot proxy an already proxied request")

return
}

Check warning on line 64 in pkg/api/cluster_proxy.go

View check run for this annotation

Codecov / codecov/patch

pkg/api/cluster_proxy.go#L60-L64

Added lines #L60 - L64 were not covered by tests

logger.Debug().Str(constants.RepositoryLogKey, name).Msg("proxying the request")

proxyResponse, err := proxy.ProxyHTTPRequest(request.Context(), request, targetMember, ctrlr.Config)
if err != nil {
logger.Error().Err(err).Str(constants.RepositoryLogKey, name).Msg("failed to proxy the request")
http.Error(response, err.Error(), http.StatusInternalServerError)

return
}

defer func() {
_ = proxyResponse.Body.Close()
}()

proxy.CopyHeader(response.Header(), proxyResponse.Header)
response.WriteHeader(proxyResponse.StatusCode)
_, _ = io.Copy(response, proxyResponse.Body)
})
}
}
9 changes: 9 additions & 0 deletions pkg/api/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -535,3 +535,12 @@ func IsOauth2Supported(provider string) bool {

return false
}

func (c *Config) IsClusteringEnabled() bool {
return c.Cluster != nil
}

func (c *Config) IsSharedStorageEnabled() bool {
return c.Storage.RemoteCache &&
c.Storage.StorageDriver != nil && c.Storage.CacheDriver != nil
}
38 changes: 38 additions & 0 deletions pkg/api/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,4 +129,42 @@ func TestConfig(t *testing.T) {

So(conf.IsRetentionEnabled(), ShouldBeTrue)
})

Convey("Test IsClusteringEnabled()", t, func() {
conf := config.New()
So(conf.Cluster, ShouldBeNil)
So(conf.IsClusteringEnabled(), ShouldBeFalse)

conf.Cluster = &config.ClusterConfig{
Members: []string{
"127.0.0.1:9090",
"127.0.0.1:9001",
},
HashKey: "loremipsumdolors",
}

So(conf.IsClusteringEnabled(), ShouldBeTrue)
})

Convey("Test IsSharedStorageEnabled()", t, func() {
conf := config.New()
So(conf.Storage.RemoteCache, ShouldBeFalse)
So(conf.Storage.CacheDriver, ShouldBeNil)
So(conf.Storage.StorageDriver, ShouldBeNil)
So(conf.IsSharedStorageEnabled(), ShouldBeFalse)

conf.Storage.RemoteCache = true
So(conf.Storage.RemoteCache, ShouldBeTrue)
So(conf.IsSharedStorageEnabled(), ShouldBeFalse)

storageDriver := map[string]interface{}{"name": "s3"}
conf.Storage.StorageDriver = storageDriver
So(conf.Storage.StorageDriver, ShouldNotBeNil)
So(conf.IsSharedStorageEnabled(), ShouldBeFalse)

cacheDriver := map[string]interface{}{"name": "dynamodb"}
conf.Storage.CacheDriver = cacheDriver
So(conf.Storage.CacheDriver, ShouldNotBeNil)
So(conf.IsSharedStorageEnabled(), ShouldBeTrue)
})
}
3 changes: 2 additions & 1 deletion pkg/api/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"zotregistry.dev/zot/pkg/log"
"zotregistry.dev/zot/pkg/meta"
mTypes "zotregistry.dev/zot/pkg/meta/types"
"zotregistry.dev/zot/pkg/proxy"
"zotregistry.dev/zot/pkg/scheduler"
"zotregistry.dev/zot/pkg/storage"
"zotregistry.dev/zot/pkg/storage/gc"
Expand Down Expand Up @@ -72,7 +73,7 @@ func NewController(appConfig *config.Config) *Controller {

// memberSocket is the local member's socket
// the index is also fetched for quick lookups during proxying
memberSocketIdx, memberSocket, err := GetLocalMemberClusterSocket(appConfig.Cluster.Members, localSockets)
memberSocketIdx, memberSocket, err := proxy.GetLocalMemberClusterSocket(appConfig.Cluster.Members, localSockets)
if err != nil {
logger.Error().Err(err).Msg("failed to get member socket")
panic("failed to get member socket")
Expand Down
6 changes: 5 additions & 1 deletion pkg/extensions/extension_search.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"zotregistry.dev/zot/pkg/extensions/search"
cveinfo "zotregistry.dev/zot/pkg/extensions/search/cve"
"zotregistry.dev/zot/pkg/extensions/search/gql_generated"
gqlproxy "zotregistry.dev/zot/pkg/extensions/search/gql_proxy"
"zotregistry.dev/zot/pkg/log"
mTypes "zotregistry.dev/zot/pkg/meta/types"
"zotregistry.dev/zot/pkg/scheduler"
Expand Down Expand Up @@ -91,15 +92,18 @@ func SetupSearchRoutes(conf *config.Config, router *mux.Router, storeController
}

resConfig := search.GetResolverConfig(log, storeController, metaDB, cveInfo)
executableSchema := gql_generated.NewExecutableSchema(resConfig)

allowedMethods := zcommon.AllowedMethods(http.MethodGet, http.MethodPost)

gqlProxy := gqlproxy.GqlProxyRequestHandler(conf, log, executableSchema.Schema())

extRouter := router.PathPrefix(constants.ExtSearchPrefix).Subrouter()
extRouter.Use(zcommon.CORSHeadersMiddleware(conf.HTTP.AllowOrigin))
extRouter.Use(zcommon.ACHeadersMiddleware(conf, allowedMethods...))
extRouter.Use(zcommon.AddExtensionSecurityHeaders())
extRouter.Methods(allowedMethods...).
Handler(gqlHandler.NewDefaultServer(gql_generated.NewExecutableSchema(resConfig))) //nolint: staticcheck
Handler(gqlProxy(gqlHandler.NewDefaultServer(executableSchema)))

log.Info().Msg("finished setting up search routes")
}
47 changes: 47 additions & 0 deletions pkg/extensions/search/gql_proxy/generic_fan_out_gql_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package gqlproxy

import (
"encoding/json"
"io"
"net/http"

"zotregistry.dev/zot/pkg/api/config"
"zotregistry.dev/zot/pkg/proxy"
)

func fanOutGqlHandler(config *config.Config, _ map[string]string, response http.ResponseWriter, request *http.Request) {
// Proxy to all members including self in order to get the data as calling next() won't return the
// aggregated data to this handler.
finalMap := map[string]any{}

for _, targetMember := range config.Cluster.Members {
proxyResponse, err := proxy.ProxyHTTPRequest(request.Context(), request, targetMember, config)
if err != nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we want to return failure even if just one member fails?
I think if one member responds, we should return that and swallow errors maybe with some indicator somewhere. Maybe logs? HTTP status - 206 Partial Content could work also since this is our own API.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That sounds like a good idea. One thought that I had in mind is that instead of swallowing the error, perhaps we could append an error to the Errors list key in the GQL response and send it to the client so there is awareness of some error in the system.

The client can choose to ignore the error and use the valid data in the response, or ideally, show both the valid data as well as indicate that there were some errors in processing. With this approach status 206 could be the return status as you've suggested.

What do you think?

http.Error(response, "failed to process GQL request", http.StatusInternalServerError)

return
}

Check warning on line 23 in pkg/extensions/search/gql_proxy/generic_fan_out_gql_handler.go

View check run for this annotation

Codecov / codecov/patch

pkg/extensions/search/gql_proxy/generic_fan_out_gql_handler.go#L20-L23

Added lines #L20 - L23 were not covered by tests

proxyBody, err := io.ReadAll(proxyResponse.Body)
_ = proxyResponse.Body.Close()

if err != nil {
http.Error(response, "failed to process GQL request", http.StatusInternalServerError)

return
}

Check warning on line 32 in pkg/extensions/search/gql_proxy/generic_fan_out_gql_handler.go

View check run for this annotation

Codecov / codecov/patch

pkg/extensions/search/gql_proxy/generic_fan_out_gql_handler.go#L29-L32

Added lines #L29 - L32 were not covered by tests

responseResult := map[string]any{}

err = json.Unmarshal(proxyBody, &responseResult)
if err != nil {
http.Error(response, err.Error(), http.StatusInternalServerError)

return
}

Check warning on line 41 in pkg/extensions/search/gql_proxy/generic_fan_out_gql_handler.go

View check run for this annotation

Codecov / codecov/patch

pkg/extensions/search/gql_proxy/generic_fan_out_gql_handler.go#L38-L41

Added lines #L38 - L41 were not covered by tests
// perform merge of fields
finalMap = deepMergeMaps(finalMap, responseResult)
}

prepareAndWriteResponse(finalMap, response)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package gqlproxy

import (
"encoding/json"
"io"
"net/http"

"zotregistry.dev/zot/pkg/api/config"
"zotregistry.dev/zot/pkg/cluster"
"zotregistry.dev/zot/pkg/proxy"
)

func repoProxyOnceGqlHandler(
config *config.Config,
args map[string]string,
response http.ResponseWriter,
request *http.Request,
) {
repoName, ok := args["repo"]
if !ok {
// no repo was specified
http.Error(response, "repo name not specified in query", http.StatusBadRequest)

return
}

Check warning on line 25 in pkg/extensions/search/gql_proxy/generic_proxy_once_gql_handler.go

View check run for this annotation

Codecov / codecov/patch

pkg/extensions/search/gql_proxy/generic_proxy_once_gql_handler.go#L21-L25

Added lines #L21 - L25 were not covered by tests

proxyOnceGqlHandler(config, repoName, response, request)
}

func proxyOnceGqlHandler(config *config.Config, repoName string, response http.ResponseWriter, request *http.Request) {
_, targetMember := cluster.ComputeTargetMember(config.Cluster.HashKey, config.Cluster.Members, repoName)

proxyResponse, err := proxy.ProxyHTTPRequest(request.Context(), request, targetMember, config)
if err != nil {
http.Error(response, "failed to process GQL request", http.StatusInternalServerError)

return
}

Check warning on line 38 in pkg/extensions/search/gql_proxy/generic_proxy_once_gql_handler.go

View check run for this annotation

Codecov / codecov/patch

pkg/extensions/search/gql_proxy/generic_proxy_once_gql_handler.go#L35-L38

Added lines #L35 - L38 were not covered by tests

proxyBody, err := io.ReadAll(proxyResponse.Body)
_ = proxyResponse.Body.Close()

if err != nil {
http.Error(response, "failed to process GQL request", http.StatusInternalServerError)

return
}

Check warning on line 47 in pkg/extensions/search/gql_proxy/generic_proxy_once_gql_handler.go

View check run for this annotation

Codecov / codecov/patch

pkg/extensions/search/gql_proxy/generic_proxy_once_gql_handler.go#L44-L47

Added lines #L44 - L47 were not covered by tests

responseResult := map[string]any{}

err = json.Unmarshal(proxyBody, &responseResult)
if err != nil {
http.Error(response, err.Error(), http.StatusInternalServerError)

return
}

Check warning on line 56 in pkg/extensions/search/gql_proxy/generic_proxy_once_gql_handler.go

View check run for this annotation

Codecov / codecov/patch

pkg/extensions/search/gql_proxy/generic_proxy_once_gql_handler.go#L53-L56

Added lines #L53 - L56 were not covered by tests

prepareAndWriteResponse(responseResult, response)
}
Loading
Loading