1+ import numpy as np
2+
13from utils import CryptoUtils
24from utils .ResourceTracker import ResourceTracker as rT
35from domain .Enums import HttpCodes
46
57
68class Worker :
9+ # region docstrings
710 """
811 Defines a node on the P2P network. Workers are subject to constraints imposed by Hivemind, constraints they inflict
912 on themselves based on available computing power (CPU, RAM, etc...) and can have [0, N] shared file parts. Workers
1013 have the ability to reconstruct lost file parts when needed.
11- :ivar hivemind: coordinator of the unstructured Hybrid P2P network that enlisted this worker for a Hive
12- :type str
14+ :ivar file_parts: key part_name maps to a dict of part_id keys whose values are SharedFilePart
15+ :type dict< str, dict<str, SharedFilePart>
1316 :ivar name: id of this worker node that uniquely identifies him in the network
1417 :type str
15- :ivar file_parts: part_id is a key to a SharedFilePart
16- :type dict<string, SharedFilePart>
18+ :ivar hivemind: coordinator of the unstructured Hybrid P2P network that enlisted this worker for a Hive
19+ :type str
20+ :ivar routing_table: maps file name with state transition probabilities, from this worker to other workers
21+ :type dict<str, pandas.DataFrame>
1722 """
23+ # endregion
1824
25+ # region class variables, instance variables and constructors
1926 def __init__ (self , hivemind , name ):
20- self .hivemind = hivemind
21- self .name = name
2227 self .file_parts = {}
28+ self .__routing_table = {}
29+ self .name = name
30+ self .hivemind = hivemind
31+ # endregion
2332
33+ # region overriden class methods
2434 def __hash__ (self ):
2535 # allows a worker object to be used as a dictionary key
2636 return hash (str (self .name ))
2737
2838 def __eq__ (self , other ):
39+ if isinstance (other , str ):
40+ return self .name == other
2941 return (self .hivemind , self .name ) == (other .hivemind , other .name )
3042
3143 def __ne__ (self , other ):
3244 return not (self == other )
45+ # endregion
3346
47+ # region file recovery methods
3448 def __init_recovery_protocol (self , part ):
3549 """
36- # TODO
3750 When a corrupt file is received initiate recovery protocol, if this is the node with the most file parts
3851 The recovery protocol consists of reconstructing the damaged file part from other parts on the system, it may be
3952 necessary to obtain other files from other nodes to initiate reconstruction
53+ # Note to self - This is not important right now! This is only important after MCMC with metropolis hastings works
54+ # For now assume that when a node dies, if it had less than N-K parts, his parts are given to someone else
4055 """
56+ # TODO:
57+ # corrupted or missing file recovery algorithm
4158 pass
59+ # endregion
60+
61+ # region instance methods
62+ def set_file_routing (self , file_name , labeled_transition_vector ):
63+ """
64+ :param file_name: a file name that is being shared on the hive
65+ :type str
66+ :param labeled_transition_vector: probability vector indicating transitions to other states for the given file
67+ :type 1-D numpy.Array in column format
68+ """
69+ self .__routing_table [file_name ] = labeled_transition_vector
4270
43- def receive_part (self , part ):
44- if CryptoUtils .sha256 (part .part_data ) == part .sha256 :
45- self .file_parts [part .part_id ] = part
71+ def receive_part (self , part , no_check = False ):
72+ if no_check or CryptoUtils .sha256 (part .part_data ) == part .sha256 :
73+ if part .name in self .file_parts :
74+ self .file_parts [part .name ][part .part_id ] = part
75+ else :
76+ self .file_parts [part .name ] = {}
77+ self .file_parts [part .name ][part .part_id ] = part
4678 else :
4779 print ("part_name: {}, part_number: {} - corrupted" .format (part .part_name , str (part .part_number )))
4880 self .__init_recovery_protocol (part )
4981
5082 def send_part (self ):
51- tmp = {}
52- for part_id , part in self .file_parts .items ():
53- dest_worker = part .get_next_state (self .name )
83+ for part_name , part_id_sfp_dict in self .file_parts .items ():
84+ tmp = {}
85+ for part_id , sfp_obj in part_id_sfp_dict .items ():
86+ dest_worker = self .get_next_state (file_name = part_name )
5487 if dest_worker == self .name :
55- tmp [part_id ] = part
88+ tmp [part_id ] = sfp_obj
5689 else :
57- response_code = self .hivemind .simulate_transmission (dest_worker , part )
90+ response_code = self .hivemind .simulate_transmission (dest_worker , sfp_obj )
5891 if response_code != HttpCodes .OK :
59- tmp [part_id ] = part
60- self .file_parts = tmp
92+ # TODO:
93+ # make use of the HttpCode responses with more than a binary behaviour
94+ tmp [part_id ] = sfp_obj
95+ self .file_parts [part_name ] = tmp
6196
6297 def leave_hive (self , orderly = True ):
98+ """
99+ Resets the field of the Worker instance
100+ :param orderly: When True asks the hivemind (master node) to redistribute files belonging to the Worker instance
101+ :type bool
102+ """
63103 if orderly :
64104 self .hivemind .simulate_redistribution (self .file_parts )
65105 self .hivemind = None
66106 self .name = None
67107 self .file_parts = None
68108
109+ def get_next_state (self , file_name ):
110+ """
111+ :param file_name: the name of the file the part to be routed belongs to
112+ :type: str
113+ :return: the name of the worker to whom the file should be routed too
114+ :type: str
115+ """
116+ file_routing_table = self .__routing_table [file_name ]
117+ row_labels = [* file_routing_table .index .values ]
118+ label_probabilities = [* file_routing_table [self .name ]]
119+ return np .random .choice (a = row_labels , p = label_probabilities )
120+ # endregion
121+
122+ # region static methods
69123 @staticmethod
70124 def get_resource_utilization (* args ):
71125 """
@@ -82,6 +136,4 @@ def get_resource_utilization(*args):
82136 for arg in args :
83137 results [arg ] = rT .get_value (arg )
84138 return results
85-
86-
87-
139+ # endregion
0 commit comments