Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 74 additions & 0 deletions vuu/src/main/scala/org/finos/vuu/core/sort/RowDataComparator.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package org.finos.vuu.core.sort

import com.typesafe.scalalogging.StrictLogging
import org.finos.vuu.core.sort.SortDirection.Ascending
import org.finos.vuu.core.table.{Column, DataType, RowData}

trait RowDataComparator extends java.util.Comparator[RowData]

object RowDataComparator extends StrictLogging {

def apply(columns: List[Column], sortDirections: List[SortDirection]): RowDataComparator = {
val comparators = columns
.lazyZip(sortDirections)
.map((col, dir) => buildColumnComparator(col, dir == Ascending))
.toArray
RowDataComparatorImpl(comparators)
}

private def buildColumnComparator(column: Column, isAscending: Boolean): (RowData, RowData) => Int = {
column.dataType match {
case DataType.StringDataType =>
(o1: RowData, o2: RowData) => compareString(o1.get(column), o2.get(column), isAscending)
case DataType.LongDataType | DataType.IntegerDataType | DataType.DoubleDataType |
DataType.BooleanDataType | DataType.CharDataType | DataType.EpochTimestampType |
DataType.ScaledDecimal2Type | DataType.ScaledDecimal4Type |
DataType.ScaledDecimal6Type | DataType.ScaledDecimal8Type =>
(o1: RowData, o2: RowData) => compareComparable(o1.get(column), o2.get(column), isAscending)
case _ =>
logger.warn(s"Unable to sort datatype ${column.dataType}")
(_, _) => 0
}
}

private def compareComparable(v1: Any, v2: Any, isAscending: Boolean): Int = {
val c1 = v1.asInstanceOf[Comparable[AnyRef]]
val c2 = v2.asInstanceOf[AnyRef]

if (c1 eq c2) 0
else if (c1 == null) if (isAscending) 1 else -1
else if (c2 == null) if (isAscending) -1 else 1
else {
val res = c1.compareTo(c2)
if (isAscending) res else -res
}
}

private def compareString(v1: Any, v2: Any, isAscending: Boolean): Int = {
val c1 = v1.asInstanceOf[String]
val c2 = v2.asInstanceOf[String]

if (c1 eq c2) 0
else if (c1 == null) if (isAscending) 1 else -1
else if (c2 == null) if (isAscending) -1 else 1
else {
val res = c1.compareToIgnoreCase(c2)
if (isAscending) res else -res
}
}

}

case class RowDataComparatorImpl(comparators: Array[(RowData, RowData) => Int]) extends RowDataComparator {

override def compare(o1: RowData, o2: RowData): Int = {
var i = 0
var result = 0
while (i < comparators.length && result == 0) {
result = comparators(i)(o1, o2)
i += 1
}
result
}

}
43 changes: 43 additions & 0 deletions vuu/src/main/scala/org/finos/vuu/core/sort/SortBuffer.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package org.finos.vuu.core.sort

import org.finos.vuu.core.table.RowWithData

import java.util

object SortBuffer {

private val threadLocalSortBuffer = new ThreadLocal[Array[RowWithData]]()
private val maxRetainedSize = 5_000_000

/**
* Gets or grows a thread-local array.
*/
def borrow(requiredCapacity: Int): Array[RowWithData] = {
val existing = threadLocalSortBuffer.get()

if (existing == null || existing.length < requiredCapacity) {
val newSize = if (existing == null) requiredCapacity
else Math.max(requiredCapacity, (existing.length * 1.5).toInt)

val newArray = new Array[RowWithData](newSize)
threadLocalSortBuffer.set(newArray)
newArray
} else {
existing
}
}

/**
* Nulls out references to prevent memory leaks.
*/
def release(array: Array[RowWithData], size: Int): Unit = {
if (array != null) {
util.Arrays.fill(array.asInstanceOf[Array[AnyRef]], 0, size, null)
if (array.length > maxRetainedSize) {
threadLocalSortBuffer.remove()
}
}
}
}


108 changes: 0 additions & 108 deletions vuu/src/main/scala/org/finos/vuu/core/sort/SortCompares.scala

