Skip to content

Commit d8e9444

Browse files
Merge pull request #38 from FranciscoKloganB/finding-where-files-are-lost
Ready for release
2 parents edcd828 + 58930ff commit d8e9444

6 files changed

Lines changed: 84 additions & 66 deletions

File tree

hive/app/domain/Hive.py

Lines changed: 32 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,18 @@ class Hive:
2323
:ivar List[float, float] corruption_chances: used to simulate file corruption on behalf of the workers, to avoid keeping independent distributions for each part and each replica
2424
:ivar str id: unique identifier in str format
2525
:ivar Hivemind hivemind: reference to the master server, which in this case is just a simulator program
26-
:ivar FileData Union[None, FileData]: instance of class FileData which contains information regarding the file persisted by this hive
2726
:ivar Dict[str, Worker] members: Workers that belong to this P2P Hive, key is worker.id, value is the respective Worker instance
27+
:ivar FileData file: instance of class FileData which contains information regarding the file persisted by this hive
28+
:ivar DataFrame desired_distribution: distribution hive members are seeking to achieve for each of the files they persist together.
2829
:ivar int critical_size: minimum number of replicas required for data recovery plus the number of peer faults the system must support during replication.
2930
:ivar int sufficient_size: depends on churn-rate and equals critical_size plus the number of peers expected to fail between two successive recovery phases
30-
:ivar int redudant_size: application-specific system parameter, but basically represents that the hive is to big
31-
:ivar DataFrame desired_distribution: distribution hive members are seeking to achieve for each the files they persist together.
32-
:ivar Dict[str, SharedFilePart] recoverable_parts: just an hammer
31+
:ivar int original_size: stores the initial hive size
32+
:ivar int redundant_size: application-specific system parameter, but basically represents that the hive is too big
33+
:ivar int set_recovery_epoch_sum: stores the sum of the values returned by all SharedFilePart.set_recovery_epoch calls - used for simulation output purposes
34+
:ivar int set_recovery_epoch_calls: stores how many times SharedFilePart.set_recovery_epoch was called during the current epoch
35+
:ivar bool running: indicates whether the hive is still running (set to False once it terminates) - used for simulation purposes
3336
"""
37+
3438
# region Class Variables, Instance Variables and Constructors
3539
def __init__(self, hivemind: hm.Hivemind, file_name: str, members: Dict[str, Worker]) -> None:
3640
"""
@@ -43,15 +47,16 @@ def __init__(self, hivemind: hm.Hivemind, file_name: str, members: Dict[str, Wor
4347
self.corruption_chances: List[float] = [0, 0]
4448
self.id: str = str(uuid.uuid4())
4549
self.hivemind = hivemind
46-
self.file: FileData = FileData(file_name)
4750
self.members: Dict[str, Worker] = members
51+
self.file: FileData = FileData(file_name)
52+
self.desired_distribution: pd.DataFrame = pd.DataFrame()
4853
self.critical_size: int = REPLICATION_LEVEL
4954
self.sufficient_size: int = self.critical_size + math.ceil(len(self.members) * 0.34)
5055
self.original_size: int = len(members)
51-
self.redudant_size: int = self.sufficient_size + len(self.members)
52-
self.desired_distribution = None
53-
self.file.simulation_data.set_membership_maintenace_at_index(status="stable", size_before=len(members), size_after=len(members), i=0)
54-
self.running = True
56+
self.redundant_size: int = self.sufficient_size + len(self.members)
57+
self.running: bool = True
58+
self.set_recovery_epoch_sum: int = 0
59+
self.set_recovery_epoch_calls: int = 0
5560
self.broadcast_transition_matrix(self.new_transition_matrix()) # implicitly inits self.desired_distribution within new_transition_matrix()
5661
# endregion
5762

@@ -203,6 +208,7 @@ def execute_epoch(self, epoch: int) -> None:
203208
self.running = False
204209
except Exception as e:
205210
self.set_fail("Unexpected exception: ".join(traceback.format_exception(etype=type(e), value=e, tb=e.__traceback__)))
211+
self.file.simulation_data.set_delay_at_index(self.set_recovery_epoch_sum, self.set_recovery_epoch_calls, self.current_epoch)
206212

207213
def is_running(self) -> bool:
208214
return self.running
@@ -224,7 +230,7 @@ def evaluate_hive_convergence(self):
224230
else:
225231
self.file.current_distribution.at[worker.id, DEFAULT_COL] = 0
226232

227-
self.file.simulation_data.parts_in_hive[self.current_epoch] = parts_in_hive
233+
self.file.simulation_data.set_parts_at_index(parts_in_hive, self.current_epoch)
228234

229235
if not parts_in_hive:
230236
self.set_fail("hive has no remaining parts")
@@ -239,8 +245,10 @@ def evaluate_hive_convergence(self):
239245
# region Helpers
240246
def setup_epoch(self, epoch: int) -> None:
241247
self.current_epoch = epoch
242-
self.corruption_chances[0] = 0.0 # np.log10(epoch).item() / 100.0
248+
self.corruption_chances[0] = np.log10(epoch).item() / 300.0
243249
self.corruption_chances[1] = 1.0 - self.corruption_chances[0]
250+
self.set_recovery_epoch_sum = 0
251+
self.set_recovery_epoch_calls = 0
244252

245253
def workers_execute_epoch(self, lost_parts_count: int = 0) -> List[Worker]:
246254
"""
@@ -257,11 +265,11 @@ def workers_execute_epoch(self, lost_parts_count: int = 0) -> List[Worker]:
257265
lost_parts_count += len(lost_parts)
258266
offline_workers.append(worker)
259267
for part in lost_parts.values():
260-
part.set_epochs_to_recover(self.current_epoch)
268+
self.set_recovery_epoch(part)
261269
if part.decrease_and_get_references() == 0:
262-
self.set_fail("lost all replicas of at least one file part")
270+
self.set_fail("lost all replicas of file part with id: {}".format(part.id))
263271
if len(offline_workers) >= len(self.members):
264-
self.set_fail("lost all replicas of at least one file part")
272+
self.set_fail("all hive members disconnected simultaneously")
265273
self.file.simulation_data.set_disconnected_and_losses(len(offline_workers), lost_parts_count, self.current_epoch)
266274
return offline_workers
267275

@@ -278,20 +286,22 @@ def membership_maintenance(self, offline_workers: List[Worker]) -> None:
278286
if damaged_hive_size >= self.sufficient_size:
279287
self.remove_cloud_reference()
280288

281-
if damaged_hive_size >= self.redudant_size:
289+
if damaged_hive_size >= self.redundant_size:
282290
status_before_recovery = "redundant" # TODO: future-iterations evict worse members
283-
elif self.original_size <= damaged_hive_size < self.redudant_size:
291+
elif self.original_size <= damaged_hive_size < self.redundant_size:
284292
status_before_recovery = "stable"
285293
elif self.sufficient_size <= damaged_hive_size < self.original_size:
286294
status_before_recovery = "sufficient"
287295
self.members.update(self.__get_new_members())
288296
elif self.critical_size < damaged_hive_size < self.sufficient_size:
289297
status_before_recovery = "unstable"
290298
self.members.update(self.__get_new_members())
291-
else:
299+
elif 0 < damaged_hive_size <= self.critical_size:
292300
status_before_recovery = "critical"
293301
self.members.update(self.__get_new_members())
294302
self.add_cloud_reference()
303+
else:
304+
status_before_recovery = "dead"
295305

296306
healed_hive_size = len(self.members)
297307
if damaged_hive_size != healed_hive_size:
@@ -302,12 +312,15 @@ def membership_maintenance(self, offline_workers: List[Worker]) -> None:
302312
def __get_new_members(self) -> Dict[str, Worker]:
303313
return self.hivemind.find_replacement_worker(self.members, self.original_size - len(self.members))
304314

305-
def set_fail(self, msg: str) -> bool:
315+
def set_fail(self, msg: str) -> None:
306316
self.running = False
307-
return self.file.simulation_data.set_fail(self.current_epoch, msg)
317+
self.file.simulation_data.set_fail(self.current_epoch, msg)
308318

309319
def tear_down(self, epoch: int) -> None:
310320
# self.hivemind.append_epoch_results(self.id, self.file.simulation_data.__repr__()) TODO: future-iterations where Hivemind has multiple hives
311321
self.file.jwrite(self.file.simulation_data, epoch)
312322

323+
def set_recovery_epoch(self, part: SharedFilePart) -> None:
324+
self.set_recovery_epoch_sum += part.set_recovery_epoch(self.current_epoch)
325+
self.set_recovery_epoch_calls += 1
313326
# endregion

hive/app/domain/Worker.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ class Worker:
3030
# region Class Variables, Instance Variables and Constructors
3131
def __init__(self, worker_id: str, worker_uptime: float):
3232
self.id: str = worker_id
33-
self.uptime: float = math.ceil(worker_uptime * MAX_EPOCHS)
33+
self.uptime: float = float('inf') if worker_uptime == 1.0 else math.ceil(worker_uptime * MAX_EPOCHS)
3434
self.hives: Dict[str, h.Hive] = {}
3535
self.files: Dict[str, Dict[int, SharedFilePart]] = {}
3636
self.routing_table: Dict[str, pd.DataFrame] = {}
@@ -167,9 +167,9 @@ def discard_part(self, name: str, number: int, corrupt: bool = False, hive: h.Hi
167167
part: SharedFilePart = self.files.get(name, {}).pop(number, None)
168168
if part and corrupt:
169169
if part.decrease_and_get_references() == 0:
170-
hive.set_fail("lost all replicas of at least one file part")
170+
hive.set_fail("lost all replicas of file part with id: {}, and last loss was due to corruption".format(part.id))
171171
else:
172-
part.set_epochs_to_recover(hive.current_epoch)
172+
hive.set_recovery_epoch(part)
173173

174174
def get_file_parts(self, file_name: str) -> Dict[int, SharedFilePart]:
175175
"""

hive/app/domain/helpers/SharedFilePart.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ def __init__(self, hive_id: str, name: str, number: int, data: bytes):
3737
# endregion
3838

3939
# region Simulation Interface
40-
def set_epochs_to_recover(self, epoch: int) -> int:
40+
def set_recovery_epoch(self, epoch: int) -> int:
4141
"""
4242
Assigns a value to the instance's recovery_epoch attribute that indicates when a Worker who possesses a reference to it can replicate the part.
4343
:param int epoch: current simulation's epoch

hive/app/domain/helpers/file_data.py

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@ def equal_distributions(self, parts_in_hive: int) -> bool:
3737
"""
3838
Delegates distribution comparison to ConvergenceData.equal_distributions static method
3939
"""
40+
if parts_in_hive == 0:
41+
return False
42+
4043
normalized_cdv = self.current_distribution.divide(parts_in_hive)
4144
if DEBUG:
4245
self.fwrite("Desired Distribution:\n{}\nCurrent Distribution:\n{}\n".format(
@@ -54,14 +57,17 @@ def fwrite(self, string: str) -> None:
5457
self.out_file.write(string + "\n")
5558

5659
def jwrite(self, data: SimulationData, epoch: int):
57-
print(self.simulation_data.msg)
58-
stop: int = epoch+1
59-
data.disconnected_workers = data.disconnected_workers[0:stop]
60-
data.lost_parts = data.lost_parts[0:stop]
61-
data.hive_status_before_maintenance = data.hive_status_before_maintenance[0:stop]
62-
data.hive_size_before_maintenance = data.hive_size_before_maintenance[0:stop]
63-
data.hive_size_after_maintenance = data.hive_size_after_maintenance[0:stop]
64-
data.delay = data.delay[0:stop]
60+
if not data.msg:
61+
data.msg.append("completed simulation successfully")
62+
if DEBUG:
63+
[print("* {};".format(reason)) for reason in data.msg]
64+
data.disconnected_workers = data.disconnected_workers[:epoch]
65+
data.delay = data.delay[:epoch]
66+
data.lost_parts = data.lost_parts[:epoch]
67+
data.hive_status_before_maintenance = data.hive_status_before_maintenance[:epoch]
68+
data.hive_size_before_maintenance = data.hive_size_before_maintenance[:epoch]
69+
data.hive_size_after_maintenance = data.hive_size_after_maintenance[:epoch]
70+
data.delay = data.delay[:epoch]
6571
json_string = json.dumps(data.__dict__, indent=4, sort_keys=True, ensure_ascii=False)
6672
self.fwrite(json_string)
6773

hive/app/domain/helpers/simulation_data.py

Lines changed: 30 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -31,21 +31,21 @@ def __init__(self):
3131
# Updated on Hive.execute_epoch
3232
self.terminated: int = MAX_EPOCHS # gathered
3333
self.successfull: bool = True # gathered
34-
self.msg = "completed simulation successfully" # gathered
35-
self.disconnected_workers: List[int] = [0] * MAX_EPOCHS_PLUS # gathered
34+
self.msg = [] # gathered
35+
self.disconnected_workers: List[int] = [0] * MAX_EPOCHS # gathered
3636
self.lost_parts: List[int] = [0] * MAX_EPOCHS_PLUS # gathered
37-
self.hive_status_before_maintenance: List[str] = [""] * MAX_EPOCHS_PLUS # gathered
38-
self.hive_size_before_maintenance: List[int] = [0] * MAX_EPOCHS_PLUS # gathered
39-
self.hive_size_after_maintenance: List[int] = [0] * MAX_EPOCHS_PLUS # gathered
37+
self.hive_status_before_maintenance: List[str] = [""] * MAX_EPOCHS # gathered
38+
self.hive_size_before_maintenance: List[int] = [0] * MAX_EPOCHS # gathered
39+
self.hive_size_after_maintenance: List[int] = [0] * MAX_EPOCHS # gathered
4040
self.delay: List[float] = [0.0] * MAX_EPOCHS_PLUS
4141
###############################
4242
###############################
4343
# Updated on Hive.route_part
44-
self.moved_parts: List[int] = [0] * MAX_EPOCHS_PLUS # gathered
45-
self.corrupted_parts: List[int] = [0] * MAX_EPOCHS_PLUS # gathered
46-
self.lost_messages: List[int] = [0] * MAX_EPOCHS_PLUS # gathered
44+
self.moved_parts: List[int] = [0] * MAX_EPOCHS # gathered
45+
self.corrupted_parts: List[int] = [0] * MAX_EPOCHS # gathered
46+
self.lost_messages: List[int] = [0] * MAX_EPOCHS # gathered
4747
###############################
48-
self.parts_in_hive: List[int] = [0] * MAX_EPOCHS_PLUS # gathered
48+
self.parts_in_hive: List[int] = [0] * MAX_EPOCHS # gathered
4949

5050
# endregion
5151

@@ -119,12 +119,13 @@ def __repr__(self):
119119
# endregion
120120

121121
# region Helpers
122-
def set_delay_at_index(self, n: float, i: int) -> None:
122+
def set_delay_at_index(self, delay: int, calls: int, i: int) -> None:
123123
"""
124-
:param float n: the delay at epoch i
124+
:param int delay: the delay sum
125+
:param int calls: number of times a delay was generated
125126
:param int i: index of epoch i in SimulationData.delay list
126127
"""
127-
self.delay[i] = n
128+
self.delay[i-1] = 0 if calls == 0 else delay / calls
128129

129130
def set_disconnected_and_losses(self, disconnected=0, lost=0, i=0):
130131
"""
@@ -138,50 +139,53 @@ def set_moved_parts_at_index(self, n: int, i: int) -> None:
138139
:param int n: the quantity of parts moved at epoch i
139140
:param int i: index of epoch i in SimulationData.moved_parts list
140141
"""
141-
self.moved_parts[i] += n
142+
self.moved_parts[i-1] += n
143+
144+
def set_parts_at_index(self, n: int, i: int) -> None:
145+
"""
146+
:param int n: the quantity of parts in the hive at epoch i
147+
:param int i: index of epoch i in SimulationData.parts_in_hive list
148+
"""
149+
self.parts_in_hive[i-1] += n
142150

143151
def set_disconnected_workers_at_index(self, n: int, i: int) -> None:
144152
"""
145153
:param int n: the quantity of disconnected workers at epoch i
146154
:param int i: index of epoch i in SimulationData.disconnected_workers list
147155
"""
148-
self.disconnected_workers[i] += n
156+
self.disconnected_workers[i-1] += n
149157

150158
def set_lost_parts_at_index(self, n: int, i: int) -> None:
151159
"""
152160
:param int n: the quantity of lost parts at epoch i
153161
:param int i: index of epoch i in SimulationData.lost_parts list
154162
"""
155-
self.lost_parts[i] += n
163+
self.lost_parts[i-1] += n
156164

157165
def set_lost_messages_at_index(self, n: int, i: int) -> None:
158166
"""
159167
:param int n: the quantity of loss messages at epoch i
160168
:param int i: index of epoch i in SimulationData.lost_messages list
161169
"""
162-
self.lost_messages[i] += n
170+
self.lost_messages[i-1] += n
163171

164172
def set_corrupt_files_at_index(self, n: int, i: int) -> None:
165173
"""
166174
:param int n: the quantity of corrupted parts at epoch i
167175
:param int i: index of epoch i in SimulationData.corrupted_parts list
168176
"""
169-
self.corrupted_parts[i] += n
177+
self.corrupted_parts[i-1] += n
170178

171-
def set_fail(self, i: int, msg: str = "") -> bool:
179+
def set_fail(self, i: int, msg: str = "") -> None:
172180
"""
173181
Records the epoch at which the Hive terminated, should only be called if it finished early.
174182
Default, Hive.terminated = MAX_EPOCHS and Hive.successfull = True.
175183
:param int i: epoch at which Hive terminated
176184
:param str msg: a message
177-
:returns bool: usually returns False, only returns True when param i, representing epoch is qual to MAX_EPOCHS
178185
"""
179-
if i == MAX_EPOCHS:
180-
return True
181186
self.terminated = i
182187
self.successfull = False
183-
self.msg = msg
184-
return False
188+
self.msg.append(msg)
185189

186190
def set_membership_maintenace_at_index(self, status: str, size_before: int, size_after: int, i: int) -> None:
187191
"""
@@ -190,13 +194,8 @@ def set_membership_maintenace_at_index(self, status: str, size_before: int, size
190194
:param int size_after: size of the hive after maintenace
191195
:param int i: epoch i, used to index the SimulationData hive_status_before_maintenance, hive_size_before_maintenance and hive_size_after_maintenance lists
192196
"""
193-
if status is None or size_before is None or size_after is None:
194-
self.hive_status_before_maintenance[i] = self.hive_status_before_maintenance[i-1]
195-
self.hive_size_before_maintenance[i] = self.hive_size_before_maintenance[i-1]
196-
self.hive_size_after_maintenance[i] = self.hive_size_after_maintenance[i-1]
197-
else:
198-
self.hive_status_before_maintenance[i] = status
199-
self.hive_size_before_maintenance[i] = size_before
200-
self.hive_size_after_maintenance[i] = size_after
197+
self.hive_status_before_maintenance[i-1] = status
198+
self.hive_size_before_maintenance[i-1] = size_before
199+
self.hive_size_after_maintenance[i-1] = size_after
201200
# endregion
202201

hive/app/globals/globals.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,12 +3,12 @@
33
DEBUG: bool = True
44

55
# region Simulation Settings
6-
READ_SIZE: int = 20971520 # 512KB = 524288 bytes || 32KB = 32768 bytes. Defines the raw size of each SharedFilePart.
7-
MAX_EPOCHS = 50 # One day has 24h, meaning that one epoch per minute wwould be 1440, 720 defines one epoch every two minutes
6+
READ_SIZE: int = 131072 # 32KB = 32768b || 128KB = 131072b || 512KB = 524288b || 20MB = 20971520b. Defines the raw size of each SharedFilePart.
7+
MAX_EPOCHS = 720 # One day has 24h, meaning that one epoch per minute would be 1440, 720 defines one epoch every two minutes
88
MAX_EPOCHS_PLUS = MAX_EPOCHS + 1
99
MIN_DETECTION_DELAY: int = 1 # 2 minutes
10-
MAX_DETECTION_DELAY: int = 2 # 2 minutes
11-
REPLICATION_LEVEL: int = 3 # Each file part has 3 copies, for simulation purposes, this copies are soft copies.
10+
MAX_DETECTION_DELAY: int = 3 # 6 minutes
11+
REPLICATION_LEVEL: int = 5 # Each file part has 5 copies; for simulation purposes, these copies are soft copies.
1212
MIN_CONVERGENCE_THRESHOLD: int = 3
1313
LOSS_CHANCE: float = 0.04 # Each sent file has a 4% chance of timing out due to the message being lost in travel
1414
DELIVER_CHANCE: float = 1.0 - LOSS_CHANCE # Each sent file has a 96% chance of being delivered (the complement of LOSS_CHANCE)

0 commit comments

Comments
 (0)