Skip to content

Commit 9132270

Browse files
committed
chore: Update version to 0.10.7 and add CACHE_DIR setting; refactor filesystem handling in job queue and pipeline manager
1 parent cbd7f58 commit 9132270

File tree

5 files changed

+48
-17
lines changed

5 files changed

+48
-17
lines changed

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ description = "A simple workflow framework. Hamilton + APScheduler = FlowerPower
44
authors = [{ name = "Volker L.", email = "[email protected]" }]
55
readme = "README.md"
66
requires-python = ">= 3.11"
7-
version = "0.10.6.4"
7+
version = "0.10.7"
88
keywords = [
99
"hamilton",
1010
"workflow",

src/flowerpower/fs/base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,7 +220,7 @@ def __getattribute__(self, item):
220220
# new items
221221
"size",
222222
"glob",
223-
"sync",
223+
"sync_cache",
224224
# previous
225225
"load_cache",
226226
"_open",

src/flowerpower/job_queue/base.py

Lines changed: 25 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
from ..cfg import ProjectConfig
2525
from ..fs import AbstractFileSystem, get_filesystem
2626
# from ..utils.misc import update_config_from_dict
27-
from ..settings import BACKEND_PROPERTIES
27+
from ..settings import BACKEND_PROPERTIES, CACHE_DIR, CONFIG_DIR, PIPELINES_DIR
2828

2929

3030
class BackendType(str, Enum):
@@ -350,12 +350,21 @@ def __init__(
350350
self._storage_options = storage_options or {}
351351
self._backend = backend
352352
self._type = type
353-
self._pipelines_dir = kwargs.get("pipelines_dir", "pipelines")
354-
self._conf_dir = "conf"
353+
self._pipelines_dir = kwargs.get("pipelines_dir", PIPELINES_DIR)
354+
self._conf_dir = CONFIG_DIR
355355

356-
if fs is None:
357-
fs = get_filesystem(self._base_dir, **(self._storage_options or {}))
356+
if storage_options is not None:
357+
cached = True
358+
cache_storage = posixpath.join(posixpath.expanduser(CACHE_DIR), self._base_dir.split("://")[-1])
359+
posixpath.makedirs(cache_storage, exist_ok=True)
360+
else:
361+
cached = False
362+
cache_storage = None
363+
if not fs:
364+
fs = get_filesystem(self._base_dir, storage_options=storage_options, cached=cached, cache_storage=cache_storage)
358365
self._fs = fs
366+
if cached:
367+
self._fs.sync()
359368

360369
self._add_modules_path()
361370
self._load_config()
@@ -378,11 +387,17 @@ def _add_modules_path(self):
378387
None
379388
"""
380389
if self._fs.is_cache_fs:
381-
self._fs.sync()
390+
self._fs.sync_cache()
391+
project_path = self._fs.mapper.directory
392+
modules_path = posixpath.join(project_path, self._pipelines_dir)
393+
394+
else:
395+
# Use the base directory directly if not using cache
396+
project_path = self._fs.path
397+
modules_path = posixpath.join(project_path, self._pipelines_dir)
382398

383-
if self._fs.path not in sys.path:
384-
sys.path.insert(0, self._fs.path)
399+
if project_path not in sys.path:
400+
sys.path.insert(0, project_path)
385401

386-
modules_path = posixpath.join(self._fs.path, self._pipelines_dir)
387402
if modules_path not in sys.path:
388-
sys.path.insert(0, modules_path)
403+
sys.path.insert(0, modules_path)

src/flowerpower/pipeline/manager.py

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -125,9 +125,18 @@ def __init__(
125125

126126
self._base_dir = base_dir or str(Path.cwd())
127127
self._storage_options = storage_options
128+
if storage_options is not None:
129+
cached = True
130+
cache_storage = posixpath.join(posixpath.expanduser(settings.CACHE_DIR), self._base_dir.split("://")[-1])
131+
posixpath.makedirs(cache_storage, exist_ok=True)
132+
else:
133+
cached = False
134+
cache_storage = None
128135
if not fs:
129-
fs = get_filesystem(self._base_dir, storage_options=storage_options)
136+
fs = get_filesystem(self._base_dir, storage_options=storage_options, cached=cached, cache_storage=cache_storage)
130137
self._fs = fs
138+
if cached:
139+
self._fs.sync()
131140

132141
# Store overrides for ProjectConfig loading
133142
self._cfg_dir = cfg_dir or settings.CONFIG_DIR
@@ -260,12 +269,18 @@ def _add_modules_path(self) -> None:
260269
>>> import my_pipeline # Now importable
261270
"""
262271
if self._fs.is_cache_fs:
263-
self._fs.sync()
272+
self._fs.sync_cache()
273+
project_path = self._fs.mapper.directory
274+
modules_path = posixpath.join(project_path, self._pipelines_dir)
275+
276+
else:
277+
# Use the base directory directly if not using cache
278+
project_path = self._fs.path
279+
modules_path = posixpath.join(project_path, self._pipelines_dir)
264280

265-
if self._fs.path not in sys.path:
266-
sys.path.insert(0, self._fs.path)
281+
if project_path not in sys.path:
282+
sys.path.insert(0, project_path)
267283

268-
modules_path = posixpath.join(self._fs.path, self._pipelines_dir)
269284
if modules_path not in sys.path:
270285
sys.path.insert(0, modules_path)
271286

src/flowerpower/settings/general.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,4 @@
33
PIPELINES_DIR = os.getenv("FP_PIPELINES_DIR", "pipelines")
44
CONFIG_DIR = os.getenv("FP_CONFIG_DIR", "conf")
55
HOOKS_DIR = os.getenv("FP_HOOKS_DIR", "hooks")
6+
CACHE_DIR = os.getenv("FP_CACHE_DIR", "~/.flowerpower/cache")

0 commit comments

Comments
 (0)