Commit 169fc56

Merge pull request #417 from macrocosm-os/staging
Staging

2 parents: 149fb0c + f6e2d87

14 files changed: +693 −219 lines

.gitignore (+1)

@@ -193,3 +193,4 @@ tests/mock_data
 db
 local-gjp
 *.ipynb
+miner-data

folding/__init__.py (+1 −1)

@@ -1,4 +1,4 @@
-__version__ = "2.3.5"
+__version__ = "2.4.0"
 version_split = __version__.split(".")
 __spec_version__ = (
     (10000 * int(version_split[0]))
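
The hunk's context window cuts off inside the __spec_version__ expression. As a hedged illustration only (the remaining terms are outside the diff and are assumed here), spec versions of this shape are typically packed as 10000*major + 100*minor + patch:

    # Hypothetical reconstruction; only the first term is visible in the hunk above.
    version_split = "2.4.0".split(".")
    spec_version = (
        (10000 * int(version_split[0]))  # major
        + (100 * int(version_split[1]))  # minor
        + int(version_split[2])          # patch
    )
    print(spec_version)  # 20400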

folding/base/miner.py (+11)

@@ -5,6 +5,7 @@
 import bittensor as bt

 from folding.base.neuron import BaseNeuron
+from folding.protocol import IntermediateSubmissionSynapse
 from folding.utils.config import add_miner_args
 from folding.utils.logger import logger

@@ -44,6 +45,8 @@ def __init__(self, config=None):
             forward_fn=self.forward,
             blacklist_fn=self.blacklist,
             priority_fn=self.priority,
+        ).attach(
+            forward_fn=self.intermediate_submission_forward,
         )
         logger.info(f"Axon created: {self.axon}")

@@ -53,6 +56,14 @@ def __init__(self, config=None):
         self.thread: threading.Thread = None
         self.lock = asyncio.Lock()

+    def intermediate_submission_forward(self, synapse: IntermediateSubmissionSynapse):
+        """Respond to the validator with the necessary information about submitting intermediate checkpoints.
+
+        Args:
+            synapse (IntermediateSubmissionSynapse): the synapse to attach "cpt_files" to
+        """
+        pass
+
     def run(self):
         pass
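
For context, a minimal sketch (not from this commit) of why the hunk above can chain a second handler: bittensor's axon.attach() returns the axon itself, so each synapse type gets its own forward function on a single axon. The surrounding names below are illustrative:

    import bittensor as bt

    # Assumes `wallet`, `config`, and a `miner` exposing the handler methods exist.
    axon = bt.axon(wallet=wallet, config=config)
    axon.attach(
        forward_fn=miner.forward,        # handles JobSubmissionSynapse
        blacklist_fn=miner.blacklist,
        priority_fn=miner.priority,
    ).attach(
        forward_fn=miner.intermediate_submission_forward,  # handles IntermediateSubmissionSynapse
    )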

folding/miners/folding_miner.py (+157 −37)
@@ -10,6 +10,7 @@
 import traceback
 import concurrent.futures
 import asyncio
+import pandas as pd
 from collections import defaultdict
 from typing import Dict, List, Tuple, Any
@@ -19,11 +20,16 @@
 # import base miner class which takes care of most of the boilerplate
 from folding.base.miner import BaseMinerNeuron
 from folding.base.simulation import OpenMMSimulation
-from folding.protocol import JobSubmissionSynapse
+from folding.protocol import (
+    JobSubmissionSynapse,
+    IntermediateSubmissionSynapse,
+)
 from folding.utils.reporters import (
     ExitFileReporter,
     LastTwoCheckpointsReporter,
-    ProteinStructureReporter,
+    SequentialCheckpointReporter,
+    ProteinStructureReporter
+
 )
 from folding.utils.ops import (
     check_if_directory_exists,
@@ -59,49 +65,107 @@ def attach_files_to_synapse(
     state: str,
     seed: int,
 ) -> JobSubmissionSynapse:
-    """load the output files as bytes and add to synapse.md_output
+    """Load output files and attach them to synapse.md_output.
+
+    This function handles attaching simulation output files to the synapse object
+    for communication with the validator. It combines log files and attaches state files.

     Args:
-        synapse (JobSubmissionSynapse): Recently received synapse object
-        data_directory (str): directory where the miner is holding the necessary data for the validator.
-        state (str): the current state of the simulation
-
-        state is either:
-        1. nvt
-        2. npt
-        3. md_0_1
-        4. finished
+        synapse (JobSubmissionSynapse): The synapse object to attach files to
+        data_directory (str): Directory containing simulation data files
+        state (str): Current simulation state ('nvt', 'npt', 'md_0_1', or 'finished')
+        seed (int): Random seed used for the simulation

     Returns:
-        JobSubmissionSynapse: synapse with md_output attached
+        JobSubmissionSynapse: The synapse object with attached files in md_output
+
+    Note:
+        If state is 'finished', it will be treated as 'md_0_1' for file collection purposes.
     """
-
-    synapse.md_output = {}  # ensure that the initial state is empty
+    # Initialize empty md_output
+    synapse.md_output = {}

     try:
-        state_files = os.path.join(data_directory, f"{state}")
-
-        # This should be "state.cpt" and "state_old.cpt"
-        all_state_files = glob.glob(f"{state_files}*")  # Grab all the state_files
-
-        if len(all_state_files) == 0:
-            raise FileNotFoundError(
-                f"No files found for {state}"
-            )  # if this happens, goes to except block
-
-        synapse = attach_files(files_to_attach=all_state_files, synapse=synapse)
+        # Normalize state for file collection
+        file_collection_state = "md_0_1" if state == "finished" else state
+
+        # Get state files (excluding logs)
+        state_files_pattern = os.path.join(data_directory, f"{file_collection_state}*")
+        state_files = [f for f in glob.glob(state_files_pattern) if not f.endswith('.log')]
+
+        if not state_files:
+            raise FileNotFoundError(f"No state files found for {state} in {data_directory}")
+
+        # Get log files if they exist
+        log_files = sorted(glob.glob(os.path.join(data_directory, "*.log")))
+        files_to_attach = state_files.copy()
+
+        if log_files:
+            # Read and combine log files using pandas
+            dfs = []
+
+            for log_file in log_files:
+                try:
+                    # Try reading the file with pandas
+                    df = pd.read_csv(log_file)
+                    if not df.empty:
+                        dfs.append(df)
+                except Exception as e:
+                    logger.warning(f"Could not read log file {log_file}: {e}")
+                    continue
+
+            if dfs:
+                # Combine all dataframes and sort by step
+                combined_df = pd.concat(dfs, ignore_index=True)
+                combined_df = combined_df.sort_values('#"Step"')
+
+                # Write combined log file
+                combined_log_path = os.path.join(data_directory, f"{state}_combined.log")
+                try:
+                    combined_df.to_csv(combined_log_path, index=False)
+                    files_to_attach.append(combined_log_path)
+                except Exception as e:
+                    logger.error(f"Failed to write combined log file: {e}")
+
+            # Attach all files
+            for filename in files_to_attach:
+                try:
+                    with open(filename, "rb") as f:
+                        base_filename = os.path.basename(filename)
+                        synapse.md_output[base_filename] = base64.b64encode(f.read())
+                except Exception as e:
+                    logger.error(f"Failed to read file {filename!r}: {e}")
+                    get_tracebacks()
+        else:
+            # Just attach state files if no logs exist
+            for filename in files_to_attach:
+                try:
+                    with open(filename, "rb") as f:
+                        base_filename = os.path.basename(filename)
+                        synapse.md_output[base_filename] = base64.b64encode(f.read())
+                except Exception as e:
+                    logger.error(f"Failed to read file {filename!r}: {e}")
+                    get_tracebacks()
+
+        try:
+            # Clean up combined log file
+            if os.path.exists(combined_log_path):
+                os.remove(combined_log_path)
+        except Exception as e:
+            logger.warning(f"Failed to remove temporary combined log: {e}")

+        # Set synapse metadata
         synapse.miner_seed = seed
         synapse.miner_state = state

     except Exception as e:
-        logger.error(f"Failed to attach files for pdb {synapse.pdb_id} with error: {e}")
+        logger.error(f"Failed to attach files with error: {e}")
         get_tracebacks()
         synapse.md_output = {}
+        synapse.miner_seed = None
+        synapse.miner_state = None

-    finally:
-        return synapse  # either return the synapse wth the md_output attached or the synapse as is.
-
+    return synapse

 def check_synapse(
     synapse: JobSubmissionSynapse, event: Dict = None
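
The log-combining step above merges the per-state reporter logs (CSV files whose step column is literally named '#"Step"', the header OpenMM's state-data reporters write) into one file before attaching it. A self-contained sketch of just that step, under the same assumptions about the log format:

    import glob
    import os

    import pandas as pd

    def combine_state_logs(data_directory: str, state: str) -> str | None:
        """Merge every .log file in data_directory into one CSV sorted by step."""
        dfs = []
        for log_file in sorted(glob.glob(os.path.join(data_directory, "*.log"))):
            try:
                df = pd.read_csv(log_file)
                if not df.empty:
                    dfs.append(df)
            except Exception:
                continue  # unreadable logs are skipped, as in the diff
        if not dfs:
            return None
        combined = pd.concat(dfs, ignore_index=True).sort_values('#"Step"')
        out_path = os.path.join(data_directory, f"{state}_combined.log")
        combined.to_csv(out_path, index=False)
        return out_path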
@@ -381,18 +445,17 @@ def check_if_job_was_worked_on(self, job_id: str) -> tuple[bool, str, dict]:

         pdb_hash = self.get_simulation_hash(pdb_id=pdb_id, system_config=gjp_config)
         event["pdb_hash"] = pdb_hash
-
-        if pdb_hash in self.simulations:
-            return True, "running_simulation", event
-
         # If you don't have in the list of simulations, check your local storage for the data.
         output_dir = os.path.join(self.base_data_path, pdb_id, pdb_hash)
         gjp_config_filepath = os.path.join(output_dir, f"config_{pdb_id}.pkl")
         event["output_dir"] = output_dir
         event["gjp_config_filepath"] = gjp_config_filepath

-        # check if any of the simulations have finished
         event = self.check_and_remove_simulations(event=event)
+        if pdb_hash in self.simulations:
+            return True, "running_simulation", event
+
+        # check if any of the simulations have finished

         submitted_job_is_unique = self.is_unique_job(
             system_config_filepath=gjp_config_filepath
@@ -403,6 +466,39 @@ def check_if_job_was_worked_on(self, job_id: str) -> tuple[bool, str, dict]:

         return False, "job_not_worked_on", event

+    def intermediate_submission_forward(self, synapse: IntermediateSubmissionSynapse):
+        """Respond to the validator with the necessary information about submitting intermediate checkpoints.
+
+        Args:
+            synapse (IntermediateSubmissionSynapse): the synapse to attach "cpt_files" to
+        """
+        logger.info(f"Intermediate submission forward for job: {synapse.job_id}")
+        job_id = synapse.job_id
+        pdb_id = synapse.pdb_id
+        checkpoint_numbers = synapse.checkpoint_numbers
+        has_worked_on_job, condition, event = self.check_if_job_was_worked_on(
+            job_id=job_id
+        )
+
+        if not has_worked_on_job:
+            logger.warning(f"Job ID {job_id} not found in the database.")
+            return synapse
+
+        # Check if the checkpoint files exist in the output directory
+        output_dir = event["output_dir"]
+        cpt_files = {}
+        for checkpoint_number in checkpoint_numbers:
+            cpt_file = os.path.join(
+                output_dir, f"{checkpoint_number}.cpt"
+            )
+            if os.path.exists(cpt_file):
+                with open(cpt_file, "rb") as f:
+                    cpt_files[checkpoint_number] = base64.b64encode(f.read())
+
+        synapse.cpt_files = cpt_files
+
+        return synapse
+
     def forward(self, synapse: JobSubmissionSynapse) -> JobSubmissionSynapse:
         """Process an incoming job submission request and return appropriate simulation data.
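
For orientation, a hedged sketch of the calling side (not part of this commit): a validator would populate job_id, pdb_id, and checkpoint_numbers on the new synapse and read back the base64-encoded cpt_files. The dendrite call shape and checkpoint indices are assumptions:

    import base64

    import bittensor as bt
    from folding.protocol import IntermediateSubmissionSynapse

    async def fetch_checkpoints(dendrite: bt.dendrite, axon, job_id: str, pdb_id: str) -> dict:
        synapse = IntermediateSubmissionSynapse(
            job_id=job_id,
            pdb_id=pdb_id,
            checkpoint_numbers=[0, 1, 2],  # illustrative checkpoint indices
        )
        response = await dendrite.call(target_axon=axon, synapse=synapse, deserialize=False)
        # cpt_files maps checkpoint number -> base64-encoded checkpoint bytes
        return {k: base64.b64decode(v) for k, v in (response.cpt_files or {}).items()}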
@@ -864,7 +960,7 @@ def run(

         simulation.loadCheckpoint(self.cpt_file_mapper[state])
         simulation.step(self.simulation_steps[state])
-
+        simulation.saveCheckpoint(f"{self.output_dir}/{state}.cpt")
         # TODO: Add a Mock pipeline for the new OpenMM simulation here.

         logger.success(f"✅ Finished simulation for protein: {self.pdb_id} ✅")
@@ -933,7 +1029,7 @@ def configure_commands(
     ) -> Dict[str, List[str]]:
         state_commands = {}

-        for state in self.STATES:
+        for state, simulation_steps in self.simulation_steps.items():
             simulation, _ = OpenMMSimulation().create_simulation(
                 pdb=self.pdb_obj,
                 system_config=system_config.get_config(),
@@ -953,6 +1049,29 @@
                     file_prefix=state,
                 )
             )
+
+            # Calculate the starting checkpoint counter based on previous states
+            starting_counter = 0
+            if state in self.simulation_steps:
+                state_index = list(self.simulation_steps.keys()).index(state)
+                previous_states = list(self.simulation_steps.keys())[:state_index]
+
+                # Sum the number of checkpoints for all previous states
+                for prev_state in previous_states:
+                    # Calculate how many checkpoints were created in the previous state
+                    prev_checkpoints = int(
+                        self.simulation_steps[prev_state] / self.CHECKPOINT_INTERVAL
+                    )
+                    starting_counter += prev_checkpoints
+
+            simulation.reporters.append(
+                SequentialCheckpointReporter(
+                    file_prefix=f"{self.output_dir}/",
+                    reportInterval=self.CHECKPOINT_INTERVAL,
+                    checkpoint_counter=starting_counter,
+                )
+            )
+
             simulation.reporters.append(
                 ProteinStructureReporter(
                     file=f"{self.output_dir}/{state}.log",
@@ -961,6 +1080,7 @@
                     potentialEnergy=True,
                     reference_pdb=os.path.join(self.output_dir, f"{self.pdb_id}.pdb"),
                     speed=True,
+
                 )
             )
             state_commands[state] = simulation
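
The starting_counter logic above gives each state a disjoint block of checkpoint numbers, which is what lets intermediate_submission_forward look up a flat "{checkpoint_number}.cpt" in the output directory. A worked example with made-up numbers:

    # Illustrative values only; CHECKPOINT_INTERVAL and the step counts are assumptions.
    CHECKPOINT_INTERVAL = 5_000
    simulation_steps = {"nvt": 10_000, "npt": 20_000, "md_0_1": 50_000}

    starting_counter = 0
    for state, steps in simulation_steps.items():
        print(f"{state} checkpoints start at {starting_counter}")
        starting_counter += steps // CHECKPOINT_INTERVAL
    # nvt starts at 0, npt at 2, md_0_1 at 6 -> files 0.cpt ... 15.cpt, no collisions

SequentialCheckpointReporter itself lives in folding/utils/reporters.py and is not shown in this diff; a plausible minimal shape, assuming the classic OpenMM reporter interface, would be:

    class SequentialCheckpointReporterSketch:
        """Writes numbered checkpoints: {file_prefix}{counter}.cpt every reportInterval steps."""

        def __init__(self, file_prefix: str, reportInterval: int, checkpoint_counter: int = 0):
            self.file_prefix = file_prefix
            self.reportInterval = reportInterval
            self.checkpoint_counter = checkpoint_counter

        def describeNextReport(self, simulation):
            steps = self.reportInterval - simulation.currentStep % self.reportInterval
            return (steps, False, False, False, False)  # no positions/velocities/forces/energies

        def report(self, simulation, state):
            simulation.saveCheckpoint(f"{self.file_prefix}{self.checkpoint_counter}.cpt")
            self.checkpoint_counter += 1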
