Skip to content

Commit d965d4f

Browse files
committed
Initial commit: migration from private repo
0 parents  commit d965d4f

17 files changed

+1897
-0
lines changed

.gitignore

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
*.log
2+
node_modules/
3+
queue/
4+
working/
5+
error/

.jshintrc

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
{
2+
"node": true,
3+
4+
"camelcase": true,
5+
"curly": true,
6+
"eqeqeq": true,
7+
"expr": true,
8+
"indent": 2,
9+
"latedef": true,
10+
"newcap": false,
11+
"quotmark": "single",
12+
"strict": true,
13+
"undef": true,
14+
"unused": true
15+
}

.tern-project

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
{
2+
"libs": [
3+
"ecma5"
4+
],
5+
"loadEagerly": [
6+
"**.js"
7+
],
8+
"plugins": {
9+
"node": {}
10+
}
11+
}

LICENSE.txt

Lines changed: 373 additions & 0 deletions
Large diffs are not rendered by default.

README.md

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
# File queue
2+
3+
To install, execute `npm install -g openhim-mediator-file-queue`
4+
5+
Once installed you will have to manually edit the config files in the location where global npm modules are installed.
6+
7+
* With NVM this is usually: `~/.nvm/versions/node/v4.1.0/lib/node_modules/openhim-mediator-file-queue/config`
8+
* With a standard node install this is usually: `/usr/local/lib/node_modules/openhim-mediator-file-queue/config`
9+
10+
You may add a `production.json` file to the config folder to override or add any config values.
11+
12+
## How it works
13+
14+
The file queue simply handles incoming requests, writing the files to a directory on the filesystem, and then processes the queue, sending the files to a configured endpoint. If the file is successfully sent then the queued file is deleted from the filesystem, otherwise it is moved to an error directory.
15+
16+
Multiple "endpoints" can be configured. Each endpoint handles incoming requests for a specific URL, queues them, and then sends them to another configured URL. An endpoint has a "worker" which is responsible for reading the files from the queue and processing them. Workers can process multiple files in parallel as configured (by default 2 at a time). Workers can be paused/unpaused or repopulated via a RESTlike endpoint. Pausing a worker will stop it from processing files from the queue, but the endpoint will continue accepting requests and writing the files to the queue. Repopulating a worker will cause it to refresh its queue from the files on the filesystem. This is useful when manually adding files to or removing files from the queue.
17+
18+
## Endpoint config
19+
20+
An array of endpoints should be configured in the config file. An endpoint can have the following properties:
21+
* `name` (required) - The name of the endpoint which is used for setting up the RESTlike routes for the worker.
22+
* `path` (required) - The path to use for handling incoming requests.
23+
* `url` (required) - The URL to send the files to when processing them from the queue.
24+
* `paused` - Whether or not the endpoint's worker should be paused by default. This must be a boolean value.
25+
* `parallel` - The number of files that the worker should process in parallel. Defaults to 2.

config/config.json

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
{
2+
"port": 4003,
3+
"endpoints": [],
4+
"log_level": "info",
5+
"statsd": {
6+
"host": "localhost",
7+
"port": 8125
8+
},
9+
"api": {
10+
"apiURL": "https://localhost:8080",
11+
"username": "[email protected]",
12+
"password": "password"
13+
},
14+
"mediatorConf": {
15+
"urn": "urn:uuid:a15c3d48-0686-4c9b-b375-f68d2f244a33",
16+
"version": "1.1.1",
17+
"name": "file-queue",
18+
"description": "Async file queue mediator for the OpenHIM",
19+
"defaultChannelConfig": [],
20+
"endpoints": [
21+
{
22+
"name": "File queue",
23+
"host": "localhost",
24+
"path": "/test",
25+
"port": "4002",
26+
"primary": true,
27+
"type": "http"
28+
}
29+
],
30+
"configDefs": []
31+
}
32+
}

config/development.json

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
{
2+
"port": 4002,
3+
"log_level": "debug",
4+
"endpoints": [
5+
{
6+
"name": "test",
7+
"path": "/test",
8+
"url": "http://localhost:9999/test",
9+
"updateTx": true
10+
}
11+
]
12+
}