This file was deleted.

39 changes: 21 additions & 18 deletions vuu/src/main/scala/org/finos/vuu/core/sort/Sorts.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package org.finos.vuu.core.sort
import com.typesafe.scalalogging.StrictLogging
import org.finos.toolbox.collection.array.ImmutableArray
import org.finos.toolbox.time.TimeIt.timeIt
import org.finos.vuu.core.table.{Column, RowData, RowWithData, TablePrimaryKeys}
import org.finos.vuu.core.table.{Column, RowWithData, TablePrimaryKeys}
import org.finos.vuu.feature.inmem.InMemTablePrimaryKeys
import org.finos.vuu.net.SortSpec
import org.finos.vuu.viewport.{RowSource, ViewPortColumns}
Expand All @@ -15,7 +15,11 @@ trait Sort {
}

object Sort {
def apply(spec: SortSpec, columns: List[Column]): Sort = GenericSort2(spec, columns)
def apply(spec: SortSpec, columns: List[Column]): Sort = {
val sortDirections = spec.sortDefs.map(sd => SortDirection.fromExternal(sd.sortType))
val comparator = RowDataComparator.apply(columns, sortDirections)
GenericSort2(comparator)
}
}

object NoSort extends Sort {
Expand All @@ -24,58 +28,57 @@ object NoSort extends Sort {
}
}

private case class GenericSort2(spec: SortSpec, columns: List[Column]) extends Sort with StrictLogging {

private val sortDirections = spec.sortDefs.map(sd => SortDirection.fromExternal(sd.sortType))
private val comparator = new java.util.Comparator[RowData] {
override def compare(o1: RowData, o2: RowData): Int =
SortCompares.compare(o1, o2, columns, sortDirections, 0)
}
private case class GenericSort2(rowDataComparator: RowDataComparator) extends Sort with StrictLogging {

override def doSort(source: RowSource, primaryKeys: TablePrimaryKeys, vpColumns: ViewPortColumns): TablePrimaryKeys = {

//This has been repeatedly benchmarked using JMH. If you touch this, do a before and after run of SortBenchmark

val buffer = SortBuffer.borrow(primaryKeys.length)

logger.trace("Starting map")

val (millisToArray, snapshotAndCount) = timeIt {
createSnapshot(source, primaryKeys, vpColumns)
val (millisToArray, count) = timeIt {
createSnapshot(source, primaryKeys, vpColumns, buffer)
}

logger.trace("Starting sort")

val (millisSort, _ ) = timeIt {
util.Arrays.sort(snapshotAndCount._1, 0, snapshotAndCount._2, comparator)
util.Arrays.sort(buffer, 0, count, rowDataComparator)
}

logger.trace("Starting build imm arr")

val (millisImmArray, immutableArray) = timeIt {
createKeyArray(snapshotAndCount._1, snapshotAndCount._2)
createKeyArray(buffer, count)
}

logger.debug(s"[SORT]: Table Size: ${primaryKeys.length} DataToArray: ${millisToArray}ms, Sort: ${millisSort}ms, ImmutArr: ${millisImmArray}ms")


SortBuffer.release(buffer, primaryKeys.length)

InMemTablePrimaryKeys(immutableArray)
}

private def createSnapshot(source: RowSource, primaryKeys: TablePrimaryKeys, vpColumns: ViewPortColumns): (Array[RowWithData], Int) = {
private def createSnapshot(source: RowSource, primaryKeys: TablePrimaryKeys,
vpColumns: ViewPortColumns,
sortBuffer: Array[RowWithData]): Int = {
val length = primaryKeys.length
val rowDataArray = new Array[RowWithData](length)
var index = 0
var count = 0

while (index < length) {
val key = primaryKeys.get(index)
source.pullRow(key, vpColumns) match {
case r: RowWithData =>
rowDataArray(count) = r
sortBuffer(count) = r
count += 1
case _ =>
}
index += 1
}
(rowDataArray, count)
count
}

private def createKeyArray(snapshot: Array[RowWithData], length: Int): ImmutableArray[String] = {
Expand Down
Loading
Loading