Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
157 changes: 157 additions & 0 deletions plugin/off-heap-table-plugin/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>org.finos.vuu</groupId>
<artifactId>plugin</artifactId>
<version>0.17.0-SNAPSHOT</version>
</parent>

<groupId>org.finos.vuu.plugin</groupId>
<artifactId>off-heap-table-plugin</artifactId>
<name>${project.artifactId}</name>

<dependencies>
<dependency>
<groupId>org.finos.vuu</groupId>
<artifactId>vuu</artifactId>
<version>0.17.0-SNAPSHOT</version>
</dependency>

<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala3-library_3</artifactId>
<version>${scala.version}</version>
</dependency>

<dependency>
<groupId>org.finos.vuu</groupId>
<artifactId>vuu</artifactId>
<version>0.17.0-SNAPSHOT</version>
<classifier>tests</classifier>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.scalatest</groupId>
<artifactId>scalatest_3</artifactId>
<version>${scalatest.version}</version>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.scala-lang</groupId>
<artifactId>scala3-library_3</artifactId>
</exclusion>
</exclusions>
</dependency>

</dependencies>

<build>
<sourceDirectory>src/main/java</sourceDirectory>
<testSourceDirectory>src/test/java</testSourceDirectory>

<plugins>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-source-plugin</artifactId>
<executions>
<execution>
<phase>compile</phase>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-release-plugin</artifactId>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-javadoc-plugin</artifactId>
<executions>
<execution>
<id>attach-javadocs</id>
<goals>
<goal>jar</goal>
</goals>
</execution>
</executions>
</plugin>

<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<sourceDir>src/main/scala</sourceDir>
<testSourceDir>src/test/scala</testSourceDir>
<args>
<arg>-deprecation</arg>
</args>
<jvmArgs>
<jvmArg>-Xms64m</jvmArg>
<jvmArg>-Xmx1024m</jvmArg>
</jvmArgs>
</configuration>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
</execution>
</executions>
</plugin>

<!-- disable surefire -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<!--skipTests>true</skipTests-->
</configuration>
</plugin>
<!-- enable scalatest -->
<plugin>
<groupId>org.scalatest</groupId>
<artifactId>scalatest-maven-plugin</artifactId>
<configuration>
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
<junitxml>.</junitxml>
<filereports>test-reports.txt</filereports>
</configuration>
<executions>
<execution>
<id>test</id>
<goals>
<goal>test</goal>
</goals>
</execution>
</executions>
</plugin>

</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package org.finos.vuu.plugin.offheap;

import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Arrays;

