1- // Copyright 2022-2024 FLUIDOS Project
1+ // Copyright 2022-2025 FLUIDOS Project
22//
33// Licensed under the Apache License, Version 2.0 (the "License");
44// you may not use this file except in compliance with the License.
@@ -57,10 +57,11 @@ type BrokerClient struct {
5757 clientCert * corev1.Secret
5858 rootCert * corev1.Secret
5959
60- brokerConn * BrokerConnection
60+ brokerConn * brokerConnection
6161}
6262
63- type BrokerConnection struct {
63+ // BrokerConnection keeps all the broker connection data.
64+ type brokerConnection struct {
6465 amqpConn * amqp.Connection
6566 amqpChan * amqp.Channel
6667 exchangeName string
@@ -71,7 +72,7 @@ type BrokerConnection struct {
7172 confirms chan amqp.Confirmation
7273}
7374
74- // Setup the Broker Client from NM reconcile
75+ // SetupBrokerClient sets the Broker Client from NM reconcile.
7576func (bc * BrokerClient ) SetupBrokerClient (cl client.Client , broker * networkv1alpha1.Broker ) error {
7677 klog .Info ("Setting up Broker Client routines" )
7778
@@ -84,34 +85,34 @@ func (bc *BrokerClient) SetupBrokerClient(cl client.Client, broker *networkv1alp
8485 return fmt .Errorf ("failed to get Node Identity" )
8586 }
8687
87- //server address and broker name
88+ // Server address and broker name.
8889 bc .brokerName = broker .Spec .Name
8990 bc .serverAddr = broker .Spec .Address
9091
91- bc .brokerConn = & BrokerConnection {}
92+ bc .brokerConn = & brokerConnection {}
9293 bc .brokerConn .exchangeName = "DefaultPeerRequest"
9394
9495 bc .brokerConn .outboundMsg , err = json .Marshal (bc .ID )
9596 if err != nil {
9697 return err
9798 }
9899
99- //setting pub/sub
100- if strings .EqualFold (broker .Spec .Role , "publisher" ) {
101- klog .Infof ("brokerClient %s set as publisher only" , bc .brokerName )
100+ switch role := broker .Spec .Role ; role {
101+ case "publisher" :
102102 bc .pubFlag = true
103103 bc .subFlag = false
104- } else if strings . EqualFold ( broker . Spec . Role , "subscriber" ) {
105- klog . Infof ( "brokerClient %s set as subscriber only" , bc . brokerName )
104+ klog . Infof ( "brokerClient %s set as publisher only" , bc . brokerName )
105+ case " subscriber" :
106106 bc .pubFlag = false
107107 bc .subFlag = true
108- } else {
109- klog . Infof ( "brokerClient %s set as publisher and subscriber" , bc . brokerName )
108+ klog . Infof ( "brokerClient %s set as subscriber only" , bc . brokerName )
109+ default :
110110 bc .pubFlag = true
111111 bc .subFlag = true
112+ klog .Infof ("brokerClient %s set as publisher and subscriber" , bc .brokerName )
112113 }
113114
114- //certificates
115+ // Certificates.
115116 bc .clientCert = & corev1.Secret {}
116117 bc .rootCert = & corev1.Secret {}
117118
@@ -128,7 +129,7 @@ func (bc *BrokerClient) SetupBrokerClient(cl client.Client, broker *networkv1alp
128129 return err
129130 }
130131
131- // Extract certs and key
132+ // Extract certs and key.
132133 clientCert , ok := bc .clientCert .Data ["tls.crt" ]
133134 if ! ok {
134135 klog .Error ("missing certificate: 'tls.crt' not found in clCert Data" )
@@ -144,51 +145,48 @@ func (bc *BrokerClient) SetupBrokerClient(cl client.Client, broker *networkv1alp
144145 klog .Error ("missing certificate: 'tls.crt' not found in CACert Data" )
145146 }
146147
147- // load client cert and privKey
148+ // Load client cert and privKey.
148149 cert , err := tls .X509KeyPair (clientCert , clientKey )
149150 if err != nil {
150151 klog .Error ("error X509KeyPair: %v" , err )
151152 return err
152153 }
153154
154- // load CAcert
155+ // Load root cert.
155156 caCertPool := x509 .NewCertPool ()
156157 ok = caCertPool .AppendCertsFromPEM (caCertData )
157158 if ! ok {
158159 klog .Error ("AppendCertsFromPEM error: %v" , ok )
159160 }
160161
161- // Routing key for topic
162+ // Routing key for topic.
162163 bc .brokerConn .routingKey , err = extractCNfromCert (& clientCert )
163164 if err != nil {
164165 klog .Error ("Common Name extraction error: %v" , err )
165166 }
166167 bc .brokerConn .queueName = bc .brokerConn .routingKey
167168
168- // TLS config
169+ // TLS config.
169170 tlsConfig := & tls.Config {
170171 Certificates : []tls.Certificate {cert },
171172 RootCAs : caCertPool ,
172173 ServerName : bc .serverAddr ,
173- //InsecureSkipVerify: false, //skip/allow verif SSL
174+ MinVersion : tls . VersionTLS12 ,
174175 }
175176
176177 err = bc .brokerConnectionConfig (tlsConfig )
177178
178179 return err
179180}
180181
181- // Execute the Network Manager Broker routines.
182+ // ExecuteBrokerClient executes the Network Manager Broker routines.
182183func (bc * BrokerClient ) ExecuteBrokerClient (cl client.Client ) error {
183-
184184 // Start sending messages
185185 klog .Info ("executing broker client routines" )
186186 var err error
187187 if bc .pubFlag {
188188 go func () {
189- if err = bc .publishOnBroker (); err != nil {
190- klog .ErrorS (err , "error sending advertisement" )
191- }
189+ bc .publishOnBroker ()
192190 }()
193191 }
194192
@@ -203,8 +201,7 @@ func (bc *BrokerClient) ExecuteBrokerClient(cl client.Client) error {
203201 return err
204202}
205203
206- func (bc * BrokerClient ) publishOnBroker () error {
207-
204+ func (bc * BrokerClient ) publishOnBroker () {
208205 ticker := time .NewTicker (10 * time .Second )
209206 for {
210207 select {
@@ -219,11 +216,10 @@ func (bc *BrokerClient) publishOnBroker() error {
219216 amqp.Publishing {
220217 ContentType : "application/json" ,
221218 Body : bc .brokerConn .outboundMsg ,
222- Expiration : "30000" , //TTL ms
219+ Expiration : "30000" , // TTL ms
223220 })
224221 if err != nil {
225222 klog .Error ("Error pub message: %v" , err )
226- } else {
227223 }
228224
229225 select {
@@ -240,28 +236,24 @@ func (bc *BrokerClient) publishOnBroker() error {
240236 case <- bc .ctx .Done ():
241237 ticker .Stop ()
242238 klog .Info ("Ticker stopped\n " )
243- return nil
239+ return
244240 }
245241 }
246242}
247243
248244func (bc * BrokerClient ) readMsgOnBroker (ctx context.Context , cl client.Client ) error {
249-
250245 klog .Info ("Listening from Broker" )
251246 for d := range bc .brokerConn .inboundMsgs {
252-
253247 klog .Info ("Received remote advertisement from BROKER\n " )
254248 var remote NetworkManager
255249 err := json .Unmarshal (d .Body , & remote .ID )
256-
257250 if err != nil {
258251 klog .Error ("Error unmarshalling message: " , err )
259252 continue
260253 }
261254 // Check if received advertisement is remote
262255 if bc .ID .IP != remote .ID .IP {
263-
264- //create knownCluster CR
256+ // Create knownCluster CR
265257 kc := & networkv1alpha1.KnownCluster {}
266258
267259 if err := cl .Get (ctx , client.ObjectKey {Name : namings .ForgeKnownClusterName (remote .ID .NodeID ), Namespace : flags .FluidosNamespace }, kc ); err != nil {
@@ -312,7 +304,6 @@ func extractCNfromCert(certPEM *[]byte) (string, error) {
312304}
313305
314306func (bc * BrokerClient ) brokerConnectionConfig (tlsConfig * tls.Config ) error {
315-
316307 var err error
317308 config := amqp.Config {
318309 SASL : []amqp.Authentication {& amqp.ExternalAuth {}}, // auth EXTERNAL
@@ -330,7 +321,7 @@ func (bc *BrokerClient) brokerConnectionConfig(tlsConfig *tls.Config) error {
330321 return err
331322 }
332323
333- // channel creation
324+ // Channel creation
334325 bc .brokerConn .amqpChan , err = bc .brokerConn .amqpConn .Channel ()
335326 if err != nil {
336327 klog .Error ("channel creation error: %v" , err )
@@ -343,7 +334,7 @@ func (bc *BrokerClient) brokerConnectionConfig(tlsConfig *tls.Config) error {
343334 "" , // consumer name (empty -> generated)
344335 true , // AutoAck
345336 false , // Exclusive: queue is accessible only from this consumer
346- true , //false, // NoLocal: does not recieve messages selfpublished
337+ true , // false, // NoLocal: does not receive selfpublished messages
347338 false , // NoWait: server confirmation
348339 nil , // Arguments
349340 )
@@ -366,8 +357,7 @@ func (bc *BrokerClient) brokerConnectionConfig(tlsConfig *tls.Config) error {
366357 return nil
367358}
368359
369- func (bc * BrokerClient ) extractSecret (cl client.Client , secretName string , secretNamespace string , secretDest * corev1.Secret ) error {
370-
360+ func (bc * BrokerClient ) extractSecret (cl client.Client , secretName , secretNamespace string , secretDest * corev1.Secret ) error {
371361 err := cl .Get (context .TODO (), client.ObjectKey {
372362 Name : secretName ,
373363 Namespace : secretNamespace ,
0 commit comments