From decbed432671d6796d621ef1e4c50f468c035df8 Mon Sep 17 00:00:00 2001 From: Valentin Petrov Date: Thu, 14 Nov 2024 18:36:39 +0200 Subject: [PATCH 1/5] Add support for AWS Cloud Map Collector Discovery --- cmd/otel-allocator/collector/aws_cloud_map.go | 215 +++++++++++++++++ cmd/otel-allocator/collector/collector.go | 225 +++++++++++------- cmd/otel-allocator/collector/k8s.go | 141 +++++++++++ .../{collector_test.go => k8s_test.go} | 38 +-- cmd/otel-allocator/config/config.go | 60 +++-- cmd/otel-allocator/config/flags.go | 53 ++++- cmd/otel-allocator/main.go | 22 +- go.mod | 20 ++ go.sum | 28 +++ 9 files changed, 676 insertions(+), 126 deletions(-) create mode 100644 cmd/otel-allocator/collector/aws_cloud_map.go create mode 100644 cmd/otel-allocator/collector/k8s.go rename cmd/otel-allocator/collector/{collector_test.go => k8s_test.go} (81%) diff --git a/cmd/otel-allocator/collector/aws_cloud_map.go b/cmd/otel-allocator/collector/aws_cloud_map.go new file mode 100644 index 0000000000..8872ff6341 --- /dev/null +++ b/cmd/otel-allocator/collector/aws_cloud_map.go @@ -0,0 +1,215 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import ( + "context" + "errors" + "fmt" + "log" + "time" + + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/servicediscovery" + "github.com/aws/aws-sdk-go-v2/service/servicediscovery/types" + "github.com/aws/aws-sdk-go/aws" + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +type AwsCloudMapWatcher struct { + svc *servicediscovery.Client + namespaceName *string + serviceName *string + watcher watcher +} + +type AwsCloudMapWatcherOption func(*AwsCloudMapWatcher) + +var ( + errNoNamespace = errors.New("no Cloud Map namespace specified to resolve the backends") + errNoServiceName = errors.New("no Cloud Map service_name specified to resolve the backends") + discoveryDuration = promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "cloudmap_discovery_duration_seconds", + Help: "Time taken to discover instances in Cloud Map", + Buckets: prometheus.ExponentialBuckets(0.1, 2, 10), + }) + + discoveryErrors = promauto.NewCounter(prometheus.CounterOpts{ + Name: "cloudmap_discovery_errors_total", + Help: "Total number of collector discovery errors", + }) + healthyInstances = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "cloudmap_healthy_instances", + Help: "Number of healthy instances in Cloud Map", + }) + unhealthyInstances = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "cloudmap_unhealthy_instances", + Help: "Number of unhealthy instances in Cloud Map", + }) + totalInstances = promauto.NewGauge(prometheus.GaugeOpts{ + Name: "cloudmap_instances_total", + Help: "Total number of instances in Cloud Map", + }) +) + +func NewAwsCloudMapWatcher(opts ...WatcherOption) (*AwsCloudMapWatcher, error) { + cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithDefaultRegion("")) + if err != nil { + log.Fatalf("unable to load SDK config, %v", err) + return nil, err + } + + // Using the Config value, create the DynamoDB client + svc := servicediscovery.NewFromConfig(cfg) + + w := &AwsCloudMapWatcher{ + svc: svc, + + watcher: watcher{ + close: make(chan struct{}), + minUpdateInterval: defaultMinUpdateInterval, + }, + } + + for _, opt := range opts { + if opt.awsCloudMapOption == nil { + continue + } + opt.awsCloudMapOption(w) + } + + if w.namespaceName == nil || len(*w.namespaceName) == 0 { + return nil, errNoNamespace + } + + if w.serviceName == nil || len(*w.serviceName) == 0 { + return nil, errNoServiceName + } + + return w, nil +} + +func (w *AwsCloudMapWatcher) Watch(options ...WatchOption) error { + config := WatchConfig{} + + for _, option := range options { + option(&config) + } + + if w.svc == nil { + return fmt.Errorf("AWS Cloud Map service client not initialized") + } + + startTime := time.Now() + + discoverOutput, err := w.svc.DiscoverInstances(context.TODO(), &servicediscovery.DiscoverInstancesInput{ + NamespaceName: w.namespaceName, + ServiceName: w.serviceName, + MaxResults: aws.Int32(100), // Limit results for better performance + }) + if err != nil { + return fmt.Errorf("failed to discover instances: %w", err) + } + + // Track discovery metrics + discoveryDuration.Observe(time.Since(startTime).Seconds()) + if err != nil { + discoveryErrors.Inc() + return fmt.Errorf("failed to discover instances: %w", err) + } + + discoveredInstances, healthStats := w.processBatch(discoverOutput.Instances) + + // Update metrics + w.updateMetrics(healthStats.healthy, healthStats.unhealthy) + + w.watcher.log.Info("discovered instances", + "total", len(discoverOutput.Instances), + "healthy", healthStats.healthy, + "unhealthy", healthStats.unhealthy, + "namespace", w.namespaceName, + "service", w.serviceName, + ) + + go w.rateLimitedCollectorHandler(discoveredInstances, config.fn) + + return nil +} + +func (w *AwsCloudMapWatcher) processBatch(instances []types.HttpInstanceSummary) ([]types.HttpInstanceSummary, struct{ healthy, unhealthy int }) { + const batchSize = 50 + stats := struct{ healthy, unhealthy int }{} + result := make([]types.HttpInstanceSummary, 0, len(instances)) + + for i := 0; i < len(instances); i += batchSize { + end := i + batchSize + if end > len(instances) { + end = len(instances) + } + + for _, instance := range instances[i:end] { + if instance.HealthStatus != types.HealthStatusUnhealthy { + result = append(result, instance) + stats.healthy++ + } else { + stats.unhealthy++ + } + } + } + + return result, stats +} + +func (w *AwsCloudMapWatcher) updateMetrics(healthy, unhealthy int) { + healthyInstances.Set(float64(healthy)) + unhealthyInstances.Set(float64(unhealthy)) + totalInstances.Set(float64(healthy + unhealthy)) +} + +func (w *AwsCloudMapWatcher) rateLimitedCollectorHandler(store []types.HttpInstanceSummary, fn func(collectors map[string]*allocation.Collector)) { + ticker := time.NewTicker(w.watcher.minUpdateInterval) + defer ticker.Stop() + + for { + select { + case <-w.watcher.close: + return + case <-ticker.C: + w.runOnCollectors(store, fn) + } + } +} + +// runOnCollectors runs the provided function on the set of collectors from the Store. +func (w *AwsCloudMapWatcher) runOnCollectors(store []types.HttpInstanceSummary, fn func(collectors map[string]*allocation.Collector)) { + collectorMap := make(map[string]*allocation.Collector, len(store)) + var node string + for _, obj := range store { + for attr, value := range obj.Attributes { + if attr == "EC2_INSTANCE_ID" { + node = value + } + } + collectorMap[*obj.InstanceId] = allocation.NewCollector(*obj.InstanceId, node) + } + collectorsDiscovered.Set(float64(len(collectorMap))) + fn(collectorMap) +} + +func (w *AwsCloudMapWatcher) Close() { + w.watcher.Close() +} diff --git a/cmd/otel-allocator/collector/collector.go b/cmd/otel-allocator/collector/collector.go index 8814e38797..515ecf2b46 100644 --- a/cmd/otel-allocator/collector/collector.go +++ b/cmd/otel-allocator/collector/collector.go @@ -4,31 +4,29 @@ // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 +// http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. - package collector import ( + "errors" + "fmt" "os" + "strings" "time" "github.com/go-logr/logr" + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" - v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" - "k8s.io/client-go/tools/cache" - - "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation" ) const ( @@ -43,102 +41,169 @@ var ( }) ) -type Watcher struct { +type CollectorWatcherType int + +const ( + K8sCollectorWatcher CollectorWatcherType = iota + AwsCloudMapCollectorWatcher +) + +var collectorWatcherTypeStrings = []string{"k8s", "aws-cloud-map"} + +func ParseCollectorWatcherType(s string) (CollectorWatcherType, error) { + for i, name := range collectorWatcherTypeStrings { + if strings.ToLower(s) == name { + return CollectorWatcherType(i), nil + } + } + return 0, errors.New("invalid collector watcher type") +} + +// Implement the Stringer interface for CollectorWatcherType +func (c CollectorWatcherType) String() string { + if int(c) < len(collectorWatcherTypeStrings) { + return collectorWatcherTypeStrings[c] + } + return "unknown" +} + +// Implement the Set method for pflag +func (c *CollectorWatcherType) Set(value string) error { + for i, name := range collectorWatcherTypeStrings { + if strings.ToLower(value) == name { + *c = CollectorWatcherType(i) + return nil + } + } + return errors.New("invalid collector watcher type") +} + +// Implement the Type method for pflag +func (c *CollectorWatcherType) Type() string { + return "CollectorWatcherType" +} + +var _ Watcher = &watcher{} + +// CollectorWatcher interface defines the common methods for watchers +type Watcher interface { + Watch(...WatchOption) error + Close() +} + +type watcher struct { log logr.Logger - k8sClient kubernetes.Interface - close chan struct{} minUpdateInterval time.Duration + close chan struct{} } -func NewCollectorWatcher(logger logr.Logger, kubeConfig *rest.Config) (*Watcher, error) { - clientset, err := kubernetes.NewForConfig(kubeConfig) - if err != nil { - return &Watcher{}, err +func (w *watcher) Watch(options ...WatchOption) error { + config := &WatchConfig{} + for _, opt := range options { + opt(config) } - return &Watcher{ - log: logger.WithValues("component", "opentelemetry-targetallocator"), - k8sClient: clientset, - close: make(chan struct{}), - minUpdateInterval: defaultMinUpdateInterval, - }, nil -} + if config.fn == nil { + return fmt.Errorf("fn is required") + } -func (k *Watcher) Watch(labelSelector *metav1.LabelSelector, fn func(collectors map[string]*allocation.Collector)) error { - selector, err := metav1.LabelSelectorAsSelector(labelSelector) - if err != nil { - return err + if config.labelSelector == nil { + config.labelSelector = &metav1.LabelSelector{} } - listOptionsFunc := func(listOptions *metav1.ListOptions) { - listOptions.LabelSelector = selector.String() + if w.minUpdateInterval == 0 { + w.minUpdateInterval = defaultMinUpdateInterval } - informerFactory := informers.NewSharedInformerFactoryWithOptions( - k.k8sClient, - time.Second*30, - informers.WithNamespace(ns), - informers.WithTweakListOptions(listOptionsFunc)) - informer := informerFactory.Core().V1().Pods().Informer() - notify := make(chan struct{}, 1) - go k.rateLimitedCollectorHandler(notify, informer.GetStore(), fn) + if w.close == nil { + w.close = make(chan struct{}) + } + return nil +} - notifyFunc := func(_ interface{}) { - select { - case notify <- struct{}{}: - default: - } +func (w *watcher) Close() { + if w.close != nil { + close(w.close) } - _, err = informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ - AddFunc: notifyFunc, - UpdateFunc: func(oldObj, newObj interface{}) { - notifyFunc(newObj) +} + +// WatchConfig struct defines the common parameters for the watch method of Collector Watchers +type WatchConfig struct { + labelSelector *metav1.LabelSelector + fn func(collectors map[string]*allocation.Collector) +} + +type WatchOption func(wc *WatchConfig) + +func WithLabelSelector(labelSelector *metav1.LabelSelector) WatchOption { + return func(wc *WatchConfig) { + wc.labelSelector = labelSelector + } +} + +func WithFn(fn func(collectors map[string]*allocation.Collector)) WatchOption { + return func(wc *WatchConfig) { + wc.fn = fn + } +} + +type WatcherOption struct { + k8sOption K8sWatcherOption + awsCloudMapOption AwsCloudMapWatcherOption +} + +func WithMinUpdateInterval(interval time.Duration) WatcherOption { + return WatcherOption{ + k8sOption: func(w *K8sWatcher) { + w.watcher.minUpdateInterval = interval + }, + awsCloudMapOption: func(w *AwsCloudMapWatcher) { + w.watcher.minUpdateInterval = interval }, - DeleteFunc: notifyFunc, - }) - if err != nil { - return err } +} - informer.Run(k.close) - return nil +func WithLogger(logger logr.Logger) WatcherOption { + return WatcherOption{ + k8sOption: func(w *K8sWatcher) { + w.watcher.log = logger + }, + awsCloudMapOption: func(w *AwsCloudMapWatcher) { + w.watcher.log = logger + }, + } } -// rateLimitedCollectorHandler runs fn on collectors present in the store whenever it gets a notification on the notify channel, -// but not more frequently than once per k.eventPeriod. -func (k *Watcher) rateLimitedCollectorHandler(notify chan struct{}, store cache.Store, fn func(collectors map[string]*allocation.Collector)) { - ticker := time.NewTicker(k.minUpdateInterval) - defer ticker.Stop() - - for { - select { - case <-k.close: - return - case <-ticker.C: // throttle events to avoid excessive updates - select { - case <-notify: - k.runOnCollectors(store, fn) - default: +func WithKubeConfig(config *rest.Config) WatcherOption { + return WatcherOption{ + k8sOption: func(w *K8sWatcher) { + clientset, err := kubernetes.NewForConfig(config) + if err != nil { + // Handle error, perhaps log it or panic + w.watcher.log.Error(err, "Failed to create Kubernetes client") + return } - } + w.k8sClient = clientset + }, } } -// runOnCollectors runs the provided function on the set of collectors from the Store. -func (k *Watcher) runOnCollectors(store cache.Store, fn func(collectors map[string]*allocation.Collector)) { - collectorMap := map[string]*allocation.Collector{} - objects := store.List() - for _, obj := range objects { - pod := obj.(*v1.Pod) - if pod.Spec.NodeName == "" { - continue - } - collectorMap[pod.Name] = allocation.NewCollector(pod.Name, pod.Spec.NodeName) +func WithCloudMapConfig(namespaceName, serviceName *string) WatcherOption { + return WatcherOption{ + awsCloudMapOption: func(w *AwsCloudMapWatcher) { + w.namespaceName = namespaceName + w.serviceName = serviceName + }, } - collectorsDiscovered.Set(float64(len(collectorMap))) - fn(collectorMap) } -func (k *Watcher) Close() { - close(k.close) +func NewCollectorWatcher(t CollectorWatcherType, options ...WatcherOption) (Watcher, error) { + switch t { + case K8sCollectorWatcher: + return NewK8sWatcher(options...) + case AwsCloudMapCollectorWatcher: + return NewAwsCloudMapWatcher(options...) + default: + return nil, fmt.Errorf("invalid collector watcher type: %v", t) + } } diff --git a/cmd/otel-allocator/collector/k8s.go b/cmd/otel-allocator/collector/k8s.go new file mode 100644 index 0000000000..313e01d8f0 --- /dev/null +++ b/cmd/otel-allocator/collector/k8s.go @@ -0,0 +1,141 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package collector + +import ( + "errors" + "time" + + "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation" + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/cache" +) + +type K8sWatcher struct { + k8sClient kubernetes.Interface + watcher watcher +} + +type K8sWatcherOption func(*K8sWatcher) + +var ( + errNoClient = errors.New("no Kubernetes client given") +) + +func NewK8sWatcher(opts ...WatcherOption) (*K8sWatcher, error) { + c := &K8sWatcher{ + watcher: watcher{ + close: make(chan struct{}), + minUpdateInterval: defaultMinUpdateInterval, + }, + } + for _, opt := range opts { + if opt.k8sOption == nil { + continue + } + opt.k8sOption(c) + } + if c.k8sClient == nil { + return &K8sWatcher{}, errNoClient + } + return c, nil +} + +func (w *K8sWatcher) Watch(options ...WatchOption) error { // labelSelector *metav1.LabelSelector, fn func(collectors map[string]*allocation.Collector) + config := WatchConfig{} + for _, option := range options { + option(&config) + } + + selector, err := metav1.LabelSelectorAsSelector(config.labelSelector) + if err != nil { + return err + } + + listOptionsFunc := func(listOptions *metav1.ListOptions) { + listOptions.LabelSelector = selector.String() + } + informerFactory := informers.NewSharedInformerFactoryWithOptions( + w.k8sClient, + time.Second*30, + informers.WithNamespace(ns), + informers.WithTweakListOptions(listOptionsFunc)) + informer := informerFactory.Core().V1().Pods().Informer() + + notify := make(chan struct{}, 1) + go w.rateLimitedCollectorHandler(notify, informer.GetStore(), config.fn) + + notifyFunc := func(_ interface{}) { + select { + case notify <- struct{}{}: + default: + } + } + _, err = informer.AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: notifyFunc, + UpdateFunc: func(oldObj, newObj interface{}) { + notifyFunc(newObj) + }, + DeleteFunc: notifyFunc, + }) + if err != nil { + return err + } + + informer.Run(w.watcher.close) + return nil +} + +// rateLimitedCollectorHandler runs fn on collectors present in the store whenever it gets a notification on the notify channel, +// but not more frequently than once per k.eventPeriod. +func (w *K8sWatcher) rateLimitedCollectorHandler(notify chan struct{}, store cache.Store, fn func(collectors map[string]*allocation.Collector)) { + ticker := time.NewTicker(w.watcher.minUpdateInterval) + defer ticker.Stop() + + for { + select { + case <-w.watcher.close: + return + case <-ticker.C: // throttle events to avoid excessive updates + select { + case <-notify: + w.runOnCollectors(store, fn) + default: + } + } + } +} + +// runOnCollectors runs the provided function on the set of collectors from the Store. +func (w *K8sWatcher) runOnCollectors(store cache.Store, fn func(collectors map[string]*allocation.Collector)) { + collectorMap := map[string]*allocation.Collector{} + objects := store.List() + for _, obj := range objects { + pod := obj.(*v1.Pod) + if pod.Spec.NodeName == "" { + continue + } + collectorMap[pod.Name] = allocation.NewCollector(pod.Name, pod.Spec.NodeName) + } + collectorsDiscovered.Set(float64(len(collectorMap))) + fn(collectorMap) +} + +func (w *K8sWatcher) Close() { + w.watcher.Close() +} diff --git a/cmd/otel-allocator/collector/collector_test.go b/cmd/otel-allocator/collector/k8s_test.go similarity index 81% rename from cmd/otel-allocator/collector/collector_test.go rename to cmd/otel-allocator/collector/k8s_test.go index ed5ac364fc..be52d5346e 100644 --- a/cmd/otel-allocator/collector/collector_test.go +++ b/cmd/otel-allocator/collector/k8s_test.go @@ -31,7 +31,7 @@ import ( "github.com/open-telemetry/opentelemetry-operator/cmd/otel-allocator/allocation" ) -var logger = logf.Log.WithName("collector-unit-tests") +var logger = logf.Log.WithName("k8s-collector-unit-tests") var labelMap = map[string]string{ "app.kubernetes.io/instance": "default.test", "app.kubernetes.io/managed-by": "opentelemetry-operator", @@ -40,12 +40,14 @@ var labelSelector = metav1.LabelSelector{ MatchLabels: labelMap, } -func getTestPodWatcher() Watcher { - podWatcher := Watcher{ - k8sClient: fake.NewSimpleClientset(), - close: make(chan struct{}), - log: logger, - minUpdateInterval: time.Millisecond, +func getTestPodWatcher() K8sWatcher { + podWatcher := K8sWatcher{ + k8sClient: fake.NewSimpleClientset(), + watcher: watcher{ + close: make(chan struct{}), + log: logger, + minUpdateInterval: time.Millisecond, + }, } return podWatcher } @@ -63,9 +65,9 @@ func pod(name string) *v1.Pod { } } -func Test_runWatch(t *testing.T) { +func Test_k8sWatcher_runWatch(t *testing.T) { type args struct { - kubeFn func(t *testing.T, podWatcher Watcher) + kubeFn func(t *testing.T, podWatcher K8sWatcher) collectorMap map[string]*allocation.Collector } tests := []struct { @@ -76,7 +78,7 @@ func Test_runWatch(t *testing.T) { { name: "pod add", args: args{ - kubeFn: func(t *testing.T, podWatcher Watcher) { + kubeFn: func(t *testing.T, podWatcher K8sWatcher) { for _, k := range []string{"test-pod1", "test-pod2", "test-pod3"} { p := pod(k) _, err := podWatcher.k8sClient.CoreV1().Pods("test-ns").Create(context.Background(), p, metav1.CreateOptions{}) @@ -103,7 +105,7 @@ func Test_runWatch(t *testing.T) { { name: "pod delete", args: args{ - kubeFn: func(t *testing.T, podWatcher Watcher) { + kubeFn: func(t *testing.T, podWatcher K8sWatcher) { for _, k := range []string{"test-pod2", "test-pod3"} { err := podWatcher.k8sClient.CoreV1().Pods("test-ns").Delete(context.Background(), k, metav1.DeleteOptions{}) assert.NoError(t, err) @@ -136,7 +138,7 @@ func Test_runWatch(t *testing.T) { t.Run(tt.name, func(t *testing.T) { podWatcher := getTestPodWatcher() defer func() { - close(podWatcher.close) + close(podWatcher.watcher.close) }() var actual map[string]*allocation.Collector mapMutex := sync.Mutex{} @@ -145,12 +147,12 @@ func Test_runWatch(t *testing.T) { _, err := podWatcher.k8sClient.CoreV1().Pods("test-ns").Create(context.Background(), p, metav1.CreateOptions{}) assert.NoError(t, err) } - go func(podWatcher Watcher) { - err := podWatcher.Watch(&labelSelector, func(colMap map[string]*allocation.Collector) { + go func(podWatcher K8sWatcher) { + err := podWatcher.Watch(WithLabelSelector(&labelSelector), WithFn(func(colMap map[string]*allocation.Collector) { mapMutex.Lock() defer mapMutex.Unlock() actual = colMap - }) + })) require.NoError(t, err) }(podWatcher) @@ -168,15 +170,15 @@ func Test_runWatch(t *testing.T) { } // this tests runWatch in the case of watcher channel closing. -func Test_closeChannel(t *testing.T) { +func Test_k8sWatcher_closeChannel(t *testing.T) { podWatcher := getTestPodWatcher() var wg sync.WaitGroup wg.Add(1) - go func(podWatcher Watcher) { + go func(podWatcher K8sWatcher) { defer wg.Done() - err := podWatcher.Watch(&labelSelector, func(colMap map[string]*allocation.Collector) {}) + err := podWatcher.Watch(WithLabelSelector(&labelSelector), WithFn(func(colMap map[string]*allocation.Collector) {})) require.NoError(t, err) }(podWatcher) diff --git a/cmd/otel-allocator/config/config.go b/cmd/otel-allocator/config/config.go index 8a8b7c188a..755648b6c6 100644 --- a/cmd/otel-allocator/config/config.go +++ b/cmd/otel-allocator/config/config.go @@ -38,24 +38,26 @@ import ( ) const ( - DefaultResyncTime = 5 * time.Minute - DefaultConfigFilePath string = "/conf/targetallocator.yaml" - DefaultCRScrapeInterval model.Duration = model.Duration(time.Second * 30) - DefaultAllocationStrategy = "consistent-hashing" - DefaultFilterStrategy = "relabel-config" + DefaultResyncTime = 5 * time.Minute + DefaultConfigFilePath string = "/conf/targetallocator.yaml" + DefaultCRScrapeInterval model.Duration = model.Duration(time.Second * 30) + DefaultAllocationStrategy = "consistent-hashing" + DefaultFilterStrategy = "relabel-config" + DefaultCollectorWatcherType = "k8s" ) type Config struct { - ListenAddr string `yaml:"listen_addr,omitempty"` - KubeConfigFilePath string `yaml:"kube_config_file_path,omitempty"` - ClusterConfig *rest.Config `yaml:"-"` - RootLogger logr.Logger `yaml:"-"` - CollectorSelector *metav1.LabelSelector `yaml:"collector_selector,omitempty"` - PromConfig *promconfig.Config `yaml:"config"` - AllocationStrategy string `yaml:"allocation_strategy,omitempty"` - FilterStrategy string `yaml:"filter_strategy,omitempty"` - PrometheusCR PrometheusCRConfig `yaml:"prometheus_cr,omitempty"` - HTTPS HTTPSServerConfig `yaml:"https,omitempty"` + ListenAddr string `yaml:"listen_addr,omitempty"` + KubeConfigFilePath string `yaml:"kube_config_file_path,omitempty"` + ClusterConfig *rest.Config `yaml:"-"` + RootLogger logr.Logger `yaml:"-"` + CollectorSelector *metav1.LabelSelector `yaml:"collector_selector,omitempty"` + PromConfig *promconfig.Config `yaml:"config"` + AllocationStrategy string `yaml:"allocation_strategy,omitempty"` + FilterStrategy string `yaml:"filter_strategy,omitempty"` + PrometheusCR PrometheusCRConfig `yaml:"prometheus_cr,omitempty"` + HTTPS HTTPSServerConfig `yaml:"https,omitempty"` + CollectorWatcher CollectorWatcherConfig `yaml:"collector_watcher,omitempty"` } type PrometheusCRConfig struct { @@ -75,6 +77,16 @@ type HTTPSServerConfig struct { TLSKeyFilePath string `yaml:"tls_key_file_path,omitempty"` } +type CollectorWatcherConfig struct { + WatcherType string `yaml:"type,omitempty"` + AwsCloudMap AwsCloudMapConfig `yaml:"aws_cloud_map,omitempty"` +} + +type AwsCloudMapConfig struct { + Namespace string `yaml:"namespace,omitempty"` + ServiceName string `yaml:"service_name,omitempty"` +} + func LoadFromFile(file string, target *Config) error { return unmarshal(target, file) } @@ -145,6 +157,24 @@ func LoadFromCLI(target *Config, flagSet *pflag.FlagSet) error { target.HTTPS.TLSKeyFilePath = tlsKeyFilePath } + if cwType, changed, err := getCollectorWatcherType(flagSet); err != nil { + return err + } else if changed { + target.CollectorWatcher.WatcherType = cwType + } + + if collectorWatcherNamespace, changed, err := getAWSCloudMapNamespace(flagSet); err != nil { + return err + } else if changed { + target.CollectorWatcher.AwsCloudMap.Namespace = collectorWatcherNamespace + } + + if collectorWatcherServiceName, changed, err := getAWSCloudMapServiceName(flagSet); err != nil { + return err + } else if changed { + target.CollectorWatcher.AwsCloudMap.ServiceName = collectorWatcherServiceName + } + return nil } diff --git a/cmd/otel-allocator/config/flags.go b/cmd/otel-allocator/config/flags.go index 0a47c27636..7e0d23e150 100644 --- a/cmd/otel-allocator/config/flags.go +++ b/cmd/otel-allocator/config/flags.go @@ -25,16 +25,19 @@ import ( // Flag names. const ( - targetAllocatorName = "target-allocator" - configFilePathFlagName = "config-file" - listenAddrFlagName = "listen-addr" - prometheusCREnabledFlagName = "enable-prometheus-cr-watcher" - kubeConfigPathFlagName = "kubeconfig-path" - httpsEnabledFlagName = "enable-https-server" - listenAddrHttpsFlagName = "listen-addr-https" - httpsCAFilePathFlagName = "https-ca-file" - httpsTLSCertFilePathFlagName = "https-tls-cert-file" - httpsTLSKeyFilePathFlagName = "https-tls-key-file" + targetAllocatorName = "target-allocator" + configFilePathFlagName = "config-file" + listenAddrFlagName = "listen-addr" + prometheusCREnabledFlagName = "enable-prometheus-cr-watcher" + kubeConfigPathFlagName = "kubeconfig-path" + httpsEnabledFlagName = "enable-https-server" + listenAddrHttpsFlagName = "listen-addr-https" + httpsCAFilePathFlagName = "https-ca-file" + httpsTLSCertFilePathFlagName = "https-tls-cert-file" + httpsTLSKeyFilePathFlagName = "https-tls-key-file" + collectorWatcherTypeFlagName = "collector-watcher-type" + awsCloudMapNamespaceFlagName = "aws-cloud-map-namespace" + awsCloudMapServiceNameFlagName = "aws-cloud-map-service-name" ) // We can't bind this flag to our FlagSet, so we need to handle it separately. @@ -51,6 +54,9 @@ func getFlagSet(errorHandling pflag.ErrorHandling) *pflag.FlagSet { flagSet.String(httpsCAFilePathFlagName, "", "The path to the HTTPS server TLS CA file.") flagSet.String(httpsTLSCertFilePathFlagName, "", "The path to the HTTPS server TLS certificate file.") flagSet.String(httpsTLSKeyFilePathFlagName, "", "The path to the HTTPS server TLS key file.") + flagSet.String(collectorWatcherTypeFlagName, "k8s", "The type of collector watcher to use. (one of 'k8s', 'aws-cloud-map')") + flagSet.String(awsCloudMapNamespaceFlagName, "default", "The namespace of the AWS Cloud Map service.") + flagSet.String(awsCloudMapServiceNameFlagName, "otel-collector", "The name of the AWS Cloud Map service.") zapFlagSet := flag.NewFlagSet("", flag.ErrorHandling(errorHandling)) zapCmdLineOpts.BindFlags(zapFlagSet) flagSet.AddGoFlagSet(zapFlagSet) @@ -122,3 +128,30 @@ func getHttpsTLSKeyFilePath(flagSet *pflag.FlagSet) (value string, changed bool, value, err = flagSet.GetString(httpsTLSKeyFilePathFlagName) return } + +func getCollectorWatcherType(flagSet *pflag.FlagSet) (value string, changed bool, err error) { + if changed = flagSet.Changed(collectorWatcherTypeFlagName); !changed { + value, err = DefaultCollectorWatcherType, nil + return + } + value, err = flagSet.GetString(collectorWatcherTypeFlagName) + return +} + +func getAWSCloudMapNamespace(flagSet *pflag.FlagSet) (value string, changed bool, err error) { + if changed = flagSet.Changed(awsCloudMapNamespaceFlagName); !changed { + value, err = "", nil + return + } + value, err = flagSet.GetString(awsCloudMapNamespaceFlagName) + return +} + +func getAWSCloudMapServiceName(flagSet *pflag.FlagSet) (value string, changed bool, err error) { + if changed = flagSet.Changed(awsCloudMapServiceNameFlagName); !changed { + value, err = "", nil + return + } + value, err = flagSet.GetString(awsCloudMapServiceNameFlagName) + return +} diff --git a/cmd/otel-allocator/main.go b/cmd/otel-allocator/main.go index f9531d6740..6a6e92d5d5 100644 --- a/cmd/otel-allocator/main.go +++ b/cmd/otel-allocator/main.go @@ -53,7 +53,7 @@ func main() { allocatorPrehook prehook.Hook allocator allocation.Allocator discoveryManager *discovery.Manager - collectorWatcher *collector.Watcher + collectorWatcher collector.Watcher promWatcher allocatorWatcher.Watcher targetDiscoverer *target.Discoverer @@ -107,7 +107,20 @@ func main() { discoveryManager = discovery.NewManager(discoveryCtx, gokitlog.NewNopLogger(), prometheus.DefaultRegisterer, sdMetrics) targetDiscoverer = target.NewDiscoverer(log, discoveryManager, allocatorPrehook, srv) - collectorWatcher, collectorWatcherErr := collector.NewCollectorWatcher(log, cfg.ClusterConfig) + collectorWatcherType, err := collector.ParseCollectorWatcherType(cfg.CollectorWatcher.WatcherType) + if err != nil { + setupLog.Error(err, "Unable to parse collector watcher type") + os.Exit(1) + } + collectorWatcher, collectorWatcherErr := collector.NewCollectorWatcher( + collectorWatcherType, + collector.WithLogger(log.WithName("collector-watcher")), + collector.WithCloudMapConfig( + &cfg.CollectorWatcher.AwsCloudMap.Namespace, + &cfg.CollectorWatcher.AwsCloudMap.ServiceName, + ), + collector.WithKubeConfig(cfg.ClusterConfig), + ) if collectorWatcherErr != nil { setupLog.Error(collectorWatcherErr, "Unable to initialize collector watcher") os.Exit(1) @@ -179,7 +192,10 @@ func main() { }) runGroup.Add( func() error { - err := collectorWatcher.Watch(cfg.CollectorSelector, allocator.SetCollectors) + err := collectorWatcher.Watch( + collector.WithLabelSelector(cfg.CollectorSelector), + collector.WithFn(allocator.SetCollectors), + ) setupLog.Info("Collector watcher exited") return err }, diff --git a/go.mod b/go.mod index 0b9f77d2a5..94a4dda908 100644 --- a/go.mod +++ b/go.mod @@ -228,3 +228,23 @@ require ( sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect sigs.k8s.io/structured-merge-diff/v4 v4.4.1 // indirect ) + +require ( + github.com/aws/aws-sdk-go-v2/config v1.27.36 + github.com/aws/aws-sdk-go-v2/service/servicediscovery v1.33.5 +) + +require ( + github.com/aws/aws-sdk-go-v2 v1.32.4 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.17.34 // indirect + github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.14 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.23 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.23 // indirect + github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.5 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.20 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.23.0 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.27.0 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.31.0 // indirect + github.com/aws/smithy-go v1.22.0 // indirect +) diff --git a/go.sum b/go.sum index 10bcc61145..6c58a917a8 100644 --- a/go.sum +++ b/go.sum @@ -85,6 +85,34 @@ github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2/go.mod h1:W github.com/aws/aws-sdk-go v1.38.35/go.mod h1:hcU610XS61/+aQV88ixoOzUoG7v3b31pl2zKMmprdro= github.com/aws/aws-sdk-go v1.55.5 h1:KKUZBfBoyqy5d3swXyiC7Q76ic40rYcbqH7qjh59kzU= github.com/aws/aws-sdk-go v1.55.5/go.mod h1:eRwEWoyTWFMVYVQzKMNHWP5/RV4xIUGMQfXQHfHkpNU= +github.com/aws/aws-sdk-go-v2 v1.32.4 h1:S13INUiTxgrPueTmrm5DZ+MiAo99zYzHEFh1UNkOxNE= +github.com/aws/aws-sdk-go-v2 v1.32.4/go.mod h1:2SK5n0a2karNTv5tbP1SjsX0uhttou00v/HpXKM1ZUo= +github.com/aws/aws-sdk-go-v2/config v1.27.36 h1:4IlvHh6Olc7+61O1ktesh0jOcqmq/4WG6C2Aj5SKXy0= +github.com/aws/aws-sdk-go-v2/config v1.27.36/go.mod h1:IiBpC0HPAGq9Le0Xxb1wpAKzEfAQ3XlYgJLYKEVYcfw= +github.com/aws/aws-sdk-go-v2/credentials v1.17.34 h1:gmkk1l/cDGSowPRzkdxYi8edw+gN4HmVK151D/pqGNc= +github.com/aws/aws-sdk-go-v2/credentials v1.17.34/go.mod h1:4R9OEV3tgFMsok4ZeFpExn7zQaZRa9MRGFYnI/xC/vs= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.14 h1:C/d03NAmh8C4BZXhuRNboF/DqhBkBCeDiJDcaqIT5pA= +github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.14/go.mod h1:7I0Ju7p9mCIdlrfS+JCgqcYD0VXz/N4yozsox+0o078= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.23 h1:A2w6m6Tmr+BNXjDsr7M90zkWjsu4JXHwrzPg235STs4= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.23/go.mod h1:35EVp9wyeANdujZruvHiQUAo9E3vbhnIO1mTCAxMlY0= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.23 h1:pgYW9FCabt2M25MoHYCfMrVY2ghiiBKYWUVXfwZs+sU= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.23/go.mod h1:c48kLgzO19wAu3CPkDWC28JbaJ+hfQlsdl7I2+oqIbk= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1 h1:VaRN3TlFdd6KxX1x3ILT5ynH6HvKgqdiXoTxAF4HQcQ= +github.com/aws/aws-sdk-go-v2/internal/ini v1.8.1/go.mod h1:FbtygfRFze9usAadmnGJNc8KsP346kEe+y2/oyhGAGc= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.5 h1:QFASJGfT8wMXtuP3D5CRmMjARHv9ZmzFUMJznHDOY3w= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.5/go.mod h1:QdZ3OmoIjSX+8D1OPAzPxDfjXASbBMDsz9qvtyIhtik= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.20 h1:Xbwbmk44URTiHNx6PNo0ujDE6ERlsCKJD3u1zfnzAPg= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.20/go.mod h1:oAfOFzUB14ltPZj1rWwRc3d/6OgD76R8KlvU3EqM9Fg= +github.com/aws/aws-sdk-go-v2/service/servicediscovery v1.33.5 h1:Adq1dR6H8pI7pECxgc0S44HMjJcvKoUea0fUyHEFUZA= +github.com/aws/aws-sdk-go-v2/service/servicediscovery v1.33.5/go.mod h1:GZ6a9kkJfcGPRCMGU003Gb/VsB3qAH2xeIUt/6DLYF4= +github.com/aws/aws-sdk-go-v2/service/sso v1.23.0 h1:fHySkG0IGj2nepgGJPmmhZYL9ndnsq1Tvc6MeuVQCaQ= +github.com/aws/aws-sdk-go-v2/service/sso v1.23.0/go.mod h1:XRlMvmad0ZNL+75C5FYdMvbbLkd6qiqz6foR1nA1PXY= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.27.0 h1:cU/OeQPNReyMj1JEBgjE29aclYZYtXcsPMXbTkVGMFk= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.27.0/go.mod h1:FnvDM4sfa+isJ3kDXIzAB9GAwVSzFzSy97uZ3IsHo4E= +github.com/aws/aws-sdk-go-v2/service/sts v1.31.0 h1:GNVxIHBTi2EgwCxpNiozhNasMOK+ROUA2Z3X+cSBX58= +github.com/aws/aws-sdk-go-v2/service/sts v1.31.0/go.mod h1:yMWe0F+XG0DkRZK5ODZhG7BEFYhLXi2dqGsv6tX0cgI= +github.com/aws/smithy-go v1.22.0 h1:uunKnWlcoL3zO7q+gG2Pk53joueEOsnNB28QdMsmiMM= +github.com/aws/smithy-go v1.22.0/go.mod h1:irrKGvNn1InZwb2d7fkIRNucdfwR8R+Ts3wxYa/cJHg= github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3 h1:6df1vn4bBlDDo4tARvBm7l6KA9iVMnE3NWizDeWSrps= github.com/bboreham/go-loser v0.0.0-20230920113527-fcc2c21820a3/go.mod h1:CIWtjkly68+yqLPbvwwR/fjNJA/idrtULjZWh2v1ys0= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= From 77cbc71b177a422c14927b572dde6aefc22434c5 Mon Sep 17 00:00:00 2001 From: Valentin Petrov Date: Tue, 19 Nov 2024 16:56:08 +0200 Subject: [PATCH 2/5] Add runtime - kubernetes, to avoid mandatory loading of k8s config --- cmd/otel-allocator/config/config.go | 38 +++++++++++++++++++++-------- cmd/otel-allocator/config/flags.go | 36 +++++++++++++++++---------- 2 files changed, 51 insertions(+), 23 deletions(-) diff --git a/cmd/otel-allocator/config/config.go b/cmd/otel-allocator/config/config.go index 755648b6c6..8104dbae98 100644 --- a/cmd/otel-allocator/config/config.go +++ b/cmd/otel-allocator/config/config.go @@ -58,6 +58,7 @@ type Config struct { PrometheusCR PrometheusCRConfig `yaml:"prometheus_cr,omitempty"` HTTPS HTTPSServerConfig `yaml:"https,omitempty"` CollectorWatcher CollectorWatcherConfig `yaml:"collector_watcher,omitempty"` + Runtime RuntimeConfig `yaml:"runtime,omitempty"` } type PrometheusCRConfig struct { @@ -87,6 +88,14 @@ type AwsCloudMapConfig struct { ServiceName string `yaml:"service_name,omitempty"` } +type RuntimeConfig struct { + Kubernetes KubernetesRuntimeConfig `yaml:"kubernetes,omitempty"` +} + +type KubernetesRuntimeConfig struct { + Enabled bool `yaml:"enabled,omitempty"` +} + func LoadFromFile(file string, target *Config) error { return unmarshal(target, file) } @@ -102,19 +111,28 @@ func LoadFromCLI(target *Config, flagSet *pflag.FlagSet) error { if err != nil { return err } - clusterConfig, err := clientcmd.BuildConfigFromFlags("", target.KubeConfigFilePath) - if err != nil { - pathError := &fs.PathError{} - if ok := errors.As(err, &pathError); !ok { - return err - } - clusterConfig, err = rest.InClusterConfig() + + if runtimeKubernetesEnabled, changed, flagErr := getRuntimeKubernetesEnabled(flagSet); flagErr != nil { + return flagErr + } else if changed { + target.Runtime.Kubernetes.Enabled = runtimeKubernetesEnabled + } + + if target.Runtime.Kubernetes.Enabled { + clusterConfig, err := clientcmd.BuildConfigFromFlags("", target.KubeConfigFilePath) if err != nil { - return err + pathError := &fs.PathError{} + if ok := errors.As(err, &pathError); !ok { + return err + } + clusterConfig, err = rest.InClusterConfig() + if err != nil { + return err + } + target.KubeConfigFilePath = "" } - target.KubeConfigFilePath = "" + target.ClusterConfig = clusterConfig } - target.ClusterConfig = clusterConfig target.ListenAddr, err = getListenAddr(flagSet) if err != nil { diff --git a/cmd/otel-allocator/config/flags.go b/cmd/otel-allocator/config/flags.go index 7e0d23e150..d780fe3d28 100644 --- a/cmd/otel-allocator/config/flags.go +++ b/cmd/otel-allocator/config/flags.go @@ -25,19 +25,20 @@ import ( // Flag names. const ( - targetAllocatorName = "target-allocator" - configFilePathFlagName = "config-file" - listenAddrFlagName = "listen-addr" - prometheusCREnabledFlagName = "enable-prometheus-cr-watcher" - kubeConfigPathFlagName = "kubeconfig-path" - httpsEnabledFlagName = "enable-https-server" - listenAddrHttpsFlagName = "listen-addr-https" - httpsCAFilePathFlagName = "https-ca-file" - httpsTLSCertFilePathFlagName = "https-tls-cert-file" - httpsTLSKeyFilePathFlagName = "https-tls-key-file" - collectorWatcherTypeFlagName = "collector-watcher-type" - awsCloudMapNamespaceFlagName = "aws-cloud-map-namespace" - awsCloudMapServiceNameFlagName = "aws-cloud-map-service-name" + targetAllocatorName = "target-allocator" + configFilePathFlagName = "config-file" + listenAddrFlagName = "listen-addr" + prometheusCREnabledFlagName = "enable-prometheus-cr-watcher" + kubeConfigPathFlagName = "kubeconfig-path" + httpsEnabledFlagName = "enable-https-server" + listenAddrHttpsFlagName = "listen-addr-https" + httpsCAFilePathFlagName = "https-ca-file" + httpsTLSCertFilePathFlagName = "https-tls-cert-file" + httpsTLSKeyFilePathFlagName = "https-tls-key-file" + collectorWatcherTypeFlagName = "collector-watcher-type" + awsCloudMapNamespaceFlagName = "aws-cloud-map-namespace" + awsCloudMapServiceNameFlagName = "aws-cloud-map-service-name" + runtimeKubernetesEnabledFlagName = "runtime-kubernetes-enabled" ) // We can't bind this flag to our FlagSet, so we need to handle it separately. @@ -155,3 +156,12 @@ func getAWSCloudMapServiceName(flagSet *pflag.FlagSet) (value string, changed bo value, err = flagSet.GetString(awsCloudMapServiceNameFlagName) return } + +func getRuntimeKubernetesEnabled(flagSet *pflag.FlagSet) (value bool, changed bool, err error) { + if changed = flagSet.Changed(runtimeKubernetesEnabledFlagName); !changed { + value, err = true, nil + return + } + value, err = flagSet.GetBool(runtimeKubernetesEnabledFlagName) + return +} From ee79427cbd27f8130baf9f060d5b66d4bae60bb2 Mon Sep 17 00:00:00 2001 From: Valentin Petrov Date: Mon, 25 Nov 2024 16:49:34 +0200 Subject: [PATCH 3/5] Add configurable minUpdateInterval to reduce costs on AWS Cloud Map and fix collector routine blocking on AWSCloudMap Watch function --- cmd/otel-allocator/collector/aws_cloud_map.go | 104 +++++++++++------- cmd/otel-allocator/config/config.go | 9 ++ cmd/otel-allocator/config/flags.go | 13 +++ cmd/otel-allocator/main.go | 1 + cmd/otel-allocator/server/server.go | 11 +- 5 files changed, 96 insertions(+), 42 deletions(-) diff --git a/cmd/otel-allocator/collector/aws_cloud_map.go b/cmd/otel-allocator/collector/aws_cloud_map.go index 8872ff6341..333cba9b33 100644 --- a/cmd/otel-allocator/collector/aws_cloud_map.go +++ b/cmd/otel-allocator/collector/aws_cloud_map.go @@ -69,7 +69,7 @@ var ( func NewAwsCloudMapWatcher(opts ...WatcherOption) (*AwsCloudMapWatcher, error) { cfg, err := config.LoadDefaultConfig(context.TODO(), config.WithDefaultRegion("")) if err != nil { - log.Fatalf("unable to load SDK config, %v", err) + log.Fatalf("Unable to load SDK config, %v", err) return nil, err } @@ -114,39 +114,39 @@ func (w *AwsCloudMapWatcher) Watch(options ...WatchOption) error { return fmt.Errorf("AWS Cloud Map service client not initialized") } - startTime := time.Now() + // Create a separate done channel for blocking + done := make(chan struct{}) - discoverOutput, err := w.svc.DiscoverInstances(context.TODO(), &servicediscovery.DiscoverInstancesInput{ - NamespaceName: w.namespaceName, - ServiceName: w.serviceName, - MaxResults: aws.Int32(100), // Limit results for better performance - }) - if err != nil { - return fmt.Errorf("failed to discover instances: %w", err) + // Initial discovery + w.watcher.log.Info("Performing initial discovery") + if err := w.discoverAndProcess(config.fn); err != nil { + return err } - // Track discovery metrics - discoveryDuration.Observe(time.Since(startTime).Seconds()) - if err != nil { - discoveryErrors.Inc() - return fmt.Errorf("failed to discover instances: %w", err) - } + w.watcher.log.Info("Starting periodic discovery", "interval", w.watcher.minUpdateInterval) + // Start the periodic discovery in a goroutine + go func() { + ticker := time.NewTicker(w.watcher.minUpdateInterval) + defer ticker.Stop() + + for { + select { + case <-w.watcher.close: + w.watcher.log.Info("Stopping periodic discovery") + close(done) // Signal the main thread to unblock + return + case <-ticker.C: + if err := w.discoverAndProcess(config.fn); err != nil { + w.watcher.log.Error(err, "Error discovering instances") + continue + } - discoveredInstances, healthStats := w.processBatch(discoverOutput.Instances) - - // Update metrics - w.updateMetrics(healthStats.healthy, healthStats.unhealthy) - - w.watcher.log.Info("discovered instances", - "total", len(discoverOutput.Instances), - "healthy", healthStats.healthy, - "unhealthy", healthStats.unhealthy, - "namespace", w.namespaceName, - "service", w.serviceName, - ) - - go w.rateLimitedCollectorHandler(discoveredInstances, config.fn) + } + } + }() + // Block the main thread until done signal + <-done return nil } @@ -180,18 +180,44 @@ func (w *AwsCloudMapWatcher) updateMetrics(healthy, unhealthy int) { totalInstances.Set(float64(healthy + unhealthy)) } -func (w *AwsCloudMapWatcher) rateLimitedCollectorHandler(store []types.HttpInstanceSummary, fn func(collectors map[string]*allocation.Collector)) { - ticker := time.NewTicker(w.watcher.minUpdateInterval) - defer ticker.Stop() +func (w *AwsCloudMapWatcher) discoverAndProcess(handlerFn func(map[string]*allocation.Collector)) error { + startTime := time.Now() - for { - select { - case <-w.watcher.close: - return - case <-ticker.C: - w.runOnCollectors(store, fn) - } + discoverOutput, err := w.svc.DiscoverInstances(context.TODO(), &servicediscovery.DiscoverInstancesInput{ + NamespaceName: w.namespaceName, + ServiceName: w.serviceName, + MaxResults: aws.Int32(100), + }) + if err != nil { + discoveryErrors.Inc() + return fmt.Errorf("Failed to discover instances: %w", err) } + + discoveryDuration.Observe(time.Since(startTime).Seconds()) + + discoveredInstances, healthStats := w.processBatch(discoverOutput.Instances) + + w.updateMetrics(healthStats.healthy, healthStats.unhealthy) + + w.watcher.log.Info("Discovered instances", + "total", len(discoverOutput.Instances), + "healthy", healthStats.healthy, + "unhealthy", healthStats.unhealthy, + "namespace", w.namespaceName, + "service", w.serviceName, + ) + instanceIds := make([]string, len(discoveredInstances)) + for i, instance := range discoveredInstances { + instanceIds[i] = *instance.InstanceId + } + + w.watcher.log.Info("Running on collectors", "instanceIds", instanceIds, "namespace", w.namespaceName, "service", w.serviceName) + + if handlerFn != nil { + w.runOnCollectors(discoveredInstances, handlerFn) + } + + return nil } // runOnCollectors runs the provided function on the set of collectors from the Store. diff --git a/cmd/otel-allocator/config/config.go b/cmd/otel-allocator/config/config.go index 8104dbae98..5a35d4d1fc 100644 --- a/cmd/otel-allocator/config/config.go +++ b/cmd/otel-allocator/config/config.go @@ -44,6 +44,7 @@ const ( DefaultAllocationStrategy = "consistent-hashing" DefaultFilterStrategy = "relabel-config" DefaultCollectorWatcherType = "k8s" + DefaultMinUpdateInterval = 5 * time.Second ) type Config struct { @@ -59,6 +60,7 @@ type Config struct { HTTPS HTTPSServerConfig `yaml:"https,omitempty"` CollectorWatcher CollectorWatcherConfig `yaml:"collector_watcher,omitempty"` Runtime RuntimeConfig `yaml:"runtime,omitempty"` + MinUpdateInterval time.Duration `yaml:"min_update_interval,omitempty"` } type PrometheusCRConfig struct { @@ -193,6 +195,12 @@ func LoadFromCLI(target *Config, flagSet *pflag.FlagSet) error { target.CollectorWatcher.AwsCloudMap.ServiceName = collectorWatcherServiceName } + if minUpdateInterval, changed, err := getMinUpdateInterval(flagSet); err != nil { + return err + } else if changed { + target.MinUpdateInterval = minUpdateInterval + } + return nil } @@ -214,6 +222,7 @@ func CreateDefaultConfig() Config { PrometheusCR: PrometheusCRConfig{ ScrapeInterval: DefaultCRScrapeInterval, }, + MinUpdateInterval: DefaultMinUpdateInterval, } } diff --git a/cmd/otel-allocator/config/flags.go b/cmd/otel-allocator/config/flags.go index d780fe3d28..a096f700ea 100644 --- a/cmd/otel-allocator/config/flags.go +++ b/cmd/otel-allocator/config/flags.go @@ -17,6 +17,7 @@ package config import ( "flag" "path/filepath" + "time" "github.com/spf13/pflag" "k8s.io/client-go/util/homedir" @@ -39,6 +40,7 @@ const ( awsCloudMapNamespaceFlagName = "aws-cloud-map-namespace" awsCloudMapServiceNameFlagName = "aws-cloud-map-service-name" runtimeKubernetesEnabledFlagName = "runtime-kubernetes-enabled" + minUpdateIntervalFlagName = "min-update-interval" ) // We can't bind this flag to our FlagSet, so we need to handle it separately. @@ -58,6 +60,8 @@ func getFlagSet(errorHandling pflag.ErrorHandling) *pflag.FlagSet { flagSet.String(collectorWatcherTypeFlagName, "k8s", "The type of collector watcher to use. (one of 'k8s', 'aws-cloud-map')") flagSet.String(awsCloudMapNamespaceFlagName, "default", "The namespace of the AWS Cloud Map service.") flagSet.String(awsCloudMapServiceNameFlagName, "otel-collector", "The name of the AWS Cloud Map service.") + flagSet.Bool(runtimeKubernetesEnabledFlagName, true, "Enable Kubernetes runtime.") + flagSet.Duration(minUpdateIntervalFlagName, DefaultMinUpdateInterval, "The minimum update interval.") zapFlagSet := flag.NewFlagSet("", flag.ErrorHandling(errorHandling)) zapCmdLineOpts.BindFlags(zapFlagSet) flagSet.AddGoFlagSet(zapFlagSet) @@ -165,3 +169,12 @@ func getRuntimeKubernetesEnabled(flagSet *pflag.FlagSet) (value bool, changed bo value, err = flagSet.GetBool(runtimeKubernetesEnabledFlagName) return } + +func getMinUpdateInterval(flagSet *pflag.FlagSet) (value time.Duration, changed bool, err error) { + if changed = flagSet.Changed(minUpdateIntervalFlagName); !changed { + value, err = 5*time.Second, nil + return + } + value, err = flagSet.GetDuration(minUpdateIntervalFlagName) + return +} diff --git a/cmd/otel-allocator/main.go b/cmd/otel-allocator/main.go index 6a6e92d5d5..92e6207908 100644 --- a/cmd/otel-allocator/main.go +++ b/cmd/otel-allocator/main.go @@ -119,6 +119,7 @@ func main() { &cfg.CollectorWatcher.AwsCloudMap.Namespace, &cfg.CollectorWatcher.AwsCloudMap.ServiceName, ), + collector.WithMinUpdateInterval(cfg.MinUpdateInterval), collector.WithKubeConfig(cfg.ClusterConfig), ) if collectorWatcherErr != nil { diff --git a/cmd/otel-allocator/server/server.go b/cmd/otel-allocator/server/server.go index 33e845103f..5f86e5a5f8 100644 --- a/cmd/otel-allocator/server/server.go +++ b/cmd/otel-allocator/server/server.go @@ -102,6 +102,11 @@ func (s *Server) setRouter(router *gin.Engine) { router.GET("/livez", s.LivenessProbeHandler) router.GET("/readyz", s.ReadinessProbeHandler) registerPprof(router.Group("/debug/pprof/")) + var routesPath []string + for _, route := range router.Routes() { + routesPath = append(routesPath, route.Path) + } + s.logger.Info("Register routes", "routes", routesPath) } func NewServer(log logr.Logger, allocator allocation.Allocator, listenAddr string, options ...Option) *Server { @@ -125,17 +130,17 @@ func NewServer(log logr.Logger, allocator allocation.Allocator, listenAddr strin } func (s *Server) Start() error { - s.logger.Info("Starting server...") + s.logger.Info("Starting HTTP server...", "listenAddr", s.server.Addr) return s.server.ListenAndServe() } func (s *Server) Shutdown(ctx context.Context) error { - s.logger.Info("Shutting down server...") + s.logger.Info("Shutting down HTTP server...") return s.server.Shutdown(ctx) } func (s *Server) StartHTTPS() error { - s.logger.Info("Starting HTTPS server...") + s.logger.Info("Starting HTTPS server...", "listenAddr", s.server.Addr) return s.httpsServer.ListenAndServeTLS("", "") } From 257e7723b5b7e3eb6897ba7c951ce509badbe4c7 Mon Sep 17 00:00:00 2001 From: Valentin Petrov Date: Tue, 10 Dec 2024 13:41:36 +0200 Subject: [PATCH 4/5] Add prometheus logger to be able to debug discovery issues --- cmd/otel-allocator/main.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/otel-allocator/main.go b/cmd/otel-allocator/main.go index 92e6207908..793c5e4f62 100644 --- a/cmd/otel-allocator/main.go +++ b/cmd/otel-allocator/main.go @@ -97,14 +97,14 @@ func main() { httpOptions = append(httpOptions, server.WithTLSConfig(tlsConfig, cfg.HTTPS.ListenAddr)) } srv := server.NewServer(log, allocator, cfg.ListenAddr, httpOptions...) - + promLogger := gokitlog.With(gokitlog.NewJSONLogger(gokitlog.NewSyncWriter(os.Stdout)), "logger", "prometheus-discovery") discoveryCtx, discoveryCancel := context.WithCancel(ctx) sdMetrics, err := discovery.CreateAndRegisterSDMetrics(prometheus.DefaultRegisterer) if err != nil { setupLog.Error(err, "Unable to register metrics for Prometheus service discovery") os.Exit(1) } - discoveryManager = discovery.NewManager(discoveryCtx, gokitlog.NewNopLogger(), prometheus.DefaultRegisterer, sdMetrics) + discoveryManager = discovery.NewManager(discoveryCtx, promLogger, prometheus.DefaultRegisterer, sdMetrics) targetDiscoverer = target.NewDiscoverer(log, discoveryManager, allocatorPrehook, srv) collectorWatcherType, err := collector.ParseCollectorWatcherType(cfg.CollectorWatcher.WatcherType) From 2e5b719bf7ce2789faa2d5c7fae2203b8ee61d82 Mon Sep 17 00:00:00 2001 From: Valentin Petrov Date: Thu, 13 Feb 2025 11:36:38 +0200 Subject: [PATCH 5/5] Add Spryker READme on how to build --- README-spryker.md | 30 ++++++++++++++++++++++++++++++ 1 file changed, 30 insertions(+) create mode 100644 README-spryker.md diff --git a/README-spryker.md b/README-spryker.md new file mode 100644 index 0000000000..131544ef2b --- /dev/null +++ b/README-spryker.md @@ -0,0 +1,30 @@ +# README-spryker + +This README points to the process of building Target Allocator image for the Highly Available setup of OTEL Collector in ECS. + +Current limitation is High Availability and Persistence cannot be enabled at once. For this to happen, this needs to be implemented as a further feature. + +## Prerequisites + +* aws-cli - v2.x +* make - any modern version or system make will work +* docker-cli - any will work +* golang - go 1.22.0 (based on the go.mod file) + +## Build + +``` +cd + +export TARGETALLOCATOR_VERSION= +export AWS_ACCESS_KEY_ID="" +export AWS_SECRET_ACCESS_KEY="" +export AWS_SESSION_TOKEN="" + +make container-target-allocator TARGETALLOCATOR_IMG=target-allocator:${TARGETALLOCATOR_VERSION} && \ +aws ecr-public get-login-password --region us-east-1 | docker login --username AWS --password-stdin public.ecr.aws/g5b2g3a8 && \ docker tag target-allocator:${TARGETALLOCATOR_VERSION} public.ecr.aws/g5b2g3a8/target-allocator:${TARGETALLOCATOR_VERSION} && \ docker push public.ecr.aws/g5b2g3a8/target-allocator:${TARGETALLOCATOR_VERSION} +``` + +## Rollout + +Update the new version to the Target Allocator service under the [o11y_module](https://github.com/spryker-projects/tf-module-o11y/blob/b52ec0d3f864d0dd70e949290c3b8d43c603ad50/modules/o11y_target_allocator/locals.tf#L150).