public class MemoryMappedBuffer implements AutoCloseable {

private static final long SEGMENT_SIZE = 1L << 30; //1GB
private final MappedByteBuffer[] segments;
private final long totalCapacity;

public MemoryMappedBuffer(String filePath, long capacity) throws IOException {
this.totalCapacity = capacity;
int numSegments = (int) ((capacity + SEGMENT_SIZE - 1) / SEGMENT_SIZE);
this.segments = new MappedByteBuffer[numSegments];

try (var raf = new RandomAccessFile(filePath, "rw");
var channel = raf.getChannel()) {

for (int i = 0; i < numSegments; i++) {
long position = i * SEGMENT_SIZE;
long size = Math.min(SEGMENT_SIZE, capacity - position);
segments[i] = channel.map(FileChannel.MapMode.READ_WRITE, position, size);
}
}
}

public void put(long index, byte b) {
int segmentIndex = (int) (index / SEGMENT_SIZE);
int segmentOffset = (int) (index % SEGMENT_SIZE);
segments[segmentIndex].put(segmentOffset, b);
}

public byte get(long index) {
int segmentIndex = (int) (index / SEGMENT_SIZE);
int segmentOffset = (int) (index % SEGMENT_SIZE);
return segments[segmentIndex].get(segmentOffset);
}

public long capacity() {
return totalCapacity;
}

@Override
public void close() {
Arrays.fill(segments, null);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package org.finos.vuu.plugin.offheap

import org.finos.toolbox.jmx.MetricsProvider
import org.finos.toolbox.time.Clock
import org.finos.vuu.api.TableDef
import org.finos.vuu.core.table.TableContainer
import org.finos.vuu.feature.{FilterFactory, JoinTableFactory, SessionTableFactory, SortFactory, TableFactory, TableFeature, ViewPortCallableFactory, ViewPortFactory, ViewPortKeysCreator, ViewPortTableCreator, ViewPortTreeCallableFactory}
import org.finos.vuu.plugin.offheap.table.OffHeapDataTable
import org.finos.vuu.plugin.{DefaultPlugin, PluginType}
import org.finos.vuu.provider.JoinTableProvider

object OffHeapTablePlugin extends DefaultPlugin {

registerFeature(TableFeature)

override def tableFactory(implicit metrics: MetricsProvider): TableFactory = (tableDef: TableDef, tableContainer: TableContainer, joinTableProvider: JoinTableProvider) => {
given clock: Clock = tableContainer.timeProvider
val table = new OffHeapDataTable(tableDef, joinTableProvider)
tableContainer.addTable(table)
table
}

override def pluginType: PluginType = OffHeapTablePluginType

override def joinTableFactory(implicit metrics: MetricsProvider, timeProvider: Clock): JoinTableFactory = ???

override def sessionTableFactory: SessionTableFactory = ???

override def viewPortKeysCreator: ViewPortKeysCreator = ???

override def viewPortFactory: ViewPortFactory = ???

override def filterFactory: FilterFactory = ???

override def sortFactory: SortFactory = ???

override def viewPortCallableFactory: ViewPortCallableFactory = ???

override def viewPortTreeCallableFactory: ViewPortTreeCallableFactory = ???

override def viewPortTableCreator: ViewPortTableCreator = ???
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package org.finos.vuu.plugin.offheap

import org.finos.vuu.plugin.PluginType

object OffHeapTablePluginType extends PluginType {

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package org.finos.vuu.plugin.offheap.store

class BufferToMapAdaptor {




}
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package org.finos.vuu.plugin.offheap.store

import java.nio.{ByteBuffer, ByteOrder}
import java.nio.charset.StandardCharsets
import scala.collection.immutable.SortedMap

object FastStore {
// Type Tags (1 byte)
private val T_SHORT: Byte = 0
private val T_INT: Byte = 1
private val T_LONG: Byte = 2
private val T_CHAR: Byte = 3
private val T_STRING: Byte = 4

// Header entry is 6 bytes: [Key: 1] + [Offset: 4] + [Tag: 1]
private val ENTRY_SIZE = 6

/**
* Packs data into a read-only ByteBuffer.
* Entries are sorted by key to enable O(log N) lookup.
*/
def pack(data: SortedMap[Byte, Any], target: ByteBuffer): Unit = {
val entryCount = data.size
val headerSize = 2 + (entryCount * ENTRY_SIZE)

// Skip header for now, we will write it last once we know offsets
target.position(headerSize)

val index = data.map { case (key, value) =>
val offset = target.position()
val tag = value match {
case v: Short => target.putShort(v); T_SHORT
case v: Int => target.putInt(v); T_INT
case v: Long => target.putLong(v); T_LONG
case v: Char => target.putChar(v); T_CHAR
case v: String =>
val bytes = v.getBytes(StandardCharsets.UTF_8)
putVarInt(bytes.length, target)
target.put(bytes)
T_STRING
case _ => throw new IllegalArgumentException(s"Unsupported type: ${value.getClass}")
}
(key, offset, tag)
}

// Write the actual Header at the beginning
val finalSize = target.position()
target.position(0)
target.putShort(entryCount.toShort)

index.foreach { case (key, offset, tag) =>
target.putShort(key)
target.putInt(offset)
target.put(tag)
}

target.position(finalSize)
target.flip() // Prepare for reading
}

/**
* Performs a Binary Search on the buffer header to find a key.
* Zero-allocation for primitive returns (except Strings).
*/
def lookup(buf: ByteBuffer, targetKey: Byte): Any = {
val count = buf.getShort(0)
var low = 0
var high = count - 1

while (low <= high) {
val mid = (low + high) / 2
val pos = 2 + (mid * ENTRY_SIZE)
val key = buf.get(pos)

if (key == targetKey) {
val offset = buf.getInt(pos + 1)
val tag = buf.get(pos + 5)
return readValue(buf, offset, tag)
} else if (key < targetKey) {
low = mid + 1
} else {
high = mid - 1
}
}
null
}

private def readValue(buf: ByteBuffer, offset: Int, tag: Byte): Any = {
buf.position(offset)
tag match {
case T_SHORT => buf.getShort()
case T_INT => buf.getInt()
case T_LONG => buf.getLong()
case T_CHAR => buf.getChar()
case T_STRING =>
val len = getVarInt(buf)
val bytes = new Array[Byte](len)
buf.get(bytes)
new String(bytes, StandardCharsets.UTF_8)
}
}

private def putVarInt(value: Int, buf: ByteBuffer): Unit = {
var v = value
while ((v & ~0x7F) != 0) {
buf.put(((v & 0x7F) | 0x80).toByte)
v >>>= 7
}
buf.put(v.toByte)
}

private def getVarInt(buf: ByteBuffer): Int = {
var b = buf.get()
var value = b & 0x7F
var shift = 7
while ((b & 0x80) != 0) {
b = buf.get()
value |= (b & 0x7F) << shift
shift += 7
}
value
}
}
Loading