Skip to content

Commit 1dd7806

Browse files
committed
Update MQTT configuration and refactor related classes for improved clarity and functionality
1 parent c450712 commit 1dd7806

File tree

7 files changed

+28
-17
lines changed

7 files changed

+28
-17
lines changed

examples/hello-world/conf/project.yml

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,11 @@ worker:
1414
uri: sqlite+aiosqlite:///examples/hello-world/flowerpower.db
1515
#uri: postgresql+asyncpg://postgres:password@localhost/flowerpower
1616
event_broker:
17-
type: redis
18-
uri: redis://localhost:6379
17+
#type: redis
18+
#uri: redis://localhost:6379
19+
type: mqtt
20+
host: localhost
21+
port: 1883
22+
username: null
23+
password: null
1924
max_concurrent_jobs: 100

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ description = "A simple workflow framework. Hamilton + APScheduler = FlowerPower
44
authors = [{ name = "Volker L.", email = "[email protected]" }]
55
readme = "README.md"
66
requires-python = ">= 3.11"
7-
version = "0.9.8.6"
7+
version = "0.9.8.7"
88
keywords = ["hamilton", "workflow", "pipeline", "scheduler", "apscheduler", "dask", "ray"]
99
dependencies = [
1010
'aiobotocore<2.18.0',

src/flowerpower/fs/ext.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -868,7 +868,7 @@ def pydala_dataset(
868868
path: str,
869869
partitioning: str | list[str] | pds.Partitioning = None,
870870
**kwargs,
871-
) -> ParquetDataset:
871+
) -> ParquetDataset: # type: ignore
872872
"""
873873
Create a pydala dataset.
874874

src/flowerpower/io/base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -678,7 +678,7 @@ def to_pyarrow_table(self, **kwargs) -> pa.Table:
678678
self.to_pyarrow_dataset(**kwargs)
679679
return self._dataset.to_table()
680680

681-
def to_pydala_dataset(self, **kwargs) -> ParquetDataset:
681+
def to_pydala_dataset(self, **kwargs) -> "ParquetDataset":
682682
if ParquetDataset is None:
683683
raise ImportError("pydala is not installed.")
684684
if not hasattr(self, "_pydala_dataset"):

src/flowerpower/io/loader/mqtt.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,20 +34,20 @@ def _to_polars_dataframe(self) -> pl.DataFrame:
3434
def _to_polars_lazyframe(self) -> pl.LazyFrame:
3535
return pl.LazyFrame(self.payload)
3636

37-
def to_poars(self, lazy: bool = False) -> pl.DataFrame | pl.LazyFrame:
37+
def to_polars(self, lazy: bool = False) -> pl.DataFrame | pl.LazyFrame:
3838
if lazy:
3939
return self._to_polars_lazyframe()
4040
else:
4141
return self._to_polars_dataframe()
4242

43-
def to_duckdb(self, conn: duckdb.DuckDBPyConnection | None = None):
43+
def to_duckdb_relation(self, conn: duckdb.DuckDBPyConnection | None = None):
4444
if self.conn is None:
4545
if conn is None:
4646
conn = duckdb.connect()
4747
self.conn = conn
4848
return self.conn.from_arrow(self.to_pyarrow_table())
4949

50-
def to_dataset(self, **kwargs) -> pds.Dataset:
50+
def to_pyarrow_dataset(self, **kwargs) -> pds.Dataset:
5151
return pds.dataset(self.to_pyarrow_table(), **kwargs)
5252

5353
def register_in_duckdb(

src/flowerpower/mqtt.py

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616
class MQTTClient:
1717
def __init__(
1818
self,
19-
user: str | None = None,
20-
pw: str | None = None,
19+
username: str | None = None,
20+
password: str | None = None,
2121
host: str | None = "localhost",
2222
port: int | None = 1883,
2323
topic: str | None = None,
@@ -26,11 +26,17 @@ def __init__(
2626
reconnect_rate: int = 2,
2727
max_reconnect_delay: int = 60,
2828
transport: str = "tcp",
29+
**kwargs,
2930
):
31+
if "user" in kwargs:
32+
username = kwargs["user"]
33+
if "pw" in kwargs:
34+
password = kwargs["pw"]
35+
3036
self.topic = topic
3137

32-
self._user = user
33-
self._pw = pw
38+
self._username = username
39+
self._password = password
3440
self._host = host
3541
self._port = port
3642
self._first_reconnect_delay = first_reconnect_delay
@@ -136,8 +142,8 @@ def connect(self) -> Client:
136142
client_id=f"flowerpower-{random.randint(0, 10000)}",
137143
transport=self._transport,
138144
userdata=Munch(
139-
user=self._user,
140-
pw=self._pw,
145+
user=self._username,
146+
pw=self._password,
141147
host=self._host,
142148
port=self._port,
143149
topic=self.topic,
@@ -148,8 +154,8 @@ def connect(self) -> Client:
148154
transport=self._transport,
149155
),
150156
)
151-
if self._pw != "" and self._user != "":
152-
client.username_pw_set(self._user, self._pw)
157+
if self._password != "" and self._username != "":
158+
client.username_pw_set(self._username, self._password)
153159

154160
client.on_connect = self._on_connect
155161
client.on_disconnect = self._on_disconnect

uv.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)