26
26
#include "thread-utils.h"
27
27
#include "apphook.h"
28
28
#include "messages.h"
29
+ #include "timeutils/misc.h"
29
30
30
31
#include <stdio.h>
31
32
32
33
#define PUBLISH_TIMEOUT 10000L
34
+ #define YIELD_INTERVAL_MSEC 500
35
+
36
+ static void
37
+ _start_yield_timer (MQTTDestinationWorker * self )
38
+ {
39
+ if (iv_timer_registered (& self -> yield_timer ))
40
+ iv_timer_unregister (& self -> yield_timer );
41
+
42
+ iv_validate_now ();
43
+ self -> yield_timer .expires = iv_now ;
44
+ timespec_add_msec (& self -> yield_timer .expires , YIELD_INTERVAL_MSEC );
45
+
46
+ iv_timer_register (& self -> yield_timer );
47
+ }
48
+
49
+ static void
50
+ _yield_mqtt (void * cookie )
51
+ {
52
+ MQTTDestinationWorker * self = cookie ;
53
+
54
+ MQTTClient_yield ();
55
+
56
+ _start_yield_timer (self );
57
+ }
33
58
34
59
static LogThreadedResult
35
60
_publish_result_evaluation (LogThreadedDestWorker * self , gint result )
@@ -234,6 +259,12 @@ _init(LogThreadedDestWorker *s)
234
259
return FALSE;
235
260
}
236
261
262
+ IV_TIMER_INIT (& self -> yield_timer );
263
+ self -> yield_timer .cookie = self ;
264
+ self -> yield_timer .handler = _yield_mqtt ;
265
+
266
+ _start_yield_timer (self );
267
+
237
268
return log_threaded_dest_worker_init_method (s );
238
269
}
239
270
@@ -242,6 +273,9 @@ _deinit(LogThreadedDestWorker *s)
242
273
{
243
274
MQTTDestinationWorker * self = (MQTTDestinationWorker * )s ;
244
275
276
+ if (iv_timer_registered (& self -> yield_timer ))
277
+ iv_timer_unregister (& self -> yield_timer );
278
+
245
279
MQTTClient_destroy (& self -> client );
246
280
247
281
log_threaded_dest_worker_deinit_method (s );
0 commit comments