-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathindex.js
134 lines (117 loc) · 4.16 KB
/
index.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
// © Copyright IBM Corporation 2015,2017.
// Node module: flow-engine
// LICENSE: Apache 2.0, https://www.apache.org/licenses/LICENSE-2.0
/*eslint-env node */
'use strict';
var fs = require('fs');
var path = require('path');
var yaml = require('yamljs');
var flow = require('./lib/flow');
var context = require('./lib/context');
//This is for global level logging and only used in creating
//a flow-engine middleware
var logger = require('./lib/flow')._gLogger;
//The middleware ctor function. The configuration is setup via the options.
//The options object may have:
//- flow: the YAML file path
//- basedir: the working directory, for loading the other files.
//- tasks: the custom policies to be executed, usually they are pieces of
//javascript code.
module.exports = function(options) {
var config;
var error;
//used to interpret or resolve the place holder in the config properties
var paramResolver;
//store the tasks's setup functions
var tasks;
var ctx;
//step 1: loading the assembly yaml
try {
//read assembly file. TODO: this will be replaced by mplane later on
logger.debug('Loading the assembly file "%s"', options.flow);
config = yaml.load(options.flow);
//watch the assembly file
fs.watchFile(options.flow, function(curr, prev) {
//if file changes, reload it
if (curr.mtime > prev.mtime) {
logger.debug('%s %', 'The assembly file is changed.',
'Reload the file for the following requests.');
try {
config = yaml.load(options.flow);
error = undefined;
} catch (e2) {
logger.error('Failed to reload the assembly file: %s', e2);
error = e2;
}
}
});
} catch (e) {
logger.error('Failed to load the assembly file: %s', e);
error = e;
}
//step 2: loading the paramResolver if the 'paramResolver' presents
try {
// the callback function for resolving task's parameter values
// TODO fix the issue of using '.' with path.join('.', 'someModule');
// Verify the test using mocha test case in test-flow.js
// TODO paramResolver & task's base path must be the same???
if (options.paramResolver) {
logger
.debug('Loading the parameter resolver "%s"', options.paramResolver);
paramResolver = require(
path.join((options.baseDir ? options.baseDir : ''),
options.paramResolver))();
}
} catch (e) {
logger.error('Failed to load the parameter resolver: %s', e);
logger.debug('Continue the flow execution without a parameter resolver');
}
//step 3: loading tasks module if there is
tasks = loadTasks(options.tasks, options.baseDir);
//step 4: prepare ctx if not ready
ctx = options.context || context.createContext();
//return the middleware function
return function(req, res, next) {
if (error) {
logger.debug('Go to the error middleware');
//error with loading the assembly file. Go to the error middleware
next(error);
} else {
logger.debug('Invoke the Flow middleware');
//start to run the flow engine
var flowObj = new flow.Flow(config, {
paramResolver: paramResolver,
baseDir: options.baseDir,
tasks: tasks,
});
//create the empty message on the context
ctx.set('message', {}, true);
//execute the flow with the Context object.
flowObj.prepare(ctx, next);
flowObj.run();
}
};
};
//Load the custom policies.
//- tasks: an object that maps a policy name to the path of its module. The
//  path is joined onto baseDir before being passed to require().
//- baseDir: the directory the module paths are joined onto. Defaults to ''.
//Every module must export a function. That function is called once with an
//empty object and its result is stored under the policy name. A policy that
//fails to load, or that does not export a function, is logged and skipped.
//Returns the object that maps the policy names to those results.
function loadTasks(tasks, baseDir) {
  var root = baseDir || '';
  var loaded = {};
  var policyName;
  var modulePath;
  var setup;

  for (policyName in tasks) {
    try {
      logger.debug('Loading the custom policy "%s"', policyName);
      // TODO fix the issue of using '.' with path.join('.', 'someModule')
      //     Verify the test using mocha test case in test-flow.js
      modulePath = path.join(root, tasks[policyName]);
      setup = require(modulePath);
      if (setup instanceof Function) {
        loaded[policyName] = setup({});
      } else {
        logger.error('The "%s" policy is not a function. Skip it.',
            policyName);
      }
    } catch (e) {
      logger.error('Failed to load the custom policy "%s": %s',
          policyName, e);
    }
  }

  return loaded;
}
//Besides being the middleware ctor, the exported function also carries the
//following properties:
//- Flow: the Flow constructor of ./lib/flow
module.exports.Flow = flow.Flow;
//- createContext: the context factory of ./lib/context
module.exports.createContext = context.createContext;
//- tid: re-exported from ./lib/flow as-is.
//  NOTE(review): presumably a transaction id helper - confirm in ./lib/flow
module.exports.tid = flow.tid;