Skip to content

Commit 6b9a1ad

Browse files
author
xianying.czy
committed
feat(sandbox-gateway): add memberlist server for gateway
Signed-off-by: xianying.czy <xianying.czy@alibaba-inc.com>
1 parent d181e77 commit 6b9a1ad

File tree

4 files changed

+219
-17
lines changed

4 files changed

+219
-17
lines changed

cmd/sandbox-gateway/main.go

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,17 @@ package main
22

33
import (
44
"context"
5+
"os"
56

67
"github.com/envoyproxy/envoy/contrib/golang/common/go/api"
78
envoyhttp "github.com/envoyproxy/envoy/contrib/golang/filters/http/source/go/pkg/http"
9+
"k8s.io/client-go/kubernetes"
10+
"k8s.io/client-go/rest"
811

12+
"github.com/openkruise/agents/pkg/proxy"
913
"github.com/openkruise/agents/pkg/sandbox-gateway/controller"
1014
"github.com/openkruise/agents/pkg/sandbox-gateway/filter"
15+
peerserver "github.com/openkruise/agents/pkg/sandbox-gateway/server"
1116
)
1217

1318
func init() {
@@ -22,6 +27,31 @@ func init() {
2227
api.LogErrorf("sandbox controller manager exited with error: %v", err)
2328
}
2429
}()
30+
31+
// Start the peer server for handling route synchronization from other peers
32+
go func() {
33+
ctx := context.Background()
34+
35+
// Get Kubernetes config
36+
cfg, err := rest.InClusterConfig()
37+
if err != nil {
38+
api.LogErrorf("failed to get in-cluster config: %v", err)
39+
os.Exit(1)
40+
}
41+
42+
// Create Kubernetes client
43+
client, err := kubernetes.NewForConfig(cfg)
44+
if err != nil {
45+
api.LogErrorf("failed to create kubernetes client: %v", err)
46+
os.Exit(1)
47+
}
48+
49+
peerServer := peerserver.NewServer(client, proxy.SystemPort)
50+
if err := peerServer.Start(ctx); err != nil {
51+
api.LogErrorf("failed to start peer server: %v", err)
52+
os.Exit(1)
53+
}
54+
}()
2555
}
2656

2757
func main() {}
Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
package server
2+
3+
import (
4+
"context"
5+
"encoding/json"
6+
"errors"
7+
"fmt"
8+
"net/http"
9+
"os"
10+
"time"
11+
12+
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
13+
"k8s.io/client-go/kubernetes"
14+
"k8s.io/klog/v2"
15+
16+
"github.com/openkruise/agents/api/v1alpha1"
17+
"github.com/openkruise/agents/pkg/peers"
18+
"github.com/openkruise/agents/pkg/proxy"
19+
"github.com/openkruise/agents/pkg/sandbox-gateway/registry"
20+
"github.com/openkruise/agents/pkg/sandbox-manager/consts"
21+
"github.com/openkruise/agents/pkg/utils"
22+
)
23+
24+
// Environment variable names for peer discovery
25+
const (
26+
EnvNamespace = "PEER_NAMESPACE"
27+
EnvLabelSelector = "PEER_LABEL_SELECTOR"
28+
)
29+
30+
// Server handles peer-to-peer communication for route synchronization
31+
type Server struct {
32+
httpServer *http.Server
33+
peerManager *peers.MemberlistPeers
34+
port int
35+
client kubernetes.Interface
36+
}
37+
38+
// NewServer creates a new peer server
39+
func NewServer(client kubernetes.Interface, port int) *Server {
40+
if port == 0 {
41+
port = proxy.SystemPort
42+
}
43+
return &Server{
44+
port: port,
45+
client: client,
46+
}
47+
}
48+
49+
// Start starts the HTTP server for handling refresh requests from peers
50+
func (s *Server) Start(ctx context.Context) error {
51+
log := klog.FromContext(ctx)
52+
53+
mux := http.NewServeMux()
54+
mux.HandleFunc(proxy.RefreshAPI, s.handleRefresh)
55+
56+
s.httpServer = &http.Server{
57+
Addr: fmt.Sprintf(":%d", s.port),
58+
Handler: mux,
59+
ReadHeaderTimeout: 5 * time.Second,
60+
}
61+
62+
// Get node name from environment variables
63+
nodeName := os.Getenv("HOSTNAME")
64+
if nodeName == "" {
65+
nodeName = os.Getenv("POD_NAME")
66+
}
67+
if nodeName == "" {
68+
return fmt.Errorf("HOSTNAME or POD_NAME environment variable must be set")
69+
}
70+
71+
// Get local IP
72+
localIP := utils.GetFirstNonLoopbackIP()
73+
if localIP == "" {
74+
return fmt.Errorf("failed to determine local IP")
75+
}
76+
77+
// Get namespace and label selector from environment variables
78+
namespace := os.Getenv(EnvNamespace)
79+
labelSelector := os.Getenv(EnvLabelSelector)
80+
81+
// Discover existing peers from Kubernetes API
82+
var existingPeers []string
83+
if s.client != nil && namespace != "" && labelSelector != "" {
84+
log.Info("discovering existing peers for memberlist join", "namespace", namespace, "selector", labelSelector)
85+
peerList, err := s.client.CoreV1().Pods(namespace).List(ctx, v1.ListOptions{
86+
LabelSelector: labelSelector,
87+
})
88+
if err != nil {
89+
log.Error(err, "failed to list peer pods, continuing without existing peers")
90+
} else {
91+
for _, peer := range peerList.Items {
92+
ip := peer.Status.PodIP
93+
if ip == "" || ip == localIP {
94+
continue
95+
}
96+
existingPeers = append(existingPeers, fmt.Sprintf("%s:%d", ip, s.port))
97+
}
98+
log.Info("found existing peers for memberlist join", "count", len(existingPeers))
99+
}
100+
} else {
101+
log.Info("skipping peer discovery: client not available or namespace/labelSelector not set")
102+
}
103+
104+
s.peerManager = peers.NewMemberlistPeers(nodeName)
105+
106+
if err := s.peerManager.Start(ctx, localIP, s.port, existingPeers); err != nil {
107+
return err
108+
}
109+
110+
go func() {
111+
klog.InfoS("Starting sandbox-gateway peer server", "address", s.httpServer.Addr)
112+
if err := s.httpServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
113+
klog.ErrorS(err, "Peer server failed to start")
114+
}
115+
}()
116+
117+
return nil
118+
}
119+
120+
func (s *Server) Stop(ctx context.Context) error {
121+
var errs []error
122+
if s.httpServer != nil {
123+
if err := s.httpServer.Shutdown(ctx); err != nil {
124+
errs = append(errs, err)
125+
}
126+
}
127+
if s.peerManager != nil {
128+
if err := s.peerManager.Stop(); err != nil {
129+
errs = append(errs, err)
130+
}
131+
}
132+
if len(errs) > 0 {
133+
return errors.Join(errs...)
134+
}
135+
return nil
136+
}
137+
138+
// handleRefresh handles the /refresh endpoint for route synchronization
139+
func (s *Server) handleRefresh(w http.ResponseWriter, r *http.Request) {
140+
if r.Method != http.MethodPost {
141+
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
142+
return
143+
}
144+
145+
ctx := r.Context()
146+
log := klog.FromContext(ctx)
147+
148+
var route proxy.Route
149+
if err := json.NewDecoder(r.Body).Decode(&route); err != nil {
150+
log.Error(err, "Failed to decode refresh request")
151+
http.Error(w, fmt.Sprintf("Failed to decode request: %v", err), http.StatusBadRequest)
152+
return
153+
}
154+
155+
log.V(consts.DebugLogLevel).Info("Received route refresh", "route", route)
156+
157+
// Handle based on state
158+
if route.State == v1alpha1.SandboxStateDead {
159+
// Delete the route if the sandbox is dead
160+
registry.Delete(route.ID)
161+
} else {
162+
// Update the route
163+
if registry.UpdateWithVersion(route.ID, route.IP, route.ResourceVersion) {
164+
log.Info("Route updated via refresh", "id", route.ID, "ip", route.IP)
165+
} else {
166+
log.V(consts.DebugLogLevel).Info("Route update skipped due to older resourceVersion", "id", route.ID)
167+
}
168+
}
169+
170+
w.WriteHeader(http.StatusNoContent)
171+
}

