Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

✨ Implement Priority and Serial settings for discovered handlers #11818

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions exp/runtime/api/v1alpha1/extensionconfig_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,14 @@ type ExtensionHandler struct {
// Defaults to Fail if not set.
// +optional
FailurePolicy *FailurePolicy `json:"failurePolicy,omitempty"`

// Priority defines the order in which this handler will be invoked. Hooks are executed in the descending order.
// +optional
Priority int32 `json:"priority,omitempty"`

// Serial defines if the blocked hook is allowed to run in parallel with others.
// +optional
Serial bool `json:"serial,omitempty"`
}

// GroupVersionHook defines the runtime hook when the ExtensionHandler is called.
Expand Down
8 changes: 8 additions & 0 deletions exp/runtime/hooks/api/v1alpha1/discovery_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ type ExtensionHandler struct {
// failurePolicy defines how failures in calls to the ExtensionHandler should be handled by a client.
// This is defaulted to FailurePolicyFail if not defined.
FailurePolicy *FailurePolicy `json:"failurePolicy,omitempty"`

// Priority defines the order in which this handler will be invoked. Hooks are executed in the descending order.
// +optional
Priority int32 `json:"priority,omitempty"`

// Serial defines if the blocked hook is allowed to run in parallel with others.
// +optional
Serial bool `json:"serial,omitempty"`
}

// GroupVersionHook defines the runtime hook when the ExtensionHandler is called.
Expand Down
14 changes: 14 additions & 0 deletions exp/runtime/hooks/api/v1alpha1/zz_generated.openapi.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,10 @@ func TestExtensionReconciler_discoverExtensionConfig(t *testing.T) {
handlers := discoveredExtensionConfig.Status.Handlers
g.Expect(handlers).To(HaveLen(1))
g.Expect(handlers[0].Name).To(Equal("first.ext1"))
g.Expect(handlers[0].RequestHook.Hook).To(Equal("FakeHook"))
g.Expect(handlers[0].RequestHook.APIVersion).To(Equal("test.runtime.cluster.x-k8s.io/v1alpha1"))
g.Expect(handlers[0].Serial).To(BeTrue())
g.Expect(handlers[0].Priority).To(Equal(int32(100)))

// Expect exactly one condition and expect the condition to have type RuntimeExtensionDiscoveredCondition and
// Status true.
Expand Down Expand Up @@ -345,6 +349,8 @@ func discoveryHandler(handlerList ...string) func(http.ResponseWriter, *http.Req
Hook: "FakeHook",
APIVersion: fakev1alpha1.GroupVersion.String(),
},
Serial: true,
Priority: 100,
})
}
response := &runtimehooksv1.DiscoveryResponse{
Expand Down
8 changes: 8 additions & 0 deletions exp/runtime/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,12 @@ type ExtensionHandler struct {
// If left undefined, this will be defaulted to FailurePolicyFail when processing the answer to the discovery
// call for this server.
FailurePolicy *runtimehooksv1.FailurePolicy

// Priority defines the order in which this handler will be invoked. Hooks are executed in the descending order.
Priority int32

// Serial defines if the blocked hook is allowed to run in parallel with others.
Serial bool
}

// AddExtensionHandler adds an extension handler to the server.
Expand Down Expand Up @@ -268,6 +274,8 @@ func discoveryHandler(handlers map[string]ExtensionHandler) func(context.Context
},
TimeoutSeconds: handler.TimeoutSeconds,
FailurePolicy: handler.FailurePolicy,
Priority: handler.Priority,
Serial: handler.Serial,
})
}

