Skip to content

Commit 388584f

Browse files
authored
Merge pull request #1268 from Sage-Bionetworks/develop-fix-synapsecache-issue
[bug fix] Updated .synapseCache, functions to calculate cache, and cleared manifests before each download
2 parents 757abd1 + 3dc7028 commit 388584f

File tree

4 files changed

+180
-67
lines changed

4 files changed

+180
-67
lines changed

pyproject.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,9 @@ markers = [
126126
Google credentials (skipped on GitHub CI) \
127127
""",
128128
"""\
129+
not_windows: tests that don't work on on windows machine
130+
""",
131+
"""\
129132
schematic_api: marks tests covering \
130133
API functionality (skipped on regular GitHub CI test suite)
131134
""",
@@ -143,4 +146,4 @@ markers = [
143146
rule_benchmark: marks tests covering \
144147
validation rule benchmarking
145148
"""
146-
]
149+
]

schematic/store/synapse.py

Lines changed: 25 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import logging
88
import secrets
99
from dataclasses import dataclass
10-
import tempfile
10+
import shutil
1111

1212
# allows specifying explicit variable types
1313
from typing import Dict, List, Tuple, Sequence, Union
@@ -43,17 +43,17 @@
4343
from schematic_db.rdb.synapse_database import SynapseDatabase
4444

4545

46-
from schematic.utils.df_utils import update_df, load_df, col_in_dataframe, populate_df_col_with_another_col
46+
from schematic.utils.df_utils import update_df, load_df, col_in_dataframe
4747
from schematic.utils.validate_utils import comma_separated_list_regex, rule_in_rule_list
48-
from schematic.utils.general import entity_type_mapping, get_dir_size, convert_size, convert_gb_to_bytes, create_temp_folder
48+
from schematic.utils.general import entity_type_mapping, get_dir_size, convert_gb_to_bytes, create_temp_folder, check_synapse_cache_size, clear_synapse_cache
4949
from schematic.schemas.explorer import SchemaExplorer
5050
from schematic.schemas.generator import SchemaGenerator
5151
from schematic.store.base import BaseStorage
5252
from schematic.exceptions import MissingConfigValueError, AccessCredentialsError
5353

5454
from schematic.configuration.configuration import CONFIG
5555

56-
from schematic.utils.general import profile
56+
from schematic.utils.general import profile, calculate_datetime
5757

5858
logger = logging.getLogger("Synapse storage")
5959

@@ -75,12 +75,16 @@ def _download_manifest_to_folder(self) -> File:
7575
"""
7676
if "SECRETS_MANAGER_SECRETS" in os.environ:
7777
temporary_manifest_storage = "/var/tmp/temp_manifest_download"
78+
# clear out all the existing manifests
79+
if os.path.exists(temporary_manifest_storage):
80+
shutil.rmtree(temporary_manifest_storage)
81+
# create a new directory to store manifest
7882
if not os.path.exists(temporary_manifest_storage):
79-
os.mkdir("/var/tmp/temp_manifest_download")
83+
os.mkdir(temporary_manifest_storage)
84+
# create temporary folders for storing manifests
8085
download_location = create_temp_folder(temporary_manifest_storage)
8186
else:
8287
download_location=CONFIG.manifest_folder
83-
8488
manifest_data = self.syn.get(
8589
self.manifest_id,
8690
downloadLocation=download_location,
@@ -177,41 +181,34 @@ def __init__(
177181
Typical usage example:
178182
syn_store = SynapseStorage()
179183
"""
180-
184+
# TODO: turn root_synapse_cache to a parameter in init
181185
self.syn = self.login(token, access_token)
182186
self.project_scope = project_scope
183187
self.storageFileview = CONFIG.synapse_master_fileview_id
184188
self.manifest = CONFIG.synapse_manifest_basename
189+
self.root_synapse_cache = "/root/.synapseCache"
185190
self._query_fileview()
186191

