Skip to content

Commit 1930b47

Browse files
committed
Update version to 0.9.3; refactor filesystem imports and enhance path handling with posixpath
1 parent c2e9763 commit 1930b47

File tree

17 files changed

+188
-108
lines changed

17 files changed

+188
-108
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,4 @@ flowerpower.db
1919
Digraph.gv
2020
Digraph.gv.pdf
2121
test/test2.json
22+
.qodo/history.sqlite

pyproject.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ keywords = [
2828
name = "FlowerPower"
2929
readme = "README.md"
3030
requires-python = ">= 3.11"
31-
version = "0.9.2"
31+
version = "0.9.3"
3232

3333
[project.scripts]
3434
flowerpower = "flowerpower.cli:app"
@@ -113,5 +113,7 @@ dev-dependencies = [
113113
"paho-mqtt>=2.1.0",
114114
"greenlet>=3.1.1",
115115
"obstore>=0.3.0",
116+
"pytest>=8.3.4",
117+
"mocker>=1.1.1",
116118
]
117119
managed = true

src/flowerpower/cfg/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
from munch import Munch, munchify
77
from pydantic import Field
88

9-
from ..utils.filesystem import get_filesystem
9+
from ..filesystem import get_filesystem
1010
from .base import BaseConfig
1111
from .pipeline.run import PipelineRunConfig
1212
from .pipeline.schedule import (

src/flowerpower/cli/cfg.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@
3232
# cfg.update(data)
3333
# try:
3434
# cfg.to_yaml(
35-
# os.path.join(
35+
# posixpath.join(
3636
# "pipelines",
3737
# pipeline_name + ".yml",
3838
# ),
File renamed without changes.
Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import base64
22
import inspect
33
import os
4+
import posixpath
45
import urllib
56
from pathlib import Path
67

@@ -14,8 +15,8 @@
1415
from fsspec.utils import infer_storage_options
1516
from loguru import logger
1617

17-
from ..storage_options import BaseStorageOptions
18-
from ..storage_options import from_dict as storage_options_from_dict
18+
from ..utils.storage_options import BaseStorageOptions
19+
from ..utils.storage_options import from_dict as storage_options_from_dict
1920
from . import AbstractFileSystem
2021

2122

@@ -24,13 +25,15 @@ def __init__(self, directory):
2425
self.directory = directory
2526

2627
def __call__(self, path: str) -> str:
27-
os.makedirs(os.path.dirname(os.path.join(self.directory, path)), exist_ok=True)
28+
os.makedirs(
29+
posixpath.dirname(posixpath.join(self.directory, path)), exist_ok=True
30+
)
2831
return path
2932

3033

3134
class MonitoredSimpleCacheFileSystem(SimpleCacheFileSystem):
3235
def __init__(self, **kwargs):
33-
# kwargs["cache_storage"] = os.path.join(
36+
# kwargs["cache_storage"] = posixpath.join(
3437
# kwargs.get("cache_storage"), kwargs.get("fs").protocol[0]
3538
# )
3639
self._verbose = kwargs.get("verbose", False)
@@ -41,8 +44,8 @@ def _check_file(self, path):
4144
self._check_cache()
4245
cache_path = self._mapper(path)
4346
for storage in self.storage:
44-
fn = os.path.join(storage, cache_path)
45-
if os.path.exists(fn):
47+
fn = posixpath.join(storage, cache_path)
48+
if posixpath.exists(fn):
4649
return fn
4750
if self._verbose:
4851
logger.info(f"Downloading {self.protocol[0]}://{path}")
@@ -55,7 +58,7 @@ def size(self, path):
5558
if cached_file is None:
5659
return self.fs.size(path)
5760
else:
58-
return os.path.getsize(cached_file)
61+
return posixpath.getsize(cached_file)
5962

6063
def sync(self, reload: bool = False):
6164
if reload:
@@ -285,7 +288,7 @@ def get_filesystem(
285288
host = pp.get("host", "")
286289
path = pp.get("path", "")
287290
if host and host not in path:
288-
path = os.path.join(host, path)
291+
path = posixpath.join(host, path)
289292

290293
if protocol == "file" or protocol == "local":
291294
fs = filesystem(protocol)
Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import datetime as dt
2-
import os
2+
import posixpath
33
import uuid
44
from typing import Generator
55

@@ -10,8 +10,12 @@
1010
import pyarrow.parquet as pq
1111
from fsspec import AbstractFileSystem
1212

13-
from ..misc import convert_large_types_to_standard, run_parallel, _dict_to_dataframe
14-
from ..polars import pl
13+
from ..utils.misc import (
14+
convert_large_types_to_standard,
15+
run_parallel,
16+
_dict_to_dataframe,
17+
)
18+
from ..utils.polars import pl
1519

1620
import importlib
1721

@@ -72,8 +76,8 @@ def _read_json(
7276
if "**" in path:
7377
path = self.glob(path)
7478
else:
75-
if ".json" not in os.path.basename(path):
76-
path = os.path.join(path, "**/*.jsonl" if jsonlines else "**/*.json")
79+
if ".json" not in posixpath.basename(path):
80+
path = posixpath.join(path, "**/*.jsonl" if jsonlines else "**/*.json")
7781
path = self.glob(path)
7882

7983
if isinstance(path, list):
@@ -147,8 +151,8 @@ def _read_json_batches(
147151
if "**" in path:
148152
path = self.glob(path)
149153
else:
150-
if ".json" not in os.path.basename(path):
151-
path = os.path.join(path, "**/*.jsonl" if jsonlines else "**/*.json")
154+
if ".json" not in posixpath.basename(path):
155+
path = posixpath.join(path, "**/*.jsonl" if jsonlines else "**/*.json")
152156
path = self.glob(path)
153157

154158
if isinstance(path, str):
@@ -305,8 +309,8 @@ def _read_csv(
305309
if "**" in path:
306310
path = self.glob(path)
307311
else:
308-
if ".csv" not in os.path.basename(path):
309-
path = os.path.join(path, "**/*.csv")
312+
if ".csv" not in posixpath.basename(path):
313+
path = posixpath.join(path, "**/*.csv")
310314
path = self.glob(path)
311315

312316
if isinstance(path, list):
@@ -363,8 +367,8 @@ def _read_csv_batches(
363367
if "**" in path:
364368
path = self.glob(path)
365369
else:
366-
if ".csv" not in os.path.basename(path):
367-
path = os.path.join(path, "**/*.csv")
370+
if ".csv" not in posixpath.basename(path):
371+
path = posixpath.join(path, "**/*.csv")
368372
path = self.glob(path)
369373

370374
# Ensure path is a list
@@ -497,12 +501,12 @@ def _read_parquet(
497501
if isinstance(path, str):
498502
if "**" in path:
499503
if "*.parquet" in path:
500-
path = os.path.join(path, "*.parquet")
504+
path = posixpath.join(path, "*.parquet")
501505

502506
path = self.glob(path)
503507
else:
504508
if ".parquet" in path:
505-
path = os.path.join(path, "**/*.parquet")
509+
path = posixpath.join(path, "**/*.parquet")
506510
path = self.glob(path)
507511

508512
if isinstance(path, list):
@@ -568,11 +572,11 @@ def _read_parquet_batches(
568572
if isinstance(path, str):
569573
if "**" in path:
570574
if "*.parquet" not in path:
571-
path = os.path.join(path, "**/*.parquet")
575+
path = posixpath.join(path, "**/*.parquet")
572576
path = self.glob(path)
573577
else:
574578
if ".parquet" not in path:
575-
path = os.path.join(path, "**/*.parquet")
579+
path = posixpath.join(path, "**/*.parquet")
576580
path = self.glob(path)
577581

578582
if not isinstance(path, list):
@@ -825,7 +829,7 @@ def pyarrow_parquet_dataset(
825829
(pds.Dataset): Pyarrow dataset.
826830
"""
827831
if not self.is_file(path):
828-
path = os.path.join(path, "_metadata")
832+
path = posixpath.join(path, "_metadata")
829833
return pds.dataset(
830834
path,
831835
filesystem=self,
@@ -1076,7 +1080,7 @@ def _write(i, data, p, basename):
10761080
if mode == "delete_matching":
10771081
write_file(self, data[i], p, format, **kwargs)
10781082
elif mode == "overwrite":
1079-
self.fs.rm(os.path.dirname(p), recursive=True)
1083+
self.fs.rm(posixpath.dirname(p), recursive=True)
10801084
write_file(self, data[i], p, format, **kwargs)
10811085
elif mode == "append":
10821086
if not self.exists(p):

src/flowerpower/flowerpower.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
import datetime as dt
22
import os
3+
import posixpath
34
from pathlib import Path
45

56
import rich
67
from fsspec.spec import AbstractFileSystem
78

89
from .cfg import Config
9-
from .utils.filesystem import get_filesystem
10+
from .filesystem import get_filesystem
1011

1112

1213
def init(
@@ -22,20 +23,20 @@ def init(
2223
if base_dir is None:
2324
base_dir = str(Path.cwd())
2425

25-
fs = get_filesystem(os.path.join(base_dir, name), **storage_options)
26+
fs = get_filesystem(posixpath.join(base_dir, name), **storage_options)
2627

2728
fs.makedirs("conf/pipelines", exist_ok=True)
2829
fs.makedirs("pipelines", exist_ok=True)
2930

30-
cfg = Config.load(base_dir=os.path.join(base_dir, name), name=name)
31+
cfg = Config.load(base_dir=posixpath.join(base_dir, name), name=name)
3132

32-
with open(os.path.join(base_dir, name, "README.md"), "w") as f:
33+
with open(posixpath.join(base_dir, name, "README.md"), "w") as f:
3334
f.write(
3435
f"# {name.replace('_', ' ').upper()}\n\n"
3536
f"**created with FlowerPower**\n\n*{dt.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}*\n\n"
3637
)
3738
cfg.save()
38-
os.chdir(os.path.join(base_dir, name))
39+
os.chdir(posixpath.join(base_dir, name))
3940

4041
rich.print(
4142
f"\n✨ Initialized FlowerPower project [bold blue]{name}[/bold blue] at [italic green]{base_dir}[/italic green]\n"

src/flowerpower/http/api/cfg.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import os
1+
import posixpath
22

33
from sanic import Blueprint
44
from sanic.exceptions import SanicException
@@ -36,7 +36,7 @@ async def update_pipeline(request, pipeline_name, body: PipelineConfig) -> json:
3636
cfg.update(data)
3737
try:
3838
cfg.to_yaml(
39-
os.path.join(
39+
posixpath.join(
4040
"pipelines",
4141
pipeline_name + ".yml",
4242
),

src/flowerpower/io/base.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from fsspec.utils import get_protocol
1111
from pydantic import BaseModel, ConfigDict
1212

13-
from ..utils.filesystem import get_filesystem
13+
from ..filesystem import get_filesystem
1414
from ..utils.polars import pl
1515
from ..utils.sql import sql2polars_filter, sql2pyarrow_filter
1616
from ..utils.storage_options import (
@@ -67,7 +67,7 @@ class BaseFileIO(BaseModel):
6767
format: str | None = None
6868

6969
def model_post_init(self, __context):
70-
# self._update_storage_options_from_env()
70+
self._raw_path = self.path
7171
if isinstance(self.storage_options, dict):
7272
if "protocol" not in self.storage_options:
7373
self.storage_options["protocol"] = get_protocol(self.path)
@@ -569,7 +569,7 @@ def to_pyarrow_dataset(
569569
**kwargs,
570570
)
571571
elif self.format == "parquet":
572-
if self.fs.exists(os.path.join(self._path, "_metadata")):
572+
if self.fs.exists(posixpath.join(self._path, "_metadata")):
573573
self._dataset = self.fs.parquet_dataset(
574574
self._path,
575575
schema=self.schema_,
@@ -738,7 +738,7 @@ class BaseFileWriter(BaseFileIO):
738738
| pd.DataFrame
739739
| dict[str, Any]
740740
| list[pl.DataFrame | pl.LazyFrame | pa.Table | pd.DataFrame | dict[str, Any]]
741-
)
741+
) | None = None
742742
basename: str | None = None
743743
concat: bool = False
744744
mode: str = "append" # append, overwrite, delete_matching, error_if_exists
@@ -788,7 +788,7 @@ class BaseDatasetWriter(BaseFileWriter):
788788
| pd.DataFrame
789789
| dict[str, Any]
790790
]
791-
)
791+
) | None = None
792792
basename: str | None = None
793793
schema_: pa.Schema | None = None
794794
partition_by: str | list[str] | pds.Partitioning | None = None
@@ -798,7 +798,7 @@ class BaseDatasetWriter(BaseFileWriter):
798798
max_rows_per_file: int | None = 2_500_000
799799
concat: bool = False
800800
mode: str = "append" # append, overwrite, delete_matching, error_if_exists
801-
is_pydala_dataset: bool = (False,)
801+
is_pydala_dataset: bool = False
802802

803803
def write(
804804
self,

0 commit comments

Comments (0)