This repository was archived by the owner on Mar 31, 2021. It is now read-only.

Slight change to fix compatibility #53

Status: Open. Wants to merge 13 commits into base `master`.
122 changes: 117 additions & 5 deletions README.md
@@ -296,7 +296,18 @@ A tenant can be registered by calling the following API
]
}
```

```json
[
  {
    "id": "1000001",
    "eventTime": "2020-06-28T00:00:00Z",
    "tenant": "1",
    "payload": "send_email",
    "mode": "UPSERT",
    "deliveryOption": "PAYLOAD_ONLY"
  }
]
```
### find an event
`GET /events/find?id=?&tenant=?`

Expand All @@ -305,8 +316,109 @@ A tenant can be registered by calling the following API

Fires an event without changing its final status.

## Cron support
`BigBen` provides full support for scheduling cron expressions in a
distributed, fault-tolerant manner. It uses the open source
[cron-utils](https://github.com/jmrozanec/cron-utils) package to parse cron
expressions and compute execution times. See the `cron-utils` documentation
for details on the supported cron types; the coverage is quite exhaustive
(QUARTZ, UNIX, Cron4J, and Spring).
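
As a quick illustration, the expression types can be exercised directly with
`cron-utils` (a minimal Kotlin sketch; having the `cron-utils` dependency on
the classpath is assumed):
```kotlin
import com.cronutils.descriptor.CronDescriptor
import com.cronutils.model.CronType
import com.cronutils.model.definition.CronDefinitionBuilder
import com.cronutils.model.time.ExecutionTime
import com.cronutils.parser.CronParser
import java.time.ZonedDateTime
import java.util.Locale

fun main() {
    // parse a QUARTZ-style expression: seconds, minutes, hours, dom, month, dow, year
    val parser = CronParser(CronDefinitionBuilder.instanceDefinitionFor(CronType.QUARTZ))
    val cron = parser.parse("*/5 * * * * ? *")
    // prints "every 5 seconds" -- the same kind of description the /cron API returns
    println(CronDescriptor.instance(Locale.US).describe(cron))
    // next execution time relative to now
    println(ExecutionTime.forCron(cron).nextExecution(ZonedDateTime.now()))
}
```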

### How are crons executed?
`BigBen` uses `hazelcast` to create a lock-free, distributed cron execution system.
`hazelcast` splits its data into `271` partitions and takes care of distributing
these partitions equally among the cluster nodes. All cron expressions are hashed to
these partitions, so the crons get distributed across the cluster.

Each node then spawns a thread pool called the `cron-runner`, which wakes up every
second and checks which **local** crons are ready to execute. Note that no cross-node
communication or locking is involved in executing these crons.
Each cron requires a `tenant`, which dictates how the cron is triggered
(much like any other event in `BigBen`).
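
A rough Kotlin sketch of the idea; the names here are hypothetical stand-ins
(the real runner lives in `com.walmartlabs.bigben.cron.CronRunner`, and
hazelcast uses its own key hashing rather than `String.hashCode()`):
```kotlin
import java.time.ZonedDateTime
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit

// hazelcast's default partition count, as described above
const val PARTITIONS = 271

// hash a cron's id to a partition, the way hazelcast hashes its keys
fun partitionOf(cronId: String): Int = Math.floorMod(cronId.hashCode(), PARTITIONS)

fun main() {
    // pretend this node of a two-node cluster owns the first half of the partitions
    val localPartitions = (0 until 136).toSet()
    val crons = mapOf("tenant1/QUARTZ/cronId1" to "*/5 * * * * ? *")

    // the 'cron-runner' pool ticks every second and evaluates only local crons;
    // no cross-node coordination is needed
    Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate({
        val now = ZonedDateTime.now().withNano(0)
        crons.filterKeys { partitionOf(it) in localPartitions }
            .forEach { (id, expr) -> println("$now: would evaluate '$expr' for $id") }
    }, 0, 1, TimeUnit.SECONDS)
}
```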

## cron execution guarantees
`BigBen` aims to guarantee that:
1. As long as at least one node is available, the cron will execute.
2. A cron is executed on exactly one node; if that node goes down, the
subsequent executions happen on another node.
3. Each cron trigger is tried up to four times, like other events in `BigBen`
(by default: now, then 1 second, 2 seconds, and 4 seconds later; see the
sketch after this list).
4. If all tries fail (e.g. the tenant's http service is not responding or the
kafka cluster is down) and cron log events support is enabled (see below),
the event is stored in the log table with the last associated failure.
All intermediate failures are also logged through the configured `log4j` appenders.
5. The minimum supported execution interval is 1 second.
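
The default schedule in guarantee 3 lines up with the `task.executor` settings
in `bigben.yaml` (`delay: 1`, `backoff.multiplier: 2`, `max.retries: 3`). A
tiny sketch, with a hypothetical helper name:
```kotlin
// Expands an initial delay and multiplier into the per-retry delays;
// the first attempt itself fires immediately.
fun retryDelaysSeconds(initialDelay: Long, multiplier: Long, maxRetries: Int): List<Long> {
    val delays = mutableListOf<Long>()
    var d = initialDelay
    repeat(maxRetries) {
        delays += d
        d *= multiplier
    }
    return delays
}

fun main() {
    println(retryDelaysSeconds(1, 2, 3)) // [1, 2, 4] -> retries at +1s, +2s, +4s
}
```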

## cron event log support
`BigBen` can optionally record each execution of a cron trigger in a table called
`cron-events` (see `bigben-schema.cql` for the table details).

The `cron-events` table uses the fully qualified cron id (a combination of the
user-supplied cron id, the cron type (e.g. QUARTZ, UNIX, etc.), and the tenant)
together with a `logGranularity` time unit as the partition key, and the cron
`executionTime` as the event time. The table also stores which event was
triggered at the trigger time. `cron-events` supports log event retention as well.

For example, you can set up a cron to execute every minute, keep its log records
grouped together at a DAILY level, and retain each event for 1 week.

This support is optional; log events are turned off by default.

_Cron log events are stored with consistency `ONE` by default._ You can use a different
consistency by providing `cron.log.events.write.consistency` in `bigben.yaml`.
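
A hypothetical Kotlin sketch of how such a partition key could be derived; the
actual key derivation is internal to `BigBen` (see `bigben-schema.cql` for the
real `cron_id` / `bucket_id` columns):
```kotlin
import java.time.ZonedDateTime
import java.time.temporal.ChronoUnit

// hypothetical model of a cron-events partition key: the fully qualified
// cron id plus a time bucket derived from the configured logGranularity
data class CronLogKey(val cronId: String, val bucketId: ZonedDateTime)

fun logKey(tenant: String, cronId: String, type: String,
           executionTime: ZonedDateTime, granularity: ChronoUnit) =
    CronLogKey(
        cronId = "$tenant/$type/$cronId",
        // e.g. DAYS groups all of a day's executions into one partition
        bucketId = executionTime.truncatedTo(granularity)
    )

fun main() {
    val t = ZonedDateTime.parse("2020-06-28T13:45:59Z")
    // all executions on 2020-06-28 share the partition (tenant1/QUARTZ/cronId1, 2020-06-28T00:00Z)
    println(logKey("tenant1", "cronId1", "QUARTZ", t, ChronoUnit.DAYS))
}
```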



## cron APIs
`POST /cron`

Sets up a cron (**_requires the tenant to be set up first_**).

Sample request payload with event log enabled:
```json
{
  "tenant": "tenant1",
  "id": "cronId1",
  "expression": "*/5 * * * * ? *",
  "type": "QUARTZ",
  "logGranularity": "MINUTES",
  "retention": 2,
  "retentionUnits": "MINUTES"
}
```
Sample request payload with event log disabled:
```json
{
  "tenant": "tenant1",
  "id": "cronId1",
  "expression": "*/5 * * * * ? *",
  "type": "QUARTZ"
}
```
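
As a usage sketch, the payload above can be posted with the JDK's built-in
HTTP client, assuming a local node on the default `app.server.port` of 8080:
```kotlin
import java.net.URI
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse

fun main() {
    val body = """{"tenant": "tenant1", "id": "cronId1", "expression": "*/5 * * * * ? *", "type": "QUARTZ"}"""
    val request = HttpRequest.newBuilder(URI.create("http://localhost:8080/cron"))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(body))
        .build()
    val response = HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())
    // on success the response echoes the cron along with its human-readable description
    println("${response.statusCode()}: ${response.body()}")
}
```
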
The response includes a `description` that explains how the cron
will be executed.

Sample response payload:
```json
{
  "cron": {
    "id": "cronId1",
    "expression": "*/5 * * * * ? *",
    "type": "QUARTZ",
    "tenant": "tenant1",
    "logGranularity": "MINUTES",
    "retention": 2,
    "retentionUnits": "MINUTES"
  },
  "description": "every 5 seconds"
}
```
That's it! This cron will execute every 5 seconds as long as any node in the
cluster is alive.

`GET /cron/{tenant}/{id}`

Returns all crons (across types, if multiple exist) identified by this tenant
and cron id combination.

`DELETE /cron/{tenant}/{id}/{type}`

Deletes the cron matching the specified tenant, cron id, and type combination.

2 changes: 1 addition & 1 deletion app/pom.xml
@@ -3,7 +3,7 @@
 <parent>
     <groupId>com.walmartlabs.bigben</groupId>
     <artifactId>bigben</artifactId>
-    <version>1.0.7-SNAPSHOT</version>
+    <version>1.0.9-SNAPSHOT</version>
 </parent>
 <artifactId>bigben-app</artifactId>
 <name>BigBen:app</name>
8 changes: 1 addition & 7 deletions app/src/main/kotlin/com/walmartlabs/bigben/app/run.kt
@@ -64,14 +64,8 @@ fun Application.routes() {
post("/generation/random") { call.respond(EventGenerator.generateEvents(call.receive())) }
route("/cron") {
post { call.fromAPIResponse(CronService.upsert(call.receive())) }
get("/describe") { call.fromAPIResponse(CronService.describe(call.receive())) }
get("/{tenant}/{id}") {
call.fromAPIResponse(
CronService.get(
call.parameters["tenant"]!!, call.parameters["id"]!!,
call.request.queryParameters["describe"]?.toBoolean()
)
)
call.fromAPIResponse(CronService.get(call.parameters["tenant"]!!, call.parameters["id"]!!))
}
delete("/{tenant}/{id}/{type}") {
call.fromAPIResponse(CronService.delete(call.parameters["tenant"]!!, call.parameters["id"]!!, call.parameters["type"]!!))
76 changes: 76 additions & 0 deletions build/bin/bigben-schema.cql
@@ -0,0 +1,76 @@
-- DROP KEYSPACE IF EXISTS bigben;

CREATE KEYSPACE IF NOT EXISTS bigben
    WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };

-- DROP TABLE IF EXISTS bigben.buckets;

CREATE TABLE IF NOT EXISTS bigben.buckets
(
    id            timestamp PRIMARY KEY,
    count         bigint,
    failed_shards text,
    modified_at   timestamp,
    processed_at  timestamp,
    status        text
);

-- DROP TABLE IF EXISTS bigben.lookups;

CREATE TABLE IF NOT EXISTS bigben.lookups
(
    tenant     text,
    xref_id    text,
    bucket_id  timestamp,
    event_id   text,
    event_time timestamp,
    l_m        timestamp,
    payload    text,
    shard      int,
    PRIMARY KEY ((tenant, xref_id))
);

-- DROP TABLE IF EXISTS bigben.events;

CREATE TABLE IF NOT EXISTS bigben.events
(
    bucket_id    timestamp,
    shard        int,
    event_time   timestamp,
    id           text,
    error        text,
    payload      text,
    processed_at timestamp,
    status       text,
    tenant       text,
    xref_id      text,
    PRIMARY KEY ((bucket_id, shard), event_time, id)
) WITH CLUSTERING ORDER BY (event_time ASC, id ASC);

-- DROP TABLE IF EXISTS bigben.kv_table;

CREATE TABLE IF NOT EXISTS bigben.kv_table
(
    key    text,
    column text,
    l_m    timestamp,
    value  text,
    PRIMARY KEY (key, column)
) WITH CLUSTERING ORDER BY (column ASC);

-- DROP TABLE IF EXISTS bigben.cron_events;

CREATE TABLE IF NOT EXISTS bigben.cron_events
(
    cron_id    text,
    bucket_id  timestamp,
    event_time timestamp,
    event      text,
    PRIMARY KEY ((cron_id, bucket_id), event_time)
) WITH CLUSTERING ORDER BY (event_time DESC);

Binary file added build/bin/bigben.jar
Binary file not shown.
146 changes: 146 additions & 0 deletions build/bin/bigben.yaml
@@ -0,0 +1,146 @@
# top level modules
modules:
  - name: domain
    class: com.walmartlabs.bigben.providers.domain.cassandra.CassandraModule
  - name: processors
    object: com.walmartlabs.bigben.processors.ProcessorRegistry
  - name: hz
    class: com.walmartlabs.bigben.utils.hz.Hz
  - name: scheduler
    object: com.walmartlabs.bigben.SchedulerModule
  - name: events
    object: com.walmartlabs.bigben.EventModule
  - name: messaging
    object: com.walmartlabs.bigben.kafka.KafkaModule
    enabled: ${kafka.module.enabled:-false}
  - name: cron
    object: com.walmartlabs.bigben.cron.CronRunner
    enabled: ${cron.module.enabled:-false}

# hazelcast properties
hz:
  template: file://hz.template.xml
  group:
    name: bigben-dev
    password: bigben-dev
  network:
    autoIncrementPort: true
    members: 127.0.0.1
    port: 5701
  map:
    store:
      writeDelay: 30

# message related properties
messaging.producer.factory.class: com.walmartlabs.bigben.kafka.KafkaMessageProducerFactory

# cassandra related properties
cassandra:
  keyspace: bigben
  cluster:
    contactPoints: 127.0.0.1
    clusterName: bigben-cluster
    port: 9042
    localDataCenter: null
    coreConnectionsPerLocalHost: 1
    maxConnectionsPerLocalHost: 1
    coreConnectionsPerRemoteHost: 1
    maxConnectionsPerRemoteHost: 1
    maxRequestsPerLocalConnection: 32768
    maxRequestsPerRemoteConnection: 2048
    newLocalConnectionThreshold: 3000
    newRemoteConnectionThreshold: 400
    poolTimeoutMillis: 0
    keepTCPConnectionAlive: true
    connectionTimeOut: 5000
    readTimeout: 12000
    reconnectPeriod: 5
    username: null
    password: null
    downgradingConsistency: false
    writeConsistency: LOCAL_ONE
    readConsistency: LOCAL_ONE

# kafka consumer properties
kafka:
  consumers:
    - num.consumers: ${num.consumers:-8}
      processor.impl.class: com.walmartlabs.bigben.kafka.ProcessorImpl
      topics: ${bigben.inbound.topic.name:-null}
      max.poll.wait.time: ${max.poll.wait.time:-10000}
      message.retry.max.count: ${message.retry.max.count:-10}
      config:
        key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
        bootstrap.servers: ${bigben.inbound.topic.bootstrap.servers:-null}
        #fetch.min.bytes: 1
        group.id: ${group.id:-bigben-inbound}
        heartbeat.interval.ms: ${heartbeat.interval.ms:-3000}
        session.timeout.ms: 30000
        auto.offset.reset: ${auto.offset.reset:-latest}
        fetch.max.bytes: 324000
        max.poll.interval.ms: 30000
        max.poll.records: 100
        receive.buffer.bytes: 65536
        request.timeout.ms: 60000
        #send.buffer.bytes: 131072
        enable.auto.commit: ${enable.auto.commit:-false}
  producer:
    config: # the default kafka producer config; these values are used if not supplied during tenant registration
      key.serializer: org.apache.kafka.common.serialization.StringSerializer
      value.serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: "1"
      buffer.memory: 32400
      retries: 3

# system properties
task:
  executor:
    #retry.thread.count: 8
    retry.time.units: SECONDS
    delay: 1
    max.retries: 3
    backoff.multiplier: 2

app.server.port: 8080
generic.future.max.get.time: 60

events:
  scheduler.enabled: true
  schedule.scan.interval.minutes: 1
  num.shard.submitters: 8
  receiver:
    shard.size: 1000
    lapse.offset.minutes: 0
    delete:
      max.retries: 3
      initial.delay: 1
      backoff.multiplier: 1
  submit:
    initial.delay: 1
    backoff.multiplier: 1
    max.retries: 3
  processor:
    max.retries: 3
    initial.delay: 1
    backoff.multiplier: 2
    eager.loading: true
  tasks:
    max.events.in.memory: 100000
    scheduler.worker.threads: 8

# bucket manager / loader related properties
buckets:
  backlog.check.limit: 1440 # 1 day
  background:
    load.fetch.size: 100
    load.wait.interval.seconds: 15

cron:
  runner:
    core.pool.size: 8
    load:
      max.retries: 10
      delay: 1
      backoff.multiplier: 1
      time.units: "SECONDS"
2 changes: 1 addition & 1 deletion build/docker/app_run.sh
@@ -56,7 +56,7 @@ function start() {
 -e JAVA_OPTS="${JAVA_OPTS} -Dbigben.configs=uri://${APP_ROOT}/overrides.yaml,uri://${APP_ROOT}/bigben.yaml \
 -Dapp.server.port=${SERVER_PORT} \
 -Dbigben.log.file=${APP_ROOT}/logs/bigben_app_${app_port}.log \
--Dbigben.log.config=${APP_ROOT} \
+-Dbigben.log.config=${APP_ROOT}/log4j.xml \
 -Dhazelcast.local.publicAddress=${HOST_IP}:${hz_port}" \
 --name "${APP_CONTAINER_NAME}_$app_port" sandeepmalik/bigben:1
 let i=i+1