From c9205c03b7926efba37d4a8b94c5550f99e5c142 Mon Sep 17 00:00:00 2001 From: Javier Gil Aviles Date: Thu, 6 Mar 2025 15:43:26 +0100 Subject: [PATCH 1/5] Refs #22884: Add logic for reiterating for multiple outputs & update submodules Signed-off-by: Javier Gil Aviles --- .../sustainml_modules/sustainml-wp1 | 2 +- .../sustainml_modules/sustainml-wp3 | 2 +- .../sustainml-wp5/backend_node.py | 19 ++++++++++++++++++- .../orchestrator_node/orchestrator_node.py | 15 +++++++++++++-- 4 files changed, 33 insertions(+), 5 deletions(-) diff --git a/sustainml_modules/sustainml_modules/sustainml-wp1 b/sustainml_modules/sustainml_modules/sustainml-wp1 index d10bad5..cba24f4 160000 --- a/sustainml_modules/sustainml_modules/sustainml-wp1 +++ b/sustainml_modules/sustainml_modules/sustainml-wp1 @@ -1 +1 @@ -Subproject commit d10bad5826e62f8309b6c6ef72aba843bb83059c +Subproject commit cba24f48617c92641ce611be486d21ebb2f6126a diff --git a/sustainml_modules/sustainml_modules/sustainml-wp3 b/sustainml_modules/sustainml_modules/sustainml-wp3 index 33d782d..0426740 160000 --- a/sustainml_modules/sustainml_modules/sustainml-wp3 +++ b/sustainml_modules/sustainml_modules/sustainml-wp3 @@ -1 +1 @@ -Subproject commit 33d782d4f12de06122c2bb7efb0ce568ad03d37d +Subproject commit 0426740cc67104db4011cf8dafacaef68e6c3bbb diff --git a/sustainml_modules/sustainml_modules/sustainml-wp5/backend_node.py b/sustainml_modules/sustainml_modules/sustainml-wp5/backend_node.py index 1b56c97..9366dbb 100644 --- a/sustainml_modules/sustainml_modules/sustainml-wp5/backend_node.py +++ b/sustainml_modules/sustainml_modules/sustainml-wp5/backend_node.py @@ -114,7 +114,24 @@ def results_args(): 'task_id': task_json} return jsonify(json), 200 - return jsonify({utils.string_node(node_id): orchestrator.get_results(node_id, task_id)}), 200 + + results = orchestrator.get_results(node_id, task_id) + + if node_id == utils.node_id.CARBONTRACKER.value and 'carbon_footprint' in results: + if task_id is not None: + print("problem_id:", task_id.problem_id(), "iteration_id:", task_id.iteration_id()) + num_outputs = results.get('extra_data', {})['num_outputs'] + + if num_outputs > 1: + print("Reiterating CarbonTracker node for multiple outputs") # Debugging + user_json = orchestrator.get_orchestrator(task_id) + user_json.get('extra_data', {})['num_outputs'] = num_outputs - 1 + user_json['previous_iteration'] = task_id.iteration_id() + user_json.get('extra_data', {})['previous_problem_id'] = task_id.problem_id() + user_json.get('extra_data', {})['model_restrains'] = results.get('extra_data', {})['model_restrains'] + orchestrator.send_user_input(user_json) + + return jsonify({utils.string_node(node_id): results}), 200 # Flask server shutdown route @server.route('/shutdown', methods=['GET']) diff --git a/sustainml_modules/sustainml_modules/sustainml-wp5/orchestrator_node/orchestrator_node.py b/sustainml_modules/sustainml_modules/sustainml-wp5/orchestrator_node/orchestrator_node.py index 016c2ed..5ba1fdc 100644 --- a/sustainml_modules/sustainml_modules/sustainml-wp5/orchestrator_node/orchestrator_node.py +++ b/sustainml_modules/sustainml_modules/sustainml-wp5/orchestrator_node/orchestrator_node.py @@ -238,10 +238,17 @@ def get_carbontracker_task_data(self, task_id): carbon_footprint = node_data.carbon_footprint() energy_consumption = node_data.energy_consumption() carbon_intensity = node_data.carbon_intensity() + extra_data_vector = node_data.extra_data() + extra_data_list = [s for s in extra_data_vector] + extra_data_bytes = bytes(extra_data_list) + extra_data_str = extra_data_bytes.decode('utf-8') + extra_data = json.loads(extra_data_str) json_output = {'task_id': task_json, 'carbon_footprint': carbon_footprint, 'energy_consumption': energy_consumption, - 'carbon_intensity': carbon_intensity} + 'carbon_intensity': carbon_intensity, + 'extra_data': extra_data} + return json_output def get_user_input_data(self, task_id): @@ -371,12 +378,16 @@ def send_user_input(self, json_data): mem_footprint = extra.get('max_memory_footprint', utils.default_mem_footprint) goal = extra.get('goal') model_selected = extra.get('model_selected', None) + num_outputs = extra.get('num_outputs') + model_restrains = extra.get('model_restrains', []) # Add extra data to user user_input extra_data = {'hardware_required': hw_req, 'max_memory_footprint': mem_footprint, 'goal': goal, - 'model_selected': model_selected} + 'model_selected': model_selected, + 'num_outputs': num_outputs, + 'model_restrains': model_restrains} json_obj = utils.json_dict(extra_data) data_array = np.frombuffer(json_obj.encode(), dtype=np.uint8) user_input.extra_data(sustainml_swig.uint8_t_vector(data_array.tolist())) From 26a8270ab19f9641b96bf40b69bdf8d902f15a06 Mon Sep 17 00:00:00 2001 From: Javier Gil Aviles Date: Mon, 10 Mar 2025 07:47:35 +0100 Subject: [PATCH 2/5] Refs #22914: Change location of reiteration for multiple outputs to fix a bug Signed-off-by: Javier Gil Aviles --- .../sustainml-wp5/backend_node.py | 18 +-------- .../orchestrator_node/orchestrator_node.py | 37 +++++++++++++++++-- 2 files changed, 34 insertions(+), 21 deletions(-) diff --git a/sustainml_modules/sustainml_modules/sustainml-wp5/backend_node.py b/sustainml_modules/sustainml_modules/sustainml-wp5/backend_node.py index 9366dbb..30b7e68 100644 --- a/sustainml_modules/sustainml_modules/sustainml-wp5/backend_node.py +++ b/sustainml_modules/sustainml_modules/sustainml-wp5/backend_node.py @@ -115,23 +115,7 @@ def results_args(): return jsonify(json), 200 - results = orchestrator.get_results(node_id, task_id) - - if node_id == utils.node_id.CARBONTRACKER.value and 'carbon_footprint' in results: - if task_id is not None: - print("problem_id:", task_id.problem_id(), "iteration_id:", task_id.iteration_id()) - num_outputs = results.get('extra_data', {})['num_outputs'] - - if num_outputs > 1: - print("Reiterating CarbonTracker node for multiple outputs") # Debugging - user_json = orchestrator.get_orchestrator(task_id) - user_json.get('extra_data', {})['num_outputs'] = num_outputs - 1 - user_json['previous_iteration'] = task_id.iteration_id() - user_json.get('extra_data', {})['previous_problem_id'] = task_id.problem_id() - user_json.get('extra_data', {})['model_restrains'] = results.get('extra_data', {})['model_restrains'] - orchestrator.send_user_input(user_json) - - return jsonify({utils.string_node(node_id): results}), 200 + return jsonify({utils.string_node(node_id): orchestrator.get_results(node_id, task_id)}), 200 # Flask server shutdown route @server.route('/shutdown', methods=['GET']) diff --git a/sustainml_modules/sustainml_modules/sustainml-wp5/orchestrator_node/orchestrator_node.py b/sustainml_modules/sustainml_modules/sustainml-wp5/orchestrator_node/orchestrator_node.py index 5ba1fdc..e2df080 100644 --- a/sustainml_modules/sustainml_modules/sustainml-wp5/orchestrator_node/orchestrator_node.py +++ b/sustainml_modules/sustainml_modules/sustainml-wp5/orchestrator_node/orchestrator_node.py @@ -25,8 +25,9 @@ class OrchestratorNodeHandle(cpp_OrchestratorNodeHandle): - def __init__(self): - + def __init__(self, orchestrator): + super().__init__() + self.orchestrator = orchestrator self.condition = threading.Condition() self.last_task_id = None self.node_status_ = {} @@ -49,13 +50,14 @@ def on_node_status_change( # Callback def on_new_node_output( self, - id : int, + id: int, data): task_id = sustainml_swig.get_task_id(id, data) if task_id is None: print(utils.string_node(id), "node output received.") else: print(utils.string_node(id), "node output received from task", utils.string_task(task_id)) + self.register_result(task_id, id) def register_task(self, task_id): @@ -79,6 +81,33 @@ def register_result(self, task_id, node_id): self.result_status[utils.string_task(task_id)][node_id] = True self.condition.notify_all() + # If the node is CarbonTracker and its output extra_data is non-empty, print a message + if node_id == utils.node_id.CARBONTRACKER.value: + print("Data results of CARBON") # Debugging + carbon_data = sustainml_swig.get_carbontracker(self.orchestrator.node_, task_id) + try: + extra_data_vector = carbon_data.extra_data() # Vector de uint8_t + extra_data_list = [s for s in extra_data_vector] + extra_data_bytes = bytes(extra_data_list) + extra_data_str = extra_data_bytes.decode('utf-8') + extra_data = json.loads(extra_data_str) + except AttributeError: + extra_data = None + + if extra_data is not None and len(extra_data) > 0: + print("Resend is going to be made") # Debugging + num_outputs = extra_data['num_outputs'] + print("Number of outputs:", num_outputs) # Debugging + + if num_outputs > 1: + print("Reiterating CarbonTracker node for multiple outputs") # Debugging + user_json = self.orchestrator.get_orchestrator(task_id) + user_json.get('extra_data', {})['num_outputs'] = num_outputs - 1 + user_json['previous_iteration'] = task_id.iteration_id() + user_json.get('extra_data', {})['previous_problem_id'] = task_id.problem_id() + user_json.get('extra_data', {})['model_restrains'] = extra_data['model_restrains'] + self.orchestrator.send_user_input(user_json) + def results_available(self, task_id, node_id): with self.condition: return self.result_status[utils.string_task(task_id)].get(node_id, False) @@ -87,7 +116,7 @@ class Orchestrator: def __init__(self): - self.handler_ = OrchestratorNodeHandle() + self.handler_ = OrchestratorNodeHandle(self) self.node_ = cpp_OrchestratorNode(self.handler_) # Proxy method to run the node From 6059cf086294d920746eda099c76b481b1380997 Mon Sep 17 00:00:00 2001 From: Javier Gil Aviles Date: Tue, 8 Apr 2025 13:17:03 +0200 Subject: [PATCH 3/5] Refs #22884: Minor corrections after rebase Signed-off-by: Javier Gil Aviles --- .../sustainml-wp5/orchestrator_node/orchestrator_node.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sustainml_modules/sustainml_modules/sustainml-wp5/orchestrator_node/orchestrator_node.py b/sustainml_modules/sustainml_modules/sustainml-wp5/orchestrator_node/orchestrator_node.py index e2df080..9c83db9 100644 --- a/sustainml_modules/sustainml_modules/sustainml-wp5/orchestrator_node/orchestrator_node.py +++ b/sustainml_modules/sustainml_modules/sustainml-wp5/orchestrator_node/orchestrator_node.py @@ -84,7 +84,7 @@ def register_result(self, task_id, node_id): # If the node is CarbonTracker and its output extra_data is non-empty, print a message if node_id == utils.node_id.CARBONTRACKER.value: print("Data results of CARBON") # Debugging - carbon_data = sustainml_swig.get_carbontracker(self.orchestrator.node_, task_id) + carbon_data = sustainml_swig.get_carbontracker_task_data(self.orchestrator.node_, task_id) try: extra_data_vector = carbon_data.extra_data() # Vector de uint8_t extra_data_list = [s for s in extra_data_vector] @@ -101,7 +101,7 @@ def register_result(self, task_id, node_id): if num_outputs > 1: print("Reiterating CarbonTracker node for multiple outputs") # Debugging - user_json = self.orchestrator.get_orchestrator(task_id) + user_json = self.orchestrator.get_user_input_data(task_id) user_json.get('extra_data', {})['num_outputs'] = num_outputs - 1 user_json['previous_iteration'] = task_id.iteration_id() user_json.get('extra_data', {})['previous_problem_id'] = task_id.problem_id() From b8c46a3c63e6888840d3d48bd70d863610e87d41 Mon Sep 17 00:00:00 2001 From: Javier Gil Aviles Date: Tue, 22 Apr 2025 08:27:35 +0200 Subject: [PATCH 4/5] Update reference of wp2 for hf_token environment variable Signed-off-by: Javier Gil Aviles --- sustainml_modules/requirements.txt | 26 +++++++++---------- .../sustainml_modules/sustainml-wp2 | 2 +- .../sustainml-wp5/backend_node.py | 17 ++++++++++++ .../orchestrator_node/orchestrator_node.py | 4 ++- 4 files changed, 34 insertions(+), 15 deletions(-) diff --git a/sustainml_modules/requirements.txt b/sustainml_modules/requirements.txt index c6f88d9..0433060 100644 --- a/sustainml_modules/requirements.txt +++ b/sustainml_modules/requirements.txt @@ -5,16 +5,16 @@ jsonschema kaggle networkx>=3.0 numpy>=1.24.1 -ollama -onnx -onnxruntime -opencv-python>=4.9.0.80 -optimum -Pillow>=9.3.0 -PyQt5>=5.15.10 -rdflib>=7.0.0 -timm -torch>=2.1.2+cu121 -transformers>=4.37.0 -typer -ultralytics +ollama==0.3.3 +onnx==1.17.0 +onnxruntime==1.19.2 +opencv-python==4.10.0.84 +optimum==1.17.1 +Pillow==10.4.0 +PyQt5==5.15.11 +rdflib==7.0.0 +timm==1.0.14 +torch==2.4.1 +transformers==4.48.3 +typer==0.15.1 +ultralytics==8.3.4 diff --git a/sustainml_modules/sustainml_modules/sustainml-wp2 b/sustainml_modules/sustainml_modules/sustainml-wp2 index 1656801..dba456a 160000 --- a/sustainml_modules/sustainml_modules/sustainml-wp2 +++ b/sustainml_modules/sustainml_modules/sustainml-wp2 @@ -1 +1 @@ -Subproject commit 1656801b82502cfab71122e94a8554bd131c35b0 +Subproject commit dba456a0f4dfcad260cd864a54ab5cd51c7df98a diff --git a/sustainml_modules/sustainml_modules/sustainml-wp5/backend_node.py b/sustainml_modules/sustainml_modules/sustainml-wp5/backend_node.py index 30b7e68..ebe18cb 100644 --- a/sustainml_modules/sustainml_modules/sustainml-wp5/backend_node.py +++ b/sustainml_modules/sustainml_modules/sustainml-wp5/backend_node.py @@ -14,6 +14,7 @@ """SustainML Backend Node Implementation.""" from flask import Flask, request, jsonify +import os import threading import time import signal @@ -36,7 +37,19 @@ def hello_world(): # Send user input data to orchestrator @server.route('/user_input', methods=['POST']) def user_input(): + global hf_token + data = request.json + if 'extra_data' in data: + try: + if isinstance(data['extra_data'], str): + data['extra_data'] = json.loads(data['extra_data']) + except Exception as e: + print("Error decoding extra_data, initializing as empty dict:", e) + data['extra_data'] = {} + else: + data['extra_data'] = {} + data['extra_data']['hf_token'] = hf_token task_id = orchestrator.send_user_input(data) if task_id is None: return jsonify({'error': 'Invalid input data'}), 400 @@ -166,6 +179,10 @@ def signal_handler(sig, frame): # Main program execution if __name__ == '__main__': signal.signal(signal.SIGINT, signal_handler) + hf_token = os.getenv("HF_TOKEN") + if hf_token is None: + print("Error: The HF_TOKEN environment variable is missing. Please set it before starting the node.") + # sys.exit(0) print("Back-end Node running, use Ctrl+C to terminate. Server listening at http://" + server_ip_address + ":" + str(server_port) + "/") flask_server_thread = ServerThread() flask_server_thread.start() # Start the Flask server with the orchestrator diff --git a/sustainml_modules/sustainml_modules/sustainml-wp5/orchestrator_node/orchestrator_node.py b/sustainml_modules/sustainml_modules/sustainml-wp5/orchestrator_node/orchestrator_node.py index 9c83db9..31e2cfa 100644 --- a/sustainml_modules/sustainml_modules/sustainml-wp5/orchestrator_node/orchestrator_node.py +++ b/sustainml_modules/sustainml_modules/sustainml-wp5/orchestrator_node/orchestrator_node.py @@ -409,6 +409,7 @@ def send_user_input(self, json_data): model_selected = extra.get('model_selected', None) num_outputs = extra.get('num_outputs') model_restrains = extra.get('model_restrains', []) + hf_token = extra.get('hf_token') # Add extra data to user user_input extra_data = {'hardware_required': hw_req, @@ -416,7 +417,8 @@ def send_user_input(self, json_data): 'goal': goal, 'model_selected': model_selected, 'num_outputs': num_outputs, - 'model_restrains': model_restrains} + 'model_restrains': model_restrains, + 'hf_token': hf_token} json_obj = utils.json_dict(extra_data) data_array = np.frombuffer(json_obj.encode(), dtype=np.uint8) user_input.extra_data(sustainml_swig.uint8_t_vector(data_array.tolist())) From c38e346a2cd869e62a69f500b8d0e1885f3b3249 Mon Sep 17 00:00:00 2001 From: Javier Gil Aviles Date: Mon, 28 Apr 2025 08:05:48 +0200 Subject: [PATCH 5/5] Update submodules tags Signed-off-by: Javier Gil Aviles --- sustainml_modules/sustainml_modules/sustainml-wp1 | 2 +- sustainml_modules/sustainml_modules/sustainml-wp2 | 2 +- sustainml_modules/sustainml_modules/sustainml-wp3 | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/sustainml_modules/sustainml_modules/sustainml-wp1 b/sustainml_modules/sustainml_modules/sustainml-wp1 index cba24f4..7ae1b16 160000 --- a/sustainml_modules/sustainml_modules/sustainml-wp1 +++ b/sustainml_modules/sustainml_modules/sustainml-wp1 @@ -1 +1 @@ -Subproject commit cba24f48617c92641ce611be486d21ebb2f6126a +Subproject commit 7ae1b16874b04de036fbe505ff2079fd0ee6c3f0 diff --git a/sustainml_modules/sustainml_modules/sustainml-wp2 b/sustainml_modules/sustainml_modules/sustainml-wp2 index dba456a..3e45417 160000 --- a/sustainml_modules/sustainml_modules/sustainml-wp2 +++ b/sustainml_modules/sustainml_modules/sustainml-wp2 @@ -1 +1 @@ -Subproject commit dba456a0f4dfcad260cd864a54ab5cd51c7df98a +Subproject commit 3e454171cb1a1de16aeca20e840152f762874af3 diff --git a/sustainml_modules/sustainml_modules/sustainml-wp3 b/sustainml_modules/sustainml_modules/sustainml-wp3 index 0426740..9c19ef4 160000 --- a/sustainml_modules/sustainml_modules/sustainml-wp3 +++ b/sustainml_modules/sustainml_modules/sustainml-wp3 @@ -1 +1 @@ -Subproject commit 0426740cc67104db4011cf8dafacaef68e6c3bbb +Subproject commit 9c19ef42c1147a8003c92885037cc942b63b549b