-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathnode_functions.py
More file actions
299 lines (237 loc) · 12.2 KB
/
node_functions.py
File metadata and controls
299 lines (237 loc) · 12.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
import sys
import os
sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from classes.Nodes import Nodes
def generate_nodes(node_number, nodeType, default_height = 100):
    """Create ``node_number`` nodes of a single kind, all placed at the origin.

    Args:
        node_number: How many nodes to create. Zero or a negative value
            yields an empty list.
        nodeType: Kind selector: ``0`` builds "GU" nodes at height 0,
            ``1`` builds "UAV" nodes and ``2`` builds "ABS" nodes, both at
            ``default_height``.
        default_height: z coordinate given to "UAV" and "ABS" nodes.

    Returns:
        A list of ``Nodes`` whose fourth constructor argument runs from 0 to
        ``node_number - 1``.

    Raises:
        TypeError: If ``nodeType`` is not 0, 1 or 2. The previous code built
            the exception without raising it and silently returned an empty
            list; the check now happens before any node is created.
    """
    # Maps the selector to (type label, z coordinate).
    node_specs = {
        0: ("GU", 0),
        1: ("UAV", default_height),
        2: ("ABS", default_height),
    }
    try:
        label, height = node_specs[nodeType]
    except (KeyError, TypeError):
        # TypeError covers unhashable selectors such as lists.
        raise TypeError(
            f"Unsupported nodeType {nodeType!r}; expected 0 (GU), 1 (UAV) or 2 (ABS)"
        ) from None

    # The third constructor argument is passed as 0, exactly as before; its
    # meaning is defined by ``Nodes`` and is not visible from this file.
    return [Nodes((0, 0, height), label, 0, index) for index in range(node_number)]
def print_node(all_nodes, node_number = -1, onlyPosition = False):
    """Print every node, or only the nodes with a matching ``node_number``.

    Args:
        all_nodes: Iterable of objects exposing ``node_number`` and
            ``position`` attributes.
        node_number: Number to filter on; ``-1`` (the default) prints every
            node.
        onlyPosition: When equal to ``True``, print ``node.position`` instead
            of the node itself.
    """
    show_everything = node_number == -1
    # Kept as an equality test so that values such as 1 behave like True,
    # while other truthy values (e.g. non-empty strings) do not.
    position_only = onlyPosition == True
    for candidate in all_nodes:
        if not show_everything and not (candidate.node_number == node_number):
            continue
        print(candidate.position if position_only else candidate)
def print_node_number(node):
    """Print the ``node_number`` attribute of ``node`` to standard output."""
    identifier = node.node_number
    print(identifier)
def get_nodes_position(all_nodes):
    """Return the ``position`` attribute of each node, in iteration order.

    Args:
        all_nodes: Iterable of objects exposing a ``position`` attribute.

    Returns:
        A new list holding one position per node.
    """
    return [member.position for member in all_nodes]
