
Commit a849ec3

Enhance pipeline functionality by serializing final variables and updating optional dependencies in pyproject.toml
1 parent 26b1dd0 commit a849ec3

6 files changed: 243 additions, 124 deletions


pyproject.toml
Lines changed: 11 additions & 3 deletions

@@ -34,10 +34,18 @@ flowerpower = "flowerpower.cli:app"
 
 [project.optional-dependencies]
 # dask = ["dask[complete]>=2024.7.1"]
-filesystem-ext = [
-    "pyarrow>=18.1.0",
-    "polars>=1.15.0", #"duckdb>=1.1.3", #"datafusion>=42.0.0",
+#filesystem-ext = [
+#    "pyarrow>=18.1.0",
+#    "polars>=1.15.0", #"duckdb>=1.1.3", #"datafusion>=42.0.0",
+#    "orjson>=3.10.12",
+#]
+io = [
+    "datafusion>=43.1.0",
+    "duckdb>=1.1.3",
     "orjson>=3.10.12",
+    "polars>=1.15.0",
+    "pyarrow>=18.1.0",
+    "pydala2>=0.9.4.5",
 ]
 mongodb = ["pymongo>=4.7.2"]
 mqtt = ["paho-mqtt>=2.1.0", "orjson>=3.10.11"]
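
The `filesystem-ext` extra is retired (left commented out) and its packages move into a broader `io` extra together with `datafusion`, `duckdb`, and `pydala2`; users would install it with `pip install "flowerpower[io]"`. A minimal sketch of how a module could guard these now-optional imports; the guard and its error message are illustrative assumptions, not code from this commit:

    # Sketch: import guard for the optional "io" extra (assumed pattern).
    try:
        import datafusion  # noqa: F401
        import duckdb  # noqa: F401
        import polars as pl  # noqa: F401
        import pyarrow as pa  # noqa: F401
    except ImportError as exc:
        raise ImportError(
            "The I/O backends are optional; install them with "
            "`pip install 'flowerpower[io]'`."
        ) from exc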

src/flowerpower/http/api/pipeline.py
Lines changed: 2 additions & 2 deletions

@@ -45,8 +45,8 @@ async def run(
         ) as pipeline:
             final_vars = pipeline.run(**body.model_dump())
 
-            # final_vars = {k: dill.dumps(v) for k, v in final_vars.items()}
-            # return bytes(final_vars)
+            final_vars = {k: dill.dumps(v) for k, v in final_vars.items()}
+            return bytes(final_vars)
         return json({"status": "success", "message": "Pipeline ran successfully"})
     except Exception as e:
         raise SanicException(str(e))
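
Re-enabling the two commented lines makes the handler pickle each final variable with dill and return bytes instead of the JSON status message. As written, `bytes()` over a dict of pickled values would raise a TypeError at runtime; a variant that pickles the whole mapping once and returns it through Sanic's raw response could look like this, a sketch under that assumption rather than the committed code:

    import dill
    from sanic.response import raw

    # Stand-in for the dict returned by pipeline.run(**body.model_dump()).
    final_vars = {"node_a": [1, 2, 3], "node_b": {"rows": 42}}

    # Pickle the whole mapping once and send it as an octet-stream body;
    # a client recovers it with dill.loads(response.body).
    payload = dill.dumps(final_vars)
    response = raw(payload, content_type="application/octet-stream")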

src/flowerpower/http/api/scheduler.py
Lines changed: 8 additions & 9 deletions

@@ -1,7 +1,7 @@
 import uuid
 
 import dill
-from sanic import Blueprint 
+from sanic import Blueprint
 from sanic.exceptions import SanicException
 from sanic.response import json, raw
 from sanic_ext import openapi
@@ -11,7 +11,7 @@
     SchedulerModify,
     SchedulerDelete,
     SchedulerList,
-    SchedulerInfo
+    SchedulerInfo,
 )
 from ..utils import deserialize_and_validate
 
@@ -62,16 +62,15 @@ async def job_result(request, job_id: str) -> raw:
 
 
 @bp.get("/schedules")
-@openapi.summary("List all schedules") 
+@openapi.summary("List all schedules")
 @openapi.description("Get a list of all pipeline schedules")
 @openapi.body({"application/json": SchedulerList}, required=False)
 @openapi.response(200, {"application/json": dict})
 async def schedules(request) -> json:
     try:
         body = await deserialize_and_validate(SchedulerList, query=request.args)
         schedules = request.app.ctx.scheduler.get_schedules(
-            pattern=body.pattern,
-            as_dict=True
+            pattern=body.pattern, as_dict=True
         )
         return json({"schedules": schedules})
     except Exception as e:
@@ -87,7 +86,7 @@ async def schedules(request) -> json:
 @openapi.response(404, {"application/json": dict})
 async def schedule(request, schedule_id: str) -> json:
     try:
-        body = await deserialize_and_validate(SchedulerInfo, body=request.json)
+        # body = await deserialize_and_validate(SchedulerInfo, body=request.json)
         if schedule_id not in [s.id for s in request.app.ctx.scheduler.get_schedules()]:
             raise SanicException("Schedule not found", status_code=404)
         schedule = request.app.ctx.scheduler.get_schedule(schedule_id, as_dict=True)
@@ -113,15 +112,15 @@ async def add_schedule(request) -> json:
 @bp.patch("/schedule/<schedule_id>")
 @openapi.summary("Modify schedule")
 @openapi.description("Modify an existing pipeline schedule")
-@openapi.parameter("schedule_id", str, required=True) 
+@openapi.parameter("schedule_id", str, required=True)
 @openapi.body({"application/json": SchedulerModify}, required=True)
 @openapi.response(200, {"application/json": dict})
 @openapi.response(404, {"application/json": dict})
 async def modify_schedule(request, schedule_id: str) -> json:
     try:
         if schedule_id not in [s.id for s in request.app.ctx.scheduler.get_schedules()]:
             raise SanicException("Schedule not found", status_code=404)
-        
+
         body = await deserialize_and_validate(SchedulerModify, body=request.json)
         request.app.ctx.scheduler.modify_schedule(schedule_id, **body.model_dump())
         return json({"status": "success"})
@@ -138,7 +137,7 @@ async def modify_schedule(request, schedule_id: str) -> json:
 @openapi.response(404, {"application/json": dict})
 async def remove_schedule(request, schedule_id: str) -> json:
     try:
-        body = await deserialize_and_validate(SchedulerDelete, body=request.json)
+        # body = await deserialize_and_validate(SchedulerDelete, body=request.json)
         if schedule_id not in [s.id for s in request.app.ctx.scheduler.get_schedules()]:
             raise SanicException("Schedule not found", status_code=404)
         request.app.ctx.scheduler.remove_schedule(schedule_id)
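
In the read and delete handlers the request-body validation is commented out; each handler now relies on the `schedule_id` path parameter plus an inline existence check. That check is repeated across three handlers; a small helper it could be factored into is sketched below (the helper name is hypothetical, not part of this commit):

    from sanic.exceptions import SanicException

    def ensure_schedule_exists(scheduler, schedule_id: str) -> None:
        # Raise a 404 if the scheduler has no schedule with this id,
        # mirroring the inline check used in the handlers above.
        if schedule_id not in [s.id for s in scheduler.get_schedules()]:
            raise SanicException("Schedule not found", status_code=404)

    # Usage inside a handler:
    #   ensure_schedule_exists(request.app.ctx.scheduler, schedule_id)
    #   schedule = request.app.ctx.scheduler.get_schedule(schedule_id, as_dict=True)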

src/flowerpower/io/base.py
Lines changed: 11 additions & 9 deletions

@@ -121,6 +121,7 @@ def _update_fs(self):
             dirfs=False,
         )
 
+    @property
     def _glob_path(self):
         if isinstance(self.path, list):
             return
@@ -140,7 +141,7 @@ def list_files(self):
         if isinstance(self.path, list):
             return self.path
 
-        glob_path = self._glob_path()
+        glob_path = self._glob_path
         return self.fs.glob(glob_path)
 
 
@@ -180,18 +181,16 @@ class BaseFileLoader(BaseFileIO):
 
     include_file_path: bool = False
    concat: bool = True
+    batch_size: int | None = None
     conn: duckdb.DuckDBPyConnection | None = None
     ctx: datafusion.SessionContext | None = None
+    jsonlines: bool | None = None
 
     def _load(self, **kwargs):
-        if "include_file_path" not in kwargs:
-            self.include_file_path = kwargs.get(
-                "include_file_path", self.include_file_path
-            )
-        if "concat" not in kwargs:
-            self.concat = kwargs.get("concat", self.concat)
+        self.include_file_path = kwargs.get("include_file_path", self.include_file_path)
+        self.concat = kwargs.get("concat", self.concat)
 
-        if not hasattr(self, "_data"):
+        if not hasattr(self, "_data") or self._data is None:
             self._data = self.fs.read_files(
                 self._glob_path,
                 self.format,
@@ -208,8 +207,11 @@ def _load(self, **kwargs):
                 reload = True
         if isinstance(self._data, list) and self.concat:
             self._data = pl.concat(self._data, how="diagonal_relaxed")
+
         if reload:
-            self._load(
+            self._data = self.fs.read_files(
+                self._glob_path,
+                self.format,
                 include_file_path=self.include_file_path,
                 concat=self.concat,
                 jsonlines=self.jsonlines or None,