11use std:: pin:: Pin ;
22
3- use crate :: config:: RabbitMQConfig ;
4- use crate :: easyamqp:: {
5- BindQueueConfig , ChannelExt , ConsumeConfig , ConsumerExt , ExchangeConfig , ExchangeType ,
6- QueueConfig ,
7- } ;
8- use crate :: notifyworker:: { NotificationReceiver , SimpleNotifyWorker } ;
9- use crate :: ofborg;
10- use crate :: worker:: { Action , SimpleWorker } ;
11-
123use async_std:: future:: Future ;
134use async_std:: stream:: StreamExt ;
145use async_std:: task;
@@ -21,6 +12,16 @@ use lapin::types::{AMQPValue, FieldTable};
2112use lapin:: { BasicProperties , Channel , Connection , ConnectionProperties , ExchangeKind } ;
2213use tracing:: { debug, trace} ;
2314
15+ use crate :: config:: RabbitMQConfig ;
16+ use crate :: easyamqp:: {
17+ BindQueueConfig , ChannelExt , ConsumeConfig , ConsumerExt , ExchangeConfig , ExchangeType ,
18+ QueueConfig ,
19+ } ;
20+ use crate :: metrics;
21+ use crate :: notifyworker:: { NotificationReceiver , SimpleNotifyWorker } ;
22+ use crate :: ofborg;
23+ use crate :: worker:: { Action , SimpleWorker } ;
24+
2425pub fn from_config ( cfg : & RabbitMQConfig ) -> Result < Connection , lapin:: Error > {
2526 let mut props = FieldTable :: default ( ) ;
2627 props. insert (
@@ -166,6 +167,8 @@ impl<'a, W: SimpleNotifyWorker + 'a> ConsumerExt<'a, W> for NotifyChannel {
166167 Ok ( Box :: pin ( async move {
167168 while let Some ( Ok ( deliver) ) = consumer. next ( ) . await {
168169 debug ! ( ?deliver. delivery_tag, "consumed delivery" ) ;
170+ metrics:: JOBS_RECEIVED . inc ( ) ;
171+
169172 let mut receiver = ChannelNotificationReceiver {
170173 channel : & mut chan,
171174 deliver : & deliver,
0 commit comments