DFT Challenge #436

Open · wants to merge 2 commits into base: staging
28 changes: 28 additions & 0 deletions folding/miners/dft_miner.py
@@ -0,0 +1,28 @@
import os
import time
import glob
import copy
import json
import base64
import random
import hashlib
import requests
import traceback
import concurrent.futures
import asyncio

import pandas as pd
from collections import defaultdict
from typing import Dict, List, Tuple, Any

from folding.miners.folding_miner import FoldingMiner
from folding.protocol import JobSubmissionSynapse

import psi4


class DFTMiner(FoldingMiner):
Contributor:

We should probably make it inherit from the base miner no?

    def __init__(self, config=None):
        super().__init__(config)

    def dft_forward(self, synapse: JobSubmissionSynapse) -> JobSubmissionSynapse:
        return synapse
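
A possible next step for this stub, sketched under assumptions rather than as the final design: parse the incoming geometry with Psi4 and attach the computed energy to the synapse. The `energy` field and the b3lyp/6-31g* method/basis choice are illustrative and not part of this PR.

    # Sketch only: assumes DFTJobSubmissionSynapse gains an optional `energy` field,
    # and that `geometry` is the "charge multiplicity\natom lines" string built by
    # to_psi4_geometry_string in folding/utils/ops.py.
    def dft_forward(self, synapse: DFTJobSubmissionSynapse) -> DFTJobSubmissionSynapse:
        try:
            molecule = psi4.geometry(synapse.geometry)
            synapse.energy = psi4.energy("b3lyp/6-31g*", molecule=molecule)
        except Exception:
            # traceback is already imported at the top of this module
            traceback.print_exc()
        return synapse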
10 changes: 10 additions & 0 deletions folding/protocol.py
@@ -152,3 +152,13 @@ def deserialize(self) -> int:
        self.cpt_files = cpt_files

        return self


class DFTJobSubmissionSynapse(bt.Synapse):
    """A synapse for submission of DFT jobs."""

    job_id: str
    geometry: str

    def deserialize(self) -> "DFTJobSubmissionSynapse":
        return self
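
For reference, a miner would serve this synapse by attaching a handler to its axon; a minimal sketch assuming the standard bittensor axon API (wallet setup and the pass-through handler are placeholders, not part of this PR):

import bittensor as bt

from folding.protocol import DFTJobSubmissionSynapse

wallet = bt.wallet()  # hypothetical wallet/config setup
axon = bt.axon(wallet=wallet)

# bittensor dispatches requests by the synapse type annotation of the handler.
async def dft_forward(synapse: DFTJobSubmissionSynapse) -> DFTJobSubmissionSynapse:
    return synapse

axon.attach(forward_fn=dft_forward)
axon.start()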
65 changes: 65 additions & 0 deletions folding/utils/ops.py
@@ -464,3 +464,68 @@ def check_uniqueness(vectors, tol=0.01):
            if are_vectors_too_similar(vectors_np[i], vectors_np[j], tol):
                return False
    return True


def parse_custom_xyz(file_path: str) -> Dict[str, Any]:
    """Parse a custom (QM9-style) xyz file.

    Args:
        file_path (str): The path to the xyz file.

    Returns:
        Dict[str, Any]: A dictionary containing the parsed data.
    """
    with open(file_path, "r", encoding="utf-8") as f:
        lines = [line.strip() for line in f if line.strip()]

    # First line: number of atoms
    num_atoms = int(lines[0])

    # Second line: metadata (tab-separated floats and maybe strings)
    metadata = []
    for x in lines[1].split("\t"):
        try:
            metadata.append(float(x))
        except ValueError:
            metadata.append(x)

    # Atom block. A handful of QM9 files write exponents as "*^" (e.g. 1.2*^-6),
    # which float() rejects, so normalize that notation first.
    atom_lines = lines[2 : 2 + num_atoms]
    atom_data = []
    for line in atom_lines:
        parts = [p.replace("*^", "e") for p in line.split()]
        atom_data.append(
            {
                "element": parts[0],
                "x": float(parts[1]),
                "y": float(parts[2]),
                "z": float(parts[3]),
                "charge": float(parts[4]),
            }
        )

    atoms_df = pd.DataFrame(atom_data)

    # Line after the atom block: harmonic vibrational frequencies
    properties = list(map(float, lines[2 + num_atoms].split()))

    # SMILES line
    smiles = lines[3 + num_atoms].split()

    # InChI line
    inchis = lines[4 + num_atoms].split()

    return {
        "num_atoms": num_atoms,
        "metadata": metadata,
        "atoms": atoms_df,
        "properties": properties,
        "smiles": smiles,
        "inchis": inchis,
    }


def to_psi4_geometry_string(array, net_charge=0, spin_multiplicity=1):
    """Convert an array of atom rows ([element, x, y, z, ...]) to a Psi4 geometry string.

    The first line carries the net charge and spin multiplicity, followed by one
    "element x y z" line per atom; any extra columns (e.g. charges) are ignored.
    """
    lines = [f"{row[0]} {row[1]} {row[2]} {row[3]}" for row in array]
    body = "\n".join(lines)
    return f"{net_charge} {spin_multiplicity}\n{body}"
139 changes: 103 additions & 36 deletions folding/validators/forward.py
@@ -1,16 +1,17 @@
import os
import time
import random
import numpy as np
from tqdm import tqdm
from pathlib import Path
from typing import List, Dict
from collections import defaultdict

from async_timeout import timeout
from folding.utils.s3_utils import upload_to_s3
from folding.validators.protein import Protein
from folding.utils.logging import log_event
from folding.validators.reward import get_energies
from folding.protocol import JobSubmissionSynapse
from folding.protocol import JobSubmissionSynapse, DFTJobSubmissionSynapse
import asyncio
from folding.utils.openmm_forcefields import FORCEFIELD_REGISTRY
from folding.validators.hyperparameters import HyperParameters
@@ -19,12 +20,43 @@
    get_response_info,
    OpenMMException,
    RsyncException,
    to_psi4_geometry_string,
    parse_custom_xyz,
)
from folding.utils.logger import logger
from folding.utils.uids import get_all_miner_uids


ROOT_DIR = Path(__file__).resolve().parents[2]


async def run_dft_step(self, job_event: Dict):
    """
    Send a DFT job to all miners and return the collected response info.
    """
    uids = get_all_miner_uids(
        self.metagraph,
        self.config.neuron.vpermit_tao_limit,
        include_serving_in_check=False,
    )

    axons = [self.metagraph.axons[uid] for uid in uids]
    synapse = DFTJobSubmissionSynapse(
        job_id=job_event["job_id"],
        geometry=job_event["geometry"],
    )

    responses: List[DFTJobSubmissionSynapse] = await self.dendrite.forward(
        axons=axons,
        synapse=synapse,
        timeout=10,
        deserialize=True,  # calls deserialize() on each returned synapse
    )

    response_info = get_response_info(responses=responses)
    return response_info


async def run_step(
    self,
    protein: Protein,
@@ -120,7 +152,7 @@ def parse_config(config) -> Dict[str, str]:


# TODO: We need to be able to create a bunch of different challenges.
async def create_new_challenge(self, exclude: List) -> Dict:
async def create_new_challenge(self, job_type: str, exclude: List) -> Dict:
"""Create a new challenge by sampling a random pdb_id and running a hyperparameter search
using the try_prepare_md_challenge function.

@@ -130,40 +162,52 @@ async def create_new_challenge(self, exclude: List) -> Dict:
    Returns:
        Dict: event dictionary containing the results of the hyperparameter search
    """
    while True:
        forward_start_time = time.time()
        if self.RSYNC_EXCEPTION_COUNT > 10:
            self.config.protein.pdb_id = None
            self.config.protein.input_source = "rcsb"

        if self.config.protein.pdb_id is not None:
            pdb_id = self.config.protein.pdb_id
        else:
            pdb_id, input_source = load_and_sample_random_pdb_ids(
                root_dir=ROOT_DIR,
                filename="pdb_ids.pkl",
                input_source=self.config.protein.input_source,
                exclude=exclude,
            )
            self.config.protein.input_source = input_source

        # Perform a hyperparameter search until we find a valid configuration for the pdb
        logger.info(f"Attempting to prepare challenge for pdb {pdb_id}")
        event = await try_prepare_md_challenge(self, config=self.config, pdb_id=pdb_id)
        event["input_source"] = self.config.protein.input_source

        if event.get("validator_search_status"):
            return event
        else:
            # forward time if validator step fails
            event["hp_search_time"] = time.time() - forward_start_time

            # only log the event if the simulation was not successful
            log_event(self, event, failed=True)
            logger.debug(
                f"❌❌ All hyperparameter combinations failed for pdb_id {pdb_id}.. Skipping! ❌❌"
            )
            exclude.append(pdb_id)
    if job_type == "md":
        while True:
            forward_start_time = time.time()
            if self.RSYNC_EXCEPTION_COUNT > 10:
                self.config.protein.pdb_id = None
                self.config.protein.input_source = "rcsb"

            if self.config.protein.pdb_id is not None:
                pdb_id = self.config.protein.pdb_id
            else:
                pdb_id, input_source = load_and_sample_random_pdb_ids(
                    root_dir=ROOT_DIR,
                    filename="pdb_ids.pkl",
                    input_source=self.config.protein.input_source,
                    exclude=exclude,
                )
                self.config.protein.input_source = input_source

            # Perform a hyperparameter search until we find a valid configuration for the pdb
            logger.info(f"Attempting to prepare challenge for pdb {pdb_id}")
            event = await try_prepare_md_challenge(
                self, config=self.config, pdb_id=pdb_id
            )
            exclude.append(pdb_id)
            event["input_source"] = self.config.protein.input_source

            if event.get("validator_search_status"):
                return event
            else:
                # forward time if validator step fails
                event["hp_search_time"] = time.time() - forward_start_time

                # only log the event if the simulation was not successful
                log_event(self, event, failed=True)
                logger.debug(
                    f"❌❌ All hyperparameter combinations failed for pdb_id {pdb_id}. Skipping! ❌❌"
                )

    elif job_type == "dft":
        while True:
            try:
                return await try_prepare_dft_challenge(self)
            except Exception as e:
                logger.error(f"Error preparing DFT challenge: {e}")
                # brief backoff so a persistent failure does not spin the event loop
                await asyncio.sleep(1)


def create_random_modifications_to_system_config(config) -> Dict:
@@ -303,3 +347,26 @@ async def try_prepare_md_challenge(self, config, pdb_id: str) -> Dict:
        return event

    return event


async def try_prepare_dft_challenge(self) -> Dict:
    """Attempts to prepare a DFT challenge by sampling a random molecule from the
    extracted QM9 (dsgdb9nsd) dataset."""
    path = os.path.join(ROOT_DIR, "dsgdb9nsd.xyz")

    # Ideally, we are going to ping the endpoint ('https://springernature.figshare.com/ndownloader/files/3195389')
    elements = []  # This is going to be ~130,000 elements
    for file in os.listdir(path):
        if file.endswith(".xyz"):
            elements.append(file)

    # randomly sample a single element from the list
    element = random.choice(elements)

    # parse the element and build the Psi4 geometry string
    parsed_data = parse_custom_xyz(os.path.join(path, element))
    geometry = to_psi4_geometry_string(parsed_data["atoms"].to_numpy())

    return {
        "job_id": element,  # run_dft_step expects a job_id; the filename serves as one
        "element": element,
        "geometry": geometry,
    }
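
The comment about the figshare endpoint could eventually be serviced by a small bootstrap step. A sketch, assuming the download is a bz2 tarball of per-molecule .xyz files (which is how the QM9 archive is distributed); the helper name and layout are hypothetical:

import os
import tarfile
import requests

QM9_URL = "https://springernature.figshare.com/ndownloader/files/3195389"

def ensure_qm9_dataset(dest_dir: str) -> str:
    """Download and extract the QM9 xyz archive if it is not already present."""
    xyz_dir = os.path.join(dest_dir, "dsgdb9nsd.xyz")
    if os.path.isdir(xyz_dir):
        return xyz_dir

    archive = os.path.join(dest_dir, "dsgdb9nsd.xyz.tar.bz2")
    with requests.get(QM9_URL, stream=True, timeout=60) as r:
        r.raise_for_status()
        with open(archive, "wb") as f:
            for chunk in r.iter_content(chunk_size=1 << 20):
                f.write(chunk)

    os.makedirs(xyz_dir, exist_ok=True)
    with tarfile.open(archive, "r:bz2") as tar:
        tar.extractall(xyz_dir)  # yields ~130k dsgdb9nsd_*.xyz files
    return xyz_dir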
31 changes: 24 additions & 7 deletions neurons/validator.py
@@ -35,7 +35,7 @@
    upload_to_s3,
    DigitalOceanS3Handler,
)
from folding.validators.forward import create_new_challenge, run_step
from folding.validators.forward import create_new_challenge, run_step, run_dft_step
from folding.validators.protein import Protein
from folding.registries.miner_registry import MinerRegistry
from folding.organic.api import start_organic_api
@@ -196,12 +196,13 @@ async def add_job(self, job_event: dict[str, Any], protein: Protein = None) -> b

        return False

    async def add_k_synthetic_jobs(self, k: int):
    async def add_k_synthetic_jobs(self, k: int, job_type: str):
        """Creates new synthetic jobs and assigns them to available workers. Updates DB with new records.
        Each "job" is an individual challenge (protein folding or DFT) that is distributed to the miners.

        Args:
            k (int): The number of jobs to create and distribute to miners.
            job_type (str): The type of job to create; options are "md" or "dft".
        """

        # Deploy K number of unique pdb jobs, where each job gets distributed to self.config.neuron.sample_size miners
@@ -210,9 +211,15 @@ async def add_k_synthetic_jobs(self, k: int):

            # This will change on each loop since we are submitting a new pdb to the batch of miners
            exclude_pdbs = self.store.get_all_pdbs()
            job_event: Dict = await create_new_challenge(self, exclude=exclude_pdbs)
            job_event: Dict = await create_new_challenge(
                self, exclude=exclude_pdbs, job_type=job_type
            )

            await self.add_job(job_event=job_event)
            if job_type == "md":
                await self.add_job(job_event=job_event)
            elif job_type == "dft":
                # TODO: Need to figure this out..... How to add to queue?
                return job_event
            await asyncio.sleep(0.01)

    async def update_job(self, job: Job):
@@ -370,7 +377,7 @@ async def prepare_event_for_logging(event: Dict):
        if protein is not None and job.active is False:
            protein.remove_pdb_directory()

    async def create_synthetic_jobs(self):
    async def create_synthetic_md_jobs(self):
        """
        Creates jobs and adds them to the queue.
        """
@@ -396,7 +403,7 @@ async def create_synthetic_jobs(self):
            # We also assign the pdb to a group of workers (miners), based on their workloads

                await self.add_k_synthetic_jobs(
                    k=self.config.neuron.queue_size - queue.qsize()
                    k=self.config.neuron.queue_size - queue.qsize(), job_type="md"
                )

logger.info(
Expand All @@ -412,6 +419,15 @@ async def create_synthetic_jobs(self):

await asyncio.sleep(self.config.neuron.synthetic_job_interval)

    async def create_synthetic_dft_jobs(self):
        """
        Creates DFT jobs and pushes them to miners.
        """
        while True:
            job_event: Dict = await self.add_k_synthetic_jobs(k=1, job_type="dft")

            job_event = await run_dft_step(self, job_event=job_event)
            await asyncio.sleep(self.config.neuron.synthetic_job_interval)

    async def update_jobs(self):
        """
        Updates the jobs in the queue.
@@ -587,7 +603,8 @@ async def __aenter__(self):

        self.loop.create_task(self.sync_loop())
        self.loop.create_task(self.update_jobs())
        self.loop.create_task(self.create_synthetic_jobs())
        self.loop.create_task(self.create_synthetic_md_jobs())
        self.loop.create_task(self.create_synthetic_dft_jobs())
        self.loop.create_task(self.reward_loop())
        self.loop.create_task(self.monitor_db())
        if self.config.neuron.organic_enabled: