-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path6.1) decentralised_naive_iteration2.py
More file actions
166 lines (134 loc) · 7.7 KB
/
Copy path6.1) decentralised_naive_iteration2.py
File metadata and controls
166 lines (134 loc) · 7.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
import multiprocessing
import time
import json
import networkx as nx
import glob
import os
def _count_anomalous_incoming(graph, node, anomalous_nodes):
    """Return ``(anomalous predecessors, anomalous incoming edges)`` of *node*."""
    predecessors = list(graph.predecessors(node))
    incoming_nodes = sum(1 for pred in predecessors if pred in anomalous_nodes)
    incoming_edges = sum(
        1 for pred in predecessors if graph[pred][node].get("anomaly_status")
    )
    return incoming_nodes, incoming_edges


def find_root_causes(cluster_id, graph_data, metrics_resource_data_apps, cluster_msg_files, barrier):
    """Rank this cluster's anomalous nodes as root-cause candidates.

    The work is split in two phases separated by ``barrier.wait()``:

    1. Score every anomalous non-instance node of the local graph and
       publish, for each node that belongs to another cluster, how many
       anomalous predecessors / anomalous incoming edges this cluster sees
       for it.
    2. Add the counts published by the other clusters to the local scores,
       rank the candidates and store the best one.

    Args:
        cluster_id: Identifier of the cluster handled by this call; it is
            compared against each node's ``"cluster"`` value.
        graph_data: Dict with a ``"nodes"`` list (entries with the keys
            ``"node"``, ``"anomaly_status"``, ``"application"`` and
            ``"cluster"``) and an ``"edges"`` list (entries with the keys
            ``"source"``, ``"target"``, ``"anomaly_status"`` and
            ``"edge_type"``).
        metrics_resource_data_apps: Iterable of application names; a node
            entry whose application is listed adds 1 to that node's
            ``instance_boost``.
        cluster_msg_files: Mapping shared between the cluster workers.
            Messages are written under ``"<to cluster>_<from cluster>"`` and
            the final decision under ``"final_<cluster_id>"``.
        barrier: Object whose ``wait()`` is called once, after the messages
            of phase 1 have been written.

    Returns:
        None. The best candidate, a ``(node, score_dict)`` tuple, is stored
        in ``cluster_msg_files["final_<cluster_id>"]``; an empty tuple is
        stored when the cluster has no anomalous non-instance node.
    """
    # ---- Phase 1: initial processing on the local graph -------------------
    G = nx.DiGraph()
    metrics_apps = set(metrics_resource_data_apps)

    # Number of node entries per node whose application has resource
    # metrics. Built once here instead of re-scanning the node list for
    # every anomalous node.
    resource_boost = {}
    for entry in graph_data["nodes"]:
        name = entry["node"]
        G.add_node(name, anomalies=entry["anomaly_status"], application=entry["application"])
        if entry["application"] in metrics_apps:
            resource_boost[name] = resource_boost.get(name, 0) + 1

    # Edges carry the anomaly propagation details.
    for edge in graph_data["edges"]:
        G.add_edge(edge["source"], edge["target"], anomaly_status=edge["anomaly_status"], edge_type=edge["edge_type"])

    # Anomalous nodes are split into candidates (non-instances) and instances.
    anomalous_nodes = {
        n: d["anomalies"]
        for n, d in G.nodes(data=True)
        if d["anomalies"] and d["application"] != "instance"
    }
    anomalous_instances = {
        n for n, d in G.nodes(data=True)
        if d["anomalies"] and d["application"] == "instance"
    }

    root_causes = {}
    for node, anomalies in anomalous_nodes.items():
        incoming_nodes, incoming_edges = _count_anomalous_incoming(G, node, anomalous_nodes)

        # One boost per anomalous instance this node is deployed on ...
        instance_boost = sum(
            1 for succ in G.successors(node)
            if succ in anomalous_instances and G[node][succ].get("edge_type") == "deployment"
        )
        # ... plus the boost for applications that have resource metrics.
        instance_boost += resource_boost.get(node, 0)

        root_causes[node] = {
            "anomalies": anomalies,
            "anomalous_incoming_nodes": incoming_nodes,
            "anomalous_incoming_edges": incoming_edges,
            "anomaly_count": len(anomalies),
            "instance_boost": instance_boost
        }

    # ---- Publish messages for the other clusters ---------------------------
    # msg_pass maps <to cluster> -> {"<node>_<application>": counts}. The
    # per-cluster mailboxes are created on demand, so any number of clusters
    # is supported (no hard-coded cluster count).
    msg_pass = {}
    for entry in graph_data["nodes"]:
        if entry["cluster"] == cluster_id:
            continue
        incoming_nodes, incoming_edges = _count_anomalous_incoming(G, entry["node"], anomalous_nodes)
        # Only entries that carry information are worth sending.
        if incoming_nodes > 0 or incoming_edges > 0:
            msg_pass.setdefault(entry["cluster"], {})[entry["node"] + "_" + entry["application"]] = {
                'anomalous_incoming_nodes': incoming_nodes,
                'anomalous_incoming_edges': incoming_edges,
            }

    # Message keys have the format <to cluster id>_<from cluster id>.
    for to_cluster, messages in msg_pass.items():
        cluster_msg_files[str(to_cluster) + "_" + str(cluster_id)] = messages

    # Wait for all clusters to finish initial processing
    barrier.wait()

    # ---- Phase 2: merge the exchanged information --------------------------
    # Only keys whose first "_"-separated field is this cluster id are
    # addressed to us; this also skips the "final_<id>" entries.
    exchanged_data = {
        k: v for k, v in cluster_msg_files.items()
        if k.split("_")[0] == str(cluster_id)
    }

    for messages in exchanged_data.values():
        for msg_key, counts in messages.items():
            # NOTE(review): the node name is recovered as the text before the
            # first "_" of "<node>_<application>", so a node name that itself
            # contains "_" never matches here - confirm node naming.
            target = msg_key.split("_")[0]
            if target in root_causes:
                root_causes[target]['anomalous_incoming_nodes'] += counts['anomalous_incoming_nodes']
                root_causes[target]['anomalous_incoming_edges'] += counts['anomalous_incoming_edges']

    # Rank by anomaly count, then anomalous incoming nodes, anomalous
    # incoming edges and instance boost (all descending).
    sorted_roots = sorted(root_causes.items(), key=lambda x: (-x[1]["anomaly_count"], -x[1]["anomalous_incoming_nodes"], -x[1]["anomalous_incoming_edges"], -x[1]["instance_boost"]))
    print("sorted_roots", cluster_id, sorted_roots)

    # Write the final decision to the shared dictionary.
    cluster_msg_files[f"final_{cluster_id}"] = sorted_roots[0] if sorted_roots else ()
