Skip to content

Commit 050e6e3

Browse files
author
xianying.czy
committed
feat(sandbox-gateway): add memberlist server for gateway
Signed-off-by: xianying.czy <xianying.czy@alibaba-inc.com>
1 parent d181e77 commit 050e6e3

File tree

5 files changed

+225
-18
lines changed

5 files changed

+225
-18
lines changed

cmd/sandbox-gateway/main.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,17 @@ package main
22

33
import (
44
"context"
5+
"os"
56

67
"github.com/envoyproxy/envoy/contrib/golang/common/go/api"
78
envoyhttp "github.com/envoyproxy/envoy/contrib/golang/filters/http/source/go/pkg/http"
9+
"k8s.io/client-go/kubernetes"
10+
"k8s.io/client-go/rest"
811

12+
"github.com/openkruise/agents/pkg/proxy"
913
"github.com/openkruise/agents/pkg/sandbox-gateway/controller"
1014
"github.com/openkruise/agents/pkg/sandbox-gateway/filter"
15+
peerserver "github.com/openkruise/agents/pkg/sandbox-gateway/server"
1116
)
1217

1318
func init() {
@@ -22,6 +27,31 @@ func init() {
2227
api.LogErrorf("sandbox controller manager exited with error: %v", err)
2328
}
2429
}()
30+
31+
// Start the peer server for handling route synchronization from other peers
32+
go func() {
33+
ctx := context.Background()
34+
35+
// Get Kubernetes config
36+
cfg, err := rest.InClusterConfig()
37+
if err != nil {
38+
api.LogErrorf("failed to get in-cluster config: %v", err)
39+
os.Exit(1)
40+
}
41+
42+
// Create Kubernetes client
43+
client, err := kubernetes.NewForConfig(cfg)
44+
if err != nil {
45+
api.LogErrorf("failed to create kubernetes client: %v", err)
46+
os.Exit(1)
47+
}
48+
49+
peerServer := peerserver.NewServer(client, proxy.SystemPort)
50+
if err := peerServer.Start(ctx); err != nil {
51+
api.LogErrorf("failed to start peer server: %v", err)
52+
os.Exit(1)
53+
}
54+
}()
2555
}
2656

