Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[#175] Client program that spawn other clients #242

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -196,3 +196,5 @@ tags

src/bin
src/bazel**

.nogit/
10 changes: 10 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ github-runner:
build-image:
cd src && bash -x scripts/docker_image_build.sh

build-spawner-image:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See comment on src/scripts/docker_image_spawner_build.sh file.

Suggested change
build-spawner-image:
build-spawner-image: build

cd src && bash +x scripts/docker_image_spawner_build.sh

build: build-image
cd src && bash -x scripts/build.sh

Expand All @@ -46,3 +49,10 @@ run-link-creation-agent:
run-link-creation-client:
@bash -x src/scripts/run.sh link_creation_agent_client $(OPTIONS)

run-sentinel-server:
@bash -x src/scripts/run.sh sentinel_server 55000

run-worker-client:
@bash -x src/scripts/run.sh worker_client $(OPTIONS)


20 changes: 20 additions & 0 deletions src/cpp/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,23 @@ cc_binary(
"//main:link_creation_agent_client_main_lib",
],
)

cc_binary(
name = "sentinel_server",
srcs = [],
defines = ["BAZEL_BUILD"],
linkstatic = 1,
deps = [
"//main:sentinel_server_main_lib",
],
)

cc_binary(
name = "worker_client",
srcs = [],
defines = ["BAZEL_BUILD"],
linkstatic = 1,
deps = [
"//main:worker_client_main_lib",
],
)
12 changes: 12 additions & 0 deletions src/cpp/client_spawner/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package(default_visibility = ["//visibility:public"])

cc_library(
name = "client_spawner_lib",
srcs = glob(["*.cc"]),
hdrs = glob(["*.h"]),
includes = ["."],
deps = [
"//distributed_algorithm_node:distributed_algorithm_node_lib",
"//query_engine:query_engine_lib",
],
)
43 changes: 43 additions & 0 deletions src/cpp/client_spawner/SentinelServerNode.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
#include "SentinelServerNode.h"

using namespace client_spawner;

SentinelServerNode::SentinelServerNode(const string &node_id) : StarNode(node_id) {}

SentinelServerNode::~SentinelServerNode() {}

// TODO: At this point Node does nothing with the messages.
void SentinelServerNode::storage_message(string &worker_id, string &message) {
cout << worker_id <<" message received" << endl;
}

shared_ptr<Message> SentinelServerNode::message_factory(string &command, vector<string> &args) {
shared_ptr<Message> message = StarNode::message_factory(command, args);
if (message) {
return message;
}
if (command == WORKER_NOTIFICATION) {
return shared_ptr<Message>(new WorkerMessage(args[0], args[1]));
}
return shared_ptr<Message>{};
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not a change request, it's more like a tip.
The following code does the same, but it's a bit simpler and complies to the Modern C++ style recommendations.

    auto message = StarNode::message_factory(command, args);  // <== here
    if (message) {
        return message;
    }
    if (command == WORKER_NOTIFICATION) {
        return make_shared<WorkerMessage>(args[0], args[1]);  // <== here
    }
    return nullptr;  // <== here

}


// =================================

WorkerMessage::WorkerMessage(string &worker_id, string &msg) {
this->worker_id = worker_id;
this->msg = msg;
}
Copy link
Collaborator

@angeloprobst angeloprobst Feb 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
WorkerMessage::WorkerMessage(string &worker_id, string &msg) {
this->worker_id = worker_id;
this->msg = msg;
}
WorkerMessage::WorkerMessage(const string &worker_id, const string &msg)
: worker_id(worker_id), msg(msg) {}


void WorkerMessage::act(shared_ptr<MessageFactory> node) {
#ifdef DEBUG
cout << "WorkerMessage::act() BEGIN" << endl;
cout << "worker_id: " << this->worker_id << endl;
#endif
auto sentinel_server_node = dynamic_pointer_cast<SentinelServerNode>(node);
sentinel_server_node->storage_message(this->worker_id, this->msg);
#ifdef DEBUG
cout << "WorkerMessage::act() END" << endl;
#endif
}
41 changes: 41 additions & 0 deletions src/cpp/client_spawner/SentinelServerNode.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/**
* @file sentinel_server_node.h
* @brief Responsible for managing workers and handling their messages
*/
#pragma once

#include "StarNode.h"

#define DEBUG
Copy link
Collaborator

