Skip to content

Commit 403cab8

Browse files
committed
seems to work
1 parent 1abb8b3 commit 403cab8

File tree

2 files changed

+85
-38
lines changed

reporters/kamon-opentelemetry/src/main/scala/kamon/otel/MetricsConverter.scala

+36-36
Original file line numberDiff line numberDiff line change
@@ -118,7 +118,8 @@ class WithResourceMetricsConverter(resource: Resource, kamonVersion: String, fro
118118
def hasNext: Boolean = it.hasNext || !showedLast
119119

120120
def next(): T = if (it.hasNext) it.next() else if (!showedLast) {
121-
showedLast = true; last
121+
showedLast = true
122+
last
122123
} else throw new RuntimeException("Next on empty Iterator")
123124
}
124125

@@ -127,26 +128,27 @@ class WithResourceMetricsConverter(resource: Resource, kamonVersion: String, fro
127128
val lowerBoundaryIterator: Iterator[Double] = ((-maxBucketCount to maxBucketCount).map(i => Math.pow(base, i)) :+ Double.MaxValue).iterator
128129
val valuesIterator = new ItWithLast[Distribution.Bucket](s.value.bucketsIterator, new Distribution.Bucket {
129130
def value: Long = Long.MaxValue
131+
130132
def frequency: Long = 0
131133
})
132134
var fromLowerBound = valuesIterator.next()
133135
var fromUpperBound = valuesIterator.next()
134136
var toLowerBound = lowerBoundaryIterator.next()
135137
var toUpperBound = lowerBoundaryIterator.next()
136-
var zeroCount = 0d
137-
var countInBucket = 0d
138+
var zeroCount: JLong = 0L
139+
var countInBucket = 0L
138140

139-
val negativeCounts = ArrayBuffer.newBuilder[JDouble]
140-
val positiveCounts = ArrayBuffer.newBuilder[JDouble]
141+
val negativeCounts = ArrayBuffer.newBuilder[JLong]
142+
val positiveCounts = ArrayBuffer.newBuilder[JLong]
141143

142-
def iterFrom: Double = {
143-
val d = fromLowerBound.frequency.toDouble
144+
def iterFrom: JLong = {
145+
val d = fromLowerBound.frequency
144146
fromLowerBound = fromUpperBound
145147
fromUpperBound = valuesIterator.next()
146148
d
147149
}
148150

149-
def iterTo: Double = {
151+
def iterTo: JLong = {
150152
toLowerBound = toUpperBound
151153
toUpperBound = lowerBoundaryIterator.next()
152154
val res = countInBucket
@@ -172,7 +174,7 @@ class WithResourceMetricsConverter(resource: Resource, kamonVersion: String, fro
172174
countInBucket += iterFrom
173175
positiveCounts += iterTo
174176
} else if (fromUpperBound.value > toUpperBound) {
175-
val firstBonus: JDouble = countInBucket
177+
val firstBonus: JLong = countInBucket
176178
var negBuckets = 0
177179
var zeroBuckets = 0
178180
var posBuckets = 0
@@ -183,14 +185,16 @@ class WithResourceMetricsConverter(resource: Resource, kamonVersion: String, fro
183185
toLowerBound = toUpperBound
184186
toUpperBound = lowerBoundaryIterator.next()
185187
}
186-
val totalBuckets = negBuckets + zeroBuckets + posBuckets
187-
val avg = JDouble valueOf iterFrom / totalBuckets
188-
negativeCounts ++= (if (negBuckets > 0) JDouble.valueOf(firstBonus + avg) +: Array.fill(negBuckets - 1)(avg) else Nil)
189-
zeroCount += (if (negBuckets == 0 && zeroBuckets == 1) JDouble.valueOf(firstBonus + avg) else if (zeroBuckets == 1) avg else JDouble.valueOf(0))
188+
val total = iterFrom
189+
// Not sure about this... everything's going into the first bucket, even though we might be spanning multiple target buckets.
190+
// Might be better to do something like push the avg.floor into each bucket, interpolating the remainder.
191+
// OTOH it may not really come up much in practice, since the internal histos are likely to have similar or finer granularity
192+
negativeCounts ++= (if (negBuckets > 0) JLong.valueOf(firstBonus + total) +: Array.fill(negBuckets - 1)(JLong.valueOf(0)) else Nil)
193+
zeroCount += (if (negBuckets == 0 && zeroBuckets == 1) JLong.valueOf(firstBonus + total) else JLong.valueOf(0))
190194
positiveCounts ++= (
191195
if (negBuckets == 0 && zeroBuckets == 0 && posBuckets > 0)
192-
JDouble.valueOf(firstBonus + avg) +: Array.fill(posBuckets - 1)(avg)
193-
else Array.fill(posBuckets)(avg))
196+
JLong.valueOf(firstBonus + total) +: Array.fill(posBuckets - 1)(JLong.valueOf(0))
197+
else Array.fill(posBuckets)(JLong.valueOf(0)))
194198
} else /*if (fromUpperBound.value < toUpperBound) */ toLowerBound match {
195199
case 1 => zeroCount += iterFrom
196200
case _ => countInBucket += iterFrom
@@ -216,38 +220,34 @@ class WithResourceMetricsConverter(resource: Resource, kamonVersion: String, fro
216220
if (!usedLastValue) countInBucket += fromLowerBound.frequency
217221
positiveCounts += countInBucket
218222

219-
val negBucket = new ExponentialHistogramBuckets {
223+
val negBucket: ExponentialHistogramBuckets = new ExponentialHistogramBuckets {
220224
val getOffset: Int = -maxBucketCount
221-
private val doubles: ArrayBuffer[JLong] = negativeCounts.result().map(JLong valueOf _.toLong) // TODO: toLong here loses things
222-
val getBucketCounts: util.List[JLong] = new JArrayList(doubles.asJava)
223-
val getTotalCount: Long = doubles.foldLeft(0L)(_ + _)
225+
private val longs: ArrayBuffer[JLong] = negativeCounts.result()
226+
val getBucketCounts: util.List[JLong] = new JArrayList(longs.asJava)
227+
val getTotalCount: Long = longs.foldLeft(0L)(_ + _)
224228
}
225-
val posBucket = new ExponentialHistogramBuckets {
229+
val posBucket: ExponentialHistogramBuckets = new ExponentialHistogramBuckets {
226230
val getOffset: Int = 1
227-
private val doubles: ArrayBuffer[JLong] = positiveCounts.result().map(JLong valueOf _.toLong) // TODO: we should normalise at avg
228-
val getBucketCounts: util.List[JLong] = new JArrayList(doubles.asJava)
229-
val getTotalCount: Long = doubles.foldLeft(0L)(_ + _)
231+
private val longs: ArrayBuffer[JLong] = positiveCounts.result()
232+
val getBucketCounts: util.List[JLong] = new JArrayList(longs.asJava)
233+
val getTotalCount: Long = longs.foldLeft(0L)(_ + _)
230234
}
231-
(negBucket, zeroCount.longValue(), posBucket) // TODO: instead of having these toLongs
235+
(negBucket, zeroCount, posBucket)
232236
}
233237

234238
private def toExponentialHistogramData(maxBucketCount: Int, distributions: Seq[Snapshot[Distribution]]): Option[ExponentialHistogramData] =
235239
distributions.filter(_.value.buckets.nonEmpty) match {
236240
case Nil => None
237241
case nonEmpty =>
238242
val mapped = nonEmpty.flatMap { s =>
239-
s.value match {
240-
case zigZag: Distribution.ZigZagCounts =>
241-
def maxScale(v: JDouble): Int = MetricsConverter.maxScale(maxBucketCount)(v)
242-
val scale = Math.min(maxScale(s.value.min.toDouble), maxScale(s.value.max.toDouble))
243-
val (neg, zero, pos) = getExpoBucketCounts(scale, maxBucketCount)(s)
244-
Some(ExponentialHistogramPointData.create(
245-
scale, zigZag.sum, zero, pos, neg, fromNs, toNs, SpanConverter.toAttributes(s.tags), new JArrayList[DoubleExemplarData]()
246-
))
247-
case _ =>
248-
logger.error("Unable to construct exponential histogram data - only ZigZagCounts distribution can be converted")
249-
None
250-
}
243+
def maxScale(v: JDouble): Int = MetricsConverter.maxScale(maxBucketCount)(v)
244+
245+
// Could also calculate an 'offset' here, but defaulting to offset = 1 for simplicity
246+
val scale = Math.min(maxScale(s.value.min.toDouble), maxScale(s.value.max.toDouble))
247+
val (neg, zero, pos) = getExpoBucketCounts(scale, maxBucketCount)(s)
248+
Some(ExponentialHistogramPointData.create(
249+
scale, s.value.sum, zero, pos, neg, fromNs, toNs, SpanConverter.toAttributes(s.tags), new JArrayList[DoubleExemplarData]()
250+
))
251251
}
252252
if (mapped.nonEmpty) Some(ImmutableExponentialHistogramData.create(AggregationTemporality.DELTA, mapped.asJava))
253253
else None

reporters/kamon-opentelemetry/src/test/scala/kamon/otel/OpenTelemetryMetricReporterSpec.scala

+49-2
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,10 @@
1616

1717
package kamon.otel
1818

19+
import com.typesafe.config.{Config, ConfigValue, ConfigValueFactory}
1920
import io.opentelemetry.api.common.AttributeKey
2021
import io.opentelemetry.sdk.metrics.data.MetricData
22+
import io.opentelemetry.sdk.metrics.internal.data.exponentialhistogram.ExponentialHistogramData
2123
import kamon.Kamon
2224
import kamon.Kamon.config
2325
import kamon.metric._
@@ -37,10 +39,10 @@ class OpenTelemetryMetricReporterSpec extends AnyWordSpec
3739
with Matchers with Reconfigure {
3840
reconfigure =>
3941

40-
private def openTelemetryMetricsReporter(): (OpenTelemetryMetricsReporter, MockMetricsService) = {
42+
private def openTelemetryMetricsReporter(newConfig: Config = config): (OpenTelemetryMetricsReporter, MockMetricsService) = {
4143
val metricsService = new MockMetricsService()
4244
val reporter = new OpenTelemetryMetricsReporter(_ => metricsService)(ExecutionContext.global)
43-
reporter.reconfigure(config)
45+
reporter.reconfigure(newConfig)
4446
(reporter, metricsService)
4547
}
4648

@@ -148,6 +150,51 @@ class OpenTelemetryMetricReporterSpec extends AnyWordSpec
148150
points.head.getBoundaries.asScala shouldEqual Seq[JDouble](1d, 2d, 3d, 4d, 10d)
149151
points.head.getCounts.asScala shouldEqual Seq[JDouble](2d, 2d, 3d, 0d, 1d, 1d)
150152
}
153+
"send exponential histogram metrics" in {
154+
val newConfig = config.withValue("kamon.otel.metrics.histogram-format", ConfigValueFactory.fromAnyRef("base2_exponential_bucket_histogram"))
155+
val (reporter, mockService) = openTelemetryMetricsReporter(newConfig)
156+
val now = Instant.now()
157+
reporter.reportPeriodSnapshot(
158+
PeriodSnapshot.apply(
159+
now.minusMillis(1000),
160+
now,
161+
Nil,
162+
Nil,
163+
MetricSnapshot.ofDistributions(
164+
"test.histogram",
165+
"test",
166+
Metric.Settings.ForDistributionInstrument(MeasurementUnit.none, java.time.Duration.ZERO, DynamicRange.Default),
167+
Instrument.Snapshot(
168+
TagSet.from(Map("tag1" -> "value1")),
169+
buildHistogramDist(Seq(1L -> 2L, 2L -> 2L, 3L -> 3L, 5L -> 1L, 15L -> 1L))
170+
) :: Nil) :: Nil,
171+
Nil,
172+
Nil
173+
)
174+
)
175+
// basic sanity
176+
mockService.exportMetricsServiceRequest should not be empty
177+
mockService.exportMetricsServiceRequest.get should have size 1
178+
val exportedMetrics: Seq[MetricData] = mockService.exportMetricsServiceRequest.get.asScala.toSeq
179+
exportedMetrics should have size 1
180+
val metricData = exportedMetrics.head
181+
182+
183+
// check value
184+
metricData.getName should equal("test.histogram")
185+
metricData.getDescription should equal("test")
186+
val sumData = ExponentialHistogramData.fromMetricData(metricData)
187+
val points = sumData.getPoints.asScala.toSeq
188+
points should have size 1
189+
points.head.getAttributes should have size 1
190+
points.head.getAttributes.get(AttributeKey.stringKey("tag1")) should equal("value1")
191+
points.head.getScale shouldEqual 5
192+
points.head.getNegativeBuckets.getTotalCount shouldEqual 0L
193+
points.head.getZeroCount shouldEqual 2L
194+
points.head.getPositiveBuckets.getTotalCount shouldEqual 7L
195+
points.head.getSum shouldEqual 35L
196+
points.head.getCount shouldEqual 9L
197+
}
151198

152199
"calculate sensible scales for values" in {
153200
def randomDouble = Random.nextInt(10) match {

0 commit comments

Comments (0)