Skip to content
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion pkg/sqlcache/Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,12 @@ intended to be used as a way of enforcing RBAC.
fields := [][]string{{"metadata", "name"}, {"metadata", "namespace"}}
opts := &informer.ListOptions{}
// gvk should be of type k8s.io/apimachinery/pkg/runtime/schema.GroupVersionKind
c, err := cacheFactory.CacheFor(fields, client, gvk)
c, err := cacheFactory.CacheFor(context.Context,
getFieldsForGVKFunc, // See pkg/stores/sqlproxy/proxy_store:fieldsForGVK for an example
tableClient,
gvk,
schema,
controllerschema.IsListWatchable(schema))
if err != nil {
panic(err)
}
Expand Down
13 changes: 9 additions & 4 deletions pkg/sqlcache/informer/factory/informer_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ type guardedInformer struct {
wg wait.Group
}

// This function is used by factory.CacheFor() to pull in table info when the function needs to create new DB tables.
type GetFieldsFuncType func() (fields [][]string, typeGuidance map[string]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, isNamespaced bool, transform cache.TransformFunc)

type newInformer func(ctx context.Context, client dynamic.ResourceInterface, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, gvk schema.GroupVersionKind, db db.Client, shouldEncrypt bool, typeGuidance map[string]string, namespace bool, watchable bool, gcInterval time.Duration, gcKeepCount int) (*informer.Informer, error)

type Cache struct {
Expand Down Expand Up @@ -144,7 +147,8 @@ func NewCacheFactoryWithContext(ctx context.Context, opts CacheFactoryOptions) (
// a context for a single cache to be able to stop that cache (eg: on schema refresh) without impacting the other caches.
//
// Don't forget to call DoneWithCache with the given informer once done with it.
func (f *CacheFactory) CacheFor(ctx context.Context, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, client dynamic.ResourceInterface, gvk schema.GroupVersionKind, typeGuidance map[string]string, namespaced bool, watchable bool) (*Cache, error) {

func (f *CacheFactory) CacheFor(ctx context.Context, getFieldsFunc GetFieldsFuncType, client dynamic.ResourceInterface, gvk schema.GroupVersionKind, watchable bool) (*Cache, error) {
// Second, check if the informer and its accompanying informer-specific mutex exist already in the informers cache
// If not, start by creating such informer-specific mutex. That is used later to ensure no two goroutines create
// informers for the same GVK at the same type
Expand All @@ -168,15 +172,15 @@ func (f *CacheFactory) CacheFor(ctx context.Context, fields [][]string, external
// Prevent Stop() to be called for that GVK
gi.stopMutex.RLock()

gvkCache, err := f.cacheForLocked(ctx, gi, fields, externalUpdateInfo, selfUpdateInfo, transform, client, gvk, typeGuidance, namespaced, watchable)
gvkCache, err := f.cacheForLocked(ctx, gi, getFieldsFunc, client, gvk, watchable)
if err != nil {
gi.stopMutex.RUnlock()
return nil, err
}
return gvkCache, nil
}

func (f *CacheFactory) cacheForLocked(ctx context.Context, gi *guardedInformer, fields [][]string, externalUpdateInfo *sqltypes.ExternalGVKUpdates, selfUpdateInfo *sqltypes.ExternalGVKUpdates, transform cache.TransformFunc, client dynamic.ResourceInterface, gvk schema.GroupVersionKind, typeGuidance map[string]string, namespaced bool, watchable bool) (*Cache, error) {
func (f *CacheFactory) cacheForLocked(ctx context.Context, gi *guardedInformer, getFieldsFunc GetFieldsFuncType, client dynamic.ResourceInterface, gvk schema.GroupVersionKind, watchable bool) (*Cache, error) {
// At this point an informer-specific mutex (gi.mutex) is guaranteed to exist. Lock it
gi.informerMutex.Lock()

Expand All @@ -191,9 +195,10 @@ func (f *CacheFactory) cacheForLocked(ctx context.Context, gi *guardedInformer,

_, encryptResourceAlways := defaultEncryptedResourceTypes[gvk]
shouldEncrypt := f.encryptAll || encryptResourceAlways
fields, typeGuidance, externalUpdateInfo, selfUpdateInfo, isNamespaced, transformFunc := getFieldsFunc()
// In non-test code this invokes pkg/sqlcache/informer/informer.go: NewInformer()
// search for "func NewInformer(ctx"
i, err := f.newInformer(gi.ctx, client, fields, externalUpdateInfo, selfUpdateInfo, transform, gvk, f.dbClient, shouldEncrypt, typeGuidance, namespaced, watchable, f.gcInterval, f.gcKeepCount)
i, err := f.newInformer(gi.ctx, client, fields, externalUpdateInfo, selfUpdateInfo, transformFunc, gvk, f.dbClient, shouldEncrypt, typeGuidance, isNamespaced, watchable, f.gcInterval, f.gcKeepCount)
if err != nil {
gi.informerMutex.Unlock()
return nil, err
Expand Down
Loading