-
Notifications
You must be signed in to change notification settings - Fork 37
Expand file tree
/
Copy pathtranslate_ingresses.go
More file actions
380 lines (337 loc) · 14.2 KB
/
translate_ingresses.go
File metadata and controls
380 lines (337 loc) · 14.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
package managerdriver
import (
"fmt"
"reflect"
"strings"
"github.com/ngrok/ngrok-operator/internal/annotations"
"github.com/ngrok/ngrok-operator/internal/errors"
"github.com/ngrok/ngrok-operator/internal/ir"
"github.com/ngrok/ngrok-operator/internal/store"
"github.com/ngrok/ngrok-operator/internal/trafficpolicy"
netv1 "k8s.io/api/networking/v1"
"k8s.io/utils/ptr"
"sigs.k8s.io/controller-runtime/pkg/client"
)
// #region Ingresses to IR
// ingressesToIR fetches all stored ingresses and translates them into IR for further processing and translation
func (t *translator) ingressesToIR() []*ir.IRVirtualHost {
hostCache := make(map[ir.IRHostname]*ir.IRVirtualHost) // Each unique hostname corresponds to one IRVirtualHost
upstreamCache := make(map[ir.IRServiceKey]*ir.IRUpstream) // Each unique service/port combo corresponds to one IRUpstream
ingresses := t.store.ListNgrokIngressesV1()
for _, ingress := range ingresses {
// We currently require this annotation to be present for an Ingress to be translated into CloudEndpoints/AgentEndpoints, otherwise the default behaviour is to
// translate it into HTTPSEdges (legacy). A future version will remove support for HTTPSEdges and translation into CloudEndpoints/AgentEndpoints will become the new
// default behaviour.
mappingStrategy, err := MappingStrategyAnnotationToIR(ingress)
if err != nil {
t.log.Error(err, fmt.Sprintf("failed to check %q annotation. defaulting to using endpoints", annotations.MappingStrategyAnnotation))
}
useEndpointPooling, err := annotations.ExtractUseEndpointPooling(ingress)
if err != nil {
t.log.Error(err, fmt.Sprintf("failed to check %q annotation", annotations.MappingStrategyAnnotation))
}
if useEndpointPooling != nil && *useEndpointPooling {
t.log.Info(fmt.Sprintf("the following ingress will create endpoint(s) with pooling enabled because of the %q annotation",
annotations.MappingStrategyAnnotation),
"ingress", fmt.Sprintf("%s.%s", ingress.Name, ingress.Namespace),
)
}
annotationTrafficPolicy, tpObjRef, err := trafficPolicyFromAnnotation(t.store, ingress)
if err != nil {
t.log.Error(err, "error getting ngrok traffic policy for ingress",
"ingress", fmt.Sprintf("%s.%s", ingress.Name, ingress.Namespace))
continue
}
var defaultDestination *ir.IRDestination
if ingress.Spec.DefaultBackend != nil {
defaultDestination, err = t.ingressBackendToIR(ingress, ingress.Spec.DefaultBackend, upstreamCache)
if err != nil {
t.log.Error(err, "unable to resolve ingress default backend",
"ingress", fmt.Sprintf("%s.%s", ingress.Name, ingress.Namespace),
)
continue
}
}
bindings, err := annotations.ExtractUseBindings(ingress)
if err != nil {
t.log.Error(err, "failed to check bindings annotation for ingress",
"ingress", fmt.Sprintf("%s.%s", ingress.Name, ingress.Namespace),
)
continue
}
t.ingressToIR(
ingress,
defaultDestination,
hostCache,
upstreamCache,
useEndpointPooling,
annotationTrafficPolicy,
tpObjRef,
bindings,
mappingStrategy,
)
}
vHostSlice := []*ir.IRVirtualHost{}
for _, irVHost := range hostCache {
vHostSlice = append(vHostSlice, irVHost)
}
return vHostSlice
}
// #region Single Ingress IR
// ingressToIR translates a single ingress into IR and stores entries in the cache. Caches are used so that we do not generate duplicate IR for hostnames/services
func (t *translator) ingressToIR(
ingress *netv1.Ingress,
defaultDestination *ir.IRDestination,
hostCache map[ir.IRHostname]*ir.IRVirtualHost,
upstreamCache map[ir.IRServiceKey]*ir.IRUpstream,
endpointPoolingEnabled *bool,
annotationTrafficPolicy *trafficpolicy.TrafficPolicy,
annotationTrafficPolicyRef *ir.OwningResource,
bindings []string,
mappingStrategy ir.IRMappingStrategy,
) {
for _, rule := range ingress.Spec.Rules {
ruleHostname := rule.Host
if ruleHostname == "" {
t.log.Error(errors.New("skipping converting ingress rule into cloud and agent endpoints because the rule.host is empty"),
"empty host in ingress spec rule",
"ingress", fmt.Sprintf("%s.%s", ingress.Name, ingress.Namespace),
)
continue
}
// Make a new IRVirtualHost for this hostname unless we have one in the cache
owningResource := ir.OwningResource{
Kind: "Ingress",
Name: ingress.Name,
Namespace: ingress.Namespace,
}
irVHost, exists := hostCache[ir.IRHostname(ruleHostname)]
if exists {
// If we already have a virtual host for this hostname, the traffic policy config must be the same as the one we are currently processing
if !reflect.DeepEqual(irVHost.TrafficPolicyObjRef, annotationTrafficPolicyRef) {
t.log.Error(errors.New("different traffic policy annotations provided for the same hostname"),
"when using the same hostname across multiple ingresses, ensure that they do not use different traffic policies provided via annotations",
"current ingress", fmt.Sprintf("%s.%s", ingress.Name, ingress.Namespace),
"hostname", ruleHostname,
)
continue
}
// They must have the same configuration for whether or not to pool endpoints
if !ptr.Equal(irVHost.EndpointPoolingEnabled, endpointPoolingEnabled) {
t.log.Error(errors.New("different endpoint pooling annotations provided for the same hostname"),
"when using the same hostname across multiple ingresses, ensure that they all enable or all disable endpoint pooling",
"current ingress", fmt.Sprintf("%s.%s", ingress.Name, ingress.Namespace),
"hostname", ruleHostname,
)
continue
}
// They must share the same namespace
if irVHost.Namespace != ingress.Namespace {
t.log.Error(fmt.Errorf("unable to convert ingress rule into cloud and agent endpoints. the domain (%q) is already being used by another ingress in a different namespace. you will need to either consolidate them, ensure they are in the same namespace, or use a different domain for one of them", ruleHostname),
"ingress to endpoint conversion error",
"ingress", fmt.Sprintf("%s.%s", ingress.Name, ingress.Namespace),
"namespace the hostname is already in-use in", irVHost.Namespace,
)
continue
}
// They must have the same default backend
if !reflect.DeepEqual(irVHost.DefaultDestination, defaultDestination) {
t.log.Error(errors.New("different ingress default backends provided for the same hostname"),
"when using the same hostname across multiple ingresses, ensure that they do not use different default backends. the existing default backend for the hostname will not be overwritten",
"current ingress", fmt.Sprintf("%s.%s", ingress.Name, ingress.Namespace),
"hostname", ruleHostname,
)
continue
}
// The current and existing configurations match, add the new owning ingress reference and keep going
irVHost.AddOwningResource(owningResource)
} else {
// Make a deep copy of the ingress traffic policy so that we don't taint it for subsequent rules
var ruleTrafficPolicy *trafficpolicy.TrafficPolicy
if annotationTrafficPolicy != nil {
var err error
ruleTrafficPolicy, err = annotationTrafficPolicy.DeepCopy()
if err != nil {
t.log.Error(err, "failed to copy traffic policy from ingress",
"ingress", fmt.Sprintf("%s.%s", ingress.Name, ingress.Namespace),
)
continue
}
}
irVHost = &ir.IRVirtualHost{
Namespace: ingress.Namespace,
OwningResources: []ir.OwningResource{owningResource},
Listener: ir.IRListener{
Hostname: ir.IRHostname(ruleHostname),
Port: 443,
Protocol: ir.IRProtocol_HTTPS,
},
TrafficPolicy: ruleTrafficPolicy,
TrafficPolicyObjRef: annotationTrafficPolicyRef,
LabelsToAdd: t.managedResourceLabels,
Routes: []*ir.IRRoute{},
DefaultDestination: defaultDestination,
EndpointPoolingEnabled: endpointPoolingEnabled,
Metadata: t.defaultIngressMetadata,
Bindings: bindings,
MappingStrategy: mappingStrategy,
}
hostCache[ir.IRHostname(ruleHostname)] = irVHost
}
if rule.HTTP == nil {
t.log.Info("skipping generating endpoints for ingress rule with empty http section")
continue
}
irRoutes := t.ingressPathsToIR(ingress, ruleHostname, rule.HTTP.Paths, upstreamCache)
irVHost.Routes = append(irVHost.Routes, irRoutes...)
hostCache[ir.IRHostname(ruleHostname)] = irVHost
}
}
// #region Ingress Paths IR
// ingressPathsToIR constructs IRRoutes for the path matches under a given ingress rule
func (t *translator) ingressPathsToIR(ingress *netv1.Ingress, ruleHostname string, ingressPaths []netv1.HTTPIngressPath, upstreamCache map[ir.IRServiceKey]*ir.IRUpstream) []*ir.IRRoute {
irRoutes := []*ir.IRRoute{}
for _, pathMatch := range ingressPaths {
destination, err := t.ingressBackendToIR(ingress, &pathMatch.Backend, upstreamCache)
if err != nil {
t.log.Error(err, "ingress rule could not be successfully processed. other ingress rules will continue to be evaluated",
"ingress", fmt.Sprintf("%s.%s", ingress.Name, ingress.Namespace),
"hostname", ruleHostname,
"path", pathMatch.Path,
)
continue
}
pathType := netv1PathTypeToIR(t.log, pathMatch.PathType)
irRoutes = append(irRoutes, &ir.IRRoute{
HTTPMatchCriteria: &ir.IRHTTPMatch{
Path: &pathMatch.Path,
PathType: &pathType,
},
Destinations: []*ir.IRDestination{destination},
})
}
return irRoutes
}
// #region Ingress Backend IR
// ingressBackendToIR constructs an IRDestination from an ingress backend. Currently only service and traffic policies are supported
func (t *translator) ingressBackendToIR(ingress *netv1.Ingress, backend *netv1.IngressBackend, upstreamCache map[ir.IRServiceKey]*ir.IRUpstream) (*ir.IRDestination, error) {
// First check if we are supplying a traffic policy as the backend
if resourceRef := backend.Resource; resourceRef != nil {
if strings.ToLower(resourceRef.Kind) != "ngroktrafficpolicy" {
return nil, fmt.Errorf("ingress backend resource reference to unsupported kind: %q. currently only NgrokTrafficPolicy is supported for resource backends", resourceRef.Kind)
}
if resourceRef.APIGroup != nil && *resourceRef.APIGroup != "ngrok.k8s.ngrok.com" {
return nil, fmt.Errorf("ingress backend resource to invalid group: %q. currently only NgrokTrafficPolicy is supported for resource backends with API Group \"ngrok.k8s.ngrok.com\"", *resourceRef.APIGroup)
}
routePolicyCfg, err := t.store.GetNgrokTrafficPolicyV1(resourceRef.Name, ingress.Namespace)
if err != nil {
return nil, fmt.Errorf("unable to resolve traffic policy backend for ingress rule: %w", err)
}
routeTrafficPolicy, err := trafficpolicy.NewTrafficPolicyFromJSON(routePolicyCfg.Spec.Policy)
if err != nil {
return nil, err
}
if len(routeTrafficPolicy.OnTCPConnect) != 0 {
return nil, errors.New("traffic policies supplied as ingress backends may not contain any on_tcp_connect rules as there is no way to only run them for certain routes")
}
return &ir.IRDestination{
TrafficPolicies: []*trafficpolicy.TrafficPolicy{routeTrafficPolicy},
}, nil
}
// If the backend is not a traffic policy, then it must be a service
if backend.Service == nil {
return nil, errors.New("ingress backend is invalid. Not an NgrokTrafficPolicy or service")
}
serviceName := backend.Service.Name
service, err := t.store.GetServiceV1(serviceName, ingress.Namespace)
if err != nil {
return nil, fmt.Errorf("failed to resolve backend service name: %q in namespace %q: %w",
serviceName,
ingress.Namespace,
err,
)
}
servicePort, err := findServicesPort(t.log, service, backend.Service.Port)
if err != nil || servicePort == nil {
return nil, fmt.Errorf("failed to resolve backend service's port. name: %q, namespace: %q: %w",
serviceName,
ingress.Namespace,
err,
)
}
portProto, err := getProtoForServicePort(t.log, service, servicePort.Name, ir.IRProtocol_HTTP)
if err != nil {
// When this function errors we still get a valid default, so no need to return
t.log.Error(err, "error getting protocol for ingress backend service port")
}
irScheme, err := protocolStringToIRScheme(portProto)
if err != nil {
t.log.Error(err, "error getting scheme from port protocol for ingress backend service port",
"service", fmt.Sprintf("%s.%s", service.Name, service.Namespace),
"port name", servicePort.Name,
"port number", servicePort.Port,
)
}
appProtocol := getPortAppProtocol(t.log, service, servicePort)
irService := ir.IRService{
UID: string(service.UID),
Name: serviceName,
Namespace: ingress.Namespace,
Port: servicePort.Port,
Scheme: irScheme,
Protocol: appProtocol,
}
owningResource := ir.OwningResource{
Kind: "Ingress",
Name: ingress.Name,
Namespace: ingress.Namespace,
}
upstream, exists := upstreamCache[irService.Key()]
if !exists {
upstream = &ir.IRUpstream{
Service: irService,
OwningResources: []ir.OwningResource{owningResource},
}
upstreamCache[irService.Key()] = upstream
} else {
upstream.AddOwningResource(owningResource)
}
return &ir.IRDestination{
Upstream: upstream,
}, nil
}
// #region Helpers
func trafficPolicyFromAnnotation(store store.Storer, obj client.Object) (tp *trafficpolicy.TrafficPolicy, objRef *ir.OwningResource, err error) {
tpName, err := annotations.ExtractNgrokTrafficPolicyFromAnnotations(obj)
if err != nil {
if errors.IsMissingAnnotations(err) {
return nil, nil, nil
}
return nil, nil, fmt.Errorf("error getting ngrok traffic policy for %s %q: %w",
obj.GetObjectKind().GroupVersionKind().Kind,
fmt.Sprintf("%s.%s", obj.GetName(), obj.GetNamespace()),
err,
)
}
tpObj, err := store.GetNgrokTrafficPolicyV1(tpName, obj.GetNamespace())
if err != nil {
return nil, nil, fmt.Errorf("unable to load traffic policy for %s %q from annotations: %w",
obj.GetObjectKind().GroupVersionKind().Kind,
fmt.Sprintf("%s.%s", obj.GetName(), obj.GetNamespace()),
err,
)
}
trafficPolicyCfg, err := trafficpolicy.NewTrafficPolicyFromJSON(tpObj.Spec.Policy)
if err != nil {
return nil, nil, fmt.Errorf("failed to parse traffic policy for %s %q: %w",
obj.GetObjectKind().GroupVersionKind().Kind,
fmt.Sprintf("%s.%s", obj.GetName(), obj.GetNamespace()),
err,
)
}
return trafficPolicyCfg, &ir.OwningResource{
Kind: "NgrokTrafficPolicy",
Name: tpObj.Name,
Namespace: tpObj.Namespace,
}, nil
}