Expand Down
18 changes: 18 additions & 0 deletions internal/runtime/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ func (c *client) Discover(ctx context.Context, extensionConfig *runtimev1.Extens
},
TimeoutSeconds: handler.TimeoutSeconds,
FailurePolicy: (*runtimev1.FailurePolicy)(handler.FailurePolicy),
Priority: handler.Priority,
Serial: handler.Serial,
},
)
}
Expand Down Expand Up @@ -193,6 +195,7 @@ func (c *client) CallAllExtensions(ctx context.Context, hook runtimecatalog.Hook

log.V(4).Info(fmt.Sprintf("Calling all extensions of hook %q", hookName))
responses := []runtimehooksv1.ResponseObject{}
retry := false
for _, registration := range registrations {
// Creates a new instance of the response parameter.
responseObject, err := c.catalog.NewResponse(gvh)
Expand All @@ -212,13 +215,28 @@ func (c *client) CallAllExtensions(ctx context.Context, hook runtimecatalog.Hook
continue
}

if registration.Serial && retry {
log.V(5).Info(fmt.Sprintf("Serial handler %q waits for blocking response to complete", registration.Name))
break
}

err = c.CallExtension(ctx, hook, forObject, registration.Name, request, tmpResponse)
// If one of the extension handlers fails lets short-circuit here and return early.
if err != nil {
log.Error(err, "failed to call extension handlers")
return errors.Wrapf(err, "failed to call extension handlers for hook %q", gvh.GroupHook())
}

if retryResponse, isRetry := tmpResponse.(runtimehooksv1.RetryResponseObject); isRetry && !retry && retryResponse.GetRetryAfterSeconds() > 0 {
retry = isRetry
}

responses = append(responses, tmpResponse)

if registration.Serial && retry {
log.V(5).Info(fmt.Sprintf("Serial handler %q is blocking hook until it is completed", registration.Name))
break
}
}

// Aggregate all responses into a single response.
Expand Down
79 changes: 78 additions & 1 deletion internal/runtime/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -984,6 +984,17 @@ func TestClient_CallAllExtensions(t *testing.T) {
},
}

secondBlockingConfig := extensionConfig.DeepCopy()
secondBlockingConfig.Status.Handlers[0].Priority = 2
secondBlockingConfig.Status.Handlers[1].Priority = 1
secondBlockingConfig.Status.Handlers[1].Serial = true
secondBlockingConfig.Status.Handlers[0].RequestHook.Hook = "RetryableFakeHook"
secondBlockingConfig.Status.Handlers[1].RequestHook.Hook = "RetryableFakeHook"
secondBlockingConfig.Status.Handlers[2].RequestHook.Hook = "RetryableFakeHook"

secondBlockingWithPriorityConfig := secondBlockingConfig.DeepCopy()
secondBlockingWithPriorityConfig.Status.Handlers[1].Priority = 3

type args struct {
hook runtimecatalog.Hook
request runtimehooksv1.RequestObject
Expand Down Expand Up @@ -1072,6 +1083,55 @@ func TestClient_CallAllExtensions(t *testing.T) {
},
wantErr: true,
},
{
name: "should succeed and wait on previous blocking responses for serial handler",
registeredExtensionConfigs: []runtimev1.ExtensionConfig{*secondBlockingConfig},
testServer: testServerConfig{
start: true,
responses: map[string]testServerResponse{
"/test.runtime.cluster.x-k8s.io/v1alpha1/retryablefakehook/first-extension.*": retryResponse(runtimehooksv1.ResponseStatusSuccess, 1),
// second and third extension has no handler.
},
},
args: args{
hook: fakev1alpha1.RetryableFakeHook,
request: &fakev1alpha1.RetryableFakeRequest{},
response: &fakev1alpha1.RetryableFakeResponse{},
},
},
{
name: "should succeed and wait on blocking serial handler",
registeredExtensionConfigs: []runtimev1.ExtensionConfig{*secondBlockingConfig},
testServer: testServerConfig{
start: true,
responses: map[string]testServerResponse{
"/test.runtime.cluster.x-k8s.io/v1alpha1/retryablefakehook/first-extension.*": response(runtimehooksv1.ResponseStatusSuccess),
"/test.runtime.cluster.x-k8s.io/v1alpha1/retryablefakehook/second-extension.*": retryResponse(runtimehooksv1.ResponseStatusSuccess, 1),
// third-extension has no handler.
},
},
args: args{
hook: fakev1alpha1.RetryableFakeHook,
request: &fakev1alpha1.RetryableFakeRequest{},
response: &fakev1alpha1.RetryableFakeResponse{},
},
},
{
name: "should succeed and wait on blocking serial handler, which is called with priority",
registeredExtensionConfigs: []runtimev1.ExtensionConfig{*secondBlockingWithPriorityConfig},
testServer: testServerConfig{
start: true,
responses: map[string]testServerResponse{
"/test.runtime.cluster.x-k8s.io/v1alpha1/retryablefakehook/second-extension.*": retryResponse(runtimehooksv1.ResponseStatusSuccess, 1),
// second and third extension has no handler.
},
},
args: args{
hook: fakev1alpha1.RetryableFakeHook,
request: &fakev1alpha1.RetryableFakeRequest{},
response: &fakev1alpha1.RetryableFakeResponse{},
},
},
{
name: "should fail when one of the ExtensionHandlers returns 404",
registeredExtensionConfigs: []runtimev1.ExtensionConfig{extensionConfig},
Expand Down Expand Up @@ -1317,6 +1377,20 @@ func response(status runtimehooksv1.ResponseStatus) testServerResponse {
}
}

