diff --git a/benchmark/src/main/java/org/finos/vuu/benchmark/BenchmarkHelper.java b/benchmark/src/main/java/org/finos/vuu/benchmark/BenchmarkHelper.java index b802eeeac..e06319057 100644 --- a/benchmark/src/main/java/org/finos/vuu/benchmark/BenchmarkHelper.java +++ b/benchmark/src/main/java/org/finos/vuu/benchmark/BenchmarkHelper.java @@ -32,7 +32,7 @@ public class BenchmarkHelper { private final Clock clock = new DefaultClock(); private final LifecycleContainer lifecycleContainer = new LifecycleContainer(clock); private final MetricsProvider metricsProvider = new MetricsProviderImpl(); - private final JoinTableProvider joinProvider = JoinTableProviderImpl.apply(clock, lifecycleContainer, metricsProvider); + private final JoinTableProvider joinProvider = JoinTableProviderImpl.apply(lifecycleContainer); public Clock getClock() { return clock; diff --git a/example/main-java/src/main/java/org/finos/vuu/VuuExampleMain.java b/example/main-java/src/main/java/org/finos/vuu/VuuExampleMain.java index 6d87b9952..fd87d2b6b 100644 --- a/example/main-java/src/main/java/org/finos/vuu/VuuExampleMain.java +++ b/example/main-java/src/main/java/org/finos/vuu/VuuExampleMain.java @@ -71,6 +71,7 @@ public static void main( String[] args ) .withViewPortThreads(4), VuuClientConnectionOptions.apply() .withHeartbeat(), + VuuJoinTableProviderOptions.apply(), new scala.collection.mutable.ListBuffer().toList(), new scala.collection.mutable.ListBuffer().toList() ).withModule(PriceModule.apply(clock, lifecycle, tableDefContainer)) diff --git a/vuu-java/src/test/java/org/finos/vuu/net/rpc/RpcMethodHandlerTest.java b/vuu-java/src/test/java/org/finos/vuu/net/rpc/RpcMethodHandlerTest.java index 7333d1ab1..6db5e86ad 100644 --- a/vuu-java/src/test/java/org/finos/vuu/net/rpc/RpcMethodHandlerTest.java +++ b/vuu-java/src/test/java/org/finos/vuu/net/rpc/RpcMethodHandlerTest.java @@ -7,7 +7,7 @@ import org.finos.toolbox.time.Clock; import org.finos.toolbox.time.DefaultClock; import org.finos.vuu.core.table.TableContainer; -import org.finos.vuu.provider.VuuJoinTableProvider; +import org.finos.vuu.provider.JoinTableProviderImpl; import org.finos.vuu.util.ScalaCollectionConverter; import org.junit.Test; import scala.Option; @@ -24,7 +24,7 @@ public void should_register_java_function_as_rpc_in_default_handler() { Clock clock = new DefaultClock(); LifecycleContainer lifecycleContainer = new LifecycleContainer(clock); MetricsProvider metricsProvider = new MetricsProviderImpl(); - TableContainer tableContainer = new TableContainer(new VuuJoinTableProvider(clock, lifecycleContainer, metricsProvider), metricsProvider, clock); + TableContainer tableContainer = new TableContainer(JoinTableProviderImpl.apply(lifecycleContainer), metricsProvider, clock); final DefaultRpcHandler defaultRpcHandler = new DefaultRpcHandler(tableContainer); defaultRpcHandler.registerRpc("helloWorld", rpcService::rpcFunction); diff --git a/vuu/src/main/scala/org/finos/vuu/core/VuuServer.scala b/vuu/src/main/scala/org/finos/vuu/core/VuuServer.scala index 05dcd6069..50aeb227b 100644 --- a/vuu/src/main/scala/org/finos/vuu/core/VuuServer.scala +++ b/vuu/src/main/scala/org/finos/vuu/core/VuuServer.scala @@ -9,16 +9,16 @@ import org.finos.vuu.api.{JoinTableDef, TableDef, ViewPortDef} import org.finos.vuu.core.module.{ModuleContainer, RealizedViewServerModule, StaticServedResource, TableDefContainer, ViewServerModule} import org.finos.vuu.core.table.{DataTable, TableContainer} import org.finos.vuu.feature.inmem.VuuInMemPlugin -import org.finos.vuu.net._ import org.finos.vuu.net.flowcontrol.FlowControllerFactory import org.finos.vuu.net.http.{Http2Server, VuuHttp2Server} import org.finos.vuu.net.json.{CoreJsonSerializationMixin, JsonVsSerializer, Serializer} import org.finos.vuu.net.rest.RestService -import org.finos.vuu.net.rpc.{JsonSubTypeRegistry, RpcHandler} +import org.finos.vuu.net.rpc.JsonSubTypeRegistry import org.finos.vuu.net.ws.WebSocketServer +import org.finos.vuu.net.{Authenticator, ClientSessionContainerImpl, LoginTokenValidator, MessageBody, ViewServerHandlerFactoryImpl} import org.finos.vuu.plugin.PluginRegistry import org.finos.vuu.provider.{JoinTableProvider, JoinTableProviderImpl, Provider, ProviderContainer} -import org.finos.vuu.viewport._ +import org.finos.vuu.viewport.{InMemViewPortTreeCallable, InMemViewPortTreeWorkItem, ViewPort, ViewPortAction, ViewPortActionMixin, ViewPortContainer} import java.util.concurrent.{Callable, FutureTask} @@ -42,7 +42,7 @@ class VuuServer(config: VuuServerConfig)(implicit lifecycle: LifecycleContainer, final val sessionContainer = new ClientSessionContainerImpl() - final val joinProvider: JoinTableProvider = JoinTableProviderImpl() + final val joinProvider: JoinTableProvider = JoinTableProviderImpl(config.joinProvider) final val tableContainer = new TableContainer(joinProvider) diff --git a/vuu/src/main/scala/org/finos/vuu/core/VuuServerOptions.scala b/vuu/src/main/scala/org/finos/vuu/core/VuuServerOptions.scala index 7410f01f8..30b088733 100644 --- a/vuu/src/main/scala/org/finos/vuu/core/VuuServerOptions.scala +++ b/vuu/src/main/scala/org/finos/vuu/core/VuuServerOptions.scala @@ -28,7 +28,12 @@ object VuuClientConnectionOptions { def apply(): VuuClientConnectionOptions = { VuuClientConnectionOptionsImpl(true) } +} +object VuuJoinTableProviderOptions { + def apply() : VuuJoinTableProviderOptions = { + VuuJoinProviderOptionsImpl.apply(batchSize = 100, maxQueueSize = 20_000) + } } trait VuuSSLCipherSuiteOptions { @@ -77,6 +82,13 @@ trait VuuClientConnectionOptions { def withHeartbeatDisabled(): VuuClientConnectionOptions } +trait VuuJoinTableProviderOptions { + def batchSize: Int + def maxQueueSize: Int + def withBatchSize(maxQueueDepth: Int): VuuJoinTableProviderOptions + def withMaxQueueDepth(maxQueueDepth: Int): VuuJoinTableProviderOptions +} + case class VuuSecurityOptionsImpl(authenticator: Authenticator, loginTokenValidator: LoginTokenValidator) extends VuuSecurityOptions{ override def withAuthenticator(authenticator: Authenticator): VuuSecurityOptions = this.copy(authenticator = authenticator) override def withLoginValidator(tokenValidator: LoginTokenValidator): VuuSecurityOptions = this.copy(loginTokenValidator = tokenValidator) @@ -114,11 +126,17 @@ case class VuuSSLCipherSuiteOptionsImpl(ciphers: List[String], protocols: List[S override def withProtocols(protocols: List[String]): VuuSSLCipherSuiteOptions = this.copy(protocols = protocols) } +case class VuuJoinProviderOptionsImpl(batchSize: Int, maxQueueSize: Int) extends VuuJoinTableProviderOptions { + override def withBatchSize(batchSize: Int): VuuJoinTableProviderOptions = this.copy(batchSize = batchSize) + override def withMaxQueueDepth(maxQueueSize: Int): VuuJoinTableProviderOptions = this.copy(maxQueueSize = maxQueueSize) +} + case class VuuServerConfig(httpOptions: VuuHttp2ServerOptions = VuuHttp2ServerOptions(), wsOptions: VuuWebSocketOptions = VuuWebSocketOptions(), security: VuuSecurityOptions = VuuSecurityOptions(), threading: VuuThreadingOptions = VuuThreadingOptions(), clientConnection: VuuClientConnectionOptions = VuuClientConnectionOptions(), + joinProvider: VuuJoinTableProviderOptions = VuuJoinTableProviderOptions(), modules: List[ViewServerModule] = List(), plugins: List[Plugin] = List()) { def withModule(module: ViewServerModule): VuuServerConfig = { diff --git a/vuu/src/main/scala/org/finos/vuu/provider/JoinTableProviderImpl.scala b/vuu/src/main/scala/org/finos/vuu/provider/JoinTableProviderImpl.scala index 2cbe2187c..9960af704 100644 --- a/vuu/src/main/scala/org/finos/vuu/provider/JoinTableProviderImpl.scala +++ b/vuu/src/main/scala/org/finos/vuu/provider/JoinTableProviderImpl.scala @@ -1,13 +1,16 @@ package org.finos.vuu.provider -import org.finos.toolbox.jmx.MetricsProvider import org.finos.toolbox.lifecycle.LifecycleContainer -import org.finos.toolbox.time.Clock +import org.finos.vuu.core.VuuJoinTableProviderOptions object JoinTableProviderImpl { - def apply()(implicit timeProvider: Clock, lifecycle: LifecycleContainer, metrics: MetricsProvider): JoinTableProvider = { - new VuuJoinTableProvider() + def apply()(implicit lifecycleContainer: LifecycleContainer): JoinTableProvider = { + apply(VuuJoinTableProviderOptions()) + } + + def apply(vuuJoinTableProviderOptions: VuuJoinTableProviderOptions)(implicit lifecycle: LifecycleContainer): JoinTableProvider = { + new VuuJoinTableProvider(vuuJoinTableProviderOptions) } } diff --git a/vuu/src/main/scala/org/finos/vuu/provider/VuuJoinTableProvider.scala b/vuu/src/main/scala/org/finos/vuu/provider/VuuJoinTableProvider.scala index 64bf4a02c..df318a329 100644 --- a/vuu/src/main/scala/org/finos/vuu/provider/VuuJoinTableProvider.scala +++ b/vuu/src/main/scala/org/finos/vuu/provider/VuuJoinTableProvider.scala @@ -1,10 +1,9 @@ package org.finos.vuu.provider import com.typesafe.scalalogging.StrictLogging -import org.finos.toolbox.jmx.MetricsProvider import org.finos.toolbox.lifecycle.LifecycleContainer -import org.finos.toolbox.time.Clock import org.finos.vuu.api.{JoinTableDef, TableDef} +import org.finos.vuu.core.VuuJoinTableProviderOptions import org.finos.vuu.core.table.{DataTable, JoinTable, JoinTableUpdate, RowWithData} import org.finos.vuu.provider.join.{JoinDefToJoinTable, JoinManagerEventDataSink, JoinRelations, RightToLeftKeys} @@ -45,11 +44,11 @@ import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap} * }, * } */ -class VuuJoinTableProvider(implicit timeProvider: Clock, lifecycle: LifecycleContainer, metrics: MetricsProvider) extends JoinTableProvider with StrictLogging { +class VuuJoinTableProvider(options: VuuJoinTableProviderOptions)(implicit lifecycle: LifecycleContainer) extends JoinTableProvider with StrictLogging { lifecycle(this) - private val outboundQueue = new ArrayBlockingQueue[JoinTableUpdate](20000) + private val outboundQueue = new ArrayBlockingQueue[JoinTableUpdate](options.maxQueueSize) private val joinRelations = new JoinRelations() private val joinSink = new JoinManagerEventDataSink() private val rightToLeftKeys = new RightToLeftKeys() @@ -121,7 +120,7 @@ class VuuJoinTableProvider(implicit timeProvider: Clock, lifecycle: LifecycleCon logger.debug("[JoinTableProvider] Submitting joint table event:" + jtu) //get the processing off the join thread - outboundQueue.offer(jtu) + outboundQueue.put(jtu) } def eventToRightKey(joinTableDef: JoinTableDef, tableName: String, ev: util.HashMap[String, Any], rightColumn: String): String = { @@ -252,7 +251,7 @@ class VuuJoinTableProvider(implicit timeProvider: Clock, lifecycle: LifecycleCon } override def runOnce(): Unit = { - val updates = new java.util.ArrayList[JoinTableUpdate](100) + val updates = new java.util.ArrayList[JoinTableUpdate](options.batchSize) outboundQueue.drainTo(updates) match { diff --git a/vuu/src/test/scala/org/finos/vuu/core/row/RowBuilderTest.scala b/vuu/src/test/scala/org/finos/vuu/core/row/RowBuilderTest.scala index 5a00b80c0..5e11e314b 100644 --- a/vuu/src/test/scala/org/finos/vuu/core/row/RowBuilderTest.scala +++ b/vuu/src/test/scala/org/finos/vuu/core/row/RowBuilderTest.scala @@ -5,7 +5,7 @@ import org.finos.toolbox.lifecycle.LifecycleContainer import org.finos.toolbox.time.{Clock, TestFriendlyClock} import org.finos.vuu.api.TableDef import org.finos.vuu.core.table.{Columns, InMemDataTable} -import org.finos.vuu.provider.VuuJoinTableProvider +import org.finos.vuu.provider.{JoinTableProvider, JoinTableProviderImpl, VuuJoinTableProvider} import org.scalatest.GivenWhenThen import org.scalatest.featurespec.AnyFeatureSpec import org.scalatest.matchers.should.Matchers @@ -30,7 +30,7 @@ class RowBuilderTest extends AnyFeatureSpec with Matchers with GivenWhenThen{ implicit val metrics: MetricsProviderImpl = new MetricsProviderImpl val tableDef = getTableDef - val joinTableProvider: VuuJoinTableProvider = new VuuJoinTableProvider() + val joinTableProvider: JoinTableProvider = JoinTableProviderImpl() val (ricColumn, descColumn, currColumn, exchangeColumn, lotSizeColumn) = ( @@ -65,7 +65,7 @@ class RowBuilderTest extends AnyFeatureSpec with Matchers with GivenWhenThen{ implicit val metrics: MetricsProviderImpl = new MetricsProviderImpl val tableDef = getTableDef - val joinTableProvider: VuuJoinTableProvider = new VuuJoinTableProvider() + val joinTableProvider: JoinTableProvider = JoinTableProviderImpl() val (ricColumn, descColumn, currColumn, exchangeColumn, lotSizeColumn) = ( diff --git a/vuu/src/test/scala/org/finos/vuu/core/table/join/JoinManagerTest.scala b/vuu/src/test/scala/org/finos/vuu/core/table/join/JoinManagerTest.scala index 7cab02ed0..ea354da69 100644 --- a/vuu/src/test/scala/org/finos/vuu/core/table/join/JoinManagerTest.scala +++ b/vuu/src/test/scala/org/finos/vuu/core/table/join/JoinManagerTest.scala @@ -4,7 +4,7 @@ import com.typesafe.scalalogging.StrictLogging import org.finos.vuu.api._ import org.finos.vuu.core.table.join.JoinAsserts._ import org.finos.vuu.core.table.{Columns, KeyObserver, RowKeyUpdate, TableContainer} -import org.finos.vuu.provider.{JoinTableProvider, VuuJoinTableProvider} +import org.finos.vuu.provider.{JoinTableProvider, JoinTableProviderImpl, VuuJoinTableProvider} import org.finos.vuu.viewport.ViewPortSetup import org.finos.toolbox.jmx.{MetricsProvider, MetricsProviderImpl} import org.finos.toolbox.lifecycle.LifecycleContainer @@ -206,7 +206,7 @@ class JoinManagerTest extends AnyFeatureSpec with Matchers with StrictLogging wi implicit val lifecycle: LifecycleContainer = new LifecycleContainer - implicit val joinTableProvider: JoinTableProvider = new VuuJoinTableProvider() + implicit val joinTableProvider: JoinTableProvider = JoinTableProviderImpl() val ordersDef = mkeOrdersDef() @@ -258,7 +258,7 @@ class JoinManagerTest extends AnyFeatureSpec with Matchers with StrictLogging wi Scenario("Left Outer Join, Right key update") { implicit val lifecycle: LifecycleContainer = new LifecycleContainer - implicit val joinTableProvider: JoinTableProvider = new VuuJoinTableProvider() + implicit val joinTableProvider: JoinTableProvider = JoinTableProviderImpl() val ordersDef = mkeOrdersDef() @@ -307,7 +307,7 @@ class JoinManagerTest extends AnyFeatureSpec with Matchers with StrictLogging wi implicit val lifecycle: LifecycleContainer = new LifecycleContainer - implicit val joinTableProvider: JoinTableProvider = new VuuJoinTableProvider() + implicit val joinTableProvider: JoinTableProvider = JoinTableProviderImpl() val ordersDef = mkeOrders2Def() val pricesDef = mkePricesDef() @@ -359,7 +359,7 @@ class JoinManagerTest extends AnyFeatureSpec with Matchers with StrictLogging wi implicit val lifecycle: LifecycleContainer = new LifecycleContainer - implicit val joinTableProvider: JoinTableProvider = new VuuJoinTableProvider() + implicit val joinTableProvider: JoinTableProvider = JoinTableProviderImpl() val ordersDef = mkeOrdersDef() @@ -415,7 +415,7 @@ class JoinManagerTest extends AnyFeatureSpec with Matchers with StrictLogging wi Scenario("Left Outer Join, Delete Right Record") { implicit val lifecycle: LifecycleContainer = new LifecycleContainer - implicit val joinTableProvider : JoinTableProvider = new VuuJoinTableProvider() + implicit val joinTableProvider : JoinTableProvider = JoinTableProviderImpl() val ordersDef = mkeOrdersDef() @@ -465,7 +465,7 @@ class JoinManagerTest extends AnyFeatureSpec with Matchers with StrictLogging wi Scenario("Left Outer Join of Joins") { implicit val lifecycle: LifecycleContainer = new LifecycleContainer - implicit val joinTableProvider: VuuJoinTableProvider = new VuuJoinTableProvider() + implicit val joinTableProvider: JoinTableProvider = JoinTableProviderImpl() val ordersDef = mkeOrdersDef() val pricesDef = mkePricesDef() diff --git a/vuu/src/test/scala/org/finos/vuu/core/table/join/JoinTableTest.scala b/vuu/src/test/scala/org/finos/vuu/core/table/join/JoinTableTest.scala index 569021b96..a39b88fbb 100644 --- a/vuu/src/test/scala/org/finos/vuu/core/table/join/JoinTableTest.scala +++ b/vuu/src/test/scala/org/finos/vuu/core/table/join/JoinTableTest.scala @@ -5,7 +5,8 @@ import org.finos.toolbox.lifecycle.LifecycleContainer import org.finos.toolbox.time.{Clock, DefaultClock, TestFriendlyClock} import org.finos.vuu.api._ import org.finos.vuu.client.messages.RequestId -import org.finos.vuu.core.table.DefaultColumnNames.{CreatedTimeColumnName, LastUpdatedTimeColumnName, allDefaultColumns} +import org.finos.vuu.core.VuuJoinProviderOptionsImpl +import org.finos.vuu.core.table.DefaultColumnNames.{CreatedTimeColumnName, LastUpdatedTimeColumnName} import org.finos.vuu.core.table._ import org.finos.vuu.feature.inmem.VuuInMemPlugin import org.finos.vuu.net.ClientSessionId @@ -126,6 +127,86 @@ class JoinTableTest extends AnyFeatureSpec with Matchers with ViewPortSetup { updates.filter( vp => vp.vpUpdate == RowUpdateType).foreach(update => update.table.readRow(update.key.key, List("orderId", "trader", "tradeTime", "ric", "bid", "ask"), printToConsoleProcessor )) } + Scenario("check large number of ticks all the way through from source to join table"){ + + implicit val lifecycle: LifecycleContainer = new LifecycleContainer + + val dateTime: Long = LocalDateTime.of(2015, 7, 24, 11, 0).atZone(ZoneId.of("Europe/London")).toInstant.toEpochMilli + + val ordersDef = TableDef( + name = "orders", + keyField = "orderId", + columns = Columns.fromNames("orderId:String", "trader:String", "ric:String", "tradeTime:Long", "quantity:Double"), + joinFields = "ric", "orderId") + + val pricesDef = TableDef("prices", "ric", Columns.fromNames("ric:String", "bid:Double", "ask:Double", "last:Double", "open:Double", "close:Double"), "ric") + + val joinDef = JoinTableDef( + name = "orderPrices", + baseTable = ordersDef, + joinColumns = Columns.allFrom(ordersDef) ++ Columns.allFromExcept(pricesDef, "ric"), + joins = + JoinTo( + table = pricesDef, + joinSpec = JoinSpec( left = "ric", right = "ric", LeftOuterJoin) + ), + links = VisualLinks(), + joinFields = Seq() + ) + + val batchSize = 10 + val maxQueueSize = 100 + val joinProvider = JoinTableProviderImpl(VuuJoinProviderOptionsImpl.apply(batchSize = batchSize, maxQueueSize = maxQueueSize)) + + val tableContainer = new TableContainer(joinProvider) + + val orders = tableContainer.createTable(ordersDef) + val prices = tableContainer.createTable(pricesDef) + val orderPrices = tableContainer.createJoinTable(joinDef) + + val ordersProvider = new MockProvider(orders) + val pricesProvider = new MockProvider(prices) + + val providerContainer = new ProviderContainer(joinProvider) + + val viewPortContainer = setupViewPort(tableContainer, providerContainer) + + joinProvider.start() + + pricesProvider.tick("VOD.L", Map("ric" -> "VOD.L", "bid" -> 220.0, "ask" -> 222.0)) + + pricesProvider.tick("BT.L", Map("ric" -> "BT.L", "bid" -> 500.0, "ask" -> 501.0)) + + for (quantity <- 1 to maxQueueSize + 1) { + ordersProvider.tick("NYC-0001", Map("orderId" -> "NYC-0001", "trader" -> "chris", "tradeTime" -> dateTime, "quantity" -> quantity, "ric" -> "VOD.L")) + ordersProvider.tick("NYC-0002", Map("orderId" -> "NYC-0002", "trader" -> "chris", "tradeTime" -> dateTime, "quantity" -> quantity, "ric" -> "BT.L")) + if (quantity % batchSize == 0) { + joinProvider.runOnce() + } + } + + joinProvider.runOnce() + + val session = ClientSessionId("sess-01", "chris") + + val outQueue = new OutboundRowPublishQueue() + + val vpcolumns = ViewPortColumnCreator.create(orderPrices, List("orderId", "trader", "tradeTime", "quantity", "ric")) + + val viewPort = viewPortContainer.create(RequestId.oneNew(), session, outQueue, orderPrices, DefaultRange, vpcolumns) + + viewPortContainer.runOnce() + + assertVpEq(filterByVpId(combineQs(viewPort), viewPort)){ + Table( + ("orderId" ,"trader" ,"tradeTime" ,"quantity" , "ric" ), + ("NYC-0001","chris" ,1437732000000L ,maxQueueSize + 1 , "VOD.L" ), + ("NYC-0002","chris" ,1437732000000L ,maxQueueSize + 1 , "BT.L" ) + ) + } + + } + Scenario("check that registering and deregistering listeners on join table propagates to source tables"){ implicit val lifecycle: LifecycleContainer = new LifecycleContainer @@ -176,7 +257,7 @@ class JoinTableTest extends AnyFeatureSpec with Matchers with ViewPortSetup { prices.isKeyObservedBy("VOD.L", ko2) should be (true) } - Scenario("Check deleting keys from join table and see if it propogates correctly"){ + Scenario("Check deleting keys from join table and see if it propagates correctly"){ implicit val lifecycle: LifecycleContainer = new LifecycleContainer diff --git a/vuu/src/test/scala/org/finos/vuu/net/rpc/DefaultRpcHandlerTest.scala b/vuu/src/test/scala/org/finos/vuu/net/rpc/DefaultRpcHandlerTest.scala index 260be8c06..1e6e8ed61 100644 --- a/vuu/src/test/scala/org/finos/vuu/net/rpc/DefaultRpcHandlerTest.scala +++ b/vuu/src/test/scala/org/finos/vuu/net/rpc/DefaultRpcHandlerTest.scala @@ -5,7 +5,7 @@ import org.finos.toolbox.lifecycle.LifecycleContainer import org.finos.toolbox.time.{Clock, DefaultClock} import org.finos.vuu.core.table.TableContainer import org.finos.vuu.net.{ClientSessionId, RequestContext} -import org.finos.vuu.provider.VuuJoinTableProvider +import org.finos.vuu.provider.JoinTableProviderImpl import org.scalatest.BeforeAndAfterEach import org.scalatest.featurespec.AnyFeatureSpec import org.scalatest.matchers.should.Matchers @@ -18,7 +18,7 @@ class DefaultRpcHandlerTest extends AnyFeatureSpec with Matchers with BeforeAndA implicit val clock: Clock = new DefaultClock implicit val lifecycleContainer: LifecycleContainer = new LifecycleContainer implicit val metricsProvider: MetricsProvider = new MetricsProviderImpl - implicit val tableContainer: TableContainer = new TableContainer(new VuuJoinTableProvider) + implicit val tableContainer: TableContainer = new TableContainer(JoinTableProviderImpl()) handler = new DefaultRpcHandler }