Skip to content

Commit 605a1fe

Browse files
Merge pull request #305 from macrocosm-os/staging
Staging
2 parents 8749d88 + 307ddb2 commit 605a1fe

15 files changed

+646
-180
lines changed

.env.example

+4
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
S3_REGION = "nyc3"
2+
S3_ENDPOINT = "https://nyc3.digitaloceanspaces.com"
3+
S3_KEY = "s3_key"
4+
S3_SECRET = "secret_key"

README.md

+7
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,13 @@ Validators are heavily recommended to run the autoprocess script to ensure that
161161
bash run_autoprocess.sh
162162
```
163163

164+
The following environment variables need to be set in your system or application environment (`.env` file):
165+
- `S3_REGION = "nyc3"`: The AWS region or S3-compatible region where the bucket is located.
166+
- `S3_ENDPOINT = "https://nyc3.digitaloceanspaces.com"`: The endpoint URL for your S3-compatible service.
167+
- `S3_KEY`: Your S3 access key ID.
168+
- `S3_SECRET`: Your S3 secret access key.
169+
See `.env.example` for an example of how to set these variables.
170+
164171
### Miner
165172
There are many parameters that one can configure for a simulation. The base command-line args that are needed to run the miner are below.
166173
```bash

folding/__init__.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from .protocol import JobSubmissionSynapse
22
from .validators.protein import Protein
33

