Skip to content

Commit 7ee0368

Browse files
Multiplex socket to support both http and grpc requests
1 parent 6eada99 commit 7ee0368

File tree

14 files changed

+1332
-92
lines changed

14 files changed

+1332
-92
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ require (
5151
github.com/rhysd/go-github-selfupdate v1.2.3
5252
github.com/sirupsen/logrus v1.9.3
5353
github.com/skratchdot/open-golang v0.0.0-20200116055534-eef842397966
54+
github.com/soheilhy/cmux v0.1.5
5455
github.com/spf13/cobra v1.8.1
5556
github.com/spf13/pflag v1.0.5
5657
github.com/takama/daemon v1.0.0

go.sum

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -803,6 +803,7 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn
803803
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
804804
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
805805
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
806+
golang.org/x/net v0.0.0-20201202161906-c7110b5ffcbb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
806807
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
807808
golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc=
808809
golang.org/x/net v0.0.0-20210331212208-0fccb6fa2b5c/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=

pkg/daemon/workspace/network/network_proxy.go

Lines changed: 139 additions & 92 deletions
Original file line numberDiff line numberDiff line change
@@ -5,28 +5,30 @@ import (
55
"errors"
66
"fmt"
77
"net"
8+
"net/http"
89
"os"
9-
"strconv"
10+
"sync"
1011
"time"
1112

12-
"github.com/loft-sh/devpod/pkg/ts"
1313
"github.com/loft-sh/log"
1414
"github.com/mwitkow/grpc-proxy/proxy"
15+
"github.com/soheilhy/cmux"
1516
"google.golang.org/grpc"
16-
"google.golang.org/grpc/codes"
17-
"google.golang.org/grpc/credentials/insecure"
18-
"google.golang.org/grpc/metadata"
19-
"google.golang.org/grpc/status"
2017
"tailscale.com/tsnet"
2118
)
2219

23-
// NetworkProxyService proxies gRPC requests based on metadata.
20+
// NetworkProxyService proxies gRPC and HTTP requests over DevPod network.
21+
// It coordinates the listener, cmux, and underlying servers.
2422
type NetworkProxyService struct {
25-
listener net.Listener
26-
grpcServer *grpc.Server
27-
tsServer *tsnet.Server
28-
log log.Logger
29-
socketPath string
23+
mainListener net.Listener
24+
grpcServer *grpc.Server
25+
httpServer *http.Server
26+
tsServer *tsnet.Server
27+
log log.Logger
28+
socketPath string
29+
mux cmux.CMux
30+
grpcDirector *GrpcDirector
31+
httpProxy *HttpProxyHandler
3032
}
3133

3234
// NewNetworkProxyService creates a new instance listening on the given unix socket.
@@ -37,126 +39,171 @@ func NewNetworkProxyService(socketPath string, tsServer *tsnet.Server, log log.L
3739
return nil, fmt.Errorf("failed to listen on socket %s: %w", socketPath, err)
3840
}
3941

40-
if err := os.Chmod(socketPath, 0770); err != nil {
42+
if err := os.Chmod(socketPath, 0777); err != nil {
4143
l.Close()
4244
return nil, fmt.Errorf("failed to set socket permissions on %s: %w", socketPath, err)
4345
}
4446

4547
log.Infof("NetworkProxyService: network proxy listening on socket %s", socketPath)
48+
49+
grpcDirector := NewGrpcDirector(tsServer, log)
50+
httpProxy := NewHttpProxyHandler(tsServer, log)
51+
4652
return &NetworkProxyService{
47-
listener: l,
48-
tsServer: tsServer,
49-
log: log,
50-
socketPath: socketPath,
53+
mainListener: l,
54+
tsServer: tsServer,
55+
log: log,
56+
socketPath: socketPath,
57+
grpcDirector: grpcDirector,
58+
httpProxy: httpProxy,
5159
}, nil
5260
}
5361

5462
// Start runs the gRPC reverse proxy server.
5563
func (s *NetworkProxyService) Start(ctx context.Context) error {
56-
director := func(ctx context.Context, fullMethodName string) (context.Context, *grpc.ClientConn, error) {
57-
md, ok := metadata.FromIncomingContext(ctx)
58-
if !ok {
59-
s.log.Warnf("[NetworkProxyService] [gRPC] Director missing incoming metadata for call %q", fullMethodName)
60-
return nil, nil, status.Errorf(codes.InvalidArgument, "missing metadata")
61-
}
62-
mdCopy := md.Copy()
63-
64-
targetHosts := mdCopy.Get("x-target-host")
65-
targetPorts := mdCopy.Get("x-target-port")
66-
proxyPorts := mdCopy.Get("x-proxy-port")
67-
if len(targetHosts) == 0 || len(targetPorts) == 0 || len(proxyPorts) == 0 {
68-
s.log.Errorf("[NetworkProxyService] [gRPC] Director missing x-target-host, x-proxy-port or x-target-port metadata for call %q", fullMethodName)
69-
return nil, nil, status.Errorf(codes.InvalidArgument, "missing x-target-host, x-proxy-port or x-target-port metadata")
70-
}
64+
// Create connection multiplexer
65+
s.mux = cmux.New(s.mainListener)
7166

72-
proxyPort, err := strconv.Atoi(proxyPorts[0])
73-
if err != nil {
74-
return nil, nil, err
75-
}
76-
targetAddr := ts.EnsureURL(targetHosts[0], proxyPort)
67+
// Matchers
68+
grpcL := s.mux.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"))
69+
httpL := s.mux.Match(cmux.Any())
7770

78-
s.log.Infof("[NetworkProxyService] [gRPC] Proxying call %q to target %s", fullMethodName, targetAddr)
71+
// Servers
72+
s.grpcServer = grpc.NewServer(
73+
grpc.UnknownServiceHandler(proxy.TransparentHandler(s.grpcDirector.DirectorFunc)),
74+
)
75+
s.httpServer = &http.Server{
76+
Handler: s.httpProxy,
77+
}
7978

80-
// Create a custom dialer using the tsnet server.
81-
tsDialer := func(ctx context.Context, addr string) (net.Conn, error) {
82-
s.log.Debugf("[NetworkProxyService] [gRPC] Dialing target %s via tsnet", addr)
83-
conn, err := s.tsServer.Dial(ctx, "tcp", addr)
84-
if err != nil {
85-
s.log.Errorf("[NetworkProxyService] [gRPC] Failed to dial target %s via tsnet: %v", addr, err)
86-
return nil, err
87-
}
88-
return conn, nil
89-
}
79+
// Start servers
80+
var runWg sync.WaitGroup
81+
errChan := make(chan error, 3)
9082

91-
// Dial the target gRPC server (the second proxy) using the tsnet dialer.
92-
conn, err := grpc.DialContext(ctx, targetAddr,
93-
grpc.WithContextDialer(tsDialer),
94-
grpc.WithTransportCredentials(insecure.NewCredentials()),
95-
grpc.WithCodec(proxy.Codec()),
96-
)
97-
if err != nil {
98-
s.log.Errorf("[NetworkProxyService] [gRPC] Failed to dial target backend %s: %v", targetAddr, err)
99-
return nil, nil, status.Errorf(codes.Internal, "failed to dial target backend: %v", err)
83+
runWg.Add(1)
84+
go func() {
85+
defer runWg.Done()
86+
s.log.Debugf("NetworkProxyService: starting gRPC server...")
87+
if err := s.grpcServer.Serve(grpcL); err != nil && !errors.Is(err, grpc.ErrServerStopped) && !errors.Is(err, cmux.ErrListenerClosed) {
88+
s.log.Errorf("NetworkProxyService: gRPC server error: %v", err)
89+
errChan <- fmt.Errorf("gRPC server error: %w", err)
90+
} else {
91+
s.log.Debugf("NetworkProxyService: gRPC server stopped.")
10092
}
93+
}()
10194

102-
outCtx := metadata.NewOutgoingContext(ctx, mdCopy)
103-
104-
return outCtx, conn, nil
105-
}
106-
107-
// Create the gRPC server with the transparent proxy handler.
108-
s.grpcServer = grpc.NewServer(
109-
grpc.UnknownServiceHandler(proxy.TransparentHandler(director)),
110-
)
95+
runWg.Add(1)
96+
go func() {
97+
defer runWg.Done()
98+
s.log.Debugf("NetworkProxyService: starting HTTP server...")
99+
if err := s.httpServer.Serve(httpL); err != nil && !errors.Is(err, http.ErrServerClosed) && !errors.Is(err, cmux.ErrListenerClosed) {
100+
s.log.Errorf("NetworkProxyService: HTTP server error: %v", err)
101+
errChan <- fmt.Errorf("HTTP server error: %w", err)
102+
} else {
103+
s.log.Debugf("NetworkProxyService: HTTP server stopped.")
104+
}
105+
}()
111106

112-
s.log.Infof("NetworkProxyService: starting gRPC server on %s", s.socketPath)
107+
runWg.Add(1)
113108
go func() {
114-
if err := s.grpcServer.Serve(s.listener); err != nil && !errors.Is(err, net.ErrClosed) {
115-
s.log.Errorf("NetworkProxyService: gRPC server error: %v", err)
116-
} else if errors.Is(err, net.ErrClosed) {
117-
s.log.Infof("NetworkProxyService: gRPC server stopped gracefully.")
109+
defer runWg.Done()
110+
s.log.Infof("NetworkProxyService: starting server...")
111+
err := s.mux.Serve()
112+
if err != nil && !errors.Is(err, net.ErrClosed) && !errors.Is(err, cmux.ErrListenerClosed) {
113+
s.log.Errorf("NetworkProxyService: server error: %v", err)
114+
errChan <- fmt.Errorf("server error: %w", err)
115+
} else {
116+
s.log.Infof("NetworkProxyService: server stopped.")
118117
}
119118
}()
120119

121-
<-ctx.Done()
122-
s.log.Infof("NetworkProxyService: context cancelled, shutting down proxy service")
120+
s.log.Infof("NetworkProxyService: successfully started listeners on %s", s.socketPath)
121+
122+
var finalErr error
123+
select {
124+
case <-ctx.Done():
125+
s.log.Infof("NetworkProxyService: context cancelled, shutting down proxy service")
126+
finalErr = ctx.Err()
127+
case err := <-errChan:
128+
s.log.Errorf("NetworkProxyService: server error triggered shutdown: %v", err)
129+
finalErr = err
130+
}
131+
123132
s.Stop()
124-
return ctx.Err()
133+
134+
s.log.Debugf("NetworkProxyService: Waiting for servers to exit...")
135+
runWg.Wait()
136+
s.log.Debugf("NetworkProxyService: All servers exited.")
137+
138+
return finalErr
125139
}
126140

127-
// Stop gracefully shuts down the gRPC server and closes the listener.
128141
func (s *NetworkProxyService) Stop() {
129142
s.log.Infof("NetworkProxyService: stopping proxy service...")
130-
stopped := make(chan struct{})
143+
144+
shutdownCtx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
145+
defer cancel()
146+
147+
var shutdownWg sync.WaitGroup
148+
shutdownWg.Add(2)
149+
131150
go func() {
151+
defer shutdownWg.Done()
132152
if s.grpcServer != nil {
133153
s.grpcServer.GracefulStop()
154+
s.log.Debugf("NetworkProxyService: gRPC server stopped.")
134155
}
135-
close(stopped)
136156
}()
137157

138-
// Wait for graceful stop with a timeout
158+
go func() {
159+
defer shutdownWg.Done()
160+
if s.httpServer != nil {
161+
if err := s.httpServer.Shutdown(shutdownCtx); err != nil {
162+
s.log.Warnf("NetworkProxyService: HTTP server shutdown error: %v", err)
163+
} else {
164+
s.log.Debugf("NetworkProxyService: HTTP server stopped.")
165+
}
166+
}
167+
}()
168+
169+
s.log.Infof("NetworkProxyService: waiting for servers to stop...")
170+
171+
waitDone := make(chan struct{})
172+
go func() {
173+
defer close(waitDone)
174+
shutdownWg.Wait()
175+
}()
176+
139177
select {
140-
case <-time.After(10 * time.Second):
141-
s.log.Warnf("NetworkProxyService: Graceful stop timed out, forcing listener close.")
142-
case <-stopped:
143-
s.log.Infof("NetworkProxyService: gRPC server stopped.")
178+
case <-waitDone:
179+
s.log.Debugf("NetworkProxyService: All server shutdowns completed.")
180+
case <-shutdownCtx.Done():
181+
s.log.Warnf("NetworkProxyService: Graceful shutdown timed out after waiting for servers.")
144182
}
145183

146-
if s.listener != nil {
147-
if err := s.listener.Close(); err != nil {
148-
if !errors.Is(err, net.ErrClosed) {
149-
s.log.Errorf("NetworkProxyService: error closing listener: %v", err)
184+
s.log.Debugf("NetworkProxyService: Listener and socket cleanup.")
185+
186+
if s.mainListener != nil {
187+
s.log.Debugf("NetworkProxyService: Closing main listener...")
188+
if err := s.mainListener.Close(); err != nil {
189+
if !errors.Is(err, net.ErrClosed) && !errors.Is(err, cmux.ErrListenerClosed) {
190+
s.log.Errorf("NetworkProxyService: Error closing main listener: %v", err)
191+
} else {
192+
s.log.Debugf("NetworkProxyService: Main listener closed.")
150193
}
151194
} else {
152-
s.log.Infof("NetworkProxyService: Listener closed.")
195+
s.log.Debugf("NetworkProxyService: Main listener closed successfully.")
153196
}
197+
} else {
198+
s.log.Warnf("NetworkProxyService: Main listener was nil during stop.")
154199
}
155200

156-
if s.socketPath != "" {
157-
if err := os.Remove(s.socketPath); err != nil && !errors.Is(err, os.ErrNotExist) {
158-
s.log.Warnf("NetworkProxyService: failed to remove socket file %s: %v", s.socketPath, err)
159-
}
201+
s.log.Debugf("NetworkProxyService: Removing socket file %s", s.socketPath)
202+
if err := os.Remove(s.socketPath); err != nil && !errors.Is(err, os.ErrNotExist) {
203+
s.log.Warnf("NetworkProxyService: Failed to remove socket file %s: %v", s.socketPath, err)
204+
} else if err == nil {
205+
s.log.Debugf("NetworkProxyService: Removed socket file %s", s.socketPath)
160206
}
161-
s.log.Infof("NetworkProxyService: proxy service stopped.")
207+
208+
s.log.Infof("NetworkProxyService: Proxy service stopped.")
162209
}

vendor/github.com/soheilhy/cmux/.gitignore

Lines changed: 24 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/github.com/soheilhy/cmux/.travis.yml

Lines changed: 29 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

vendor/github.com/soheilhy/cmux/CONTRIBUTORS

Lines changed: 12 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)