[Framework] implement ingress operator#75
Conversation
629f57b to
ad1e484
Compare
rokatyy
left a comment
There was a problem hiding this comment.
Some questions and suggestion to begin with. Also, please make sure that all errors start with capitalised letters
pkg/kube/ingress.go
Outdated
| func (iw *IngressWatcher) IngressHandlerDeleteFunc(obj interface{}) { | ||
| host, path, targets, err := iw.extractIngressValuesFromIngressResource(obj) | ||
| if err != nil { | ||
| iw.logger.WarnWith("Delete ingress handler failure- failed to extract values from object", "error", err) | ||
| return | ||
| } | ||
|
|
||
| if err = iw.ingressCache.Delete(host, path, targets); err != nil { | ||
| iw.logger.WarnWith("Delete ingress handler failure- failed delete from cache", "error", err, "object", obj) | ||
| return | ||
| } |
There was a problem hiding this comment.
please rename all this methods to not have ingress in them, it is already in the name of IngressWatcher, so it can be simply Add, Update, Delete
There was a problem hiding this comment.
Agreed, it can not have Func, e.g. AddHandler etc
pkg/kube/ingress.go
Outdated
| } | ||
|
|
||
| // extractHostPathTarget extracts the host, path, and targets from the ingress resource | ||
| func (iw *IngressWatcher) extractHostPathTarget(ingress *networkingv1.Ingress) (string, string, []string, error) { |
There was a problem hiding this comment.
How about a structure to return all these objects?
| func (iw *IngressWatcher) extractHostPathTarget(ingress *networkingv1.Ingress) (string, string, []string, error) { | |
| type IngressValues struct { | |
| Host string | |
| Path string | |
| Targets []string | |
| } |
it will improve readability and get rid of many "" and etc
pkg/kube/ingress.go
Outdated
| return "", errors.New("no rules found in ingress") | ||
| } | ||
|
|
||
| // Ingress must contain exactly one rule |
There was a problem hiding this comment.
It is not a rule in k8s, so if it contains more, we should still ignore that and get the first one, but add log about that
pkg/kube/ingress.go
Outdated
| // Ingress must contain exactly one rule | ||
| httpPath := rule.HTTP.Paths[0] |
pkg/kube/ingress.go
Outdated
| } | ||
|
|
||
| if err = iw.ingressCache.Set(newHost, newPath, newTargets); err != nil { | ||
| iw.logger.WarnWith("Update ingress handler failure- failed to add the new value", "error", err, "object", newObj) |
There was a problem hiding this comment.
| iw.logger.WarnWith("Update ingress handler failure- failed to add the new value", "error", err, "object", newObj) | |
| iw.logger.WarnWith("Update ingress handler failure - failed to add the new value", | |
| "error", err, "object", newObj) |
pkg/kube/ingress.go
Outdated
| // if the host or path has changed, we need to delete the old entry | ||
| if oldHost != newHost || oldPath != newPath { | ||
| if err = iw.ingressCache.Delete(oldHost, oldPath, oldTargets); err != nil { | ||
| iw.logger.WarnWith("Update ingress handler failure - failed to delete old ingress", "error", err) |
There was a problem hiding this comment.
| iw.logger.WarnWith("Update ingress handler failure - failed to delete old ingress", "error", err) | |
| iw.logger.WarnWith("Update ingress handler failure - failed to delete old ingress", | |
| "error", err) |
and same in all the places above
pkg/kube/ingress.go
Outdated
| } | ||
|
|
||
| if err = iw.ingressCache.Set(host, path, targets); err != nil { | ||
| iw.logger.WarnWith("Add ingress handler failure- failed to add the new value", "error", err, "object", obj) |
There was a problem hiding this comment.
| iw.logger.WarnWith("Add ingress handler failure- failed to add the new value", "error", err, "object", obj) | |
| iw.logger.WarnWith("Add ingress handler failure - failed to add the new value", | |
| "error", err, "object", obj) |
| func (iw *IngressWatcher) getPathFromIngress(ingress *networkingv1.Ingress) (string, error) { | ||
| if ingress == nil { | ||
| return "", errors.New("ingress is nil") | ||
| } | ||
|
|
||
| if len(ingress.Spec.Rules) == 0 { | ||
| return "", errors.New("no rules found in ingress") | ||
| } | ||
|
|
There was a problem hiding this comment.
duplication of the code in getHostFromIngress, can be moved to smth like
func (iw *IngressWatcher) getFirstRule(ingress *networkingv1.Ingress) (*networkingv1.IngressRule, error) {
if ingress == nil {
return nil, errors.New("ingress is nil")
}
if len(ingress.Spec.Rules) == 0 {
return nil, errors.New("no rules found in ingress")
}
return &ingress.Spec.Rules[0], nil
}
pkg/kube/ingress.go
Outdated
| targets, err := iw.resolveTargetsCallback(ingress) | ||
| if err != nil { | ||
| return "", "", nil, errors.Wrap(err, "Failed to extract targets from ingress labels") | ||
| } | ||
|
|
There was a problem hiding this comment.
what if resolveTargetsCallback does some additional filtering and returns empty targets if ingress doesn't meet the conditions, would it make sense to put it above getHostFromIngress?
There was a problem hiding this comment.
- I agree that since the callback function is external to the scaler, it's appropriate to check for nil before using the returned targets.
- The order of the function calls isn't critical, but I originally structured them to follow the logical route hierarchy:
host -> path -> targets.
If I understood you correctly, you'd like the target resolution to come first since it’s the most complex part, and placing it earlier supports a fail-fast approach. That makes sense, and I’ll update the order accordingly.
| "k8s.io/client-go/tools/cache" | ||
| ) | ||
|
|
||
| type ResolveTargetsFromIngressCallback func(ingress *networkingv1.Ingress) ([]string, error) |
There was a problem hiding this comment.
Please add a docstring for this type, as its function is expected to be implemented in a custom way. We should provide some guidance on what the implementation should include.
pkg/kube/ingress.go
Outdated
| ctx context.Context, | ||
| dlxLogger logger.Logger, | ||
| kubeClient kubernetes.Interface, | ||
| ingressCache *ingresscache.IngressCache, |
There was a problem hiding this comment.
In the struct above the property is defined as the IngressHostCache interface, but here you receive a pointer to the actual implementation IngressCache.
This should also be the interface.
pkg/kube/ingress.go
Outdated
|
|
||
| ingressWatcher := &IngressWatcher{ | ||
| ctx: ctx, | ||
| logger: dlxLogger, |
There was a problem hiding this comment.
| logger: dlxLogger, | |
| logger: dlxLogger.GetChild("watcher"), |
pkg/kube/ingress.go
Outdated
| ) (*IngressWatcher, error) { | ||
| factory := informers.NewSharedInformerFactoryWithOptions( | ||
| kubeClient, | ||
| 30*time.Second, |
There was a problem hiding this comment.
This resync interval should be configurable
pkg/kube/ingress.go
Outdated
| } | ||
|
|
||
| func (iw *IngressWatcher) Start() error { | ||
| iw.logger.Debug("Starting IngressWatcher") |
There was a problem hiding this comment.
| iw.logger.Debug("Starting IngressWatcher") | |
| iw.logger.Info("Starting ingress watcher") |
pkg/kube/ingress.go
Outdated
| iw.factory.Start(iw.ctx.Done()) | ||
|
|
||
| if !cache.WaitForCacheSync(iw.ctx.Done(), iw.informer.HasSynced) { | ||
| return errors.New("failed to sync ingress cache") |
There was a problem hiding this comment.
| return errors.New("failed to sync ingress cache") | |
| return errors.New("Failed to sync ingress cache") |
pkg/kube/ingress_test.go
Outdated
| // TestIngressWatcherTestSuite runs the test suite | ||
| func TestIngressWatcherTestSuite(t *testing.T) { | ||
| suite.Run(t, new(IngressWatcherTestSuite)) | ||
| } |
There was a problem hiding this comment.
Move to the bottom of the file
pkg/kube/ingress_test.go
Outdated
| name string | ||
| testArgs ingressArgs | ||
| expectedResult []string | ||
| initialStateCache *ingressArgs |
There was a problem hiding this comment.
| initialStateCache *ingressArgs | |
| initialCachedData *ingressArgs |
pkg/kube/ingress_test.go
Outdated
| type ingressArgs struct { | ||
| host string | ||
| path string | ||
| targets []string | ||
| } |
There was a problem hiding this comment.
Following @rokatyy 's suggestion, this can be a struct in ingress.go.
But generally - if you see that you defined the same struct multiple times - define it once outside of the tests.
pkg/kube/ingress_test.go
Outdated
| for _, testCase := range []struct { | ||
| name string | ||
| expectedResults []expectedResults | ||
| initialStateCache *ingressArgs |
There was a problem hiding this comment.
| initialStateCache *ingressArgs | |
| initialCacheData *ingressArgs |
pkg/kube/ingress_test.go
Outdated
| type expectedResult struct { | ||
| host string | ||
| path string | ||
| targets []string | ||
| expectError bool | ||
| expectErrorMsg string | ||
| } |
There was a problem hiding this comment.
Same for this - move it to the top of the file.
rokatyy
left a comment
There was a problem hiding this comment.
LGTM 👍 , just a minor thing regarding the comment
pkg/kube/ingress.go
Outdated
| // | ||
| // Implementation guidelines: | ||
| // - Return a non-nil slice when targets are successfully resolved | ||
| // - Return a non-nil error if resolution fails or no targets are found |
There was a problem hiding this comment.
why error when no targets found? maybe user code contain some custom filtering. I think that's ok not to throw error in this case, just ignore if empty
d00d4c6 to
0522781
Compare
TomerShor
left a comment
There was a problem hiding this comment.
Last minor comments from me
Description
Full description in the LLD that linked to this Jira - https://iguazio.atlassian.net/browse/NUC-498
This PR introduces a new IngressWatcher component that uses Kubernetes informers to watch for Ingress resource changes and updates an internal IngressHostCache accordingly. The watcher handles add, update, and delete events, extracts relevant data (host, path, target list), and reflects those changes in the in-memory cache.
Affected Areas
pkg/kube/ingress.go– new file with the IngressWatcher implementationIngressHostCachefrompkg/ingresscachek8s.io/client-gofor informersTesting
NuclioChanges Made
IngressWatcherstruct to manage and respond to Kubernetes Ingress resource changes.Additional notes