187-
def _purge_synapse_cache(self, root_dir: str = "/var/www/.synapseCache/", maximum_storage_allowed_cache_gb=7):
192+
def _purge_synapse_cache(self, maximum_storage_allowed_cache_gb=1):
188193
"""
189-
Purge synapse cache if it exceeds 7GB
194+
Purge synapse cache if it exceeds a certain size. Default to 1GB.
190195
Args:
191-
root_dir: directory of the .synapseCache function
192-
maximum_storage_allowed_cache_gb: the maximum storage allowed before purging cache. Default is 7 GB.
193-
194-
Returns:
195-
if size of cache reaches a certain threshold (default is 7GB), return the number of files that get deleted
196-
otherwise, return the total remaining space (assuming total ephemeral storage is 20GB on AWS )
196+
maximum_storage_allowed_cache_gb: the maximum storage allowed before purging cache. Default is 1 GB.
197197
"""
198198
# try clearing the cache
199199
# scan a directory and check size of files
200-
cache = self.syn.cache
201-
if os.path.exists(root_dir):
200+
if os.path.exists(self.root_synapse_cache):
202201
maximum_storage_allowed_cache_bytes = convert_gb_to_bytes(maximum_storage_allowed_cache_gb)
203-
total_ephemeral_storag_gb = 20
204-
total_ephemeral_storage_bytes = convert_gb_to_bytes(total_ephemeral_storag_gb)
205-
nbytes = get_dir_size(root_dir)
206-
# if 7 GB has already been taken, purge cache before 15 min
207-
if nbytes >= maximum_storage_allowed_cache_bytes:
208-
minutes_earlier = datetime.strftime(datetime.utcnow()- timedelta(minutes = 15), '%s')
209-
num_of_deleted_files = cache.purge(before_date = int(minutes_earlier))
210-
logger.info(f'{num_of_deleted_files} number of files have been deleted from {root_dir}')
202+
nbytes = get_dir_size(self.root_synapse_cache)
203+
dir_size_bytes = check_synapse_cache_size(directory=self.root_synapse_cache)
204+
# if 1 GB has already been taken, purge cache before 15 min
205+
if dir_size_bytes >= maximum_storage_allowed_cache_bytes:
206+
num_of_deleted_files = clear_synapse_cache(self.syn.cache, minutes=15)
207+
logger.info(f'{num_of_deleted_files} files have been deleted from {self.root_synapse_cache}')
211208
else:
212-
remaining_space = total_ephemeral_storage_bytes - nbytes
213-
converted_space = convert_size(remaining_space)
214-
logger.info(f'Estimated {remaining_space} bytes (which is approximately {converted_space}) remained in ephemeral storage after calculating size of .synapseCache excluding OS')
209+
# on AWS, OS takes around 14-17% of our ephemeral storage (20GiB)
210+
# instead of guessing how much space that we left, print out .synapseCache here
211+
logger.info(f'the total size of .synapseCache is: {nbytes} bytes')
215212

216213
def _query_fileview(self):
217214
self._purge_synapse_cache()

schematic/utils/general.py

Lines changed: 69 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,20 @@
11
# allows specifying explicit variable types
2-
from typing import Any, Dict, Optional, Text
3-
import os
4-
import math
52
import logging
3+
import math
4+
import os
65
import pstats
6+
import subprocess
7+
import tempfile
78
from cProfile import Profile
9+
from datetime import datetime, timedelta
810
from functools import wraps
9-
10-
import tempfile
11+
from typing import Union
1112

1213
from synapseclient.core.exceptions import SynapseHTTPError
13-
from synapseclient.table import EntityViewSchema
1414
from synapseclient.entity import File, Folder, Project
15+
from synapseclient.table import EntityViewSchema
16+
17+
import synapseclient.core.cache as cache
1518

1619
logger = logging.getLogger(__name__)
1720

@@ -57,24 +60,69 @@ def get_dir_size(path: str):
5760
total += get_dir_size(entry.path)
5861
return total
5962

