Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#125] Ensuring objects are deleted properly #300

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/distributed_algorithm_node/MessageBroker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ SynchronousSharedRAM::~SynchronousSharedRAM() {
NODE_QUEUE_MUTEX.unlock();
Utils::error("Unable to remove node from network: " + this->node_id);
} else {
auto node_queue = NODE_QUEUE[this->node_id];
while (!node_queue->empty()) {
delete (CommandLinePackage*) node_queue->dequeue();
}
NODE_QUEUE.erase(this->node_id);
NODE_QUEUE_MUTEX.unlock();
}
Expand Down Expand Up @@ -133,6 +137,7 @@ void SynchronousSharedRAM::inbox_thread_method() {
CommandLinePackage *message_data = (CommandLinePackage *) request;
if (message_data->is_broadcast) {
if (message_data->visited.find(this->node_id) != message_data->visited.end()) {
delete message_data;
continue;
}
this->peers_mutex.lock();
Expand Down Expand Up @@ -188,6 +193,7 @@ void SynchronousGRPC::inbox_thread_method() {
visited.insert(message_data->visited_recipients(i));
}
if (visited.find(this->node_id) != visited.end()) {
delete message_data;
continue;
}
message_data->add_visited_recipients(this->node_id);
Expand Down Expand Up @@ -442,7 +448,7 @@ grpc::Status SynchronousGRPC::execute_message(
dasproto::Empty* reply) {

if (! this->is_shutting_down()) {
// TODO: fix memory leak
// TODO: fix memory leak :: seems to fixed in SynchronousGRPC::inbox_thread_method and SynchronousGRPC::inbox_thread_method
this->incoming_messages.enqueue((void *) new dasproto::MessageData(*request));
return grpc::Status::OK;
} else {
Expand Down
11 changes: 8 additions & 3 deletions src/query_engine/DASNode.cc
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,8 @@ PatternMatchingQuery::PatternMatchingQuery(string command, vector<string>& token
#endif
}

LazyWorkerDeleter<RemoteSink<HandlesAnswer>> PatternMatchingQuery::remote_sinks_deleter;

void PatternMatchingQuery::act(shared_ptr<MessageFactory> node) {
#ifdef DEBUG
cout << "PatternMatchingQuery::act() BEGIN" << endl;
Expand All @@ -607,9 +609,12 @@ void PatternMatchingQuery::act(shared_ptr<MessageFactory> node) {
query_answer_processors.push_back(make_unique<CountAnswerProcessor>(local_id, remote_id));
}

// TODO: eliminate this memory leak
RemoteSink<HandlesAnswer>* remote_sink =
new RemoteSink<HandlesAnswer>(this->root_query_element, move(query_answer_processors));
// TODO: eliminate this memory leak :: eliminated with LazyWorkerDeleter plus delete_precedent_on_destructor=true
PatternMatchingQuery::remote_sinks_deleter.add(new RemoteSink<HandlesAnswer>(
this->root_query_element, // precedent
move(query_answer_processors), // processors
true // delete_precedent_on_destructor
));
} else {
Utils::error("Invalid command " + this->command + " in PatternMatchingQuery message");
}
Expand Down
4 changes: 4 additions & 0 deletions src/query_engine/DASNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
#include <stack>

#include "HandlesAnswer.h"
#include "LazyWorkerDeleter.h"
#include "RemoteIterator.h"
#include "RemoteSink.h"
#include "Sink.h"
#include "StarNode.h"

Expand Down Expand Up @@ -54,6 +56,8 @@ class PatternMatchingQuery : public Message {
PatternMatchingQuery(string command, vector<string>& tokens);
void act(shared_ptr<MessageFactory> node);

static LazyWorkerDeleter<RemoteSink<HandlesAnswer>> remote_sinks_deleter;

private:
QueryElement* build_link_template(vector<string>& tokens,
unsigned int cursor,
Expand Down
71 changes: 71 additions & 0 deletions src/query_engine/LazyWorkerDeleter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
#pragma once

#include <memory>
#include <mutex>
#include <thread>
#include <type_traits>
#include <vector>

#include "commons/Utils.h"

using namespace std;

namespace query_engine {

class Worker {
public:
virtual bool is_work_done() = 0;
};

template <class T, typename enable_if<is_base_of<Worker, T>::value, bool>::type = true>
class LazyWorkerDeleter {
public:
LazyWorkerDeleter()
: shutting_down_flag(false),
objects_deleter_thread(make_unique<thread>(&LazyWorkerDeleter::objects_deleter_method, this)) {
}

~LazyWorkerDeleter() {
if (this->shutting_down_flag) return;
this->shutting_down_flag = true;
this->objects_deleter_thread->join();
this->objects_deleter_thread.reset();
lock_guard<mutex> lock(this->objects_mutex);
T* obj;
while (!this->objects.empty()) {
obj = this->objects.front();
this->objects.erase(this->objects.begin());
delete obj;
}
}

void add(T* obj) {
lock_guard<mutex> lock(this->objects_mutex);
if (!this->shutting_down_flag) this->objects.push_back(obj);
}

private:
vector<T*> objects;
mutex objects_mutex;
bool shutting_down_flag;
unique_ptr<thread> objects_deleter_thread;

void objects_deleter_method() {
T* obj;
while (!this->shutting_down_flag) {
{
lock_guard<mutex> lock(this->objects_mutex);
while (!this->objects.empty()) {
obj = this->objects.front();
if (obj->is_work_done()) {
this->objects.erase(this->objects.begin());
delete obj;
}
}
}
commons::Utils::sleep(5000);
}
}
};

} // namespace query_engine
31 changes: 10 additions & 21 deletions src/query_engine/QueryNode.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ QueryNode<AnswerType>::QueryNode(const string& node_id,
this->query_answer_processor = NULL;
this->query_answers_finished_flag = false;
this->shutdown_flag = false;
this->work_done_flag = false;
if (messaging_backend == MessageBrokerType::RAM) {
this->requires_serialization = false;
} else {
Expand All @@ -31,7 +32,12 @@ QueryNode<AnswerType>::QueryNode(const string& node_id,
}

template <class AnswerType>
QueryNode<AnswerType>::~QueryNode() {}
QueryNode<AnswerType>::~QueryNode() {
this->graceful_shutdown();
while (!this->query_answer_queue.empty()) {
delete (QueryAnswer*) this->query_answer_queue.dequeue();
}
}

template <class AnswerType>
void QueryNode<AnswerType>::graceful_shutdown() {
Expand All @@ -44,6 +50,7 @@ void QueryNode<AnswerType>::graceful_shutdown() {
this->shutdown_flag_mutex.unlock();
if (this->query_answer_processor != NULL) {
this->query_answer_processor->join();
delete this->query_answer_processor;
this->query_answer_processor = NULL;
}
}
Expand Down Expand Up @@ -116,16 +123,6 @@ QueryNodeServer<AnswerType>::QueryNodeServer(const string& node_id, MessageBroke
new thread(&QueryNodeServer<AnswerType>::query_answer_processor_method, this);
}

template <class AnswerType>
QueryNodeServer<AnswerType>::~QueryNodeServer() {
this->graceful_shutdown();
if (this->query_answer_processor != NULL) {
this->query_answer_processor->join();
delete this->query_answer_processor;
this->query_answer_processor = NULL;
}
}

template <class AnswerType>
void QueryNodeServer<AnswerType>::node_joined_network(const string& node_id) {
this->add_peer(node_id);
Expand Down Expand Up @@ -163,6 +160,7 @@ void QueryNodeClient<AnswerType>::query_answer_processor_method() {
this->query_answer_queue.empty()) {
this->send(QUERY_ANSWERS_FINISHED_COMMAND, args, this->server_id);
answers_finished_flag = true;
this->work_done_flag = true;
}
} else {
if (this->requires_serialization) {
Expand All @@ -172,6 +170,7 @@ void QueryNodeClient<AnswerType>::query_answer_processor_method() {
}
args.clear();
}
delete query_answer;
Utils::sleep();
}
}
Expand All @@ -188,16 +187,6 @@ QueryNodeClient<AnswerType>::QueryNodeClient(const string& node_id,
this->join_network();
}

template <class AnswerType>
QueryNodeClient<AnswerType>::~QueryNodeClient() {
this->graceful_shutdown();
if (this->query_answer_processor != NULL) {
this->query_answer_processor->join();
delete this->query_answer_processor;
this->query_answer_processor = NULL;
}
}

template <class AnswerType>
void QueryNodeClient<AnswerType>::node_joined_network(const string& node_id) {
// do nothing
Expand Down
8 changes: 5 additions & 3 deletions src/query_engine/QueryNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
#include <thread>

#include "DistributedAlgorithmNode.h"
#include "LazyWorkerDeleter.h"
#include "QueryAnswer.h"
#include "SharedQueue.h"

Expand All @@ -22,7 +23,7 @@ constexpr char* QUERY_ANSWERS_FINISHED_COMMAND = "query_answers_finished";
*
*/
template <class AnswerType>
class QueryNode : public DistributedAlgorithmNode {
class QueryNode : public DistributedAlgorithmNode, public Worker {
public:
QueryNode(const string& node_id,
bool is_server,
Expand All @@ -38,10 +39,13 @@ class QueryNode : public DistributedAlgorithmNode {
bool is_query_answers_empty();
virtual void query_answer_processor_method() = 0;

virtual bool is_work_done() override { return this->work_done_flag; } // as Worker

protected:
SharedQueue query_answer_queue;
thread* query_answer_processor;
bool requires_serialization;
bool work_done_flag;

private:
bool is_server;
Expand All @@ -55,7 +59,6 @@ template <class AnswerType>
class QueryNodeServer : public QueryNode<AnswerType> {
public:
QueryNodeServer(const string& node_id, MessageBrokerType messaging_backend = MessageBrokerType::RAM);
virtual ~QueryNodeServer();

void node_joined_network(const string& node_id);
string cast_leadership_vote();
Expand All @@ -68,7 +71,6 @@ class QueryNodeClient : public QueryNode<AnswerType> {
QueryNodeClient(const string& node_id,
const string& server_id,
MessageBrokerType messaging_backend = MessageBrokerType::RAM);
virtual ~QueryNodeClient();

void node_joined_network(const string& node_id);
string cast_leadership_vote();
Expand Down
6 changes: 5 additions & 1 deletion src/query_engine/answer_processor/AttentionBrokerUpdater.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ class AttentionBrokerUpdater : public QueryAnswerProcessor {
}
}

virtual bool is_work_done() override {
return this->is_flow_finished() && this->answers_queue.empty();
}

protected:
void queue_processor() {
// GRPC stuff
Expand Down Expand Up @@ -86,7 +90,7 @@ class AttentionBrokerUpdater : public QueryAnswerProcessor {

// handle_list.set_context(this->query_context);
do {
if (this->is_flow_finished() && this->answers_queue.empty()) {
if (this->is_work_done()) {
break;
}
bool idle_flag = true;
Expand Down
2 changes: 2 additions & 0 deletions src/query_engine/answer_processor/CountAnswerProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ class CountAnswerProcessor : public QueryAnswerProcessor {
}
virtual void graceful_shutdown() override { this->output_buffer->graceful_shutdown(); }

virtual bool is_work_done() override { return this->output_buffer->is_work_done(); }

protected:
int count;
unique_ptr<QueryNodeClient<CountAnswer>> output_buffer;
Expand Down
2 changes: 2 additions & 0 deletions src/query_engine/answer_processor/HandlesAnswerProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ class HandlesAnswerProcessor : public QueryAnswerProcessor {
virtual void query_answers_finished() override { this->output_buffer->query_answers_finished(); }
virtual void graceful_shutdown() override { this->output_buffer->graceful_shutdown(); }

virtual bool is_work_done() override { return this->output_buffer->is_work_done(); }

protected:
unique_ptr<QueryNodeClient<HandlesAnswer>> output_buffer;
};
Expand Down
3 changes: 2 additions & 1 deletion src/query_engine/answer_processor/QueryAnswerProcessor.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@

#include "QueryAnswer.h"
#include "QueryNode.h"
#include "LazyWorkerDeleter.h"

using namespace std;

namespace query_engine {

class QueryAnswerProcessor {
class QueryAnswerProcessor : public Worker {
public:
virtual ~QueryAnswerProcessor() = default;
virtual void process_answer(QueryAnswer* query_answer) = 0;
Expand Down
15 changes: 15 additions & 0 deletions src/query_engine/query_element/And.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,22 @@ class And : public Operator<N> {
* Destructor.
*/
~And() {
#ifdef DEBUG
cout << "And::~And() BEGIN" << endl;
#endif
graceful_shutdown();
for (size_t i = 0; i < N; i++) {
for (auto* answer : this->query_answer[i]) {
if (answer) {
delete answer;
answer = nullptr;
}
}
this->query_answer[i].clear();
}
#ifdef DEBUG
cout << "And::~And() END" << endl;
#endif
}

// --------------------------------------------------------------------------------------------
Expand Down
11 changes: 11 additions & 0 deletions src/query_engine/query_element/LinkTemplate.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,17 @@ class LinkTemplate : public Source {
delete[] this->local_answers;
delete[] this->next_inner_answer;
}
while (!this->local_buffer.empty()) {
delete (HandlesAnswer*) this->local_buffer.dequeue();
}
for (auto* answer : this->inner_answers) {
if (answer) delete answer;
}
this->inner_answers.clear();
for (auto* clause : this->inner_template) {
if (clause) delete clause;
}
this->inner_template.clear();
local_answers_mutex.unlock();
#ifdef DEBUG
cout << "LinkTemplate::LinkTemplate() DESTRUCTOR END" << endl;
Expand Down
16 changes: 15 additions & 1 deletion src/query_engine/query_element/Operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,21 @@ class Operator : public QueryElement {
/**
* Destructor.
*/
~Operator() { this->graceful_shutdown(); }
~Operator() {
#ifdef DEBUG
cout << "Operator::Operator() DESTRUCTOR BEGIN" << endl;
#endif
this->graceful_shutdown();
for (size_t i = 0; i < N; i++) {
if (this->precedent[i]) {
delete this->precedent[i];
this->precedent[i] = nullptr;
}
}
#ifdef DEBUG
cout << "Operator::Operator() DESTRUCTOR END" << endl;
#endif
}

// --------------------------------------------------------------------------------------------
// QueryElement API
Expand Down
Loading
Loading