Skip to content

Commit 281f2a9

Browse files
committed
Bump version to 0.9.6.3; update type hints, improve MQTT client configuration, and enhance storage options handling
1 parent 46700b9 commit 281f2a9

File tree

6 files changed

+108
-101
lines changed

6 files changed

+108
-101
lines changed

pyproject.toml

Lines changed: 6 additions & 25 deletions
Original file line number | Diff line number | Diff line change
@@ -1,19 +1,11 @@
11
[project]
22
name = "FlowerPower"
33
description = "A simple workflow framework. Hamilton + APScheduler = FlowerPower"
4-
authors = [{name = "Volker L.", email = "[email protected]"}]
4+
authors = [{ name = "Volker L.", email = "[email protected]" }]
55
readme = "README.md"
66
requires-python = ">= 3.11"
7-
version = "0.9.6.2"
8-
keywords = [
9-
"hamilton",
10-
"workflow",
11-
"pipeline",
12-
"scheduler",
13-
"apscheduler",
14-
"dask",
15-
"ray",
16-
]
7+
version = "0.9.6.3"
8+
keywords = ["hamilton", "workflow", "pipeline", "scheduler", "apscheduler", "dask", "ray"]
179
dependencies = [
1810
"sf-hamilton[visualization]>=1.69.0",
1911
"python-dotenv>=1.0.1",
@@ -32,21 +24,14 @@ dependencies = [
3224

3325

3426
[project.urls]
35-
"Homepage" = "https://github.com/legout/flowerpower"
27+
"Homepage" = "https://github.com/legout/flowerpower"
3628
"Bug Tracker" = "https://github.com/legout/flowerpower/issues"
3729

3830
[project.scripts]
3931
flowerpower = "flowerpower.cli:app"
4032

4133
[project.optional-dependencies]
42-
io = [
43-
"datafusion>=43.1.0",
44-
"duckdb>=1.1.3",
45-
"orjson>=3.10.12",
46-
"polars>=1.15.0",
47-
"pyarrow>=18.1.0",
48-
"pydala2>=0.9.4.5",
49-
]
34+
io = ["datafusion>=43.1.0", "duckdb>=1.1.3", "orjson>=3.10.12", "polars>=1.15.0", "pyarrow>=18.1.0", "pydala2>=0.9.4.5"]
5035
mongodb = ["pymongo>=4.7.2"]
5136
mqtt = ["paho-mqtt>=2.1.0", "orjson>=3.10.11"]
5237
opentelemetry = [
@@ -65,11 +50,7 @@ scheduler = [
6550
]
6651
tui = ["textual>=0.85.2"]
6752
ui = ["sf-hamilton-ui>=0.0.11"]
68-
webserver = [
69-
"sanic>=24.6.0",
70-
"sanic-ext>=23.12.0",
71-
"orjson>=3.10.11",
72-
]
53+
webserver = ["sanic>=24.6.0", "sanic-ext>=23.12.0", "orjson>=3.10.11"]
7354

7455
#[build-system]
7556
#build-backend = "hatchling.build"

src/flowerpower/io/base.py

Lines changed: 22 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,4 @@
1-
import os
1+
import posixpath
22
from typing import Any, Generator
33

44
import datafusion
@@ -14,11 +14,11 @@
1414
from ..utils.polars import pl
1515
from ..utils.sql import sql2polars_filter, sql2pyarrow_filter
1616
from ..utils.storage_options import (
17-
# AwsStorageOptions,
18-
# AzureStorageOptions,
19-
# GcsStorageOptions,
20-
# GitHubStorageOptions,
21-
# GitLabStorageOptions,
17+
AwsStorageOptions,
18+
AzureStorageOptions,
19+
GcsStorageOptions,
20+
GitHubStorageOptions,
21+
GitLabStorageOptions,
2222
StorageOptions,
2323
)
2424
import importlib
@@ -62,7 +62,16 @@ class BaseFileIO(BaseModel):
6262

6363
model_config = ConfigDict(arbitrary_types_allowed=True)
6464
path: str | list[str]
65-
storage_options: StorageOptions | dict[str, Any] | None = None
65+
storage_options: (
66+
StorageOptions
67+
| AwsStorageOptions
68+
| AzureStorageOptions
69+
| GcsStorageOptions
70+
| GitLabStorageOptions
71+
| GitHubStorageOptions
72+
| dict[str, Any]
73+
| None
74+
) = None
6675
fs: AbstractFileSystem | None = None
6776
format: str | None = None
6877

@@ -74,6 +83,8 @@ def model_post_init(self, __context):
7483
self.storage_options = StorageOptions(
7584
**self.storage_options
7685
).storage_options
86+
if isinstance(self.storage_options, StorageOptions):
87+
self.storage_options = self.storage_options.storage_options
7788

7889
if self.fs is None:
7990
self.fs = get_filesystem(
@@ -326,7 +337,6 @@ def iter_polars(
326337
batch_size: int = 1,
327338
**kwargs,
328339
) -> Generator[pl.DataFrame | pl.LazyFrame, None, None]:
329-
330340
if lazy:
331341
yield from self._iter_polars_lazyframe(batch_size=batch_size, **kwargs)
332342
yield from self._iter_polars_dataframe(batch_size=batch_size, **kwargs)
@@ -754,13 +764,13 @@ def write(
754764
| list[
755765
pl.DataFrame | pl.LazyFrame | pa.Table | pd.DataFrame | dict[str, Any]
756766
]
757-
) | None = None,
767+
)
768+
| None = None,
758769
basename: str | None = None,
759770
concat: bool = False,
760771
mode: str = "append",
761772
**kwargs,
762773
):
763-
764774
self.fs.write_files(
765775
data=data or self.data,
766776
basename=basename or self.basename,
@@ -819,7 +829,8 @@ def write(
819829
| pd.DataFrame
820830
| dict[str, Any]
821831
]
822-
) | None = None,
832+
)
833+
| None = None,
823834
unique: bool | list[str] | str = False,
824835
delta_subset: str | None = None,
825836
alter_schema: bool = False,

src/flowerpower/io/loader/deltatable.py

Lines changed: 61 additions & 59 deletions
Original file line number | Diff line number | Diff line change
@@ -11,19 +11,21 @@
1111
# from hamilton.function_modifiers import dataloader
1212

1313
from ...utils.polars import pl
14-
from ..base import BaseFileIO
14+
from ..base import BaseDatasetLoader
1515

1616
# from ..utils import get_dataframe_metadata, get_delta_metadata
1717

1818

19-
class DeltaTableLoader(BaseFileIO):
19+
class DeltaTableLoader(BaseDatasetLoader):
2020
"""Delta table loader.
2121
2222
This class is responsible for loading Delta tables into several dataframe formats,
2323
duckdb and datafusion.
2424
2525
"""
2626

27+
delta_table: DeltaTable | None = None
28+
2729
def model_post_init(self, __context):
2830
super().model_post_init(__context)
2931
self.format = "delta"
@@ -39,63 +41,63 @@ def dt(self) -> DeltaTable:
3941

4042
def to_pyarrow_dataset(self) -> pds.Dataset:
4143
"""Converts the DeltaTable to a PyArrow Dataset."""
42-
self.delta_table.to_pyarrow_dataset()
44+
return self.delta_table.to_pyarrow_dataset()
4345

4446
def to_pyrrow_table(self) -> pa.Table:
4547
"""Converts the DeltaTable to a PyArrow Table."""
46-
self.delta_table.to_pyarrow_table()
47-
48-
def to_polars(self, lazy: bool = True) -> pl.DataFrame | pl.LazyFrame:
49-
"""Converts the DeltaTable to a Polars DataFrame."""
50-
if lazy:
51-
return pl.scan_pyarrow_dataset(self.to_pyarrow_dataset())
52-
53-
return pl.from_arrow(self.to_pyarrow_table())
54-
55-
def to_pandas(self) -> pd.DataFrame:
56-
"""Converts the DeltaTable to a Pandas DataFrame."""
57-
return self.to_pyrrow_table().to_pandas()
58-
59-
def to_duckdb_relation(
60-
self, conn: duckdb.DuckDBPyConnection | None = None, lazy: bool = True
61-
) -> duckdb.DuckDBPyRelation:
62-
"""Converts the DeltaTable to a DuckDB relation."""
63-
if conn is None:
64-
conn = duckdb.connect()
65-
if lazy:
66-
return conn.from_arrow(self.to_pyarrow_dataset())
67-
return conn.from_arrow(self.to_pyarrow_table())
68-
69-
def register_in_duckdb(
70-
self,
71-
conn: duckdb.DuckDBPyConnection | None = None,
72-
name: str | None = None,
73-
lazy: bool = True,
74-
) -> duckdb.DuckDBPyConnection:
75-
"""Registers the DeltaTable in a DuckDB connection."""
76-
if name is None:
77-
name = f"{self.format}:{self.path}"
78-
if conn is None:
79-
conn = duckdb.connect()
80-
table = self.to_duckdb_relation(conn=conn, lazy=lazy)
81-
conn.register(name, table)
82-
return conn
83-
84-
def register_in_datafusion(
85-
self,
86-
ctx: dtf.SessionContext | None = None,
87-
name: str | None = None,
88-
lazy: bool = True,
89-
) -> dtf.SessionContext:
90-
"""Registers the DeltaTable in a DataFusion context."""
91-
if name is None:
92-
name = f"{self.format}:{self.path}"
93-
if ctx is None:
94-
ctx = dtf.SessionContext()
95-
if lazy:
96-
ctx.register_dataset(name, self.to_pyarrow_dataset())
97-
else:
98-
ctx.register_record_batches(name, [self.to_pyarrow_table().to_batches()])
99-
# table = table.to_table()
100-
# ctx.register_dataset(name, table)
101-
return ctx
48+
return self.delta_table.to_pyarrow_table()
49+
50+
# def to_polars(self, lazy: bool = True) -> pl.DataFrame | pl.LazyFrame:
51+
# """Converts the DeltaTable to a Polars DataFrame."""
52+
# if lazy:
53+
# return pl.scan_pyarrow_dataset(self.to_pyarrow_dataset())
54+
55+
# return pl.from_arrow(self.to_pyarrow_table())
56+
57+
# def to_pandas(self) -> pd.DataFrame:
58+
# """Converts the DeltaTable to a Pandas DataFrame."""
59+
# return self.to_pyrrow_table().to_pandas()
60+
61+
# def to_duckdb_relation(
62+
# self, conn: duckdb.DuckDBPyConnection | None = None, lazy: bool = True
63+
# ) -> duckdb.DuckDBPyRelation:
64+
# """Converts the DeltaTable to a DuckDB relation."""
65+
# if conn is None:
66+
# conn = duckdb.connect()
67+
# if lazy:
68+
# return conn.from_arrow(self.to_pyarrow_dataset())
69+
# return conn.from_arrow(self.to_pyarrow_table())
70+
71+
# def register_in_duckdb(
72+
# self,
73+
# conn: duckdb.DuckDBPyConnection | None = None,
74+
# name: str | None = None,
75+
# lazy: bool = True,
76+
# ) -> duckdb.DuckDBPyConnection:
77+
# """Registers the DeltaTable in a DuckDB connection."""
78+
# if name is None:
79+
# name = f"{self.format}:{self.path}"
80+
# if conn is None:
81+
# conn = duckdb.connect()
82+
# table = self.to_duckdb_relation(conn=conn, lazy=lazy)
83+
# conn.register(name, table)
84+
# return conn
85+
86+
# def register_in_datafusion(
87+
# self,
88+
# ctx: dtf.SessionContext | None = None,
89+
# name: str | None = None,
90+
# lazy: bool = True,
91+
# ) -> dtf.SessionContext:
92+
# """Registers the DeltaTable in a DataFusion context."""
93+
# if name is None:
94+
# name = f"{self.format}:{self.path}"
95+
# if ctx is None:
96+
# ctx = dtf.SessionContext()
97+
# if lazy:
98+
# ctx.register_dataset(name, self.to_pyarrow_dataset())
99+
# else:
100+
# ctx.register_record_batches(name, [self.to_pyarrow_table().to_batches()])
101+
# # table = table.to_table()
102+
# # ctx.register_dataset(name, table)
103+
# return ctx

src/flowerpower/io/saver/deltatable.py

Lines changed: 5 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -20,9 +20,10 @@ class DeltaTableWriter(BaseDatasetWriter):
2020
```
2121
"""
2222

23-
description = None
23+
description: str | None = None
2424

2525
def model_post_init(self, __context):
26+
super().model_post_init(__context)
2627
self.format = "delta"
2728

2829
def write(
@@ -44,7 +45,8 @@ def write(
4445
| pd.DataFrame
4546
| dict[str, Any]
4647
]
47-
) | None = None,
48+
)
49+
| None = None,
4850
mode: str = "append", # "overwrite" | "append" | "error | "ignore"
4951
schema: pa.Schema | None = None,
5052
schema_mode: str | None = None, # "merge" | "overwrite"
@@ -76,7 +78,7 @@ def write(
7678
data = [data]
7779
if isinstance(data[0], dict):
7880
data = [_dict_to_dataframe(d) for d in data]
79-
if isinstance(data[0], pa.LazyFrame):
81+
if isinstance(data[0], pl.LazyFrame):
8082
data = [d.collect() for d in data]
8183
if isinstance(data[0], pl.DataFrame):
8284
data = pl.concat(data, how="diagonal_relaxed").to_arrow()

src/flowerpower/mqtt.py

Lines changed: 13 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -230,8 +230,8 @@ def from_event_broker(cls, base_dir: str):
230230
if event_broker_cfg is not None:
231231
if event_broker_cfg.get("type", None) == "mqtt":
232232
return cls(
233-
user=event_broker_cfg.get("user", None),
234-
pw=event_broker_cfg.get("pw", None),
233+
user=event_broker_cfg.get("username", None),
234+
pw=event_broker_cfg.get("password", None),
235235
host=event_broker_cfg.get("host", "localhost"),
236236
port=event_broker_cfg.get("port", 1883),
237237
transport=event_broker_cfg.get("transport", "tcp"),
@@ -387,6 +387,10 @@ def start_pipeline_listener(
387387
storage_options: dict = {},
388388
fs: AbstractFileSystem | None = None,
389389
background: bool = False,
390+
host: str | None = None,
391+
port: int | None = None,
392+
username: str | None = None,
393+
password: str | None = None,
390394
**kwargs,
391395
):
392396
"""
@@ -410,6 +414,13 @@ def start_pipeline_listener(
410414
background: Run the listener in the background
411415
**kwargs: Additional keyword arguments
412416
"""
417+
if host and port:
418+
client = MQTTClient(
419+
user=username,
420+
pw=password,
421+
host=host,
422+
port=port,
423+
)
413424
client = MQTTClient.from_event_broker(base_dir=base_dir)
414425
client.start_pipeline_listener(
415426
name=name,

uv.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments (0)