From dd17e77812f62c1e9f23c78c92fc320accd10a99 Mon Sep 17 00:00:00 2001 From: Vishwas Rajashekar <30438425+vrajashkr@users.noreply.github.com> Date: Sun, 6 Apr 2025 18:35:16 +0000 Subject: [PATCH 1/4] feat(scale-out): support for GQL scale-out with local metadata Signed-off-by: Vishwas Rajashekar <30438425+vrajashkr@users.noreply.github.com> --- .../config-cluster-member0.json | 31 ++++ .../config-cluster-member1.json | 31 ++++ examples/scale-out-cluster-local/haproxy.cfg | 25 +++ pkg/api/cluster_proxy.go | 85 ++++++++++ pkg/api/config/config.go | 9 + pkg/api/config/config_test.go | 38 +++++ pkg/api/controller.go | 3 +- pkg/extensions/extension_search.go | 6 +- .../gql_proxy/generic_fan_out_gql_handler.go | 47 ++++++ pkg/extensions/search/gql_proxy/gql_proxy.go | 112 +++++++++++++ .../search/gql_proxy/handler_utils.go | 73 ++++++++ pkg/extensions/search/search_test.go | 121 ++++++++++++++ pkg/{api => proxy}/proxy.go | 156 +++++------------- pkg/{api => proxy}/proxy_test.go | 17 +- 14 files changed, 628 insertions(+), 126 deletions(-) create mode 100644 examples/scale-out-cluster-local/config-cluster-member0.json create mode 100644 examples/scale-out-cluster-local/config-cluster-member1.json create mode 100644 examples/scale-out-cluster-local/haproxy.cfg create mode 100644 pkg/api/cluster_proxy.go create mode 100644 pkg/extensions/search/gql_proxy/generic_fan_out_gql_handler.go create mode 100644 pkg/extensions/search/gql_proxy/gql_proxy.go create mode 100644 pkg/extensions/search/gql_proxy/handler_utils.go rename pkg/{api => proxy}/proxy.go (56%) rename pkg/{api => proxy}/proxy_test.go (75%) diff --git a/examples/scale-out-cluster-local/config-cluster-member0.json b/examples/scale-out-cluster-local/config-cluster-member0.json new file mode 100644 index 000000000..8ae13f677 --- /dev/null +++ b/examples/scale-out-cluster-local/config-cluster-member0.json @@ -0,0 +1,31 @@ +{ + "distSpecVersion": "1.1.0", + "storage": { + "rootDirectory": "./workspace/zot/data/mem1", + "dedupe": false + }, + "http": { + "address": "127.0.0.1", + "port": "9000" + }, + "log": { + "level": "debug" + }, + "cluster": { + "members": [ + "127.0.0.1:9000", + "127.0.0.1:9001" + ], + "hashKey": "loremipsumdolors" + }, + "extensions": { + "search": { + "cve": { + "updateInterval": "2h" + } + }, + "ui": { + "enable": true + } + } +} diff --git a/examples/scale-out-cluster-local/config-cluster-member1.json b/examples/scale-out-cluster-local/config-cluster-member1.json new file mode 100644 index 000000000..1602a1f5a --- /dev/null +++ b/examples/scale-out-cluster-local/config-cluster-member1.json @@ -0,0 +1,31 @@ +{ + "distSpecVersion": "1.1.0", + "storage": { + "rootDirectory": "./workspace/zot/data/mem2", + "dedupe": false + }, + "http": { + "address": "127.0.0.1", + "port": "9001" + }, + "log": { + "level": "debug" + }, + "cluster": { + "members": [ + "127.0.0.1:9000", + "127.0.0.1:9001" + ], + "hashKey": "loremipsumdolors" + }, + "extensions": { + "search": { + "cve": { + "updateInterval": "2h" + } + }, + "ui": { + "enable": true + } + } +} diff --git a/examples/scale-out-cluster-local/haproxy.cfg b/examples/scale-out-cluster-local/haproxy.cfg new file mode 100644 index 000000000..f3dfdcb54 --- /dev/null +++ b/examples/scale-out-cluster-local/haproxy.cfg @@ -0,0 +1,25 @@ +global + log /tmp/log local0 + log /tmp/log local1 notice + maxconn 2000 + stats timeout 30s + daemon + +defaults + log global + mode http + option httplog + option dontlognull + timeout connect 5000 + timeout client 50000 + timeout server 50000 + +frontend zot + bind *:8080 + default_backend zot-cluster + +backend zot-cluster + balance roundrobin + cookie SERVER insert indirect nocache + server zot0 127.0.0.1:9000 cookie zot0 + server zot1 127.0.0.1:9001 cookie zot1 diff --git a/pkg/api/cluster_proxy.go b/pkg/api/cluster_proxy.go new file mode 100644 index 000000000..6912ec53d --- /dev/null +++ b/pkg/api/cluster_proxy.go @@ -0,0 +1,85 @@ +package api + +import ( + "fmt" + "io" + "net/http" + + "github.com/gorilla/mux" + + "zotregistry.dev/zot/pkg/api/constants" + "zotregistry.dev/zot/pkg/cluster" + "zotregistry.dev/zot/pkg/proxy" +) + +// ClusterProxy wraps an http.HandlerFunc which requires proxying between zot instances to ensure +// that a given repository only has a single writer and reader for dist-spec operations in a scale-out cluster. +// based on the hash value of the repository name, the request will either be handled locally +// or proxied to another zot member in the cluster to get the data before sending a response to the client. +func ClusterProxy(ctrlr *Controller) func(http.HandlerFunc) http.HandlerFunc { + return func(next http.HandlerFunc) http.HandlerFunc { + return http.HandlerFunc(func(response http.ResponseWriter, request *http.Request) { + config := ctrlr.Config + logger := ctrlr.Log + + // if no cluster or single-node cluster, handle locally. + if config.Cluster == nil || len(config.Cluster.Members) == 1 { + next.ServeHTTP(response, request) + + return + } + + // since the handler has been wrapped, it should be possible to get the name + // of the repository from the mux. + vars := mux.Vars(request) + name, ok := vars["name"] + + if !ok || name == "" { + response.WriteHeader(http.StatusNotFound) + + return + } + + // the target member is the only one which should do read/write for the dist-spec APIs + // for the given repository. + targetMemberIndex, targetMember := cluster.ComputeTargetMember(config.Cluster.HashKey, config.Cluster.Members, name) + logger.Debug().Str(constants.RepositoryLogKey, name). + Msg(fmt.Sprintf("target member socket: %s index: %d", targetMember, targetMemberIndex)) + + // if the target member is the same as the local member, the current member should handle the request. + // since the instances have the same config, a quick index lookup is sufficient + if targetMemberIndex == config.Cluster.Proxy.LocalMemberClusterSocketIndex { + logger.Debug().Str(constants.RepositoryLogKey, name).Msg("handling the request locally") + next.ServeHTTP(response, request) + + return + } + + // if the header contains a hop-count, return an error response as there should be no multi-hop + if request.Header.Get(constants.ScaleOutHopCountHeader) != "" { + logger.Fatal().Str("url", request.URL.String()). + Msg("failed to process request - cannot proxy an already proxied request") + + return + } + + logger.Debug().Str(constants.RepositoryLogKey, name).Msg("proxying the request") + + proxyResponse, err := proxy.ProxyHTTPRequest(request.Context(), request, targetMember, ctrlr.Config) + if err != nil { + logger.Error().Err(err).Str(constants.RepositoryLogKey, name).Msg("failed to proxy the request") + http.Error(response, err.Error(), http.StatusInternalServerError) + + return + } + + defer func() { + _ = proxyResponse.Body.Close() + }() + + proxy.CopyHeader(response.Header(), proxyResponse.Header) + response.WriteHeader(proxyResponse.StatusCode) + _, _ = io.Copy(response, proxyResponse.Body) + }) + } +} diff --git a/pkg/api/config/config.go b/pkg/api/config/config.go index af0b593b5..2babc4595 100644 --- a/pkg/api/config/config.go +++ b/pkg/api/config/config.go @@ -535,3 +535,12 @@ func IsOauth2Supported(provider string) bool { return false } + +func (c *Config) IsClusteringEnabled() bool { + return c.Cluster != nil +} + +func (c *Config) IsSharedStorageEnabled() bool { + return c.Storage.RemoteCache && + c.Storage.StorageDriver != nil && c.Storage.CacheDriver != nil +} diff --git a/pkg/api/config/config_test.go b/pkg/api/config/config_test.go index 61e4d6298..713d207be 100644 --- a/pkg/api/config/config_test.go +++ b/pkg/api/config/config_test.go @@ -129,4 +129,42 @@ func TestConfig(t *testing.T) { So(conf.IsRetentionEnabled(), ShouldBeTrue) }) + + Convey("Test IsClusteringEnabled()", t, func() { + conf := config.New() + So(conf.Cluster, ShouldBeNil) + So(conf.IsClusteringEnabled(), ShouldBeFalse) + + conf.Cluster = &config.ClusterConfig{ + Members: []string{ + "127.0.0.1:9090", + "127.0.0.1:9001", + }, + HashKey: "loremipsumdolors", + } + + So(conf.IsClusteringEnabled(), ShouldBeTrue) + }) + + Convey("Test IsSharedStorageEnabled()", t, func() { + conf := config.New() + So(conf.Storage.RemoteCache, ShouldBeFalse) + So(conf.Storage.CacheDriver, ShouldBeNil) + So(conf.Storage.StorageDriver, ShouldBeNil) + So(conf.IsSharedStorageEnabled(), ShouldBeFalse) + + conf.Storage.RemoteCache = true + So(conf.Storage.RemoteCache, ShouldBeTrue) + So(conf.IsSharedStorageEnabled(), ShouldBeFalse) + + storageDriver := map[string]interface{}{"name": "s3"} + conf.Storage.StorageDriver = storageDriver + So(conf.Storage.StorageDriver, ShouldNotBeNil) + So(conf.IsSharedStorageEnabled(), ShouldBeFalse) + + cacheDriver := map[string]interface{}{"name": "dynamodb"} + conf.Storage.CacheDriver = cacheDriver + So(conf.Storage.CacheDriver, ShouldNotBeNil) + So(conf.IsSharedStorageEnabled(), ShouldBeTrue) + }) } diff --git a/pkg/api/controller.go b/pkg/api/controller.go index e3cc8b9c8..829dfdf20 100644 --- a/pkg/api/controller.go +++ b/pkg/api/controller.go @@ -25,6 +25,7 @@ import ( "zotregistry.dev/zot/pkg/log" "zotregistry.dev/zot/pkg/meta" mTypes "zotregistry.dev/zot/pkg/meta/types" + "zotregistry.dev/zot/pkg/proxy" "zotregistry.dev/zot/pkg/scheduler" "zotregistry.dev/zot/pkg/storage" "zotregistry.dev/zot/pkg/storage/gc" @@ -72,7 +73,7 @@ func NewController(appConfig *config.Config) *Controller { // memberSocket is the local member's socket // the index is also fetched for quick lookups during proxying - memberSocketIdx, memberSocket, err := GetLocalMemberClusterSocket(appConfig.Cluster.Members, localSockets) + memberSocketIdx, memberSocket, err := proxy.GetLocalMemberClusterSocket(appConfig.Cluster.Members, localSockets) if err != nil { logger.Error().Err(err).Msg("failed to get member socket") panic("failed to get member socket") diff --git a/pkg/extensions/extension_search.go b/pkg/extensions/extension_search.go index 6946c6b78..25ae02c0b 100644 --- a/pkg/extensions/extension_search.go +++ b/pkg/extensions/extension_search.go @@ -16,6 +16,7 @@ import ( "zotregistry.dev/zot/pkg/extensions/search" cveinfo "zotregistry.dev/zot/pkg/extensions/search/cve" "zotregistry.dev/zot/pkg/extensions/search/gql_generated" + gqlproxy "zotregistry.dev/zot/pkg/extensions/search/gql_proxy" "zotregistry.dev/zot/pkg/log" mTypes "zotregistry.dev/zot/pkg/meta/types" "zotregistry.dev/zot/pkg/scheduler" @@ -91,15 +92,18 @@ func SetupSearchRoutes(conf *config.Config, router *mux.Router, storeController } resConfig := search.GetResolverConfig(log, storeController, metaDB, cveInfo) + executableSchema := gql_generated.NewExecutableSchema(resConfig) allowedMethods := zcommon.AllowedMethods(http.MethodGet, http.MethodPost) + gqlProxy := gqlproxy.GqlProxyRequestHandler(conf, log, executableSchema.Schema()) + extRouter := router.PathPrefix(constants.ExtSearchPrefix).Subrouter() extRouter.Use(zcommon.CORSHeadersMiddleware(conf.HTTP.AllowOrigin)) extRouter.Use(zcommon.ACHeadersMiddleware(conf, allowedMethods...)) extRouter.Use(zcommon.AddExtensionSecurityHeaders()) extRouter.Methods(allowedMethods...). - Handler(gqlHandler.NewDefaultServer(gql_generated.NewExecutableSchema(resConfig))) //nolint: staticcheck + Handler(gqlProxy(gqlHandler.NewDefaultServer(executableSchema))) log.Info().Msg("finished setting up search routes") } diff --git a/pkg/extensions/search/gql_proxy/generic_fan_out_gql_handler.go b/pkg/extensions/search/gql_proxy/generic_fan_out_gql_handler.go new file mode 100644 index 000000000..f66de54d9 --- /dev/null +++ b/pkg/extensions/search/gql_proxy/generic_fan_out_gql_handler.go @@ -0,0 +1,47 @@ +package gqlproxy + +import ( + "encoding/json" + "io" + "net/http" + + "zotregistry.dev/zot/pkg/api/config" + "zotregistry.dev/zot/pkg/proxy" +) + +func fanOutGqlHandler(config *config.Config, response http.ResponseWriter, request *http.Request) { + // Proxy to all members including self in order to get the data as calling next() won't return the + // aggregated data to this handler. + finalMap := map[string]any{} + + for _, targetMember := range config.Cluster.Members { + proxyResponse, err := proxy.ProxyHTTPRequest(request.Context(), request, targetMember, config) + if err != nil { + http.Error(response, "failed to process GQL request", http.StatusInternalServerError) + + return + } + + proxyBody, err := io.ReadAll(proxyResponse.Body) + _ = proxyResponse.Body.Close() + + if err != nil { + http.Error(response, "failed to process GQL request", http.StatusInternalServerError) + + return + } + + responseResult := map[string]any{} + + err = json.Unmarshal(proxyBody, &responseResult) + if err != nil { + http.Error(response, err.Error(), http.StatusInternalServerError) + + return + } + // perform merge of fields + finalMap = deepMergeMaps(finalMap, responseResult) + } + + prepareAndWriteResponse(finalMap, response) +} diff --git a/pkg/extensions/search/gql_proxy/gql_proxy.go b/pkg/extensions/search/gql_proxy/gql_proxy.go new file mode 100644 index 000000000..93e5c487f --- /dev/null +++ b/pkg/extensions/search/gql_proxy/gql_proxy.go @@ -0,0 +1,112 @@ +package gqlproxy + +import ( + "net/http" + + "github.com/vektah/gqlparser/v2" + "github.com/vektah/gqlparser/v2/ast" + + "zotregistry.dev/zot/pkg/api/config" + "zotregistry.dev/zot/pkg/api/constants" + "zotregistry.dev/zot/pkg/log" +) + +type GqlScaleOutHandlerFunc func(*config.Config, http.ResponseWriter, *http.Request) + +// Returns a wrapped handler that can handle request proxying for GQL +// queries when running in cluster mode without shared storage (each instance has its own metadata). +// Requests are only proxied in local cluster mode as in this mode, each instance holds only the +// metadata for the images that it serves, however, in shared storage mode, +// all the instances have access to all the metadata so any can respond. +func GqlProxyRequestHandler( + config *config.Config, + log log.Logger, + gqlSchema *ast.Schema, +) func(handler http.Handler) http.Handler { + // This map stores the handler needed for each supported GQL operation in the application. + // There are 2 main buckets of handlers: + // 1. generic handlers - these cater to general use-cases such as fan-out and aggregate and hash-based proxying. + // 2. specific handlers - these cater to specific proxy behaviour that certain operations require. + // Most cases would use generic handlers while a few select cases would make use of the specific handlers. + // When a GQL query is updated or a new one added, a change may be required here depending on the type of handler + // that the operation needs to use. + proxyFunctionalityMap := map[string]GqlScaleOutHandlerFunc{ + "GlobalSearch": fanOutGqlHandler, + } + + return func(next http.Handler) http.Handler { + return http.HandlerFunc(func(response http.ResponseWriter, request *http.Request) { + // If not running in cluster mode, no op. + if !config.IsClusteringEnabled() { + next.ServeHTTP(response, request) + + return + } + // If in cluster mode, but using shared-storage, no op. + if config.IsSharedStorageEnabled() { + next.ServeHTTP(response, request) + + return + } + + // If the request has already been proxied, don't re-proxy. + if request.Header.Get(constants.ScaleOutHopCountHeader) != "" { + next.ServeHTTP(response, request) + + return + } + + query := request.URL.Query().Get("query") + + // Load the query using gqlparser. + // This helps to read the Operation correctly which is in turn used to + // dynamically hand-off the processing to the appropriate handler. + processedGql, errList := gqlparser.LoadQuery(gqlSchema, query) + + if len(errList) != 0 { + for _, err := range errList { + log.Error().Str("query", query).Err(err).Msg(err.Message) + } + + http.Error(response, "Failed to process GQL request", http.StatusInternalServerError) + + return + } + + // Look at the first operation in the query. + operation := "" + + for _, op := range processedGql.Operations { + for _, ss := range op.SelectionSet { + switch ss := ss.(type) { + case *ast.Field: + operation = ss.Name + default: + log.Error().Str("query", query).Msg("unsupported type") + } + + break + } + } + + if operation == "" { + log.Error().Str("query", query).Msg("failed to compute operation from query") + http.Error(response, "failed to process GQL request", http.StatusInternalServerError) + + return + } + + handler, ok := proxyFunctionalityMap[operation] + if !ok { + // If the operation is not currently supported or is unknown, + // pass it on to the local GQL server to handle it. + next.ServeHTTP(response, request) + + return + } + + // invoke the handler + handler(config, response, request) + }) + } +} diff --git a/pkg/extensions/search/gql_proxy/handler_utils.go b/pkg/extensions/search/gql_proxy/handler_utils.go new file mode 100644 index 000000000..037261cb3 --- /dev/null +++ b/pkg/extensions/search/gql_proxy/handler_utils.go @@ -0,0 +1,73 @@ +package gqlproxy + +import ( + "encoding/json" + "fmt" + "maps" + "net/http" +) + +// Formats the response and sends it back to the client. +func prepareAndWriteResponse(data map[string]any, response http.ResponseWriter) { + response.Header().Set("Content-Type", "application/json") + + responseBody, err := json.MarshalIndent(data, "", " ") + if err != nil { + http.Error(response, "failed to marshal response data", http.StatusInternalServerError) + + return + } + + _, err = response.Write(responseBody) + if err != nil { + http.Error(response, "failed to write response", http.StatusInternalServerError) + + return + } +} + +// This function deep merges maps with some specific handling for known types. +// The primary usage of this is to dynamically merge the GQL responses coming +// from each of the cluster members. +func deepMergeMaps(a, b map[string]any) map[string]any { + result := make(map[string]any, len(a)) + + maps.Copy(result, a) + + for key, bValue := range b { + currentValue, keyCurrentlyPresentInResult := result[key] + + if !keyCurrentlyPresentInResult { + // it doesn't exist yet, directly add to result + result[key] = bValue + + continue + } + + switch bValue := bValue.(type) { + case map[string]any: + if currentValueMap, ok := currentValue.(map[string]any); ok { + // currentValue is also a map, recursively merge + result[key] = deepMergeMaps(currentValueMap, bValue) + + continue + } + // else force update + result[key] = bValue + case float64: + // numeric aggregation - json library assumes float64 for numeric types + if currentValueNum, ok := currentValue.(float64); ok { + result[key] = currentValueNum + bValue + } + case []any: + // if it is an array, combine the 2 arrays' elements into one array + if currentValue, ok := currentValue.([]any); ok { + result[key] = append(currentValue, bValue...) + } + default: + fmt.Println("unsupported type for merging") + } + } + + return result +} diff --git a/pkg/extensions/search/search_test.go b/pkg/extensions/search/search_test.go index 6d8197b07..786195d56 100644 --- a/pkg/extensions/search/search_test.go +++ b/pkg/extensions/search/search_test.go @@ -43,6 +43,7 @@ import ( "zotregistry.dev/zot/pkg/storage/local" storageTypes "zotregistry.dev/zot/pkg/storage/types" . "zotregistry.dev/zot/pkg/test/common" + test "zotregistry.dev/zot/pkg/test/common" . "zotregistry.dev/zot/pkg/test/image-utils" "zotregistry.dev/zot/pkg/test/mocks" ociutils "zotregistry.dev/zot/pkg/test/oci-utils" @@ -60,6 +61,14 @@ var ( ErrPutManifest = errors.New("can't put manifest") ) +func makeController(conf *config.Config, dir string) *api.Controller { + ctlr := api.NewController(conf) + + ctlr.Config.Storage.RootDirectory = dir + + return ctlr +} + func readFileAndSearchString(filePath string, stringToMatch string, timeout time.Duration) (bool, error) { ctx, cancelFunc := context.WithTimeout(context.Background(), timeout) defer cancelFunc() @@ -4508,6 +4517,118 @@ func TestGlobalSearch(t *testing.T) { //nolint: gocyclo }) } +func TestGlobalSearchWithScaleOutProxyLocalStorage(t *testing.T) { + // When there are 2 zot instances, the same GlobalSearch query should + // return aggregated data from both instances when both instances are queried. + Convey("In a local scale-out cluster with 2 members, should return correct data for GlobalSearch", t, func() { + numMembers := 2 + ports := make([]string, numMembers) + + clusterMembers := make([]string, numMembers) + + for idx := 0; idx < numMembers; idx++ { + port := test.GetFreePort() + ports[idx] = port + clusterMembers[idx] = "127.0.0.1:" + port + } + + for _, port := range ports { + conf := config.New() + conf.HTTP.Port = port + conf.Cluster = &config.ClusterConfig{ + Members: clusterMembers, + HashKey: "loremipsumdolors", + } + defaultVal := true + conf.Extensions = &extconf.ExtensionConfig{ + Search: &extconf.SearchConfig{BaseConfig: extconf.BaseConfig{Enable: &defaultVal}, CVE: nil}, + } + + ctrlr := makeController(conf, t.TempDir()) + cm := test.NewControllerManager(ctrlr) + cm.StartAndWait(port) + + defer cm.StopServer() + } + + for _, port := range ports { + resp, err := resty.R().Get(test.GetBaseURL(port) + "/v2/") + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + } + + reposToTest := []string{"alpine", "ubuntu", "debian", "bash"} + for _, repoName := range reposToTest { + img := CreateRandomImage() + + err := UploadImage(img, test.GetBaseURL(ports[0]), repoName, "1.0") + So(err, ShouldBeNil) + } + + for _, port := range ports { + query := `{GlobalSearch(query:""){ + Page { + TotalCount + ItemCount + } + Repos { + Name + } + } + }` + + baseURL := test.GetBaseURL(port) + resp, err := resty.R().Get(baseURL + graphqlQueryPrefix + "?query=" + url.QueryEscape(query)) + So(resp, ShouldNotBeNil) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, 200) + + responseStruct := &zcommon.GlobalSearchResultResp{} + + err = json.Unmarshal(resp.Body(), responseStruct) + So(err, ShouldBeNil) + So(len(responseStruct.Repos), ShouldEqual, 4) + So(responseStruct.Page.ItemCount, ShouldEqual, 4) + So(responseStruct.Page.TotalCount, ShouldEqual, 4) + } + + // Test the pagination behaviour + for _, port := range ports { + query := `{GlobalSearch(query:"", requestedPage:{ + limit:1 + offset:0 + sortBy: DOWNLOADS + }){ + Page { + TotalCount + ItemCount + } + Repos { + Name + } + } + }` + + baseURL := test.GetBaseURL(port) + resp, err := resty.R().Get(baseURL + graphqlQueryPrefix + "?query=" + url.QueryEscape(query)) + So(resp, ShouldNotBeNil) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, 200) + + responseStruct := &zcommon.GlobalSearchResultResp{} + + err = json.Unmarshal(resp.Body(), responseStruct) + So(err, ShouldBeNil) + // Length should actually be 1 - currently, we have 1 + 1. + So(len(responseStruct.Repos), ShouldEqual, 2) + // Item count should actually be 1 - currently we have 1 + 1. + So(responseStruct.Page.ItemCount, ShouldEqual, 2) + So(responseStruct.Page.TotalCount, ShouldEqual, 4) + } + }) +} + func TestCleaningFilteringParamsGlobalSearch(t *testing.T) { Convey("Test cleaning filtering parameters for global search", t, func() { dir := t.TempDir() diff --git a/pkg/api/proxy.go b/pkg/proxy/proxy.go similarity index 56% rename from pkg/api/proxy.go rename to pkg/proxy/proxy.go index 96d35bfb9..cb20c78b2 100644 --- a/pkg/api/proxy.go +++ b/pkg/proxy/proxy.go @@ -1,140 +1,39 @@ -package api +package proxy import ( "bytes" "context" - "fmt" "io" "net" "net/http" - "github.com/gorilla/mux" - + "zotregistry.dev/zot/pkg/api/config" "zotregistry.dev/zot/pkg/api/constants" - "zotregistry.dev/zot/pkg/cluster" "zotregistry.dev/zot/pkg/common" ) -// ClusterProxy wraps an http.HandlerFunc which requires proxying between zot instances to ensure -// that a given repository only has a single writer and reader for dist-spec operations in a scale-out cluster. -// based on the hash value of the repository name, the request will either be handled locally -// or proxied to another zot member in the cluster to get the data before sending a response to the client. -func ClusterProxy(ctrlr *Controller) func(http.HandlerFunc) http.HandlerFunc { - return func(next http.HandlerFunc) http.HandlerFunc { - return http.HandlerFunc(func(response http.ResponseWriter, request *http.Request) { - config := ctrlr.Config - logger := ctrlr.Log - - // if no cluster or single-node cluster, handle locally. - if config.Cluster == nil || len(config.Cluster.Members) == 1 { - next.ServeHTTP(response, request) - - return - } - - // since the handler has been wrapped, it should be possible to get the name - // of the repository from the mux. - vars := mux.Vars(request) - name, ok := vars["name"] - - if !ok || name == "" { - response.WriteHeader(http.StatusNotFound) - - return - } - - // the target member is the only one which should do read/write for the dist-spec APIs - // for the given repository. - targetMemberIndex, targetMember := cluster.ComputeTargetMember(config.Cluster.HashKey, config.Cluster.Members, name) - logger.Debug().Str(constants.RepositoryLogKey, name). - Msg(fmt.Sprintf("target member socket: %s index: %d", targetMember, targetMemberIndex)) - - // if the target member is the same as the local member, the current member should handle the request. - // since the instances have the same config, a quick index lookup is sufficient - if targetMemberIndex == config.Cluster.Proxy.LocalMemberClusterSocketIndex { - logger.Debug().Str(constants.RepositoryLogKey, name).Msg("handling the request locally") - next.ServeHTTP(response, request) - - return - } - - // if the header contains a hop-count, return an error response as there should be no multi-hop - if request.Header.Get(constants.ScaleOutHopCountHeader) != "" { - logger.Fatal().Str("url", request.URL.String()). - Msg("failed to process request - cannot proxy an already proxied request") - - return - } - - logger.Debug().Str(constants.RepositoryLogKey, name).Msg("proxying the request") - - proxyResponse, err := proxyHTTPRequest(request.Context(), request, targetMember, ctrlr) - if err != nil { - logger.Error().Err(err).Str(constants.RepositoryLogKey, name).Msg("failed to proxy the request") - http.Error(response, err.Error(), http.StatusInternalServerError) - - return - } - defer proxyResponse.Body.Close() - - copyHeader(response.Header(), proxyResponse.Header) - response.WriteHeader(proxyResponse.StatusCode) - _, _ = io.Copy(response, proxyResponse.Body) - }) - } -} - -// gets all the server sockets of a target member - IP:Port. -// for IPv6, the socket is [IPv6]:Port. -// if the input is an IP address, returns the same targetMember in an array. -// if the input is a host name, performs a lookup and returns the server sockets. -func getTargetMemberServerSockets(targetMemberSocket string) ([]string, error) { - targetHost, targetPort, err := net.SplitHostPort(targetMemberSocket) - if err != nil { - return []string{}, err - } - - addr := net.ParseIP(targetHost) - if addr != nil { - // this is an IP address, return as is - return []string{targetMemberSocket}, nil - } - // this is a hostname - try to resolve to an IP - resolvedAddrs, err := common.GetIPFromHostName(targetHost) - if err != nil { - return []string{}, err - } - - targetSockets := make([]string, len(resolvedAddrs)) - for idx, resolvedAddr := range resolvedAddrs { - targetSockets[idx] = net.JoinHostPort(resolvedAddr, targetPort) - } - - return targetSockets, nil -} - // proxy the request to the target member and return a pointer to the response or an error. -func proxyHTTPRequest(ctx context.Context, req *http.Request, - targetMember string, ctrlr *Controller, +func ProxyHTTPRequest(ctx context.Context, req *http.Request, + targetMember string, config *config.Config, ) (*http.Response, error) { cloneURL := *req.URL proxyQueryScheme := "http" - if ctrlr.Config.HTTP.TLS != nil { + if config.HTTP.TLS != nil { proxyQueryScheme = "https" } cloneURL.Scheme = proxyQueryScheme cloneURL.Host = targetMember - clonedBody := cloneRequestBody(req) + clonedBody := CloneRequestBody(req) fwdRequest, err := http.NewRequestWithContext(ctx, req.Method, cloneURL.String(), clonedBody) if err != nil { return nil, err } - copyHeader(fwdRequest.Header, req.Header) + CopyHeader(fwdRequest.Header, req.Header) // always set hop count to 1 for now. // the handler wrapper above will terminate the process if it sees a request that @@ -142,12 +41,12 @@ func proxyHTTPRequest(ctx context.Context, req *http.Request, fwdRequest.Header.Set(constants.ScaleOutHopCountHeader, "1") clientOpts := common.HTTPClientOptions{ - TLSEnabled: ctrlr.Config.HTTP.TLS != nil, - VerifyTLS: ctrlr.Config.HTTP.TLS != nil, // for now, always verify TLS when TLS mode is enabled + TLSEnabled: config.HTTP.TLS != nil, + VerifyTLS: config.HTTP.TLS != nil, // for now, always verify TLS when TLS mode is enabled Host: targetMember, } - tlsConfig := ctrlr.Config.Cluster.TLS + tlsConfig := config.Cluster.TLS if tlsConfig != nil { clientOpts.CertOptions.ClientCertFile = tlsConfig.Cert clientOpts.CertOptions.ClientKeyFile = tlsConfig.Key @@ -169,7 +68,7 @@ func proxyHTTPRequest(ctx context.Context, req *http.Request, // copy out the contents into a new buffer as the response body // stream should be closed to get all the data out. _, _ = io.Copy(&clonedRespBody, resp.Body) - resp.Body.Close() + _ = resp.Body.Close() // after closing the original body, substitute it with a new reader // using the buffer that was just created. @@ -179,7 +78,7 @@ func proxyHTTPRequest(ctx context.Context, req *http.Request, return resp, nil } -func cloneRequestBody(src *http.Request) io.Reader { +func CloneRequestBody(src *http.Request) io.Reader { var bCloneForOriginal, bCloneForCopy bytes.Buffer multiWriter := io.MultiWriter(&bCloneForOriginal, &bCloneForCopy) numBytesCopied, _ := io.Copy(multiWriter, src.Body) @@ -200,7 +99,7 @@ func cloneRequestBody(src *http.Request) io.Reader { return bytes.NewReader(bCloneForCopy.Bytes()) } -func copyHeader(dst, src http.Header) { +func CopyHeader(dst, src http.Header) { for k, vv := range src { for _, v := range vv { dst.Add(k, v) @@ -208,6 +107,35 @@ func copyHeader(dst, src http.Header) { } } +// gets all the server sockets of a target member - IP:Port. +// for IPv6, the socket is [IPv6]:Port. +// if the input is an IP address, returns the same targetMember in an array. +// if the input is a host name, performs a lookup and returns the server sockets. +func getTargetMemberServerSockets(targetMemberSocket string) ([]string, error) { + targetHost, targetPort, err := net.SplitHostPort(targetMemberSocket) + if err != nil { + return []string{}, err + } + + addr := net.ParseIP(targetHost) + if addr != nil { + // this is an IP address, return as is + return []string{targetMemberSocket}, nil + } + // this is a hostname - try to resolve to an IP + resolvedAddrs, err := common.GetIPFromHostName(targetHost) + if err != nil { + return []string{}, err + } + + targetSockets := make([]string, len(resolvedAddrs)) + for idx, resolvedAddr := range resolvedAddrs { + targetSockets[idx] = net.JoinHostPort(resolvedAddr, targetPort) + } + + return targetSockets, nil +} + // identifies and returns the cluster socket and index. // this is the socket which the scale out cluster members will use for // proxying and communication among each other. diff --git a/pkg/api/proxy_test.go b/pkg/proxy/proxy_test.go similarity index 75% rename from pkg/api/proxy_test.go rename to pkg/proxy/proxy_test.go index f11daede3..dbb8ff3a2 100644 --- a/pkg/api/proxy_test.go +++ b/pkg/proxy/proxy_test.go @@ -1,21 +1,18 @@ -//go:build sync && scrub && metrics && search && lint && userprefs && mgmt && imagetrust && ui -// +build sync,scrub,metrics,search,lint,userprefs,mgmt,imagetrust,ui - -package api_test +package proxy_test import ( "testing" . "github.com/smartystreets/goconvey/convey" - "zotregistry.dev/zot/pkg/api" + "zotregistry.dev/zot/pkg/proxy" ) func TestGetLocalMemberClusterSocket(t *testing.T) { Convey("Should return an error if a domain name doesn't exist", t, func() { localSockets := []string{"127.0.0.1:9000", "172.16.0.1:9000"} members := []string{"127.0.0.1:9001", "thisdoesnotexist:9000", "127.0.0.1:9000"} - index, socket, err := api.GetLocalMemberClusterSocket(members, localSockets) + index, socket, err := proxy.GetLocalMemberClusterSocket(members, localSockets) So(err.Error(), ShouldContainSubstring, "lookup thisdoesnotexist") So(index, ShouldEqual, -1) So(socket, ShouldEqual, "") @@ -24,7 +21,7 @@ func TestGetLocalMemberClusterSocket(t *testing.T) { Convey("Should return an error if a local socket is missing a port", t, func() { localSockets := []string{"127.0.0.1", "172.16.0.1:9000"} members := []string{"127.0.0.1:9001", "www.github.com:443", "127.0.0.1:9000"} - index, socket, err := api.GetLocalMemberClusterSocket(members, localSockets) + index, socket, err := proxy.GetLocalMemberClusterSocket(members, localSockets) So(err.Error(), ShouldEqual, "address 127.0.0.1: missing port in address") So(index, ShouldEqual, -1) So(socket, ShouldEqual, "") @@ -33,7 +30,7 @@ func TestGetLocalMemberClusterSocket(t *testing.T) { Convey("Should return an error if a member socket is missing a port", t, func() { localSockets := []string{"127.0.0.1:9000", "172.16.0.1:9000"} members := []string{"127.0.0.1:9001", "www.github.com", "127.0.0.1:9000"} - index, socket, err := api.GetLocalMemberClusterSocket(members, localSockets) + index, socket, err := proxy.GetLocalMemberClusterSocket(members, localSockets) So(err.Error(), ShouldEqual, "address www.github.com: missing port in address") So(index, ShouldEqual, -1) So(socket, ShouldEqual, "") @@ -42,7 +39,7 @@ func TestGetLocalMemberClusterSocket(t *testing.T) { Convey("Should return the right socket when a local socket is part of members", t, func() { localSockets := []string{"127.0.0.1:9000", "172.16.0.1:9000"} members := []string{"127.0.0.1:9001", "www.github.com:443", "127.0.0.1:9000"} - index, socket, err := api.GetLocalMemberClusterSocket(members, localSockets) + index, socket, err := proxy.GetLocalMemberClusterSocket(members, localSockets) So(err, ShouldBeNil) So(index, ShouldEqual, 2) So(socket, ShouldEqual, "127.0.0.1:9000") @@ -51,7 +48,7 @@ func TestGetLocalMemberClusterSocket(t *testing.T) { Convey("Should return empty when no local socket is part of members", t, func() { localSockets := []string{"127.0.0.1:9000", "172.16.0.1:9000"} members := []string{"127.0.0.1:9002", "127.0.0.1:9001", "www.github.com:443"} - index, socket, err := api.GetLocalMemberClusterSocket(members, localSockets) + index, socket, err := proxy.GetLocalMemberClusterSocket(members, localSockets) So(err, ShouldBeNil) So(index, ShouldEqual, -1) So(socket, ShouldBeEmpty) From 8aca3c63f8a21128ba78302c2a14a39d50294c7a Mon Sep 17 00:00:00 2001 From: Vishwas Rajashekar <30438425+vrajashkr@users.noreply.github.com> Date: Mon, 28 Apr 2025 19:52:49 +0000 Subject: [PATCH 2/4] feat(scale-out): add support for ImageList GQL Signed-off-by: Vishwas Rajashekar <30438425+vrajashkr@users.noreply.github.com> --- .../gql_proxy/generic_fan_out_gql_handler.go | 2 +- .../generic_proxy_once_gql_handler.go | 59 +++++++++ pkg/extensions/search/gql_proxy/gql_proxy.go | 10 +- pkg/extensions/search/search_test.go | 113 +++++++++++++++++- 4 files changed, 180 insertions(+), 4 deletions(-) create mode 100644 pkg/extensions/search/gql_proxy/generic_proxy_once_gql_handler.go diff --git a/pkg/extensions/search/gql_proxy/generic_fan_out_gql_handler.go b/pkg/extensions/search/gql_proxy/generic_fan_out_gql_handler.go index f66de54d9..20193361d 100644 --- a/pkg/extensions/search/gql_proxy/generic_fan_out_gql_handler.go +++ b/pkg/extensions/search/gql_proxy/generic_fan_out_gql_handler.go @@ -9,7 +9,7 @@ import ( "zotregistry.dev/zot/pkg/proxy" ) -func fanOutGqlHandler(config *config.Config, response http.ResponseWriter, request *http.Request) { +func fanOutGqlHandler(config *config.Config, _ map[string]string, response http.ResponseWriter, request *http.Request) { // Proxy to all members including self in order to get the data as calling next() won't return the // aggregated data to this handler. finalMap := map[string]any{} diff --git a/pkg/extensions/search/gql_proxy/generic_proxy_once_gql_handler.go b/pkg/extensions/search/gql_proxy/generic_proxy_once_gql_handler.go new file mode 100644 index 000000000..39d2b9250 --- /dev/null +++ b/pkg/extensions/search/gql_proxy/generic_proxy_once_gql_handler.go @@ -0,0 +1,59 @@ +package gqlproxy + +import ( + "encoding/json" + "io" + "net/http" + + "zotregistry.dev/zot/pkg/api/config" + "zotregistry.dev/zot/pkg/cluster" + "zotregistry.dev/zot/pkg/proxy" +) + +func repoProxyOnceGqlHandler( + config *config.Config, + args map[string]string, + response http.ResponseWriter, + request *http.Request, +) { + repoName, ok := args["repo"] + if !ok { + // no repo was specified + http.Error(response, "repo name not specified in query", http.StatusBadRequest) + + return + } + + proxyOnceGqlHandler(config, repoName, response, request) +} + +func proxyOnceGqlHandler(config *config.Config, repoName string, response http.ResponseWriter, request *http.Request) { + _, targetMember := cluster.ComputeTargetMember(config.Cluster.HashKey, config.Cluster.Members, repoName) + + proxyResponse, err := proxy.ProxyHTTPRequest(request.Context(), request, targetMember, config) + if err != nil { + http.Error(response, "failed to process GQL request", http.StatusInternalServerError) + + return + } + + proxyBody, err := io.ReadAll(proxyResponse.Body) + _ = proxyResponse.Body.Close() + + if err != nil { + http.Error(response, "failed to process GQL request", http.StatusInternalServerError) + + return + } + + responseResult := map[string]any{} + + err = json.Unmarshal(proxyBody, &responseResult) + if err != nil { + http.Error(response, err.Error(), http.StatusInternalServerError) + + return + } + + prepareAndWriteResponse(responseResult, response) +} diff --git a/pkg/extensions/search/gql_proxy/gql_proxy.go b/pkg/extensions/search/gql_proxy/gql_proxy.go index 93e5c487f..17c8f65b0 100644 --- a/pkg/extensions/search/gql_proxy/gql_proxy.go +++ b/pkg/extensions/search/gql_proxy/gql_proxy.go @@ -11,7 +11,7 @@ import ( "zotregistry.dev/zot/pkg/log" ) -type GqlScaleOutHandlerFunc func(*config.Config, http.ResponseWriter, *http.Request) +type GqlScaleOutHandlerFunc func(*config.Config, map[string]string, http.ResponseWriter, *http.Request) // Returns a wrapped handler that can handle request proxying for GQL // queries when running in cluster mode without shared storage (each instance has its own metadata). @@ -32,6 +32,7 @@ func GqlProxyRequestHandler( // that the operation needs to use. proxyFunctionalityMap := map[string]GqlScaleOutHandlerFunc{ "GlobalSearch": fanOutGqlHandler, + "ImageList": repoProxyOnceGqlHandler, } return func(next http.Handler) http.Handler { @@ -75,12 +76,17 @@ func GqlProxyRequestHandler( // Look at the first operation in the query. operation := "" + gqlQueryArgs := map[string]string{} for _, op := range processedGql.Operations { for _, ss := range op.SelectionSet { switch ss := ss.(type) { case *ast.Field: operation = ss.Name + + for _, queryArg := range ss.Arguments { + gqlQueryArgs[queryArg.Name] = queryArg.Value.Raw + } default: log.Error().Str("query", query).Msg("unsupported type") } @@ -106,7 +112,7 @@ func GqlProxyRequestHandler( } // invoke the handler - handler(config, response, request) + handler(config, gqlQueryArgs, response, request) }) } } diff --git a/pkg/extensions/search/search_test.go b/pkg/extensions/search/search_test.go index 786195d56..3625ce337 100644 --- a/pkg/extensions/search/search_test.go +++ b/pkg/extensions/search/search_test.go @@ -4526,7 +4526,7 @@ func TestGlobalSearchWithScaleOutProxyLocalStorage(t *testing.T) { clusterMembers := make([]string, numMembers) - for idx := 0; idx < numMembers; idx++ { + for idx := range numMembers { port := test.GetFreePort() ports[idx] = port clusterMembers[idx] = "127.0.0.1:" + port @@ -4626,6 +4626,39 @@ func TestGlobalSearchWithScaleOutProxyLocalStorage(t *testing.T) { So(responseStruct.Page.ItemCount, ShouldEqual, 2) So(responseStruct.Page.TotalCount, ShouldEqual, 4) } + + // Test query behaviour with repo query + for _, port := range ports { + query := ` + { + GlobalSearch(query:"debian") { + Page { + TotalCount + ItemCount + } + Repos { + Name + } + } + }` + + baseURL := test.GetBaseURL(port) + resp, err := resty.R().Get(baseURL + graphqlQueryPrefix + "?query=" + url.QueryEscape(query)) + So(resp, ShouldNotBeNil) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, 200) + + responseStruct := &zcommon.GlobalSearchResultResp{} + + err = json.Unmarshal(resp.Body(), responseStruct) + So(err, ShouldBeNil) + + // Since a filter has been applied for 'debian', there should only be one entry. + So(len(responseStruct.Repos), ShouldEqual, 1) + + So(responseStruct.Page.ItemCount, ShouldEqual, 1) + So(responseStruct.Page.TotalCount, ShouldEqual, 1) + } }) } @@ -5000,6 +5033,84 @@ func TestImageList(t *testing.T) { }) } +func TestImageListWithScaleOutProxyLocalStorage(t *testing.T) { + // When there are 2 zot instances, the same ImageList query should + // return the correct data when both instances are queried. + Convey("In a local scale-out cluster with 2 members, should return correct data for ImageList", t, func() { + numMembers := 2 + ports := make([]string, numMembers) + + clusterMembers := make([]string, numMembers) + + for idx := range numMembers { + port := test.GetFreePort() + ports[idx] = port + clusterMembers[idx] = "127.0.0.1:" + port + } + + for _, port := range ports { + conf := config.New() + conf.HTTP.Port = port + conf.Cluster = &config.ClusterConfig{ + Members: clusterMembers, + HashKey: "loremipsumdolors", + } + defaultVal := true + conf.Extensions = &extconf.ExtensionConfig{ + Search: &extconf.SearchConfig{BaseConfig: extconf.BaseConfig{Enable: &defaultVal}, CVE: nil}, + } + + ctrlr := makeController(conf, t.TempDir()) + cm := test.NewControllerManager(ctrlr) + cm.StartAndWait(port) + + defer cm.StopServer() + } + + for _, port := range ports { + resp, err := resty.R().Get(test.GetBaseURL(port) + "/v2/") + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + } + + reposToTest := []string{"alpine", "ubuntu", "debian", "bash"} + for _, repoName := range reposToTest { + img := CreateRandomImage() + + err := UploadImage(img, test.GetBaseURL(ports[0]), repoName, "1.0") + So(err, ShouldBeNil) + } + + for _, repoName := range reposToTest { + for _, port := range ports { + query := fmt.Sprintf(`{ + ImageList(repo:"%s"){ + Results { + RepoName + Tag + } + } + }`, repoName) + + baseURL := test.GetBaseURL(port) + resp, err := resty.R().Get(baseURL + graphqlQueryPrefix + "?query=" + url.QueryEscape(query)) + So(resp, ShouldNotBeNil) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, 200) + + responseStruct := &zcommon.ImageListResponse{} + + err = json.Unmarshal(resp.Body(), responseStruct) + So(err, ShouldBeNil) + So(len(responseStruct.Results), ShouldEqual, 1) + So(responseStruct.Results[0].RepoName, ShouldEqual, repoName) + So(responseStruct.Results[0].Tag, ShouldEqual, "1.0") + } + } + }) +} + func TestGlobalSearchPagination(t *testing.T) { Convey("Test global search pagination", t, func() { dir := t.TempDir() From befbf5adb60211ee4a67672df2eb36731c072e90 Mon Sep 17 00:00:00 2001 From: Vishwas Rajashekar <30438425+vrajashkr@users.noreply.github.com> Date: Tue, 29 Apr 2025 13:55:19 +0000 Subject: [PATCH 3/4] feat(scale-out): add support for ExtendedRepoInfo GQL Signed-off-by: Vishwas Rajashekar <30438425+vrajashkr@users.noreply.github.com> --- pkg/extensions/search/gql_proxy/gql_proxy.go | 5 +- pkg/extensions/search/search_test.go | 76 ++++++++++++++++++++ 2 files changed, 79 insertions(+), 2 deletions(-) diff --git a/pkg/extensions/search/gql_proxy/gql_proxy.go b/pkg/extensions/search/gql_proxy/gql_proxy.go index 17c8f65b0..fc02c0c23 100644 --- a/pkg/extensions/search/gql_proxy/gql_proxy.go +++ b/pkg/extensions/search/gql_proxy/gql_proxy.go @@ -31,8 +31,9 @@ func GqlProxyRequestHandler( // When a GQL query is updated or a new one added, a change may be required here depending on the type of handler // that the operation needs to use. proxyFunctionalityMap := map[string]GqlScaleOutHandlerFunc{ - "GlobalSearch": fanOutGqlHandler, - "ImageList": repoProxyOnceGqlHandler, + "GlobalSearch": fanOutGqlHandler, + "ImageList": repoProxyOnceGqlHandler, + "ExpandedRepoInfo": repoProxyOnceGqlHandler, } return func(next http.Handler) http.Handler { diff --git a/pkg/extensions/search/search_test.go b/pkg/extensions/search/search_test.go index 3625ce337..a2d5237ef 100644 --- a/pkg/extensions/search/search_test.go +++ b/pkg/extensions/search/search_test.go @@ -2011,6 +2011,82 @@ func TestExpandedRepoInfo(t *testing.T) { }) } +func TestExpandedRepoInfoWithScaleOutProxyLocalStorage(t *testing.T) { + // When there are 2 zot instances, the same ExpandedRepoInfo query should + // return the correct data when both instances are queried. + Convey("In a local scale-out cluster with 2 members, should return correct data for ExpandedRepoInfo", t, func() { + numMembers := 2 + ports := make([]string, numMembers) + + clusterMembers := make([]string, numMembers) + + for idx := range numMembers { + port := test.GetFreePort() + ports[idx] = port + clusterMembers[idx] = "127.0.0.1:" + port + } + + for _, port := range ports { + conf := config.New() + conf.HTTP.Port = port + conf.Cluster = &config.ClusterConfig{ + Members: clusterMembers, + HashKey: "loremipsumdolors", + } + defaultVal := true + conf.Extensions = &extconf.ExtensionConfig{ + Search: &extconf.SearchConfig{BaseConfig: extconf.BaseConfig{Enable: &defaultVal}, CVE: nil}, + } + + ctrlr := makeController(conf, t.TempDir()) + cm := test.NewControllerManager(ctrlr) + cm.StartAndWait(port) + + defer cm.StopServer() + } + + for _, port := range ports { + resp, err := resty.R().Get(test.GetBaseURL(port) + "/v2/") + So(err, ShouldBeNil) + So(resp, ShouldNotBeNil) + So(resp.StatusCode(), ShouldEqual, http.StatusOK) + } + + reposToTest := []string{"alpine", "ubuntu", "debian", "bash"} + for _, repoName := range reposToTest { + img := CreateRandomImage() + + err := UploadImage(img, test.GetBaseURL(ports[0]), repoName, "1.0") + So(err, ShouldBeNil) + } + + for _, repoName := range reposToTest { + for _, port := range ports { + query := fmt.Sprintf(`{ + ExpandedRepoInfo(repo:"%s"){ + Images { + Tag + } + } + }`, repoName) + + baseURL := test.GetBaseURL(port) + resp, err := resty.R().Get(baseURL + graphqlQueryPrefix + "?query=" + url.QueryEscape(query)) + So(resp, ShouldNotBeNil) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, 200) + + responseStruct := &zcommon.ExpandedRepoInfoResp{} + + err = json.Unmarshal(resp.Body(), responseStruct) + So(err, ShouldBeNil) + So(len(responseStruct.ImageSummaries), ShouldEqual, 1) + So(responseStruct.ImageSummaries[0].Tag, ShouldEqual, "1.0") + } + } + }) +} + func TestDerivedImageList(t *testing.T) { rootDir := t.TempDir() From c991fe4106864a606fe76e0037f3291cc266255f Mon Sep 17 00:00:00 2001 From: Vishwas Rajashekar <30438425+vrajashkr@users.noreply.github.com> Date: Tue, 29 Apr 2025 14:57:54 +0000 Subject: [PATCH 4/4] feat(scale-out): refactor test setup + additional test for ExpandedRepoInfo Signed-off-by: Vishwas Rajashekar <30438425+vrajashkr@users.noreply.github.com> --- pkg/extensions/search/gql_proxy/gql_proxy.go | 2 +- pkg/extensions/search/search_test.go | 284 ++++++++++--------- 2 files changed, 149 insertions(+), 137 deletions(-) diff --git a/pkg/extensions/search/gql_proxy/gql_proxy.go b/pkg/extensions/search/gql_proxy/gql_proxy.go index fc02c0c23..5db3256e4 100644 --- a/pkg/extensions/search/gql_proxy/gql_proxy.go +++ b/pkg/extensions/search/gql_proxy/gql_proxy.go @@ -70,7 +70,7 @@ func GqlProxyRequestHandler( log.Error().Str("query", query).Err(err).Msg(err.Message) } - http.Error(response, "Failed to process GQL request", http.StatusInternalServerError) + http.Error(response, "Failed to process GQL request", http.StatusBadRequest) return } diff --git a/pkg/extensions/search/search_test.go b/pkg/extensions/search/search_test.go index a2d5237ef..577db8132 100644 --- a/pkg/extensions/search/search_test.go +++ b/pkg/extensions/search/search_test.go @@ -69,6 +69,74 @@ func makeController(conf *config.Config, dir string) *api.Controller { return ctlr } +func setupTestPairScaleOutLocalStorageCluster( + t *testing.T, + reposToPreLoad []string, +) ([]string, []*test.ControllerManager) { + t.Helper() + numMembers := 2 + ports := make([]string, numMembers) + + clusterMembers := make([]string, numMembers) + + for idx := range numMembers { + port := test.GetFreePort() + ports[idx] = port + clusterMembers[idx] = "127.0.0.1:" + port + } + + testCMs := []*test.ControllerManager{} + + for _, port := range ports { + conf := config.New() + conf.HTTP.Port = port + conf.Cluster = &config.ClusterConfig{ + Members: clusterMembers, + HashKey: "loremipsumdolors", + } + defaultVal := true + conf.Extensions = &extconf.ExtensionConfig{ + Search: &extconf.SearchConfig{BaseConfig: extconf.BaseConfig{Enable: &defaultVal}, CVE: nil}, + } + + ctrlr := makeController(conf, t.TempDir()) + cm := test.NewControllerManager(ctrlr) + cm.StartAndWait(port) + + testCMs = append(testCMs, &cm) + } + + for _, port := range ports { + resp, err := resty.R().Get(test.GetBaseURL(port) + "/v2/") + if err != nil { + t.Errorf("failed to make a request to test instance error=%s", err.Error()) + } + + if resp == nil { + t.Error("got unexpected nil response from test instance") + } + + if resp.StatusCode() != http.StatusOK { + t.Errorf( + "expected status 200 from test instance, but got %d with body %s", + resp.StatusCode(), + resp.Body(), + ) + } + } + + for _, repoName := range reposToPreLoad { + img := CreateRandomImage() + + err := UploadImage(img, test.GetBaseURL(ports[0]), repoName, "1.0") + if err != nil { + t.Errorf("failed to upload image to test instance error=%s", err.Error()) + } + } + + return ports, testCMs +} + func readFileAndSearchString(filePath string, stringToMatch string, timeout time.Duration) (bool, error) { ctx, cancelFunc := context.WithTimeout(context.Background(), timeout) defer cancelFunc() @@ -2014,52 +2082,14 @@ func TestExpandedRepoInfo(t *testing.T) { func TestExpandedRepoInfoWithScaleOutProxyLocalStorage(t *testing.T) { // When there are 2 zot instances, the same ExpandedRepoInfo query should // return the correct data when both instances are queried. - Convey("In a local scale-out cluster with 2 members, should return correct data for ExpandedRepoInfo", t, func() { - numMembers := 2 - ports := make([]string, numMembers) - - clusterMembers := make([]string, numMembers) + reposToTest := []string{"alpine", "ubuntu", "debian", "bash"} - for idx := range numMembers { - port := test.GetFreePort() - ports[idx] = port - clusterMembers[idx] = "127.0.0.1:" + port - } - - for _, port := range ports { - conf := config.New() - conf.HTTP.Port = port - conf.Cluster = &config.ClusterConfig{ - Members: clusterMembers, - HashKey: "loremipsumdolors", - } - defaultVal := true - conf.Extensions = &extconf.ExtensionConfig{ - Search: &extconf.SearchConfig{BaseConfig: extconf.BaseConfig{Enable: &defaultVal}, CVE: nil}, - } - - ctrlr := makeController(conf, t.TempDir()) - cm := test.NewControllerManager(ctrlr) - cm.StartAndWait(port) - - defer cm.StopServer() - } - - for _, port := range ports { - resp, err := resty.R().Get(test.GetBaseURL(port) + "/v2/") - So(err, ShouldBeNil) - So(resp, ShouldNotBeNil) - So(resp.StatusCode(), ShouldEqual, http.StatusOK) - } - - reposToTest := []string{"alpine", "ubuntu", "debian", "bash"} - for _, repoName := range reposToTest { - img := CreateRandomImage() - - err := UploadImage(img, test.GetBaseURL(ports[0]), repoName, "1.0") - So(err, ShouldBeNil) - } + ports, testCMs := setupTestPairScaleOutLocalStorageCluster(t, reposToTest) + for _, cm := range testCMs { + defer cm.StopServer() + } + Convey("In a local scale-out cluster, response should contain correct data for ExpandedRepoInfo", t, func() { for _, repoName := range reposToTest { for _, port := range ports { query := fmt.Sprintf(`{ @@ -2085,6 +2115,31 @@ func TestExpandedRepoInfoWithScaleOutProxyLocalStorage(t *testing.T) { } } }) + + Convey("In a local scale-out cluster, response should contain an error when the repo does not exist", t, func() { + for _, port := range ports { + query := `{ + ExpandedRepoInfo(repo:"unknown"){ + Images { + Tag + } + } + }` + + baseURL := test.GetBaseURL(port) + resp, err := resty.R().Get(baseURL + graphqlQueryPrefix + "?query=" + url.QueryEscape(query)) + So(resp, ShouldNotBeNil) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, 200) + + responseStruct := &zcommon.ExpandedRepoInfoResp{} + + err = json.Unmarshal(resp.Body(), responseStruct) + So(err, ShouldBeNil) + So(len(responseStruct.Errors), ShouldEqual, 1) + So(responseStruct.Errors[0].Message, ShouldEqual, "repo metadata not found for given repo name") + } + }) } func TestDerivedImageList(t *testing.T) { @@ -4596,54 +4651,18 @@ func TestGlobalSearch(t *testing.T) { //nolint: gocyclo func TestGlobalSearchWithScaleOutProxyLocalStorage(t *testing.T) { // When there are 2 zot instances, the same GlobalSearch query should // return aggregated data from both instances when both instances are queried. - Convey("In a local scale-out cluster with 2 members, should return correct data for GlobalSearch", t, func() { - numMembers := 2 - ports := make([]string, numMembers) - - clusterMembers := make([]string, numMembers) - - for idx := range numMembers { - port := test.GetFreePort() - ports[idx] = port - clusterMembers[idx] = "127.0.0.1:" + port - } - - for _, port := range ports { - conf := config.New() - conf.HTTP.Port = port - conf.Cluster = &config.ClusterConfig{ - Members: clusterMembers, - HashKey: "loremipsumdolors", - } - defaultVal := true - conf.Extensions = &extconf.ExtensionConfig{ - Search: &extconf.SearchConfig{BaseConfig: extconf.BaseConfig{Enable: &defaultVal}, CVE: nil}, - } - - ctrlr := makeController(conf, t.TempDir()) - cm := test.NewControllerManager(ctrlr) - cm.StartAndWait(port) + reposToTest := []string{"alpine", "ubuntu", "debian", "bash"} - defer cm.StopServer() - } - - for _, port := range ports { - resp, err := resty.R().Get(test.GetBaseURL(port) + "/v2/") - So(err, ShouldBeNil) - So(resp, ShouldNotBeNil) - So(resp.StatusCode(), ShouldEqual, http.StatusOK) - } - - reposToTest := []string{"alpine", "ubuntu", "debian", "bash"} - for _, repoName := range reposToTest { - img := CreateRandomImage() - - err := UploadImage(img, test.GetBaseURL(ports[0]), repoName, "1.0") - So(err, ShouldBeNil) - } + ports, testCMs := setupTestPairScaleOutLocalStorageCluster(t, reposToTest) + for _, cm := range testCMs { + defer cm.StopServer() + } + Convey("In a local scale-out cluster, response should contain correct data for GlobalSearch", t, func() { for _, port := range ports { - query := `{GlobalSearch(query:""){ + query := ` + { + GlobalSearch(query:""){ Page { TotalCount ItemCount @@ -4668,10 +4687,14 @@ func TestGlobalSearchWithScaleOutProxyLocalStorage(t *testing.T) { So(responseStruct.Page.ItemCount, ShouldEqual, 4) So(responseStruct.Page.TotalCount, ShouldEqual, 4) } + }) - // Test the pagination behaviour + Convey("In a local scale-out cluster, response to GlobalSearch should paginate incorrectly", t, func() { + // Since pagination doesn't currently work, this is expected for _, port := range ports { - query := `{GlobalSearch(query:"", requestedPage:{ + query := ` + { + GlobalSearch(query:"", requestedPage:{ limit:1 offset:0 sortBy: DOWNLOADS @@ -4702,8 +4725,9 @@ func TestGlobalSearchWithScaleOutProxyLocalStorage(t *testing.T) { So(responseStruct.Page.ItemCount, ShouldEqual, 2) So(responseStruct.Page.TotalCount, ShouldEqual, 4) } + }) - // Test query behaviour with repo query + Convey("In a local scale-out cluster, response to GlobalSearch with repo query should be correct", t, func() { for _, port := range ports { query := ` { @@ -5112,52 +5136,14 @@ func TestImageList(t *testing.T) { func TestImageListWithScaleOutProxyLocalStorage(t *testing.T) { // When there are 2 zot instances, the same ImageList query should // return the correct data when both instances are queried. - Convey("In a local scale-out cluster with 2 members, should return correct data for ImageList", t, func() { - numMembers := 2 - ports := make([]string, numMembers) - - clusterMembers := make([]string, numMembers) - - for idx := range numMembers { - port := test.GetFreePort() - ports[idx] = port - clusterMembers[idx] = "127.0.0.1:" + port - } - - for _, port := range ports { - conf := config.New() - conf.HTTP.Port = port - conf.Cluster = &config.ClusterConfig{ - Members: clusterMembers, - HashKey: "loremipsumdolors", - } - defaultVal := true - conf.Extensions = &extconf.ExtensionConfig{ - Search: &extconf.SearchConfig{BaseConfig: extconf.BaseConfig{Enable: &defaultVal}, CVE: nil}, - } + reposToTest := []string{"alpine", "ubuntu", "debian", "bash"} - ctrlr := makeController(conf, t.TempDir()) - cm := test.NewControllerManager(ctrlr) - cm.StartAndWait(port) - - defer cm.StopServer() - } - - for _, port := range ports { - resp, err := resty.R().Get(test.GetBaseURL(port) + "/v2/") - So(err, ShouldBeNil) - So(resp, ShouldNotBeNil) - So(resp.StatusCode(), ShouldEqual, http.StatusOK) - } - - reposToTest := []string{"alpine", "ubuntu", "debian", "bash"} - for _, repoName := range reposToTest { - img := CreateRandomImage() - - err := UploadImage(img, test.GetBaseURL(ports[0]), repoName, "1.0") - So(err, ShouldBeNil) - } + ports, testCMs := setupTestPairScaleOutLocalStorageCluster(t, reposToTest) + for _, cm := range testCMs { + defer cm.StopServer() + } + Convey("In a local scale-out cluster, response should contain correct data for ImageList", t, func() { for _, repoName := range reposToTest { for _, port := range ports { query := fmt.Sprintf(`{ @@ -5185,6 +5171,32 @@ func TestImageListWithScaleOutProxyLocalStorage(t *testing.T) { } } }) + + Convey("In a local scale-out cluster, response should contain an error when the repo does not exist", t, func() { + for _, port := range ports { + query := `{ + ImageList(repo:"unknown"){ + Results { + RepoName + Tag + } + } + }` + + baseURL := test.GetBaseURL(port) + resp, err := resty.R().Get(baseURL + graphqlQueryPrefix + "?query=" + url.QueryEscape(query)) + So(resp, ShouldNotBeNil) + So(err, ShouldBeNil) + So(resp.StatusCode(), ShouldEqual, 200) + + responseStruct := &zcommon.ImageListResponse{} + + err = json.Unmarshal(resp.Body(), responseStruct) + So(err, ShouldBeNil) + So(len(responseStruct.Errors), ShouldEqual, 0) + So(len(responseStruct.Results), ShouldEqual, 0) + } + }) } func TestGlobalSearchPagination(t *testing.T) {