Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class BenchmarkHelper {
private final Clock clock = new DefaultClock();
private final LifecycleContainer lifecycleContainer = new LifecycleContainer(clock);
private final MetricsProvider metricsProvider = new MetricsProviderImpl();
private final JoinTableProvider joinProvider = JoinTableProviderImpl.apply(clock, lifecycleContainer, metricsProvider);
private final JoinTableProvider joinProvider = JoinTableProviderImpl.apply(lifecycleContainer);

public Clock getClock() {
return clock;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ public static void main( String[] args )
.withViewPortThreads(4),
VuuClientConnectionOptions.apply()
.withHeartbeat(),
VuuJoinTableProviderOptions.apply(),
new scala.collection.mutable.ListBuffer<ViewServerModule>().toList(),
new scala.collection.mutable.ListBuffer<Plugin>().toList()
).withModule(PriceModule.apply(clock, lifecycle, tableDefContainer))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import org.finos.toolbox.time.Clock;
import org.finos.toolbox.time.DefaultClock;
import org.finos.vuu.core.table.TableContainer;
import org.finos.vuu.provider.VuuJoinTableProvider;
import org.finos.vuu.provider.JoinTableProviderImpl;
import org.finos.vuu.util.ScalaCollectionConverter;
import org.junit.Test;
import scala.Option;
Expand All @@ -24,7 +24,7 @@ public void should_register_java_function_as_rpc_in_default_handler() {
Clock clock = new DefaultClock();
LifecycleContainer lifecycleContainer = new LifecycleContainer(clock);
MetricsProvider metricsProvider = new MetricsProviderImpl();
TableContainer tableContainer = new TableContainer(new VuuJoinTableProvider(clock, lifecycleContainer, metricsProvider), metricsProvider, clock);
TableContainer tableContainer = new TableContainer(JoinTableProviderImpl.apply(lifecycleContainer), metricsProvider, clock);
final DefaultRpcHandler defaultRpcHandler = new DefaultRpcHandler(tableContainer);
defaultRpcHandler.registerRpc("helloWorld", rpcService::rpcFunction);

Expand Down
8 changes: 4 additions & 4 deletions vuu/src/main/scala/org/finos/vuu/core/VuuServer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,16 @@ import org.finos.vuu.api.{JoinTableDef, TableDef, ViewPortDef}
import org.finos.vuu.core.module.{ModuleContainer, RealizedViewServerModule, StaticServedResource, TableDefContainer, ViewServerModule}
import org.finos.vuu.core.table.{DataTable, TableContainer}
import org.finos.vuu.feature.inmem.VuuInMemPlugin
import org.finos.vuu.net._
import org.finos.vuu.net.flowcontrol.FlowControllerFactory
import org.finos.vuu.net.http.{Http2Server, VuuHttp2Server}
import org.finos.vuu.net.json.{CoreJsonSerializationMixin, JsonVsSerializer, Serializer}
import org.finos.vuu.net.rest.RestService
import org.finos.vuu.net.rpc.{JsonSubTypeRegistry, RpcHandler}
import org.finos.vuu.net.rpc.JsonSubTypeRegistry
import org.finos.vuu.net.ws.WebSocketServer
import org.finos.vuu.net.{Authenticator, ClientSessionContainerImpl, LoginTokenValidator, MessageBody, ViewServerHandlerFactoryImpl}
import org.finos.vuu.plugin.PluginRegistry
import org.finos.vuu.provider.{JoinTableProvider, JoinTableProviderImpl, Provider, ProviderContainer}
import org.finos.vuu.viewport._
import org.finos.vuu.viewport.{InMemViewPortTreeCallable, InMemViewPortTreeWorkItem, ViewPort, ViewPortAction, ViewPortActionMixin, ViewPortContainer}

import java.util.concurrent.{Callable, FutureTask}

Expand All @@ -42,7 +42,7 @@ class VuuServer(config: VuuServerConfig)(implicit lifecycle: LifecycleContainer,

final val sessionContainer = new ClientSessionContainerImpl()

final val joinProvider: JoinTableProvider = JoinTableProviderImpl()
final val joinProvider: JoinTableProvider = JoinTableProviderImpl(config.joinProvider)

final val tableContainer = new TableContainer(joinProvider)

Expand Down
18 changes: 18 additions & 0 deletions vuu/src/main/scala/org/finos/vuu/core/VuuServerOptions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,12 @@ object VuuClientConnectionOptions {
def apply(): VuuClientConnectionOptions = {
VuuClientConnectionOptionsImpl(true)
}
}

object VuuJoinTableProviderOptions {
def apply() : VuuJoinTableProviderOptions = {
VuuJoinProviderOptionsImpl.apply(batchSize = 100, maxQueueSize = 20_000)
}
}

trait VuuSSLCipherSuiteOptions {
Expand Down Expand Up @@ -77,6 +82,13 @@ trait VuuClientConnectionOptions {
def withHeartbeatDisabled(): VuuClientConnectionOptions
}

trait VuuJoinTableProviderOptions {
def batchSize: Int
def maxQueueSize: Int
def withBatchSize(maxQueueDepth: Int): VuuJoinTableProviderOptions
def withMaxQueueDepth(maxQueueDepth: Int): VuuJoinTableProviderOptions
}

case class VuuSecurityOptionsImpl(authenticator: Authenticator, loginTokenValidator: LoginTokenValidator) extends VuuSecurityOptions{
override def withAuthenticator(authenticator: Authenticator): VuuSecurityOptions = this.copy(authenticator = authenticator)
override def withLoginValidator(tokenValidator: LoginTokenValidator): VuuSecurityOptions = this.copy(loginTokenValidator = tokenValidator)
Expand Down Expand Up @@ -114,11 +126,17 @@ case class VuuSSLCipherSuiteOptionsImpl(ciphers: List[String], protocols: List[S
override def withProtocols(protocols: List[String]): VuuSSLCipherSuiteOptions = this.copy(protocols = protocols)
}

case class VuuJoinProviderOptionsImpl(batchSize: Int, maxQueueSize: Int) extends VuuJoinTableProviderOptions {
override def withBatchSize(batchSize: Int): VuuJoinTableProviderOptions = this.copy(batchSize = batchSize)
override def withMaxQueueDepth(maxQueueSize: Int): VuuJoinTableProviderOptions = this.copy(maxQueueSize = maxQueueSize)
}

case class VuuServerConfig(httpOptions: VuuHttp2ServerOptions = VuuHttp2ServerOptions(),
wsOptions: VuuWebSocketOptions = VuuWebSocketOptions(),
security: VuuSecurityOptions = VuuSecurityOptions(),
threading: VuuThreadingOptions = VuuThreadingOptions(),
clientConnection: VuuClientConnectionOptions = VuuClientConnectionOptions(),
joinProvider: VuuJoinTableProviderOptions = VuuJoinTableProviderOptions(),
modules: List[ViewServerModule] = List(),
plugins: List[Plugin] = List()) {
def withModule(module: ViewServerModule): VuuServerConfig = {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
package org.finos.vuu.provider

import org.finos.toolbox.jmx.MetricsProvider
import org.finos.toolbox.lifecycle.LifecycleContainer
import org.finos.toolbox.time.Clock
import org.finos.vuu.core.VuuJoinTableProviderOptions

object JoinTableProviderImpl {

def apply()(implicit timeProvider: Clock, lifecycle: LifecycleContainer, metrics: MetricsProvider): JoinTableProvider = {
new VuuJoinTableProvider()
def apply()(implicit lifecycleContainer: LifecycleContainer): JoinTableProvider = {
apply(VuuJoinTableProviderOptions())
}

def apply(vuuJoinTableProviderOptions: VuuJoinTableProviderOptions)(implicit lifecycle: LifecycleContainer): JoinTableProvider = {
new VuuJoinTableProvider(vuuJoinTableProviderOptions)
}

}
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package org.finos.vuu.provider

import com.typesafe.scalalogging.StrictLogging
import org.finos.toolbox.jmx.MetricsProvider
import org.finos.toolbox.lifecycle.LifecycleContainer
import org.finos.toolbox.time.Clock
import org.finos.vuu.api.{JoinTableDef, TableDef}
import org.finos.vuu.core.VuuJoinTableProviderOptions
import org.finos.vuu.core.table.{DataTable, JoinTable, JoinTableUpdate, RowWithData}
import org.finos.vuu.provider.join.{JoinDefToJoinTable, JoinManagerEventDataSink, JoinRelations, RightToLeftKeys}

Expand Down Expand Up @@ -45,11 +44,11 @@ import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap}
* },
* }
*/
class VuuJoinTableProvider(implicit timeProvider: Clock, lifecycle: LifecycleContainer, metrics: MetricsProvider) extends JoinTableProvider with StrictLogging {
class VuuJoinTableProvider(options: VuuJoinTableProviderOptions)(implicit lifecycle: LifecycleContainer) extends JoinTableProvider with StrictLogging {

lifecycle(this)

private val outboundQueue = new ArrayBlockingQueue[JoinTableUpdate](20000)
private val outboundQueue = new ArrayBlockingQueue[JoinTableUpdate](options.maxQueueSize)
private val joinRelations = new JoinRelations()
private val joinSink = new JoinManagerEventDataSink()
private val rightToLeftKeys = new RightToLeftKeys()
Expand Down Expand Up @@ -121,7 +120,7 @@ class VuuJoinTableProvider(implicit timeProvider: Clock, lifecycle: LifecycleCon
logger.debug("[JoinTableProvider] Submitting joint table event:" + jtu)

//get the processing off the join thread
outboundQueue.offer(jtu)
outboundQueue.put(jtu)
}

def eventToRightKey(joinTableDef: JoinTableDef, tableName: String, ev: util.HashMap[String, Any], rightColumn: String): String = {
Expand Down Expand Up @@ -252,7 +251,7 @@ class VuuJoinTableProvider(implicit timeProvider: Clock, lifecycle: LifecycleCon
}

override def runOnce(): Unit = {
val updates = new java.util.ArrayList[JoinTableUpdate](100)
val updates = new java.util.ArrayList[JoinTableUpdate](options.batchSize)

outboundQueue.drainTo(updates) match {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import org.finos.toolbox.lifecycle.LifecycleContainer
import org.finos.toolbox.time.{Clock, TestFriendlyClock}
import org.finos.vuu.api.TableDef
import org.finos.vuu.core.table.{Columns, InMemDataTable}
import org.finos.vuu.provider.VuuJoinTableProvider
import org.finos.vuu.provider.{JoinTableProvider, JoinTableProviderImpl, VuuJoinTableProvider}
import org.scalatest.GivenWhenThen
import org.scalatest.featurespec.AnyFeatureSpec
import org.scalatest.matchers.should.Matchers
Expand All @@ -30,7 +30,7 @@ class RowBuilderTest extends AnyFeatureSpec with Matchers with GivenWhenThen{
implicit val metrics: MetricsProviderImpl = new MetricsProviderImpl

val tableDef = getTableDef
val joinTableProvider: VuuJoinTableProvider = new VuuJoinTableProvider()
val joinTableProvider: JoinTableProvider = JoinTableProviderImpl()

val (ricColumn, descColumn, currColumn, exchangeColumn, lotSizeColumn) =
(
Expand Down Expand Up @@ -65,7 +65,7 @@ class RowBuilderTest extends AnyFeatureSpec with Matchers with GivenWhenThen{
implicit val metrics: MetricsProviderImpl = new MetricsProviderImpl

val tableDef = getTableDef
val joinTableProvider: VuuJoinTableProvider = new VuuJoinTableProvider()
val joinTableProvider: JoinTableProvider = JoinTableProviderImpl()

val (ricColumn, descColumn, currColumn, exchangeColumn, lotSizeColumn) =
(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import com.typesafe.scalalogging.StrictLogging
import org.finos.vuu.api._
import org.finos.vuu.core.table.join.JoinAsserts._
import org.finos.vuu.core.table.{Columns, KeyObserver, RowKeyUpdate, TableContainer}
import org.finos.vuu.provider.{JoinTableProvider, VuuJoinTableProvider}
import org.finos.vuu.provider.{JoinTableProvider, JoinTableProviderImpl, VuuJoinTableProvider}
import org.finos.vuu.viewport.ViewPortSetup
import org.finos.toolbox.jmx.{MetricsProvider, MetricsProviderImpl}
import org.finos.toolbox.lifecycle.LifecycleContainer
Expand Down Expand Up @@ -206,7 +206,7 @@ class JoinManagerTest extends AnyFeatureSpec with Matchers with StrictLogging wi

implicit val lifecycle: LifecycleContainer = new LifecycleContainer

implicit val joinTableProvider: JoinTableProvider = new VuuJoinTableProvider()
implicit val joinTableProvider: JoinTableProvider = JoinTableProviderImpl()

val ordersDef = mkeOrdersDef()

Expand Down Expand Up @@ -258,7 +258,7 @@ class JoinManagerTest extends AnyFeatureSpec with Matchers with StrictLogging wi
Scenario("Left Outer Join, Right key update") {
implicit val lifecycle: LifecycleContainer = new LifecycleContainer

implicit val joinTableProvider: JoinTableProvider = new VuuJoinTableProvider()
implicit val joinTableProvider: JoinTableProvider = JoinTableProviderImpl()

val ordersDef = mkeOrdersDef()

Expand Down Expand Up @@ -307,7 +307,7 @@ class JoinManagerTest extends AnyFeatureSpec with Matchers with StrictLogging wi

implicit val lifecycle: LifecycleContainer = new LifecycleContainer

implicit val joinTableProvider: JoinTableProvider = new VuuJoinTableProvider()
implicit val joinTableProvider: JoinTableProvider = JoinTableProviderImpl()

val ordersDef = mkeOrders2Def()
val pricesDef = mkePricesDef()
Expand Down Expand Up @@ -359,7 +359,7 @@ class JoinManagerTest extends AnyFeatureSpec with Matchers with StrictLogging wi

implicit val lifecycle: LifecycleContainer = new LifecycleContainer

implicit val joinTableProvider: JoinTableProvider = new VuuJoinTableProvider()
implicit val joinTableProvider: JoinTableProvider = JoinTableProviderImpl()

val ordersDef = mkeOrdersDef()

Expand Down Expand Up @@ -415,7 +415,7 @@ class JoinManagerTest extends AnyFeatureSpec with Matchers with StrictLogging wi
Scenario("Left Outer Join, Delete Right Record") {
implicit val lifecycle: LifecycleContainer = new LifecycleContainer

implicit val joinTableProvider : JoinTableProvider = new VuuJoinTableProvider()
implicit val joinTableProvider : JoinTableProvider = JoinTableProviderImpl()

val ordersDef = mkeOrdersDef()

Expand Down Expand Up @@ -465,7 +465,7 @@ class JoinManagerTest extends AnyFeatureSpec with Matchers with StrictLogging wi
Scenario("Left Outer Join of Joins") {
implicit val lifecycle: LifecycleContainer = new LifecycleContainer

implicit val joinTableProvider: VuuJoinTableProvider = new VuuJoinTableProvider()
implicit val joinTableProvider: JoinTableProvider = JoinTableProviderImpl()

val ordersDef = mkeOrdersDef()
val pricesDef = mkePricesDef()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ import org.finos.toolbox.lifecycle.LifecycleContainer
import org.finos.toolbox.time.{Clock, DefaultClock, TestFriendlyClock}
import org.finos.vuu.api._
import org.finos.vuu.client.messages.RequestId
import org.finos.vuu.core.table.DefaultColumnNames.{CreatedTimeColumnName, LastUpdatedTimeColumnName, allDefaultColumns}
import org.finos.vuu.core.VuuJoinProviderOptionsImpl
import org.finos.vuu.core.table.DefaultColumnNames.{CreatedTimeColumnName, LastUpdatedTimeColumnName}
import org.finos.vuu.core.table._
import org.finos.vuu.feature.inmem.VuuInMemPlugin
import org.finos.vuu.net.ClientSessionId
Expand Down Expand Up @@ -126,6 +127,86 @@ class JoinTableTest extends AnyFeatureSpec with Matchers with ViewPortSetup {
updates.filter( vp => vp.vpUpdate == RowUpdateType).foreach(update => update.table.readRow(update.key.key, List("orderId", "trader", "tradeTime", "ric", "bid", "ask"), printToConsoleProcessor ))
}

Scenario("check large number of ticks all the way through from source to join table"){

implicit val lifecycle: LifecycleContainer = new LifecycleContainer

val dateTime: Long = LocalDateTime.of(2015, 7, 24, 11, 0).atZone(ZoneId.of("Europe/London")).toInstant.toEpochMilli

val ordersDef = TableDef(
name = "orders",
keyField = "orderId",
columns = Columns.fromNames("orderId:String", "trader:String", "ric:String", "tradeTime:Long", "quantity:Double"),
joinFields = "ric", "orderId")

val pricesDef = TableDef("prices", "ric", Columns.fromNames("ric:String", "bid:Double", "ask:Double", "last:Double", "open:Double", "close:Double"), "ric")

val joinDef = JoinTableDef(
name = "orderPrices",
baseTable = ordersDef,
joinColumns = Columns.allFrom(ordersDef) ++ Columns.allFromExcept(pricesDef, "ric"),
joins =
JoinTo(
table = pricesDef,
joinSpec = JoinSpec( left = "ric", right = "ric", LeftOuterJoin)
),
links = VisualLinks(),
joinFields = Seq()
)

val batchSize = 10
val maxQueueSize = 100
val joinProvider = JoinTableProviderImpl(VuuJoinProviderOptionsImpl.apply(batchSize = batchSize, maxQueueSize = maxQueueSize))

val tableContainer = new TableContainer(joinProvider)

val orders = tableContainer.createTable(ordersDef)
val prices = tableContainer.createTable(pricesDef)
val orderPrices = tableContainer.createJoinTable(joinDef)

val ordersProvider = new MockProvider(orders)
val pricesProvider = new MockProvider(prices)

val providerContainer = new ProviderContainer(joinProvider)

val viewPortContainer = setupViewPort(tableContainer, providerContainer)

joinProvider.start()

pricesProvider.tick("VOD.L", Map("ric" -> "VOD.L", "bid" -> 220.0, "ask" -> 222.0))

pricesProvider.tick("BT.L", Map("ric" -> "BT.L", "bid" -> 500.0, "ask" -> 501.0))

for (quantity <- 1 to maxQueueSize + 1) {
ordersProvider.tick("NYC-0001", Map("orderId" -> "NYC-0001", "trader" -> "chris", "tradeTime" -> dateTime, "quantity" -> quantity, "ric" -> "VOD.L"))
ordersProvider.tick("NYC-0002", Map("orderId" -> "NYC-0002", "trader" -> "chris", "tradeTime" -> dateTime, "quantity" -> quantity, "ric" -> "BT.L"))
if (quantity % batchSize == 0) {
joinProvider.runOnce()
}
}

joinProvider.runOnce()

val session = ClientSessionId("sess-01", "chris")

val outQueue = new OutboundRowPublishQueue()

val vpcolumns = ViewPortColumnCreator.create(orderPrices, List("orderId", "trader", "tradeTime", "quantity", "ric"))

val viewPort = viewPortContainer.create(RequestId.oneNew(), session, outQueue, orderPrices, DefaultRange, vpcolumns)

viewPortContainer.runOnce()

assertVpEq(filterByVpId(combineQs(viewPort), viewPort)){
Table(
("orderId" ,"trader" ,"tradeTime" ,"quantity" , "ric" ),
("NYC-0001","chris" ,1437732000000L ,maxQueueSize + 1 , "VOD.L" ),
("NYC-0002","chris" ,1437732000000L ,maxQueueSize + 1 , "BT.L" )
)
}

}

Scenario("check that registering and deregistering listeners on join table propagates to source tables"){

implicit val lifecycle: LifecycleContainer = new LifecycleContainer
Expand Down Expand Up @@ -176,7 +257,7 @@ class JoinTableTest extends AnyFeatureSpec with Matchers with ViewPortSetup {
prices.isKeyObservedBy("VOD.L", ko2) should be (true)
}

Scenario("Check deleting keys from join table and see if it propogates correctly"){
Scenario("Check deleting keys from join table and see if it propagates correctly"){

implicit val lifecycle: LifecycleContainer = new LifecycleContainer

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import org.finos.toolbox.lifecycle.LifecycleContainer
import org.finos.toolbox.time.{Clock, DefaultClock}
import org.finos.vuu.core.table.TableContainer
import org.finos.vuu.net.{ClientSessionId, RequestContext}
import org.finos.vuu.provider.VuuJoinTableProvider
import org.finos.vuu.provider.JoinTableProviderImpl
import org.scalatest.BeforeAndAfterEach
import org.scalatest.featurespec.AnyFeatureSpec
import org.scalatest.matchers.should.Matchers
Expand All @@ -18,7 +18,7 @@ class DefaultRpcHandlerTest extends AnyFeatureSpec with Matchers with BeforeAndA
implicit val clock: Clock = new DefaultClock
implicit val lifecycleContainer: LifecycleContainer = new LifecycleContainer
implicit val metricsProvider: MetricsProvider = new MetricsProviderImpl
implicit val tableContainer: TableContainer = new TableContainer(new VuuJoinTableProvider)
implicit val tableContainer: TableContainer = new TableContainer(JoinTableProviderImpl())
handler = new DefaultRpcHandler
}

Expand Down
Loading