Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 24 additions & 1 deletion vuu/src/main/scala/org/finos/vuu/core/module/DefaultModule.scala
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package org.finos.vuu.core.module

import scala.language.implicitConversions

class FieldDefString(str: String) {
def double(): String = {
str + ":Double"
Expand All @@ -25,9 +27,30 @@ class FieldDefString(str: String) {
def string(): String = {
str + ":String"
}

def epochTimestamp(): String = {
str + ":EpochTimestamp"
}

def scaledDecimal2(): String = {
str + ":ScaledDecimal2"
}

def scaledDecimal4(): String = {
str + ":ScaledDecimal4"
}

def scaledDecimal6(): String = {
str + ":ScaledDecimal6"
}

def scaledDecimal8(): String = {
str + ":ScaledDecimal8"
}

}

abstract class DefaultModule {
//pimped string impl for field definition
//advanced string impl for field definition
implicit def stringToFieldDef(s: String): FieldDefString = new FieldDefString(s)
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class MetricsJVMProvider(table: DataTable, viewPortContainer: ViewPortContainer)
metrics: MetricsProvider) extends Provider with StrictLogging {

private val runner = new LifeCycleRunner("MetricsJVMProviderThread", () => runOnce(), minCycleTime = 2_000)
private val bytesToMiB = 1.0 / (1024 * 1024)

lifecycleContainer(this).dependsOn(runner)

Expand All @@ -30,16 +31,16 @@ class MetricsJVMProvider(table: DataTable, viewPortContainer: ViewPortContainer)

override val lifecycleId: String = "MetricsJVMProvider"

def toMb(bytes: Long): Double = {
(bytes.toDouble / 1024) / 1024
private def toMb(bytes: Long): Double = {
bytes * bytesToMiB
}

def buildMachineCores(): Map[String, Any] = {
private def buildMachineCores(): Map[String, Any] = {
val processors = Runtime.getRuntime.availableProcessors
Map("cpu-cores" -> processors)
}

def buildHeapData(heap: MemoryUsage): Map[String, Any] = {
private def buildHeapData(heap: MemoryUsage): Map[String, Any] = {
Map(
"mem-type" -> "heap",
"max_MB" -> toMb(heap.getMax),
Expand All @@ -49,7 +50,7 @@ class MetricsJVMProvider(table: DataTable, viewPortContainer: ViewPortContainer)
)
}

def buildNonHeapData(nonheap: MemoryUsage): Map[String, Any] = {
private def buildNonHeapData(nonheap: MemoryUsage): Map[String, Any] = {
Map(
"mem-type" -> "nonheap",
"max_MB" -> toMb(nonheap.getMax),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package org.finos.vuu.core.module.metrics

import org.finos.vuu.api.{Indices, TableDef}
import org.finos.vuu.core.module.{DefaultModule, ModuleFactory, TableDefContainer, ViewServerModule}
import org.finos.vuu.core.table.Columns
import org.finos.toolbox.jmx.MetricsProvider
import org.finos.toolbox.lifecycle.LifecycleContainer
import org.finos.toolbox.time.Clock
import org.finos.vuu.api.{Indices, TableDef}
import org.finos.vuu.core.module.metrics.MetricsSchema.MetricsTree.all_columns
import org.finos.vuu.core.module.{DefaultModule, ModuleFactory, TableDefContainer, ViewServerModule}
import org.finos.vuu.core.table.Columns

object MetricsSchema{

Expand Down Expand Up @@ -92,7 +92,8 @@ object MetricsModule extends DefaultModule {
TableDef(
name = "metricsViewports",
keyField = "id",
columns = Columns.fromNames("id".string(), "table".string(), "structureHash".int(), "updateCount".long(), "keyBuildCount".long(), "mean".double(), "max".double(), "75Perc".double(), "99Perc".double(), "99_9Perc".double()),
columns = Columns.fromNames("id".string(), "table".string(), "structureHash".int(), "updateCount".long(),
"keyBuildCount".long(), "mean".scaledDecimal2(), "max".long(), "75Perc".scaledDecimal4(), "99Perc".scaledDecimal6(), "99_9Perc".scaledDecimal8()),
indices = Indices(),
joinFields = "id"
),
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package org.finos.vuu.core.module.metrics

import com.typesafe.scalalogging.StrictLogging
import org.finos.vuu.core.table.{DataTable, RowWithData, TableContainer}
import org.finos.vuu.provider.Provider
import org.finos.toolbox.jmx.MetricsProvider
import org.finos.toolbox.lifecycle.LifecycleContainer
import org.finos.toolbox.thread.LifeCycleRunner
import org.finos.toolbox.time.Clock
import org.finos.vuu.core.table.{DataTable, RowWithData, TableContainer}
import org.finos.vuu.provider.Provider
import org.finos.vuu.viewport.ViewPortTable

class MetricsTableProvider(table: DataTable, tableContainer: TableContainer)(implicit clock: Clock, lifecycleContainer: LifecycleContainer,
Expand Down Expand Up @@ -34,7 +34,7 @@ class MetricsTableProvider(table: DataTable, tableContainer: TableContainer)(imp
table.processUpdate(vpTable.table, RowWithData(vpTable.table, getMetricsData(vpTable)))
)
} catch {
case e: Exception => logger.error("Error occured in metrics", e)
case e: Exception => logger.error("Error occurred in metrics", e)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import org.finos.toolbox.jmx.MetricsProvider
import org.finos.toolbox.lifecycle.LifecycleContainer
import org.finos.toolbox.thread.LifeCycleRunner
import org.finos.toolbox.time.Clock
import org.finos.vuu.core.table.datatype.{Scale, ScaledDecimal}
import org.finos.vuu.core.table.{DataTable, RowWithData}
import org.finos.vuu.provider.Provider
import org.finos.vuu.viewport.ViewPortContainer
Expand Down Expand Up @@ -46,8 +47,17 @@ class MetricsViewPortProvider(table: DataTable, viewPortContainer: ViewPortConta
val snapshot = histogram.getSnapshot
val vp = viewPortContainer.getViewPorts.find(f => f.id == key).orNull
if (vp != null) {
val upMap = Map("id" -> key, "table" -> vp.table.name, "mean" -> snapshot.getMean, "max" -> snapshot.getMax, "structureHash" -> vp.getStructuralHashCode(), "updateCount" -> vp.getTableUpdateCount(),
"keyBuildCount" -> vp.keyBuildCount, "75Perc" -> snapshot.get75thPercentile(), "99Perc" -> snapshot.get99thPercentile(), "99_9Perc" -> snapshot.get999thPercentile()
val upMap = Map(
"id" -> key,
"table" -> vp.table.name,
"mean" -> ScaledDecimal(snapshot.getMean, Scale.Two),
"max" -> snapshot.getMax,
"structureHash" -> vp.getStructuralHashCode(),
"updateCount" -> vp.getTableUpdateCount(),
"keyBuildCount" -> vp.keyBuildCount,
"75Perc" -> ScaledDecimal(snapshot.get75thPercentile(), Scale.Four),
"99Perc" -> ScaledDecimal(snapshot.get99thPercentile(), Scale.Six),
"99_9Perc" -> ScaledDecimal(snapshot.get999thPercentile(), Scale.Eight)
)
table.processUpdate(key, RowWithData(key, upMap))
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,33 +8,33 @@ import java.math.BigDecimal as JBigDecimal
* that are preserved when converting to a long-based representation.
* * @param precision The number of decimal places (e.g., 2 for Scale.Two).
*/
enum Scale(val precision: Int) {
case Two extends Scale(2)
case Four extends Scale(4)
case Six extends Scale(6)
case Eight extends Scale(8)
enum Scale[T <: ScaledDecimal](val precision: Int, val createFunction: Long => T) {

case Two extends Scale[ScaledDecimal2](2, ScaledDecimal2.apply)
case Four extends Scale[ScaledDecimal4](4, ScaledDecimal4.apply)
case Six extends Scale[ScaledDecimal6](6, ScaledDecimal6.apply)
case Eight extends Scale[ScaledDecimal8](8, ScaledDecimal8.apply)

/**
* Creates a [[ScaledDecimal]] instance by shifting the decimal point of the
* input value and truncating to a Long.
* * @param value The Java BigDecimal to convert.
* @return A specialized ScaledDecimal implementation (e.g., [[ScaledDecimal2]]).
*/
def create(value: JBigDecimal): ScaledDecimal = this match
case Two => ScaledDecimal2(value.movePointRight(precision).longValue())
case Four => ScaledDecimal4(value.movePointRight(precision).longValue())
case Six => ScaledDecimal6(value.movePointRight(precision).longValue())
case Eight => ScaledDecimal8(value.movePointRight(precision).longValue())
def create(value: JBigDecimal): T = {
createFunction(value.movePointRight(precision).longValue())
}

}

/**
* Static constants for [[Scale]] to provide idiomatic access for Java callers.
*/
object Scale {
val TWO: Scale = Scale.Two
val FOUR: Scale = Scale.Four
val SIX: Scale = Scale.Six
val EIGHT: Scale = Scale.Eight
val TWO: Scale[ScaledDecimal2] = Scale.Two
val FOUR: Scale[ScaledDecimal4] = Scale.Four
val SIX: Scale[ScaledDecimal6] = Scale.Six
val EIGHT: Scale[ScaledDecimal8] = Scale.Eight
}

/**
Expand All @@ -52,19 +52,30 @@ sealed trait ScaledDecimal {
*/
object ScaledDecimal {

/**
* Creates a ScaledDecimal from a Double.
* @param value The value to scale.
* @param scale The target precision.
*/
def apply[T <: ScaledDecimal](value: Double, scale: Scale[T]): T =
scale.create(JBigDecimal.valueOf(value))

/**
* Creates a ScaledDecimal from a Scala BigDecimal.
* @param value The value to scale.
* @param scale The target precision.
*/
def apply(value: BigDecimal, scale: Scale): ScaledDecimal = scale.create(value.underlying())
def apply[T <: ScaledDecimal](value: BigDecimal, scale: Scale[T]): T =
scale.create(value.underlying())

/**
* Creates a ScaledDecimal from a Java BigDecimal.
* @param value The value to scale.
* @param scale The target precision.
*/
def apply(value: JBigDecimal, scale: Scale): ScaledDecimal = scale.create(value)
def apply[T <: ScaledDecimal](value: JBigDecimal, scale: Scale[T]): T =
scale.create(value)

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class RowUpdateSerializer extends JsonSerializer[RowUpdate] with StrictLogging
case (s: String, _) => gen.writeString(s)
case (i: Int, _) => gen.writeNumber(i)
case (d: Double, _) => gen.writeNumber(d)
case (l: Long, _) => gen.writeNumber(l)
case (l: Long, _) => gen.writeString(l.toString)
case (b: Boolean, _) => gen.writeBoolean(b)
case (c: Char, _) => gen.writeString(c.toString)
case (epoch: EpochTimestamp, _) => gen.writeNumber(epoch.millis)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,22 @@ class ScaledDecimalTest extends AnyFeatureSpec with Matchers with GivenWhenThen
result.scaledValue shouldBe expected
}
}

Scenario("Handling Double inputs") {
val javaTestCases = Table(
("input", "scale", "expectedValue"),
(10.50d, Scale.Two, 1050L),
(1.1234d, Scale.Four, 11234L),
(0.123456d, Scale.Six, 123456L),
(0.01234567d, Scale.Eight, 1234567L),
)

forAll(javaTestCases) { (jVal, scale, expected) =>
val result = ScaledDecimal(jVal, scale)
result.scaledValue shouldBe expected
}
}

}

Feature("Comparison and Ordering") {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,9 @@ class RowUpdateSerializerTest extends AnyFeatureSpec with Matchers with StrictLo
"bar",
1,
BigInt(123), //Unsupported by Vuu
EpochTimestamp(456),
ScaledDecimal2(567)
EpochTimestamp(456L),
ScaledDecimal2(567L),
678L,
)
)

Expand All @@ -41,7 +42,7 @@ class RowUpdateSerializerTest extends AnyFeatureSpec with Matchers with StrictLo

serialized shouldEqual "{\"viewPortId\":\"Vp1\",\"vpSize\":1,\"rowIndex\":0,\"rowKey\":\":KEY1\"," +
"\"updateType\":\"U\",\"ts\":100,\"sel\":0,\"vpVersion\":\"Request1\"," +
"\"data\":[\"foo\",\"bar\",1,\"\",456,\"567\"]}"
"\"data\":[\"foo\",\"bar\",1,\"\",456,\"567\",\"678\"]}"

val deserialized = mapper.readValue(serialized, classOf[RowUpdate])

Expand All @@ -54,13 +55,14 @@ class RowUpdateSerializerTest extends AnyFeatureSpec with Matchers with StrictLo
deserialized.ts shouldEqual rowUpdate.ts
deserialized.selected shouldEqual rowUpdate.selected

deserialized.data.length shouldEqual 6
deserialized.data.length shouldEqual 7
deserialized.data(0) shouldEqual "foo"
deserialized.data(1) shouldEqual "bar"
deserialized.data(2) shouldEqual "1"
deserialized.data(3) shouldEqual ""
deserialized.data(4) shouldEqual "456"
deserialized.data(5) shouldEqual "567"
deserialized.data(6) shouldEqual "678"
}

}
Expand Down