Skip to content

Commit bacbb35

Browse files
authored
Merge pull request #87 from yassun7010/add_fetch_pyarrow
feat: add fetch_arrow*.
2 parents 4f52d6a + 11b7b84 commit bacbb35

File tree

10 files changed

+232
-49
lines changed

10 files changed

+232
-49
lines changed

turu-snowflake/poetry.lock

Lines changed: 28 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

turu-snowflake/pyproject.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ classifiers = [
2828
]
2929

3030
[tool.poetry.extras]
31-
pandas = ["pandas", "pyarrow", "pandas-stubs"]
31+
pandas = ["pandas", "pyarrow"]
3232
pandera = ["pandera"]
3333

3434
[tool.poetry.dependencies]
@@ -64,8 +64,8 @@ pydantic = "^2.5.2"
6464
pytest-xdist = "^3.5.0"
6565
pytest-cov = "^4.1.0"
6666
pytest-asyncio = "^0.23.2"
67-
68-
pyarrow-stubs = { version = "^10.0.1.7", optional = true }
67+
pyarrow-stubs = "^10.0.1.7"
68+
pandas-stubs = "^2.1.4.231227"
6969
numpy = "^1.26.3"
7070

7171
[tool.taskipy.tasks]

turu-snowflake/src/turu/snowflake/async_connection.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -183,23 +183,23 @@ async def execute_map(
183183
@overload
184184
async def execute_map(
185185
self,
186-
row_type: Type[GenericNewPanderaDataFrameModel],
186+
row_type: Type[GenericNewPyArrowTable],
187187
operation: str,
188188
parameters: "Optional[Any]" = None,
189189
/,
190190
**options: Unpack[ExecuteOptions],
191-
) -> "AsyncCursor[Never, PanderaDataFrame[GenericNewPanderaDataFrameModel], Never]":
191+
) -> "AsyncCursor[Never, Never, GenericNewPyArrowTable]":
192192
...
193193

194194
@overload
195195
async def execute_map(
196196
self,
197-
row_type: Type[GenericNewPyArrowTable],
197+
row_type: Type[GenericNewPanderaDataFrameModel],
198198
operation: str,
199199
parameters: "Optional[Any]" = None,
200200
/,
201201
**options: Unpack[ExecuteOptions],
202-
) -> "AsyncCursor[Never, Never, GenericNewPyArrowTable]":
202+
) -> "AsyncCursor[Never, PanderaDataFrame[GenericNewPanderaDataFrameModel], Never]":
203203
...
204204

205205
@override
@@ -267,23 +267,23 @@ async def executemany_map(
267267
@overload
268268
async def executemany_map(
269269
self,
270-
row_type: Type[GenericNewPanderaDataFrameModel],
270+
row_type: Type[GenericNewPyArrowTable],
271271
operation: str,
272272
seq_of_parameters: Sequence[Any],
273273
/,
274274
**options: Unpack[ExecuteOptions],
275-
) -> "AsyncCursor[Never, PanderaDataFrame[GenericNewPanderaDataFrameModel], Never]":
275+
) -> AsyncCursor[Never, Never, GenericNewPyArrowTable]:
276276
...
277277

278278
@overload
279279
async def executemany_map(
280280
self,
281-
row_type: Type[GenericNewPyArrowTable],
281+
row_type: Type[GenericNewPanderaDataFrameModel],
282282
operation: str,
283283
seq_of_parameters: Sequence[Any],
284284
/,
285285
**options: Unpack[ExecuteOptions],
286-
) -> AsyncCursor[Never, Never, GenericNewPyArrowTable]:
286+
) -> "AsyncCursor[Never, PanderaDataFrame[GenericNewPanderaDataFrameModel], Never]":
287287
...
288288

289289
@override

turu-snowflake/src/turu/snowflake/async_cursor.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -151,23 +151,23 @@ async def execute_map(
151151
@overload
152152
async def execute_map(
153153
self,
154-
row_type: Type[GenericNewPanderaDataFrameModel],
154+
row_type: Type[GenericNewPyArrowTable],
155155
operation: str,
156156
parameters: "Optional[Any]" = None,
157157
/,
158158
**options: Unpack[ExecuteOptions],
159-
) -> "AsyncCursor[Never, PanderaDataFrame[GenericNewPanderaDataFrameModel], Never]":
159+
) -> "AsyncCursor[Never, Never, GenericNewPyArrowTable]":
160160
...
161161

162162
@overload
163163
async def execute_map(
164164
self,
165-
row_type: Type[GenericNewPyArrowTable],
165+
row_type: Type[GenericNewPanderaDataFrameModel],
166166
operation: str,
167167
parameters: "Optional[Any]" = None,
168168
/,
169169
**options: Unpack[ExecuteOptions],
170-
) -> "AsyncCursor[Never, Never, GenericNewPyArrowTable]":
170+
) -> "AsyncCursor[Never, PanderaDataFrame[GenericNewPanderaDataFrameModel], Never]":
171171
...
172172