4-
__version__ = "1.3.4"
4+
__version__ = "1.4.0"
55
version_split = __version__.split(".")
66
__spec_version__ = (
77
(10000 * int(version_split[0]))

folding/store.py

+105-53
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,20 @@
1-
import json
21
import os
2+
import json
3+
import string
34
import random
45
import sqlite3
5-
import string
6-
from dataclasses import asdict, dataclass
7-
from datetime import datetime
6+
import requests
87
from queue import Queue
9-
from typing import List
8+
from typing import Dict, List
9+
10+
from datetime import datetime, timezone
11+
from dataclasses import asdict, dataclass
1012

1113
import numpy as np
1214
import pandas as pd
13-
import requests
15+
1416
from atom.epistula.epistula import Epistula
17+
from folding.utils.epistula_utils import get_epistula_body
1518

1619
DB_DIR = os.path.join(os.path.dirname(__file__), "db")
1720

@@ -53,7 +56,10 @@ def init_db(self):
5356
water TEXT,
5457
epsilon REAL,
5558
system_kwargs TEXT,
56-
min_updates INTEGER
59+
min_updates INTEGER,
60+
job_id TEXT,
61+
s3_links TEXT,
62+
best_cpt_links TEXT
5763
)
5864
"""
5965
)
@@ -67,9 +73,7 @@ def _row_to_job(self, row) -> "Job":
6773
# Convert stored JSON strings back to Python objects
6874
data["hotkeys"] = json.loads(data["hotkeys"])
6975
data["event"] = json.loads(data["event"]) if data["event"] else None
70-
data["system_kwargs"] = (
71-
json.loads(data["system_kwargs"]) if data["system_kwargs"] else None
72-
)
76+
data["system_kwargs"] = json.loads(data["system_kwargs"]) if data["system_kwargs"] else None
7377

7478
# Convert timestamps
7579
for field in ["created_at", "updated_at", "best_loss_at"]:
@@ -80,9 +84,7 @@ def _row_to_job(self, row) -> "Job":
8084

8185
# Convert intervals
8286
data["update_interval"] = pd.Timedelta(seconds=data["update_interval"])
83-
data["max_time_no_improvement"] = pd.Timedelta(
84-
seconds=data["max_time_no_improvement"]
85-
)
87+
data["max_time_no_improvement"] = pd.Timedelta(seconds=data["max_time_no_improvement"])
8688

8789
# Convert boolean
8890
data["active"] = bool(data["active"])
@@ -93,12 +95,13 @@ def _job_to_dict(self, job: "Job") -> dict:
9395
"""Convert a Job object to a dictionary for database storage."""
9496
data = job.to_dict()
9597

96-
# Convert Python objects to JSON strings
97-
data["hotkeys"] = json.dumps(data["hotkeys"])
98-
data["event"] = json.dumps(data["event"]) if data["event"] else None
99-
data["system_kwargs"] = (
100-
json.dumps(data["system_kwargs"]) if data["system_kwargs"] else None
101-
)
98+
# Convert Python list or dict objects to JSON strings for sqlite
99+
data_to_update = {}
100+
for k, v in data.items():
101+
if isinstance(v, (list,dict)):
102+
data_to_update[k] = json.dumps(v)
103+
104+
data.update(data_to_update)
102105

103106
# Convert timestamps to strings
104107
for field in ["created_at", "updated_at", "best_loss_at"]:
@@ -109,9 +112,7 @@ def _job_to_dict(self, job: "Job") -> dict:
109112

110113
# Convert intervals to seconds
111114
data["update_interval"] = int(data["update_interval"].total_seconds())
112-
data["max_time_no_improvement"] = int(
113-
data["max_time_no_improvement"].total_seconds()
114-
)
115+
data["max_time_no_improvement"] = int(data["max_time_no_improvement"].total_seconds())
115116

116117
# Convert boolean to integer
117118
data["active"] = int(data["active"])
@@ -154,6 +155,7 @@ def insert(
154155
hotkeys: List[str],
155156
epsilon: float,
156157
system_kwargs: dict,
158+
job_id: str,
157159
**kwargs,
158160
):
159161
"""Insert a new job into the database."""
@@ -176,6 +178,7 @@ def insert(
176178
updated_at=pd.Timestamp.now().floor("s"),
177179
epsilon=epsilon,
178180
system_kwargs=system_kwargs,
181+
job_id=job_id,
179182
**kwargs,
180183
)
181184

@@ -202,6 +205,34 @@ def update(self, job: "Job"):
202205

203206
cur.execute(query, list(data.values()) + [pdb])
204207

208+
def update_gjp_job(self, job: "Job", gjp_address: str, keypair, job_id: str):
    """Push the current state of ``job`` to the global job pool (GJP) server.

    Args:
        job (Job): Job whose fields are serialized into the request payload.
        gjp_address (str): Host (and port) of the GJP server, without scheme.
        keypair (Keypair): Keypair used to sign the Epistula request headers.
        job_id (str): Identifier of the GJP job being updated.

    Returns:
        str: The job ID echoed back by the server.

    Raises:
        ValueError: If the server responds with a non-200 status code.
    """
    payload = get_epistula_body(job=job)
    serialized = self.epistula.create_message_body(payload)
    signed_headers = self.epistula.generate_header(hotkey=keypair, body=serialized)

    update_url = f"http://{gjp_address}/jobs/update/{job_id}"
    resp = requests.post(update_url, headers=signed_headers, data=serialized)

    if resp.status_code != 200:
        raise ValueError(f"Failed to upload job: {resp.text}")

    return resp.json()["job_id"]
235+
205236
def get_all_pdbs(self) -> list:
206237
"""
207238
Retrieve all PDB IDs from the job store.
@@ -229,32 +260,55 @@ def upload_job(
229260
water: str,
230261
hotkeys: list,
231262
system_kwargs: dict,
232-
hotkey,
263+
keypair,
233264
gjp_address: str,
265+
epsilon: float,
266+
s3_links: Dict[str, str],
267+
**kwargs,
234268
):
235-
"""Upload a job to the database."""
236-
237-
body = {
238-
"pdb_id": pdb,
239-
"hotkeys": hotkeys,
240-
"system_config": {
241-
"ff": ff,
242-
"box": box,
243-
"water": water,
244-
"system_kwargs": system_kwargs,
245-
},
246-
"em_s3_link": "s3://path/to/em",
247-
"priority": 1,
248-
"organic": False
249-
}
250-
body_bytes = self.epistula.create_message_body(body)
251-
headers = self.epistula.generate_header(hotkey=hotkey, body=body_bytes)
269+
"""
270+
Upload a job to the global job pool database.
271+
272+
Args:
273+
pdb (str): The PDB ID of the job.
274+
ff (str): The force field configuration.
275+
box (str): The box configuration.
276+
water (str): The water configuration.
277+
hotkeys (list): A list of hotkeys.
278+
system_kwargs (dict): Additional system configuration arguments.
279+
keypair (Keypair): The keypair for generating headers.
280+
gjp_address (str): The address of the api server.
281+
event (dict): Additional event data.
252282
253-
response = requests.post(
254-
f"http://{gjp_address}/jobs", headers=headers, data=body_bytes
283+
Returns:
284+
str: The job ID of the uploaded job.
285+
286+
Raises:
287+
ValueError: If the job upload fails.
288+
"""
289+
job = Job(
290+
pdb=pdb,
291+
ff=ff,
292+
box=box,
293+
water=water,
294+
hotkeys=hotkeys,
295+
created_at=pd.Timestamp.now().floor("s"),
296+
updated_at=pd.Timestamp.now().floor("s"),
297+
epsilon=epsilon,
298+
system_kwargs=system_kwargs,
299+
s3_links=s3_links,
300+
**kwargs,
255301
)
302+
303+
body = get_epistula_body(job=job)
304+
305+
body_bytes = self.epistula.create_message_body(body)
306+
headers = self.epistula.generate_header(hotkey=keypair, body=body_bytes)
307+
308+
response = requests.post(f"http://{gjp_address}/jobs", headers=headers, data=body_bytes)
256309
if response.status_code != 200:
257310
raise ValueError(f"Failed to upload job: {response.text}")
311+
return response.json()["job_id"]
258312