@angeloprobst angeloprobst Feb 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is hard-coded but DEBUG is supposed to be set only while running the build process by the building tool, in our case, Bazel. By default, Bazel does not set -DNDEBUG (see here), which means that DEBUG is always enabled unless we instruct Bazel to not doing it.

The way we are currently running the build process does not allow us to pass options to Bazel, but, once we consider our services stable and ready for production, the build process will be adjusted in order to allow non-debug builds.

In short, you can, and should, remove this #define DEBUG.


using namespace distributed_algorithm_node;


namespace client_spawner {

constexpr const char* WORKER_NOTIFICATION = "worker_notification";

class SentinelServerNode: public StarNode {

public:
SentinelServerNode(const string &node_id);

~SentinelServerNode();

void storage_message(string &worker_id, string &message);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe?

Suggested change
void storage_message(string &worker_id, string &message);
void storage_message(const string &worker_id, const string &message);

Please set them const if the strings are not supposed to be changed inside the function.


virtual shared_ptr<Message> message_factory(string &command, vector<string> &args);
};

class WorkerMessage: public Message {

public:
string worker_id;
string msg;
WorkerMessage(string &worker_id, string &msg);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
WorkerMessage(string &worker_id, string &msg);
WorkerMessage(const string &worker_id, const string &msg);

void act(shared_ptr<MessageFactory> node);

};

} // namespace client_spawner

65 changes: 65 additions & 0 deletions src/cpp/client_spawner/WorkerClientNode.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#include "WorkerClientNode.h"
#include "SentinelServerNode.h"
#include "DASNode.h"
#include "RemoteIterator.h"
#include "QueryAnswer.h"

using namespace client_spawner;
using namespace query_engine;
using namespace query_element;

#define MAX_QUERY_ANSWERS ((unsigned int) 1000)

WorkerClientNode::WorkerClientNode(
const string &node_id,
const string &server_id,
const string &das_node_server_id
) : StarNode(node_id, server_id) {
this->node_id = node_id;
this->server_id = server_id;
this->das_node_server_id = das_node_server_id;
}
Copy link
Collaborator

@angeloprobst angeloprobst Feb 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

StarNode is already storing server_id (attr has the same name), and DistributedAlgorithmNode (parent of StarNode) is also storing node_id as my_node_id (private) but accessible via node_id() function.

Suggested change
WorkerClientNode::WorkerClientNode(
const string &node_id,
const string &server_id,
const string &das_node_server_id
) : StarNode(node_id, server_id) {
this->node_id = node_id;
this->server_id = server_id;
this->das_node_server_id = das_node_server_id;
}
WorkerClientNode::WorkerClientNode(
const string &node_id, const string &server_id, const string &das_node_server_id
) : das_node_server_id(das_node_server_id), StarNode(node_id, server_id) {}


WorkerClientNode::~WorkerClientNode() {}

