Skip to content

Commit eee255a

Browse files
committed
* FIX [parquet] fixed parquet memory leak.
Signed-off-by: xinyi-xs <lihj@emqx.io>
1 parent 13e64dd commit eee255a

File tree

1 file changed

+24
-6
lines changed

1 file changed

+24
-6
lines changed

src/supplemental/nanolib/parquet/parquet.cc

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -724,7 +724,7 @@ const char *
724724
parquet_find(const char *topic, uint64_t key)
725725
{
726726
conf_parquet *conf = file_manager.fetch_conf(topic);
727-
if (conf->enable == false) {
727+
if (conf == NULL || conf->enable == false) {
728728
log_error("Parquet %s is not ready or not launch!", topic);
729729
return NULL;
730730
}
@@ -734,15 +734,20 @@ parquet_find(const char *topic, uint64_t key)
734734
void *elem = NULL;
735735
pthread_mutex_lock(&parquet_queue_mutex);
736736
auto queue = file_manager.fetch_queue(topic);
737+
if (queue == NULL) {
738+
pthread_mutex_unlock(&parquet_queue_mutex);
739+
return NULL;
740+
}
737741
FOREACH_QUEUE(*queue, elem)
738742
{
739743
if (elem) {
740744
if (compare_callback(elem, key)) {
741745
value = nng_strdup((char *) elem);
742-
break;
746+
goto found;
743747
}
744748
}
745749
}
750+
found:
746751
pthread_mutex_unlock(&parquet_queue_mutex);
747752
return value;
748753
}
@@ -769,6 +774,11 @@ parquet_find_span(
769774
pthread_mutex_lock(&parquet_queue_mutex);
770775

771776
auto queue = file_manager.fetch_queue(topic);
777+
if (queue == NULL) {
778+
pthread_mutex_unlock(&parquet_queue_mutex);
779+
*size = 0;
780+
return NULL;
781+
}
772782
if (queue->size != 0) {
773783
array = (const char **) nng_alloc(sizeof(char *) * queue->size);
774784

@@ -1114,7 +1124,7 @@ parquet_find_data_packet(
11141124
vector<parquet_data_packet *> ret_vec;
11151125
string topic = extract_topic(filename);
11161126
conf = file_manager.fetch_conf(topic);
1117-
if (conf->enable == false) {
1127+
if (conf == NULL || conf->enable == false) {
11181128
log_error("Parquet %s is not ready or not launch!", topic.c_str());
11191129
return ret_vec;
11201130
}
@@ -1124,6 +1134,11 @@ parquet_find_data_packet(
11241134
void *elem = NULL;
11251135
auto queue = file_manager.fetch_queue(topic);
11261136
pthread_mutex_lock(&parquet_queue_mutex);
1137+
if (queue == NULL) {
1138+
pthread_mutex_unlock(&parquet_queue_mutex);
1139+
ret_vec.resize(keys.size(), nullptr);
1140+
return ret_vec;
1141+
}
11271142
FOREACH_QUEUE(*queue, elem)
11281143
{
11291144
if (elem && nng_strcasecmp((char *) elem, filename) == 0) {
@@ -1149,7 +1164,7 @@ parquet_find_data_packet(conf_parquet *conf, char *filename, uint64_t key)
11491164
{
11501165
string topic = extract_topic(filename);
11511166
conf = file_manager.fetch_conf(topic);
1152-
if (conf->enable == false) {
1167+
if (conf == NULL || conf->enable == false) {
11531168
log_error("Parquet %s is not ready or not launch!", topic.c_str());
11541169
return NULL;
11551170
}
@@ -1158,6 +1173,10 @@ parquet_find_data_packet(conf_parquet *conf, char *filename, uint64_t key)
11581173
auto queue = file_manager.fetch_queue(topic);
11591174

11601175
pthread_mutex_lock(&parquet_queue_mutex);
1176+
if (queue == NULL) {
1177+
pthread_mutex_unlock(&parquet_queue_mutex);
1178+
return NULL;
1179+
}
11611180
FOREACH_QUEUE(*queue, elem)
11621181
{
11631182
if (elem && nng_strcasecmp((char *) elem, filename) == 0) {
@@ -1225,8 +1244,7 @@ parquet_find_data_packets(
12251244
copy(ret_vec.begin(), ret_vec.end(), packets);
12261245
}
12271246

1228-
// return packets;
1229-
return nullptr;
1247+
return packets;
12301248
}
12311249

12321250
static vector<SchemaColumn>

0 commit comments

Comments
 (0)