Skip to content
Open

Dev #38

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion ravpy/distributed/benchmarking.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ def benchmark():
if file.endswith(".zip"):
os.remove(file)

g.logger.debug("Benchmarking completed successfully, emitting results...")
g.logger.debug("Emitting Benchmark Results...")
client.emit("benchmark_callback", data=json.dumps(benchmark_results), namespace="/client")
client.sleep(1)
Expand Down
71 changes: 19 additions & 52 deletions ravpy/distributed/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,18 @@
import sys
import time

from ..config import FTP_DOWNLOAD_FILES_FOLDER
from ..config import FTP_DOWNLOAD_FILES_FOLDER, FTP_TEMP_FILES_FOLDER
from ..globals import g
from ..strings import functions
from ..utils import get_key, dump_data, load_data
from ..utils import get_key, dump_data, load_data, load_data_raw
from .op_functions import *


def compute_locally_bm(*args, **kwargs):
operator = kwargs.get("operator", None)
op_type = kwargs.get("op_type", None)
param_args = kwargs.get("params", None)
# print("Operator", operator,"Op Type:",op_type)

if op_type == "unary":
value1 = args[0]
params = {}
Expand All @@ -37,29 +37,25 @@ def compute_locally_bm(*args, **kwargs):
# async
def compute_locally(payload, subgraph_id, graph_id):
try:
# print("Computing ",payload["operator"])
# print('\n\nPAYLOAD: ',payload)

values = []

for i in range(len(payload["values"])):
if "value" in payload["values"][i].keys():
if "path" not in payload["values"][i].keys():
values.append(payload["values"][i]["value"])

else:
download_path = os.path.join(FTP_DOWNLOAD_FILES_FOLDER,
os.path.basename(payload["values"][i]["path"]))
os.path.basename(payload["values"][i]["path"]))
value = load_data(download_path).tolist()
values.append(value)

elif "op_id" in payload["values"][i].keys():
values.append(g.outputs[payload['values'][i]['op_id']])
value_path = os.path.join(FTP_TEMP_FILES_FOLDER, "temp_{}.pkl".format(payload['values'][i]['op_id']))
value = load_data_raw(value_path)
values.append(value) #g.outputs[payload['values'][i]['op_id']])

payload["values"] = values

# print("Payload Values: ", payload)

op_type = payload["op_type"]
operator = get_key(payload["operator"], functions)
params = payload['params']
Expand All @@ -72,7 +68,8 @@ def compute_locally(payload, subgraph_id, graph_id):
elif type(params[i]) == dict:
if 'op_id' in params[i].keys():
op_id = params[i]["op_id"]
param_value = g.outputs[op_id]
param_path = os.path.join(FTP_TEMP_FILES_FOLDER, "temp_{}.pkl".format(op_id))
param_value = load_data_raw(param_path)
elif 'value' in params[i].keys():
download_path = os.path.join(FTP_DOWNLOAD_FILES_FOLDER,
os.path.basename(params[i]["path"]))
Expand All @@ -99,8 +96,7 @@ def compute_locally(payload, subgraph_id, graph_id):
if 'dict' in str(type(result)):
file_path = upload_result(payload, result, subgraph_id=subgraph_id,
graph_id=graph_id) # upload_result(payload, result)
g.outputs[payload["op_id"]] = result


