diff --git a/CMakeLists.txt b/CMakeLists.txt
index 8d6002db..b5b8290f 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -41,7 +41,19 @@ endif()
set_property(GLOBAL PROPERTY USE_FOLDERS ON)
-set(GEDS_EXTRA_COMPILER_FLAGS -Wall -Wextra -Werror) # -Wpedantic # error: ISO C++ does not support ‘__int128’ for ‘type name’ [-Werror=pedantic]
+# Added for pub/sub service
+# Requires to remove GEDS_EXTRA_COMPILER_FLAGS -Werror
+include(FetchContent)
+FetchContent_Declare(MQTT
+ GIT_REPOSITORY https://github.com/eclipse/paho.mqtt.c.git
+ GIT_TAG v1.3.8)
+FetchContent_MakeAvailable(MQTT)
+FetchContent_Declare(MQTT_CXX
+ GIT_REPOSITORY https://github.com/eclipse/paho.mqtt.cpp
+ GIT_TAG v1.2.0)
+FetchContent_MakeAvailable(MQTT_CXX)
+
+set(GEDS_EXTRA_COMPILER_FLAGS -Wall -Wextra) # -Wpedantic # error: ISO C++ does not support ‘__int128’ for ‘type name’ [-Werror=pedantic]
set(GEDS_EXTRA_LINKER_FLAGS)
# if(CMAKE_CXX_COMPILER_ID MATCHES "Clang")
diff --git a/doc/BUILDING.md b/doc/BUILDING.md
index 277b4cbe..ad2cf3b9 100644
--- a/doc/BUILDING.md
+++ b/doc/BUILDING.md
@@ -1,31 +1,23 @@
-# Building
-
-## CMake
-Install CMake > 3.20.
-
-- Build commands:
- ```bash
- cmake -DCMAKE_BUILD_TYPE=Debug -S . -B $BUILD_DIR
- cmake --build $BUILD_DIR
- ```
-
-- Test commands:
- ```bash
- cmake --build $BUILD_DIR -t test
- ```
-
-- Install command:
- ```bash
- cmake --install $BUILD_DIR --prefix $INSTALL_DIR --component geds
- ```
-
-## Docker
-
-`build-docker.sh` builds a docker container with GRPC and a build of GEDS in `/usr/local/opt/geds`.
-
-## Dependencies
-
-### MacOS
+# Building GEDS
+
+- [Workflow](#workflow)
+- [Instructions for MacOS](#instructions-for-macos)
+- [Instructions for Windows](#instructions-for-windows)
+- [Instructions for Linux](#instructions-for-linux)
+- [Deploying via Docker](#deploying-via-docker)
+- [Deploying via Ansible](#deploying-via-ansible)
+
+## Workflow
+The general workflow of building GEDS from source is:
+1. Pull GEDS repository: `git pull https://github.com/IBM/GEDS.git`
+2. Install dependencies, e.g. `cmake` version > 3.20 (check via `cmake --version`)
+3. Create `build` and `install` directory in the GEDS folder and set environment variables: `export $BUILD_DIR=~/GEDS/build` & `export $INSTALL_DIR=~/GEDS/bin`
+4. Build Boost
+5. Build AWS SDK
+6. Build GEDS
+7. Install GEDS
+
+## Instructions for MacOS
Install the following dependencies through homebrew:
@@ -54,23 +46,41 @@ Finally build it with:
cmake --build . --target all
```
-### Linux
+## Instructions for Windows
+Coming
-Install the following dependencies:
+## Instructions for Linux
+Install GEDS dependencies:
```
-apt-get install -y \
- clang \
- curl wget \
- build-essential gcc ninja-build \
- openjdk-11-jdk \
- python3.9 python3.9-dev python3-distutils
+sudo apt install -y clang curl wget build-essential gcc ninja-build openjdk-11-jdk python3-dev python3-distutils cmake
```
-and a recent version (>= 3.20) of CMake:
+CMake version >= 3.20:
```
CMAKE_VERSION=3.22.4
wget --quiet -O cmake.tar.gz https://github.com/Kitware/CMake/releases/download/v${CMAKE_VERSION}/cmake-${CMAKE_VERSION}-linux-x86_64.tar.gz \
&& tar xf cmake.tar.gz --strip-components=1 -C /usr/local/ \
&& rm cmake.tar.gz
```
+
+Install AWS SDK dependecies:
+```
+sudo apt install libcurl4-openssl-dev libssl-de uuid-dev zlib1g-dev libpulse-dev
+```
+
+Build AWS SDK: `/bin/bash build-aws-sdk.sh`
+
+Build Boost: `/bin/bash build-boost.sh`
+
+Build GEDS:
+1. Check if environment variables are correctly set via `printenv | grep BUILD_DIR` and `printenv | grep INSTALL_DIR`
+2. `cmake -DCMAKE_BUILD_TYPE=Debug -S . -B $BUILD_DIR`
+3. `cmake --build $BUILD_DIR -j 4` (-j specifies the number of cores to use)
+4. `cmake --install $BUILD_DIR --prefix $INSTALL_DIR --component geds`
+
+## Deploying via Docker
+`build-docker.sh` builds a docker container with GRPC and a build of GEDS in `/usr/local/opt/geds`.
+
+## Deploying via Ansible
+We offer an Ansible playbook to automate GEDS building from source on multiple clients.
diff --git a/doc/geds_ansible.yml b/doc/geds_ansible.yml
new file mode 100644
index 00000000..cdb19177
--- /dev/null
+++ b/doc/geds_ansible.yml
@@ -0,0 +1,123 @@
+---
+- hosts: geds
+ name: Update all apt packages
+ become: false
+ vars:
+ ansible_python_interpreter: /usr/bin/python3
+ remote_home: "{{ ansible_env.HOME }}"
+
+ tasks:
+ - name: Update and upgrade
+ tags: update
+ become: true
+ apt:
+ upgrade: yes
+ update_cache: yes
+
+ - name: Reboot
+ tags: reboot
+ become: true
+ reboot:
+
+ - name: Install GEDS dependencies
+ tags: dependencies
+ become: true
+ apt:
+ pkg:
+ - clang
+ - curl
+ - wget
+ - build-essential
+ - gcc
+ - ninja-build
+ - openjdk-11-jdk
+ - python3-dev
+ - python3-distutils
+ - cmake
+ state: latest
+ update_cache: yes
+
+ - name: Create GEDS directory
+ tags: git
+ become: false
+ file:
+ path: "{{ remote_home }}/GEDS"
+ state: directory
+
+ - name: Git clone GEDS
+ tags: git
+ become: false
+ ansible.builtin.git:
+ repo: "https://github.com/IBM/GEDS.git"
+ dest: "{{ remote_home }}/GEDS/"
+
+ - name: AWS dependencies
+ tags: aws
+ become: true
+ apt:
+ pkg:
+ - libcurl4-openssl-dev
+ - libssl-dev
+ - uuid-dev
+ - zlib1g-dev
+ - libpulse-dev
+ state: latest
+ update_cache: yes
+
+ - name: Build AWS
+ tags: aws
+ become: false
+ ansible.builtin.command: /bin/bash build-aws-sdk.sh
+ async: 3600
+ poll: 30
+ args:
+ chdir: "{{ remote_home }}/GEDS"
+
+ - name: Build boost
+ tags: boost
+ become: false
+ ansible.builtin.command: /bin/bash build-boost.sh
+ args:
+ chdir: "{{ remote_home }}/GEDS"
+
+ - name: Create GEDS build directory
+ tags: geds
+ become: false
+ file:
+ path: "{{ remote_home }}/GEDS/build"
+ state: directory
+
+ - name: Build GEDS
+ tags: geds
+ become: false
+ ansible.builtin.command: cmake -DCMAKE_BUILD_TYPE=Debug -S . -B $BUILD_DIR
+ args:
+ chdir: "{{ remote_home }}/GEDS"
+ environment:
+ BUILD_DIR: "{{ remote_home }}/GEDS/build"
+
+ - name: Build GEDS
+ tags: geds
+ become: false
+ ansible.builtin.command: cmake --build $BUILD_DIR -j 4
+ args:
+ chdir: "{{ remote_home }}/GEDS"
+ environment:
+ BUILD_DIR: "{{ remote_home }}/GEDS/build"
+
+ - name: Create GEDS install directory
+ tags: geds
+ become: false
+ file:
+ path: "{{ remote_home }}/GEDS/bin"
+ state: directory
+
+ - name: Install GEDS
+ tags: geds
+ become: false
+ ansible.builtin.command: cmake --install $BUILD_DIR --prefix $INSTALL_DIR --component geds
+ args:
+ chdir: "{{ remote_home }}/GEDS"
+ environment:
+ BUILD_DIR: "{{ remote_home }}/GEDS/build"
+ INSTALL_DIR: "{{ remote_home }}/GEDS/bin"
diff --git a/src/metadataservice/CMakeLists.txt b/src/metadataservice/CMakeLists.txt
index 07d70ae8..0951f2a1 100644
--- a/src/metadataservice/CMakeLists.txt
+++ b/src/metadataservice/CMakeLists.txt
@@ -10,6 +10,7 @@ set(SOURCES
ObjectStoreHandler.h
S3Helper.cpp
S3Helper.h
+ PubSubMQTT.h
)
add_library(libmetadataservice STATIC ${SOURCES})
@@ -19,6 +20,7 @@ target_link_libraries(libmetadataservice
geds_utility
geds_proto
geds_s3
+ paho-mqttpp3
)
target_compile_options(libmetadataservice PUBLIC ${GEDS_EXTRA_COMPILER_FLAGS})
target_compile_definitions(libmetadataservice
diff --git a/src/metadataservice/PubSubMQTT.h b/src/metadataservice/PubSubMQTT.h
new file mode 100644
index 00000000..ad5c5ce1
--- /dev/null
+++ b/src/metadataservice/PubSubMQTT.h
@@ -0,0 +1,130 @@
+/**
+ * Copyright 2022- IBM Inc. All rights reserved
+ * SPDX-License-Identifier: Apache-2.0
+ */
+
+#include "mqtt/async_client.h"
+
+
+// https://github.com/eclipse/paho.mqtt.cpp/issues/141
+mqtt::async_client_ptr createClient(std::string serverAddress,
+ std::string clientID){
+ // Use MQTT v5 to enable no_local:
+ // This flag avoids receiving messages from the same host by telling
+ // the broker not to send messages received with a client ID to a
+ // subscriber with the same client ID
+
+ mqtt::create_options createOpts = mqtt::create_options();
+ createOpts.set_mqtt_verison(5);
+ std::string persistDir= "None";
+ auto client_ptr = std::make_shared(serverAddress,
+ clientID,
+ createOpts,
+ persistDir);
+ std::cout << "Created MQTT client to " + serverAddress + " and ID " + clientID << std::endl;
+ return client_ptr;
+}
+
+mqtt::async_client_ptr connectClient(std::shared_ptr client_ptr,
+ std::string node){
+ auto connOpts = std::make_shared();
+
+ if (node == "server"){
+ connOpts->set_keep_alive_interval(20);
+ connOpts->set_clean_session(false);
+ connOpts->set_automatic_reconnect(true);
+ connOpts->set_mqtt_version(5);
+ }
+ if (node == "client"){
+ connOpts->set_clean_session(false);
+ connOpts->set_mqtt_version(5);
+ }
+ client_ptr->connect(*connOpts)->wait();
+ std::cout << "Connected MQTT client" << std::endl;
+ return client_ptr;
+}
+
+void publishData(mqtt::async_client_ptr client_ptr,
+ std::string topic,
+ int QoS,
+ std::string data){
+ mqtt::topic top(*client_ptr, topic, QoS, true);
+ mqtt::message_ptr message = mqtt::make_message(topic, data);
+ message->set_qos(QoS);
+ message->set_payload("A single message");
+
+ client_ptr->publish(message);
+ std::cout << "Published data to " + topic << std::endl;
+}
+
+mqtt::async_client_ptr subscribe(mqtt::async_client_ptr client_ptr,
+ std::string topic,
+ int QoS){
+ mqtt::subscribe_options subOpts;
+ subOpts.set_no_local(true); // Only works with MQTT v5
+
+ client_ptr->subscribe(topic, QoS, subOpts)->wait();
+ std::cout << "Subscribed to " + topic << std::endl;
+ return client_ptr;
+}
+
+void unsubscribe(mqtt::async_client_ptr client_ptr,
+ std::string topic){
+ client_ptr->unsubscribe(topic)->wait();
+ client_ptr->stop_consuming();
+ std::cout << "Unsubscribed from " + topic << std::endl;
+}
+
+std::tuple consumeMessage(mqtt::async_client_ptr client_ptr){
+ client_ptr->start_consuming();
+ auto msg = client_ptr->consume_message();
+ // msg->get_payload()
+ return std::make_tuple(msg->get_topic(), msg->to_string());
+}
+
+void disconnectClient(mqtt::async_client_ptr client_ptr){
+ client_ptr->disconnect()->wait();
+ std::cout << "Disconnected client" << std::endl;
+}
+
+
+// Example publisher
+// ---------------------------------------------------------------
+// #include "PubSub.h"
+//
+// int main(int argc, char **argv) {
+// mqtt::async_client_ptr client_ptr = createClient("tcp://localhost:1883",
+// "mds_server");
+// std::string node_type = "server";
+// mqtt::async_client_ptr connected_client_ptr = connectClient(client_ptr,
+// node_type);
+// publishData(connected_client_ptr, "home/file1", 1, "Hello World");
+// disconnectClient(connected_client_ptr);
+// return 0;
+// }
+
+// Example subscriber
+// ---------------------------------------------------------------
+// #include "PubSub.h"
+//
+// int main(void)
+// {
+// mqtt::async_client_ptr client_ptr = createClient("tcp://localhost:1883",
+// "mds_client");
+// std::string node_type = "client";
+// mqtt::async_client_ptr connected_client_ptr = connectClient(client_ptr,
+// node_type);
+// mqtt::async_client_ptr subscribed_client_ptr = subscribe(connected_client_ptr,
+// "home/file1", 1);
+// while (true) {
+// auto msg = consumeMessage(subscribed_client_ptr);
+// std::string topic = std::get<0>(msg);
+// std::string payload = std::get<1>(msg);
+//
+// if (payload.empty()){
+// break;
+// }
+// std::cout << topic + " " + payload << std::endl;
+// }
+// return 0;
+// }
diff --git a/src/metadataservice/main.cpp b/src/metadataservice/main.cpp
index b0383537..8ee298b4 100644
--- a/src/metadataservice/main.cpp
+++ b/src/metadataservice/main.cpp
@@ -4,24 +4,50 @@
*/
#include
+#include
#include
#include
#include "GRPCServer.h"
#include "Ports.h"
+#include "PubSubMQTT.h"
-ABSL_FLAG(std::string, address, "0.0.0.0", "Server interface address.");
+ABSL_FLAG(std::string, address, "localhost", "Server interface address.");
ABSL_FLAG(uint16_t, port, defaultMetdataServerPort, "Port.");
int main(int argc, char **argv) {
absl::ParseCommandLine(argc, argv);
auto serverAddress = FLAGS_address.CurrentValue() + ":" + FLAGS_port.CurrentValue();
+
+ std::thread thread_geds([serverAddress]{
GRPCServer service(serverAddress);
auto status = service.startAndWait();
if (!status.ok()) {
std::cerr << status.message() << std::endl;
- exit(1);
}
+ });
+
+ mqtt::async_client_ptr client_ptr = createClient("172.24.33.70:1883",
+ "mds_server");
+ std::string nodeType = "server";
+ mqtt::async_client_ptr connected_client_ptr = connectClient(client_ptr, nodeType);
+ mqtt::async_client_ptr subscribed_client_ptr = subscribe(connected_client_ptr,
+ "bucket/#", 1);
+
+ std::thread thread_mqtt([connected_client_ptr, subscribed_client_ptr]{
+ while (true) {
+ auto msg = consumeMessage(subscribed_client_ptr);
+ std::string topic = std::get<0>(msg);
+ std::string payload = std::get<1>(msg);
+ if (payload.empty()){
+ break;
+ }
+ publishData(connected_client_ptr, topic, 1, "Hello World");
+ }
+ });
+
+ thread_geds.join();
+ thread_mqtt.join();
return 0;
}
diff --git a/src/python/create.py b/src/python/create.py
index 5e031688..7afc7f8f 100644
--- a/src/python/create.py
+++ b/src/python/create.py
@@ -3,12 +3,24 @@
# SPDX-License-Identifier: Apache-2.0
#
+# stdlib
import os
-from time import sleep
+import json
+import time
+# third party
+import paho.mqtt.client as mqtt_client
+
+# relative
from pygeds import status, GEDS, GEDSConfig
-METADATA_SERVER = os.environ.get("GEDS_METADATASERVER", "zac13:4381")
+METADATA_SERVER: str = os.environ.get("GEDS_METADATASERVER", "zac13:4381")
+MQTT: bool = False
+
+
+if MQTT:
+ client = mqtt_client.Client(client_id="client1")
+ client.connect("172.24.33.70", 1883)
instance = GEDS(GEDSConfig(METADATA_SERVER))
try:
@@ -37,6 +49,14 @@
print(f"Read: {message_read.decode()}")
file.seal()
+if MQTT:
+ client = mqtt_client.Client()
+ client.connect("172.24.33.70", 1883)
+ client.loop_start()
+ client.publish("bucket/testfile", json.dumps({"file_name": "testfile",
+ "created": True}).encode())
+ client.disconnect()
+ client.loop_stop()
file2 = instance.create("bucket2", "testfile2")
file2.write(message_read, 0, len(message))
diff --git a/src/python/read.py b/src/python/read.py
index ed0ae01b..25d2fb6e 100644
--- a/src/python/read.py
+++ b/src/python/read.py
@@ -3,11 +3,31 @@
# SPDX-License-Identifier: Apache-2.0
#
+# stdlib
import os
+import time
+# third party
+import paho.mqtt.client as mqtt_client
+
+# relative
from pygeds import status, GEDS, GEDSConfig
+
METADATA_SERVER = os.environ.get("GEDS_METADATASERVER", "zac13:4381")
+MQTT: bool = False
+
+
+def on_message(client, userdata, msg):
+ print(f"Received {msg.payload} on topic {msg.topic}")
+ file = instance.open("bucket", "testfile")
+ time.sleep(.1)
+ buffer = bytearray(file.size)
+ l = file.read(buffer, 0, len(buffer))
+ print(f"Read {l} bytes")
+ print(buffer.decode("utf-8"))
+
+
instance = GEDS(GEDSConfig(METADATA_SERVER))
try:
instance.start()
@@ -21,3 +41,13 @@
l = file.read(buffer, 0, len(buffer))
print(f"Read {l} bytes")
print(buffer.decode("utf-8"))
+
+if MQTT:
+ client = mqtt_client.Client(client_id="client2")
+ client.on_message = on_message
+
+ client.connect("172.24.33.70", 1883)
+ client.subscribe("bucket/testfile")
+
+ client.loop_forever()
+ client.disconnect()