Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions .qoder/rules/git.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
---
trigger: always_on
alwaysApply: true
---

When running read-only git commands such as git log / git show, always pipe through head or tail. For example:

git log --oneline -15 2>&1 | head -20
git show --name-only HEAD 2>&1 | head -30
git show 62ea0c25 --stat 2>&1 | tail -100
17 changes: 16 additions & 1 deletion cmd/sandbox-manager/main.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,19 @@
// Package main provides the main entry point for the E2B on Kubernetes server.
/*
Copyright 2025.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package main

import (
Expand Down
25 changes: 18 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,14 @@ require (
github.com/go-logr/logr v1.4.3
github.com/golang/protobuf v1.5.4
github.com/google/uuid v1.6.0
github.com/hashicorp/memberlist v0.5.4
github.com/onsi/ginkgo/v2 v2.27.3
github.com/onsi/gomega v1.38.2
github.com/prometheus/client_golang v1.22.0
github.com/spf13/pflag v1.0.6
github.com/stretchr/testify v1.11.1
go.uber.org/zap v1.27.0
golang.org/x/sync v0.17.0
golang.org/x/sync v0.18.0
golang.org/x/time v0.11.0
google.golang.org/grpc v1.75.1
google.golang.org/protobuf v1.36.9
Expand All @@ -36,6 +37,7 @@ require (
cel.dev/expr v0.24.0 // indirect
github.com/Masterminds/semver/v3 v3.4.0 // indirect
github.com/antlr4-go/antlr/v4 v4.13.0 // indirect
github.com/armon/go-metrics v0.4.1 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/blang/semver/v4 v4.0.0 // indirect
github.com/cenkalti/backoff/v5 v5.0.3 // indirect
Expand All @@ -61,10 +63,18 @@ require (
github.com/google/go-cmp v0.7.0 // indirect
github.com/google/pprof v0.0.0-20250403155104-27863c87afa6 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.2 // indirect
github.com/hashicorp/errwrap v1.1.0 // indirect
github.com/hashicorp/go-immutable-radix v1.0.0 // indirect
github.com/hashicorp/go-metrics v0.5.4 // indirect
github.com/hashicorp/go-msgpack/v2 v2.1.5 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-sockaddr v1.0.7 // indirect
github.com/hashicorp/golang-lru v0.5.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/miekg/dns v1.1.68 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
Expand All @@ -75,6 +85,7 @@ require (
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.62.0 // indirect
github.com/prometheus/procfs v0.16.0 // indirect
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 // indirect
github.com/spf13/cobra v1.8.1 // indirect
github.com/stoewer/go-strcase v1.3.0 // indirect
github.com/x448/float16 v0.8.4 // indirect
Expand All @@ -90,13 +101,13 @@ require (
go.uber.org/multierr v1.11.0 // indirect
go.yaml.in/yaml/v3 v3.0.4 // indirect
golang.org/x/exp v0.0.0-20250531010427-b6e5de432a8b // indirect
golang.org/x/mod v0.27.0 // indirect
golang.org/x/net v0.44.0 // indirect
golang.org/x/mod v0.29.0 // indirect
golang.org/x/net v0.47.0 // indirect
golang.org/x/oauth2 v0.30.0 // indirect
golang.org/x/sys v0.36.0 // indirect
golang.org/x/term v0.35.0 // indirect
golang.org/x/text v0.29.0 // indirect
golang.org/x/tools v0.36.0 // indirect
golang.org/x/sys v0.38.0 // indirect
golang.org/x/term v0.37.0 // indirect
golang.org/x/text v0.31.0 // indirect
golang.org/x/tools v0.38.0 // indirect
golang.org/x/tools/go/expect v0.1.1-deprecated // indirect
golang.org/x/tools/go/packages/packagestest v0.1.1-deprecated // indirect
gomodules.xyz/jsonpatch/v2 v2.4.0 // indirect
Expand Down
172 changes: 158 additions & 14 deletions go.sum

Large diffs are not rendered by default.

253 changes: 253 additions & 0 deletions pkg/peers/memberlist.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
/*
Copyright 2026.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package peers

import (
"context"
"fmt"
"net"
"sync/atomic"
"time"

"github.com/hashicorp/memberlist"
"github.com/openkruise/agents/pkg/sandbox-manager/logs"
"k8s.io/klog/v2"
)

const (
// DefaultProbeInterval is the interval between gossip probes
DefaultProbeInterval = 500 * time.Millisecond
// DefaultProbeTimeout is the timeout for gossip probes
DefaultProbeTimeout = 200 * time.Millisecond
// DefaultGossipInterval is the interval between gossip messages
DefaultGossipInterval = 500 * time.Millisecond
// DefaultGossipNodes is the number of nodes to gossip to
DefaultGossipNodes = 3
// DefaultSuspicionMult is the multiplier for determining the time to wait before considering a node suspect
DefaultSuspicionMult = 4
// DefaultRetransmitMult is the multiplier for the number of retransmissions
DefaultRetransmitMult = 4
)

// MemberlistPeers implements Peers using memberlist library
type MemberlistPeers struct {
list *memberlist.Memberlist
config *memberlist.Config
localName string
bindAddr string
bindPort int

started atomic.Bool
stopCh chan struct{}
}

// NewMemberlistPeers creates a new MemberlistPeers instance
func NewMemberlistPeers(nodeName string) *MemberlistPeers {
return &MemberlistPeers{
localName: nodeName,
stopCh: make(chan struct{}),
}
}

// Start initializes and starts the memberlist
func (m *MemberlistPeers) Start(ctx context.Context, bindAddr string, bindPort int, existingPeers []string) error {
log := klog.FromContext(ctx)

if m.started.Load() {
return fmt.Errorf("memberlist already started")
}

m.bindAddr = bindAddr
m.bindPort = bindPort

// Create memberlist config
config := memberlist.DefaultLANConfig()
config.Name = m.localName
config.BindAddr = bindAddr
config.BindPort = bindPort
config.AdvertisePort = bindPort

// Tuning for faster failure detection and convergence
config.ProbeInterval = DefaultProbeInterval
config.ProbeTimeout = DefaultProbeTimeout
config.GossipInterval = DefaultGossipInterval
config.GossipNodes = DefaultGossipNodes
config.SuspicionMult = DefaultSuspicionMult
config.RetransmitMult = DefaultRetransmitMult

// Set up event delegate to track membership changes
config.Events = &eventDelegate{
parent: m,
logCtx: logs.NewContext(),
}

// Disable logging from memberlist itself (we use klog)
config.LogOutput = nil
config.Logger = nil

m.config = config

// Create the memberlist
list, err := memberlist.Create(config)
if err != nil {
return fmt.Errorf("failed to create memberlist: %w", err)
}
m.list = list

// Join existing peers if provided
if len(existingPeers) > 0 {
log.Info("attempting to join existing peers", "peers", existingPeers)
joined, err := list.Join(existingPeers)
if err != nil {
log.Error(err, "failed to join some peers", "joined", joined)
// Don't return error - we can still operate as a single node
} else {
log.Info("successfully joined peers", "count", joined)
}
}

m.started.Store(true)
log.Info("memberlist started", "addr", bindAddr, "port", bindPort, "name", m.localName)

return nil
}

// Stop gracefully leaves the cluster and shuts down
func (m *MemberlistPeers) Stop() error {
if !m.started.Load() || m.list == nil {
return nil
}

close(m.stopCh)

// Gracefully leave the cluster
if err := m.list.Leave(5 * time.Second); err != nil {
return fmt.Errorf("failed to leave memberlist: %w", err)
}

if err := m.list.Shutdown(); err != nil {
return fmt.Errorf("failed to shutdown memberlist: %w", err)
}

m.started.Store(false)
return nil
}

// GetPeers returns the current list of alive peers (excluding self)
func (m *MemberlistPeers) GetPeers() []Peer {
if !m.started.Load() || m.list == nil {
return nil
}

peers := make([]Peer, 0)
for _, member := range m.list.Members() {
if member.Name == m.localName {
continue
}
if member.State == memberlist.StateAlive {
peers = append(peers, Peer{
IP: member.Addr.String(),
Name: member.Name,
})
}
}
return peers
}

// GetAllMembers returns all members including self
func (m *MemberlistPeers) GetAllMembers() []Peer {
if !m.started.Load() || m.list == nil {
return nil
}

members := make([]Peer, 0, len(m.list.Members()))
for _, member := range m.list.Members() {
members = append(members, Peer{
IP: member.Addr.String(),
Name: member.Name,
})
}
return members
}

// WaitForPeers blocks until at least minPeers are discovered or context is canceled
func (m *MemberlistPeers) WaitForPeers(ctx context.Context, minPeers int) error {
log := klog.FromContext(ctx)
log.Info("waiting for peers", "minPeers", minPeers)

ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return ctx.Err()
case <-m.stopCh:
return fmt.Errorf("memberlist stopped")
case <-ticker.C:
peers := m.GetPeers()
if len(peers) >= minPeers {
log.Info("minimum peers reached", "count", len(peers))
return nil
}
log.V(4).Info("waiting for more peers", "current", len(peers), "min", minPeers)
}
}
}

// LocalAddr returns the local node's address
func (m *MemberlistPeers) LocalAddr() net.IP {
if !m.started.Load() || m.list == nil {
return nil
}
return m.list.LocalNode().Addr
}

// LocalPort returns the local node's port
func (m *MemberlistPeers) LocalPort() int {
if !m.started.Load() || m.list == nil {
return 0
}
return int(m.list.LocalNode().Port)
}

// eventDelegate handles memberlist membership change events
type eventDelegate struct {
parent *MemberlistPeers
logCtx context.Context
}

func (e *eventDelegate) NotifyJoin(node *memberlist.Node) {
if node.Name == e.parent.localName {
return
}
klog.FromContext(e.logCtx).Info("peer joined", "name", node.Name, "ip", node.Addr.String())
}

func (e *eventDelegate) NotifyLeave(node *memberlist.Node) {
if node.Name == e.parent.localName {
return
}
klog.FromContext(e.logCtx).Info("peer left", "name", node.Name, "ip", node.Addr.String())
}

func (e *eventDelegate) NotifyUpdate(*memberlist.Node) {
// Handle metadata updates if needed in the future
}

// Ensure MemberlistPeers implements Peers
var _ Peers = (*MemberlistPeers)(nil)
Loading
Loading