Skip to content

[22884][22914] Integration of multiple outputs #67

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Apr 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions sustainml_modules/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,16 @@ jsonschema
kaggle
networkx>=3.0
numpy>=1.24.1
ollama
onnx
onnxruntime
opencv-python>=4.9.0.80
optimum
Pillow>=9.3.0
PyQt5>=5.15.10
rdflib>=7.0.0
timm
torch>=2.1.2+cu121
transformers>=4.37.0
typer
ultralytics
ollama==0.3.3
onnx==1.17.0
onnxruntime==1.19.2
opencv-python==4.10.0.84
optimum==1.17.1
Pillow==10.4.0
PyQt5==5.15.11
rdflib==7.0.0
timm==1.0.14
torch==2.4.1
transformers==4.48.3
typer==0.15.1
ultralytics==8.3.4
2 changes: 1 addition & 1 deletion sustainml_modules/sustainml_modules/sustainml-wp3
18 changes: 18 additions & 0 deletions sustainml_modules/sustainml_modules/sustainml-wp5/backend_node.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"""SustainML Backend Node Implementation."""

from flask import Flask, request, jsonify
import os
import threading
import time
import signal
Expand All @@ -36,7 +37,19 @@ def hello_world():
# Send user input data to orchestrator
@server.route('/user_input', methods=['POST'])
def user_input():
global hf_token

data = request.json
if 'extra_data' in data:
try:
if isinstance(data['extra_data'], str):
data['extra_data'] = json.loads(data['extra_data'])
except Exception as e:
print("Error decoding extra_data, initializing as empty dict:", e)
data['extra_data'] = {}
else:
data['extra_data'] = {}
data['extra_data']['hf_token'] = hf_token
task_id = orchestrator.send_user_input(data)
if task_id is None:
return jsonify({'error': 'Invalid input data'}), 400
Expand Down Expand Up @@ -114,6 +127,7 @@ def results_args():
'task_id': task_json}
return jsonify(json), 200


return jsonify({utils.string_node(node_id): orchestrator.get_results(node_id, task_id)}), 200

# Flask server shutdown route
Expand Down Expand Up @@ -165,6 +179,10 @@ def signal_handler(sig, frame):
# Main program execution
if __name__ == '__main__':
signal.signal(signal.SIGINT, signal_handler)
hf_token = os.getenv("HF_TOKEN")
if hf_token is None:
print("Error: The HF_TOKEN environment variable is missing. Please set it before starting the node.")
# sys.exit(0)
print("Back-end Node running, use Ctrl+C to terminate. Server listening at http://" + server_ip_address + ":" + str(server_port) + "/")
flask_server_thread = ServerThread()
flask_server_thread.start() # Start the Flask server with the orchestrator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@

class OrchestratorNodeHandle(cpp_OrchestratorNodeHandle):

def __init__(self):

def __init__(self, orchestrator):
super().__init__()
self.orchestrator = orchestrator
self.condition = threading.Condition()
self.last_task_id = None
self.node_status_ = {}
Expand All @@ -49,13 +50,14 @@ def on_node_status_change(
# Callback
def on_new_node_output(
self,
id : int,
id: int,
data):
task_id = sustainml_swig.get_task_id(id, data)
if task_id is None:
print(utils.string_node(id), "node output received.")
else:
print(utils.string_node(id), "node output received from task", utils.string_task(task_id))

self.register_result(task_id, id)

def register_task(self, task_id):
Expand All @@ -79,6 +81,33 @@ def register_result(self, task_id, node_id):
self.result_status[utils.string_task(task_id)][node_id] = True
self.condition.notify_all()

# If the node is CarbonTracker and its output extra_data reports more than one remaining output, resend the user input to run another iteration
if node_id == utils.node_id.CARBONTRACKER.value:
print("Data results of CARBON") # Debugging
carbon_data = sustainml_swig.get_carbontracker_task_data(self.orchestrator.node_, task_id)
try:
extra_data_vector = carbon_data.extra_data() # Vector of uint8_t
extra_data_list = [s for s in extra_data_vector]
extra_data_bytes = bytes(extra_data_list)
extra_data_str = extra_data_bytes.decode('utf-8')
extra_data = json.loads(extra_data_str)
except AttributeError:
extra_data = None

if extra_data is not None and len(extra_data) > 0:
print("Resend is going to be made") # Debugging
num_outputs = extra_data['num_outputs']
print("Number of outputs:", num_outputs) # Debugging

if num_outputs > 1:
print("Reiterating CarbonTracker node for multiple outputs") # Debugging
user_json = self.orchestrator.get_user_input_data(task_id)
user_json.get('extra_data', {})['num_outputs'] = num_outputs - 1
user_json['previous_iteration'] = task_id.iteration_id()
user_json.get('extra_data', {})['previous_problem_id'] = task_id.problem_id()
user_json.get('extra_data', {})['model_restrains'] = extra_data['model_restrains']
self.orchestrator.send_user_input(user_json)

def results_available(self, task_id, node_id):
with self.condition:
return self.result_status[utils.string_task(task_id)].get(node_id, False)
Expand All @@ -87,7 +116,7 @@ class Orchestrator:

def __init__(self):

self.handler_ = OrchestratorNodeHandle()
self.handler_ = OrchestratorNodeHandle(self)
self.node_ = cpp_OrchestratorNode(self.handler_)

# Proxy method to run the node
Expand Down Expand Up @@ -238,10 +267,17 @@ def get_carbontracker_task_data(self, task_id):
carbon_footprint = node_data.carbon_footprint()
energy_consumption = node_data.energy_consumption()
carbon_intensity = node_data.carbon_intensity()
extra_data_vector = node_data.extra_data()
extra_data_list = [s for s in extra_data_vector]
extra_data_bytes = bytes(extra_data_list)
extra_data_str = extra_data_bytes.decode('utf-8')
extra_data = json.loads(extra_data_str)
json_output = {'task_id': task_json,
'carbon_footprint': carbon_footprint,
'energy_consumption': energy_consumption,
'carbon_intensity': carbon_intensity}
'carbon_intensity': carbon_intensity,
'extra_data': extra_data}

return json_output

def get_user_input_data(self, task_id):
Expand Down Expand Up @@ -371,12 +407,18 @@ def send_user_input(self, json_data):
mem_footprint = extra.get('max_memory_footprint', utils.default_mem_footprint)
goal = extra.get('goal')
model_selected = extra.get('model_selected', None)
num_outputs = extra.get('num_outputs')
model_restrains = extra.get('model_restrains', [])
hf_token = extra.get('hf_token')

# Add extra data to user_input
extra_data = {'hardware_required': hw_req,
'max_memory_footprint': mem_footprint,
'goal': goal,
'model_selected': model_selected}
'model_selected': model_selected,
'num_outputs': num_outputs,
'model_restrains': model_restrains,
'hf_token': hf_token}
json_obj = utils.json_dict(extra_data)
data_array = np.frombuffer(json_obj.encode(), dtype=np.uint8)
user_input.extra_data(sustainml_swig.uint8_t_vector(data_array.tolist()))
Expand Down