import json
import random
import math
import numpy as np
def move_ground_users(ground_users, blocks, xLength, yLength, max_movement_distance, soft_margin=-1):
    """Move every ground user by one random step that avoids the blocks.

    For each user a heading in ``[0, 2*pi]`` and a step length in
    ``[0, max_movement_distance]`` are drawn with ``random.uniform``. The
    candidate is accepted when it lies inside ``[0, xLength] x [0, yLength]``
    and outside every block footprint enlarged by ``soft_margin`` on each
    side. Otherwise a new step is drawn. The z coordinate is carried over.

    NOTE(review): the retry loop has no upper bound, so a user with no
    reachable valid position makes this function spin forever.

    Args:
        ground_users: Iterable of objects with a ``position`` attribute
            (indexable x, y, z) and a ``set_position`` method.
        blocks: Iterable of dicts with a three-element ``'bottomCorner'`` and
            a two-element ``'size'``.
        xLength: Upper bound of the allowed x range.
        yLength: Upper bound of the allowed y range.
        max_movement_distance: Largest step length that may be drawn.
        soft_margin: Clearance kept around each block. ``-1`` (the default)
            selects ``min(xLength, yLength) / 100``.
    """
    if soft_margin == -1:
        soft_margin = min(xLength, yLength)/100

    def _inside_padded_block(x, y):
        # True when (x, y) falls inside any block footprint grown by the margin.
        for obstacle in blocks:
            left, bottom, _ = obstacle['bottomCorner']
            width, depth = obstacle['size']
            if (left - soft_margin <= x <= left + width + soft_margin and
                    bottom - soft_margin <= y <= bottom + depth + soft_margin):
                return True
        return False

    for user in ground_users:
        while True:
            # Draw order (heading first, then length) is kept so seeded runs
            # reproduce the same trajectories.
            heading = random.uniform(0, 2 * math.pi)
            step = random.uniform(0, max_movement_distance)
            offset_x = step * math.cos(heading)
            offset_y = step * math.sin(heading)
            origin = user.position
            target_x = origin[0] + offset_x
            target_y = origin[1] + offset_y
            if not (0 <= target_x <= xLength and 0 <= target_y <= yLength):
                continue
            if _inside_padded_block(target_x, target_y):
                continue
            user.set_position((target_x, target_y, origin[2]))
            break
import matplotlib.pyplot as plt
import matplotlib.patches as patches
def simulate_and_visualize_movements(n, ground_users, blocks, xLength, yLength, max_movement_distance):
    """Run ``n`` movement steps and plot each user's path as coloured arrows.

    Every step calls ``move_ground_users`` and then draws, for each user, an
    arrow from its previous position to its new one. Blocks are outlined in
    red, each step gets its own viridis colour and legend entry, and the
    figure is shown with ``plt.show()``.

    Side effects: each user gets ``start_pos`` and ``end_pos`` attributes,
    and user positions are changed by ``move_ground_users``.

    Args:
        n: Number of time steps to simulate.
        ground_users: Users accepted by ``move_ground_users``.
        blocks: Iterable of dicts with ``'bottomCorner'`` (three values) and
            ``'size'`` (two values).
        xLength: Width of the plotted area (x axis runs 0..xLength).
        yLength: Height of the plotted area (y axis runs 0..yLength).
        max_movement_distance: Forwarded to ``move_ground_users``.
    """
    # One colour per time step, spread evenly across the viridis map.
    step_colors = plt.cm.viridis(np.linspace(0, 1, n))

    fig, ax = plt.subplots()
    ax.set_xlim(0, xLength)
    ax.set_ylim(0, yLength)

    # Outline each block footprint.
    for obstacle in blocks:
        corner_x, corner_y, _ = obstacle['bottomCorner']
        width, depth = obstacle['size']
        outline = patches.Rectangle(
            (corner_x, corner_y), width, depth,
            linewidth=1, edgecolor='r', facecolor='none',
        )
        ax.add_patch(outline)

    for time_step in range(n):
        shade = step_colors[time_step]
        first_step = time_step == 0

        # Only the very first arrow starts from the pre-simulation position.
        if first_step:
            for user in ground_users:
                user.start_pos = user.position

        move_ground_users(ground_users, blocks, xLength, yLength, max_movement_distance)

        for user in ground_users:
            # Later arrows start where the previous one ended.
            if not first_step:
                user.start_pos = user.end_pos
            user.end_pos = user.position

            from_x = user.start_pos[0]
            from_y = user.start_pos[1]
            ax.arrow(from_x, from_y,
                     user.end_pos[0] - from_x, user.end_pos[1] - from_y,
                     head_width=0.5, head_length=1, fc=shade, ec=shade)

        # Empty line used purely as a legend entry for this time step.
        ax.plot([], [], color=shade, label=f'Time Step: {time_step + 1}')

    ax.legend(loc='upper left')
    plt.show()