func retryResponse(status runtimehooksv1.ResponseStatus, retrySeconds int32) testServerResponse {
return testServerResponse{
response: &fakev1alpha1.RetryableFakeResponse{
CommonRetryResponse: runtimehooksv1.CommonRetryResponse{
RetryAfterSeconds: retrySeconds,
CommonResponse: runtimehooksv1.CommonResponse{
Status: status,
},
},
},
responseStatusCode: http.StatusOK,
}
}

func createSecureTestServer(server testServerConfig, callbacks ...func()) *httptest.Server {
mux := http.NewServeMux()
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
Expand All @@ -1335,7 +1409,10 @@ func createSecureTestServer(server testServerConfig, callbacks ...func()) *httpt
panic(err)
}
w.WriteHeader(resp.responseStatusCode)
_, _ = w.Write(respBody)
_, err = w.Write(respBody)
if err != nil {
panic(err)
}
return
}

Expand Down
15 changes: 15 additions & 0 deletions internal/runtime/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ limitations under the License.
package registry

import (
"sort"
"sync"

"github.com/pkg/errors"
Expand Down Expand Up @@ -84,6 +85,13 @@ type ExtensionRegistration struct {

// Settings captures additional information sent in call to the RuntimeExtensions.
Settings map[string]string

// Priority defines the order in which this handler will be invoked. Hooks are executed in the descending order.
Priority int32

// Serial defines if the hook should be executed serially. This ensures that previously pending hooks are finished
// first, as well as current hook reached completion, before moving to the next one.
Serial bool
}

// extensionRegistry is an implementation of ExtensionRegistry.
Expand Down Expand Up @@ -210,6 +218,11 @@ func (r *extensionRegistry) List(gh runtimecatalog.GroupHook) ([]*ExtensionRegis
l = append(l, registration)
}
}

sort.SliceStable(l, func(i, j int) bool {
return l[i].Priority > l[j].Priority
})

return l, nil
}

Expand Down Expand Up @@ -263,6 +276,8 @@ func (r *extensionRegistry) add(extensionConfig *runtimev1.ExtensionConfig) erro
TimeoutSeconds: e.TimeoutSeconds,
FailurePolicy: e.FailurePolicy,
Settings: extensionConfig.Spec.Settings,
Priority: e.Priority,
Serial: e.Serial,
})
}