2757
func main() {}
Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
package server
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"errors"
7+
"fmt"
8+
"net/http"
9+
"os"
10+
"time"
11+
12+
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
13+
"k8s.io/client-go/kubernetes"
14+
"k8s.io/klog/v2"
15+
16+
"github.com/openkruise/agents/api/v1alpha1"
17+
"github.com/openkruise/agents/pkg/peers"
18+
"github.com/openkruise/agents/pkg/proxy"
19+
"github.com/openkruise/agents/pkg/sandbox-gateway/registry"
20+
"github.com/openkruise/agents/pkg/sandbox-manager/consts"
21+
"github.com/openkruise/agents/pkg/utils"
22+
)
23+
24+
// Environment variable names for peer discovery
25+
const (
26+
EnvNamespace = "PEER_NAMESPACE"
27+
EnvLabelSelector = "PEER_LABEL_SELECTOR"
28+
)
29+
30+
const (
31+
// MemberlistBindPort is the default port for memberlist gossip
32+
MemberlistBindPort = 7946
33+
)
34+
35+
// Server handles peer-to-peer communication for route synchronization
36+
type Server struct {
37+
httpServer *http.Server
38+
peerManager *peers.MemberlistPeers
39+
port int
40+
client kubernetes.Interface
41+
}
42+
43+
// NewServer creates a new peer server
44+
func NewServer(client kubernetes.Interface, port int) *Server {
45+
if port == 0 {
46+
port = proxy.SystemPort
47+
}
48+
return &Server{
49+
port: port,
50+
client: client,
51+
}
52+
}
53+
54+
// Start starts the HTTP server for handling refresh requests from peers
55+
func (s *Server) Start(ctx context.Context) error {
56+
log := klog.FromContext(ctx)
57+
58+
mux := http.NewServeMux()
59+
mux.HandleFunc(proxy.RefreshAPI, s.handleRefresh)
60+
61+
s.httpServer = &http.Server{
62+
Addr: fmt.Sprintf(":%d", s.port),
63+
Handler: mux,
64+
ReadHeaderTimeout: 5 * time.Second,
65+
}
66+
67+
// Get node name from environment variables
68+
nodeName := os.Getenv("HOSTNAME")
69+
if nodeName == "" {
70+
nodeName = os.Getenv("POD_NAME")
71+
}
72+
if nodeName == "" {
73+
return fmt.Errorf("HOSTNAME or POD_NAME environment variable must be set")
74+
}
75+
76+
// Get local IP
77+
localIP := utils.GetFirstNonLoopbackIP()
78+
if localIP == "" {
79+
return fmt.Errorf("failed to determine local IP")
80+
}
81+
82+
// Get namespace and label selector from environment variables
83+
namespace := os.Getenv(EnvNamespace)
84+
labelSelector := os.Getenv(EnvLabelSelector)
85+
86+
// Discover existing peers from Kubernetes API
87+
var existingPeers []string
88+
if s.client != nil && namespace != "" && labelSelector != "" {
89+
log.Info("discovering existing peers for memberlist join", "namespace", namespace, "selector", labelSelector)
90+
peerList, err := s.client.CoreV1().Pods(namespace).List(ctx, v1.ListOptions{
91+
LabelSelector: labelSelector,
92+
})
93+
if err != nil {
94+
log.Error(err, "failed to list peer pods, continuing without existing peers")
95+
} else {
96+
for _, peer := range peerList.Items {
97+
ip := peer.Status.PodIP
98+
if ip == "" || ip == localIP {
99+
continue
100+
}
101+
existingPeers = append(existingPeers, fmt.Sprintf("%s:%d", ip, MemberlistBindPort))
102+
}
103+
log.Info("found existing peers for memberlist join", "count", len(existingPeers))
104+
}
105+
} else {
106+
log.Info("skipping peer discovery: client not available or namespace/labelSelector not set")
107+
}
108+
109+
s.peerManager = peers.NewMemberlistPeers(nodeName)
110+
111+
if err := s.peerManager.Start(ctx, localIP, MemberlistBindPort, existingPeers); err != nil {
112+
return err
113+
}
114+
115+
go func() {
116+
klog.InfoS("Starting sandbox-gateway peer server", "address", s.httpServer.Addr)
117+
if err := s.httpServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
118+
klog.ErrorS(err, "Peer server failed to start")
119+
}
120+
}()
121+
122+
return nil
123+
}
124+
125+
func (s *Server) Stop(ctx context.Context) error {
126+
var errs []error
127+
if s.httpServer != nil {
128+
if err := s.httpServer.Shutdown(ctx); err != nil {
129+
errs = append(errs, err)
130+
}
131+
}
132+
if s.peerManager != nil {
133+
if err := s.peerManager.Stop(); err != nil {
134+
errs = append(errs, err)
135+
}
136+
}
137+
if len(errs) > 0 {
138+
return errors.Join(errs...)
139+
}
140+
return nil
141+
}
142+
143+
// handleRefresh handles the /refresh endpoint for route synchronization
144+
func (s *Server) handleRefresh(w http.ResponseWriter, r *http.Request) {
145+
if r.Method != http.MethodPost {
146+
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
147+
return
148+
}
149+
150+
ctx := r.Context()
151+
log := klog.FromContext(ctx)
152+
153+
var route proxy.Route
154+
if err := json.NewDecoder(r.Body).Decode(&route); err != nil {
155+
log.Error(err, "Failed to decode refresh request")
156+
http.Error(w, fmt.Sprintf("Failed to decode request: %v", err), http.StatusBadRequest)
157+
return
158+
}
159+
160+
log.V(consts.DebugLogLevel).Info("Received route refresh", "route", route)
161+
162+
// Handle based on state
163+
if route.State == v1alpha1.SandboxStateRunning {
164+
// Update the route
165+
if registry.UpdateWithVersion(route.ID, route.IP, route.ResourceVersion) {
166+
log.Info("Route updated via refresh", "id", route.ID, "ip", route.IP)
167+
} else {
168+
log.V(consts.DebugLogLevel).Info("Route update skipped due to older resourceVersion", "id", route.ID)
169+
}
170+
} else {
171+
// Delete the route if the sandbox is dead
172+
registry.Delete(route.ID)
173+
}
174+
175+
w.WriteHeader(http.StatusNoContent)
176+
}