from functions.path_is_blocked import path_is_blocked
from functions.calculate_data_rate import calculate_data_rate
def get_gu_to_uav_connections(ground_users, UAV_nodes, UAVInfo, blocks, backhaul_connection = None):
    """Attach each ground user to the UAV offering the highest data rate.

    When ``backhaul_connection`` is truthy, every UAV whose index appears in
    ``backhaul_connection.allPaths`` first has ``set_DR`` called with the
    largest ``'DR'`` among its paths, provided that value is positive.

    Then, for each ground user, the rate to every UAV is computed with
    ``calculate_data_rate`` (using the result of ``path_is_blocked``). The
    best UAV index is stored through ``set_connection`` and the best rate
    through ``set_DR``.

    Args:
        ground_users: Sequence of user nodes.
        UAV_nodes: Sequence of UAV nodes.
        UAVInfo: Passed unchanged to ``calculate_data_rate``.
        blocks: Passed unchanged to ``path_is_blocked``.
        backhaul_connection: Optional object exposing an ``allPaths`` mapping
            from UAV index to a list of dicts with ``'path'`` and ``'DR'``.

    Returns:
        A pair ``(gu_to_uav, gu_to_bs_capacity)``. ``gu_to_uav`` maps the
        user index to a one-element list holding the chosen UAV index.
        ``gu_to_bs_capacity`` maps the user index to the smaller of the
        user's best rate and ``data_rate[0]`` of the UAV named by the user's
        first ``connected_nodes`` entry.
    """
    if backhaul_connection:
        for uav_index, uav in enumerate(UAV_nodes):
            if uav_index not in backhaul_connection.allPaths:
                continue
            best_backhaul_rate = -1
            for route in backhaul_connection.allPaths[uav_index]:
                # Looked up but not used; kept so that entries without a
                # 'path' key still fail the same way as before.
                _unused_path = route['path']
                route_rate = route['DR']
                if route_rate > best_backhaul_rate:
                    best_backhaul_rate = route_rate
            if best_backhaul_rate > 0:
                uav.set_DR(best_backhaul_rate)

    gu_to_uav = {}
    gu_to_bs_capacity = {}
    for gu_index, user in enumerate(ground_users):
        access_rate = -1
        serving_uav = None
        for uav_index, uav in enumerate(UAV_nodes):
            obstructed = path_is_blocked(blocks, uav, user)
            candidate_rate = calculate_data_rate(UAVInfo, uav.position, user.position, obstructed)
            if candidate_rate > access_rate:
                access_rate, serving_uav = candidate_rate, uav_index

        gu_to_uav[gu_index] = [serving_uav]
        user.set_connection(serving_uav)
        user.set_DR(access_rate)

        # Read back through the node rather than using serving_uav directly,
        # exactly as the original did.
        # NOTE(review): serving_uav stays None when UAV_nodes is empty or no
        # rate exceeds -1; the lookup below presumably fails then - confirm.
        attached_uav = UAV_nodes[user.connected_nodes[0]]
        gu_to_bs_capacity[gu_index] = min(access_rate, attached_uav.data_rate[0])

    return gu_to_uav, gu_to_bs_capacity
def move_gu_and_update_connections(ground_users, blocks, x_length, y_length, max_movement_distance, UAV_nodes, UAVInfo, backhaul_connection):
    """Move the ground users one step, then recompute their UAV connections.

    Args:
        ground_users: Users to move and reconnect.
        blocks: Obstacles used for movement and for the blockage check.
        x_length: Upper x bound passed to ``move_ground_users``.
        y_length: Upper y bound passed to ``move_ground_users``.
        max_movement_distance: Largest step passed to ``move_ground_users``.
        UAV_nodes: Candidate UAV nodes.
        UAVInfo: Passed unchanged to ``get_gu_to_uav_connections``.
        backhaul_connection: Passed unchanged to ``get_gu_to_uav_connections``.

    Returns:
        The ``(gu_to_uav, gu_to_bs_capacity)`` pair produced by
        ``get_gu_to_uav_connections``.
    """
    move_ground_users(ground_users, blocks, x_length, y_length, max_movement_distance)
    connections, capacities = get_gu_to_uav_connections(
        ground_users, UAV_nodes, UAVInfo, blocks, backhaul_connection
    )
    return connections, capacities