173173
@override
@@ -227,23 +227,23 @@ async def executemany_map(
227227
@overload
228228
async def executemany_map(
229229
self,
230-
row_type: Type[GenericNewPanderaDataFrameModel],
230+
row_type: Type[GenericNewPyArrowTable],
231231
operation: str,
232232
seq_of_parameters: "Sequence[Any]",
233233
/,
234234
**options: Unpack[ExecuteOptions],
235-
) -> "AsyncCursor[Never, PanderaDataFrame[GenericNewPanderaDataFrameModel], Never]":
235+
) -> "AsyncCursor[Never, Never, GenericNewPyArrowTable]":
236236
...
237237

238238
@overload
239239
async def executemany_map(
240240
self,
241-
row_type: Type[GenericNewPyArrowTable],
241+
row_type: Type[GenericNewPanderaDataFrameModel],
242242
operation: str,
243243
seq_of_parameters: "Sequence[Any]",
244244
/,
245245
**options: Unpack[ExecuteOptions],
246-
) -> "AsyncCursor[Never, Never, GenericNewPyArrowTable]":
246+
) -> "AsyncCursor[Never, PanderaDataFrame[GenericNewPanderaDataFrameModel], Never]":
247247
...
248248

249249
@override

turu-snowflake/src/turu/snowflake/connection.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -193,23 +193,23 @@ def execute_map(
193193
@overload
194194
def execute_map(
195195
self,
196-
row_type: Type[GenericNewPanderaDataFrameModel],
196+
row_type: Type[GenericNewPyArrowTable],
197197
operation: str,
198198
parameters: "Optional[Any]" = None,
199199
/,
200200
**options: Unpack[ExecuteOptions],
201-
) -> Cursor[Never, PanderaDataFrame[GenericNewPanderaDataFrameModel], Never]:
201+
) -> Cursor[Never, Never, GenericNewPyArrowTable]:
202202
...
203203

204204
@overload
205205
def execute_map(
206206
self,
207-
row_type: Type[GenericNewPyArrowTable],
207+
row_type: Type[GenericNewPanderaDataFrameModel],
208208
operation: str,
209209
parameters: "Optional[Any]" = None,
210210
/,
211211
**options: Unpack[ExecuteOptions],
212-
) -> Cursor[Never, Never, GenericNewPyArrowTable]:
212+
) -> Cursor[Never, PanderaDataFrame[GenericNewPanderaDataFrameModel], Never]:
213213
...
214214

215215
@override
@@ -277,23 +277,23 @@ def executemany_map(
277277
@overload
278278
def executemany_map(
279279
self,
280-
row_type: Type[GenericNewPanderaDataFrameModel],
280+
row_type: Type[GenericNewPyArrowTable],
281281
operation: str,
282282
seq_of_parameters: Sequence[Any],
283283
/,
284284
**options: Unpack[ExecuteOptions],
285-
) -> Cursor[Never, PanderaDataFrame[GenericNewPanderaDataFrameModel], Never]:
285+
) -> Cursor[Never, Never, GenericNewPyArrowTable]:
286286
...
287287

288288
@overload
289289
def executemany_map(
290290
self,
291-
row_type: Type[GenericNewPyArrowTable],
291+
row_type: Type[GenericNewPanderaDataFrameModel],
292292
operation: str,
293293
seq_of_parameters: Sequence[Any],
294294
/,
295295
**options: Unpack[ExecuteOptions],
296-
) -> Cursor[Never, Never, GenericNewPyArrowTable]:
296+
) -> Cursor[Never, PanderaDataFrame[GenericNewPanderaDataFrameModel], Never]:
297297
...
298298

299299
@override

turu-snowflake/src/turu/snowflake/cursor.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -225,24 +225,24 @@ def executemany_map(
225225
@overload
226226
def executemany_map(
227227
self,
228-
row_type: Type[GenericNewPanderaDataFrameModel],
228+
row_type: Type[GenericNewPyArrowTable],
229229
operation: str,
230-
seq_of_parameters: Sequence[Any],
230+
seq_of_parameters: "Sequence[Any]",
231231
/,
232232
**options: Unpack[ExecuteOptions],
233-
) -> "Cursor[Never, PanderaDataFrame[GenericNewPanderaDataFrameModel], Never]":
234-
...
233+
) -> "Cursor[Never, Never, GenericNewPyArrowTable]":
234+
pass
235235

236236
@overload
237237
def executemany_map(
238238
self,
239-
row_type: Type[GenericNewPyArrowTable],
239+
row_type: Type[GenericNewPanderaDataFrameModel],
240240
operation: str,
241-
seq_of_parameters: "Sequence[Any]",
241+
seq_of_parameters: Sequence[Any],
242242
/,
243243
**options: Unpack[ExecuteOptions],
244-
) -> "Cursor[Never, Never, GenericNewPyArrowTable]":
245-
pass
244+
) -> "Cursor[Never, PanderaDataFrame[GenericNewPanderaDataFrameModel], Never]":
245+
...
246246

