@@ -18,12 +18,15 @@ package feobserver
1818
1919import (
2020 "context"
21+ "fmt"
2122
23+ "github.com/StarRocks/starrocks-kubernetes-operator/pkg/k8sutils/templates/service"
2224 "github.com/go-logr/logr"
2325 appsv1 "k8s.io/api/apps/v1"
2426 corev1 "k8s.io/api/core/v1"
2527 apierrors "k8s.io/apimachinery/pkg/api/errors"
2628 "k8s.io/apimachinery/pkg/types"
29+ "k8s.io/apimachinery/pkg/util/intstr"
2730 "k8s.io/client-go/tools/record"
2831 "sigs.k8s.io/controller-runtime/pkg/client"
2932
@@ -39,6 +42,8 @@ import (
3942 "github.com/StarRocks/starrocks-kubernetes-operator/pkg/subcontrollers/fe"
4043)
4144
45+ const minFeObserverImageVersion = "4.1.0"
46+
4247type FeObserverController struct {
4348 Client client.Client
4449 Recorder record.EventRecorder
@@ -64,13 +69,12 @@ func (fc *FeObserverController) SyncCluster(ctx context.Context, src *srapi.Star
6469 feSpec := src .Spec .StarRocksFeSpec
6570 observerSpec := feSpec .ToObserverSpec ()
6671 if observerSpec == nil {
67- logger .Info ("fe observer is disabled, clear observer resources" )
68- if err := fc .clearObserverResources (ctx , src ); err != nil {
69- logger .Error (err , "clear fe observer resources failed" )
70- return err
71- }
72+ logger .Info ("src.Spec.StarRocksFeObserverSpec == nil, skip sync fe observer" )
7273 return nil
7374 }
75+ if src .Spec .StarRocksFeSpec == nil {
76+ return fmt .Errorf ("starRocksFeSpec is required before deploying fe observer" )
77+ }
7478
7579 var err error
7680 defer func () {
@@ -84,6 +88,10 @@ func (fc *FeObserverController) SyncCluster(ctx context.Context, src *srapi.Star
8488 return err
8589 }
8690
91+ if ! fe .CheckFEReady (ctx , fc .Client , src .Namespace , src .Name ) {
92+ logger .Info ("FE is not ready, stop sync fe observer" )
93+ return nil
94+ }
8795 // get the fe configMap for resolve ports
8896 logger .V (log .DebugLevel ).Info ("get fe configMap to resolve ports" , "ConfigMapInfo" , feSpec .ConfigMapInfo )
8997 feConfig , err := fe .GetFEConfig (ctx , fc .Client , feSpec , src .Namespace )
@@ -95,30 +103,44 @@ func (fc *FeObserverController) SyncCluster(ctx context.Context, src *srapi.Star
95103 // generate new fe observer statefulset.
96104 logger .V (log .DebugLevel ).Info ("build fe observer statefulset" , "StarRocksCluster" , src )
97105 object := object .NewFromCluster (src )
98-
106+ defaultLabels := load .Labels (src .Name , observerSpec )
107+ svc := rutils .BuildExternalService (object , observerSpec , feConfig , load .Selector (src .Name , observerSpec ), defaultLabels )
108+ searchServiceName := service .SearchServiceName (src .Name , observerSpec )
109+ internalService := service .MakeSearchService (searchServiceName , & svc , []corev1.ServicePort {
110+ {
111+ Name : "query-port" ,
112+ Port : rutils .GetPort (feConfig , rutils .QUERY_PORT ),
113+ TargetPort : intstr .FromInt (int (rutils .GetPort (feConfig , rutils .QUERY_PORT ))),
114+ AppProtocol : func () * string { mysql := "mysql" ; return & mysql }(),
115+ },
116+ }, defaultLabels )
99117 podTemplateSpec , err := fc .buildPodTemplate (src , feConfig )
100118 if err != nil {
101119 logger .Error (err , "build pod template failed" )
102120 return err
103121 }
104122 expectSts := statefulset .MakeStatefulset (object , observerSpec , podTemplateSpec )
105- expectSts .Spec .ServiceName = feSearchServiceName (src .Name )
106123 err = k8sutils .ApplyStatefulSet (ctx , fc .Client , & expectSts , false , rutils .StatefulSetDeepEqual )
107124 if err != nil {
108125 logger .Error (err , "fe observer statefulset failed" , "StarRocksCluster" , src )
109126 return err
110127 }
111128
112- if err = fc .deleteLegacyObserverServices (ctx , src ); err != nil {
113- logger .Error (err , "delete legacy fe observer services failed" )
129+ if err = k8sutils .ApplyService (ctx , fc .Client , internalService , rutils .ServiceDeepEqual ); err != nil {
130+ logger .Error (err , "deploy search service failed" , "internalService" , internalService )
131+ fc .Recorder .Event (src , corev1 .EventTypeWarning , "DeployFeObserverFailed" , err .Error ())
114132 return err
115133 }
116134
135+ if err = k8sutils .ApplyService (ctx , fc .Client , & svc , rutils .ServiceDeepEqual ); err != nil {
136+ logger .Error (err , "deploy external service failed" , "externalService" , svc )
137+ return err
138+ }
117139 return nil
118140}
119141
120142// UpdateClusterStatus update the all resource status about fe observer.
121- func (fc * FeObserverController ) UpdateClusterStatus (_ context.Context , src * srapi.StarRocksCluster ) error {
143+ func (fc * FeObserverController ) UpdateClusterStatus (ctx context.Context , src * srapi.StarRocksCluster ) error {
122144 feSpec := src .Spec .StarRocksFeSpec
123145 observerSpec := feSpec .ToObserverSpec ()
124146 if observerSpec == nil {
@@ -137,7 +159,7 @@ func (fc *FeObserverController) UpdateClusterStatus(_ context.Context, src *srap
137159 }
138160
139161 src .Status .StarRocksFeObserverStatus = fs
140- fs .ServiceName = feExternalServiceName (src .Name )
162+ fs .ServiceName = service . ExternalServiceName (src .Name , feSpec )
141163 statefulSetName := load .Name (src .Name , observerSpec )
142164 fs .ResourceNames = rutils .MergeSlices (fs .ResourceNames , []string {statefulSetName })
143165
@@ -167,7 +189,23 @@ func (fc *FeObserverController) ClearCluster(ctx context.Context, src *srapi.Sta
167189 return nil
168190 }
169191
170- return fc .clearObserverResources (ctx , src )
192+ observerSpec := src .Spec .StarRocksFeSpec .ToObserverSpec ()
193+ statefulSetName := load .Name (src .Name , observerSpec )
194+ if err := k8sutils .DeleteStatefulset (ctx , fc .Client , src .Namespace , statefulSetName ); err != nil && ! apierrors .IsNotFound (err ) {
195+ return err
196+ }
197+ searchServiceName := service .SearchServiceName (src .Name , observerSpec )
198+ if err := k8sutils .DeleteService (ctx , fc .Client , src .Namespace , searchServiceName ); err != nil && ! apierrors .IsNotFound (err ) {
199+ logger .Error (err , "delete search service failed" , "searchServiceName" , searchServiceName )
200+ return err
201+ }
202+ externalServiceName := service .ExternalServiceName (src .Name , observerSpec )
203+ err := k8sutils .DeleteService (ctx , fc .Client , src .Namespace , externalServiceName )
204+ if err != nil && ! apierrors .IsNotFound (err ) {
205+ logger .Error (err , "delete external service failed" , "externalServiceName" , externalServiceName )
206+ return err
207+ }
208+ return nil
171209}
172210
173211func (fc * FeObserverController ) Validating (feSpec * srapi.StarRocksFeSpec ) error {
@@ -179,33 +217,20 @@ func (fc *FeObserverController) Validating(feSpec *srapi.StarRocksFeSpec) error
179217 if err := srapi .ValidUpdateStrategy (feSpec .UpdateStrategy ); err != nil {
180218 return err
181219 }
182- return nil
183- }
184-
185- func (fc * FeObserverController ) clearObserverResources (ctx context.Context , src * srapi.StarRocksCluster ) error {
186- statefulSetName := load .Name (src .Name , (* srapi .StarRocksFeObserverSpec )(nil ))
187- if err := k8sutils .DeleteStatefulset (ctx , fc .Client , src .Namespace , statefulSetName ); err != nil && ! apierrors .IsNotFound (err ) {
188- return err
220+ if feSpec .ToObserverSpec () != nil {
221+ if err := validateObserverImageVersion (feSpec .Image ); err != nil {
222+ return err
223+ }
189224 }
190- return fc . deleteLegacyObserverServices ( ctx , src )
225+ return nil
191226}
192227
193- func (fc * FeObserverController ) deleteLegacyObserverServices (ctx context.Context , src * srapi.StarRocksCluster ) error {
194- searchServiceName := src .Name + "-" + srapi .DEFAULT_FE_OBSERVER + "-search"
195- if err := k8sutils .DeleteService (ctx , fc .Client , src .Namespace , searchServiceName ); err != nil && ! apierrors .IsNotFound (err ) {
196- return err
197- }
198- externalServiceName := src .Name + "-" + srapi .DEFAULT_FE_OBSERVER + "-service"
199- if err := k8sutils .DeleteService (ctx , fc .Client , src .Namespace , externalServiceName ); err != nil && ! apierrors .IsNotFound (err ) {
200- return err
228+ func validateObserverImageVersion (image string ) error {
229+ imageVersion := pod .GetImageVersion (image )
230+ _ , err := pod .IsLowerThanAny (imageVersion , []string {"4.1.0" })
231+ if err != nil {
232+ return fmt .Errorf ("fe observer requires StarRocks FE image version >= %s " +
233+ "Current image version %s does not enable FE observer" , minFeObserverImageVersion , imageVersion )
201234 }
202235 return nil
203236}
204-
205- func feExternalServiceName (clusterName string ) string {
206- return clusterName + "-" + srapi .DEFAULT_FE + "-service"
207- }
208-
209- func feSearchServiceName (clusterName string ) string {
210- return clusterName + "-" + srapi .DEFAULT_FE + "-search"
211- }
0 commit comments