Skip to content

Commit eebd3f5

Browse files
committed
Added tests
1 parent dc4b53b commit eebd3f5

File tree

2 files changed

+310
-423
lines changed

2 files changed

+310
-423
lines changed

projects/gloo/pkg/plugins/tunneling/plugin.go

+54-206
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
package tunneling
22

33
import (
4-
"fmt"
5-
64
envoy_config_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
75
envoy_config_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
86
envoy_config_endpoint_v3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
@@ -12,7 +10,6 @@ import (
1210
"github.com/envoyproxy/go-control-plane/pkg/wellknown"
1311
"github.com/golang/protobuf/ptypes/duration"
1412
"github.com/golang/protobuf/ptypes/wrappers"
15-
"github.com/solo-io/gloo/projects/gloo/constants"
1613
v1 "github.com/solo-io/gloo/projects/gloo/pkg/api/v1"
1714
"github.com/solo-io/gloo/projects/gloo/pkg/plugins"
1815
"github.com/solo-io/gloo/projects/gloo/pkg/translator"
@@ -27,8 +24,10 @@ var (
2724
)
2825

2926
const (
30-
ExtensionName = "tunneling"
31-
TunnelingAutogeneratedClusterPrefix = constants.SoloGeneratedClusterPrefix + "self_cluster_"
27+
ExtensionName = "tunneling"
28+
OriginalClusterSuffix = "_original"
29+
forwardingListenerPrefix = "solo_io_generated_self_listener_"
30+
forwardingListenerStatsPrefix = "soloioTcpStats"
3231
)
3332

3433
type plugin struct{}
@@ -44,7 +43,15 @@ func (p *plugin) Name() string {
4443
func (p *plugin) Init(_ plugins.InitParams) {
4544
}
4645

47-
// GeneratedResources generates the tunneling resources
46+
// GeneratedResources scans Upstreams for a tunneling configuration and sets up
47+
// clusters and listeners to forward traffic to an HTTP CONNECT supporting proxy.
48+
//
49+
// HTTP CONNECT tunneling is provided by an Envoy Listener filter. To send traffic to the Listener,
50+
// we must generate a new forwarding Cluster. The generated Cluster sends traffic over
51+
// a pipe to the Listener which forwards the traffic to the original Upstream.
52+
//
53+
// The SSL configuration for the original cluster is copied to the generated cluster and the
54+
// supplied proxy SSL configuration is set on the original cluster.
4855
func (p *plugin) GeneratedResources(params plugins.Params,
4956
inClusters []*envoy_config_cluster_v3.Cluster,
5057
inEndpoints []*envoy_config_endpoint_v3.ClusterLoadAssignment,
@@ -62,38 +69,15 @@ func (p *plugin) GeneratedResources(params plugins.Params,
6269
var newClusters []*envoy_config_cluster_v3.Cluster
6370
var newListeners []*envoy_config_listener_v3.Listener
6471

65-
upstreams := params.Snapshot.Upstreams
66-
67-
// keep track of clusters we've seen in case of multiple routes to same cluster
72+
// track the clusters we have transformed so we don't do it twice
6873
processedClusters := sets.Set[string]{}
6974

70-
// find all the route config that points to upstreams with tunneling
71-
// for _, rtConfig := range inRouteConfigurations {
72-
// for _, vh := range rtConfig.GetVirtualHosts() {
73-
// for _, rt := range vh.GetRoutes() {
74-
// rtAction := rt.GetRoute()
75-
// // we do not handle the weighted cluster or cluster header cases
76-
// if cluster := rtAction.GetCluster(); cluster != "" {
77-
// fmt.Printf("candidate cluster for tunneling: %v\n", cluster)
78-
// var err error
79-
// newClusters, newListeners, err = setupTunnel(cluster, upstreams, params,
80-
// inClusters, rtAction, newClusters, newListeners, processedClusters)
81-
// if err != nil {
82-
// // return what we have so far, so that any modified input resources can still route
83-
// fmt.Printf("error setting up tunneling for cluster %v: %v\n", cluster, err)
84-
// return newClusters, nil, nil, newListeners, nil
85-
// }
86-
// }
87-
// }
88-
// }
89-
// }
90-
9175
// find all upstreams with tunneling enabled
92-
for _, us := range upstreams {
76+
for _, us := range params.Snapshot.Upstreams {
9377
clusterName := translator.UpstreamToClusterName(us.GetMetadata().Ref())
9478

95-
// check if this cluster has already been updated, if so move on
9679
if processedClusters.Has(clusterName) {
80+
logger.Warnf("cluster %v already processed", clusterName)
9781
continue
9882
}
9983

@@ -106,16 +90,15 @@ func (p *plugin) GeneratedResources(params plugins.Params,
10690
// find the cluster to update
10791
cluster := findClusters(inClusters, clusterName)
10892
if cluster == nil {
109-
fmt.Printf("cluster %v not found\n", clusterName)
93+
logger.Warnf("cluster %v not found when setting up HTTP tunnel", clusterName)
11094
continue
11195
}
11296

11397
// change the original cluster name to avoid conflicts with the new cluster
114-
newOriginalClusterName := clusterName + "_original"
115-
cluster.Name = newOriginalClusterName
98+
newOriginalClusterName := clusterName + OriginalClusterSuffix
11699

117100
// use an in-memory pipe to ourselves (only works on linux)
118-
selfPipe := "@/" + clusterName
101+
forwardingPipe := "@/" + clusterName
119102

120103
var originalTransportSocket *envoy_config_core_v3.TransportSocket
121104
if cluster.GetTransportSocket() != nil {
@@ -149,195 +132,58 @@ func (p *plugin) GeneratedResources(params plugins.Params,
149132
}
150133
}
151134

152-
// generate new cluster and replace the old cluster
153-
newCluster := generateSelfCluster(clusterName, selfPipe, originalTransportSocket)
154-
newClusters = append(newClusters, newCluster)
135+
// generate new cluster with original cluster's name and transport socket that points to
136+
// the new listener's pipe
137+
newCluster := generateForwardingCluster(clusterName, forwardingPipe, originalTransportSocket)
155138

156139
tunnelingHeaders := envoyHeadersFromHttpConnectHeaders(us)
157140

158-
// create the listener with the tunneling configuration
141+
// create the listener with the tunneling configuration and point it the original clusters
159142
listener, err := generateForwardingTcpListener(clusterName, newOriginalClusterName,
160-
selfPipe, httpProxyHostname, tunnelingHeaders)
143+
forwardingPipe, httpProxyHostname, tunnelingHeaders)
161144
if err != nil {
162145
return newClusters, nil, nil, newListeners, err
163146
}
164147

148+
// update the original route's cluster name
149+
cluster.Name = newOriginalClusterName
150+
151+
newClusters = append(newClusters, newCluster)
165152
newListeners = append(newListeners, listener)
166153

167-
// mark this cluster as processed
168154
processedClusters.Insert(clusterName)
169-
170-
// replace the cluster with a new cluster that routes to the generated listener
171-
172-
// var err error
173-
// newClusters, newListeners, err = setupTunnel(cluster, upstreams, params,
174-
// inClusters, nil, newClusters, newListeners, processedClusters)
175-
// if err != nil {
176-
// // return what we have so far, so that any modified input resources can still route
177-
// fmt.Printf("error setting up tunneling for cluster %v: %v\n", cluster, err)
178-
// return newClusters, nil, nil, newListeners, nil
179-
// }
180155
}
181156

182157
return newClusters, nil, nil, newListeners, nil
183158
}
184159

185-
// setupTunnel prepare the tunneling configuration for the given cluster
186-
func setupTunnel(
187-
clusterName string,
188-
upstreams v1.UpstreamList,
189-
params plugins.Params,
190-
inClusters []*envoy_config_cluster_v3.Cluster,
191-
rtAction *envoy_config_route_v3.RouteAction,
192-
generatedClusters []*envoy_config_cluster_v3.Cluster,
193-
generatedListeners []*envoy_config_listener_v3.Listener,
194-
processedClusters sets.Set[string],
195-
) (
196-
[]*envoy_config_cluster_v3.Cluster,
197-
[]*envoy_config_listener_v3.Listener,
198-
error,
199-
) {
200-
ref, err := translator.ClusterToUpstreamRef(clusterName)
201-
if err != nil {
202-
// return what we have so far, so that any modified input resources can still route
203-
// successfully to their generated targets
204-
fmt.Printf("error getting upstream ref for cluster %v: %v\n", clusterName, err)
205-
return generatedClusters, generatedListeners, nil
206-
}
207-
208-
us, err := upstreams.Find(ref.GetNamespace(), ref.GetName())
209-
if err != nil {
210-
// return what we have so far, so that any modified input resources can still route
211-
// successfully to their generated targets
212-
fmt.Printf("error finding upstream %v: %v\n", ref, err)
213-
return generatedClusters, generatedListeners, nil
214-
}
215-
216-
// the existence of this value is our indicator that this is a tunneling upstream
217-
tunnelingHostname := us.GetHttpProxyHostname().GetValue()
218-
if tunnelingHostname == "" {
219-
fmt.Print("tunneling hostname is empty\n")
220-
return generatedClusters, generatedListeners, nil
221-
}
222-
223-
var tunnelingHeaders []*envoy_config_core_v3.HeaderValueOption
224-
for _, header := range us.GetHttpConnectHeaders() {
225-
tunnelingHeaders = append(tunnelingHeaders, &envoy_config_core_v3.HeaderValueOption{
226-
Header: &envoy_config_core_v3.HeaderValue{
227-
Key: header.GetKey(),
228-
Value: header.GetValue(),
229-
},
230-
Append: &wrappers.BoolValue{Value: false},
231-
})
232-
}
233-
234-
selfClusterName := TunnelingAutogeneratedClusterPrefix + clusterName
235-
selfPipe := "@/" + clusterName // use an in-memory pipe to ourselves (only works on linux)
236-
237-
// update the old cluster to route to ourselves first
238-
if rtAction != nil {
239-
rtAction.ClusterSpecifier = &envoy_config_route_v3.RouteAction_Cluster{Cluster: selfClusterName}
240-
}
241-
242-
// we only want to generate a new encapsulating cluster and pipe to ourselves if we have not done so already
243-
if processedClusters.Has(clusterName) {
244-
fmt.Printf("cluster %v already processed\n", clusterName)
245-
return generatedClusters, generatedListeners, nil
246-
}
247-
248-
// find the cluster and updates it's transport socket to use the generated cluster if needed
249-
var originalTransportSocket *envoy_config_core_v3.TransportSocket
250-
for _, inCluster := range inClusters {
251-
if inCluster.GetName() == clusterName {
252-
if inCluster.GetTransportSocket() != nil {
253-
tmp := *inCluster.GetTransportSocket()
254-
originalTransportSocket = &tmp
255-
}
256-
257-
// we copy the transport socket to the generated cluster.
258-
// the generated cluster will use upstream TLS context to leverage TLS origination;
259-
// when we encapsulate in HTTP Connect the tcp data being proxied will
260-
// be encrypted (thus we don't need the original transport socket metadata here)
261-
inCluster.TransportSocket = nil
262-
inCluster.TransportSocketMatches = nil
263-
264-
if us.GetHttpConnectSslConfig() == nil {
265-
break
266-
}
267-
268-
// user told us to configure ssl for the http connect proxy
269-
cfg, err := utils.NewSslConfigTranslator().ResolveUpstreamSslConfig(params.Snapshot.Secrets,
270-
us.GetHttpConnectSslConfig())
271-
if err != nil {
272-
break
273-
}
274-
275-
typedConfig, err := utils.MessageToAny(cfg)
276-
if err != nil {
277-
return generatedClusters, generatedListeners, err
278-
}
279-
280-
inCluster.TransportSocket = &envoy_config_core_v3.TransportSocket{
281-
Name: wellknown.TransportSocketTls,
282-
ConfigType: &envoy_config_core_v3.TransportSocket_TypedConfig{TypedConfig: typedConfig},
283-
}
284-
285-
break
286-
}
287-
}
288-
289-
generatedClusters = append(generatedClusters, generateSelfCluster(selfClusterName,
290-
selfPipe, originalTransportSocket))
291-
292-
forwardingTcpListener, err := generateForwardingTcpListener(clusterName, clusterName, selfPipe,
293-
tunnelingHostname, tunnelingHeaders)
294-
if err != nil {
295-
return generatedClusters, generatedListeners, err
296-
}
297-
298-
generatedListeners = append(generatedListeners, forwardingTcpListener)
299-
300-
processedClusters.Insert(clusterName)
301-
302-
return generatedClusters, generatedListeners, nil
303-
}
304-
305-
// the initial route is updated to route to this generated cluster, which routes envoy back to itself (to the
306-
// generated TCP listener, which forwards to the original destination)
307-
//
308-
// the purpose of doing this is to allow both the HTTP Connection Manager filter and TCP filter to run.
309-
// the HTTP Connection Manager runs to allow route-level matching on HTTP parameters (such as request path),
310-
// but then we forward the bytes as raw TCP to the HTTP Connect proxy (which can only be done on a TCP listener)
311-
func generateSelfCluster(
312-
selfClusterName,
313-
selfPipe string,
160+
// generateForwardingCluster generates a cluster will replace the original cluster and send
161+
// the traffic to the generated listener
162+
func generateForwardingCluster(
163+
clusterName,
164+
pipePath string,
314165
originalTransportSocket *envoy_config_core_v3.TransportSocket,
315166
) *envoy_config_cluster_v3.Cluster {
316167
return &envoy_config_cluster_v3.Cluster{
317168
ClusterDiscoveryType: &envoy_config_cluster_v3.Cluster_Type{
318169
Type: envoy_config_cluster_v3.Cluster_STATIC,
319170
},
320171
ConnectTimeout: &duration.Duration{Seconds: 5},
321-
Name: selfClusterName,
172+
Name: clusterName,
322173
TransportSocket: originalTransportSocket,
323-
LoadAssignment: selfPipeLoadAssignment(selfClusterName, selfPipe),
324-
}
325-
}
326-
327-
// selfPipeLoadAssignment returns a load assignment for the pipe
328-
func selfPipeLoadAssignment(clusterName, pipeName string) *envoy_config_endpoint_v3.ClusterLoadAssignment {
329-
return &envoy_config_endpoint_v3.ClusterLoadAssignment{
330-
ClusterName: clusterName,
331-
Endpoints: []*envoy_config_endpoint_v3.LocalityLbEndpoints{
332-
{
333-
LbEndpoints: []*envoy_config_endpoint_v3.LbEndpoint{
334-
{
335-
HostIdentifier: &envoy_config_endpoint_v3.LbEndpoint_Endpoint{
336-
Endpoint: &envoy_config_endpoint_v3.Endpoint{
337-
Address: &envoy_config_core_v3.Address{
338-
Address: &envoy_config_core_v3.Address_Pipe{
339-
Pipe: &envoy_config_core_v3.Pipe{
340-
Path: pipeName,
174+
LoadAssignment: &envoy_config_endpoint_v3.ClusterLoadAssignment{
175+
ClusterName: clusterName,
176+
Endpoints: []*envoy_config_endpoint_v3.LocalityLbEndpoints{
177+
{
178+
LbEndpoints: []*envoy_config_endpoint_v3.LbEndpoint{
179+
{
180+
HostIdentifier: &envoy_config_endpoint_v3.LbEndpoint_Endpoint{
181+
Endpoint: &envoy_config_endpoint_v3.Endpoint{
182+
Address: &envoy_config_core_v3.Address{
183+
Address: &envoy_config_core_v3.Address_Pipe{
184+
Pipe: &envoy_config_core_v3.Pipe{
185+
Path: pipePath,
186+
},
341187
},
342188
},
343189
},
@@ -350,16 +196,17 @@ func selfPipeLoadAssignment(clusterName, pipeName string) *envoy_config_endpoint
350196
}
351197
}
352198

353-
// the generated cluster routes to this generated listener, which forwards TCP traffic to an HTTP Connect proxy
199+
// generateForwardingTcpListener generates a listener that will forwards traffic to the
200+
// HTTP Connect proxy
354201
func generateForwardingTcpListener(
355202
cluster,
356203
originalCluster,
357-
selfPipePath,
204+
pipePath,
358205
tunnelingHostname string,
359206
tunnelingHeadersToAdd []*envoy_config_core_v3.HeaderValueOption,
360207
) (*envoy_config_listener_v3.Listener, error) {
361208
cfg := &envoytcp.TcpProxy{
362-
StatPrefix: "soloioTcpStats" + cluster,
209+
StatPrefix: forwardingListenerStatsPrefix + cluster,
363210
TunnelingConfig: &envoytcp.TcpProxy_TunnelingConfig{Hostname: tunnelingHostname,
364211
HeadersToAdd: tunnelingHeadersToAdd},
365212
ClusterSpecifier: &envoytcp.TcpProxy_Cluster{Cluster: originalCluster}, // route to original target
@@ -370,11 +217,11 @@ func generateForwardingTcpListener(
370217
}
371218

372219
return &envoy_config_listener_v3.Listener{
373-
Name: "solo_io_generated_self_listener_" + cluster,
220+
Name: forwardingListenerPrefix + cluster,
374221
Address: &envoy_config_core_v3.Address{
375222
Address: &envoy_config_core_v3.Address_Pipe{
376223
Pipe: &envoy_config_core_v3.Pipe{
377-
Path: selfPipePath,
224+
Path: pipePath,
378225
},
379226
},
380227
},
@@ -393,6 +240,7 @@ func generateForwardingTcpListener(
393240
}, nil
394241
}
395242

243+
// envoyHeadersFromHttpConnectHeaders converts the http connect headers to envoy headers
396244
func envoyHeadersFromHttpConnectHeaders(us *v1.Upstream) []*envoy_config_core_v3.HeaderValueOption {
397245
var tunnelingHeaders []*envoy_config_core_v3.HeaderValueOption
398246
for _, header := range us.GetHttpConnectHeaders() {

0 commit comments

Comments
 (0)