pkg/sandbox-manager/core.go

Lines changed: 2 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package sandbox_manager
33
import (
44
"context"
55
"fmt"
6-
"net"
76
"os"
87

98
"github.com/google/uuid"
@@ -13,6 +12,7 @@ import (
1312
"github.com/openkruise/agents/pkg/sandbox-manager/config"
1413
"github.com/openkruise/agents/pkg/sandbox-manager/infra"
1514
"github.com/openkruise/agents/pkg/sandbox-manager/infra/sandboxcr"
15+
"github.com/openkruise/agents/pkg/utils"
1616
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
1717
"k8s.io/klog/v2"
1818
)
@@ -57,21 +57,6 @@ func NewSandboxManager(client *clients.ClientSet, adapter proxy.RequestAdapter,
5757
return m, err
5858
}
5959

60-
func getFirstNonLoopbackIP() string {
61-
addresses, err := net.InterfaceAddrs()
62-
if err != nil {
63-
return ""
64-
}
65-
for _, addr := range addresses {
66-
if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
67-
if ipnet.IP.To4() != nil {
68-
return ipnet.IP.String()
69-
}
70-
}
71-
}
72-
return ""
73-
}
74-
7560
func (m *SandboxManager) Run(ctx context.Context, sysNs, peerSelector string) error {
7661
log := klog.FromContext(ctx)
7762

@@ -86,7 +71,7 @@ func (m *SandboxManager) Run(ctx context.Context, sysNs, peerSelector string) er
8671
// Get pod IP for memberlist binding
8772
podIP := os.Getenv("POD_IP")
8873
if podIP == "" {
89-
podIP = getFirstNonLoopbackIP()
74+
podIP = utils.GetFirstNonLoopbackIP()
9075
}
9176
if podIP == "" {
9277
return fmt.Errorf("failed to determine local IP for memberlist")

pkg/utils/utils.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import (
66
"encoding/base64"
77
"encoding/json"
88
"fmt"
9+
"net"
910
"os"
1011
"sync"
1112

@@ -228,3 +229,18 @@ func GetSandboxControllerUsername() string {
228229
}
229230
return "system:serviceaccount:sandbox-system:sandbox-controller-manager"
230231
}
232+
233+
func GetFirstNonLoopbackIP() string {
234+
addresses, err := net.InterfaceAddrs()
235+
if err != nil {
236+
return ""
237+
}
238+
for _, addr := range addresses {
239+
if ipnet, ok := addr.(*net.IPNet); ok && !ipnet.IP.IsLoopback() {
240+
if ipnet.IP.To4() != nil {
241+
return ipnet.IP.String()
242+
}
243+
}
244+
}
245+
return ""
246+
}

0 commit comments

Comments
 (0)