259313

260314
# Keep the Job and MockJob classes as they are, they work well with both implementations
@@ -280,6 +334,9 @@ class Job:
280334
epsilon: float = 5 # percentage.
281335
event: dict = None
282336
system_kwargs: dict = None
337+
job_id: str = None
338+
s3_links: Dict[str, str] = None
339+
best_cpt_links: list = None
283340

284341
def to_dict(self):
285342
return asdict(self)
@@ -301,25 +358,21 @@ async def update(self, loss: float, hotkey: str, hotkeys: List[str] = None):
301358
self.updated_at = pd.Timestamp.now().floor("s")
302359
self.updated_count += 1
303360

304-
never_updated_better_loss = (
305-
np.isnan(percent_improvement) and loss < self.best_loss
306-
)
361+
never_updated_better_loss = np.isnan(percent_improvement) and loss < self.best_loss
307362
better_loss = percent_improvement >= self.epsilon
308363

309364
if never_updated_better_loss or better_loss:
310365
self.best_loss = loss
311366
self.best_loss_at = pd.Timestamp.now().floor("s")
312367
self.best_hotkey = hotkey
313368
elif (
314-
pd.Timestamp.now().floor("s") - self.best_loss_at
315-
> self.max_time_no_improvement
369+
pd.Timestamp.now().floor("s") - self.best_loss_at > self.max_time_no_improvement
316370
and self.updated_count >= self.min_updates
317371
):
318372
self.active = False
319373
elif (
320374
isinstance(self.best_loss_at, pd._libs.tslibs.nattype.NaTType)
321-
and pd.Timestamp.now().floor("s") - self.created_at
322-
> self.max_time_no_improvement
375+
and pd.Timestamp.now().floor("s") - self.created_at > self.max_time_no_improvement
323376
):
324377
self.active = False
325378

@@ -339,10 +392,9 @@ def __init__(self, n_hotkeys=5, update_seconds=5, stop_after_seconds=3600 * 24):
339392
self.box = "cube"
340393
self.water = "tip3p"
341394
self.hotkeys = self._make_hotkeys(n_hotkeys)
342-
self.created_at = (
343-
pd.Timestamp.now().floor("s")
344-
- pd.Timedelta(seconds=random.randint(0, 3600 * 24))
345-
).floor("s")
395+
self.created_at = (pd.Timestamp.now().floor("s") - pd.Timedelta(seconds=random.randint(0, 3600 * 24))).floor(
396+
"s"
397+
)
346398
self.updated_at = self.created_at
347399
self.best_loss = 0
348400
self.best_hotkey = random.choice(self.hotkeys)

folding/utils/epistula_utils.py

+37
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
import numpy as np
2+
from datetime import datetime, timezone
3+
import pandas as pd
4+
import json
5+
6+
7+
def get_epistula_body(job: "Job") -> dict:
    """Build the request payload for uploading or updating a job on the GJP server.

    Flattens a Job into the wire format the server expects: renames ``pdb`` to
    ``pdb_id``, nests the simulation settings under ``system_config``, JSON-encodes
    container fields, converts timedeltas to seconds, and normalizes sentinel
    values (``NaT``, ``None``, ``inf``) into serializable defaults.

    Args:
        job (Job): a Job object containing the job details.

    Returns:
        dict: the body of information needed to upload or update a job on the
            GJP server.
    """
    payload = job.to_dict()

    # The server keys the structure by "pdb_id", not "pdb".
    payload["pdb_id"] = payload.pop("pdb")

    # Fold the simulation settings into a nested "system_config" mapping.
    system_config = {}
    for field in ("ff", "box", "water", "system_kwargs"):
        system_config[field] = payload.pop(field)
    payload["system_config"] = system_config

    # Container fields travel as JSON strings.
    payload["s3_links"] = json.dumps(payload.pop("s3_links"))

    # Defaults for fields a Job may not carry.
    payload["priority"] = payload.get("priority", 1)
    payload["is_organic"] = payload.get("is_organic", False)

    # Timedeltas are transmitted as plain seconds.
    for field in ("update_interval", "max_time_no_improvement"):
        payload[field] = payload.pop(field).total_seconds()

    # A job that has never improved has NaT here; substitute the current UTC time.
    if not pd.notna(payload["best_loss_at"]):
        payload["best_loss_at"] = datetime.now(timezone.utc).isoformat()

    payload["event"] = json.dumps(payload.pop("event"))

    # Normalize unset sentinels into serializable values.
    if payload["best_hotkey"] is None:
        payload["best_hotkey"] = ""
    if payload["best_loss"] == np.inf:
        payload["best_loss"] = 0

    return payload

0 commit comments

Comments
 (0)