Skip to content

Commit 44c82b3

Browse files
committed
Prevent outbound join queue from discarding data. Make queue and batch size configurable.
1 parent 638503b commit 44c82b3

File tree

10 files changed

+49
-28
lines changed

10 files changed

+49
-28
lines changed

benchmark/src/main/java/org/finos/vuu/benchmark/BenchmarkHelper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public class BenchmarkHelper {
3232
private final Clock clock = new DefaultClock();
3333
private final LifecycleContainer lifecycleContainer = new LifecycleContainer(clock);
3434
private final MetricsProvider metricsProvider = new MetricsProviderImpl();
35-
private final JoinTableProvider joinProvider = JoinTableProviderImpl.apply(clock, lifecycleContainer, metricsProvider);
35+
private final JoinTableProvider joinProvider = JoinTableProviderImpl.apply(lifecycleContainer);
3636

3737
public Clock getClock() {
3838
return clock;

example/main-java/src/main/java/org/finos/vuu/VuuExampleMain.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,7 @@ public static void main( String[] args )
7171
.withViewPortThreads(4),
7272
VuuClientConnectionOptions.apply()
7373
.withHeartbeat(),
74+
VuuJoinTableProviderOptions.apply(),
7475
new scala.collection.mutable.ListBuffer<ViewServerModule>().toList(),
7576
new scala.collection.mutable.ListBuffer<Plugin>().toList()
7677
).withModule(PriceModule.apply(clock, lifecycle, tableDefContainer))

vuu-java/src/test/java/org/finos/vuu/net/rpc/RpcMethodHandlerTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import org.finos.toolbox.time.Clock;
88
import org.finos.toolbox.time.DefaultClock;
99
import org.finos.vuu.core.table.TableContainer;
10-
import org.finos.vuu.provider.VuuJoinTableProvider;
10+
import org.finos.vuu.provider.JoinTableProviderImpl;
1111
import org.finos.vuu.util.ScalaCollectionConverter;
1212
import org.junit.Test;
1313
import scala.Option;
@@ -24,7 +24,7 @@ public void should_register_java_function_as_rpc_in_default_handler() {
2424
Clock clock = new DefaultClock();
2525
LifecycleContainer lifecycleContainer = new LifecycleContainer(clock);
2626
MetricsProvider metricsProvider = new MetricsProviderImpl();
27-
TableContainer tableContainer = new TableContainer(new VuuJoinTableProvider(clock, lifecycleContainer, metricsProvider), metricsProvider, clock);
27+
TableContainer tableContainer = new TableContainer(JoinTableProviderImpl.apply(lifecycleContainer), metricsProvider, clock);
2828
final DefaultRpcHandler defaultRpcHandler = new DefaultRpcHandler(tableContainer);
2929
defaultRpcHandler.registerRpc("helloWorld", rpcService::rpcFunction);
3030

vuu/src/main/scala/org/finos/vuu/core/VuuServer.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,15 @@ import org.finos.toolbox.lifecycle.{LifecycleContainer, LifecycleEnabled}
66
import org.finos.toolbox.thread.{LifeCycleRunOncePerThreadExecutorRunner, LifeCycleRunner, WorkItem}
77
import org.finos.toolbox.time.Clock
88
import org.finos.vuu.api.{JoinTableDef, TableDef, ViewPortDef}
9-
import org.finos.vuu.core.module.{ModuleContainer, RealizedViewServerModule, StaticServedResource, TableDefContainer, ViewServerModule}
9+
import org.finos.vuu.core.module._
1010
import org.finos.vuu.core.table.{DataTable, TableContainer}
1111
import org.finos.vuu.feature.inmem.VuuInMemPlugin
1212
import org.finos.vuu.net._
1313
import org.finos.vuu.net.flowcontrol.FlowControllerFactory
1414
import org.finos.vuu.net.http.{Http2Server, VuuHttp2Server}
1515
import org.finos.vuu.net.json.{CoreJsonSerializationMixin, JsonVsSerializer, Serializer}
1616
import org.finos.vuu.net.rest.RestService
17-
import org.finos.vuu.net.rpc.{JsonSubTypeRegistry, RpcHandler}
17+
import org.finos.vuu.net.rpc.JsonSubTypeRegistry
1818
import org.finos.vuu.net.ws.WebSocketServer
1919
import org.finos.vuu.plugin.PluginRegistry
2020
import org.finos.vuu.provider.{JoinTableProvider, JoinTableProviderImpl, Provider, ProviderContainer}
@@ -42,7 +42,7 @@ class VuuServer(config: VuuServerConfig)(implicit lifecycle: LifecycleContainer,
4242

4343
final val sessionContainer = new ClientSessionContainerImpl()
4444

45-
final val joinProvider: JoinTableProvider = JoinTableProviderImpl()
45+
final val joinProvider: JoinTableProvider = JoinTableProviderImpl(config.joinProvider)
4646

4747
final val tableContainer = new TableContainer(joinProvider)
4848

vuu/src/main/scala/org/finos/vuu/core/VuuServerOptions.scala

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,12 @@ object VuuClientConnectionOptions {
2828
def apply(): VuuClientConnectionOptions = {
2929
VuuClientConnectionOptionsImpl(true)
3030
}
31+
}
3132

33+
object VuuJoinTableProviderOptions {
34+
def apply() : VuuJoinTableProviderOptions = {
35+
VuuJoinProviderOptionsImpl.apply(batchSize = 100, maxQueueSize = 20_000)
36+
}
3237
}
3338

3439
trait VuuSSLCipherSuiteOptions {
@@ -77,6 +82,13 @@ trait VuuClientConnectionOptions {
7782
def withHeartbeatDisabled(): VuuClientConnectionOptions
7883
}
7984

85+
trait VuuJoinTableProviderOptions {
86+
def batchSize: Int
87+
def maxQueueSize: Int
88+
def withBatchSize(maxQueueDepth: Int): VuuJoinTableProviderOptions
89+
def withMaxQueueDepth(maxQueueDepth: Int): VuuJoinTableProviderOptions
90+
}
91+
8092
case class VuuSecurityOptionsImpl(authenticator: Authenticator, loginTokenValidator: LoginTokenValidator) extends VuuSecurityOptions{
8193
override def withAuthenticator(authenticator: Authenticator): VuuSecurityOptions = this.copy(authenticator = authenticator)
8294
override def withLoginValidator(tokenValidator: LoginTokenValidator): VuuSecurityOptions = this.copy(loginTokenValidator = tokenValidator)
@@ -114,11 +126,17 @@ case class VuuSSLCipherSuiteOptionsImpl(ciphers: List[String], protocols: List[S
114126
override def withProtocols(protocols: List[String]): VuuSSLCipherSuiteOptions = this.copy(protocols = protocols)
115127
}
116128

129+
case class VuuJoinProviderOptionsImpl(batchSize: Int, maxQueueSize: Int) extends VuuJoinTableProviderOptions {
130+
override def withBatchSize(batchSize: Int): VuuJoinTableProviderOptions = this.copy(batchSize = batchSize)
131+
override def withMaxQueueDepth(maxQueueSize: Int): VuuJoinTableProviderOptions = this.copy(maxQueueSize = maxQueueSize)
132+
}
133+
117134
case class VuuServerConfig(httpOptions: VuuHttp2ServerOptions = VuuHttp2ServerOptions(),
118135
wsOptions: VuuWebSocketOptions = VuuWebSocketOptions(),
119136
security: VuuSecurityOptions = VuuSecurityOptions(),
120137
threading: VuuThreadingOptions = VuuThreadingOptions(),
121138
clientConnection: VuuClientConnectionOptions = VuuClientConnectionOptions(),
139+
joinProvider: VuuJoinTableProviderOptions = VuuJoinTableProviderOptions(),
122140
modules: List[ViewServerModule] = List(),
123141
plugins: List[Plugin] = List()) {
124142
def withModule(module: ViewServerModule): VuuServerConfig = {
Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,16 @@
11
package org.finos.vuu.provider
22

3-
import org.finos.toolbox.jmx.MetricsProvider
43
import org.finos.toolbox.lifecycle.LifecycleContainer
5-
import org.finos.toolbox.time.Clock
4+
import org.finos.vuu.core.VuuJoinTableProviderOptions
65

76
object JoinTableProviderImpl {
87

9-
def apply()(implicit timeProvider: Clock, lifecycle: LifecycleContainer, metrics: MetricsProvider): JoinTableProvider = {
10-
new VuuJoinTableProvider()
8+
def apply()(implicit lifecycleContainer: LifecycleContainer): JoinTableProvider = {
9+
apply(VuuJoinTableProviderOptions())
10+
}
11+
12+
def apply(vuuJoinTableProviderOptions: VuuJoinTableProviderOptions)(implicit lifecycle: LifecycleContainer): JoinTableProvider = {
13+
new VuuJoinTableProvider(vuuJoinTableProviderOptions)
1114
}
1215

1316
}

vuu/src/main/scala/org/finos/vuu/provider/VuuJoinTableProvider.scala

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
package org.finos.vuu.provider
22

33
import com.typesafe.scalalogging.StrictLogging
4-
import org.finos.toolbox.jmx.MetricsProvider
54
import org.finos.toolbox.lifecycle.LifecycleContainer
6-
import org.finos.toolbox.time.Clock
75
import org.finos.vuu.api.{JoinTableDef, TableDef}
6+
import org.finos.vuu.core.VuuJoinTableProviderOptions
87
import org.finos.vuu.core.table.{DataTable, JoinTable, JoinTableUpdate, RowWithData}
98
import org.finos.vuu.provider.join.{JoinDefToJoinTable, JoinManagerEventDataSink, JoinRelations, RightToLeftKeys}
109

@@ -45,11 +44,11 @@ import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap}
4544
* },
4645
* }
4746
*/
48-
class VuuJoinTableProvider(implicit timeProvider: Clock, lifecycle: LifecycleContainer, metrics: MetricsProvider) extends JoinTableProvider with StrictLogging {
47+
class VuuJoinTableProvider(options: VuuJoinTableProviderOptions)(implicit lifecycle: LifecycleContainer) extends JoinTableProvider with StrictLogging {
4948

5049
lifecycle(this)
5150

52-
private val outboundQueue = new ArrayBlockingQueue[JoinTableUpdate](20000)
51+
private val outboundQueue = new ArrayBlockingQueue[JoinTableUpdate](options.maxQueueSize)
5352
private val joinRelations = new JoinRelations()
5453
private val joinSink = new JoinManagerEventDataSink()
5554
private val rightToLeftKeys = new RightToLeftKeys()
@@ -121,7 +120,7 @@ class VuuJoinTableProvider(implicit timeProvider: Clock, lifecycle: LifecycleCon
121120
logger.debug("[JoinTableProvider] Submitting joint table event:" + jtu)
122121

123122
//get the processing off the join thread
124-
outboundQueue.offer(jtu)
123+
outboundQueue.put(jtu)
125124
}
126125

127126
def eventToRightKey(joinTableDef: JoinTableDef, tableName: String, ev: util.HashMap[String, Any], rightColumn: String): String = {
@@ -252,7 +251,7 @@ class VuuJoinTableProvider(implicit timeProvider: Clock, lifecycle: LifecycleCon
252251
}
253252

254253
override def runOnce(): Unit = {
255-
val updates = new java.util.ArrayList[JoinTableUpdate](100)
254+
val updates = new java.util.ArrayList[JoinTableUpdate](options.batchSize)
256255

257256
outboundQueue.drainTo(updates) match {
258257

vuu/src/test/scala/org/finos/vuu/core/row/RowBuilderTest.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import org.finos.toolbox.lifecycle.LifecycleContainer
55
import org.finos.toolbox.time.{Clock, TestFriendlyClock}
66
import org.finos.vuu.api.TableDef
77
import org.finos.vuu.core.table.{Columns, InMemDataTable}
8-
import org.finos.vuu.provider.VuuJoinTableProvider
8+
import org.finos.vuu.provider.{JoinTableProvider, JoinTableProviderImpl, VuuJoinTableProvider}
99
import org.scalatest.GivenWhenThen
1010
import org.scalatest.featurespec.AnyFeatureSpec
1111
import org.scalatest.matchers.should.Matchers
@@ -30,7 +30,7 @@ class RowBuilderTest extends AnyFeatureSpec with Matchers with GivenWhenThen{
3030
implicit val metrics: MetricsProviderImpl = new MetricsProviderImpl
3131

3232
val tableDef = getTableDef
33-
val joinTableProvider: VuuJoinTableProvider = new VuuJoinTableProvider()
33+
val joinTableProvider: JoinTableProvider = JoinTableProviderImpl()
3434

3535
val (ricColumn, descColumn, currColumn, exchangeColumn, lotSizeColumn) =
3636
(
@@ -65,7 +65,7 @@ class RowBuilderTest extends AnyFeatureSpec with Matchers with GivenWhenThen{
6565
implicit val metrics: MetricsProviderImpl = new MetricsProviderImpl
6666

6767
val tableDef = getTableDef
68-
val joinTableProvider: VuuJoinTableProvider = new VuuJoinTableProvider()
68+
val joinTableProvider: JoinTableProvider = JoinTableProviderImpl()
6969

7070
val (ricColumn, descColumn, currColumn, exchangeColumn, lotSizeColumn) =
7171
(

vuu/src/test/scala/org/finos/vuu/core/table/join/JoinManagerTest.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import com.typesafe.scalalogging.StrictLogging
44
import org.finos.vuu.api._
55
import org.finos.vuu.core.table.join.JoinAsserts._
66
import org.finos.vuu.core.table.{Columns, KeyObserver, RowKeyUpdate, TableContainer}
7-
import org.finos.vuu.provider.{JoinTableProvider, VuuJoinTableProvider}
7+
import org.finos.vuu.provider.{JoinTableProvider, JoinTableProviderImpl, VuuJoinTableProvider}
88
import org.finos.vuu.viewport.ViewPortSetup
99
import org.finos.toolbox.jmx.{MetricsProvider, MetricsProviderImpl}
1010
import org.finos.toolbox.lifecycle.LifecycleContainer
@@ -206,7 +206,7 @@ class JoinManagerTest extends AnyFeatureSpec with Matchers with StrictLogging wi
206206

207207
implicit val lifecycle: LifecycleContainer = new LifecycleContainer
208208

209-
implicit val joinTableProvider: JoinTableProvider = new VuuJoinTableProvider()
209+
implicit val joinTableProvider: JoinTableProvider = JoinTableProviderImpl()
210210

211211
val ordersDef = mkeOrdersDef()
212212

@@ -258,7 +258,7 @@ class JoinManagerTest extends AnyFeatureSpec with Matchers with StrictLogging wi
258258
Scenario("Left Outer Join, Right key update") {
259259
implicit val lifecycle: LifecycleContainer = new LifecycleContainer
260260

261-
implicit val joinTableProvider: JoinTableProvider = new VuuJoinTableProvider()
261+
implicit val joinTableProvider: JoinTableProvider = JoinTableProviderImpl()
262262

263263
val ordersDef = mkeOrdersDef()
264264

@@ -307,7 +307,7 @@ class JoinManagerTest extends AnyFeatureSpec with Matchers with StrictLogging wi
307307

308308
implicit val lifecycle: LifecycleContainer = new LifecycleContainer
309309

310-
implicit val joinTableProvider: JoinTableProvider = new VuuJoinTableProvider()
310+
implicit val joinTableProvider: JoinTableProvider = JoinTableProviderImpl()
311311

312312
val ordersDef = mkeOrders2Def()
313313
val pricesDef = mkePricesDef()
@@ -359,7 +359,7 @@ class JoinManagerTest extends AnyFeatureSpec with Matchers with StrictLogging wi
359359

360360
implicit val lifecycle: LifecycleContainer = new LifecycleContainer
361361

362-
implicit val joinTableProvider: JoinTableProvider = new VuuJoinTableProvider()
362+
implicit val joinTableProvider: JoinTableProvider = JoinTableProviderImpl()
363363

364364
val ordersDef = mkeOrdersDef()
365365

@@ -415,7 +415,7 @@ class JoinManagerTest extends AnyFeatureSpec with Matchers with StrictLogging wi
415415
Scenario("Left Outer Join, Delete Right Record") {
416416
implicit val lifecycle: LifecycleContainer = new LifecycleContainer
417417

418-
implicit val joinTableProvider : JoinTableProvider = new VuuJoinTableProvider()
418+
implicit val joinTableProvider : JoinTableProvider = JoinTableProviderImpl()
419419

420420
val ordersDef = mkeOrdersDef()
421421

@@ -465,7 +465,7 @@ class JoinManagerTest extends AnyFeatureSpec with Matchers with StrictLogging wi
465465
Scenario("Left Outer Join of Joins") {
466466
implicit val lifecycle: LifecycleContainer = new LifecycleContainer
467467

468-
implicit val joinTableProvider: VuuJoinTableProvider = new VuuJoinTableProvider()
468+
implicit val joinTableProvider: JoinTableProvider = JoinTableProviderImpl()
469469

470470
val ordersDef = mkeOrdersDef()
471471
val pricesDef = mkePricesDef()

vuu/src/test/scala/org/finos/vuu/net/rpc/DefaultRpcHandlerTest.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import org.finos.toolbox.lifecycle.LifecycleContainer
55
import org.finos.toolbox.time.{Clock, DefaultClock}
66
import org.finos.vuu.core.table.TableContainer
77
import org.finos.vuu.net.{ClientSessionId, RequestContext}
8-
import org.finos.vuu.provider.VuuJoinTableProvider
8+
import org.finos.vuu.provider.JoinTableProviderImpl
99
import org.scalatest.BeforeAndAfterEach
1010
import org.scalatest.featurespec.AnyFeatureSpec
1111
import org.scalatest.matchers.should.Matchers
@@ -18,7 +18,7 @@ class DefaultRpcHandlerTest extends AnyFeatureSpec with Matchers with BeforeAndA
1818
implicit val clock: Clock = new DefaultClock
1919
implicit val lifecycleContainer: LifecycleContainer = new LifecycleContainer
2020
implicit val metricsProvider: MetricsProvider = new MetricsProviderImpl
21-
implicit val tableContainer: TableContainer = new TableContainer(new VuuJoinTableProvider)
21+
implicit val tableContainer: TableContainer = new TableContainer(JoinTableProviderImpl())
2222
handler = new DefaultRpcHandler
2323
}
2424

0 commit comments

Comments
 (0)