1
+ """Push one or more folders to one or more robots."""
1
2
import subprocess
2
3
import multiprocessing
3
4
import json
4
- import time
5
+
5
6
global folders
6
7
# Opentrons folders that can be pushed to robot
7
8
folders = [
8
- ' abr-testing' ,
9
- ' hardware-testing' ,
10
- ' all' ,
11
- ' other' ,
9
+ " abr-testing" ,
10
+ " hardware-testing" ,
11
+ " all" ,
12
+ " other" ,
12
13
]
13
14
14
15
15
-
16
- def push_subroutine (cmd : str ):
17
- """Pushes specified folder to specified robot"""
16
+ def push_subroutine (cmd : str ) -> None :
17
+ """Pushes specified folder to specified robot."""
18
18
try :
19
19
subprocess .run (cmd )
20
- except Exception as e :
21
- print (' failed to push folder' )
20
+ except Exception :
21
+ print (" failed to push folder" )
22
22
raise
23
-
23
+
24
+
24
25
def main (folder_to_push : str , robot_to_push : str ) -> int :
25
- """Main process"""
26
- cmd = ' make -C {folder} push-ot3 host={ip}'
26
+ """Main process! """
27
+ cmd = " make -C {folder} push-ot3 host={ip}"
27
28
robot_ip_path = ""
28
- push_cmd = ''
29
+ push_cmd = ""
29
30
folder_int = int (folder_to_push )
30
- if folders [folder_int ].lower () == ' all' :
31
- if robot_to_push .lower () == ' all' :
32
- robot_ip_path = input (' Path to robot ips: ' )
33
- with open (robot_ip_path , 'r' ) as ip_file :
31
+ if folders [folder_int ].lower () == " all" :
32
+ if robot_to_push .lower () == " all" :
33
+ robot_ip_path = input (" Path to robot ips: " )
34
+ with open (robot_ip_path , "r" ) as ip_file :
34
35
robot_json = json .load (ip_file )
35
- robot_ips = robot_json .get ('ip_address_list' )
36
+ robot_ips_dict = robot_json .get ("ip_address_list" )
37
+ robot_ips = list (robot_ips_dict .keys ())
36
38
ip_file .close ()
37
39
else :
38
40
robot_ips = [robot_to_push ]
39
41
for folder_name in folders [:- 2 ]:
40
- #Push all folders to all robots
42
+ # Push all folders to all robots
41
43
for robot in robot_ips :
42
- print_proc = multiprocessing .Process (target = print , args = (f"Pushing { folder_name } to { robot } !\n \n " ,))
44
+ print_proc = multiprocessing .Process (
45
+ target = print , args = (f"Pushing { folder_name } to { robot } !\n \n " ,)
46
+ )
43
47
print_proc .start ()
44
48
print_proc .join ()
45
49
push_cmd = cmd .format (folder = folder_name , ip = robot )
46
- process = multiprocessing .Process (target = push_subroutine , args = (push_cmd ,))
50
+ process = multiprocessing .Process (
51
+ target = push_subroutine , args = (push_cmd ,)
52
+ )
47
53
process .start ()
48
54
process .join ()
49
- print_proc = multiprocessing .Process (target = print , args = (f "Done!\n \n " ))
55
+ print_proc = multiprocessing .Process (target = print , args = ("Done!\n \n " ))
50
56
print_proc .start ()
51
57
print_proc .join ()
52
58
else :
53
-
54
- if folder_int == (len (folders )- 1 ):
55
- folder_name = input (' Which folder? ' )
59
+
60
+ if folder_int == (len (folders ) - 1 ):
61
+ folder_name = input (" Which folder? " )
56
62
else :
57
63
folder_name = folders [folder_int ]
58
- if robot_to_push .lower () == ' all' :
64
+ if robot_to_push .lower () == " all" :
59
65
robot_ip_path = input ("Path to robot ips: " )
60
- with open (robot_ip_path , 'r' ) as ip_file :
66
+ with open (robot_ip_path , "r" ) as ip_file :
61
67
robot_json = json .load (ip_file )
62
- robot_ips = robot_json .get (' ip_address_list' )
68
+ robot_ips = robot_json .get (" ip_address_list" )
63
69
ip_file .close ()
64
70
else :
65
71
robot_ips = [robot_to_push ]
66
72
67
73
# Push folder to robots
68
74
for robot in robot_ips :
69
- print_proc = multiprocessing .Process (target = print , args = (f"Pushing { folder_name } to { robot } !\n \n " ,))
75
+ print_proc = multiprocessing .Process (
76
+ target = print , args = (f"Pushing { folder_name } to { robot } !\n \n " ,)
77
+ )
70
78
print_proc .start ()
71
79
print_proc .join ()
72
80
push_cmd = cmd .format (folder = folder_name , ip = robot )
73
81
process = multiprocessing .Process (target = push_subroutine , args = (push_cmd ,))
74
82
process .start ()
75
83
process .join ()
76
- print_proc = multiprocessing .Process (target = print , args = (f "Done!\n \n " ,))
84
+ print_proc = multiprocessing .Process (target = print , args = ("Done!\n \n " ,))
77
85
print_proc .start ()
78
86
print_proc .join ()
79
- return (0 )
87
+ return 0
88
+
80
89
81
-
82
- if __name__ == '__main__' :
90
+ if __name__ == "__main__" :
83
91
for i , folder in enumerate (folders ):
84
92
print (f"{ i } ) { folder } " )
85
- folder_to_push = input (' Please Select a Folder to Push: ' )
93
+ folder_to_push = input (" Please Select a Folder to Push: " )
86
94
robot_to_push = input ("Type in robots ip (type all for all): " )
87
- print (main (folder_to_push , robot_to_push ))
95
+ print (main (folder_to_push , robot_to_push ))
0 commit comments