// forked from gosquared/aws-elasticsearch-js
// index.js · 137 lines (113 loc) · 3.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
const AWS = require('aws-sdk');
const debug = require('debug')('elasticsearch');
const pump = require('pump');
const { Connection } = require('@elastic/elasticsearch');
// Matches any character outside the range U+0021-U+00FF; request() rejects
// a path containing such a character with an ERR_UNESCAPED_CHARACTERS error.
const INVALID_PATH_REGEX = /[^\u0021-\u00ff]/
const {
ConnectionError,
RequestAbortedError,
TimeoutError
} = require('@elastic/elasticsearch/lib/errors');
/**
 * Reports whether a value looks like a stream, i.e. exposes a `pipe` method.
 *
 * @param {*} obj - Candidate value; `null` and `undefined` are accepted.
 * @returns {boolean} True when `obj.pipe` is a function, false otherwise.
 */
function isStream(obj) {
  // Guard against null/undefined so the property access cannot throw.
  return obj != null && typeof obj.pipe === 'function';
}
/**
 * Builds a Connection subclass whose requests are signed with the AWS SDK's
 * V4 signer (service name 'es') and sent through AWS.HttpClient.
 *
 * @param {object} [opts]
 * @param {string} [opts.region] - Region passed to AWS.HttpRequest; falls back
 *   to process.env.AWS_REGION when not given.
 * @param {function(function(Error, object): void): void} [opts.getCreds] -
 *   Callback-style credential loader; defaults to AWS.config.getCredentials.
 * @returns {Function} The AwsEs connection class.
 */
function createConnector(opts = {}) {
  const region = opts.region || process.env.AWS_REGION;
  const getCreds = opts.getCreds || (cb => AWS.config.getCredentials(cb));
  const noop = () => {};

  class AwsEs extends Connection {
    /**
     * Signs and sends one request.
     *
     * @param {object} params - Request description handed to buildRequestObject().
     * @param {function(Error|null, object|null): void} callback - Invoked once
     *   with either an error or the response object.
     * @returns {{abort: function(): void}} Handle that aborts the request.
     */
    request(params, callback) {
      this._openRequests++;

      // Until the real HTTP request exists (credentials are loaded
      // asynchronously), aborting only records the intent.
      let aborted = false;
      let request = { abort: () => { aborted = true; } };

      const requestParams = this.buildRequestObject(params);
      // https://github.com/nodejs/node/commit/b961d9fd83
      if (INVALID_PATH_REGEX.test(requestParams.path) === true) {
        // The request never starts, so release the slot counted above.
        this._openRequests--;
        callback(new TypeError(`ERR_UNESCAPED_CHARACTERS: ${requestParams.path}`), null);
        /* istanbul ignore next */
        return request;
      }

      debug('Starting a new request', params);

      getCreds((err, credentials) => {
        if (err) {
          this._openRequests--;
          return callback(new ConnectionError(err.message), null);
        }
        if (aborted) {
          // Aborted while waiting for credentials: report it the same way the
          // 'abort' event handler below does, so the caller is not left waiting.
          debug('Request aborted', params);
          this._openRequests--;
          return callback(new RequestAbortedError(), null);
        }

        const domain = requestParams.hostname;
        const body = requestParams.body;

        const signedRequest = new AWS.HttpRequest(new AWS.Endpoint(domain), region);
        signedRequest.method = requestParams.method;
        signedRequest.path = requestParams.path;
        // Only non-stream bodies are attached to the outgoing request;
        // a stream body is not attached here.
        if (body && !isStream(body)) {
          signedRequest.body = body;
          signedRequest.headers['Content-Length'] = Buffer.byteLength(signedRequest.body);
        }
        const contentType = requestParams.headers['content-type'];
        if (contentType) {
          signedRequest.headers['Content-Type'] = contentType;
        }
        signedRequest.headers['host'] = domain;

        const signer = new AWS.Signers.V4(signedRequest, 'es');
        signer.addAuthorization(credentials, new Date());

        const client = new AWS.HttpClient();
        const httpOptions = {
          timeout: requestParams.timeout,
          agent: requestParams.agent
        };
        // Outcomes are observed through the event listeners attached below,
        // so the client's own callbacks are no-ops.
        request = client.handleRequest(signedRequest, httpOptions, noop, noop);

        const onResponse = response => {
          cleanListeners();
          this._openRequests--;
          callback(null, response);
        };
        const onTimeout = () => {
          cleanListeners();
          this._openRequests--;
          request.once('error', noop); // we need to catch the request aborted error
          request.abort();
          callback(new TimeoutError('Request timed out', params), null);
        };
        const onError = error => {
          cleanListeners();
          this._openRequests--;
          callback(new ConnectionError(error.message), null);
        };
        const onAbort = () => {
          cleanListeners();
          request.once('error', noop); // we need to catch the request aborted error
          debug('Request aborted', params);
          this._openRequests--;
          callback(new RequestAbortedError(), null);
        };
        // Detach every handler so the callback fires once per request.
        const cleanListeners = () => {
          request.removeListener('response', onResponse);
          request.removeListener('timeout', onTimeout);
          request.removeListener('error', onError);
          request.removeListener('abort', onAbort);
        };

        request.on('response', onResponse);
        request.on('timeout', onTimeout);
        request.on('error', onError);
        request.on('abort', onAbort);

        // Disables the Nagle algorithm
        request.setNoDelay(true);
      });

      // `request` is re-bound once the real HTTP request is created, so this
      // handle always aborts whichever object is current.
      return {
        abort: () => {
          request.abort();
        }
      };
    }
  }

  return AwsEs;
}
// Public entry point: callers obtain the connection class by calling createConnector().
exports.createConnector = createConnector;