7
7
8
8
mapset "github.com/deckarep/golang-set"
9
9
"github.com/pkg/errors"
10
- appsv1 "k8s.io/api/apps/v1"
11
10
corev1 "k8s.io/api/core/v1"
12
11
"k8s.io/apimachinery/pkg/labels"
13
12
"k8s.io/client-go/informers"
@@ -25,13 +24,13 @@ func NewProvider(kubeClient kubernetes.Interface, kubeController k8s.Controller,
25
24
informerFactory := informers .NewSharedInformerFactory (kubeClient , k8s .DefaultKubeEventResyncInterval )
26
25
27
26
informerCollection := InformerCollection {
28
- Endpoints : informerFactory .Core ().V1 ().Endpoints ().Informer (),
29
- Deployments : informerFactory .Apps ().V1 ().Deployments ().Informer (),
27
+ Endpoints : informerFactory .Core ().V1 ().Endpoints ().Informer (),
28
+ Pods : informerFactory .Core ().V1 ().Pods ().Informer (),
30
29
}
31
30
32
31
cacheCollection := CacheCollection {
33
- Endpoints : informerCollection .Endpoints .GetStore (),
34
- Deployments : informerCollection .Deployments .GetStore (),
32
+ Endpoints : informerCollection .Endpoints .GetStore (),
33
+ Pods : informerCollection .Pods .GetStore (),
35
34
}
36
35
37
36
client := Client {
@@ -49,7 +48,7 @@ func NewProvider(kubeClient kubernetes.Interface, kubeController k8s.Controller,
49
48
return kubeController .IsMonitoredNamespace (ns )
50
49
}
51
50
informerCollection .Endpoints .AddEventHandler (k8s .GetKubernetesEventHandlers ("Endpoints" , "Kubernetes" , client .announcements , shouldObserve ))
52
- informerCollection .Deployments .AddEventHandler (k8s .GetKubernetesEventHandlers ("Deployments " , "Kubernetes" , client .announcements , shouldObserve ))
51
+ informerCollection .Pods .AddEventHandler (k8s .GetKubernetesEventHandlers ("Pods " , "Kubernetes" , client .announcements , shouldObserve ))
53
52
54
53
if err := client .run (stop ); err != nil {
55
54
return nil , errors .Errorf ("Failed to start Kubernetes EndpointProvider client: %+v" , err )
@@ -109,43 +108,36 @@ func (c Client) ListEndpointsForService(svc service.MeshService) []endpoint.Endp
109
108
func (c Client ) GetServicesForServiceAccount (svcAccount service.K8sServiceAccount ) ([]service.MeshService , error ) {
110
109
log .Info ().Msgf ("[%s] Getting Services for service account %s on Kubernetes" , c .providerIdent , svcAccount )
111
110
services := mapset .NewSet ()
112
- deploymentsInterface := c .caches .Deployments .List ()
113
-
114
- for _ , deployments := range deploymentsInterface {
115
- kubernetesDeployments := deployments .(* appsv1.Deployment )
116
- if kubernetesDeployments != nil {
117
- if ! c .kubeController .IsMonitoredNamespace (kubernetesDeployments .Namespace ) {
118
- // Doesn't belong to namespaces we are observing
119
- continue
120
- }
121
- spec := kubernetesDeployments .Spec
122
- namespacedSvcAccount := service.K8sServiceAccount {
123
- Namespace : kubernetesDeployments .Namespace ,
124
- Name : spec .Template .Spec .ServiceAccountName ,
125
- }
126
- if svcAccount == namespacedSvcAccount {
127
- var selectorLabel map [string ]string
128
- if spec .Selector != nil {
129
- selectorLabel = spec .Selector .MatchLabels
130
- } else {
131
- selectorLabel = spec .Template .Labels
132
- }
111
+ podsInterface := c .caches .Pods .List ()
133
112
134
- appNamspace := kubernetesDeployments .Namespace
135
- k8sServices , err := c .getServicesByLabels (selectorLabel , appNamspace )
136
- if err != nil {
137
- log .Error ().Err (err ).Msgf ("Error retrieving service with label %v in namespace %s" , selectorLabel , appNamspace )
113
+ for _ , pods := range podsInterface {
114
+ kubernetesPods := pods .(* corev1.Pod )
115
+ if kubernetesPods == nil || ! c .kubeController .IsMonitoredNamespace (kubernetesPods .Namespace ) {
116
+ // Doesn't belong to namespaces we are observing
117
+ continue
118
+ }
119
+ spec := kubernetesPods .Spec
120
+ namespacedSvcAccount := service.K8sServiceAccount {
121
+ Namespace : kubernetesPods .Namespace ,
122
+ Name : spec .ServiceAccountName ,
123
+ }
124
+ if svcAccount != namespacedSvcAccount {
125
+ continue
126
+ }
127
+ podLabels := kubernetesPods .ObjectMeta .Labels
138
128
139
- return nil , errDidNotFindServiceForServiceAccount
140
- }
141
- for _ , svc := range k8sServices {
142
- meshService := service.MeshService {
143
- Namespace : appNamspace ,
144
- Name : svc .Name ,
145
- }
146
- services .Add (meshService )
147
- }
129
+ appNamspace := kubernetesPods .Namespace
130
+ k8sServices , err := c .getServicesByLabels (podLabels , appNamspace )
131
+ if err != nil {
132
+ log .Error ().Err (err ).Msgf ("Error retrieving service matching labels %v in namespace %s" , podLabels , appNamspace )
133
+ return nil , errDidNotFindServiceForServiceAccount
134
+ }
135
+ for _ , svc := range k8sServices {
136
+ meshService := service.MeshService {
137
+ Namespace : appNamspace ,
138
+ Name : svc .Name ,
148
139
}
140
+ services .Add (meshService )
149
141
}
150
142
}
151
143
@@ -178,8 +170,8 @@ func (c *Client) run(stop <-chan struct{}) error {
178
170
}
179
171
180
172
sharedInformers := map [string ]cache.SharedInformer {
181
- "Endpoints" : c .informers .Endpoints ,
182
- "Deployments " : c .informers .Deployments ,
173
+ "Endpoints" : c .informers .Endpoints ,
174
+ "Pods " : c .informers .Pods ,
183
175
}
184
176
185
177
var names []string
@@ -207,7 +199,7 @@ func (c *Client) run(stop <-chan struct{}) error {
207
199
}
208
200
209
201
// getServicesByLabels gets Kubernetes services whose selectors match the given labels
210
- func (c * Client ) getServicesByLabels (matchLabels map [string ]string , namespace string ) ([]corev1.Service , error ) {
202
+ func (c * Client ) getServicesByLabels (podLabels map [string ]string , namespace string ) ([]corev1.Service , error ) {
211
203
var finalList []corev1.Service
212
204
serviceList := c .kubeController .ListServices ()
213
205
@@ -220,7 +212,7 @@ func (c *Client) getServicesByLabels(matchLabels map[string]string, namespace st
220
212
221
213
svcRawSelector := svc .Spec .Selector
222
214
selector := labels .Set (svcRawSelector ).AsSelector ()
223
- if selector .Matches (labels .Set (matchLabels )) {
215
+ if selector .Matches (labels .Set (podLabels )) {
224
216
finalList = append (finalList , * svc )
225
217
}
226
218
}
0 commit comments