pkg/sandbox-manager/core.go

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package sandbox_manager
33
import (
44
"context"
55
"fmt"
6-
"net"
76
"os"
87

98
"github.com/google/uuid"
@@ -13,6 +12,7 @@ import (
1312
"github.com/openkruise/agents/pkg/sandbox-manager/config"
1413
"github.com/openkruise/agents/pkg/sandbox-manager/infra"
1514
"github.com/openkruise/agents/pkg/sandbox-manager/infra/sandboxcr"
15+
"github.com/openkruise/agents/pkg/utils"
1616
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1717
"k8s.io/klog/v2"
1818
)
@@ -57,21 +57,6 @@ func NewSandboxManager(client *clients.ClientSet, adapter proxy.RequestAdapter,
5757
return m, err
5858
}
5959

60-
func getFirstNonLoopbackIP() string {
61-
addresses, err := net.InterfaceAddrs()
62-
if err != nil {
63-
return ""
64-
}
65-
for _, addr := range addresses {
66-
if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
67-
if ipnet.IP.To4() != nil {
68-
return ipnet.IP.String()
69-
}
70-
}
71-
}
72-
return ""
73-
}
74-
7560
func (m *SandboxManager) Run(ctx context.Context, sysNs, peerSelector string) error {
7661
log := klog.FromContext(ctx)
7762

@@ -86,7 +71,7 @@ func (m *SandboxManager) Run(ctx context.Context, sysNs, peerSelector string) er
8671
// Get pod IP for memberlist binding
8772
podIP := os.Getenv("POD_IP")
8873
if podIP == "" {
89-
podIP = getFirstNonLoopbackIP()
74+
podIP = utils.GetFirstNonLoopbackIP()
9075
}
9176
if podIP == "" {
9277
return fmt.Errorf("failed to determine local IP for memberlist")

pkg/sandbox-manager/infra/sandboxcr/cache_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -598,7 +598,7 @@ func TestCache_InformerWithFilter_GetSandboxSet(t *testing.T) {
598598
require.NoError(t, err, "failed to create SandboxSet %s/%s", sbs.Namespace, sbs.Name)
599599
}
600600
}
601-
defer c.Stop()
601+
defer c.Stop(t.Context())
602602

603603
// Wait for informer sync
604604
time.Sleep(300 * time.Millisecond)

pkg/utils/utils.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"encoding/base64"
77
"encoding/json"
88
"fmt"
9+
"net"
910
"os"
1011
"sync"
1112

@@ -228,3 +229,18 @@ func GetSandboxControllerUsername() string {
228229
}
229230
return "system:serviceaccount:sandbox-system:sandbox-controller-manager"
230231
}
232+
233+
func GetFirstNonLoopbackIP() string {
234+
addresses, err := net.InterfaceAddrs()
235+
if err != nil {
236+
return ""
237+
}
238+
for _, addr := range addresses {
239+
if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
240+
if ipnet.IP.To4() != nil {
241+
return ipnet.IP.String()
242+
}
243+
}
244+
}
245+
return ""
246+
}

0 commit comments

Comments
 (0)