Skip to content

Commit 1abb8b3

Browse files
committed
attempt to implement
1 parent cf5c6a0 commit 1abb8b3

File tree

1 file changed

+122
-33
lines changed

1 file changed

+122
-33
lines changed

reporters/kamon-opentelemetry/src/main/scala/kamon/otel/MetricsConverter.scala

+122-33
Original file line numberDiff line numberDiff line change
@@ -112,51 +112,137 @@ class WithResourceMetricsConverter(resource: Resource, kamonVersion: String, fro
112112
d))
113113
}
114114

115-
private def getExpoBucketCounts(maxBucketCount: Int)(s: Snapshot[Distribution]) = {
116-
def maxScale(v: JDouble): Int = MetricsConverter.maxScale(maxBucketCount)(v)
115+
class ItWithLast[T](it: Iterator[T], last: T) extends Iterator[T] {
116+
private var showedLast: Boolean = false
117117

118-
val scale = Math.min(maxScale(s.value.min.toDouble), maxScale(s.value.max.toDouble))
118+
def hasNext: Boolean = it.hasNext || !showedLast
119+
120+
def next(): T = if (it.hasNext) it.next() else if (!showedLast) {
121+
showedLast = true; last
122+
} else throw new RuntimeException("Next on empty Iterator")
123+
}
124+
125+
private def getExpoBucketCounts(scale: Int, maxBucketCount: Int)(s: Snapshot[Distribution]) = {
119126
val base = Math.pow(2, Math.pow(2, -scale))
120-
// lower boundary of index 0 is always 0 (inclusive) https://opentelemetry.io/blog/2023/exponential-histograms/#bucket-calculation
121-
val lowerBoundaryIterator: Iterator[Double] = (-maxBucketCount to maxBucketCount).map(i => Math.pow(base, i)).iterator
127+
val lowerBoundaryIterator: Iterator[Double] = ((-maxBucketCount to maxBucketCount).map(i => Math.pow(base, i)) :+ Double.MaxValue).iterator
128+
val valuesIterator = new ItWithLast[Distribution.Bucket](s.value.bucketsIterator, new Distribution.Bucket {
129+
def value: Long = Long.MaxValue
130+
def frequency: Long = 0
131+
})
132+
var fromLowerBound = valuesIterator.next()
133+
var fromUpperBound = valuesIterator.next()
134+
var toLowerBound = lowerBoundaryIterator.next()
135+
var toUpperBound = lowerBoundaryIterator.next()
136+
var zeroCount = 0d
137+
var countInBucket = 0d
122138

123-
val counts = ArrayBuffer.newBuilder[JLong]
124-
val foo = new ExponentialHistogramBuckets {
125-
def getOffset: Int = ???
126-
def getBucketCounts: util.List[JLong] = ???
127-
def getTotalCount: Long = ???
139+
val negativeCounts = ArrayBuffer.newBuilder[JDouble]
140+
val positiveCounts = ArrayBuffer.newBuilder[JDouble]
141+
142+
def iterFrom: Double = {
143+
val d = fromLowerBound.frequency.toDouble
144+
fromLowerBound = fromUpperBound
145+
fromUpperBound = valuesIterator.next()
146+
d
128147
}
129-
// val boundaryIterator: Iterator[JDouble] = (bucketConfiguration :+ maxDouble).iterator
130-
var nextBoundary = lowerBoundaryIterator.next()
131-
var inBucketCount = 0L
132-
for (el <- s.value.bucketsIterator) {
133-
while (el.value > nextBoundary) {
134-
nextBoundary = boundaryIterator.next()
135-
counts += inBucketCount
136-
inBucketCount = 0L
148+
149+
def iterTo: Double = {
150+
toLowerBound = toUpperBound
151+
toUpperBound = lowerBoundaryIterator.next()
152+
val res = countInBucket
153+
countInBucket = 0
154+
res
155+
}
156+
// normal case
157+
while (lowerBoundaryIterator.hasNext && valuesIterator.hasNext) {
158+
if (fromUpperBound.value <= toLowerBound) {
159+
countInBucket += iterFrom // Or drop?
160+
} else if (fromLowerBound.value >= toUpperBound) toLowerBound match {
161+
case 1 => zeroCount += iterTo
162+
case b if b < 1 => negativeCounts += iterTo
163+
case b if b > 1 => positiveCounts += iterTo
164+
} else if (fromUpperBound.value == toUpperBound) toLowerBound match {
165+
case 1 =>
166+
zeroCount += iterFrom
167+
iterTo
168+
case b if b < 1 =>
169+
countInBucket += iterFrom
170+
negativeCounts += iterTo
171+
case b if b > 1 =>
172+
countInBucket += iterFrom
173+
positiveCounts += iterTo
174+
} else if (fromUpperBound.value > toUpperBound) {
175+
val firstBonus: JDouble = countInBucket
176+
var negBuckets = 0
177+
var zeroBuckets = 0
178+
var posBuckets = 0
179+
while (fromUpperBound.value > toUpperBound && lowerBoundaryIterator.hasNext) {
180+
if (toLowerBound < 1) negBuckets += 1
181+
else if (toLowerBound == 1) zeroBuckets += 1
182+
else if (toLowerBound >= 1) posBuckets += 1
183+
toLowerBound = toUpperBound
184+
toUpperBound = lowerBoundaryIterator.next()
185+
}
186+
val totalBuckets = negBuckets + zeroBuckets + posBuckets
187+
val avg = JDouble valueOf iterFrom / totalBuckets
188+
negativeCounts ++= (if (negBuckets > 0) JDouble.valueOf(firstBonus + avg) +: Array.fill(negBuckets - 1)(avg) else Nil)
189+
zeroCount += (if (negBuckets == 0 && zeroBuckets == 1) JDouble.valueOf(firstBonus + avg) else if (zeroBuckets == 1) avg else JDouble.valueOf(0))
190+
positiveCounts ++= (
191+
if (negBuckets == 0 && zeroBuckets == 0 && posBuckets > 0)
192+
JDouble.valueOf(firstBonus + avg) +: Array.fill(posBuckets - 1)(avg)
193+
else Array.fill(posBuckets)(avg))
194+
} else /*if (fromUpperBound.value < toUpperBound) */ toLowerBound match {
195+
case 1 => zeroCount += iterFrom
196+
case _ => countInBucket += iterFrom
137197
}
138-
inBucketCount += el.frequency
139198
}
140-
while (boundaryIterator.hasNext) {
141-
counts += inBucketCount
142-
boundaryIterator.next()
143-
inBucketCount = 0L
199+
var usedLastValue = false
200+
// more buckets left to fill but only one unused value, sitting in fromLowerBound.
201+
while (lowerBoundaryIterator.hasNext) {
202+
if (fromLowerBound.value > toLowerBound && fromLowerBound.value < toUpperBound) {
203+
usedLastValue = true
204+
countInBucket += fromLowerBound.frequency
205+
}
206+
toLowerBound match {
207+
case 1 => zeroCount += iterTo
208+
case b if b < 1 => negativeCounts += iterTo
209+
case b if b > 1 => positiveCounts += iterTo
210+
}
144211
}
145-
counts += inBucketCount
146-
counts
212+
// more values left, but only one unfilled bucket, sitting in toLowerBound
213+
while (valuesIterator.hasNext) {
214+
countInBucket += iterFrom
215+
}
216+
if (!usedLastValue) countInBucket += fromLowerBound.frequency
217+
positiveCounts += countInBucket
218+
219+
val negBucket = new ExponentialHistogramBuckets {
220+
val getOffset: Int = -maxBucketCount
221+
private val doubles: ArrayBuffer[JLong] = negativeCounts.result().map(JLong valueOf _.toLong) // TODO: toLong here loses things
222+
val getBucketCounts: util.List[JLong] = new JArrayList(doubles.asJava)
223+
val getTotalCount: Long = doubles.foldLeft(0L)(_ + _)
224+
}
225+
val posBucket = new ExponentialHistogramBuckets {
226+
val getOffset: Int = 1
227+
private val doubles: ArrayBuffer[JLong] = positiveCounts.result().map(JLong valueOf _.toLong) // TODO: we should normalise at avg
228+
val getBucketCounts: util.List[JLong] = new JArrayList(doubles.asJava)
229+
val getTotalCount: Long = doubles.foldLeft(0L)(_ + _)
230+
}
231+
(negBucket, zeroCount.longValue(), posBucket) // TODO: instead of having these toLongs
147232
}
148233

149-
private def toExponentialHistogramData(distributions: Seq[Snapshot[Distribution]]): Option[ExponentialHistogramData] =
234+
private def toExponentialHistogramData(maxBucketCount: Int, distributions: Seq[Snapshot[Distribution]]): Option[ExponentialHistogramData] =
150235
distributions.filter(_.value.buckets.nonEmpty) match {
151236
case Nil => None
152237
case nonEmpty =>
153238
val mapped = nonEmpty.flatMap { s =>
154239
s.value match {
155240
case zigZag: Distribution.ZigZagCounts =>
156-
logger.error("Unable to construct exponential histogram data - Unimplemented")
157-
None
241+
def maxScale(v: JDouble): Int = MetricsConverter.maxScale(maxBucketCount)(v)
242+
val scale = Math.min(maxScale(s.value.min.toDouble), maxScale(s.value.max.toDouble))
243+
val (neg, zero, pos) = getExpoBucketCounts(scale, maxBucketCount)(s)
158244
Some(ExponentialHistogramPointData.create(
159-
???, zigZag.sum, ???, ???, ???, fromNs, toNs, SpanConverter.toAttributes(s.tags), new JArrayList[DoubleExemplarData]()
245+
scale, zigZag.sum, zero, pos, neg, fromNs, toNs, SpanConverter.toAttributes(s.tags), new JArrayList[DoubleExemplarData]()
160246
))
161247
case _ =>
162248
logger.error("Unable to construct exponential histogram data - only ZigZagCounts distribution can be converted")
@@ -167,15 +253,17 @@ class WithResourceMetricsConverter(resource: Resource, kamonVersion: String, fro
167253
else None
168254
}
169255

170-
def convertExponentialHistogram(histogram: MetricSnapshot.Distributions): Option[MetricData] =
171-
toExponentialHistogramData(histogram.instruments).map(d =>
256+
def convertExponentialHistogram(histogram: MetricSnapshot.Distributions): Option[MetricData] = {
257+
val maxBucketCount = expoBucketConfig(histogram.name, histogram.settings.unit)
258+
toExponentialHistogramData(maxBucketCount, histogram.instruments).map(d =>
172259
ImmutableMetricData.createExponentialHistogram(
173260
resource,
174261
instrumentationScopeInfo(histogram),
175262
histogram.name,
176263
histogram.description,
177264
toString(histogram.settings.unit),
178265
d))
266+
}
179267

180268
def convertHistogram(histogramFormat: HistogramFormat)(histogram: MetricSnapshot.Distributions): Option[MetricData] = histogramFormat match {
181269
case Explicit => convertExplicitHistogram(histogram)
@@ -220,8 +308,9 @@ private[otel] object MetricsConverter {
220308
}
221309

222310
private val bases = (maxScale to minScale by -1).map(scale => (scale, Math.pow(2, Math.pow(2, -scale)))).toArray
311+
223312
def maxScale(maxBucketCount: Int)(v: JDouble): Int = {
224-
if (v >= 1) bases.collectFirst{ case (scale, base) if Math.pow(base, maxBucketCount) >= v => scale}.getOrElse(minScale)
225-
else bases.collectFirst{ case (scale, base) if Math.pow(base, -maxBucketCount) <= v => scale}.getOrElse(minScale)
313+
if (v >= 1) bases.collectFirst { case (scale, base) if Math.pow(base, maxBucketCount) >= v => scale }.getOrElse(minScale)
314+
else bases.collectFirst { case (scale, base) if Math.pow(base, -maxBucketCount) <= v => scale }.getOrElse(minScale)
226315
}
227316
}

0 commit comments

Comments
 (0)