return json.dumps({
'op_type': payload["op_type"],
'file_name': os.path.basename(file_path),
Expand All @@ -112,37 +108,16 @@ def compute_locally(payload, subgraph_id, graph_id):
if not isinstance(result, np.ndarray):
result = np.array(result)

result_byte_size = result.size * result.itemsize

if result_byte_size < (30 * 1000000) // 10000:
try:
result = result.tolist()
except:
result = result
file_path = upload_result(payload, result, subgraph_id=subgraph_id,
graph_id=graph_id) # upload_result(payload, result)

g.outputs[payload["op_id"]] = result

return json.dumps({
'op_type': payload["op_type"],
'result': result,
'operator': payload["operator"],
"op_id": payload["op_id"],
"status": "success"
})

else:

file_path = upload_result(payload, result, subgraph_id=subgraph_id,
graph_id=graph_id) # upload_result(payload, result)
g.outputs[payload["op_id"]] = result.tolist()

return json.dumps({
'op_type': payload["op_type"],
'file_name': os.path.basename(file_path),
'operator': payload["operator"],
"op_id": payload["op_id"],
"status": "success"
})
return json.dumps({
'op_type': payload["op_type"],
'file_name': os.path.basename(file_path),
'operator': payload["operator"],
"op_id": payload["op_id"],
"status": "success"
})

except Exception as error:
print('Error: ', error)
Expand Down Expand Up @@ -394,15 +369,7 @@ def upload_result(payload, result, subgraph_id=None, graph_id=None):
result = result.tolist()
except:
result = result

file_path = dump_data(payload['op_id'], result)

from zipfile import ZipFile
with ZipFile('local_{}_{}.zip'.format(subgraph_id, graph_id), 'a') as zipObj2:
zipObj2.write(file_path, os.path.basename(file_path))

os.remove(file_path)

return file_path


Expand Down
29 changes: 16 additions & 13 deletions ravpy/distributed/evaluate.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from zipfile import ZipFile

from .compute import compute_locally, emit_error
from ..config import FTP_DOWNLOAD_FILES_FOLDER
from ..config import FTP_DOWNLOAD_FILES_FOLDER, FTP_TEMP_FILES_FOLDER
from ..globals import g
from ..utils import setTimeout, stopTimer

Expand All @@ -21,7 +21,6 @@ def compute_subgraph(d):
global client, timeoutId

os.system('clear')
# print("Received Subgraph : ",d["subgraph_id"]," of Graph : ",d["graph_id"])
print(AsciiTable([['Provider Dashboard']]).table)
g.dashboard_data.append([d["subgraph_id"], d["graph_id"], "Computing"])
print(AsciiTable(g.dashboard_data).table)
Expand Down Expand Up @@ -89,6 +88,14 @@ def compute_subgraph(d):

if not g.error:

for temp_file in os.listdir(FTP_TEMP_FILES_FOLDER):
if 'temp_' in temp_file and '.pkl' in temp_file:
file_path = os.path.join(FTP_TEMP_FILES_FOLDER, temp_file)
with ZipFile('local_{}_{}.zip'.format(subgraph_id, graph_id), 'a') as zipObj2:
zipObj2.write(file_path, os.path.basename(file_path))

os.remove(file_path)

# check if file exists
zip_file_name = 'local_{}_{}.zip'.format(subgraph_id, graph_id)
if os.path.exists(zip_file_name):
Expand All @@ -98,7 +105,7 @@ def compute_subgraph(d):
emit_result_data = {"subgraph_id": d["subgraph_id"], "graph_id": d["graph_id"], "token": g.ravenverse_token,
"results": results}
client.emit("subgraph_completed", json.dumps(emit_result_data), namespace="/client")
# print('Emitted subgraph_completed')


os.system('clear')
g.dashboard_data[-1][2] = "Computed"
Expand All @@ -107,15 +114,11 @@ def compute_subgraph(d):

g.has_subgraph = False

stopTimer(timeoutId)
timeoutId = setTimeout(waitInterval, opTimeout)

delete_dir = FTP_DOWNLOAD_FILES_FOLDER
for f in os.listdir(delete_dir):
os.remove(os.path.join(delete_dir, f))

g.delete_files_list = []
g.outputs = {}


@g.client.on('ping', namespace="/client")
Expand Down Expand Up @@ -147,11 +150,6 @@ def waitInterval():
os._exit(1)

if g.client.connected:
if not g.has_subgraph:
client.emit("get_op", json.dumps({
"message": "Send me an aop"
}), namespace="/client")

stopTimer(timeoutId)
timeoutId = setTimeout(waitInterval, opTimeout)

Expand All @@ -176,4 +174,9 @@ def exit_handler():
if g.client is not None:
g.logger.debug("Disconnecting...")
if g.client.connected:
g.client.emit("disconnect", namespace="/client")
g.client.emit("disconnect", namespace="/client")

dir = FTP_TEMP_FILES_FOLDER
if os.path.exists(dir):
for f in os.listdir(dir):
os.remove(os.path.join(dir, f))
9 changes: 7 additions & 2 deletions ravpy/ftp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import socket
from ftplib import FTP

from ..config import RAVENVERSE_FTP_URL
from ..config import RAVENVERSE_FTP_URL, FTP_TEMP_FILES_FOLDER
from ..globals import g

try:
Expand Down Expand Up @@ -126,4 +126,9 @@ def exit_handler():
if g.client is not None:
g.logger.debug("Disconnecting...")
if g.client.connected:
g.client.emit("disconnect", namespace="/client")
g.client.emit("disconnect", namespace="/client")

dir = FTP_TEMP_FILES_FOLDER
if os.path.exists(dir):
for f in os.listdir(dir):
os.remove(os.path.join(dir, f))
5 changes: 5 additions & 0 deletions ravpy/initialize.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from .globals import g

from .utils import isLatestVersion
from .config import FTP_TEMP_FILES_FOLDER

def exit_handler():
g.logger.debug('Application is Closing!')
Expand All @@ -12,6 +13,10 @@ def exit_handler():
if g.client.connected:
g.client.emit("disconnect", namespace="/client")

dir = FTP_TEMP_FILES_FOLDER
if os.path.exists(dir):
for f in os.listdir(dir):
os.remove(os.path.join(dir, f))

atexit.register(exit_handler)

Expand Down
13 changes: 8 additions & 5 deletions ravpy/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,10 @@ def isLatestVersion(pkgName):
return latest_version == current_version

def download_file(url, file_name):
    """Download the file at *url* and write it to *file_name*.

    Authenticates with the Ravenverse token held on the global object ``g``
    by sending it in a ``token`` request header.

    :param url: HTTP(S) URL to fetch.
    :param file_name: local path the response body is written to.
    """
    g.logger.debug("Downloading benchmark data")
    headers = {"token": g.ravenverse_token}
    # Stream the response so large files are copied to disk chunk-by-chunk
    # instead of being buffered entirely in memory.
    # NOTE(review): copying from r.raw bypasses requests' content decoding —
    # assumes the server does not gzip-encode the payload; confirm.
    with requests.get(url, stream=True, headers=headers) as r:
        with open(file_name, 'wb') as f:
            shutil.copyfileobj(r.raw, f)
    g.logger.debug("Benchmark data downloaded")


def get_key(val, dict):
Expand Down Expand Up @@ -107,7 +105,6 @@ def get_ftp_credentials():

def get_graph(graph_id):
# Get graph
g.logger.debug("get_graph")
headers = {"token": g.ravenverse_token}
r = requests.get(url="{}/graph/get/?id={}".format(RAVENVERSE_URL, graph_id), headers=headers)
if r.status_code == 200:
Expand All @@ -117,7 +114,6 @@ def get_graph(graph_id):

def get_federated_graph(graph_id):
# Get graph
g.logger.debug("get_federated_graph")
headers = {"token": g.ravenverse_token}
r = requests.get(url="{}/graph/get_federated/?id={}".format(RAVENVERSE_URL, graph_id), headers=headers)
if r.status_code == 200:
Expand Down Expand Up @@ -210,7 +206,7 @@ def dump_data(op_id, value):
os.remove(file_path)
os.makedirs(os.path.dirname(file_path), exist_ok=True)
with open(file_path, 'wb') as f:
pkl.dump(value, f)
pkl.dump(value, f, protocol=pkl.HIGHEST_PROTOCOL)
return file_path


Expand All @@ -222,6 +218,13 @@ def load_data(path):
data = pkl.load(f)
return np.array(data)

def load_data_raw(path):
    """Deserialize and return the pickled object stored at *path*.

    Unlike ``load_data``, the object is returned exactly as pickled,
    without any conversion (e.g. to a numpy array).

    :param path: filesystem path of the pickle file to read.
    :return: the unpickled object, whatever its type.
    """
    with open(path, 'rb') as handle:
        return pkl.load(handle)

def initialize_ftp_client():
credentials = get_ftp_credentials()
Expand Down