Skip to content

Commit 374dfa1

Browse files
authored
Merge pull request #1927 from eeff/v2.6
Pick to v2.6 fix(ekuiper): memory leak and data race
2 parents eff4bc7 + 45b3aed commit 374dfa1

File tree

4 files changed

+65
-63
lines changed

4 files changed

+65
-63
lines changed

plugins/ekuiper/json_rw.c

+11-15
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,8 @@ int json_decode_write_req(char *buf, size_t len, json_write_req_t **result)
185185

186186
json_obj = neu_json_decode_newb(buf, len);
187187
if (NULL == json_obj) {
188-
goto decode_fail;
188+
free(req);
189+
return -1;
189190
}
190191

191192
neu_json_elem_t req_elems[] = {
@@ -208,27 +209,20 @@ int json_decode_write_req(char *buf, size_t len, json_write_req_t **result)
208209
};
209210
ret = neu_json_decode_by_json(json_obj, NEU_JSON_ELEM_SIZE(req_elems),
210211
req_elems);
211-
if (ret != 0) {
212-
goto decode_fail;
213-
}
214-
215212
req->node_name = req_elems[0].v.val_str;
216213
req->group_name = req_elems[1].v.val_str;
217214
req->tag_name = req_elems[2].v.val_str;
218215
req->t = req_elems[3].t;
219216
req->value = req_elems[3].v;
220217

221-
*result = req;
222-
goto decode_exit;
223-
224-
decode_fail:
225-
free(req);
226-
ret = -1;
227-
228-
decode_exit:
229-
if (json_obj != NULL) {
230-
neu_json_decode_free(json_obj);
218+
if (0 == ret) {
219+
*result = req;
220+
} else {
221+
json_decode_write_req_free(req);
222+
ret = -1;
231223
}
224+
225+
neu_json_decode_free(json_obj);
232226
return ret;
233227
}
234228

@@ -243,6 +237,8 @@ void json_decode_write_req_free(json_write_req_t *req)
243237
free(req->tag_name);
244238
if (req->t == NEU_JSON_STR) {
245239
free(req->value.val_str);
240+
} else if (req->t == NEU_JSON_BYTES) {
241+
free(req->value.val_bytes.bytes);
246242
}
247243

248244
free(req);

plugins/ekuiper/plugin_ekuiper.c

+50-48
Original file line numberDiff line numberDiff line change
@@ -114,10 +114,9 @@ static int ekuiper_plugin_uninit(neu_plugin_t *plugin)
114114
return rv;
115115
}
116116

117-
static int ekuiper_plugin_start(neu_plugin_t *plugin)
117+
static inline int start(neu_plugin_t *plugin, const char *url)
118118
{
119-
int rv = 0;
120-
char *url = plugin->url ? plugin->url : EKUIPER_PLUGIN_URL; // default url
119+
int rv = 0;
121120

122121
rv = nng_pair0_open(&plugin->sock);
123122
if (rv != 0) {
@@ -143,16 +142,43 @@ static int ekuiper_plugin_start(neu_plugin_t *plugin)
143142
return rv;
144143
}
145144

145+
nng_mtx_lock(plugin->mtx);
146+
while (plugin->receiving) {
147+
nng_mtx_unlock(plugin->mtx);
148+
nng_msleep(10);
149+
nng_mtx_lock(plugin->mtx);
150+
}
151+
plugin->receiving = true;
146152
nng_recv_aio(plugin->sock, plugin->recv_aio);
153+
nng_mtx_unlock(plugin->mtx);
154+
155+
return NEU_ERR_SUCCESS;
156+
}
157+
158+
static int ekuiper_plugin_start(neu_plugin_t *plugin)
159+
{
160+
int rv = 0;
161+
char *url = plugin->url ? plugin->url : EKUIPER_PLUGIN_URL; // default url
162+
163+
rv = start(plugin, url);
164+
if (rv != 0) {
165+
return rv;
166+
}
167+
147168
plugin->started = true;
148169
plog_notice(plugin, "start successfully");
149170

150171
return NEU_ERR_SUCCESS;
151172
}
152173

