Skip to content

Commit 57395d6

Browse files
authored
Merge pull request #1431 from naleeha/table_metadata
Web socket api test for get table meta
2 parents 93d8594 + f6ded4c commit 57395d6

File tree

15 files changed

+279
-128
lines changed

15 files changed

+279
-128
lines changed

example/main-java/src/main/java/org/finos/vuu/VuuExampleMain.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,10 @@ public static void main( String[] args )
7373
VuuThreadingOptions.apply()
7474
.withTreeThreads(4)
7575
.withViewPortThreads(4),
76-
new scala.collection.mutable.ListBuffer<ViewServerModule>().toList(),
77-
new scala.collection.mutable.ListBuffer<Plugin>().toList()
76+
VuuClientConnectionOptions.apply()
77+
.withHeartbeat(),
78+
new scala.collection.mutable.ListBuffer<ViewServerModule>().toList(),
79+
new scala.collection.mutable.ListBuffer<Plugin>().toList()
7880
).withModule(PriceModule.apply(clock, lifecycle, tableDefContainer))
7981
.withModule(SimulationModule.apply(clock, lifecycle, tableDefContainer))
8082
.withModule(MetricsModule.apply(clock, lifecycle, metrics, tableDefContainer))

example/main/src/main/scala/org/finos/vuu/SimulMain.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,9 @@ object SimulMain extends App with StrictLogging {
6161
.withLoginValidator(new AlwaysHappyLoginValidator),
6262
VuuThreadingOptions()
6363
.withViewPortThreads(4)
64-
.withTreeThreads(4)
64+
.withTreeThreads(4),
65+
VuuClientConnectionOptions()
66+
.withHeartbeat()
6567
).withModule(PriceModule())
6668
.withModule(SimulationModule())
6769
.withModule(MetricsModule())

