|
1 | 1 | # devicehive-plugin-cassandra-node |
2 | | -DeviceHive Cassandra storage plugin. NodeJS implementation. |
| 2 | +DeviceHive Cassandra storage plugin written in Node.js |
| 3 | +# Overview |
| 4 | +This plugin allows you to store commands and notifications obtained through DeviceHive platform in Cassandra. This application consists of 2 parts: schema creation and actually plugin defined in docker-compose file. |
| 5 | +<br /><br /> |
| 6 | +On services start schema creation will run first [to create table and UDT schemas from JSON](#creating-tables-and-udts) and plugin will check schema creation state using some interval and defined number of checks. **Schema creation service always must be the only one node**. This was done to prevent concurrent schema modification that causes exception in Cassandra. But feel free to scale plugin service as much as you need. |
| 7 | +<br /><br /> |
| 8 | +On the very beginning of run time both: plugin and schema creation will [compare existent table and UDT schemas with what defined in JSON](#schema-comparison). In case of columns/fields set mismatch, column/field types mismatch, primary and clustering keys mismatch or ordering mismatch applications will fail. In case some UDT or table already exists it will notify user. |
| 9 | +<br /><br /> |
| 10 | +When the message comes in it can be either command or command update or notification. Depends on what type it is data will be inserted in appropriate [group of tables](#table-groups). Also data will be filtered to [match described table schema](#data-model). |
| 11 | + |
| 12 | +# How it works |
| 13 | + |
| 14 | + 1. Start DeviceHive |
| 15 | + 2. Start Cassandra |
| 16 | + 3. Create Cassandra keyspace |
| 17 | + 4. Create following .env file. **Replace username, password, plugin topic, localhost, keyspace name, cassandra username and password with your values.** |
| 18 | + |
| 19 | + ENVSEPARATOR=_ <br /> |
| 20 | + plugin_user_login=username<br /> |
| 21 | + plugin_user_password=password <br /> |
| 22 | + plugin_plugin_topic=plugin topic <br /> |
| 23 | + plugin_device_hive_plugin_ws_endpoint=ws://localhost:3001 <br /> |
| 24 | + plugin_device_hive_auth_service_api_url=http://localhost:8090/dh/rest <br /> |
| 25 | + plugin_subscription_group=cassandra_plugin <br /> |
| 26 | + cassandra_connection_keyspace=keyspace name <br /> |
| 27 | + cassandra_connection_username=cassandra username <br /> |
| 28 | + cassandra_connection_password=cassandra password <br /> |
| 29 | + cassandra_connection_contactPoints=localhost |
| 30 | + |
| 31 | + 5. Run `docker-compose up` (optionally add `--scale plugin=N`) |
| 32 | + 6. Issue notification through DeviceHive |
| 33 | + 7. Observe data in *notifications\_by\_deviceid* and *notifications\_by\_timestamp* tables: |
| 34 | + |
| 35 | +<br /> |
| 36 | + |
| 37 | + notification | deviceid |
| 38 | + --------------+-------------------------------------- |
| 39 | + test | e50d6085-2aba-48e9-b1c3-73c673e414be |
| 40 | + |
| 41 | + |
| 42 | +<br /> |
| 43 | + |
| 44 | + notification | timestamp | parameters |
| 45 | + --------------+---------------------------------+------------------------------------------- |
| 46 | + test | 2018-02-27 17:17:27.963000+0000 | { "yourParam": "custom parameter value" } |
| 47 | + |
| 48 | +# Configuration |
| 49 | +## Plugin |
| 50 | +Plugin part of configuration you can find [here](https://github.com/devicehive/devicehive-plugin-core-node#configuration). |
| 51 | +## Cassandra |
| 52 | +Cassandra part of plugin based on [DataStax Node.js driver](https://github.com/datastax/nodejs-driver) so please see [ClientOptions for DataStax driver](http://docs.datastax.com/en/developer/nodejs-driver/3.3/api/type.ClientOptions/) to get description of configs you are able to apply in this plugin. |
| 53 | +There are two ways of configuring Cassandra: |
| 54 | + |
| 55 | + 1. Share `cassandraConfig` directory as volume with docker container |
| 56 | + 2. Redefine properties using `.env` |
| 57 | + |
| 58 | +Configuration files are stored under `cassandraConfig` directory, there you can find two files: one is for actually Cassandra configuration (`config.json`) and another one for policies (`policies.json`). **Note: all configs described in environment variable format** |
| 59 | +<br /> |
| 60 | +Example of `config.json`: |
| 61 | + |
| 62 | + { |
| 63 | + "connection": { |
| 64 | + "contactPoints": "127.0.0.1", |
| 65 | + "keyspace": "mykeyspace", |
| 66 | + "protocolOptions": { |
| 67 | + "port": "9042" |
| 68 | + }, |
| 69 | + "username": "cassandra", |
| 70 | + "password": "cassandra" |
| 71 | + }, |
| 72 | + |
| 73 | + "custom": { |
| 74 | + "schemaChecksCount": 5, |
| 75 | + "schemaChecksInterval": 1000 |
| 76 | + } |
| 77 | + } |
| 78 | +Cassandra config contains CUSTOM property for options not related to Cassandra connection. It can contain: |
| 79 | + |
| 80 | + - `schemaChecksCount` — how many times plugin service will check schema creation state on startup |
| 81 | + - `schemaChecksInterval` — how often (in milliseconds) plugin service will check schema creation state on startup |
| 82 | + |
| 83 | +<br /><br /> |
| 84 | +Example of `policies.json`: |
| 85 | + |
| 86 | + "loadBalancing": { |
| 87 | + "type": "WhiteListPolicy", |
| 88 | + "params": { |
| 89 | + "childPolicy": { |
| 90 | + "type": "DCAwareRoundRobinPolicy", |
| 91 | + "params": { |
| 92 | + "localDc": "127.0.0.1", |
| 93 | + "usedHostsPerRemoteDc": 2 |
| 94 | + } |
| 95 | + } |
| 96 | + } |
| 97 | + }, |
| 98 | + "retry": { |
| 99 | + "type": "IdempotenceAwareRetryPolicy", |
| 100 | + "params": { |
| 101 | + "childPolicy": { |
| 102 | + "type": "RetryPolicy" |
| 103 | + } |
| 104 | + } |
| 105 | + } |
| 106 | +All supported policies described [here](https://docs.datastax.com/en/developer/nodejs-driver/3.3/api/module.policies/). |
| 107 | +<br /> |
| 108 | +How to compose policies: |
| 109 | + |
| 110 | + - **Class name** of specific policy is `type` property |
| 111 | + - **Arguments for policy constructor** described under `params` property |
| 112 | + - **Wrapping policies** (i.e. WhiteListPolicy of loadBalancing, IdempotenceAwareRetryPolicy of retry policy) must have `params` property with `childPolicy` that describes child policy in the same way (using `type` for class name and `params` for constructor arguments) |
| 113 | + |
| 114 | +As was mentioned above you can redefine config options with .env file. In this case nesting of JSON is replaced with ENVSEPARATOR value and cassandra prefix with scope (connection or custom) should be added. Example: |
| 115 | + |
| 116 | + ENVSEPARATOR=_ |
| 117 | + DEBUG=cassandrastoragemodule |
| 118 | + cassandra_connection_contactPoints=127.0.0.1, 192.168.1.1 |
| 119 | + cassandra_connection_protocolOptions_port=9042 |
| 120 | + cassandra_connection_policies_retry_type=RetryPolicy |
| 121 | + cassandra_custom_schema_checks_count=20 |
| 122 | + cassandra_custom_schema_checks_interval=500 |
| 123 | + |
| 124 | +**Note: plugin supports all config options related to Cassandra connection taken from [here](http://docs.datastax.com/en/developer/nodejs-driver/3.3/api/type.ClientOptions/)** |
| 125 | +If you want to have Cassandra module logging enabled put _DEBUG=cassandrastoragemodule_ in your environment variables. |
| 126 | +# Tables and user defined types |
| 127 | +This plugin allows you to configure tables and UDTs in JSON format which are to be created on application startup using schema creation service. |
| 128 | +## Creating tables and UDTs |
| 129 | +Schemas have to be described in **cassandraSchemas/cassandra-tables.json** and **cassandraSchemas/cassandra-user-types.json**. Feel free to modify these files and share volume with docker container (this is already done in docker-compose.yml). |
| 130 | +Example of table schema description: |
| 131 | + |
| 132 | + { |
| 133 | + "tables": { |
| 134 | + "commands": { |
| 135 | + "command": "text", |
| 136 | + "deviceId": "text", |
| 137 | + "timestamp": "timestamp", |
| 138 | + "id": "int", |
| 139 | + "parameters": "text", |
| 140 | + |
| 141 | + "__primaryKey__": [ "command", "deviceId" ], |
| 142 | + "__clusteringKey__": [ "timestamp" ], |
| 143 | + "__order__": { |
| 144 | + "timestamp": "DESC" |
| 145 | + }, |
| 146 | + "__options__": { |
| 147 | + "bloom_filter_fp_chance": 0.05, |
| 148 | + "compression": { |
| 149 | + "chunk_length_in_kb": 128, |
| 150 | + "class": "org.apache.cassandra.io.compress.SnappyCompressor" |
| 151 | + }, |
| 152 | + }, |
| 153 | + "__dropIfExists__": true |
| 154 | + }, |
| 155 | + } |
| 156 | + } |
| 157 | +Keys are column names and values are Cassandra data types. Also JSON schema has several reserved properties: |
| 158 | + |
| 159 | + - `__primaryKey__` — Array of column names which will represent primary key |
| 160 | + - `__clusteringKey__` — Array of column names which will represent clustering key |
| 161 | + - `__order__` — Object of order definition for clustering keys, values allowed: |
| 162 | + <br /> |
| 163 | + - DESC — for descending order |
| 164 | + <br /> |
| 165 | + - ASC — for ascending order (default) |
| 166 | + |
| 167 | + - `__options__` — Object of any table options allowed by Cassandra (see [Cassandra table options](https://docs.datastax.com/en/cql/3.3/cql/cql_reference/cqlCreateTable.html#ariaid-title4)) |
| 168 | + - `__dropIfExists__` — Boolean value (false by default) indicates whether table or UDT should be dropped before creation |
| 169 | + |
| 170 | +JSON schema for UDT is simpler, it contains fields with their types and `__dropIfExists__` only if necessary. |
| 171 | +Example of UDT: |
| 172 | + |
| 173 | + { |
| 174 | + "params": { |
| 175 | + "name": "text", |
| 176 | + "id": "int", |
| 177 | + "address": "inet" |
| 178 | + } |
| 179 | + } |
| 180 | +## Data model and table groups |
| 181 | +### Data model |
| 182 | +Table column definitions should stick to DeviceHive command and notification data models: |
| 183 | + |
| 184 | + - [Command representation](https://docs.devicehive.com/v3.4.3/docs/devicecommand#section-resource-representation) |
| 185 | + - [Notification representation](https://docs.devicehive.com/v3.4.3/docs/devicenotification#section-resource-representation) |
| 186 | + |
| 187 | +This means you should define columns with same names as entity has to successfully save incoming data (but this doesn't mean you are not allowed to define any other columns). |
| 188 | +<br /> |
| 189 | +Each incoming message is filtered by defined schema to extract properties that are described in table/UDT schema. **Note: filtering is applicable for columns with UDT (see parameters below).** So you can have multiple tables with different partition keys, ordering and columns set to store one entity, for example: |
| 190 | +<br /> |
| 191 | +*we will store only command name with device ID in one table and command name with timestamp in another* |
| 192 | + |
| 193 | + { |
| 194 | + "command_by_deviceid": { |
| 195 | + "command": "text", |
| 196 | + "deviceId": "text", |
| 197 | + "__primaryKey__": [ "command" ], |
| 198 | + "__clusteringKey__": [ "deviceId" ] |
| 199 | + }, |
| 200 | + "command_by_timestamp": { |
| 201 | + "command": "text", |
| 202 | + "timestamp": "timestamp", |
| 203 | + "__primaryKey__": [ "command" ], |
| 204 | + "__clusteringKey__": [ "timestamp" ] |
| 205 | + }, |
| 206 | + } |
| 207 | +If you have defined some column that doesn't exist in message `null` will be inserted as value for such column in Cassandra. |
| 208 | +#### Parameters field |
| 209 | +Command and notification data may contain parameters field which is arbitrary JSON value. For parameters column you can use following types: |
| 210 | + |
| 211 | + - text (varchar) or ascii — parameters value will be saved as JSON string |
| 212 | + - map<text, text> or map<ascii, ascii>, map<ascii, text> etc. — parameters value will be inserted as map with appropriate keys and values |
| 213 | + - UDT — parameters value will be filtered by some UDT defined separately |
| 214 | + |
| 215 | +Also `frozen` use is allowed for *UDT* or *map* parameters. |
| 216 | +<br /> |
| 217 | +Example of UDT parameters: |
| 218 | +<br /> |
| 219 | +*UDT schema* |
| 220 | + |
| 221 | + { |
| 222 | + "params": { |
| 223 | + "username": "text", |
| 224 | + "age": "int" |
| 225 | + } |
| 226 | + } |
| 227 | +*Table schema* |
| 228 | + |
| 229 | + { |
| 230 | + "command": "text", |
| 231 | + "deviceId": "text", |
| 232 | + "parameters": "frozen<params>", |
| 233 | + "__primaryKey__": [ "command" ], |
| 234 | + "__clusteringKey__": [ "deviceId" ] |
| 235 | + } |
| 236 | +Then you can issue a command through DeviceHive and only described data model in schemas will be stored: |
| 237 | + |
| 238 | + { |
| 239 | + "command": "my-command", |
| 240 | + "deviceId": "my-device", |
| 241 | + "thisFieldWillNotBeStored": true, |
| 242 | + "parameters": { |
| 243 | + "username": "John Doe", |
| 244 | + "age": "30", |
| 245 | + "thisFieldWillNotBeStoredAlso": true |
| 246 | + } |
| 247 | + } |
| 248 | + |
| 249 | +### Table groups |
| 250 | +According to DeviceHive when the message comes in it can be one of two types: command or notification. Command message also can be command update. You can store any of these in arbitrary amount of tables just by defining table schemas and assigning tables to specific table group. |
| 251 | +There are three table groups: |
| 252 | + - **command** |
| 253 | + - **notification** |
| 254 | + - **command updates** (store command updates separately, may be empty) |
| 255 | + |
| 256 | +**Note: in case of empty `commandUpdatesTables` each command update will actually update existing command in all tables assigned to `commandTables`.** |
| 257 | + |
| 258 | +You can assign any table to any group as well you can assign one table to multiple groups (many to many). Table groups are configured in **cassandraSchemas/cassandra-tables.json** with **commandTables**, **notificationTables**, **commandUpdatesTables** properties. |
| 259 | +For example if you have: |
| 260 | +<br /> |
| 261 | +`"notificationTables": [ "notifications_by_name", "notifications_by_deviceid", "notifications_by_X" ]` |
| 262 | +<br /> |
| 263 | +each incoming notification will be inserted in all of these tables with structure that you have defined. |
| 264 | +<br /> |
| 265 | +Or you may want to have shared table (*messages* in this example): |
| 266 | + |
| 267 | + "commandTables": [ "commands", "messages" ], |
| 268 | + "notificationTables": [ "notifications", "messages" ], |
| 269 | + "commandUpdatesTables": [ "command_updates", "messages" ] |
| 270 | + |
| 271 | +## Schema comparison |
| 272 | +When schema creation and plugin start up it compares tables and UDTs described in JSON schema with existent ones (if there are some). In case of inconsistency in columns/fields set or column/field types, primary keys, clustering keys or ordering applications will fail with appropriate messages. You can drop table or UDT before creation by [specifying `__dropIfExists__` as true](#creating-tables-and-udts) to prevent this kind of errors. **It's definitely not recommended to use this kind of schema recreation if you have some data in your tables.** If some table or UDT already exists application will just notify user about existent structure. |
| 273 | +# Data path example |
| 274 | +Lets take a look at how data is processed in Cassandra plugin and compare input data in plugin and output data in Cassandra. |
| 275 | +<br /> |
| 276 | +Assume our command message looks like this: |
| 277 | + |
| 278 | + { |
| 279 | + "command": "my-command", |
| 280 | + "deviceId": "my-device", |
| 281 | + "timestamp": "2018-02-22T11:53:47.082Z", |
| 282 | + "parameters": { |
| 283 | + "name": "John Doe", |
| 284 | + "age": 30, |
| 285 | + "ip": "127.0.0.1" |
| 286 | + }, |
| 287 | + "status": "some-status" |
| 288 | + } |
| 289 | + |
| 290 | +And our JSON schemas of command tables are: |
| 291 | +<br /> |
| 292 | +*commands_by_timestamp* |
| 293 | + |
| 294 | + { |
| 295 | + "command": "text", |
| 296 | + "timestamp": "timestamp", |
| 297 | + "parameters": "frozen<params>", |
| 298 | + "__primaryKey__": [ "command" ], |
| 299 | + "__clusteringKey__": [ "timestamp" ], |
| 300 | + "__order__": { |
| 301 | + "timestamp": "DESC" |
| 302 | + } |
| 303 | + } |
| 304 | +*commands_by_deviceid* |
| 305 | + |
| 306 | + { |
| 307 | + "command": "text", |
| 308 | + "deviceId": "text", |
| 309 | + "__primaryKey__": [ "command" ], |
| 310 | + "__clusteringKey__": [ "deviceId" ] |
| 311 | + } |
| 312 | +Since *commands_by_timestamp* has `parameters` of UDT we will define `params` custom type: |
| 313 | +<br /> |
| 314 | +*params* |
| 315 | + |
| 316 | + { |
| 317 | + "name": "text", |
| 318 | + "ip": "inet" |
| 319 | + } |
| 320 | +After our message will be issued Cassandra plugin will catch that and insert into two tables with defined schema: |
| 321 | +<br /> |
| 322 | +*commands_by_timestamp* |
| 323 | + |
| 324 | + command | timestamp | parameters |
| 325 | + ---------+---------------------------------+------------------------------------- |
| 326 | + test | 2018-02-22 11:53:47.082000+0000 | {name: 'John Doe', ip: '127.0.0.1'} |
| 327 | + |
| 328 | + |
| 329 | +*commands_by_deviceid* |
| 330 | + |
| 331 | + command | deviceid |
| 332 | + ---------+-------------------------------------- |
| 333 | + test | e50d6085-2aba-48e9-b1c3-73c673e414be |
| 334 | + |
0 commit comments