void WorkerClientNode::execute(vector<string> &request) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
void WorkerClientNode::execute(vector<string> &request) {
void WorkerClientNode::execute(const vector<string> &request) {

DASNode das_node_client(this->node_id, this->das_node_server_id);

QueryAnswer *query_answer;

unsigned int count = 0;

RemoteIterator *response = das_node_client.pattern_matcher_query(request);

vector<string> message;

message.push_back(this->node_id);
Copy link
Collaborator

@angeloprobst angeloprobst Feb 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
vector<string> message;
message.push_back(this->node_id);
vector<string> message = {this->node_id()};


string query_answer_str;

while (! response->finished()) {
if ((query_answer = response->pop()) == NULL) {
Utils::sleep();
} else {
query_answer_str += query_answer->to_string();
query_answer_str += "\n";
if (++count == MAX_QUERY_ANSWERS) {
break;
}
}
}

if (count == 0) {
string no_match = "No match for query";
cout << no_match << endl;
message.push_back(no_match);
} else {
cout << query_answer_str << endl;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
cout << no_match << endl;
message.push_back(no_match);
} else {
cout << query_answer_str << endl;
#ifdef DEBUG
cout << no_match << endl;
#endif
message.push_back(no_match);
} else {
#ifdef DEBUG
cout << query_answer_str << endl;
#endif

message.push_back(query_answer_str);
}

delete response;

send(WORKER_NOTIFICATION, message, this->server_id);

}
27 changes: 27 additions & 0 deletions src/cpp/client_spawner/WorkerClientNode.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/**
* @file worker_client_node.h
* @brief Responsible for making a query
*/
#pragma once

#include "StarNode.h"

using namespace distributed_algorithm_node;

namespace client_spawner {

class WorkerClientNode : public StarNode {

public:
WorkerClientNode(const string& node_id, const string& server_id, const string& das_node_server_id);

~WorkerClientNode();

void execute(vector<string>& request);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
void execute(vector<string>& request);
void execute(const vector<string>& request);


private:
string node_id;
Copy link
Collaborator

@angeloprobst angeloprobst Feb 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Already defined in DistributedAlgorithNode as my_node_id (private) but accessible via node_id() function - it can removed here.

Suggested change
string node_id;

string das_node_server_id;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this any different from server_id defined in StarNode?

};

} // namespace client_spawner
15 changes: 15 additions & 0 deletions src/cpp/main/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,21 @@ cc_library(
srcs = ["link_creation_agent_client_main.cc"],
deps = [
"//link_creation_agent:link_creation_agent_lib",
],
)

cc_library(
name = "sentinel_server_main_lib",
srcs = ["sentinel_server_main.cc"],
deps = [
"//client_spawner:client_spawner_lib",
],
)

cc_library(
name = "worker_client_main_lib",
srcs = ["worker_client_main.cc"],
deps = [
"//client_spawner:client_spawner_lib",
],
)
38 changes: 38 additions & 0 deletions src/cpp/main/sentinel_server_main.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#include <iostream>
#include <string>
#include <signal.h>

#include "Utils.h"
#include "DASNode.h"
#include "SentinelServerNode.h"

using namespace std;
using namespace client_spawner;

void ctrl_c_handler(int) {
std::cout << "Stopping SentinelServerNode..." << std::endl;
std::cout << "Done." << std::endl;
exit(0);
}

int main(int argc, char* argv[]) {

if (argc != 2) {
cerr << "Usage: " << argv[0] << " <PORT>" << endl;
exit(1);
}

string server_id = "localhost:" + string(argv[1]);

signal(SIGINT, &ctrl_c_handler);

SentinelServerNode server(server_id);

cout << "############################# Sentinel Server ON ##################################" << endl;

do {
Utils::sleep(1000);
} while (true);

return 0;
}
50 changes: 50 additions & 0 deletions src/cpp/main/worker_client_main.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#include <iostream>
#include <string>
#include <signal.h>

#include "Utils.h"
#include "DASNode.h"
#include "WorkerClientNode.h"

using namespace std;
using namespace client_spawner;

void ctrl_c_handler(int) {
std::cout << "Stopping WorkerClientNode..." << std::endl;
std::cout << "Done." << std::endl;
exit(0);
}

int main(int argc, char* argv[]) {

if (argc < 4) {
cerr << "Usage: " << argv[0] << " <NODE_ID> <SERVER_ID> <DAS_NODE_SERVER_ID> <QUERY_TOKENS+>" << endl;
exit(1);
}

string node_id = string(argv[1]);
string server_id = string(argv[2]);
string das_node_server_id = string(argv[3]);

cout << "node_id: " << node_id << endl;
cout << "server_id: " << server_id << endl;
cout << "das_node_server_id: " << das_node_server_id << endl;

signal(SIGINT, &ctrl_c_handler);

vector<string> query;

for (int i = 4; i < argc; i++) {
query.push_back(argv[i]);
}

signal(SIGINT, &ctrl_c_handler);

WorkerClientNode client(node_id, server_id, das_node_server_id);

client.execute(query);

cout << "Worker client '" << node_id << "' done!\n" << endl;

return 0;
}
4 changes: 4 additions & 0 deletions src/docker/Dockerfile.spawner
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
FROM das-attention-broker-builder:latest

COPY bin/sentinel_server /opt/das-attention-broker/bin/sentinel_server
COPY bin/worker_client /opt/das-attention-broker/bin/worker_client
4 changes: 4 additions & 0 deletions src/scripts/bazel_build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ cd $CPP_WORKSPACE_DIR \
&& mv bazel-bin/query_broker $BIN_DIR \
&& $BAZELISK_BUILD_CMD //:query \
&& mv bazel-bin/query $BIN_DIR \
&& $BAZELISK_BUILD_CMD //:sentinel_server \
&& mv bazel-bin/sentinel_server $BIN_DIR \
&& $BAZELISK_BUILD_CMD //:worker_client \
&& mv bazel-bin/worker_client $BIN_DIR \
&& $BAZELISK_BUILD_CMD \
//hyperon_das_atomdb_cpp:hyperon_das_atomdb_cpp_wheel \
--define=ATOMDB_VERSION=0.8.11 \
Expand Down
Loading