247247
@override
248248
def executemany_map(

turu-snowflake/src/turu/snowflake/record/async_record_cursor.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,3 +50,35 @@ async def fetch_pandas_batches(
5050

5151
async for batch in batches:
5252
yield batch
53+
54+
async def fetch_arrow_all(self) -> GenericPyArrowTable:
    """Fetch the complete result set from the wrapped cursor as an Arrow table.

    When the recorder is a ``CsvRecorder`` the table is also written to the
    recorder's file as CSV (without the index; the header is controlled by
    the recorder's ``header`` option, defaulting to ``True``). A truthy
    ``limit`` recorder option truncates the table first, and the truncated
    table is what gets returned.
    """
    result = cast(GenericPyArrowTable, await self._cursor.fetch_arrow_all())  # type: ignore[assignment]

    # Nothing to record unless a CSV recorder is attached.
    if not isinstance(self._recorder, turu.core.record.CsvRecorder):
        return result

    row_limit = self._recorder._options.get("limit")
    if row_limit:
        result = result.slice(0, row_limit)

    # NOTE(review): indentation is lost in the rendered diff; the CSV write is
    # assumed to belong to the CsvRecorder branch because it reads
    # ``self._recorder.file`` -- confirm against the repository.
    frame = result.to_pandas()
    frame.to_csv(
        self._recorder.file,
        index=False,
        header=self._recorder._options.get("header", True),
    )

    return result
68+
69+
async def fetch_arrow_batches(self) -> AsyncIterator[GenericPyArrowTable]:
    """Asynchronously yield Arrow tables from the wrapped cursor, batch by batch.

    When the recorder is a ``CsvRecorder`` with a truthy ``limit`` option,
    every batch is sliced to the rows still allowed and iteration ends as
    soon as the limit has been used up. Otherwise each batch is yielded
    unchanged.
    """
    stream = cast(
        AsyncIterator[GenericPyArrowTable],
        self._cursor.fetch_arrow_batches(),  # type: ignore[assignment]
    )

    # ``remaining`` stays falsy when no row limit applies.
    remaining = None
    if isinstance(self._recorder, turu.core.record.CsvRecorder):
        remaining = self._recorder._options.get("limit")

    async for chunk in stream:
        if not remaining:
            yield chunk
            continue

        yield chunk.slice(0, remaining)

        # Count the full batch against the budget, then stop once it is spent.
        remaining -= len(chunk)
        if remaining <= 0:
            return

turu-snowflake/src/turu/snowflake/record/record_cursor.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,3 +43,34 @@ def fetch_pandas_batches(self, **kwargs) -> Iterator[GenericPandasDataFrame]:
4343
return
4444

4545
return batches
46+
47+
def fetch_arrow_all(self) -> GenericPyArrowTable:
    """Fetch the complete result set from the wrapped cursor as an Arrow table.

    When the recorder is a ``CsvRecorder`` the table is also written to the
    recorder's file as CSV (without the index; the header is controlled by
    the recorder's ``header`` option, defaulting to ``True``). A truthy
    ``limit`` recorder option truncates the table first, and the truncated
    table is what gets returned.
    """
    result = cast(GenericPyArrowTable, self._cursor.fetch_arrow_all())  # type: ignore[assignment]

    # Nothing to record unless a CSV recorder is attached.
    if not isinstance(self._recorder, turu.core.record.CsvRecorder):
        return result

    row_limit = self._recorder._options.get("limit")
    if row_limit:
        result = result.slice(0, row_limit)

    # NOTE(review): indentation is lost in the rendered diff; the CSV write is
    # assumed to belong to the CsvRecorder branch because it reads
    # ``self._recorder.file`` -- confirm against the repository.
    frame = result.to_pandas()
    frame.to_csv(
        self._recorder.file,
        index=False,
        header=self._recorder._options.get("header", True),
    )

    return result
61+
62+
def fetch_arrow_batches(self) -> Iterator[GenericPyArrowTable]:
    """Yield Arrow tables from the wrapped cursor, batch by batch.

    When the recorder is a ``CsvRecorder`` with a truthy ``limit`` option,
    every batch is sliced to the rows still allowed and iteration ends as
    soon as the limit has been used up. Otherwise each batch is yielded
    unchanged.
    """
    batches = cast(
        Iterator[GenericPyArrowTable],
        self._cursor.fetch_arrow_batches(),  # type: ignore[assignment]
    )
    if isinstance(self._recorder, turu.core.record.CsvRecorder):
        if limit := self._recorder._options.get("limit"):
            for batch in batches:
                yield batch.slice(0, limit)

                # Count the full batch against the budget, then stop once
                # it is spent.
                limit -= batch.num_rows
                if limit <= 0:
                    return

            # Limited mode is fully handled above; do not iterate again.
            return

    # This function is a generator (it contains ``yield``), so a plain
    # ``return batches`` would only populate StopIteration.value and callers
    # would receive no batches at all. Delegate to the iterator instead.
    yield from batches

0 commit comments

Comments
 (0)