This repository was archived by the owner on Mar 31, 2021. It is now read-only.

Added support for cron APIs #30

Open · wants to merge 5 commits into base: master
109 changes: 105 additions & 4 deletions README.md
@@ -305,8 +305,109 @@ A tenant can be registered by calling the following API

fires an event without changing its final status

## cron APIs
coming up...
## Cron support
`BigBen` provides full support for scheduling cron expressions in a
distributed, fault-tolerant manner. It uses the open-source
[cron-utils](https://github.com/jmrozanec/cron-utils) package to parse cron
expressions and calculate execution times. See the `cron-utils` documentation
for details on the cron types it supports; the coverage is quite exhaustive
(QUARTZ, UNIX, Cron4J, and Spring).
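
For instance, a minimal `cron-utils` sketch (assuming cron-utils 7.x or later,
where `nextExecution` returns an `Optional`) that parses a QUARTZ expression
and computes its next execution time:
```kotlin
import com.cronutils.model.CronType.QUARTZ
import com.cronutils.model.definition.CronDefinitionBuilder
import com.cronutils.model.time.ExecutionTime
import com.cronutils.parser.CronParser
import java.time.ZonedDateTime

fun main() {
    // parse a QUARTZ-style expression: fire every 5 seconds
    val parser = CronParser(CronDefinitionBuilder.instanceDefinitionFor(QUARTZ))
    val cron = parser.parse("*/5 * * * * ? *").validate()
    // compute the next execution time relative to now
    val next = ExecutionTime.forCron(cron).nextExecution(ZonedDateTime.now())
    println("next execution: $next")
}
```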

### How are crons executed?
`BigBen` uses `hazelcast` to create a lock-free, distributed cron execution system.
`hazelcast` divides its data into `271` partitions and takes care of distributing
these partitions equally among the cluster nodes. Each cron expression is hashed to
one of these partitions, which means crons get distributed across the cluster.
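
A minimal sketch of the partitioning idea (illustrative only, not `BigBen`'s
actual code): hash each fully qualified cron id onto one of the `271`
partitions, so a cron is owned by whichever node owns its partition:
```kotlin
// Illustrative sketch of the partitioning idea, not BigBen's implementation.
const val PARTITION_COUNT = 271

fun partitionFor(tenant: String, cronId: String, type: String): Int {
    val key = "$tenant/$cronId/$type"
    // floorMod keeps the result in [0, PARTITION_COUNT) even for negative hash codes
    return Math.floorMod(key.hashCode(), PARTITION_COUNT)
}
```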

Each node then spawns a thread (pool) called the `cron-runner`, which wakes up every
second and checks which **local** crons are ready to execute. Note that there's no
cross-node communication or locking involved in executing these crons.
Each cron requires a `tenant`, which dictates how the cron is triggered
(much like any other event in `BigBen`).
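
A hypothetical sketch of that per-node loop; `localCrons()` and `fire()` are
illustrative stand-ins, not `BigBen`'s internals:
```kotlin
import java.time.ZonedDateTime
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit.SECONDS

// Hypothetical runner loop: every second, fire the locally owned crons that are due.
fun startCronRunner(
    localCrons: () -> List<Pair<String, ZonedDateTime>>, // (cronId, nextExecution)
    fire: (String) -> Unit
) {
    val runner = Executors.newSingleThreadScheduledExecutor()
    runner.scheduleAtFixedRate({
        val now = ZonedDateTime.now()
        localCrons()
            .filter { (_, nextExecution) -> !nextExecution.isAfter(now) }
            .forEach { (cronId, _) -> fire(cronId) }
    }, 0, 1, SECONDS)
}
```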

## Cron execution guarantees
`BigBen` aims to guarantee that:
1. As long as at least one node is available, the cron will execute.
2. A cron is only ever executed on one node at a time (if that node goes down, the
subsequent executions happen on another node).
3. Each cron trigger is tried four times, like other events in `BigBen`
(by default: immediately, then 1 second, 2 seconds, and 4 seconds later), as
sketched below.
4. If all tries result in failure (e.g. if the tenant's http service is not
responding or the kafka cluster is down) and cron log events support is enabled
(see below), then the event is stored in the log table with the last associated failure.
All intermediate failures are also logged in the configured `log4j` appenders.
5. The minimum execution interval supported is 1 second.
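
A tiny sketch of the default retry schedule from guarantee (3), purely to
illustrate the doubling delays:
```kotlin
// Four tries, delayed by 0, 1, 2, and 4 seconds respectively.
fun retryDelaysSeconds(maxTries: Int = 4): List<Long> =
    (0 until maxTries).map { if (it == 0) 0L else 1L shl (it - 1) }

fun main() {
    println(retryDelaysSeconds()) // [0, 1, 2, 4]
}
```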

## Cron event log support
`BigBen` can optionally record each execution of a cron trigger in a table called `cron-events`
(see `bigben-schema.cql` for table details).

The `cron-events` table uses the fully qualified cron-id (a combination of the user-supplied
cron-id, the cron-type (e.g. QUARTZ, UNIX, etc.), and the tenant) and a `logGranularity`
time unit as partition keys, and the cron `executionTime` as the event time.
The table also stores which event was triggered at the trigger time. `cron-events`
supports log event retention as well.

E.g. you can set up a cron to execute every 1 minute and keep its records
grouped together at DAILY level, with a retention of 1 week for each event.
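
A minimal sketch of how such a granularity could bucket executions into partitions
(illustrative only; the real key layout is in `bigben-schema.cql`):
```kotlin
import java.time.ZonedDateTime
import java.time.temporal.ChronoUnit

// Truncating executionTime to the log granularity groups all executions within
// that unit (day, minute, etc.) into one partition, so retention can expire
// them together.
fun partitionBucket(executionTime: ZonedDateTime, granularity: ChronoUnit): ZonedDateTime =
    executionTime.truncatedTo(granularity)

fun main() {
    val t = ZonedDateTime.parse("2021-03-01T10:15:30Z")
    println(partitionBucket(t, ChronoUnit.DAYS)) // 2021-03-01T00:00Z
}
```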

This support is optional. By default, log events are turned off.

_Cron log events are stored with consistency `ONE` by default_. You can use a different
consistency by providing `cron.log.events.write.consistency` in `bigben.yaml`.
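
A hypothetical sketch of that override lookup; `config(...)` is an illustrative
stand-in for `BigBen`'s configuration accessor, not its actual API:
```kotlin
import com.datastax.driver.core.ConsistencyLevel

// Fall back to ONE unless cron.log.events.write.consistency is set in bigben.yaml.
fun cronLogWriteConsistency(config: (String) -> String?): ConsistencyLevel =
    config("cron.log.events.write.consistency")
        ?.let { ConsistencyLevel.valueOf(it) }
        ?: ConsistencyLevel.ONE
```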

## Cron APIs
`POST /cron`

Sets up a cron (**_requires the tenant to be set up first_**).

Sample request payload with event log enabled:
```json
{
"tenant": "tenant1",
"id": "cronId1",
"expression": "*/5 * * * * ? *",
"type": "QUARTZ",
"logGranularity": "MINUTES",
"retention": 2,
"retentionUnits": "MINUTES"
}
```
Sample request payload with event log disabled:
```json
{
"tenant": "tenant1",
"id": "cronId1",
"expression": "*/5 * * * * ? *",
"type": "QUARTZ"
}
```
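
For example, a hypothetical client call, assuming Java 11+ and a `BigBen`
instance listening at `http://localhost:8080` (adjust the base URL to your
deployment):
```kotlin
import java.net.URI
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse

// Hypothetical usage: POST a cron setup request to a local BigBen instance.
fun main() {
    val body = """{"tenant": "tenant1", "id": "cronId1", "expression": "*/5 * * * * ? *", "type": "QUARTZ"}"""
    val request = HttpRequest.newBuilder(URI.create("http://localhost:8080/cron"))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(body))
        .build()
    val response = HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())
    println("${response.statusCode()}: ${response.body()}")
}
```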
The response includes a `description` field that explains how the cron
will be executed.

Sample response payload:
```json
{
"cron": {
"id": "cronId1",
"expression": "*/5 * * * * ? *",
"type": "QUARTZ",
"tenant": "tenant1",
"logGranularity": "MINUTES",
"retention": 2,
"retentionUnits": "MINUTES"
},
"description": "every 5 seconds"
}
```
That's it! This cron will execute every 5 seconds for as long as any node in the
cluster is alive.

`GET /cron/{tenant}/{id}`

Returns all crons (across types, if multiple exist) identified by this tenant and
cron-id combination.

`DELETE /cron/{tenant}/{id}/{type}`

Deletes the cron identified by this specific tenant, cron-id, and type combination.

2 changes: 1 addition & 1 deletion app/pom.xml
@@ -3,7 +3,7 @@
<parent>
<groupId>com.walmartlabs.bigben</groupId>
<artifactId>bigben</artifactId>
<version>1.0.7-SNAPSHOT</version>
<version>1.0.9-SNAPSHOT</version>
</parent>
<artifactId>bigben-app</artifactId>
<name>BigBen:app</name>
8 changes: 1 addition & 7 deletions app/src/main/kotlin/com/walmartlabs/bigben/app/run.kt
@@ -64,14 +64,8 @@ fun Application.routes() {
post("/generation/random") { call.respond(EventGenerator.generateEvents(call.receive())) }
route("/cron") {
post { call.fromAPIResponse(CronService.upsert(call.receive())) }
get("/describe") { call.fromAPIResponse(CronService.describe(call.receive())) }
get("/{tenant}/{id}") {
call.fromAPIResponse(
CronService.get(
call.parameters["tenant"]!!, call.parameters["id"]!!,
call.request.queryParameters["describe"]?.toBoolean()
)
)
call.fromAPIResponse(CronService.get(call.parameters["tenant"]!!, call.parameters["id"]!!))
}
delete("/{tenant}/{id}/{type}") {
call.fromAPIResponse(CronService.delete(call.parameters["tenant"]!!, call.parameters["id"]!!, call.parameters["type"]!!))
2 changes: 1 addition & 1 deletion cassandra/pom.xml
@@ -4,7 +4,7 @@
<parent>
<artifactId>bigben</artifactId>
<groupId>com.walmartlabs.bigben</groupId>
<version>1.0.7-SNAPSHOT</version>
<version>1.0.9-SNAPSHOT</version>
</parent>

<artifactId>bigben-cassandra</artifactId>
@@ -19,16 +19,33 @@
*/
package com.walmartlabs.bigben.providers.domain.cassandra

import com.datastax.driver.core.*
import com.datastax.driver.core.Cluster
import com.datastax.driver.core.CodecRegistry
import com.datastax.driver.core.HostDistance.LOCAL
import com.datastax.driver.core.HostDistance.REMOTE
import com.datastax.driver.core.policies.*
import com.datastax.driver.core.PoolingOptions
import com.datastax.driver.core.PreparedStatement
import com.datastax.driver.core.ProtocolOptions
import com.datastax.driver.core.Session
import com.datastax.driver.core.SocketOptions
import com.datastax.driver.core.policies.ConstantReconnectionPolicy
import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy
import com.datastax.driver.core.policies.DefaultRetryPolicy
import com.datastax.driver.core.policies.DowngradingConsistencyRetryPolicy
import com.datastax.driver.core.policies.TokenAwarePolicy
import com.datastax.driver.mapping.Mapper
import com.datastax.driver.mapping.Mapper.Option.consistencyLevel
import com.datastax.driver.mapping.Mapper.Option.saveNullFields
import com.datastax.driver.mapping.Mapper.Option.ttl
import com.datastax.driver.mapping.MappingManager
import com.google.common.util.concurrent.ListenableFuture
import com.walmartlabs.bigben.entities.*
import com.walmartlabs.bigben.entities.Bucket
import com.walmartlabs.bigben.entities.EntityProvider
import com.walmartlabs.bigben.entities.Event
import com.walmartlabs.bigben.entities.EventLoader
import com.walmartlabs.bigben.entities.EventLookup
import com.walmartlabs.bigben.entities.EventStatus
import com.walmartlabs.bigben.entities.KV
import com.walmartlabs.bigben.extns.nowUTC
import com.walmartlabs.bigben.utils.commons.Module
import com.walmartlabs.bigben.utils.commons.ModuleRegistry
@@ -54,8 +71,8 @@ open class CassandraModule<T : Any> : EntityProvider<T>, ClusterFactory, EventLo
private val session: Session

private val clusterConfig = ClusterConfig::class.java.fromJson(map("cassandra.cluster").json())
private val writeConsistency = consistencyLevel(clusterConfig.writeConsistency)
private val readConsistency = consistencyLevel(clusterConfig.readConsistency)
private val writeConsistency = clusterConfig.writeConsistency
private val readConsistency = clusterConfig.readConsistency

init {
l.info("initialing the Cassandra module")
@@ -77,7 +94,7 @@ open class CassandraModule<T : Any> : EntityProvider<T>, ClusterFactory, EventLo
Bucket::class.java -> BucketC() as T
EventLookup::class.java -> EventLookupC() as T
KV::class.java -> KVC() as T
else -> throw IllegalArgumentException("unknown entity $type")
else -> type.newInstance()
}
}

@@ -91,12 +108,14 @@ open class CassandraModule<T : Any> : EntityProvider<T>, ClusterFactory, EventLo
}

override fun fetch(selector: T): ListenableFuture<T?> {
val readC = (selector as? ConsistencyOverride)?.read()?.let { it } ?: readConsistency
return mappingManager.mapper(selector::class.java).let {
val readConsistency = consistencyLevel(readC)
when (selector) {
is EventC -> {
require(
selector.eventTime != null && selector.id != null &&
selector.shard != null && selector.shard!! >= 0
selector.eventTime != null && selector.id != null &&
selector.shard != null && selector.shard!! >= 0
) { "event keys not provided: $selector" }
it.getAsync(selector.bucketId, selector.shard, selector.eventTime, selector.id, readConsistency).transform { it }
}
@@ -115,7 +134,7 @@ open class CassandraModule<T : Any> : EntityProvider<T>, ClusterFactory, EventLo
else -> throw IllegalArgumentException("unknown selector: $selector")
}
}.apply {
transform { if (l.isDebugEnabled) l.debug("fetched entity: {}", it) }
transform { if (l.isDebugEnabled) l.debug("fetched entity: {}, readConsistency: {}", it, readC) }
}
}

@@ -126,8 +145,8 @@ open class CassandraModule<T : Any> : EntityProvider<T>, ClusterFactory, EventLo
when (selector) {
is EventC -> {
require(
selector.eventTime != null && selector.id != null && selector.bucketId != null &&
selector.shard != null && selector.shard!! >= 0
selector.eventTime != null && selector.id != null && selector.bucketId != null &&
selector.shard != null && selector.shard!! >= 0
) { "event keys not provided: $selector" }
}
is BucketC -> {
@@ -141,10 +160,13 @@ open class CassandraModule<T : Any> : EntityProvider<T>, ClusterFactory, EventLo
require(selector.key != null && selector.column != null) { "kv keys not provided: $selector" }
selector.lastModified = nowUTC()
}
else -> throw IllegalArgumentException("unknown selector: $selector")
}
if (l.isDebugEnabled) l.debug("saving entity {}", selector)
m.saveAsync(selector, saveNullFields(false), writeConsistency).transform { _ -> if (l.isDebugEnabled) l.debug("saved entity {}", selector); selector }
val writeConsistency = (selector as? ConsistencyOverride)?.write()?.let { it } ?: writeConsistency
val ttl = (selector as? TTLOverride)?.ttl()?.let { it } ?: 0

if (l.isDebugEnabled) l.debug("saving entity {}, ttl: {}, writeConsistency: {}", selector, ttl, writeConsistency)
m.saveAsync(selector, saveNullFields(false), consistencyLevel(writeConsistency), ttl(ttl))
.transform { if (l.isDebugEnabled) l.debug("saved entity {}", selector); selector }
}
}

@@ -155,8 +177,8 @@ open class CassandraModule<T : Any> : EntityProvider<T>, ClusterFactory, EventLo
when (selector) {
is EventC -> {
require(
selector.eventTime != null && selector.id != null &&
selector.shard != null && selector.shard!! >= 0
selector.eventTime != null && selector.id != null &&
selector.shard != null && selector.shard!! >= 0
) { "event keys not provided: $selector" }
}
is BucketC -> {
@@ -168,42 +190,42 @@ open class CassandraModule<T : Any> : EntityProvider<T>, ClusterFactory, EventLo
is KVC -> {
require(selector.key != null && selector.column != null) { "kv keys not provided: $selector" }
}
else -> throw IllegalArgumentException("unknown selector: $selector")
}
if (l.isDebugEnabled) l.debug("deleting entity: {}", selector)
m.deleteAsync(selector, writeConsistency).transform { _ -> if (l.isDebugEnabled) l.debug("deleted entity {}", selector); selector }
val deleteConsistency = (selector as? ConsistencyOverride)?.delete()?.let { it } ?: writeConsistency
if (l.isDebugEnabled) l.debug("deleting entity: {}, deleteConsistency: $deleteConsistency", selector)
m.deleteAsync(selector, consistencyLevel(deleteConsistency)).transform { if (l.isDebugEnabled) l.debug("deleted entity {}", selector); selector }
}
}

override fun create(): Cluster {
return Cluster.builder()
.withCodecRegistry(CodecRegistry().register(EnumCodec(EventStatus.values().toSet())).register(ZdtCodec()))
.withClusterName(clusterConfig.clusterName)
.withPort(clusterConfig.port)
.also { clusterConfig.compression?.run { it.withCompression(ProtocolOptions.Compression.valueOf(this)) } }
.withRetryPolicy(if (clusterConfig.downgradingConsistency) DowngradingConsistencyRetryPolicy.INSTANCE else DefaultRetryPolicy.INSTANCE)
.also {
clusterConfig.localDataCenter?.run {
it.withLoadBalancingPolicy(TokenAwarePolicy(DCAwareRoundRobinPolicy.builder().withLocalDc(this).withUsedHostsPerRemoteDc(0).build()))
}
}
.withReconnectionPolicy(ConstantReconnectionPolicy(clusterConfig.reconnectPeriod))
.withSocketOptions(SocketOptions().apply {
connectTimeoutMillis = clusterConfig.connectionTimeOut
readTimeoutMillis = clusterConfig.readTimeout
keepAlive = clusterConfig.keepTCPConnectionAlive
})
.withPoolingOptions(PoolingOptions().apply {
clusterConfig.apply {
setConnectionsPerHost(LOCAL, coreConnectionsPerLocalHost, maxConnectionsPerLocalHost)
setConnectionsPerHost(REMOTE, coreConnectionsPerRemoteHost, maxConnectionsPerRemoteHost)
.withCodecRegistry(CodecRegistry().register(EnumCodec(EventStatus.values().toSet())).register(ZdtCodec()))
.withClusterName(clusterConfig.clusterName)
.withPort(clusterConfig.port)
.also { clusterConfig.compression?.run { it.withCompression(ProtocolOptions.Compression.valueOf(this)) } }
.withRetryPolicy(if (clusterConfig.downgradingConsistency) DowngradingConsistencyRetryPolicy.INSTANCE else DefaultRetryPolicy.INSTANCE)
.also {
clusterConfig.localDataCenter?.run {
it.withLoadBalancingPolicy(TokenAwarePolicy(DCAwareRoundRobinPolicy.builder().withLocalDc(this).withUsedHostsPerRemoteDc(0).build()))
}
}
heartbeatIntervalSeconds = 60
})
.also { clusterConfig.username?.run { it.withCredentials(this, clusterConfig.password) } }
.addContactPoints(*clusterConfig.contactPoints.split(",").toTypedArray())
.apply { decorate(this) }
.build()
.withReconnectionPolicy(ConstantReconnectionPolicy(clusterConfig.reconnectPeriod))
.withSocketOptions(SocketOptions().apply {
connectTimeoutMillis = clusterConfig.connectionTimeOut
readTimeoutMillis = clusterConfig.readTimeout
keepAlive = clusterConfig.keepTCPConnectionAlive
})
.withPoolingOptions(PoolingOptions().apply {
clusterConfig.apply {
setConnectionsPerHost(LOCAL, coreConnectionsPerLocalHost, maxConnectionsPerLocalHost)
setConnectionsPerHost(REMOTE, coreConnectionsPerRemoteHost, maxConnectionsPerRemoteHost)
}
heartbeatIntervalSeconds = 60
})
.also { clusterConfig.username?.run { it.withCredentials(this, clusterConfig.password) } }
.addContactPoints(*clusterConfig.contactPoints.split(",").toTypedArray())
.apply { decorate(this) }
.build()
}

protected open fun decorate(builder: Cluster.Builder) {