lib/index.js

Lines changed: 164 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,164 @@
1+
#!/usr/bin/env node
2+
'use strict';
3+
4+
var BodyParser = require('body-parser');
5+
var Confit = require('confit');
6+
var Crypto = require('crypto');
7+
var Express = require('express');
8+
var FS = require('graceful-fs');
9+
var MUtils = require('openhim-mediator-utils');
10+
var OnFinished = require('on-finished');
11+
var Path = require('path');
12+
var Stats = require('./stats');
13+
var Type = require('type-is');
14+
var Utils = require('./utils');
15+
var Worker = require('./worker');
16+
var Winston = require('winston');
17+
18+
var app = Express();
19+
20+
// Adds an extension to the filename based on the content-type request header
21+
function addExt(filename, req) {
22+
switch (Type(req, ['json', 'xml'])) {
23+
case 'json':
24+
return filename + '.json';
25+
case 'xml':
26+
return filename + '.xml';
27+
default:
28+
return filename + '.txt';
29+
}
30+
}
31+
32+
// Set up an endpoint based on the config
33+
function setUpEndpoint(endpoint, apiOpts) {
34+
var updateTx;
35+
if (endpoint.updateTx && endpoint.updateTx === true) {
36+
updateTx = true;
37+
} else {
38+
updateTx = false;
39+
}
40+
var worker = new Worker({
41+
name: endpoint.name,
42+
url: endpoint.url,
43+
paused: endpoint.paused,
44+
parallel: endpoint.parallel,
45+
updateTx: updateTx,
46+
apiOpts: apiOpts
47+
});
48+
49+
// Clear the worker's queue and repopulate it
50+
app.post('/workers/' + worker.name + '/repopulate', function(req, res) {
51+
worker.repopulate();
52+
res.status(200).send('Worker repopulated');
53+
});
54+
55+
// Register an endpoint for pausing/resuming the worker
56+
app.put('/workers/' + worker.name, BodyParser.json(), function(req, res) {
57+
if (typeof req.body.paused !== 'boolean') {
58+
return res.status(400).send('Missing or invalid property: paused');
59+
}
60+
if (req.body.paused) {
61+
worker.pause();
62+
res.status(200).send('Worker paused');
63+
} else {
64+
worker.resume();
65+
res.status(200).send('Worker resumed');
66+
}
67+
});
68+
Winston.info('Worker for endpoint %s available at /workers/%s', endpoint.path, worker.name);
69+
70+
// Register an endpoint for handling requests
71+
app.post(endpoint.path, function(req, res, next) {
72+
Winston.info('Handling request for %s', endpoint.path);
73+
74+
var filename;
75+
if (req.headers['x-openhim-transactionid']) {
76+
// set file name to transaction ID
77+
filename = req.headers['x-openhim-transactionid'];
78+
} else {
79+
// Generate an invalid transaction ID
80+
filename = Crypto.randomBytes(12).toString('hex').replace(/./, 'x');
81+
}
82+
filename = addExt(filename, req);
83+
84+
var filePath = Path.join(worker.queuePath, filename);
85+
var stream = req.pipe(FS.createWriteStream(filePath));
86+
stream.on('error', function(err) {
87+
Stats.increment('errors');
88+
Winston.error('Handling request for %s failed', endpoint.path);
89+
return next(err);
90+
});
91+
stream.on('finish', function() {
92+
Winston.info('File saved to ./%s', Path.relative(process.cwd(), filePath));
93+
worker.addToQueue(filename, function(err) {
94+
if (err) {
95+
Stats.increment('errors');
96+
Winston.error(err, {path: filename});
97+
}
98+
});
99+
100+
var mediatorResponse = {
101+
'x-mediator-urn': apiOpts.urn,
102+
status: 'Processing',
103+
response: {
104+
status: 202,
105+
body: 'Request added to queue',
106+
timestamp: new Date().toString()
107+
}
108+
};
109+
res.status(202).type('application/json+openhim').send(mediatorResponse);
110+
});
111+
});
112+
}
113+
114+
app.get('/heartbeat', function(req, res) {
115+
res.send({
116+
uptime: process.uptime()
117+
});
118+
});
119+
120+
app.use(function(req, res, next) {
121+
var start = process.hrtime();
122+
Stats.increment('requests');
123+
OnFinished(res, function(err, res) {
124+
Stats.increment('response_codes.' + res.statusCode);
125+
Stats.timing('response_time', Utils.millisecondsSince(start));
126+
});
127+
next();
128+
});
129+
130+
// Create the config and start up the app
131+
Confit(Path.join(__dirname, '..', 'config')).create(function(err, config) {
132+
if (err) {
133+
throw err;
134+
}
135+
136+
Object.defineProperty(app.locals, 'config', {value: config});
137+
138+
Winston.clear();
139+
Winston.add(Winston.transports.Console, {timestamp: true, level: config.get('log_level')});
140+
141+
Stats.init(config.get('statsd'));
142+
143+
var apiOpts = config.get('api');
144+
var mediatorConf = config.get('mediatorConf');
145+
apiOpts.urn = mediatorConf.urn;
146+
147+
MUtils.registerMediator(apiOpts, mediatorConf, function(err) {
148+
if (err) {
149+
Winston.error('Could not register mediator');
150+
process.exit(1);
151+
}
152+
153+
MUtils.activateHeartbeat(apiOpts);
154+
155+
config.get('endpoints').forEach(function(endpoint) {
156+
setUpEndpoint(endpoint, apiOpts);
157+
});
158+
159+
var port = process.env.PORT || config.get('port');
160+
app.listen(port, function() {
161+
Winston.info('App started on port %s', port);
162+
});
163+
});
164+
});