63+
def calculate_datetime(minutes: int, input_date: datetime, before_or_after: str = "before") -> datetime:
64+
"""calculate date time
65+
66+
Args:
67+
input_date (datetime): date time object provided by users
68+
minutes (int): number of minutes
69+
before_or_after (str): default to "before". if "before", calculate x minutes before current date time. if "after", calculate x minutes after current date time.
70+
71+
Returns:
72+
datetime: return result of date time calculation
73+
"""
74+
if before_or_after=="before":
75+
date_time_result = input_date - timedelta(minutes=minutes)
76+
elif before_or_after=="after":
77+
date_time_result = input_date + timedelta(minutes=minutes)
78+
else:
79+
raise ValueError("Invalid value. Use either 'before' or 'after'.")
80+
return date_time_result
81+
82+
83+
def check_synapse_cache_size(directory='/root/.synapseCache')-> Union[float, int]:
84+
"""use du --sh command to calculate size of .synapseCache.
6085
61-
def convert_size(size_bytes: int):
62-
"""convert bytes to a human readable format
6386
Args:
64-
size_bytes: total byte sizes
65-
return: a string that indicates bytes in a different format
87+
directory (str, optional): .synapseCache directory. Defaults to '/root/.synapseCache'
88+
89+
Returns:
90+
float or integer: returns size of .synapsecache directory in bytes
6691
"""
67-
if size_bytes == 0:
68-
return "0B"
69-
size_name = ("B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB")
70-
# calculate the log of size (in bytes) to base 1024 and run it down to the nearest integer
71-
index_int = int(math.floor(math.log(size_bytes, 1024)))
72-
# return the value of 1024 raised to the power of index
73-
power_cal = math.pow(1024, index_int)
74-
# convert bytes to a different unit if applicable
75-
size_bytes_converted = round(size_bytes / power_cal, 2)
76-
return f"{size_bytes_converted} {size_name[index_int]})"
92+
# Note: this command might fail on windows user. But since this command is primarily for running on AWS, it is fine.
93+
command = ['du', '-sh', directory]
94+
output = subprocess.run(command, capture_output=True).stdout.decode('utf-8')
95+
96+
# Parsing the output to extract the directory size
97+
size = output.split('\t')[0]
98+
if "K" in size:
99+
size_in_kb = float(size.rstrip('K'))
100+
byte_size = size_in_kb * 1000
101+
elif "M" in size:
102+
size_in_mb = float(size.rstrip('M'))
103+
byte_size = size_in_mb * 1000000
104+
elif "G" in size:
105+
size_in_gb = float(size.rstrip('G'))
106+
byte_size = convert_gb_to_bytes(size_in_gb)
107+
elif "B" in size:
108+
byte_size = float(size.rstrip('B'))
109+
else:
110+
logger.error('Cannot recongize the file size unit')
111+
return byte_size
112+
113+
def clear_synapse_cache(cache: cache.Cache, minutes: int) -> int:
114+
"""clear synapse cache before a certain time
77115
116+
Args:
117+
cache: an object of synapseclient Cache.
118+
minutes (int): all files before this minute will be removed
119+
Returns:
120+
int: number of files that get deleted
121+
"""
122+
current_date = datetime.utcnow()
123+
minutes_earlier = calculate_datetime(input_date=current_date, minutes=minutes, before_or_after="before")
124+
num_of_deleted_files = cache.purge(before_date = minutes_earlier)
125+
return num_of_deleted_files
78126

79127
def convert_gb_to_bytes(gb: int):
80128
"""convert gb to bytes
@@ -84,6 +132,7 @@ def convert_gb_to_bytes(gb: int):
84132
"""
85133
return gb * 1024 * 1024 * 1024
86134

135+
87136
def entity_type_mapping(syn, entity_id):
88137
"""
89138
Return the entity type of manifest

tests/test_utils.py

Lines changed: 82 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,36 @@
1-
import logging
21
import json
2+
import logging
33
import os
4+
import shutil
5+
import tempfile
6+
import time
7+
from datetime import datetime
8+
from unittest import mock
49

5-
import pandas as pd
610
import numpy as np
11+
import pandas as pd
712
import pytest
8-
9-
import tempfile
10-
13+
import synapseclient
14+
import synapseclient.core.cache as cache
1115
from pandas.testing import assert_frame_equal
1216
from synapseclient.core.exceptions import SynapseHTTPError
1317

14-
from schematic.schemas.explorer import SchemaExplorer
15-
from schematic.schemas import df_parser
16-
from schematic.utils import general
17-
from schematic.utils import cli_utils
18-
from schematic.utils import io_utils
19-
from schematic.utils import df_utils
20-
from schematic.utils import validate_utils
21-
from schematic.exceptions import (
22-
MissingConfigValueError,
23-
MissingConfigAndArgumentValueError,
24-
)
2518
from schematic import LOADER
19+
from schematic.exceptions import (MissingConfigAndArgumentValueError,
20+
MissingConfigValueError)
21+
from schematic.schemas import df_parser
22+
from schematic.schemas.explorer import SchemaExplorer
2623
from schematic.store.synapse import SynapseStorage
27-
from schematic.utils.general import entity_type_mapping
24+
from schematic.utils import (cli_utils, df_utils, general, io_utils,
25+
validate_utils)
26+
from schematic.utils.general import (calculate_datetime,
27+
check_synapse_cache_size,
28+
clear_synapse_cache, entity_type_mapping)
2829

