Skip to content

Commit 6c6a7c9

Browse files
grpc reverse proxy
1 parent 2f71d8c commit 6c6a7c9

File tree

19 files changed

+1565
-102
lines changed

19 files changed

+1565
-102
lines changed

cmd/agent/container/credentials_server.go

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,13 @@ import (
1313
"github.com/loft-sh/devpod/pkg/agent/tunnel"
1414
"github.com/loft-sh/devpod/pkg/agent/tunnelserver"
1515
"github.com/loft-sh/devpod/pkg/credentials"
16+
locald "github.com/loft-sh/devpod/pkg/daemon/local"
1617
"github.com/loft-sh/devpod/pkg/dockercredentials"
1718
"github.com/loft-sh/devpod/pkg/gitcredentials"
1819
"github.com/loft-sh/devpod/pkg/gitsshsigning"
1920
"github.com/loft-sh/devpod/pkg/netstat"
2021
portpkg "github.com/loft-sh/devpod/pkg/port"
22+
"github.com/loft-sh/devpod/pkg/ts"
2123
"github.com/loft-sh/log"
2224
"github.com/spf13/cobra"
2325
)
@@ -69,10 +71,23 @@ func NewCredentialsServerCmd(flags *flags.GlobalFlags) *cobra.Command {
6971

7072
// Run runs the command logic
7173
func (cmd *CredentialsServerCmd) Run(ctx context.Context, port int) error {
74+
var tunnelClient tunnel.TunnelClient
75+
var err error
76+
7277
// create a grpc client
73-
tunnelClient, err := tunnelserver.NewTunnelClient(os.Stdin, os.Stdout, true, ExitCodeIO)
74-
if err != nil {
75-
return fmt.Errorf("error creating tunnel client: %w", err)
78+
// if we have client address, lets use the http client
79+
if cmd.Client != "" {
80+
address := ts.EnsureURL(cmd.Client, locald.LocalCredentialsServerPort)
81+
tunnelClient, err = tunnelserver.NewHTTPTunnelClient(address)
82+
if err != nil {
83+
return fmt.Errorf("error creating tunnel client: %w", err)
84+
}
85+
} else {
86+
// otherwise we fallback to stdio client
87+
tunnelClient, err = tunnelserver.NewTunnelClient(os.Stdin, os.Stdout, true, ExitCodeIO)
88+
if err != nil {
89+
return fmt.Errorf("error creating tunnel client: %w", err)
90+
}
7691
}
7792

7893
// this message serves as a ping to the client

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ require (
5050
github.com/rhysd/go-github-selfupdate v1.2.3
5151
github.com/sirupsen/logrus v1.9.3
5252
github.com/skratchdot/open-golang v0.0.0-20200116055534-eef842397966
53+
github.com/soheilhy/cmux v0.1.5
5354
github.com/spf13/cobra v1.8.1
5455
github.com/spf13/pflag v1.0.5
5556
github.com/takama/daemon v1.0.0

go.sum

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -767,6 +767,7 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn
767767
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
768768
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
769769
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
770+
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
770771
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
771772
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
772773
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=

pkg/agent/tunnelserver/client.go

Lines changed: 44 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/loft-sh/devpod/pkg/stdio"
1111
"google.golang.org/grpc"
1212
"google.golang.org/grpc/credentials/insecure"
13+
"google.golang.org/grpc/metadata"
1314
"google.golang.org/grpc/resolver"
1415
)
1516

@@ -36,23 +37,58 @@ func NewTunnelClient(reader io.Reader, writer io.WriteCloser, exitOnClose bool,
3637
return c, nil
3738
}
3839

39-
// NewTunnelClient creates a gRPC tunnel client that connects via the Unix domain socket,
40-
// using the shared dialer from the network package.
41-
func NewHTTPTunnelClient(_ io.Reader, _ io.WriteCloser, _ bool, _ int) (tunnel.TunnelClient, error) {
42-
// After moving from deprecated grpc.Dial to grpc.NewClient we need to setup resolver first
43-
// https://github.com/grpc/grpc-go/issues/1786#issuecomment-2119088770
40+
func NewHTTPTunnelClient(localServerAddr string) (tunnel.TunnelClient, error) {
41+
// Set the default resolver scheme.
4442
resolver.SetDefaultScheme("passthrough")
4543

46-
// Set up a connection to the server.
47-
conn, err := grpc.NewClient("",
44+
// Create a unary interceptor to attach the metadata.
45+
unaryInterceptor := func(
46+
ctx context.Context,
47+
method string,
48+
req, reply interface{},
49+
cc *grpc.ClientConn,
50+
invoker grpc.UnaryInvoker,
51+
opts ...grpc.CallOption,
52+
) error {
53+
// Create new metadata with the required headers.
54+
md := metadata.New(map[string]string{
55+
"X-LOFT-TARGET": localServerAddr,
56+
"X-TARGET-PORT": "9999",
57+
})
58+
// Add this metadata to the outgoing context.
59+
ctx = metadata.NewOutgoingContext(ctx, md)
60+
return invoker(ctx, method, req, reply, cc, opts...)
61+
}
62+
63+
// Create a stream interceptor to attach the metadata.
64+
streamInterceptor := func(
65+
ctx context.Context,
66+
desc *grpc.StreamDesc,
67+
cc *grpc.ClientConn,
68+
method string,
69+
streamer grpc.Streamer,
70+
opts ...grpc.CallOption,
71+
) (grpc.ClientStream, error) {
72+
md := metadata.New(map[string]string{
73+
"X-LOFT-TARGET": localServerAddr,
74+
"X-TARGET-PORT": "9999",
75+
})
76+
ctx = metadata.NewOutgoingContext(ctx, md)
77+
return streamer(ctx, desc, cc, method, opts...)
78+
}
79+
80+
// Set up a connection to the server using the shared dialer.
81+
conn, err := grpc.NewClient(localServerAddr,
4882
grpc.WithTransportCredentials(insecure.NewCredentials()),
4983
grpc.WithContextDialer(network.GetContextDialer()),
84+
grpc.WithUnaryInterceptor(unaryInterceptor),
85+
grpc.WithStreamInterceptor(streamInterceptor),
5086
)
5187
if err != nil {
5288
return nil, err
5389
}
5490

91+
// Create and return the TunnelClient using the generated proto.
5592
c := tunnel.NewTunnelClient(conn)
56-
5793
return c, nil
5894
}

pkg/agent/tunnelserver/tunnelserver.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"encoding/base64"
77
"encoding/json"
88
"fmt"
9+
"io"
910
"net"
1011
"os"
1112
"path/filepath"
@@ -23,13 +24,27 @@ import (
2324
"github.com/loft-sh/devpod/pkg/netstat"
2425
"github.com/loft-sh/devpod/pkg/platform"
2526
provider2 "github.com/loft-sh/devpod/pkg/provider"
27+
"github.com/loft-sh/devpod/pkg/stdio"
2628
"github.com/loft-sh/log"
2729
"github.com/moby/patternmatcher/ignorefile"
2830
perrors "github.com/pkg/errors"
2931
"google.golang.org/grpc"
3032
"google.golang.org/grpc/reflection"
3133
)
3234

35+
// GetListener returns correct listener for services server - either stdio or tcp
36+
func GetListener(client string, reader io.Reader, writer io.WriteCloser, exitOnClose bool) (net.Listener, error) {
37+
if client == "" {
38+
return stdio.NewStdioListener(reader, writer, exitOnClose), nil
39+
}
40+
listener, err := net.Listen("tcp", ":4795") // FIXME
41+
if err != nil {
42+
return nil, err
43+
}
44+
defer listener.Close()
45+
return listener, nil
46+
}
47+
3348
func RunServicesServer(ctx context.Context, lis net.Listener, allowGitCredentials, allowDockerCredentials bool, forwarder netstat.Forwarder, workspace *provider2.Workspace, log log.Logger, options ...Option) error {
3449
opts := append(options, []Option{
3550
WithForwarder(forwarder),

pkg/daemon/local/credentials_proxy.go

Lines changed: 113 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -3,25 +3,25 @@ package local
33
import (
44
"context"
55
"fmt"
6+
"io"
67
"net"
78
"net/http"
8-
"net/http/httputil"
9-
"net/url"
109
"time"
1110

1211
"github.com/loft-sh/log"
12+
"google.golang.org/grpc"
13+
"google.golang.org/grpc/metadata"
14+
"google.golang.org/grpc/status"
1315
"tailscale.com/tsnet"
1416
)
1517

1618
const (
1719
// Listen on this port via tsnet.
1820
LocalCredentialsServerPort = 9999 // FIXME - use random prot
1921
// Target server: local gRPC server running on port 5555.
20-
TargetServer = "http://localhost:5555" // FIXME - get port from request
22+
TargetServer = "http://localhost:4795" // FIXME - get port from request
2123
)
2224

23-
// LocalCredentialsServerProxy acts as a reverse proxy that blindly forwards
24-
// all incoming traffic to the local gRPC server on port 5555.
2525
type LocalCredentialsServerProxy struct {
2626
log log.Logger
2727
tsServer *tsnet.Server
@@ -30,16 +30,13 @@ type LocalCredentialsServerProxy struct {
3030
srv *http.Server
3131
}
3232

33-
// NewLocalCredentialsServerProxy initializes a new LocalCredentialsServerProxy.
3433
func NewLocalCredentialsServerProxy(tsServer *tsnet.Server, log log.Logger) (*LocalCredentialsServerProxy, error) {
3534
return &LocalCredentialsServerProxy{
3635
log: log,
3736
tsServer: tsServer,
3837
}, nil
3938
}
4039

41-
// Listen creates the tsnet listener and HTTP server,
42-
// and registers a catch-all handler that acts as the reverse proxy.
4340
func (s *LocalCredentialsServerProxy) Listen(ctx context.Context) error {
4441
s.log.Info("Starting reverse proxy for local gRPC server")
4542

@@ -51,67 +48,132 @@ func (s *LocalCredentialsServerProxy) Listen(ctx context.Context) error {
5148
}
5249
s.ln = ln
5350

54-
mux := http.NewServeMux()
55-
mux.HandleFunc("/", s.handleReverseProxy)
51+
serverOpts := []grpc.ServerOption{
52+
grpc.UnaryInterceptor(s.unaryProxyInterceptor),
53+
grpc.StreamInterceptor(s.streamProxyInterceptor),
54+
}
5655

57-
// Create the HTTP server.
58-
s.srv = &http.Server{
59-
Handler: mux,
56+
grpcServer := grpc.NewServer(serverOpts...)
57+
s.log.Infof("gRPC reverse proxy listening on %s", fmt.Sprintf(":%d", LocalCredentialsServerPort))
58+
if err := grpcServer.Serve(ln); err != nil {
59+
s.log.Fatalf("failed to serve: %v", err)
6060
}
61+
return nil
62+
}
6163

62-
go func() {
63-
<-ctx.Done()
64-
s.log.Info("Context canceled, shutting down reverse proxy")
65-
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
66-
defer cancel()
67-
if err := s.srv.Shutdown(shutdownCtx); err != nil {
68-
s.log.Errorf("Error shutting down reverse proxy: %v", err)
69-
}
70-
}()
64+
func (s *LocalCredentialsServerProxy) unaryProxyInterceptor(
65+
ctx context.Context,
66+
req interface{},
67+
info *grpc.UnaryServerInfo,
68+
handler grpc.UnaryHandler,
69+
) (interface{}, error) {
70+
s.log.Infof("Start unary proxy interceptor - %v \n", info)
71+
md, ok := metadata.FromIncomingContext(ctx)
72+
if !ok {
73+
return nil, status.Errorf(400, "missing metadata")
74+
}
7175

72-
s.log.Infof("Reverse proxy listening on tsnet port %d", LocalCredentialsServerPort)
73-
err = s.srv.Serve(ln)
74-
if err != nil && err != http.ErrServerClosed {
75-
s.log.Errorf("Reverse proxy error: %v", err)
76-
return err
76+
// Retrieve target headers.
77+
targetHosts := md.Get("x-target-host")
78+
targetPorts := md.Get("x-target-port")
79+
if len(targetHosts) == 0 || len(targetPorts) == 0 {
80+
return nil, status.Errorf(400, "missing x-target-host or x-target-port metadata")
7781
}
82+
targetAddr := fmt.Sprintf("%s:%s", targetHosts[0], targetPorts[0])
83+
s.log.Infof("Proxying unary call %q to target %s", info.FullMethod, targetAddr)
7884

79-
return nil
85+
// Establish connection to the target server.
86+
conn, err := grpc.Dial(targetAddr, grpc.WithInsecure())
87+
if err != nil {
88+
return nil, status.Errorf(503, "failed to dial target %s: %v", targetAddr, err)
89+
}
90+
defer conn.Close()
91+
92+
var resp interface{}
93+
// Forward the call.
94+
err = conn.Invoke(ctx, info.FullMethod, req, &resp)
95+
if err != nil {
96+
return nil, status.Errorf(500, "error invoking target: %v", err)
97+
}
98+
return resp, nil
8099
}
81100

82-
// handleReverseProxy forwards every request to the target gRPC server.
83-
func (s *LocalCredentialsServerProxy) handleReverseProxy(w http.ResponseWriter, r *http.Request) {
84-
s.log.Infof("Forwarding request %s %s to target server", r.Method, r.URL.String())
101+
func (s *LocalCredentialsServerProxy) streamProxyInterceptor(
102+
srv interface{},
103+
ss grpc.ServerStream,
104+
info *grpc.StreamServerInfo,
105+
handler grpc.StreamHandler,
106+
) error {
107+
s.log.Infof("Start stream proxy interceptor - %v \n", info)
108+
// Extract incoming metadata.
109+
md, ok := metadata.FromIncomingContext(ss.Context())
110+
if !ok {
111+
return status.Errorf(400, "missing metadata")
112+
}
113+
114+
// Retrieve target metadata.
115+
targetHosts := md.Get("x-target-host")
116+
targetPorts := md.Get("x-target-port")
117+
if len(targetHosts) == 0 || len(targetPorts) == 0 {
118+
return status.Errorf(400, "missing x-target-host or x-target-port metadata")
119+
}
120+
targetAddr := fmt.Sprintf("%s:%s", targetHosts[0], targetPorts[0])
121+
s.log.Infof("Proxying streaming call %q to target %s", info.FullMethod, targetAddr)
85122

86-
// Parse the target URL.
87-
targetURL, err := url.Parse(TargetServer)
123+
// Dial the target server.
124+
conn, err := grpc.Dial(targetAddr, grpc.WithInsecure())
88125
if err != nil {
89-
s.log.Errorf("Error parsing target URL %s: %v", TargetServer, err)
90-
http.Error(w, "Bad Gateway", http.StatusBadGateway)
91-
return
126+
return status.Errorf(503, "failed to dial target %s: %v", targetAddr, err)
92127
}
128+
defer conn.Close()
93129

94-
// Create the reverse proxy.
95-
proxy := httputil.NewSingleHostReverseProxy(targetURL)
130+
// Create a new context for the client stream and attach metadata.
131+
clientCtx := metadata.NewOutgoingContext(ss.Context(), md)
96132

97-
// Customize the director to forward the Host header to the target.
98-
originalDirector := proxy.Director
99-
proxy.Director = func(req *http.Request) {
100-
originalDirector(req)
101-
req.Host = targetURL.Host
133+
clientStream, err := conn.NewStream(clientCtx, &grpc.StreamDesc{
134+
ServerStreams: info.IsServerStream,
135+
ClientStreams: info.IsClientStream,
136+
}, info.FullMethod)
137+
if err != nil {
138+
return status.Errorf(500, "failed to create stream to target: %v", err)
102139
}
103140

104-
// Use an error handler to log any errors that occur.
105-
proxy.ErrorHandler = func(w http.ResponseWriter, r *http.Request, err error) {
106-
s.log.Errorf("Reverse proxy error: %v", err)
107-
http.Error(w, "Bad Gateway", http.StatusBadGateway)
108-
}
141+
errChan := make(chan error, 2)
142+
go func() {
143+
for {
144+
var msg interface{}
145+
if err := ss.RecvMsg(&msg); err != nil {
146+
errChan <- err
147+
return
148+
}
149+
if err := clientStream.SendMsg(msg); err != nil {
150+
errChan <- err
151+
return
152+
}
153+
}
154+
}()
109155

110-
// Forward the request.
111-
proxy.ServeHTTP(w, r)
156+
go func() {
157+
for {
158+
var msg interface{}
159+
if err := clientStream.RecvMsg(&msg); err != nil {
160+
errChan <- err
161+
return
162+
}
163+
if err := ss.SendMsg(msg); err != nil {
164+
errChan <- err
165+
return
166+
}
167+
}
168+
}()
169+
170+
err = <-errChan
171+
if err == io.EOF {
172+
return nil
173+
}
174+
return err
112175
}
113176

114-
// Close gracefully shuts down the reverse proxy.
115177
func (s *LocalCredentialsServerProxy) Close() error {
116178
s.log.Info("Closing reverse proxy")
117179
if s.srv != nil {

0 commit comments

Comments
 (0)