Skip to content

Commit 66f3b80

Browse files
authored
Support for north-south traffic (#835)
* Support for north-south traffic * fix ci * fix ci
1 parent 19cc43f commit 66f3b80

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

65 files changed

+5378
-203
lines changed

dubbod/planet/cmd/planet-agent/app/cmd.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,10 @@ import (
2121
"context"
2222
"errors"
2323
"fmt"
24-
"github.com/apache/dubbo-kubernetes/pkg/log"
2524
"net/netip"
2625

26+
"github.com/apache/dubbo-kubernetes/pkg/log"
27+
2728
"github.com/apache/dubbo-kubernetes/dubbod/planet/cmd/planet-agent/options"
2829
"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/util/network"
2930
"github.com/apache/dubbo-kubernetes/pkg/cmd"

dubbod/planet/docker/dockerfile.proxyadapter

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,5 +15,6 @@
1515

1616
FROM gcr.io/distroless/static:debug
1717
COPY bin/planet-agent /usr/local/bin/planet-agent
18+
COPY bin/pixiugateway /usr/local/bin/pixiugateway
1819
USER 9999:9999
1920
ENTRYPOINT ["/usr/local/bin/planet-agent"]
File renamed without changes.

dubbod/planet/pkg/bootstrap/configcontroller.go renamed to dubbod/planet/pkg/bootstrap/config_controller.go

Lines changed: 133 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,16 @@ import (
2121
"context"
2222
"crypto/tls"
2323
"crypto/x509"
24+
"encoding/json"
2425
"encoding/pem"
2526
"fmt"
2627
configaggregate "github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/config/aggregate"
28+
"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/config/kube/gateway"
29+
"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/features"
30+
"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/leaderelection"
31+
"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/leaderelection/k8sleaderelection/k8sresourcelock"
32+
"github.com/apache/dubbo-kubernetes/pkg/config/schema/gvr"
33+
"k8s.io/apimachinery/pkg/api/errors"
2734
"net/url"
2835
"strings"
2936

@@ -59,7 +66,9 @@ func (s *Server) makeKubeConfigController(args *PlanetArgs) *crdclient.Client {
5966
}
6067

6168
schemas := collections.Planet
62-
69+
if features.EnableGatewayAPI {
70+
schemas = collections.PlanetGatewayAPI()
71+
}
6372
return crdclient.NewForSchemas(s.kubeClient, opts, schemas)
6473
}
6574

@@ -69,6 +78,72 @@ func (s *Server) initK8SConfigStore(args *PlanetArgs) error {
6978
}
7079
configController := s.makeKubeConfigController(args)
7180
s.ConfigStores = append(s.ConfigStores, configController)
81+
82+
if features.EnableGatewayAPI {
83+
if s.statusManager == nil && features.EnableGatewayAPIStatus {
84+
s.initStatusManager(args)
85+
}
86+
args.RegistryOptions.KubeOptions.KrtDebugger = args.KrtDebugger
87+
gwc := gateway.NewController(s.kubeClient, s.kubeClient.CrdWatcher().WaitForCRD, args.RegistryOptions.KubeOptions, s.XDSServer)
88+
s.environment.GatewayAPIController = gwc
89+
s.ConfigStores = append(s.ConfigStores, s.environment.GatewayAPIController)
90+
91+
// Use a channel to signal activation of per-revision status writer
92+
activatePerRevisionStatusWriterCh := make(chan struct{})
93+
s.checkAndRunNonRevisionLeaderElectionIfRequired(args, activatePerRevisionStatusWriterCh)
94+
95+
s.addTerminatingStartFunc("gateway status", func(stop <-chan struct{}) error {
96+
leaderelection.
97+
NewPerRevisionLeaderElection(args.Namespace, args.PodName, leaderelection.GatewayStatusController, args.Revision, s.kubeClient).
98+
AddRunFunction(func(leaderStop <-chan struct{}) {
99+
log.Infof("waiting for gateway status writer activation")
100+
<-activatePerRevisionStatusWriterCh
101+
log.Infof("Starting gateway status writer for revision: %s", args.Revision)
102+
gwc.SetStatusWrite(true, s.statusManager)
103+
104+
// Trigger a push so we can recompute status
105+
s.XDSServer.ConfigUpdate(&model.PushRequest{
106+
Full: true,
107+
Reason: model.NewReasonStats(model.GlobalUpdate),
108+
Forced: true,
109+
})
110+
<-leaderStop
111+
log.Infof("Stopping gateway status writer")
112+
gwc.SetStatusWrite(false, nil)
113+
}).
114+
Run(stop)
115+
return nil
116+
})
117+
if features.EnableGatewayAPIDeploymentController {
118+
s.addTerminatingStartFunc("gateway deployment controller", func(stop <-chan struct{}) error {
119+
leaderelection.
120+
NewPerRevisionLeaderElection(args.Namespace, args.PodName, leaderelection.GatewayDeploymentController, args.Revision, s.kubeClient).
121+
AddRunFunction(func(leaderStop <-chan struct{}) {
122+
// We can only run this if the Gateway CRD is created
123+
if s.kubeClient.CrdWatcher().WaitForCRD(gvr.KubernetesGateway, leaderStop) {
124+
controller := gateway.NewDeploymentController(s.kubeClient, s.clusterID, s.environment,
125+
s.webhookInfo.getWebhookConfig, s.webhookInfo.addHandler, nil, args.Revision, args.Namespace)
126+
// Start informers again. This fixes the case where informers for namespace do not start,
127+
// as we create them only after acquiring the leader lock
128+
// Note: stop here should be the overall pilot stop, NOT the leader election stop. We are
129+
// basically lazy loading the informer, if we stop it when we lose the lock we will never
130+
// recreate it again.
131+
s.kubeClient.RunAndWait(stop)
132+
// TODO tag watcher
133+
controller.Run(leaderStop)
134+
}
135+
}).
136+
Run(stop)
137+
return nil
138+
})
139+
}
140+
}
141+
var err error
142+
s.RWConfigStore, err = configaggregate.MakeWriteableCache(s.ConfigStores, configController)
143+
if err != nil {
144+
return err
145+
}
146+
72147
s.XDSServer.ConfigUpdate(&model.PushRequest{
73148
Full: true,
74149
Reason: model.NewReasonStats(model.GlobalUpdate),
@@ -181,9 +256,6 @@ func (s *Server) initConfigController(args *PlanetArgs) error {
181256
}
182257
}
183258

184-
// TODO ingress controller
185-
// TODO addTerminatingStartFunc
186-
187259
// Wrap the config controller with a cache.
188260
aggregateConfigController, err := configaggregate.MakeCache(s.ConfigStores)
189261
if err != nil {
@@ -298,3 +370,60 @@ func (s *Server) getRootCertFromSecret(name, namespace string) (*dubboCredential
298370
}
299371
return kube.ExtractRoot(secret.Data)
300372
}
373+
374+
func (s *Server) checkAndRunNonRevisionLeaderElectionIfRequired(args *PlanetArgs, activateCh chan struct{}) {
375+
cm, err := s.kubeClient.Kube().CoreV1().ConfigMaps(args.Namespace).Get(context.Background(), leaderelection.GatewayStatusController, v1.GetOptions{})
376+
377+
if errors.IsNotFound(err) {
378+
// ConfigMap does not exist, so per-revision leader election should be active
379+
close(activateCh)
380+
return
381+
}
382+
leaderAnn, ok := cm.Annotations[k8sresourcelock.LeaderElectionRecordAnnotationKey]
383+
if ok {
384+
var leaderInfo struct {
385+
HolderIdentity string `json:"holderIdentity"`
386+
}
387+
if err := json.Unmarshal([]byte(leaderAnn), &leaderInfo); err == nil {
388+
if leaderInfo.HolderIdentity != "" {
389+
// Non-revision leader election should run, per-revision should be waiting for activation
390+
s.addTerminatingStartFunc("gateway status", func(stop <-chan struct{}) error {
391+
secondStop := make(chan struct{})
392+
// if stop closes, ensure secondStop closes too
393+
go func() {
394+
<-stop
395+
select {
396+
case <-secondStop:
397+
default:
398+
close(secondStop)
399+
}
400+
}()
401+
leaderelection.
402+
NewLeaderElection(args.Namespace, args.PodName, leaderelection.GatewayStatusController, args.Revision, s.kubeClient).
403+
AddRunFunction(func(leaderStop <-chan struct{}) {
404+
// now that we have the leader lock, we can activate the per-revision status writer
405+
// first close the activateCh channel if it is not already closed
406+
log.Infof("Activating gateway status writer")
407+
select {
408+
case <-activateCh:
409+
// Channel already closed, do nothing
410+
default:
411+
close(activateCh)
412+
}
413+
// now end this lease itself
414+
select {
415+
case <-secondStop:
416+
default:
417+
close(secondStop)
418+
}
419+
}).
420+
Run(secondStop)
421+
return nil
422+
})
423+
return
424+
}
425+
}
426+
}
427+
// If annotation missing or holderIdentity is blank, per-revision leader election should be active
428+
close(activateCh)
429+
}

dubbod/planet/pkg/bootstrap/server.go

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@ import (
2929
"sync"
3030
"time"
3131

32+
"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/status"
33+
3234
"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/features"
3335
dubbogrpc "github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/grpc"
3436
"github.com/apache/dubbo-kubernetes/dubbod/planet/pkg/keycertbundle"
@@ -127,6 +129,9 @@ type Server struct {
127129
webhookInfo *webhookInfo
128130

129131
krtDebugger *krt.DebugHandler
132+
133+
RWConfigStore model.ConfigStoreController
134+
statusManager *status.Manager
130135
}
131136

132137
type readinessFlags struct {
@@ -139,6 +144,28 @@ type webhookInfo struct {
139144
wh *inject.Webhook
140145
}
141146

147+
func (w *webhookInfo) getWebhookConfig() inject.Config {
148+
w.mu.RLock()
149+
defer w.mu.RUnlock()
150+
if w.wh != nil && w.wh.Config != nil {
151+
return *w.wh.Config
152+
}
153+
return inject.Config{}
154+
}
155+
156+
func (w *webhookInfo) addHandler(fn func()) {
157+
w.mu.Lock()
158+
defer w.mu.Unlock()
159+
// Note: Dubbo's Webhook doesn't have RegisterInjectionHandler method
160+
// This is a placeholder for future implementation
161+
// For now, we'll call the handler directly if webhook is available
162+
if w.wh != nil {
163+
// Handler will be called when webhook config changes
164+
// This is a simplified implementation compared to Istio
165+
fn()
166+
}
167+
}
168+
142169
type readinessProbe func() bool
143170

144171
func NewServer(args *PlanetArgs, initFuncs ...func(*Server)) (*Server, error) {
@@ -448,7 +475,11 @@ func (s *Server) initRegistryEventHandlers() {
448475

449476
// Build ConfigKey for the changed config
450477
// Find the schema to get the kind.Kind
478+
// First try Planet schemas, then try PlanetGatewayAPI schemas
451479
schema, found := collections.Planet.FindByGroupVersionKind(cfg.GroupVersionKind)
480+
if !found && features.EnableGatewayAPI {
481+
schema, found = collections.PlanetGatewayAPI().FindByGroupVersionKind(cfg.GroupVersionKind)
482+
}
452483
if !found {
453484
log.Warnf("configHandler: schema not found for %v, skipping", cfg.GroupVersionKind)
454485
return
@@ -468,6 +499,12 @@ func (s *Server) initRegistryEventHandlers() {
468499
configKind = kind.ServiceRoute
469500
case "PeerAuthentication":
470501
configKind = kind.PeerAuthentication
502+
case "GatewayClass":
503+
configKind = kind.GatewayClass
504+
case "Gateway":
505+
configKind = kind.Gateway
506+
case "HTTPRoute":
507+
configKind = kind.HTTPRoute
471508
default:
472509
log.Debugf("configHandler: unknown schema identifier %s for %v, skipping", schemaID, cfg.GroupVersionKind)
473510
return
@@ -482,11 +519,12 @@ func (s *Server) initRegistryEventHandlers() {
482519
// Log the config change
483520
log.Infof("configHandler: %s event for %s/%s/%s", event, configKey.Kind, configKey.Namespace, configKey.Name)
484521

485-
// Some configs (SubsetRule/ServiceRoute/PeerAuthentication) require Full push to ensure
522+
// Some configs (SubsetRule/ServiceRoute/PeerAuthentication/HTTPRoute) require Full push to ensure
486523
// PushContext is re-initialized and configuration is reloaded.
487524
// PeerAuthentication must rebuild AuthenticationPolicies to enable STRICT mTLS on LDS; without
488525
// a full push the cached PushContext would continue serving plaintext listeners.
489-
needsFullPush := configKind == kind.SubsetRule || configKind == kind.ServiceRoute || configKind == kind.PeerAuthentication
526+
// HTTPRoute must rebuild HTTPRoute index to enable Gateway API routing.
527+
needsFullPush := configKind == kind.SubsetRule || configKind == kind.ServiceRoute || configKind == kind.PeerAuthentication || configKind == kind.HTTPRoute
490528

491529
// Trigger ConfigUpdate to push changes to all connected proxies
492530
s.XDSServer.ConfigUpdate(&model.PushRequest{
@@ -496,6 +534,9 @@ func (s *Server) initRegistryEventHandlers() {
496534
})
497535
}
498536
schemas := collections.Planet.All()
537+
if features.EnableGatewayAPI {
538+
schemas = collections.PlanetGatewayAPI().All()
539+
}
499540
log.Debugf("initRegistryEventHandlers: found %d schemas to register", len(schemas))
500541
registeredCount := 0
501542
for _, schema := range schemas {
@@ -951,6 +992,14 @@ func (s *Server) shouldStartNsController() bool {
951992
return true
952993
}
953994

995+
func (s *Server) initStatusManager(_ *PlanetArgs) {
996+
s.addStartFunc("status manager", func(stop <-chan struct{}) error {
997+
s.statusManager = status.NewManager(s.RWConfigStore)
998+
s.statusManager.Start(stop)
999+
return nil
1000+
})
1001+
}
1002+
9541003
func getDNSNames(_ *PlanetArgs, host string) []string {
9551004
// Append custom hostname if there is any
9561005
customHost := features.DubbodServiceCustomHost
@@ -1054,3 +1103,7 @@ func (s *Server) reloadDubbodCert(watchCh <-chan struct{}, stopCh <-chan struct{
10541103
}
10551104
}
10561105
}
1106+
1107+
func (s *Server) addTerminatingStartFunc(name string, fn server.Component) {
1108+
s.server.RunComponentAsyncAndWait(name, fn)
1109+
}
File renamed without changes.

0 commit comments

Comments
 (0)