|
5 | 5 | saved in a local directory.
|
6 | 6 | """
|
7 | 7 | import csv
|
8 |
| -import datetime |
| 8 | +from datetime import datetime |
9 | 9 | import os
|
10 | 10 | from abr_testing.data_collection.error_levels import ERROR_LEVELS_PATH
|
11 | 11 | from typing import List, Dict, Any, Tuple, Set
|
|
14 | 14 | import requests
|
15 | 15 |
|
16 | 16 |
|
| 17 | +def command_time(command: Dict[str, str]) -> Tuple[float, float]: |
| 18 | + """Calculate total create and complete time per command.""" |
| 19 | + try: |
| 20 | + create_time = datetime.strptime( |
| 21 | + command.get("createdAt", ""), "%Y-%m-%dT%H:%M:%S.%f%z" |
| 22 | + ) |
| 23 | + start_time = datetime.strptime( |
| 24 | + command.get("startedAt", ""), "%Y-%m-%dT%H:%M:%S.%f%z" |
| 25 | + ) |
| 26 | + complete_time = datetime.strptime( |
| 27 | + command.get("completedAt", ""), "%Y-%m-%dT%H:%M:%S.%f%z" |
| 28 | + ) |
| 29 | + create_to_start = (start_time - create_time).total_seconds() |
| 30 | + start_to_complete = (complete_time - start_time).total_seconds() |
| 31 | + except ValueError: |
| 32 | + create_to_start = 0 |
| 33 | + start_to_complete = 0 |
| 34 | + return create_to_start, start_to_complete |
| 35 | + |
| 36 | + |
| 37 | +def hs_commands(file_results: Dict[str, Any]) -> Dict[str, float]: |
| 38 | + """Gets total latch engagements, homes, rotations and total on time (sec) for heater shaker.""" |
| 39 | + # TODO: modify for cases that have more than 1 heater shaker. |
| 40 | + commandData = file_results.get("commands", "") |
| 41 | + hs_latch_count: float = 0.0 |
| 42 | + hs_temp: float = 0.0 |
| 43 | + hs_home_count: float = 0.0 |
| 44 | + hs_speed: float = 0.0 |
| 45 | + hs_rotations: Dict[str, float] = dict() |
| 46 | + hs_temps: Dict[str, float] = dict() |
| 47 | + temp_time = None |
| 48 | + shake_time = None |
| 49 | + for command in commandData: |
| 50 | + commandType = command["commandType"] |
| 51 | + # Heatershaker |
| 52 | + # Latch count |
| 53 | + if ( |
| 54 | + commandType == "heaterShaker/closeLabwareLatch" |
| 55 | + or commandType == "heaterShaker/openLabwareLatch" |
| 56 | + ): |
| 57 | + hs_latch_count += 1 |
| 58 | + # Home count |
| 59 | + elif commandType == "heaterShaker/deactivateShaker": |
| 60 | + hs_home_count += 1 |
| 61 | + deactivate_time = datetime.strptime( |
| 62 | + command.get("startedAt", ""), "%Y-%m-%dT%H:%M:%S.%f%z" |
| 63 | + ) |
| 64 | + if temp_time is not None and deactivate_time > temp_time: |
| 65 | + temp_duration = (deactivate_time - temp_time).total_seconds() |
| 66 | + hs_temps[hs_temp] = hs_temps.get(hs_temp, 0.0) + temp_duration |
| 67 | + if shake_time is not None and deactivate_time > shake_time: |
| 68 | + shake_duration = (deactivate_time - shake_time).total_seconds() |
| 69 | + hs_rotations[hs_speed] = hs_rotations.get(hs_speed, 0.0) + ( |
| 70 | + (hs_speed * shake_duration) / 60 |
| 71 | + ) |
| 72 | + # of Rotations |
| 73 | + elif commandType == "heaterShaker/setAndWaitForShakeSpeed": |
| 74 | + hs_speed = command["params"]["rpm"] |
| 75 | + shake_time = datetime.strptime( |
| 76 | + command.get("completedAt", ""), "%Y-%m-%dT%H:%M:%S.%f%z" |
| 77 | + ) |
| 78 | + # On Time |
| 79 | + elif commandType == "heaterShaker/setTargetTemperature": |
| 80 | + # if heater shaker temp is not deactivated. |
| 81 | + hs_temp = command["params"]["celsius"] |
| 82 | + temp_time = datetime.strptime( |
| 83 | + command.get("completedAt", ""), "%Y-%m-%dT%H:%M:%S.%f%z" |
| 84 | + ) |
| 85 | + |
| 86 | + hs_total_rotations = sum(hs_rotations.values()) |
| 87 | + hs_total_temp_time = sum(hs_temps.values()) |
| 88 | + hs_dict = { |
| 89 | + "Heatershaker # of Latch Engagements": hs_latch_count, |
| 90 | + "Heatershaker # of Homes": hs_home_count, |
| 91 | + "Heatershaker # of Rotations": hs_total_rotations, |
| 92 | + "Heatershaker Temp On Time (sec)": hs_total_temp_time, |
| 93 | + } |
| 94 | + return hs_dict |
| 95 | + |
| 96 | + |
| 97 | +def temperature_module_commands(file_results: Dict[str, Any]) -> Dict[str, float]: |
| 98 | + """Get # of temp changes and total temp on time for temperature module from run log.""" |
| 99 | + # TODO: modify for cases that have more than 1 temperature module. |
| 100 | + tm_temp_change = 0 |
| 101 | + tm_temps: Dict[str, float] = dict() |
| 102 | + temp_time = None |
| 103 | + deactivate_time = None |
| 104 | + commandData = file_results.get("commands", "") |
| 105 | + for command in commandData: |
| 106 | + commandType = command["commandType"] |
| 107 | + if commandType == "temperatureModule/setTargetTemperature": |
| 108 | + tm_temp = command["params"]["celsius"] |
| 109 | + tm_temp_change += 1 |
| 110 | + if commandType == "temperatureModule/waitForTemperature": |
| 111 | + temp_time = datetime.strptime( |
| 112 | + command.get("completedAt", ""), "%Y-%m-%dT%H:%M:%S.%f%z" |
| 113 | + ) |
| 114 | + if commandType == "temperatureModule/deactivate": |
| 115 | + deactivate_time = datetime.strptime( |
| 116 | + command.get("completedAt", ""), "%Y-%m-%dT%H:%M:%S.%f%z" |
| 117 | + ) |
| 118 | + if temp_time is not None and deactivate_time > temp_time: |
| 119 | + temp_duration = (deactivate_time - temp_time).total_seconds() |
| 120 | + tm_temps[tm_temp] = tm_temps.get(tm_temp, 0.0) + temp_duration |
| 121 | + if temp_time is not None and deactivate_time is None: |
| 122 | + # If temperature module is not deactivated, protocol completedAt time stamp used. |
| 123 | + protocol_end = datetime.strptime( |
| 124 | + file_results.get("completedAt", ""), "%Y-%m-%dT%H:%M:%S.%f%z" |
| 125 | + ) |
| 126 | + temp_duration = (protocol_end - temp_time).total_seconds() |
| 127 | + tm_temps[tm_temp] = tm_temps.get(tm_temp, 0.0) + temp_duration |
| 128 | + tm_total_temp_time = sum(tm_temps.values()) |
| 129 | + tm_dict = { |
| 130 | + "Temp Module # of Temp Changes": tm_temp_change, |
| 131 | + "Temp Module Temp On Time (sec)": tm_total_temp_time, |
| 132 | + } |
| 133 | + return tm_dict |
| 134 | + |
| 135 | + |
| 136 | +def thermocycler_commands(file_results: Dict[str, Any]) -> Dict[str, float]: |
| 137 | + """Counts # of lid engagements, temp changes, and temp sustaining mins.""" |
| 138 | + # TODO: modify for cases that have more than 1 thermocycler. |
| 139 | + commandData = file_results.get("commands", "") |
| 140 | + lid_engagements: float = 0.0 |
| 141 | + block_temp_changes: float = 0.0 |
| 142 | + lid_temp_changes: float = 0.0 |
| 143 | + lid_temps: Dict[str, float] = dict() |
| 144 | + block_temps: Dict[str, float] = dict() |
| 145 | + lid_on_time = None |
| 146 | + lid_off_time = None |
| 147 | + block_on_time = None |
| 148 | + block_off_time = None |
| 149 | + for command in commandData: |
| 150 | + commandType = command["commandType"] |
| 151 | + if ( |
| 152 | + commandType == "thermocycler/openLid" |
| 153 | + or commandType == "thermocycler/closeLid" |
| 154 | + ): |
| 155 | + lid_engagements += 1 |
| 156 | + if commandType == "thermocycler/setTargetBlockTemperature": |
| 157 | + block_temp = command["params"]["celsius"] |
| 158 | + block_temp_changes += 1 |
| 159 | + block_on_time = datetime.strptime( |
| 160 | + command.get("completedAt", ""), "%Y-%m-%dT%H:%M:%S.%f%z" |
| 161 | + ) |
| 162 | + if commandType == "thermocycler/setTargetLidTemperature": |
| 163 | + lid_temp_changes += 1 |
| 164 | + lid_temp = command["params"]["celsius"] |
| 165 | + lid_on_time = datetime.strptime( |
| 166 | + command.get("completedAt", ""), "%Y-%m-%dT%H:%M:%S.%f%z" |
| 167 | + ) |
| 168 | + if commandType == "thermocycler/deactivateLid": |
| 169 | + lid_off_time = datetime.strptime( |
| 170 | + command.get("completedAt", ""), "%Y-%m-%dT%H:%M:%S.%f%z" |
| 171 | + ) |
| 172 | + if lid_on_time is not None and lid_off_time > lid_on_time: |
| 173 | + lid_duration = (lid_off_time - lid_on_time).total_seconds() |
| 174 | + lid_temps[lid_temp] = lid_temps.get(lid_temp, 0.0) + lid_duration |
| 175 | + if commandType == "thermocycler/deactivateBlock": |
| 176 | + block_off_time = datetime.strptime( |
| 177 | + command.get("completedAt", ""), "%Y-%m-%dT%H:%M:%S.%f%z" |
| 178 | + ) |
| 179 | + if block_on_time is not None and block_off_time > block_on_time: |
| 180 | + block_duration = (block_off_time - block_on_time).total_seconds() |
| 181 | + block_temps[block_temp] = ( |
| 182 | + block_temps.get(block_temp, 0.0) + block_duration |
| 183 | + ) |
| 184 | + if commandType == "thermocycler/runProfile": |
| 185 | + profile = command["params"]["profile"] |
| 186 | + total_changes = len(profile) |
| 187 | + block_temp_changes += total_changes |
| 188 | + for cycle in profile: |
| 189 | + block_temp = cycle["celsius"] |
| 190 | + block_time = cycle["holdSeconds"] |
| 191 | + block_temps[block_temp] = block_temps.get(block_temp, 0.0) + block_time |
| 192 | + if block_on_time is not None and block_off_time is None: |
| 193 | + # If thermocycler block not deactivated protocol completedAt time stamp used |
| 194 | + protocol_end = datetime.strptime( |
| 195 | + file_results.get("completedAt", ""), "%Y-%m-%dT%H:%M:%S.%f%z" |
| 196 | + ) |
| 197 | + temp_duration = (protocol_end - block_on_time).total_seconds() |
| 198 | + block_temps[block_temp] = block_temps.get(block_temp, 0.0) + temp_duration |
| 199 | + if lid_on_time is not None and lid_off_time is None: |
| 200 | + # If thermocycler lid not deactivated protocol completedAt time stamp used |
| 201 | + protocol_end = datetime.strptime( |
| 202 | + file_results.get("completedAt", ""), "%Y-%m-%dT%H:%M:%S.%f%z" |
| 203 | + ) |
| 204 | + temp_duration = (protocol_end - lid_on_time).total_seconds() |
| 205 | + lid_temps[lid_temp] = block_temps.get(lid_temp, 0.0) + temp_duration |
| 206 | + |
| 207 | + block_total_time = sum(block_temps.values()) |
| 208 | + lid_total_time = sum(lid_temps.values()) |
| 209 | + |
| 210 | + tc_dict = { |
| 211 | + "Thermocycler # of Lid Engagements": lid_engagements, |
| 212 | + "Thermocycler Block # of Temp Changes": block_temp_changes, |
| 213 | + "Thermocycler Block Temp On Time (sec)": block_total_time, |
| 214 | + "Thermocycler Lid # of Temp Changes": lid_temp_changes, |
| 215 | + "Thermocycler Lid Temp On Time (sec)": lid_total_time, |
| 216 | + } |
| 217 | + |
| 218 | + return tc_dict |
| 219 | + |
| 220 | + |
17 | 221 | def create_abr_data_sheet(
|
18 | 222 | storage_directory: str, file_name: str, headers: List[str]
|
19 | 223 | ) -> str:
|
@@ -112,7 +316,7 @@ def read_abr_data_sheet(
|
112 | 316 | runs_in_sheet.add(run_id)
|
113 | 317 | print(f"There are {str(len(runs_in_sheet))} runs documented in the ABR sheet.")
|
114 | 318 | # Read Google Sheet
|
115 |
| - google_sheet.check_token() |
| 319 | + google_sheet.token_check() |
116 | 320 | google_sheet.write_header(headers)
|
117 | 321 | google_sheet.update_row_index()
|
118 | 322 | return runs_in_sheet
|
@@ -189,7 +393,7 @@ def get_calibration_offsets(
|
189 | 393 | health_data = response.json()
|
190 | 394 | robot_name = health_data.get("name", "")
|
191 | 395 | api_version = health_data.get("api_version", "")
|
192 |
| - pull_date_timestamp = datetime.datetime.now() |
| 396 | + pull_date_timestamp = datetime.now() |
193 | 397 | date = pull_date_timestamp.date().isoformat()
|
194 | 398 | file_date = str(pull_date_timestamp).replace(":", "").split(".")[0]
|
195 | 399 | calibration["Robot"] = robot_name
|
@@ -219,5 +423,7 @@ def get_calibration_offsets(
|
219 | 423 | )
|
220 | 424 | deck: Dict[str, Any] = response.json()
|
221 | 425 | calibration["Deck"] = deck.get("deckCalibration", "")
|
222 |
| - saved_file_path = save_run_log_to_json(ip, calibration, storage_directory) |
| 426 | + save_name = ip + "_calibration.json" |
| 427 | + saved_file_path = os.path.join(storage_directory, save_name) |
| 428 | + json.dump(calibration, open(saved_file_path, mode="w")) |
223 | 429 | return saved_file_path, calibration
|
0 commit comments