Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions pkg/reconciler/source/crd/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,11 +55,14 @@ func NewController(
}
})

logger.Info("Setting up event handlers")
logger.Info("Setting up CRD event handlers")
crdInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: pkgreconciler.LabelFilterFunc(sources.SourceDuckLabelKey, sources.SourceDuckLabelValue, false),
Handler: controller.HandleAll(impl.Enqueue),
})
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: impl.Enqueue,
UpdateFunc: controller.PassNew(impl.Enqueue),
DeleteFunc: r.deleteFunc,
}})

return impl
}
77 changes: 65 additions & 12 deletions pkg/reconciler/source/crd/crd.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ import (

"go.uber.org/zap"
v1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
"k8s.io/apimachinery/pkg/runtime/schema"

customresourcedefinition "knative.dev/pkg/client/injection/apiextensions/informers/apiextensions/v1/customresourcedefinition"
crdreconciler "knative.dev/pkg/client/injection/apiextensions/reconciler/apiextensions/v1/customresourcedefinition"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
Expand Down Expand Up @@ -66,15 +68,6 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, crd *v1.CustomResourceDe
return err
}

if !crd.DeletionTimestamp.IsZero() {
// We are intentionally not setting up a finalizer on the CRD.
// This might leave unnecessary dynamic controllers running.
// This is a best effort to try to clean them up.
// Note that without a finalizer there is no guarantee we will be called.
r.deleteController(ctx, gvr)
return nil
}

r.reconcileController(ctx, crd, gvr, gvk)

return nil
Expand Down Expand Up @@ -106,20 +99,80 @@ func (r *Reconciler) resolveGroupVersions(crd *v1.CustomResourceDefinition) (*sc
return gvr, gvk, nil
}

func (r *Reconciler) deleteController(ctx context.Context, gvr *schema.GroupVersionResource) {
func (r *Reconciler) resolveGroupVersionsBeta(crd *v1beta1.CustomResourceDefinition) (*schema.GroupVersionResource, *schema.GroupVersionKind, error) {
var gvr *schema.GroupVersionResource
var gvk *schema.GroupVersionKind
for _, v := range crd.Spec.Versions {
if !v.Served {
continue
}
gvr = &schema.GroupVersionResource{
Group: crd.Spec.Group,
Version: v.Name,
Resource: crd.Spec.Names.Plural,
}

gvk = &schema.GroupVersionKind{
Group: crd.Spec.Group,
Version: v.Name,
Kind: crd.Spec.Names.Kind,
}

}
if gvr == nil || gvk == nil {
return nil, nil, fmt.Errorf("unable to find GVR or GVK for %s", crd.Name)
}
return gvr, gvk, nil
}

func (r *Reconciler) deleteFunc(obj interface{}) {
var ctx context.Context
customresourcedefinitionInformer := customresourcedefinition.Get(ctx)

lister := customresourcedefinitionInformer.Lister()

logging.FromContext(context.TODO()).Info("In delete function for CRD")
if obj == nil {
return
}

var gvr *schema.GroupVersionResource
var err error
crdv1, ok := obj.(*v1.CustomResourceDefinition)
if !ok {
crdv1beta1, ok := obj.(*v1beta1.CustomResourceDefinition)
if !ok {
return
}
gvr, _, err = r.resolveGroupVersionsBeta(crdv1beta1)
if err != nil {
logging.FromContext(r.ogctx).Errorw("Error in delete function", zap.String("GVR", gvr.String()), zap.Error(err))
return
}
} else {
gvr, _, err = r.resolveGroupVersions(crdv1)
if err != nil {
logging.FromContext(r.ogctx).Errorw("Error in delete function", zap.String("GVR", gvr.String()), zap.Error(err))
return
}
}

logging.FromContext(context.TODO()).Info("Object not nil")
r.lock.RLock()
_, found := r.controllers[*gvr]
r.lock.RUnlock()
if found {
logging.FromContext(context.TODO()).Info("Object found")
r.lock.Lock()
defer r.lock.Unlock()
// Now that we grabbed the write lock, check that nobody deleted it already.
rc, found := r.controllers[*gvr]
if found {
logging.FromContext(ctx).Infow("Stopping Source Duck Controller", zap.String("GVR", gvr.String()))
logging.FromContext(context.TODO()).Info("Stopping Source Duck Controller", zap.String("GVR", gvr.String()))
rc.cancel()
delete(r.controllers, *gvr)
logging.FromContext(context.TODO()).Info("Deleted Source Duck", zap.String("GVR", gvr.String()))
}
r.lock.Unlock()
}
}

Expand Down
9 changes: 6 additions & 3 deletions pkg/reconciler/source/duck/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,16 @@ func NewController(crd string, gvr schema.GroupVersionResource, gvk schema.Group
}
impl := controller.NewImpl(r, logger, ReconcilerName)

logger.Info("Setting up event handlers")
logger.Info("Setting up Source event handlers")
sourceInformer.AddEventHandler(controller.HandleAll(impl.Enqueue))

eventTypeInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{
FilterFunc: controller.FilterControllerGVK(gvk),
Handler: controller.HandleAll(impl.EnqueueControllerOf),
})
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: impl.Enqueue,
UpdateFunc: controller.PassNew(impl.Enqueue),
DeleteFunc: r.deleteFunc,
}})

return impl
}
Expand Down
7 changes: 7 additions & 0 deletions pkg/reconciler/source/duck/duck.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,3 +265,10 @@ func asMap(eventTypes []v1beta1.EventType, keyFunc func(*v1beta1.EventType) stri
func keyFromEventType(eventType *v1beta1.EventType) string {
return fmt.Sprintf("%s_%s_%s_%s", eventType.Spec.Type, eventType.Spec.Source, eventType.Spec.Schema, eventType.Spec.Broker)
}

func (r *Reconciler) deleteFunc(obj interface{}) {
logging.FromContext(context.TODO()).Info("In delete function for Source")
if obj == nil {
return
}
}