153-
static int ekuiper_plugin_stop(neu_plugin_t *plugin)
174+
static inline void stop(neu_plugin_t *plugin)
154175
{
155176
nng_close(plugin->sock);
177+
}
178+
179+
static int ekuiper_plugin_stop(neu_plugin_t *plugin)
180+
{
181+
stop(plugin);
156182
plugin->started = false;
157183
plog_notice(plugin, "stop successfully");
158184
return NEU_ERR_SUCCESS;
@@ -201,41 +227,6 @@ static int parse_config(neu_plugin_t *plugin, const char *setting,
201227
return -1;
202228
}
203229

204-
static inline int check_url_listenable(neu_plugin_t *plugin, const char *url,
205-
const char *host, uint16_t port)
206-
{
207-
if (NULL != plugin->host && // already configured and
208-
port == plugin->port && // port is the same, then if
209-
(0 == strcmp(plugin->host, host) // 1. host is the same
210-
|| 0 == strcmp("0.0.0.0", host) // 2. to bind to any address
211-
|| 0 == strcmp("0.0.0.0", plugin->host) // 3. bound to any address
212-
)) {
213-
// early return, no need check url is listenable
214-
return 0;
215-
}
216-
217-
nng_socket sock = NNG_SOCKET_INITIALIZER;
218-
int rv = nng_pair0_open(&sock);
219-
if (0 != rv) {
220-
plog_error(plugin, "nng_pair0_open: %s", nng_strerror(rv));
221-
return NEU_ERR_EINTERNAL;
222-
}
223-
224-
if (0 != (rv = nng_listen(sock, url, NULL, 0))) {
225-
plog_error(plugin, "nng_listen: %s", nng_strerror(rv));
226-
if (NNG_EADDRINVAL == rv) {
227-
rv = NEU_ERR_IP_ADDRESS_INVALID;
228-
} else if (NNG_EADDRINUSE == rv) {
229-
rv = NEU_ERR_IP_ADDRESS_IN_USE;
230-
} else {
231-
rv = NEU_ERR_EINTERNAL;
232-
}
233-
}
234-
235-
nng_close(sock);
236-
return rv;
237-
}
238-
239230
static int ekuiper_plugin_config(neu_plugin_t *plugin, const char *setting)
240231
{
241232
int rv = 0;
@@ -255,23 +246,31 @@ static int ekuiper_plugin_config(neu_plugin_t *plugin, const char *setting)
255246
goto error;
256247
}
257248

258-
if (0 != (rv = check_url_listenable(plugin, url, host, port))) {
249+
if (plugin->started) {
250+
stop(plugin);
251+
}
252+
253+
// check we could start the plugin with the new setting
254+
if (0 != (rv = start(plugin, url))) {
255+
// recover with old setting
256+
if (plugin->started && 0 != start(plugin, plugin->url)) {
257+
plog_warn(plugin, "restart host:%s port:%" PRIu16 " fail",
258+
plugin->host, plugin->port);
259+
}
259260
goto error;
260261
}
261262

263+
if (!plugin->started) {
264+
stop(plugin);
265+
}
266+
262267
plog_notice(plugin, "config success");
263268

264269
free(plugin->host);
270+
free(plugin->url);
265271
plugin->host = host;
266272
plugin->port = port;
267-
free(plugin->url);
268-
plugin->url = url;
269-
270-
if (plugin->started) {
271-
// restart service
272-
ekuiper_plugin_stop(plugin);
273-
ekuiper_plugin_start(plugin);
274-
}
273+
plugin->url = url;
275274

276275
return rv;
277276

@@ -310,6 +309,9 @@ static int ekuiper_plugin_request(neu_plugin_t * plugin,
310309
}
311310
break;
312311
}
312+
case NEU_REQRESP_NODE_DELETED: {
313+
break;
314+
}
313315
case NEU_REQ_UPDATE_NODE: {
314316
break;
315317
}

plugins/ekuiper/plugin_ekuiper.h

+1
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ struct neu_plugin {
3737
nng_socket sock;
3838
nng_mtx * mtx;
3939
bool started;
40+
bool receiving;
4041
nng_aio * recv_aio;
4142
char * host;
4243
uint16_t port;

plugins/ekuiper/read_write.c

+3
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,9 @@ void recv_data_callback(void *arg)
8686
rv = nng_aio_result(plugin->recv_aio);
8787
if (0 != rv) {
8888
plog_error(plugin, "nng_recv error: %s", nng_strerror(rv));
89+
nng_mtx_lock(plugin->mtx);
90+
plugin->receiving = false;
91+
nng_mtx_unlock(plugin->mtx);
8992
return;
9093
}
9194

0 commit comments

Comments
 (0)