vuu/src/main/scala/org/finos/vuu/client/ClientHelperFns.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,7 @@ object ClientHelperFns {
100100

101101
def auth(user: String, password: String)(implicit vsClient: ViewServerClient): String = {
102102
vsClient.send(JsonViewServerMessage("", "", "", "", AuthenticateRequest(user, password)))
103-
vsClient.awaitMsg.body.asInstanceOf[AuthenticateSuccess].token
103+
awaitMsgBody[AuthenticateSuccess].get.token
104104
}
105105

106106
def rpcCallAsync(sessionId: String, token: String, user: String, service: String, method: String, params: Array[Any], module: String)(implicit vsClient: ViewServerClient): Unit = {

vuu/src/main/scala/org/finos/vuu/core/CoreServerApiHandler.scala

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -220,13 +220,18 @@ class CoreServerApiHandler(val viewPortContainer: ViewPortContainer,
220220
}
221221

222222
override def process(msg: GetTableMetaRequest)(ctx: RequestContext): Option[ViewServerMessage] = {
223-
if (msg.table == null)
224-
errorMsg(s"Table ${msg.table} not found in container")(ctx)
223+
if (msg.table.table == null || msg.table.module == null)
224+
errorMsg(s"No such table found with name ${msg.table.table} in module ${msg.table.module}. Table name and module should not be null")(ctx)
225225
else {
226-
val table = tableContainer.getTable(msg.table.table)
227-
val columnNames = table.getTableDef.columns.sortBy(_.index).map(_.name)
228-
val dataTypes = columnNames.map(table.getTableDef.columnForName(_)).map(col => DataType.asString(col.dataType))
229-
vsMsg(GetTableMetaResponse(msg.table, columnNames, dataTypes, table.getTableDef.keyField))(ctx)
226+
val table = tableContainer.getTable(msg.table.table) //todo need to check module? what if modules with same table name
227+
228+
if(table == null)
229+
errorMsg(s"No such table found with name ${msg.table.table} in module ${msg.table.module}")(ctx)
230+
else{
231+
val columnNames = table.getTableDef.columns.sortBy(_.index).map(_.name)
232+
val dataTypes = columnNames.map(table.getTableDef.columnForName(_)).map(col => DataType.asString(col.dataType))
233+
vsMsg(GetTableMetaResponse(msg.table, columnNames, dataTypes, table.getTableDef.keyField))(ctx)
234+
}
230235
}
231236
}
232237

vuu/src/main/scala/org/finos/vuu/core/VuuServer.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import org.finos.vuu.core.module.{ModuleContainer, RealizedViewServerModule, Sta
1010
import org.finos.vuu.core.table.{DataTable, TableContainer}
1111
import org.finos.vuu.feature.inmem.{VuuInMemPlugin, VuuInMemPluginType}
1212
import org.finos.vuu.net._
13+
import org.finos.vuu.net.flowcontrol.{DefaultFlowController, FlowController, FlowControllerFactory, NoHeartbeatFlowController}
1314
import org.finos.vuu.net.http.{Http2Server, VuuHttp2Server}
1415
import org.finos.vuu.net.json.{CoreJsonSerializationMixin, JsonVsSerializer, Serializer}
1516
import org.finos.vuu.net.rest.RestService
@@ -37,6 +38,8 @@ class VuuServer(config: VuuServerConfig)(implicit lifecycle: LifecycleContainer,
3738
final val authenticator: Authenticator = config.security.authenticator
3839
final val tokenValidator: LoginTokenValidator = config.security.loginTokenValidator
3940

41+
final val flowControllerFactory: FlowControllerFactory = FlowControllerFactory(config.clientConnection.hasHeartbeat)
42+
4043
final val sessionContainer = new ClientSessionContainerImpl()
4144

4245
final val joinProvider: JoinTableProvider = JoinTableProviderImpl()
@@ -55,7 +58,7 @@ class VuuServer(config: VuuServerConfig)(implicit lifecycle: LifecycleContainer,
5558

5659
final val serverApi = new CoreServerApiHandler(viewPortContainer, tableContainer, providerContainer)
5760

58-
final val factory = new ViewServerHandlerFactoryImpl(authenticator, tokenValidator, sessionContainer, serverApi, JsonVsSerializer, moduleContainer)
61+
final val factory = new ViewServerHandlerFactoryImpl(authenticator, tokenValidator, sessionContainer, serverApi, JsonVsSerializer, moduleContainer, flowControllerFactory)
5962

6063
//order of creation here is important
6164
final val server = new WebSocketServer(config.wsOptions, factory)

vuu/src/main/scala/org/finos/vuu/core/VuuServerOptions.scala

Lines changed: 22 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,11 @@ package org.finos.vuu.core
22

33
import org.finos.vuu.core.module.ViewServerModule
44
import org.finos.vuu.net.auth.AlwaysHappyAuthenticator
5+
import org.finos.vuu.net.flowcontrol.{DefaultFlowController, FlowController, NoHeartbeatFlowController}
56
import org.finos.vuu.net.http.{VuuHttp2ServerOptions, VuuSecurityOptions}
67
import org.finos.vuu.net.{AlwaysHappyLoginValidator, Authenticator, LoginTokenValidator}
78
import org.finos.vuu.plugin.Plugin
89

9-
10-
1110
object VuuSecurityOptions{
1211
def apply(): VuuSecurityOptions = {
1312
VuuSecurityOptionsImpl(new AlwaysHappyAuthenticator, new AlwaysHappyLoginValidator)
@@ -26,6 +25,12 @@ object VuuThreadingOptions {
2625
}
2726
}
2827

28+
object VuuClientConnectionOptions {
29+
def apply(): VuuClientConnectionOptions = {
30+
VuuClientConnectionOptionsImpl(true)
31+
}
32+
33+
}
2934
trait VuuWebSocketOptions {
3035
def wsPort: Int
3136
def uri: String
@@ -48,6 +53,12 @@ trait VuuThreadingOptions{
4853
def treeThreads: Int
4954
}
5055

56+
trait VuuClientConnectionOptions {
57+
def hasHeartbeat: Boolean
58+
def withHeartbeat(): VuuClientConnectionOptions
59+
def withHeartbeatDisabled(): VuuClientConnectionOptions
60+
}
61+
5162
case class VuuSecurityOptionsImpl(authenticator: Authenticator, loginTokenValidator: LoginTokenValidator) extends VuuSecurityOptions{
5263
override def withAuthenticator(authenticator: Authenticator): VuuSecurityOptions = this.copy(authenticator = authenticator)
5364
override def withLoginValidator(tokenValidator: LoginTokenValidator): VuuSecurityOptions = this.copy(loginTokenValidator = tokenValidator)
@@ -75,8 +86,16 @@ case class VuuThreadingOptionsImpl(viewPortThreads: Int = 1, treeViewPortThreads
7586
override def treeThreads: Int = treeViewPortThreads
7687
}
7788

78-
case class VuuServerConfig(httpOptions: VuuHttp2ServerOptions = VuuHttp2ServerOptions(), wsOptions: VuuWebSocketOptions = VuuWebSocketOptions(), security: VuuSecurityOptions = VuuSecurityOptions(),
89+
case class VuuClientConnectionOptionsImpl(hasHeartbeat: Boolean) extends VuuClientConnectionOptions {
90+
override def withHeartbeat(): VuuClientConnectionOptions = this.copy(true)
91+
override def withHeartbeatDisabled(): VuuClientConnectionOptions = this.copy(false)
92+
}
93+
94+
case class VuuServerConfig(httpOptions: VuuHttp2ServerOptions = VuuHttp2ServerOptions(),
95+
wsOptions: VuuWebSocketOptions = VuuWebSocketOptions(),
96+
security: VuuSecurityOptions = VuuSecurityOptions(),
7997
threading: VuuThreadingOptions = VuuThreadingOptions(),
98+
clientConnection: VuuClientConnectionOptions = VuuClientConnectionOptions(),
8099
modules: List[ViewServerModule] = List(),
81100
plugins: List[Plugin] = List()) {
82101
def withModule(module: ViewServerModule): VuuServerConfig = {

vuu/src/main/scala/org/finos/vuu/net/RequestProcessor.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import io.netty.handler.codec.http.websocketx.TextWebSocketFrame
66
import org.finos.toolbox.time.Clock
77
import org.finos.vuu.client.messages.SessionId
88
import org.finos.vuu.core.module.ModuleContainer
9-
import org.finos.vuu.net.flowcontrol.DefaultFlowController
9+
import org.finos.vuu.net.flowcontrol.{DefaultFlowController, FlowController, FlowControllerFactory}
1010
import org.finos.vuu.net.json.Serializer
1111
import org.finos.vuu.util.{OutboundRowPublishQueue, PublishQueue}
1212
import org.finos.vuu.viewport.ViewPortUpdate
@@ -21,7 +21,8 @@ class RequestProcessor(authenticator: Authenticator,
2121
clientSessionContainer: ClientSessionContainer,
2222
serverApi: ServerApi,
2323
serializer: Serializer[String, MessageBody],
24-
moduleContainer: ModuleContainer
24+
moduleContainer: ModuleContainer,
25+
flowControllerFactory: FlowControllerFactory,
2526
)(implicit timeProvider: Clock) extends StrictLogging {
2627

2728
@volatile private var session: ClientSessionId = null
@@ -63,7 +64,7 @@ class RequestProcessor(authenticator: Authenticator,
6364

6465
protected def createMessageHandler(channel: Channel, sessionId: ClientSessionId): MessageHandler = {
6566
val queue = new OutboundRowPublishQueue()
66-
val flowController = new DefaultFlowController
67+
val flowController = flowControllerFactory.create()
6768
new DefaultMessageHandler(channel, queue, sessionId, serverApi, serializer, flowController, clientSessionContainer, moduleContainer)
6869
}
6970

vuu/src/main/scala/org/finos/vuu/net/ViewServerClient.scala

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,10 @@ class WebSocketViewServerClient(ws: WebSocketClient, serializer: Serializer[Stri
5050
}
5151
else {
5252
Try(serializer.deserialize(msg)) match {
53-
case Success(vsMsg) => vsMsg
53+
case Success(vsMsg) => {
54+
logger.info(s"[Received] $vsMsg")
55+
vsMsg
56+
}
5457
case Failure(e) =>
5558
logger.error(s"could not deserialize ${msg} going to return null", e)
5659
null

vuu/src/main/scala/org/finos/vuu/net/ViewServerHandler.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import org.finos.vuu.core.module.ModuleContainer
77
import org.finos.vuu.net.json.Serializer
88
import org.finos.toolbox.json.JsonUtil
99
import org.finos.toolbox.time.Clock
10+
import org.finos.vuu.net.flowcontrol.{FlowController, FlowControllerFactory}
1011

1112
trait ViewServerHandlerFactory {
1213
def create(): ViewServerHandler
@@ -15,9 +16,11 @@ trait ViewServerHandlerFactory {
1516
class ViewServerHandlerFactoryImpl(authenticator: Authenticator,
1617
tokenValidator: LoginTokenValidator, sessionContainer: ClientSessionContainer,
1718
serverApi: ServerApi, jsonVsSerializer: Serializer[String, MessageBody],
18-
moduleContainer: ModuleContainer)(implicit val timeProvider: Clock) extends ViewServerHandlerFactory {
19+
moduleContainer: ModuleContainer,
20+
flowControllerFactory: FlowControllerFactory,
21+
)(implicit val timeProvider: Clock) extends ViewServerHandlerFactory {
1922
override def create(): ViewServerHandler = {
20-
val requestProcessor = new RequestProcessor(authenticator, tokenValidator, sessionContainer, serverApi, jsonVsSerializer, moduleContainer)
23+
val requestProcessor = new RequestProcessor(authenticator, tokenValidator, sessionContainer, serverApi, jsonVsSerializer, moduleContainer, flowControllerFactory)
2124
new ViewServerHandler(jsonVsSerializer, requestProcessor)
2225
}
2326
}

vuu/src/main/scala/org/finos/vuu/net/flowcontrol/FlowController.scala

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,15 @@ trait FlowController {
1717
def shouldSend(): FlowControlOp
1818
}
1919

20+
case class FlowControllerFactory(hasHeartbeat: Boolean)(implicit timeProvider: Clock){
21+
def create(): FlowController = {
22+
if (hasHeartbeat)
23+
new DefaultFlowController()
24+
else
25+
new NoHeartbeatFlowController
26+
27+
}
28+
}
2029
class DefaultFlowController(implicit timeProvider: Clock) extends FlowController {
2130

2231
@volatile private var lastMsgTime: Long = -1
@@ -57,3 +66,13 @@ class DefaultFlowController(implicit timeProvider: Clock) extends FlowController
5766
}
5867

5968
}
69+
70+
class NoHeartbeatFlowController() extends FlowController {
71+
override def process(msg: ViewServerMessage): Unit = {
72+
//nothing to do here
73+
}
74+
75+
override def shouldSend(): FlowControlOp = {
76+
BatchSize(300)
77+
}
78+
}

0 commit comments

Comments
 (0)