@@ -56,20 +56,22 @@ type BrokerClient struct {
5656 serverAddr string
5757 clientCert * corev1.Secret
5858 rootCert * corev1.Secret
59-
59+ metric string
60+ clientName string
6061 brokerConn * brokerConnection
6162}
6263
6364// BrokerConnection keeps all the broker connection data.
6465type brokerConnection struct {
65- amqpConn * amqp.Connection
66- amqpChan * amqp.Channel
67- exchangeName string
68- routingKey string
69- queueName string
70- inboundMsgs <- chan amqp.Delivery
71- outboundMsg []byte
72- confirms chan amqp.Confirmation
66+ amqpConn * amqp.Connection
67+ amqpChan * amqp.Channel
68+ announceExchangeName string
69+ ruleExchangeName string
70+ queueName string
71+ inboundMsgs <- chan amqp.Delivery
72+ outboundAnnounceMsg []byte
73+ outboundRuleMsg []byte
74+ confirms chan amqp.Confirmation
7375}
7476
7577// SetupBrokerClient sets the Broker Client from NM reconcile.
@@ -90,9 +92,10 @@ func (bc *BrokerClient) SetupBrokerClient(cl client.Client, broker *networkv1alp
9092 bc .serverAddr = broker .Spec .Address
9193
9294 bc .brokerConn = & brokerConnection {}
93- bc .brokerConn .exchangeName = "DefaultPeerRequest"
95+ bc .brokerConn .announceExchangeName = "announcements_exchange"
96+ bc .brokerConn .ruleExchangeName = "rules_exchange"
9497
95- bc .brokerConn .outboundMsg , err = json .Marshal (bc .ID )
98+ bc .brokerConn .outboundAnnounceMsg , err = json .Marshal (bc .ID )
9699 if err != nil {
97100 return err
98101 }
@@ -113,13 +116,14 @@ func (bc *BrokerClient) SetupBrokerClient(cl client.Client, broker *networkv1alp
113116 }
114117
115118 // Certificates.
116- bc .clientCert = & corev1.Secret {}
117- bc .rootCert = & corev1.Secret {}
118119
119120 klog .Infof ("Root Secret Name: %s\n " , broker .Spec .CaCert )
120121 klog .Infof ("Client Secret Name: %s\n " , broker .Spec .ClCert )
121122 secretNamespace := "fluidos"
122123
124+ bc .clientCert = & corev1.Secret {}
125+ bc .rootCert = & corev1.Secret {}
126+
123127 err = bc .extractSecret (cl , broker .Spec .ClCert , secretNamespace , bc .clientCert )
124128 if err != nil {
125129 return err
@@ -160,12 +164,22 @@ func (bc *BrokerClient) SetupBrokerClient(cl client.Client, broker *networkv1alp
160164 }
161165
162166 // Routing key for topic.
163- bc .brokerConn .routingKey , err = extractCNfromCert (& clientCert )
167+ bc .brokerConn .queueName , err = extractCNfromCert (& clientCert )
164168 if err != nil {
165169 klog .Errorf ("Common Name extraction error: %v" , err )
166170 }
167- bc .brokerConn .queueName = bc .brokerConn .routingKey
168171
172+ bc .brokerConn .outboundRuleMsg , err = json .Marshal (broker .Spec .Rule )
173+ if err != nil {
174+ klog .Errorf ("Error reading rules JSON: %s" , err )
175+ }
176+
177+ // Set the metric to be sent to the broker.
178+ bc .metric = broker .Spec .Metric
179+ bc .brokerConn .outboundAnnounceMsg = nil
180+ bc .buildOutboundMessage ()
181+
182+ klog .Infof ("outbound msg: %s\n " , bc .brokerConn .outboundRuleMsg )
169183 // TLS config.
170184 tlsConfig := & tls.Config {
171185 Certificates : []tls.Certificate {cert },
@@ -174,48 +188,61 @@ func (bc *BrokerClient) SetupBrokerClient(cl client.Client, broker *networkv1alp
174188 MinVersion : tls .VersionTLS12 ,
175189 }
176190
191+ bc .clientName = bc .brokerConn .queueName
192+
177193 err = bc .brokerConnectionConfig (tlsConfig )
178194
195+ if err != nil {
196+ klog .Errorf ("Broker connection error: %v" , err )
197+ }
198+
179199 return err
180200}
181201
182202// ExecuteBrokerClient executes the Network Manager Broker routines.
183203func (bc * BrokerClient ) ExecuteBrokerClient (cl client.Client ) error {
184- // Start sending messages
204+ // Start sending announcement messages
185205 klog .Info ("executing broker client routines" )
186206 var err error
187207 if bc .pubFlag {
188208 go func () {
189- bc .publishOnBroker ()
209+ bc .publishOnBroker (bc . brokerConn . announceExchangeName , bc . brokerConn . outboundAnnounceMsg )
190210 }()
191211 }
192212
193- // Start receiving messages
213+ // Start receiving announcement messages
194214 if bc .subFlag {
195215 go func () {
196216 if err = bc .readMsgOnBroker (bc .ctx , cl ); err != nil {
197217 klog .ErrorS (err , "error receiving advertisement" )
198218 }
199219 }()
200220 }
221+
222+ // Start sending rule messages
223+ go func () {
224+ bc .publishOnBroker (bc .brokerConn .ruleExchangeName , bc .brokerConn .outboundRuleMsg )
225+ }()
226+
201227 return err
202228}
203229
204- func (bc * BrokerClient ) publishOnBroker () {
230+ func (bc * BrokerClient ) publishOnBroker (exchangeName string , message [] byte ) {
205231 ticker := time .NewTicker (10 * time .Second )
206232 for {
207233 select {
208234 case <- ticker .C :
209235
210236 // Pub on exchange
211237 err := bc .brokerConn .amqpChan .Publish (
212- bc . brokerConn . exchangeName ,
213- bc . brokerConn . routingKey ,
214- true , // Mandatory: if not routable -> error
238+ exchangeName ,
239+ "" , // routingKey
240+ false , // Mandatory: if not routable -> error
215241 false , // Immediate
216242 amqp.Publishing {
217243 ContentType : "application/json" ,
218- Body : bc .brokerConn .outboundMsg ,
244+ UserId : bc .clientName ,
245+ Body : message ,
219246 Expiration : "30000" , // TTL ms
220247 })
221248 if err != nil {
@@ -225,12 +252,12 @@ func (bc *BrokerClient) publishOnBroker() {
225252 select {
226253 case confirm := <- bc .brokerConn .confirms :
227254 if confirm .Ack {
228- klog .Info ("Message successfully published!" )
255+ klog .Info ("Message successfully published on " , exchangeName )
229256 } else {
230- klog .Info ("Message failed to publish!" )
257+ klog .Info ("Message failed to publish on " , exchangeName )
231258 }
232- case <- time .After (5 * time .Second ): // Timeout
233- klog .Info ("No confirmation received, message status unknown." )
259+ case <- time .After (15 * time .Second ): // Timeout
260+ klog .Info ("No confirmation received, message status unknown from %v" , exchangeName )
234261 }
235262
236263 case <- bc .ctx .Done ():
@@ -309,7 +336,7 @@ func (bc *BrokerClient) brokerConnectionConfig(tlsConfig *tls.Config) error {
309336 SASL : []amqp.Authentication {& amqp.ExternalAuth {}}, // auth EXTERNAL
310337 TLSClientConfig : tlsConfig , // config TLS
311338 Vhost : "/" , // vhost
312- Heartbeat : 10 * time .Second , // heartbeat
339+ Heartbeat : 5 * time .Second , // heartbeat
313340 }
314341
315342 // Config connection
@@ -343,17 +370,16 @@ func (bc *BrokerClient) brokerConnectionConfig(tlsConfig *tls.Config) error {
343370 return err
344371 }
345372
346- // Write confirm broker
373+ // Write confirmations
347374 if err := bc .brokerConn .amqpChan .Confirm (false ); err != nil {
348- klog .Errorf ("Failed to enable publisher confirms: %v" , err )
375+ klog .Errorf ("Failed to enable publisher confirms for Announcements : %v" , err )
349376 return err
350377 }
351378
352- // Channels for write confirm
353- bc .brokerConn .confirms = bc .brokerConn .amqpChan .NotifyPublish (make (chan amqp.Confirmation , 1 ))
354-
355- klog .InfoS ("Node" , "ID" , bc .ID .NodeID , "Client Address" , bc .ID .IP , "Server Address" , bc .serverAddr , "RoutingKey" , bc .brokerConn .routingKey )
379+ // Channels for write confirmations
380+ bc .brokerConn .confirms = bc .brokerConn .amqpChan .NotifyPublish (make (chan amqp.Confirmation , 5 ))
356381
382+ klog .InfoS ("Node" , "ID" , bc .ID .NodeID , "Client Address" , bc .ID .IP , "Server Address" , bc .serverAddr /*, "RoutingKey" , bc.brokerConn.routingKey*/ )
357383 return nil
358384}
359385
@@ -368,3 +394,25 @@ func (bc *BrokerClient) extractSecret(cl client.Client, secretName, secretNamesp
368394 }
369395 return nil
370396}
397+
398+ func (bc * BrokerClient ) buildOutboundMessage () error {
399+ var err error
400+
401+ var data map [string ]interface {}
402+ err = json .Unmarshal ([]byte (bc .metric ), & data )
403+ if err != nil {
404+ klog .Error ("Error parsing JSON in builOutboundMessage:" , err )
405+ return err
406+ }
407+
408+ data ["ip" ] = bc .ID .IP
409+ data ["domain" ] = bc .ID .Domain
410+ data ["nodeID" ] = bc .ID .NodeID
411+
412+ bc .brokerConn .outboundAnnounceMsg , err = json .Marshal (data )
413+ if err != nil {
414+ klog .Errorf ("Error reading metric JSON: %s" , err )
415+ return err
416+ }
417+ return nil
418+ }
0 commit comments