def add_gu_to_simulation(ground_user, number_of_added_gu):
    """Append ``number_of_added_gu`` new "GU" nodes to ``ground_user`` in place.

    Each new node is created at ``(0, 0, 0)`` and receives the list length at
    the moment it is appended as its fourth constructor argument.

    Args:
        ground_user: List of ground-user nodes; it is mutated.
        number_of_added_gu: How many nodes to append. Zero or a negative
            value appends nothing.

    Returns:
        The same ``ground_user`` list that was passed in.
    """
    first_new_index = len(ground_user)
    ground_user.extend(
        Nodes((0,0,0), "GU", 0, first_new_index + offset)
        for offset in range(number_of_added_gu)
    )
    return ground_user
def add_or_remove_gu(ground_user):
    """Randomly add up to two users, remove up to two users, or do nothing.

    An operation is drawn from ``[2, 1, 0, -1, -2]`` with ``random.choice``.
    Positive values append that many "GU" nodes at ``(0, 0, 0)``, zero leaves
    the list alone, and negative values pop that many nodes from the end. A
    removal larger than the list is rejected and a new operation is drawn.
    The chosen operation is reported with ``print``.

    Args:
        ground_user: List of ground-user nodes; it is mutated.

    Returns:
        The same ``ground_user`` list that was passed in.
    """
    while True:
        operation = random.choice([2, 1, 0, -1, -2])

        if operation == 0:
            print("Selected to make no changes")
            return ground_user

        if operation > 0:
            print(f"Selected to add {operation} GU(s)")
            added = 0
            while added < operation:
                ground_user.append(Nodes((0,0,0), "GU", 0, len(ground_user)))
                added += 1
            return ground_user

        # Negative draw: only valid when enough users exist to remove.
        if len(ground_user) < abs(operation):
            print("Not enough GUs to remove; reselecting operation.")
            continue

        print(f"Selected to remove {abs(operation)} GU(s)")
        for _ in range(abs(operation)):
            ground_user.pop()
        return ground_user
def set_baseline_backhaul(baseline_UAV_nodes, baseline_UAV_positions, baseline_UAV_connections, baseline_BS_nodes, baseStation, baseline_BS_connections):
    """Apply baseline positions and connections to UAV and base-station nodes.

    A UAV's position or connection is applied only when the corresponding
    container and its entry for that UAV are both truthy; otherwise it is
    skipped. Every base-station node is placed at the x and y of its
    ``'bottomCorner'`` and the first element of its ``'height'``, and always
    receives its connection entry.

    Args:
        baseline_UAV_nodes: UAV nodes to update.
        baseline_UAV_positions: Per-UAV positions, or a falsy value to skip.
        baseline_UAV_connections: Per-UAV connections, or a falsy value to skip.
        baseline_BS_nodes: Base-station nodes to update.
        baseStation: Per-base-station dicts with ``'bottomCorner'`` and
            ``'height'`` entries.
        baseline_BS_connections: Per-base-station connections.
    """
    for uav_index, uav in enumerate(baseline_UAV_nodes):
        if baseline_UAV_positions and baseline_UAV_positions[uav_index]:
            uav.set_position(baseline_UAV_positions[uav_index])
        if baseline_UAV_connections and baseline_UAV_connections[uav_index]:
            uav.set_connection(baseline_UAV_connections[uav_index])

    for bs_index, station_node in enumerate(baseline_BS_nodes):
        site = baseStation[bs_index]
        corner = site['bottomCorner']
        station_node.set_position((corner[0], corner[1], site['height'][0]))
        station_node.set_connection(baseline_BS_connections[bs_index])
