Skip to content

Commit 6a5af58

Browse files
committed
use separate crd controller
1 parent 3f68a69 commit 6a5af58

File tree

8 files changed

+312
-92
lines changed

8 files changed

+312
-92
lines changed

cmd/controller/main.go

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ import (
3232
"knative.dev/pkg/signals"
3333

3434
"github.com/lionelvillard/knative-functions-controller/pkg/dynamic"
35+
"github.com/lionelvillard/knative-functions-controller/pkg/reconciler/crds"
3536
"github.com/lionelvillard/knative-functions-controller/pkg/reconciler/functions"
3637
)
3738

@@ -47,6 +48,8 @@ var (
4748
func main() {
4849
flag.Parse()
4950

51+
// Load functions CRDs.
52+
5053
cfg, err := sharedmain.GetConfig(*masterURL, *kubeconfig)
5154
if err != nil {
5255
log.Fatal("Error building kubeconfig", err)
@@ -67,9 +70,11 @@ func main() {
6770
log.Fatalf("Error processing environment: %v", err)
6871
}
6972

70-
ctx := signals.NewContext()
73+
// Create a controller per function CRD.
74+
75+
controllers := make([]injection.ControllerConstructor, len(defs.Items)+1)
76+
controllers[0] = crds.NewController
7177

72-
controllers := make([]injection.ControllerConstructor, len(defs.Items))
7378
names := make([]string, len(defs.Items))
7479
for i, crd := range defs.Items {
7580
gvr := schema.GroupVersionResource{
@@ -79,21 +84,19 @@ func main() {
7984
}
8085
names[i] = crd.Name
8186

82-
log.Printf("injecting %v", gvr)
83-
8487
injection.Default.RegisterInformer(dynamic.WithInformer(gvr))
85-
86-
controllers[i] = functions.NewController(gvr)
88+
controllers[i+1] = functions.NewController(gvr)
8789
}
8890

91+
// Watch for any CRD changes
92+
ctx := signals.NewContext()
93+
8994
f := externalversions.NewSharedInformerFactory(clientset, time.Hour)
9095
crdInformer := f.Apiextensions().V1beta1().CustomResourceDefinitions().Informer()
9196
crdInformer.AddEventHandler(newHandler(names))
9297
go crdInformer.Run(ctx.Done())
9398

94-
if len(controllers) > 0 {
95-
sharedmain.MainWithConfig(ctx, "controller", cfg, controllers...)
96-
}
99+
sharedmain.MainWithConfig(ctx, "controller", cfg, controllers...)
97100

98101
<-ctx.Done()
99102
}

pkg/apis/duck/v1alpha1/function.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@ import (
2424
"knative.dev/pkg/kmeta"
2525
)
2626

27+
const (
28+
ConfigMapAnnotation = "functions.knative.dev/configmap-version"
29+
)
30+
2731
// +genclient
2832
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
2933

pkg/dynamic/dynamic.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@ type Key struct {
3434

3535
func WithInformer(gvr schema.GroupVersionResource) func(ctx context.Context) (context.Context, controller.Informer) {
3636
return func(ctx context.Context) (context.Context, controller.Informer) {
37-
3837
f := factory.Get(ctx)
3938
inf := f.ForResource(gvr)
4039
return context.WithValue(ctx, Key{gvr: gvr}, inf), inf.Informer()

pkg/reconciler/crds/controller.go

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
/*
2+
Copyright 2019 The Knative Authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package crds
18+
19+
import (
20+
"context"
21+
22+
corev1 "k8s.io/api/core/v1"
23+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
24+
"k8s.io/client-go/kubernetes/scheme"
25+
"k8s.io/client-go/tools/cache"
26+
"k8s.io/client-go/tools/record"
27+
apiextensionsclient "knative.dev/pkg/client/injection/apiextensions/client"
28+
crdinformers "knative.dev/pkg/client/injection/apiextensions/informers/apiextensions/v1beta1/customresourcedefinition"
29+
kubeclient "knative.dev/pkg/client/injection/kube/client"
30+
"knative.dev/pkg/configmap"
31+
"knative.dev/pkg/controller"
32+
"knative.dev/pkg/logging"
33+
servingclient "knative.dev/serving/pkg/client/injection/client"
34+
serviceinformer "knative.dev/serving/pkg/client/injection/informers/serving/v1beta1/service"
35+
)
36+
37+
const (
38+
controllerAgentName = "crd-controller"
39+
)
40+
41+
// NewController returns a new CRD reconcile controller.
42+
func NewController(ctx context.Context, cmw configmap.Watcher) *controller.Impl {
43+
logger := logging.FromContext(ctx)
44+
45+
crdInformer := crdinformers.Get(ctx)
46+
serviceInformer := serviceinformer.Get(ctx)
47+
48+
r := &Reconciler{
49+
kubeClient: kubeclient.Get(ctx),
50+
crdClient: apiextensionsclient.Get(ctx),
51+
crdLister: crdInformer.Lister(),
52+
servingClient: servingclient.Get(ctx),
53+
serviceLister: serviceInformer.Lister(),
54+
Recorder: record.NewBroadcaster().NewRecorder(
55+
scheme.Scheme, corev1.EventSource{Component: controllerAgentName}),
56+
}
57+
impl := controller.NewImpl(r, logger, "crd")
58+
59+
logger.Info("Setting up event handlers")
60+
61+
crdInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
62+
FilterFunc: func(obj interface{}) bool {
63+
if object, ok := obj.(metav1.Object); ok {
64+
if labels := object.GetLabels(); labels != nil {
65+
if v, ok := labels["functions.knative.dev/crd"]; ok && v == "true" {
66+
// for _, name := range names {
67+
// if name == object.GetName() {
68+
// return false
69+
// }
70+
// }
71+
return true
72+
}
73+
}
74+
}
75+
return false
76+
},
77+
Handler: controller.HandleAll(impl.Enqueue),
78+
})
79+
80+
return impl
81+
}

pkg/reconciler/crds/crds.go

Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
/*
2+
Copyright 2019 The Knative Authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
http://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
package crds
18+
19+
import (
20+
"context"
21+
"errors"
22+
"fmt"
23+
24+
"go.uber.org/zap"
25+
corev1 "k8s.io/api/core/v1"
26+
apiextv1beta1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
27+
28+
apiextclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
29+
apiextensionsv1beta1 "k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/v1beta1"
30+
"k8s.io/apimachinery/pkg/api/equality"
31+
apierrs "k8s.io/apimachinery/pkg/api/errors"
32+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
33+
"k8s.io/client-go/kubernetes"
34+
"k8s.io/client-go/tools/record"
35+
"knative.dev/pkg/controller"
36+
"knative.dev/pkg/logging"
37+
"knative.dev/pkg/system"
38+
servingv1beta1 "knative.dev/serving/pkg/apis/serving/v1beta1"
39+
servingclient "knative.dev/serving/pkg/client/clientset/versioned"
40+
servingv1beta1listers "knative.dev/serving/pkg/client/listers/serving/v1beta1"
41+
42+
"github.com/lionelvillard/knative-functions-controller/pkg/reconciler/crds/resources"
43+
)
44+
45+
// Reconciler implements controller.Reconciler for dynamic resources.
46+
type Reconciler struct {
47+
// KubeClient allows us to talk to the k8s for core APIs
48+
kubeClient kubernetes.Interface
49+
50+
//
51+
crdClient apiextclientset.Interface
52+
53+
// servingClient allows us to talk to the serving APIs
54+
servingClient servingclient.Interface
55+
56+
// serviceLister index properties about Knative services
57+
serviceLister servingv1beta1listers.ServiceLister
58+
59+
// crdLister index properties about CRDs
60+
crdLister apiextensionsv1beta1.CustomResourceDefinitionLister
61+
62+
// Recorder is an event recorder for recording Event resources to the
63+
// Kubernetes API.
64+
Recorder record.EventRecorder
65+
}
66+
67+
// Check that our Reconciler implements controller.Reconciler
68+
var _ controller.Reconciler = (*Reconciler)(nil)
69+
70+
// Reconcile implements controller.Reconciler
71+
func (r *Reconciler) Reconcile(ctx context.Context, name string) error {
72+
logger := logging.FromContext(ctx)
73+
74+
crd, err := r.crdLister.Get(name)
75+
if apierrs.IsNotFound(err) {
76+
// The resource may no longer exist, in which case we stop processing.
77+
logger.Errorf("resource %q no longer exists", name)
78+
return nil
79+
} else if err != nil {
80+
return err
81+
}
82+
83+
return r.reconcile(ctx, crd)
84+
}
85+
86+
func (r *Reconciler) reconcile(ctx context.Context, crd *apiextv1beta1.CustomResourceDefinition) error {
87+
if crd.GetDeletionTimestamp() != nil {
88+
// Check for a DeletionTimestamp. If present, elide the normal reconcile logic.
89+
// When a controller needs finalizer handling, it would go here.
90+
return nil
91+
}
92+
93+
functionName := crd.Spec.Names.Plural
94+
95+
// Make sure the function service/configmaps exists
96+
cm, err := r.reconcileConfig(ctx, functionName)
97+
if err != nil {
98+
return err
99+
}
100+
101+
_, err = r.reconcileService(ctx, functionName, cm)
102+
if err != nil {
103+
104+
return err
105+
}
106+
107+
return nil
108+
}
109+
110+
func (r *Reconciler) reconcileConfig(ctx context.Context, functionName string) (*corev1.ConfigMap, error) {
111+
logger := logging.FromContext(ctx)
112+
113+
cmname := fmt.Sprintf("config-function-%s", functionName)
114+
115+
cm, err := r.kubeClient.CoreV1().ConfigMaps(system.Namespace()).Get(cmname, metav1.GetOptions{})
116+
if err != nil {
117+
if apierrs.IsNotFound(err) {
118+
cm, err = resources.MakeConfigMap(system.Namespace(), cmname)
119+
if err != nil {
120+
logger.Error("Failed to create the function configmap", zap.Error(err))
121+
return nil, err
122+
}
123+
cm, err = r.kubeClient.CoreV1().ConfigMaps(system.Namespace()).Create(cm)
124+
if err != nil {
125+
logger.Error("Failed to create the function configmap", zap.Error(err))
126+
return nil, err
127+
}
128+
} else {
129+
logger.Error("Unable to get the function configmap", zap.Error(err))
130+
return nil, err
131+
}
132+
}
133+
134+
return cm, nil
135+
}
136+
137+
func (r *Reconciler) reconcileService(ctx context.Context, functionName string, cm *corev1.ConfigMap) (*servingv1beta1.Service, error) {
138+
logger := logging.FromContext(ctx)
139+
140+
crd, err := r.crdLister.Get(functionName + ".functions.knative.dev")
141+
if err != nil {
142+
logger.Error("Failed to get function Custom Resource Definition", zap.Error(err))
143+
return nil, fmt.Errorf("Failed to get function Custom Resource Definition: %v", err)
144+
}
145+
146+
image, ok := crd.Annotations["functions.knative.dev/image"]
147+
if !ok {
148+
logger.Error("Missing functions.knative.dev/image annotation on function CRD", zap.Any("functionName", functionName))
149+
return nil, errors.New("Missing functions.knative.dev/image annotation on function CRD")
150+
}
151+
152+
expected := resources.MakeKnativeService(functionName, cm.ResourceVersion, image)
153+
154+
// Update service annotation with config map UUID.
155+
service, err := r.serviceLister.Services("knative-functions").Get(functionName)
156+
if err != nil {
157+
if apierrs.IsNotFound(err) {
158+
159+
ksvc, err := r.servingClient.ServingV1beta1().Services("knative-functions").Create(expected)
160+
if err != nil {
161+
logger.Error("Failed to create the function service", zap.Error(err))
162+
return nil, fmt.Errorf("Failed to create the function service: %v", err)
163+
}
164+
return ksvc, nil
165+
}
166+
167+
logger.Error("Unable to get the function service", zap.Error(err))
168+
return nil, err
169+
} else if !equality.Semantic.DeepEqual(expected.Spec, service.Spec) {
170+
service = service.DeepCopy()
171+
service.Spec = expected.Spec
172+
173+
service, err = r.servingClient.ServingV1beta1().Services("knative-functions").Update(service)
174+
if err != nil {
175+
logger.Error("Failed to update the function service", zap.Error(err))
176+
return nil, fmt.Errorf("Failed to update the function service: %v", err)
177+
}
178+
}
179+
180+
return service, nil
181+
}
File renamed without changes.

pkg/reconciler/functions/resources/ksvc.go renamed to pkg/reconciler/crds/resources/ksvc.go

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,9 @@ package resources
1919
import (
2020
corev1 "k8s.io/api/core/v1"
2121
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
22-
2322
servingv1beta1 "knative.dev/serving/pkg/apis/serving/v1beta1"
24-
)
2523

26-
const (
27-
ConfigMapAnnotation = "functions.knative.dev/cm-resourceVersion"
24+
duckv1alpha1 "github.com/lionelvillard/knative-functions-controller/pkg/apis/duck/v1alpha1"
2825
)
2926

3027
// MakeKnativeService create a knative service
@@ -37,15 +34,12 @@ func MakeKnativeService(functionName string, version, image string) *servingv1be
3734
ObjectMeta: metav1.ObjectMeta{
3835
Name: functionName,
3936
Namespace: "knative-functions",
40-
Labels: map[string]string{
41-
FunctionRoleLabel: FunctionRole,
42-
},
4337
},
4438
Spec: servingv1beta1.ServiceSpec{
4539
ConfigurationSpec: servingv1beta1.ConfigurationSpec{
4640
Template: servingv1beta1.RevisionTemplateSpec{
4741
ObjectMeta: metav1.ObjectMeta{
48-
Annotations: map[string]string{ConfigMapAnnotation: version},
42+
Annotations: map[string]string{duckv1alpha1.ConfigMapAnnotation: version},
4943
},
5044
Spec: servingv1beta1.RevisionSpec{
5145
PodSpec: corev1.PodSpec{

0 commit comments

Comments
 (0)