Skip to content

Commit 52bb3fb

Browse files
authored
PLAT-1037 leaky bucket (#12)
Some refactor Packages updated and version-pinned More aggressive management of RabbitFallback
1 parent 7b5a82a commit 52bb3fb

25 files changed

Lines changed: 583 additions & 512 deletions

.editorconfig

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ root = true
33

44
[*]
55
indent_style = space
6-
indent_size = 2
6+
indent_size = 4
77
end_of_line = lf
88
charset = utf-8
99
trim_trailing_whitespace = true

.eslintrc

Lines changed: 0 additions & 20 deletions
This file was deleted.

.eslintrc.json

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
{
2+
"root": true,
3+
"env": {
4+
"es6": true,
5+
"node": true
6+
},
7+
"extends": [
8+
"eslint:recommended"
9+
],
10+
"parserOptions": {
11+
"ecmaVersion": 8
12+
},
13+
"rules": {
14+
"quote-props": ["warn", "consistent-as-needed"],
15+
"arrow-parens": 0,
16+
"generator-star-spacing": 0,
17+
"no-debugger": 0,
18+
"array-element-newline": ["error", "consistent"],
19+
"array-bracket-newline": ["error", "consistent"],
20+
"array-bracket-spacing": ["error", "never"],
21+
"no-console": "error",
22+
"indent": ["error", 4, { "SwitchCase": 1 }],
23+
"space-infix-ops": ["warn"],
24+
"eol-last":["warn"],
25+
"no-multi-spaces": ["warn", { "ignoreEOLComments": true }],
26+
"no-multiple-empty-lines": ["error", { "max": 2, "maxEOF": 1 }],
27+
"comma-dangle": ["warn", "never"],
28+
"eqeqeq": ["warn"],
29+
"semi": ["warn", "never"],
30+
"keyword-spacing": ["warn"],
31+
"space-before-blocks": ["warn"],
32+
"comma-spacing": ["warn", { "before": false, "after": true }]
33+
}
34+
}
35+
36+

.gitignore

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,10 @@ typings/
6262
# dotenv environment variables file
6363
.env
6464

65+
# vscode editor settings
66+
.vscode/
67+
6568

6669
# End of https://www.gitignore.io/api/node
6770

6871
.idea
69-
lib

Dockerfile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
1-
FROM node:7.9.0
1+
FROM node:12.18.3
22
MAINTAINER Superbalist <tech+docker@superbalist.com>
33

44
RUN mkdir -p /usr/src/app
55
WORKDIR /usr/src/app
66

77
COPY package.json /usr/src/app/
8-
RUN yarn install
8+
RUN npm install --only=prod
99

1010
COPY src /usr/src/app/src/
1111

Dockerfile-alpine

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
1-
FROM node:7.9.0-alpine
1+
FROM node:12.18.3-alpine
22
MAINTAINER Superbalist <tech+docker@superbalist.com>
33

44
RUN mkdir -p /usr/src/app
55
WORKDIR /usr/src/app
66

77
COPY package.json /usr/src/app/
8-
RUN yarn install
8+
RUN npm install --only=prod
99

1010
COPY src /usr/src/app/src/
1111

changelog.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
11
# Changelog
22

3+
## 1.3.0 2020-09-15
4+
* Badly formed channel messages result in `400` error
5+
* When this service is shutting down, it won't be receiving any more messages
6+
* When a message fails with `503`, the caller should retry
7+
38
## 1.2.0 2018-12-21
49

510
* Fallback to rabbitmq if messages fail 3 times.

package.json

Lines changed: 13 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
{
22
"name": "@superbalist/js-pubsub-rest-proxy",
3-
"version": "1.2.0",
3+
"version": "1.3.0",
44
"description": "An HTTP server which acts as a gateway for publishing messages via a js-pubsub adapter",
55
"private": true,
66
"scripts": {
77
"start": "node ./src/bin/www",
8-
"start-dev": "nodemon -L ./src/bin/www"
8+
"start-dev": "nodemon -L ./src/bin/www -- --trace-sync-io"
99
},
1010
"repository": {
1111
"type": "git",
@@ -24,20 +24,19 @@
2424
"dependencies": {
2525
"@superbalist/js-event-pubsub": "^3.0.2",
2626
"@superbalist/js-pubsub-manager": "^3.0.1",
27-
"ajv": "^5.0.1",
28-
"amqplib": "^0.5.3",
29-
"body-parser": "^1.17.2",
30-
"debug": "~2.6.3",
31-
"express": "^4.15.3",
32-
"morgan": "^1.8.1",
33-
"prom-client": "^10.2.2",
34-
"queue": "^4.2.1",
27+
"ajv": "5.0.1",
28+
"amqplib": "0.6.0",
29+
"body-parser": "1.19.0",
30+
"debug": "4.1.1",
31+
"express": "4.17.1",
32+
"morgan": "1.10.0",
33+
"prom-client": "12.0.0",
34+
"queue": "6.0.1",
3535
"raven": "^2.0.1",
36-
"winston": "^2.3.1"
36+
"winston": "3.3.3"
3737
},
3838
"devDependencies": {
39-
"eslint": "^6.8.0",
40-
"eslint-config-google": "^0.7.1",
41-
"nodemon": "^1.18.7"
39+
"eslint": "7.9.0",
40+
"nodemon": "2.0.4"
4241
}
4342
}

src/app.js

Lines changed: 98 additions & 124 deletions
Original file line numberDiff line numberDiff line change
@@ -1,133 +1,107 @@
1-
'use strict';
2-
3-
let config = require('./config');
4-
let express = require('express');
5-
let morgan = require('morgan');
6-
let bodyParser = require('body-parser');
7-
let Raven = require('raven');
8-
if (config.SENTRY_DSN) {
9-
Raven.config(config.SENTRY_DSN)
10-
.install();
1+
'use strict'
2+
3+
// get configuration
4+
const {
5+
SENTRY_DSN,
6+
MAX_POST_SIZE,
7+
FALLBACK,
8+
RABBIT
9+
} = require('./config')
10+
11+
let serviceIsAvailable = false
12+
const express = require('express')
13+
const morgan = require('morgan')
14+
const bodyParser = require('body-parser')
15+
const Raven = require('raven')
16+
17+
//TODO Upgrade to sentry pls -- this is deprecated
18+
if (SENTRY_DSN) {
19+
Raven.config(SENTRY_DSN)
20+
.install()
1121
}
1222

13-
if (config.FALLBACK) {
14-
// Use var intentionally so that rabbit doesn't need to be defined
15-
var rabbit = require('./rabbit'); // eslint-disable-line no-var
16-
}
23+
const prom = require('./lib/prometheus')
24+
const logger = require('./lib/logger')
25+
const queue = require('./lib/queue')
26+
const ServiceError = require('./lib/ServiceError')
27+
const rabbitController = require('./lib/rabbit')
1728

18-
let prom = require('./prometheus');
19-
20-
let logger = require('./logger');
21-
let pubsub = require('./pubsub');
22-
let queue = require('./queue');
23-
24-
// bootstrap app
25-
let app = express();
26-
app.use(morgan('dev'));
27-
app.use(bodyParser.json({ limit: config.MAX_POST_SIZE }));
28-
app.use(bodyParser.urlencoded({extended: false}));
29-
30-
// register routes
31-
app.get('/', (req, res, next) => {
32-
res.json({ping: 'pong'});
33-
});
34-
35-
app.get('/healthz', (req, res, next) => {
36-
res.json({ping: 'pong'});
37-
});
38-
39-
app.post('/messages/:channel', (req, res, next) => {
40-
let end = prom.requestSummary.startTimer();
41-
let channel = req.params.channel;
42-
prom.receiveCount.inc({channel});
43-
let messages = req.body.messages || [];
44-
queue.push(publishJob(channel, messages, end));
45-
res.json({success: true});
46-
});
47-
48-
49-
/**
50-
* Return a job
51-
*
52-
* @param {String} channel
53-
* @param {[Object]} messages
54-
* @param {Function} end
55-
* @param {Integer} retries
56-
*
57-
* @return {Function}
58-
*/
59-
function publishJob(channel, messages, end, retries=0) {
60-
return function(cb) {
61-
pubsub.publish(channel, messages).then((result)=>{
62-
let errors = result.errors;
63-
if(errors.length > 0) {
64-
prom.publishCount.inc({state: 'failed', channel});
65-
if(config.FALLBACK && retries >= 2) {
66-
end();
67-
return rabbit.publish(channel, errors);
68-
}
69-
retries++;
70-
queue.push(publishJob(channel, errors.map((error) => error.message), end, retries));
71-
} else {
72-
prom.publishCount.inc({state: 'success', channel});
73-
end();
74-
}
75-
cb();
76-
}).catch((error) => {
77-
cb(error);
78-
});
79-
};
80-
}
29+
if (FALLBACK) rabbitController.start()
30+
31+
const app = express()
32+
33+
app.use(morgan('dev'))
34+
app.use(bodyParser.json({ limit: MAX_POST_SIZE }))
35+
app.use(bodyParser.urlencoded({ extended: false }))
36+
37+
// why is this here? :shrug:
38+
app.get('/', async (req, res) => {
39+
// logger.info('ping')
40+
res.json({ ping: 'pong' }).end()
41+
})
42+
43+
// k8s health check endpoint
44+
app.get('/healthz', (req, res) => {
45+
if (!serviceIsAvailable) throw new ServiceError('This service is not currently accepting requests', 400)
46+
res.json({ ping: 'pong' })
47+
} )
8148

82-
// bind middleware
83-
app.use((req, res, next) => {
84-
let err = new Error('Not Found');
85-
err.status = 404;
86-
next(err);
87-
});
49+
// handle incoming message post requests
50+
app.post('/messages/:channel', async (req, res) => {
51+
const end = prom.requestSummary.startTimer()
52+
const channel = req.params.channel
53+
const messages = req.body.messages
8854

55+
// Early exit if the format is wrong
56+
if (!Array.isArray(messages)) throw new ServiceError('`messages` property is expected to be an array', 400)
57+
58+
// Count this channel add
59+
prom.receiveCount.inc({ channel })
60+
queue.addPublishJob(channel, messages, end)
61+
res.json({ success: true }).end()
62+
})
63+
64+
65+
// Anything requests that have not been dealt with by this point is 404
66+
app.use((req, res, next) => next(new ServiceError('Not Found', 404)))
67+
68+
// Express Error Handler
69+
// eslint-disable-next-line no-unused-vars
8970
app.use((err, req, res, next) => {
90-
// set locals, only providing error in development
91-
res.locals.message = err.message;
92-
res.locals.error = req.app.get('env') === 'development' ? err : {};
93-
94-
// return a json error response
95-
let status = err.status || 500;
96-
res.status(status);
97-
res.json({
98-
status: status,
99-
message: err.message,
100-
});
101-
});
102-
103-
let onExitHandler = () => {
104-
logger.info('Preparing to shutdown application');
105-
106-
logger.info('Stopping queue timer');
107-
clearInterval(queue.timer);
108-
109-
logger.info('Closing express server socket');
110-
// When the HTTP server closes we want to empty the job queue.
111-
app.server.close(emptyQueue);
112-
};
113-
114-
115-
/**
116-
* Run the queue and exit if it is empty
117-
*/
118-
function emptyQueue() {
119-
if(queue.length > 0) {
120-
logger.info('Processing remaining jobs on queue');
121-
queue.start(emptyQueue);
122-
}
71+
// set locals, only providing error in development
72+
res.locals.message = err.message
73+
res.locals.error = req.app.get('env') === 'development' ? err : {}
74+
75+
// return a json error response
76+
let status = err.status || 500
77+
res.status(status)
78+
res.json({
79+
status,
80+
message: err.message
81+
})
82+
})
83+
84+
// Graceful shutdown
85+
let exitHandler = async () => {
86+
serviceIsAvailable = false // Health endpoint will start reporting failure
87+
88+
logger.info('Shutdown Initiated.')
89+
90+
logger.info('Express server has shut down.')
91+
92+
Promise.all([
93+
queue.end(), //Empty queue
94+
rabbitController.stop(RABBIT.SHUTDOWN_WAIT) //Wait for rabbit connection to close
95+
]).then(()=>{
96+
logger.info('PSRP Terminating.')
97+
98+
}).catch(()=>{})
12399
}
124100

125-
process.on('SIGTERM', () => {
126-
onExitHandler();
127-
});
128-
129-
process.on('SIGINT', () => {
130-
onExitHandler();
131-
});
101+
// Make sure we exit cleanly
102+
process.on('SIGTERM', exitHandler) // regular termination signal
103+
process.on('SIGINT', exitHandler) // for ^C
104+
// process.on('SIGUSR2', exitHandler) // for nodemon during dev
132105

133-
module.exports = app;
106+
serviceIsAvailable = true // Health endpoint will start reporting success
107+
module.exports = app

0 commit comments

Comments
 (0)