# def get_ground_user_positions_for_a_time(gu_position_information, current_time):
# ground_users_count = len(gu_position_information.iloc[current_time])
# ground_users = generate_nodes(ground_users_count, 0)
# for i in range(ground_users_count):
# value = gu_position_information.iloc[current_time, i]
# try:
# position = tuple(map(float, value.strip("()").split(',')))
# ground_users[i].set_position(position)
# except ValueError:
# print(f"Error: Invalid position string {value}")
# return ground_users
# def get_ground_user_positions_for_a_time(gu_position_information, current_time):
# ground_users_count = len(gu_position_information.iloc[current_time])
# ground_users = generate_nodes(ground_users_count, 0)
# print(ground_users_count)
# for i in range(ground_users_count):
# value = gu_position_information.iloc[current_time, i]
# if value is None or (isinstance(value, float) and math.isnan(value)):
# # If the value is NaN or None, fall back to the default position
# position = (0.0, 0.0, 0.0)
# print(f"Warning: NaN or None value encountered at index {i}, setting default position {position}")
# else:
# try:
# # Parse a value given in its normal string form
# position = tuple(map(float, value.strip("()").split(',')))
# except (ValueError, AttributeError) as e:
# # Catch values that cannot be parsed
# print(f"Error: Invalid position string {value} at index {i}. Setting default position.")
# position = (0.0, 0.0, 0.0)
# ground_users[i].set_position(position)
# return ground_users
import math
def get_ground_user_positions_for_a_time(gu_position_information, current_time):
    """Build ground-user nodes from the position strings stored for one time step.

    The row at ``current_time`` is selected with ``.iloc`` and its missing
    entries are dropped. One "GU" node is created per remaining entry and
    positioned from the entry's text, which is parsed by stripping the
    surrounding parentheses and splitting on commas. Entries that cannot be
    parsed get ``(0.0, 0.0, 0.0)`` and a message is printed.

    Args:
        gu_position_information: Table-like object supporting
            ``.iloc[row].dropna()`` (presumably a pandas DataFrame - confirm
            against the caller).
        current_time: Row index of the time step to read.

    Returns:
        The list of nodes created by ``generate_nodes``, with positions set.
    """
    # Only the non-missing entries of the selected row describe users.
    valid_positions = gu_position_information.iloc[current_time].dropna()
    ground_users = generate_nodes(valid_positions.shape[0], 0)

    for i, value in enumerate(valid_positions):
        is_missing = value is None or (isinstance(value, float) and math.isnan(value))
        if is_missing:
            # Defensive guard for a NaN or None value: use the default position.
            position = (0.0, 0.0, 0.0)
            print(f"Warning: NaN or None value encountered at index {i}, setting default position {position}")
        else:
            try:
                # Regular case: a string such as "(x, y, z)".
                position = tuple(float(part) for part in value.strip("()").split(','))
            except (ValueError, AttributeError):
                # The value is not a string or holds non-numeric parts.
                print(f"Error: Invalid position string {value} at index {i}. Setting default position.")
                position = (0.0, 0.0, 0.0)
        ground_users[i].set_position(position)

    return ground_users
def get_gu_info_and_update_connections(gu_position_information, current_time, blocks, UAV_nodes, UAVInfo, backhaul_connection):
    """Load the ground users for one time step and connect them to UAVs.

    Args:
        gu_position_information: Position table passed to
            ``get_ground_user_positions_for_a_time``.
        current_time: Row index of the time step to load.
        blocks: Passed unchanged to ``get_gu_to_uav_connections``.
        UAV_nodes: Candidate UAV nodes.
        UAVInfo: Passed unchanged to ``get_gu_to_uav_connections``.
        backhaul_connection: Passed unchanged to ``get_gu_to_uav_connections``.

    Returns:
        A triple ``(ground_users, gu_to_uav_connections, gu_to_bs_capacity)``.
    """
    ground_users = get_ground_user_positions_for_a_time(gu_position_information, current_time)
    connections, capacities = get_gu_to_uav_connections(
        ground_users, UAV_nodes, UAVInfo, blocks, backhaul_connection
    )
    return ground_users, connections, capacities