import traceback
import concurrent.futures
import asyncio
-import pandas as pd
+import aiohttp
+from aiohttp import FormData

from collections import defaultdict
from typing import Dict, List, Tuple, Any

    ExitFileReporter,
    LastTwoCheckpointsReporter,
    SequentialCheckpointReporter,
-    ProteinStructureReporter
-
)
from folding.utils.ops import (
    check_if_directory_exists,
@@ -59,14 +58,39 @@ def attach_files(
    return synapse


+async def upload_to_s3(session: aiohttp.ClientSession, presigned_url: dict, file_path: str) -> None:
+    """Asynchronously upload a file to S3 using a presigned URL."""
+    try:
+        start_time = time.time()
+        data = FormData()
+        for key, value in presigned_url["fields"].items():
+            data.add_field(key, value)
+
+        with open(file_path, "rb") as f:
+            data.add_field("file", f, filename="trajectory.dcd")
+
+        async with session.post(
+            presigned_url["url"],
+            data=data
+        ) as response:
+            if response.status != 204:
+                logger.error(f"Failed to upload trajectory to s3: {await response.text()}")
+    except Exception as e:
+        logger.error(f"Error uploading to S3: {e}")
+        get_tracebacks()
+    finally:
+        end_time = time.time()
+        logger.info(f"Upload finished in {end_time - start_time} seconds")
+
+
def attach_files_to_synapse(
    synapse: JobSubmissionSynapse,
    data_directory: str,
    state: str,
    seed: int,
) -> JobSubmissionSynapse:
    """Load output files and attach them to synapse.md_output.
-
+
    This function handles attaching simulation output files to the synapse object
    for communication with the validator. It combines log files and attaches state files.
@@ -78,64 +102,55 @@ def attach_files_to_synapse(
    Returns:
        JobSubmissionSynapse: The synapse object with attached files in md_output
-
+
    Note:
        If state is 'finished', it will be treated as 'md_0_1' for file collection purposes.
    """
    # Initialize empty md_output
    synapse.md_output = {}

    try:
+        # Start async S3 upload
+        trajectory_path = os.path.join(data_directory, "trajectory.dcd")
+        if os.path.exists(trajectory_path):
+            asyncio.create_task(upload_to_s3(
+                session=aiohttp.ClientSession(),
+                presigned_url=synapse.presigned_url,
+                file_path=trajectory_path
+            ))

        # Normalize state for file collection
        file_collection_state = "md_0_1" if state == "finished" else state
-
-        # Get state files (excluding logs)
-        state_files_pattern = os.path.join(data_directory, f"{file_collection_state}*")
-        state_files = [f for f in glob.glob(state_files_pattern) if not f.endswith('.log')]
-
-        if not state_files:
-            raise FileNotFoundError(f"No state files found for {state} in {data_directory}")

+        # Get cpt files (excluding logs)
+        cpt_files_pattern = os.path.join(data_directory, f"{file_collection_state}*.cpt")
+        cpt_files = [
+            f for f in glob.glob(cpt_files_pattern)
+        ]
+
+        if not cpt_files:
+            raise FileNotFoundError(
+                f"No cpt files found for {state} in {data_directory}"
+            )
+
+        files_to_attach = cpt_files.copy()

        # Get log files if they exist
-        log_files = sorted(glob.glob(os.path.join(data_directory, "*.log")))
-        files_to_attach = state_files.copy()
-
-        if log_files:
-            # Read and combine log files using pandas
-            dfs = []
-
-            for log_file in log_files:
-                try:
-                    # Try reading the file with pandas
-                    df = pd.read_csv(log_file)
-                    if not df.empty:
-                        dfs.append(df)
-                except Exception as e:
-                    logger.warning(f"Could not read log file {log_file}: {e}")
-                    continue
-
-            if dfs:
-                # Combine all dataframes and sort by step
-                combined_df = pd.concat(dfs, ignore_index=True)
-                combined_df = combined_df.sort_values('#"Step"')
-
-                # Write combined log file
-                combined_log_path = os.path.join(data_directory, f"{state}_combined.log")
-                try:
-                    combined_df.to_csv(combined_log_path, index=False)
-                    files_to_attach.append(combined_log_path)
-                except Exception as e:
-                    logger.error(f"Failed to write combined log file: {e}")
-
-        # Attach all files
-        for filename in files_to_attach:
-            try:
-                with open(filename, "rb") as f:
-                    base_filename = os.path.basename(filename)
-                    synapse.md_output[base_filename] = base64.b64encode(f.read())
-            except Exception as e:
-                logger.error(f"Failed to read file {filename!r}: {e}")
-                get_tracebacks()
+        log_file = os.path.join(data_directory, "simulation.log")
+
+        if os.path.exists(log_file):
+            files_to_attach.append(log_file)
+
+
+        # Attach all files
+        for filename in files_to_attach:
+            try:
+                with open(filename, "rb") as f:
+                    base_filename = os.path.basename(filename)
+                    synapse.md_output[base_filename] = base64.b64encode(
+                        f.read()
+                    )
+            except Exception as e:
+                logger.error(f"Failed to read file {filename!r}: {e}")
+                get_tracebacks()
        else:
            # Just attach state files if no logs exist
            for filename in files_to_attach:
@@ -146,13 +161,6 @@ def attach_files_to_synapse(
                except Exception as e:
                    logger.error(f"Failed to read file {filename!r}: {e}")
                    get_tracebacks()
-
-        try:
-            # Clean up combined log file
-            if os.path.exists(combined_log_path):
-                os.remove(combined_log_path)
-        except Exception as e:
-            logger.warning(f"Failed to remove temporary combined log: {e}")

        # Set synapse metadata
        synapse.miner_seed = seed
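
For reference, the attach step that both branches above share reduces to: glob the matching files, read their bytes, and store them base64-encoded under the basename. A self-contained sketch of that pattern (the helper name `collect_md_output` is hypothetical):

import base64
import glob
import os

def collect_md_output(data_directory: str, pattern: str) -> dict:
    """Read every file matching `pattern` and base64-encode it by basename."""
    md_output = {}
    for filename in glob.glob(os.path.join(data_directory, pattern)):
        with open(filename, "rb") as f:
            md_output[os.path.basename(filename)] = base64.b64encode(f.read())
    return md_output

# e.g. md_output = collect_md_output("/path/to/run", "md_0_1*.cpt")
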
@@ -167,6 +175,7 @@ def attach_files_to_synapse(
    return synapse

+
def check_synapse(
    synapse: JobSubmissionSynapse, event: Dict = None
) -> JobSubmissionSynapse:
@@ -488,9 +497,7 @@ def intermediate_submission_forward(self, synapse: IntermediateSubmissionSynapse
        output_dir = event["output_dir"]
        cpt_files = {}
        for checkpoint_number in checkpoint_numbers:
-            cpt_file = os.path.join(
-                output_dir, f"{checkpoint_number}.cpt"
-            )
+            cpt_file = os.path.join(output_dir, f"{checkpoint_number}.cpt")
            if os.path.exists(cpt_file):
                with open(cpt_file, "rb") as f:
                    cpt_files[checkpoint_number] = base64.b64encode(f.read())
@@ -524,6 +531,7 @@ def forward(self, synapse: JobSubmissionSynapse) -> JobSubmissionSynapse:
        new jobs to avoid duplicate work. State files are only attached if valid
        simulation data is found.
        """
+        start_time = time.time()
        job_id = synapse.job_id

        has_worked_on_job, condition, event = self.check_if_job_was_worked_on(
@@ -576,18 +584,20 @@ def forward(self, synapse: JobSubmissionSynapse) -> JobSubmissionSynapse:
                    state=state,
                    seed=seed,
                )
+                event["condition"] = "found_existing_data"
+                event["state"] = state
+                logger.info(f"Time taken for forward function: {time.time() - start_time} seconds")
+                return check_synapse(synapse=synapse, event=event)
+
            except Exception as e:
                logger.error(
                    f"Failed to read state file for protein {event['pdb_id']} with error: {e}"
                )
                state = None
-
-            finally:
-                event["condition"] = "found_existing_data"
-                event["state"] = state
-
+            logger.info(f"Time taken for forward function: {time.time() - start_time} seconds")
            return check_synapse(synapse=synapse, event=event)

+
        # The set of RUNNING simulations.
        elif condition == "running_simulation":
            self.simulations[event["pdb_hash"]]["queried_at"] = time.time()
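
The reason for dropping the `finally` is easiest to see in reduced form: with `finally`, the event was labeled `found_existing_data` even when reading the state file raised, while the early return labels it only on success. A sketch of the two control flows under that assumption, with `read_state` standing in for the state-file read:

def old_flow(read_state) -> dict:
    event = {}
    try:
        state = read_state()  # may raise
    except Exception:
        state = None
    finally:
        # Ran on failure too, so bad reads were still labeled as found data.
        event["condition"] = "found_existing_data"
        event["state"] = state
    return event

def new_flow(read_state) -> dict:
    event = {}
    try:
        state = read_state()
        event["condition"] = "found_existing_data"  # success path only
        event["state"] = state
    except Exception:
        pass  # event left unlabeled on failure
    return event
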
@@ -605,6 +615,7 @@ def forward(self, synapse: JobSubmissionSynapse) -> JobSubmissionSynapse:
            event["condition"] = "running_simulation"
            event["state"] = current_executor_state
            event["queried_at"] = simulation["queried_at"]
+            logger.info(f"Time taken for forward function: {time.time() - start_time} seconds")

            return check_synapse(synapse=synapse, event=event)
@@ -911,6 +922,7 @@ def __init__(
        self.STATES = ["nvt", "npt", "md_0_1"]
        self.CHECKPOINT_INTERVAL = 10000
+        self.TRAJECTORY_INTERVAL = 10000
        self.STATE_DATA_REPORTER_INTERVAL = 10
        self.EXIT_REPORTER_INTERVAL = 10
@@ -1072,17 +1084,50 @@ def configure_commands(
            )
        )

-        simulation.reporters.append(
-            ProteinStructureReporter(
-                file=f"{self.output_dir}/{state}.log",
-                reportInterval=self.STATE_DATA_REPORTER_INTERVAL,
-                step=True,
-                potentialEnergy=True,
-                reference_pdb=os.path.join(self.output_dir, f"{self.pdb_id}.pdb"),
-                speed=True,
-
+        if state == "nvt":
+            simulation.reporters.append(
+                app.DCDReporter(
+                    file=os.path.join(self.output_dir, "trajectory.dcd"),
+                    reportInterval=self.TRAJECTORY_INTERVAL,
+                )
+            )
+            simulation.reporters.append(
+                app.StateDataReporter(
+                    file=os.path.join(self.output_dir, f"simulation.log"),
+                    reportInterval=self.STATE_DATA_REPORTER_INTERVAL,
+                    step=True,
+                    potentialEnergy=True,
+                    time=True,
+                    temperature=True,
+                    volume=True,
+                    density=True,
+                    speed=True,
+                    elapsedTime=True,
+                )
+            )
+        else:
+            simulation.reporters.append(
+                app.StateDataReporter(
+                    file=os.path.join(self.output_dir, f"simulation.log"),
+                    reportInterval=self.STATE_DATA_REPORTER_INTERVAL,
+                    step=True,
+                    potentialEnergy=True,
+                    time=True,
+                    temperature=True,
+                    volume=True,
+                    density=True,
+                    speed=True,
+                    elapsedTime=True,
+                    append=True,
+                )
+            )
+            simulation.reporters.append(
+                app.DCDReporter(
+                    file=os.path.join(self.output_dir, "trajectory.dcd"),
+                    reportInterval=self.TRAJECTORY_INTERVAL,
+                    append=True,
+                )
            )
-        )
        state_commands[state] = simulation

    return state_commands
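
The if/else above encodes one convention: the first state ("nvt") creates trajectory.dcd and simulation.log fresh, and every later state reopens both with append=True, so a single continuous trajectory and log span all states. A condensed sketch of that convention, assuming OpenMM's app module as used above (the helper name `add_reporters` is illustrative):

from openmm import app

def add_reporters(simulation, output_dir: str, state: str,
                  trajectory_interval: int = 10000, log_interval: int = 10):
    first = state == "nvt"   # only the first state starts files fresh
    simulation.reporters.append(
        app.DCDReporter(f"{output_dir}/trajectory.dcd",
                        trajectory_interval, append=not first)
    )
    simulation.reporters.append(
        app.StateDataReporter(f"{output_dir}/simulation.log", log_interval,
                              step=True, potentialEnergy=True, speed=True,
                              elapsedTime=True, append=not first)
    )
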