@@ -24,8 +24,8 @@ import java.util.concurrent.atomic.AtomicReference
24
24
import com .typesafe .config .Config
25
25
import kamon .module .Module .Registration
26
26
import kamon .status .Status
27
- import kamon .metric .MetricRegistry
28
- import kamon .trace .Tracer
27
+ import kamon .metric .{ MetricRegistry , PeriodSnapshot }
28
+ import kamon .trace .{ Span , Tracer }
29
29
import kamon .util .Clock
30
30
import org .slf4j .LoggerFactory
31
31
@@ -128,8 +128,7 @@ class ModuleRegistry(configuration: Configuration, clock: Clock, metricRegistry:
128
128
*/
129
129
def stopModules (): Future [Unit ] = synchronized {
130
130
implicit val cleanupExecutor = ExecutionContext .Implicits .global
131
- scheduleMetricsTicker(once = true )
132
- scheduleSpansTicker(once = true )
131
+ stopReporterTickers()
133
132
134
133
var stoppedSignals : List [Future [Unit ]] = Nil
135
134
_registeredModules.dropWhile {
@@ -151,7 +150,7 @@ class ModuleRegistry(configuration: Configuration, clock: Clock, metricRegistry:
151
150
* all available metric reporting modules. If a ticker was already scheduled then that scheduled job will be
152
151
* cancelled and scheduled again.
153
152
*/
154
- private def scheduleMetricsTicker (once : Boolean = false ): Unit = {
153
+ private def scheduleMetricsTicker (): Unit = {
155
154
val currentMetricsTicker = _metricsTickerSchedule.get()
156
155
if (currentMetricsTicker != null )
157
156
currentMetricsTicker.cancel(false )
@@ -171,24 +170,14 @@ class ModuleRegistry(configuration: Configuration, clock: Clock, metricRegistry:
171
170
val currentInstant = Instant .now(clock)
172
171
val periodSnapshot = metricRegistry.snapshot(resetState = true )
173
172
174
- metricReporterModules().foreach { entry =>
175
- Future {
176
- try entry.module.reportPeriodSnapshot(periodSnapshot) catch { case error : Throwable =>
177
- _logger.error(s " Reporter [ ${entry.name}] failed to process a metrics tick. " , error)
178
- }
179
- }(entry.executionContext)
180
- }
181
-
173
+ metricReporterModules().foreach(entry => scheduleMetricsTick(entry, periodSnapshot))
182
174
lastInstant = currentInstant
183
175
} catch {
184
176
case NonFatal (t) => _logger.error(" Failed to run a metrics tick" , t)
185
177
}
186
178
}
187
179
188
- if (once)
189
- _metricsTickerExecutor.schedule(ticker, 0L , TimeUnit .MILLISECONDS )
190
- else
191
- _metricsTickerExecutor.scheduleAtFixedRate(ticker, initialDelay, interval, TimeUnit .MILLISECONDS )
180
+ _metricsTickerExecutor.scheduleAtFixedRate(ticker, initialDelay, interval, TimeUnit .MILLISECONDS )
192
181
}
193
182
}
194
183
@@ -197,7 +186,7 @@ class ModuleRegistry(configuration: Configuration, clock: Clock, metricRegistry:
197
186
* all available span reporting modules. If a ticker was already scheduled then that scheduled job will be
198
187
* cancelled and scheduled again.
199
188
*/
200
- private def scheduleSpansTicker (once : Boolean = false ): Unit = {
189
+ private def scheduleSpansTicker (): Unit = {
201
190
val currentSpansTicker = _spansTickerSchedule.get()
202
191
if (currentSpansTicker != null )
203
192
currentSpansTicker.cancel(false )
@@ -208,27 +197,43 @@ class ModuleRegistry(configuration: Configuration, clock: Clock, metricRegistry:
208
197
val ticker = new Runnable {
209
198
override def run (): Unit = try {
210
199
val spanBatch = tracer.spans()
211
-
212
- spanReporterModules().foreach { entry =>
213
- Future {
214
- try entry.module.reportSpans(spanBatch) catch { case error : Throwable =>
215
- _logger.error(s " Reporter [ ${entry.name}] failed to process a spans tick. " , error)
216
- }
217
- }(entry.executionContext)
218
- }
200
+ spanReporterModules().foreach(entry => scheduleSpansBatch(entry, spanBatch))
219
201
220
202
} catch {
221
203
case NonFatal (t) => _logger.error(" Failed to run a spans tick" , t)
222
204
}
223
205
}
224
206
225
- if (once)
226
- _spansTickerExecutor.schedule(ticker, 0L , TimeUnit .MILLISECONDS )
227
- else
228
- _spansTickerExecutor.scheduleAtFixedRate(ticker, interval, interval, TimeUnit .MILLISECONDS )
207
+ _spansTickerExecutor.scheduleAtFixedRate(ticker, interval, interval, TimeUnit .MILLISECONDS )
229
208
}
230
209
}
231
210
211
+ private def scheduleMetricsTick (entry : Entry [MetricReporter ], periodSnapshot : PeriodSnapshot ): Unit = {
212
+ Future {
213
+ try entry.module.reportPeriodSnapshot(periodSnapshot) catch { case error : Throwable =>
214
+ _logger.error(s " Reporter [ ${entry.name}] failed to process a metrics tick. " , error)
215
+ }
216
+ }(entry.executionContext)
217
+ }
218
+
219
+ private def scheduleSpansBatch (entry : Entry [SpanReporter ], spanBatch : Seq [Span .Finished ]): Unit = {
220
+ Future {
221
+ try entry.module.reportSpans(spanBatch) catch { case error : Throwable =>
222
+ _logger.error(s " Reporter [ ${entry.name}] failed to process a spans tick. " , error)
223
+ }
224
+ }(entry.executionContext)
225
+ }
226
+
227
+ private def stopReporterTickers (): Unit = {
228
+ val currentMetricsTicker = _metricsTickerSchedule.get()
229
+ if (currentMetricsTicker != null )
230
+ currentMetricsTicker.cancel(false )
231
+
232
+ val currentSpansTicker = _spansTickerSchedule.get()
233
+ if (currentSpansTicker != null )
234
+ currentSpansTicker.cancel(false )
235
+ }
236
+
232
237
private def metricReporterModules (): Iterable [Entry [MetricReporter ]] = synchronized {
233
238
_metricReporterModules.values
234
239
}
@@ -368,10 +373,14 @@ class ModuleRegistry(configuration: Configuration, clock: Clock, metricRegistry:
368
373
369
374
// Remove the module from all registries
370
375
_registeredModules = _registeredModules - entry.name
371
- if (entry.module.isInstanceOf [MetricReporter ])
376
+ if (entry.module.isInstanceOf [MetricReporter ]) {
372
377
_metricReporterModules = _metricReporterModules - entry.name
373
- if (entry.module.isInstanceOf [SpanReporter ])
378
+ scheduleMetricsTick(entry.asInstanceOf [Entry [MetricReporter ]], metricRegistry.snapshot(resetState = false ))
379
+ }
380
+ if (entry.module.isInstanceOf [SpanReporter ]) {
374
381
_spanReporterModules = _spanReporterModules - entry.name
382
+ scheduleSpansBatch(entry.asInstanceOf [Entry [SpanReporter ]], tracer.spans())
383
+ }
375
384
376
385
377
386
// Schedule a call to stop on the module
0 commit comments