def main(l3_folder=None):
    """Run the root-cause search for every cluster and print the ranking.

    One worker process per cluster runs ``find_root_causes``; the workers
    exchange messages through a manager-backed shared dictionary and are
    synchronised with a barrier. The per-cluster decisions are then merged
    and printed, best candidate first.

    Args:
        l3_folder: Folder below ``final_preprocessed_data`` that identifies
            the experiment. The JSON inputs are read from the same path with
            ``final_preprocessed_data`` replaced by
            ``intermediate_jsons_visualizations``. Defaults to the
            catalogue-cloud sock-shop experiment used so far.
    """
    if l3_folder is None:
        l3_folder = 'final_preprocessed_data/sock_shop_chaos/catalogue-cloud/cloud_pod_catalogue-cloud-mdwvc_catalogue-cloud-mdwvc-57bdfd65c7-qn5ks_net_1'

    # Identify the location for intermediate JSON files and localizations
    save_location = l3_folder.replace('final_preprocessed_data', 'intermediate_jsons_visualizations')

    # The number of clusters is the number of attribute-graph files found.
    files = glob.glob(os.path.join(save_location, "attribute_graph_cluster*.json"))
    num_of_clusters = len(files)

    # Load the anomalous graph and the resource-metric applications per cluster.
    cluster_data = {}
    for cluster_id in range(1, num_of_clusters + 1):
        graph_path = os.path.join(save_location, "anomalous_subgraph_cluster_" + str(cluster_id) + ".json")
        metrics_path = os.path.join(save_location, "metrics_resource_cluster_" + str(cluster_id) + ".json")
        with open(graph_path, "r") as f:
            graph_data = json.load(f)
        with open(metrics_path, "r") as h:
            metrics_resource_data = json.load(h)
        cluster_data[cluster_id] = {
            'graph_data': graph_data,
            'metrics_resource_data_apps': list(metrics_resource_data),
        }

    all_entries = []
    # The context manager shuts the manager's server process down on exit.
    with multiprocessing.Manager() as manager:
        cluster_msg_files = manager.dict()  # Shared dictionary for messages and decisions
        barrier = multiprocessing.Barrier(num_of_clusters)  # Ensures all clusters sync

        # Start cluster processes
        processes = []
        for i in range(1, num_of_clusters + 1):
            p = multiprocessing.Process(
                target=find_root_causes,
                args=(i, cluster_data[i]['graph_data'], cluster_data[i]['metrics_resource_data_apps'], cluster_msg_files, barrier),
            )
            processes.append(p)
            p.start()

        # Ensure all processes finish
        for p in processes:
            p.join()

        # Collect results while the shared dictionary is still available.
        for i, p in enumerate(processes, start=1):
            final_decision = cluster_msg_files.get(f"final_{i}")
            if final_decision is None:
                # The worker ended without publishing a decision; report it
                # instead of failing with a bare KeyError.
                print(f"Cluster {i} produced no final decision (exit code {p.exitcode}); skipping it")
                continue
            if not final_decision == ():
                all_entries.append(final_decision)

    # Sort using the same criteria as the per-cluster ranking.
    sorted_all = sorted(all_entries, key=lambda x: (-x[1]["anomaly_count"], -x[1]["anomalous_incoming_nodes"], -x[1]["anomalous_incoming_edges"], -x[1]["instance_boost"]))
    print("Potential Root Cause:", sorted_all)


if __name__ == "__main__":
    main()