lib/stats.js

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
'use strict';
2+
3+
var OS = require('os');
4+
var StatsD = require('node-statsd');
5+
6+
var stats = null;
7+
8+
module.exports = {
9+
init: function(options) {
10+
stats = new StatsD({
11+
host: options.host,
12+
port: options.port,
13+
prefix: OS.hostname() + '.momconnect_queue.'
14+
});
15+
},
16+
17+
/**
18+
* Gauges a stat by a specified amount
19+
* @param stat {String|Array} The stat(s) to send
20+
* @param value The value to send
21+
* @param sampleRate {Number=} The Number of times to sample (0 to 1). Optional.
22+
* @param tags {Array=} The Array of tags to add to metrics. Optional.
23+
* @param callback {Function=} Callback when message is done being delivered. Optional.
24+
*/
25+
gauge: function() {
26+
stats && stats.gauge.apply(stats, arguments);
27+
},
28+
29+
/**
30+
* Increments a stat by a specified amount
31+
* @param stat {String|Array} The stat(s) to send
32+
* @param value The value to send
33+
* @param sampleRate {Number=} The Number of times to sample (0 to 1). Optional.
34+
* @param tags {Array=} The Array of tags to add to metrics. Optional.
35+
* @param callback {Function=} Callback when message is done being delivered. Optional.
36+
*/
37+
increment: function() {
38+
stats && stats.increment.apply(stats, arguments);
39+
},
40+
41+
/**
42+
* Represents the timing stat
43+
* @param stat {String|Array} The stat(s) to send
44+
* @param time {Number} The time in milliseconds to send
45+
* @param sampleRate {Number=} The Number of times to sample (0 to 1). Optional.
46+
* @param tags {Array=} The Array of tags to add to metrics. Optional.
47+
* @param callback {Function=} Callback when message is done being delivered. Optional.
48+
*/
49+
timing: function() {
50+
stats && stats.timing.apply(stats, arguments);
51+
}
52+
};

lib/utils.js

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
'use strict';
2+
3+
// Return the number of milliseconds since a start time
4+
exports.millisecondsSince = function millisecondsSince(start) {
5+
var diff = process.hrtime(start);
6+
return diff[0] * 1e3 + diff[1] / 1e6;
7+
};

0 commit comments

Comments
 (0)