2930
logging.basicConfig(level=logging.DEBUG)
3031
logger = logging.getLogger(__name__)
3132

33+
IN_GITHUB_ACTIONS = os.getenv("GITHUB_ACTIONS")
3234

3335
@pytest.fixture
3436
def synapse_store():
@@ -39,8 +41,69 @@ def synapse_store():
3941
synapse_store = SynapseStorage()
4042
yield synapse_store
4143

42-
4344
class TestGeneral:
45+
def test_clear_synapse_cache(self, tmp_path):
46+
# define location of mock synapse cache
47+
mock_synapse_cache_dir = tmp_path / ".synapseCache/"
48+
mock_synapse_cache_dir.mkdir()
49+
mock_sub_folder = mock_synapse_cache_dir / "123"
50+
mock_sub_folder.mkdir()
51+
mock_table_query_folder = mock_sub_folder/ "456"
52+
mock_table_query_folder.mkdir()
53+
54+
# create mock table query csv and a mock cache map
55+
mock_synapse_table_query_csv = mock_table_query_folder/ "mock_synapse_table_query.csv"
56+
mock_synapse_table_query_csv.write_text("mock table query content")
57+
mock_cache_map = mock_table_query_folder/ ".cacheMap"
58+
mock_cache_map.write_text(f"{mock_synapse_table_query_csv}: '2022-06-13T19:24:27.000Z'")
59+
60+
assert os.path.exists(mock_synapse_table_query_csv)
61+
62+
# since synapse python client would compare last modified date and before date
63+
# we have to create a little time gap here
64+
time.sleep(1)
65+
66+
# clear cache
67+
my_cache = cache.Cache(cache_root_dir=mock_synapse_cache_dir)
68+
clear_synapse_cache(my_cache, minutes=0.0001)
69+
# make sure that cache files are now gone
70+
assert os.path.exists(mock_synapse_table_query_csv) == False
71+
assert os.path.exists(mock_cache_map) == False
72+
73+
def test_calculate_datetime_before_minutes(self):
74+
input_date = datetime.strptime("07/20/23 17:36:34", '%m/%d/%y %H:%M:%S')
75+
minutes_before = calculate_datetime(input_date=input_date, minutes=10, before_or_after="before")
76+
expected_result_date_before = datetime.strptime("07/20/23 17:26:34", '%m/%d/%y %H:%M:%S')
77+
assert minutes_before == expected_result_date_before
78+
79+
def test_calculate_datetime_after_minutes(self):
80+
input_date = datetime.strptime("07/20/23 17:36:34", '%m/%d/%y %H:%M:%S')
81+
minutes_after = calculate_datetime(input_date=input_date, minutes=10, before_or_after="after")
82+
expected_result_date_after = datetime.strptime("07/20/23 17:46:34", '%m/%d/%y %H:%M:%S')
83+
assert minutes_after == expected_result_date_after
84+
85+
def test_calculate_datetime_raise_error(self):
86+
with pytest.raises(ValueError):
87+
input_date = datetime.strptime("07/20/23 17:36:34", '%m/%d/%y %H:%M:%S')
88+
minutes = calculate_datetime(input_date=input_date, minutes=10, before_or_after="error")
89+
90+
# this test might fail for windows machine
91+
@pytest.mark.not_windows
92+
def test_check_synapse_cache_size(self,tmp_path):
93+
mock_synapse_cache_dir = tmp_path / ".synapseCache"
94+
mock_synapse_cache_dir.mkdir()
95+
96+
mock_synapse_table_query_csv = mock_synapse_cache_dir/ "mock_synapse_table_query.csv"
97+
mock_synapse_table_query_csv.write_text("example file for calculating cache")
98+
99+
file_size = check_synapse_cache_size(mock_synapse_cache_dir)
100+
101+
# For some reasons, when running in github action, the size of file changes.
102+
if IN_GITHUB_ACTIONS:
103+
assert file_size == 8000
104+
else:
105+
assert file_size == 4000
106+
44107
def test_find_duplicates(self):
45108

46109
mock_list = ["foo", "bar", "foo"]
@@ -84,6 +147,7 @@ def test_download_manifest_to_temp_folder(self):
84147
path_dir = general.create_temp_folder(tmpdir)
85148
assert os.path.exists(path_dir)
86149

150+
87151
class TestCliUtils:
88152
def test_query_dict(self):
89153

0 commit comments

Comments
 (0)