Expand Down
10 changes: 6 additions & 4 deletions test/e2e/cluster_upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,10 +184,12 @@ func ClusterUpgradeConformanceSpec(ctx context.Context, inputGetter func() Clust
WaitForKubeProxyUpgrade: input.E2EConfig.GetIntervals(specName, "wait-machine-upgrade"),
WaitForDNSUpgrade: input.E2EConfig.GetIntervals(specName, "wait-machine-upgrade"),
WaitForEtcdUpgrade: input.E2EConfig.GetIntervals(specName, "wait-machine-upgrade"),
PreWaitForControlPlaneToBeUpgraded: func() {
if input.PreWaitForControlPlaneToBeUpgraded != nil {
input.PreWaitForControlPlaneToBeUpgraded(input.BootstrapClusterProxy, namespace.Name, clusterName)
}
PreWaitForControlPlaneToBeUpgraded: []func(){
func() {
if input.PreWaitForControlPlaneToBeUpgraded != nil {
input.PreWaitForControlPlaneToBeUpgraded(input.BootstrapClusterProxy, namespace.Name, clusterName)
}
},
},
})
} else {
Expand Down
33 changes: 27 additions & 6 deletions test/e2e/cluster_upgrade_runtimesdk.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,12 +277,20 @@ func ClusterUpgradeWithRuntimeSDKSpec(ctx context.Context, inputGetter func() Cl
WaitForKubeProxyUpgrade: input.E2EConfig.GetIntervals(specName, "wait-machine-upgrade"),
WaitForDNSUpgrade: input.E2EConfig.GetIntervals(specName, "wait-machine-upgrade"),
WaitForEtcdUpgrade: input.E2EConfig.GetIntervals(specName, "wait-machine-upgrade"),
PreWaitForControlPlaneToBeUpgraded: func() {
beforeClusterUpgradeTestHandler(ctx,
input.BootstrapClusterProxy.GetClient(),
clusterRef,
input.E2EConfig.MustGetVariable(KubernetesVersionUpgradeTo),
input.E2EConfig.GetIntervals(specName, "wait-machine-upgrade"))
PreWaitForControlPlaneToBeUpgraded: []func(){
func() {
justBeforeClusterUpgradeTestHandler(ctx,
input.BootstrapClusterProxy.GetClient(),
clusterRef,
input.E2EConfig.GetIntervals(specName, "wait-machine-upgrade"))
},
func() {
beforeClusterUpgradeTestHandler(ctx,
input.BootstrapClusterProxy.GetClient(),
clusterRef,
input.E2EConfig.MustGetVariable(KubernetesVersionUpgradeTo),
input.E2EConfig.GetIntervals(specName, "wait-machine-upgrade"))
},
},
PreWaitForWorkersToBeUpgraded: func() {
machineSetPreflightChecksTestHandler(ctx,
Expand Down Expand Up @@ -580,6 +588,19 @@ func beforeClusterUpgradeTestHandler(ctx context.Context, c client.Client, clust
}, intervals)
}

// justBeforeClusterUpgradeTestHandler calls runtimeHookTestHandler with a blocking function which returns false if
// JustBeforeClusterUpgrade hook passed, and BeforeClusterUpgrade hook is pending.
func justBeforeClusterUpgradeTestHandler(ctx context.Context, c client.Client, cluster types.NamespacedName, intervals []interface{}) {
hookName := "JustBeforeClusterUpgrade"
runtimeHookTestHandler(ctx, c, cluster, hookName, true, func() bool {
// Wait for JustBeforeClusterUpgrade to pass and BeforeClusterUpgrade pending
return checkLifecycleHookResponses(ctx, c, cluster, map[string]string{
"JustBeforeClusterUpgrade": "Status: Success, RetryAfterSeconds: 0",
"BeforeClusterUpgrade": "Status: Success, RetryAfterSeconds: 5",
}) != nil
}, intervals)
}

// afterControlPlaneUpgradeTestHandler calls runtimeHookTestHandler with a blocking function which returns false if any
// MachineDeployment in the Cluster has upgraded to the target Kubernetes version.
func afterControlPlaneUpgradeTestHandler(ctx context.Context, c client.Client, cluster types.NamespacedName, version string, intervals []interface{}) {
Expand Down
Loading