@@ -56,20 +56,22 @@ type BrokerClient struct {
5656 serverAddr string
5757 clientCert * corev1.Secret
5858 rootCert * corev1.Secret
59-
59+ metric string
60+ clientName string
6061 brokerConn * brokerConnection
6162}
6263
6364// BrokerConnection keeps all the broker connection data.
6465type brokerConnection struct {
65- amqpConn * amqp.Connection
66- amqpChan * amqp.Channel
67- exchangeName string
68- routingKey string
69- queueName string
70- inboundMsgs <- chan amqp.Delivery
71- outboundMsg []byte
72- confirms chan amqp.Confirmation
66+ amqpConn * amqp.Connection
67+ amqpChan * amqp.Channel
68+ announceExchangeName string
69+ ruleExchangeName string
70+ queueName string
71+ inboundMsgs <- chan amqp.Delivery
72+ outboundAnnounceMsg []byte
73+ outboundRuleMsg []byte
74+ confirms chan amqp.Confirmation
7375}
7476
7577// SetupBrokerClient sets the Broker Client from NM reconcile.
@@ -90,9 +92,10 @@ func (bc *BrokerClient) SetupBrokerClient(cl client.Client, broker *networkv1alp
9092 bc .serverAddr = broker .Spec .Address
9193
9294 bc .brokerConn = & brokerConnection {}
93- bc .brokerConn .exchangeName = "DefaultPeerRequest"
95+ bc .brokerConn .announceExchangeName = "announcements_exchange"
96+ bc .brokerConn .ruleExchangeName = "rules_exchange"
9497
95- bc .brokerConn .outboundMsg , err = json .Marshal (bc .ID )
98+ bc .brokerConn .outboundAnnounceMsg , err = json .Marshal (bc .ID )
9699 if err != nil {
97100 return err
98101 }
@@ -113,13 +116,14 @@ func (bc *BrokerClient) SetupBrokerClient(cl client.Client, broker *networkv1alp
113116 }
114117
115118 // Certificates.
116- bc .clientCert = & corev1.Secret {}
117- bc .rootCert = & corev1.Secret {}
118119
119120 klog .Infof ("Root Secret Name: %s\n " , broker .Spec .CaCert )
120121 klog .Infof ("Client Secret Name: %s\n " , broker .Spec .ClCert )
121122 secretNamespace := "fluidos"
122123
124+ bc .clientCert = & corev1.Secret {}
125+ bc .rootCert = & corev1.Secret {}
126+
123127 err = bc .extractSecret (cl , broker .Spec .ClCert , secretNamespace , bc .clientCert )
124128 if err != nil {
125129 return err
@@ -160,12 +164,22 @@ func (bc *BrokerClient) SetupBrokerClient(cl client.Client, broker *networkv1alp
160164 }
161165
162166 // Routing key for topic.
163- bc .brokerConn .routingKey , err = extractCNfromCert (& clientCert )
167+ bc .brokerConn .queueName , err = extractCNfromCert (& clientCert )
164168 if err != nil {
165169 klog .Errorf ("Common Name extraction error: %v" , err )
166170 }
167- bc .brokerConn .queueName = bc .brokerConn .routingKey
168171
172+ bc .brokerConn .outboundRuleMsg , err = json .Marshal (broker .Spec .Rule )
173+ if err != nil {
174+ klog .Error ("Error reading rules JSON: %s" , err )
175+ }
176+
177+ // Set the metric to be sent to the broker.
178+ bc .metric = broker .Spec .Metric
179+ bc .brokerConn .outboundAnnounceMsg = nil
180+ bc .buildOutboundMessage ()
181+
182+ klog .Infof ("outbound msg: %s\n " , bc .brokerConn .outboundRuleMsg )
169183 // TLS config.
170184 tlsConfig := & tls.Config {
171185 Certificates : []tls.Certificate {cert },
@@ -174,48 +188,57 @@ func (bc *BrokerClient) SetupBrokerClient(cl client.Client, broker *networkv1alp
174188 MinVersion : tls .VersionTLS12 ,
175189 }
176190
191+ bc .clientName = bc .brokerConn .queueName
192+
177193 err = bc .brokerConnectionConfig (tlsConfig )
178194
179195 return err
180196}
181197
182198// ExecuteBrokerClient executes the Network Manager Broker routines.
183199func (bc * BrokerClient ) ExecuteBrokerClient (cl client.Client ) error {
184- // Start sending messages
200+ // Start sending announcement messages
185201 klog .Info ("executing broker client routines" )
186202 var err error
187203 if bc .pubFlag {
188204 go func () {
189- bc .publishOnBroker ()
205+ bc .publishOnBroker (bc . brokerConn . announceExchangeName , bc . brokerConn . outboundAnnounceMsg )
190206 }()
191207 }
192208
193- // Start receiving messages
209+ // Start receiving announcement messages
194210 if bc .subFlag {
195211 go func () {
196212 if err = bc .readMsgOnBroker (bc .ctx , cl ); err != nil {
197213 klog .ErrorS (err , "error receiving advertisement" )
198214 }
199215 }()
200216 }
217+
218+ //Start sending rule messages
219+ go func () {
220+ bc .publishOnBroker (bc .brokerConn .ruleExchangeName , bc .brokerConn .outboundRuleMsg ) //bc.brokerConn.amqpChanRule, bc.brokerConn.confirmsRul)
221+ }()
222+
201223 return err
202224}
203225
204- func (bc * BrokerClient ) publishOnBroker () {
226+ func (bc * BrokerClient ) publishOnBroker (exchangeName string , message [] byte ) {
205227 ticker := time .NewTicker (10 * time .Second )
206228 for {
207229 select {
208230 case <- ticker .C :
209231
210232 // Pub on exchange
211233 err := bc .brokerConn .amqpChan .Publish (
212- bc . brokerConn . exchangeName ,
213- bc . brokerConn . routingKey ,
214- true , // Mandatory: if not routable -> error
234+ exchangeName ,
235+ "" , // routingKey
236+ false , // Mandatory: if not routable -> error
215237 false , // Immediate
216238 amqp.Publishing {
217239 ContentType : "application/json" ,
218- Body : bc .brokerConn .outboundMsg ,
240+ UserId : bc .clientName , //privkey digest SHA256
241+ Body : message ,
219242 Expiration : "30000" , // TTL ms
220243 })
221244 if err != nil {
@@ -225,12 +248,12 @@ func (bc *BrokerClient) publishOnBroker() {
225248 select {
226249 case confirm := <- bc .brokerConn .confirms :
227250 if confirm .Ack {
228- klog .Info ("Message successfully published!" )
251+ klog .Info ("Message successfully published on " , exchangeName )
229252 } else {
230- klog .Info ("Message failed to publish!" )
253+ klog .Info ("Message failed to publish on " , exchangeName )
231254 }
232- case <- time .After (5 * time .Second ): // Timeout
233- klog .Info ("No confirmation received, message status unknown." )
255+ case <- time .After (15 * time .Second ): // Timeout
256+ klog .Info ("No confirmation received, message status unknown from %v" , exchangeName )
234257 }
235258
236259 case <- bc .ctx .Done ():
@@ -309,7 +332,7 @@ func (bc *BrokerClient) brokerConnectionConfig(tlsConfig *tls.Config) error {
309332 SASL : []amqp.Authentication {& amqp.ExternalAuth {}}, // auth EXTERNAL
310333 TLSClientConfig : tlsConfig , // config TLS
311334 Vhost : "/" , // vhost
312- Heartbeat : 10 * time .Second , // heartbeat
335+ Heartbeat : 5 * time .Second , // heartbeat
313336 }
314337
315338 // Config connection
@@ -343,17 +366,16 @@ func (bc *BrokerClient) brokerConnectionConfig(tlsConfig *tls.Config) error {
343366 return err
344367 }
345368
346- // Write confirm broker
369+ // Write confirmations
347370 if err := bc .brokerConn .amqpChan .Confirm (false ); err != nil {
348- klog .Errorf ("Failed to enable publisher confirms: %v" , err )
371+ klog .Errorf ("Failed to enable publisher confirms for Announcements : %v" , err )
349372 return err
350373 }
351374
352- // Channels for write confirm
353- bc .brokerConn .confirms = bc .brokerConn .amqpChan .NotifyPublish (make (chan amqp.Confirmation , 1 ))
354-
355- klog .InfoS ("Node" , "ID" , bc .ID .NodeID , "Client Address" , bc .ID .IP , "Server Address" , bc .serverAddr , "RoutingKey" , bc .brokerConn .routingKey )
375+ // Channels for write confirmations
376+ bc .brokerConn .confirms = bc .brokerConn .amqpChan .NotifyPublish (make (chan amqp.Confirmation , 5 ))
356377
378+ klog .InfoS ("Node" , "ID" , bc .ID .NodeID , "Client Address" , bc .ID .IP , "Server Address" , bc .serverAddr /*, "RoutingKey" , bc.brokerConn.routingKey*/ )
357379 return nil
358380}
359381
@@ -368,3 +390,25 @@ func (bc *BrokerClient) extractSecret(cl client.Client, secretName, secretNamesp
368390 }
369391 return nil
370392}
393+
394+ func (bc * BrokerClient ) buildOutboundMessage () error {
395+ var err error
396+
397+ var data map [string ]interface {}
398+ err = json .Unmarshal ([]byte (bc .metric ), & data )
399+ if err != nil {
400+ klog .Error ("Error parsing JSON in builOutboundMessage:" , err )
401+ return err
402+ }
403+
404+ data ["ip" ] = bc .ID .IP
405+ data ["domain" ] = bc .ID .Domain
406+ data ["nodeID" ] = bc .ID .NodeID
407+
408+ bc .brokerConn .outboundAnnounceMsg , err = json .Marshal (data )
409+ if err != nil {
410+ klog .Error ("Error reading metric JSON: %s" , err )
411+ return err
412+ }
413+ return nil
414+ }
0 commit comments