Skip to content

Commit ca4c05e

Browse files
authored
Merge pull request #89 from yassun7010/fix_recorder
fix: recorder
2 parents 48797cb + 6501d33 commit ca4c05e

File tree

4 files changed

+54
-97
lines changed

4 files changed

+54
-97
lines changed

turu-core/src/turu/core/record/async_record_cursor.py

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,35 +23,39 @@ def __init__(
2323
],
2424
):
2525
self._recorder = recorder
26-
self._cursor: turu.core.async_cursor.AsyncCursor = cursor
26+
self.__record_taregt_cursor: turu.core.async_cursor.AsyncCursor = cursor
2727

2828
@property
2929
def rowcount(self) -> int:
30-
return self._cursor.rowcount
30+
return self.__record_taregt_cursor.rowcount
3131

3232
@property
3333
def arraysize(self) -> int:
34-
return self._cursor.arraysize
34+
return self.__record_taregt_cursor.arraysize
3535

3636
@arraysize.setter
3737
def arraysize(self, size: int) -> None:
38-
self._cursor.arraysize = size
38+
self.__record_taregt_cursor.arraysize = size
3939

4040
async def close(self) -> None:
41-
await self._cursor.close()
41+
await self.__record_taregt_cursor.close()
4242
self._recorder.close()
4343

4444
async def execute(
4545
self, operation: str, parameters: Optional[Parameters] = None, /
4646
) -> "AsyncRecordCursor[turu.core.cursor.GenericRowType, Parameters]":
47-
self._cursor = await self._cursor.execute(operation, parameters)
47+
self.__record_taregt_cursor = await self.__record_taregt_cursor.execute(
48+
operation, parameters
49+
)
4850

4951
return self
5052

5153
async def executemany(
5254
self, operation: str, seq_of_parameters: "Sequence[Parameters]", /
5355
) -> "AsyncRecordCursor[turu.core.cursor.GenericRowType, Parameters]":
54-
self._cursor = await self._cursor.executemany(operation, seq_of_parameters)
56+
self.__record_taregt_cursor = await self.__record_taregt_cursor.executemany(
57+
operation, seq_of_parameters
58+
)
5559

5660
return self
5761

@@ -62,7 +66,9 @@ async def execute_map(
6266
parameters: Optional[Parameters] = None,
6367
/,
6468
) -> "AsyncRecordCursor[turu.core.cursor.GenericNewRowType, Parameters]":
65-
self._cursor = await self._cursor.execute_map(row_type, operation, parameters)
69+
self.__record_taregt_cursor = await self.__record_taregt_cursor.execute_map(
70+
row_type, operation, parameters
71+
)
6672

6773
return cast(AsyncRecordCursor, self)
6874

@@ -73,14 +79,14 @@ async def executemany_map(
7379
seq_of_parameters: Sequence[Parameters],
7480
/,
7581
) -> "AsyncRecordCursor[turu.core.cursor.GenericNewRowType, Parameters]":
76-
self._cursor = await self._cursor.executemany_map(
82+
self.__record_taregt_cursor = await self.__record_taregt_cursor.executemany_map(
7783
row_type, operation, seq_of_parameters
7884
)
7985

8086
return cast(AsyncRecordCursor, self)
8187

8288
async def fetchone(self) -> Optional[turu.core.cursor.GenericRowType]:
83-
row = await self._cursor.fetchone()
89+
row = await self.__record_taregt_cursor.fetchone()
8490
if row is not None:
8591
self._recorder.record([row])
8692

@@ -89,14 +95,14 @@ async def fetchone(self) -> Optional[turu.core.cursor.GenericRowType]:
8995
async def fetchmany(
9096
self, size: Optional[int] = None
9197
) -> List[turu.core.cursor.GenericRowType]:
92-
rows = await self._cursor.fetchmany(size)
98+
rows = await self.__record_taregt_cursor.fetchmany(size)
9399

94100
self._recorder.record(rows)
95101

96102
return rows
97103

98104
async def fetchall(self) -> List[turu.core.cursor.GenericRowType]:
99-
rows = await self._cursor.fetchall()
105+
rows = await self.__record_taregt_cursor.fetchall()
100106

101107
self._recorder.record(rows)
102108

@@ -108,7 +114,7 @@ def __iter__(
108114
return self
109115

110116
async def __anext__(self) -> turu.core.cursor.GenericRowType:
111-
row = await self._cursor.__anext__()
117+
row = await self.__record_taregt_cursor.__anext__()
112118

113119
self._recorder.record([row])
114120

@@ -118,4 +124,4 @@ def __getattr__(self, name):
118124
def _method_missing(*args):
119125
return args
120126

121-
return getattr(self._cursor, name, _method_missing)
127+
return getattr(self.__record_taregt_cursor, name, _method_missing)

turu-core/src/turu/core/record/record_cursor.py

Lines changed: 20 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -20,35 +20,39 @@ def __init__(
2020
cursor: turu.core.cursor.Cursor[turu.core.cursor.GenericRowType, Parameters],
2121
):
2222
self._recorder = recorder
23-
self._cursor: turu.core.cursor.Cursor = cursor
23+
self.__record_taregt_cursor: turu.core.cursor.Cursor = cursor
2424

2525
@property
2626
def rowcount(self) -> int:
27-
return self._cursor.rowcount
27+
return self.__record_taregt_cursor.rowcount
2828

2929
@property
3030
def arraysize(self) -> int:
31-
return self._cursor.arraysize
31+
return self.__record_taregt_cursor.arraysize
3232

3333
@arraysize.setter
3434
def arraysize(self, size: int) -> None:
35-
self._cursor.arraysize = size
35+
self.__record_taregt_cursor.arraysize = size
3636

3737
def close(self) -> None:
38-
self._cursor.close()
38+
self.__record_taregt_cursor.close()
3939
self._recorder.close()
4040

4141
def execute(
4242
self, operation: str, parameters: Optional[Parameters] = None, /
4343
) -> "RecordCursor[turu.core.cursor.GenericRowType, Parameters]":
44-
self._cursor = self._cursor.execute(operation, parameters)
44+
self.__record_taregt_cursor = self.__record_taregt_cursor.execute(
45+
operation, parameters
46+
)
4547

4648
return self
4749

4850
def executemany(
4951
self, operation: str, seq_of_parameters: "Sequence[Parameters]", /
5052
) -> "RecordCursor[turu.core.cursor.GenericRowType, Parameters]":
51-
self._cursor = self._cursor.executemany(operation, seq_of_parameters)
53+
self.__record_taregt_cursor = self.__record_taregt_cursor.executemany(
54+
operation, seq_of_parameters
55+
)
5256

5357
return self
5458

@@ -59,7 +63,9 @@ def execute_map(
5963
parameters: Optional[Parameters] = None,
6064
/,
6165
) -> "RecordCursor[turu.core.cursor.GenericNewRowType, Parameters]":
62-
self._cursor = self._cursor.execute_map(row_type, operation, parameters)
66+
self.__record_taregt_cursor = self.__record_taregt_cursor.execute_map(
67+
row_type, operation, parameters
68+
)
6369

6470
return cast(RecordCursor, self)
6571

@@ -70,14 +76,14 @@ def executemany_map(
7076
seq_of_parameters: Sequence[Parameters],
7177
/,
7278
) -> "RecordCursor[turu.core.cursor.GenericNewRowType, Parameters]":
73-
self._cursor = self._cursor.executemany_map(
79+
self.__record_taregt_cursor = self.__record_taregt_cursor.executemany_map(
7480
row_type, operation, seq_of_parameters
7581
)
7682

7783
return cast(RecordCursor, self)
7884

7985
def fetchone(self) -> Optional[turu.core.cursor.GenericRowType]:
80-
row = self._cursor.fetchone()
86+
row = self.__record_taregt_cursor.fetchone()
8187
if row is not None:
8288
self._recorder.record([row])
8389

@@ -86,14 +92,14 @@ def fetchone(self) -> Optional[turu.core.cursor.GenericRowType]:
8692
def fetchmany(
8793
self, size: Optional[int] = None
8894
) -> List[turu.core.cursor.GenericRowType]:
89-
rows = self._cursor.fetchmany(size)
95+
rows = self.__record_taregt_cursor.fetchmany(size)
9096

9197
self._recorder.record(rows)
9298

9399
return rows
94100

95101
def fetchall(self) -> List[turu.core.cursor.GenericRowType]:
96-
rows = self._cursor.fetchall()
102+
rows = self.__record_taregt_cursor.fetchall()
97103

98104
self._recorder.record(rows)
99105

@@ -103,7 +109,7 @@ def __iter__(self) -> "RecordCursor[turu.core.cursor.GenericRowType, Parameters]
103109
return self
104110

105111
def __next__(self) -> turu.core.cursor.GenericRowType:
106-
row = next(self._cursor)
112+
row = next(self.__record_taregt_cursor)
107113

108114
self._recorder.record([row])
109115

@@ -113,4 +119,4 @@ def __getattr__(self, name):
113119
def _method_missing(*args):
114120
return args
115121

116-
return getattr(self._cursor, name, _method_missing)
122+
return getattr(self.__record_taregt_cursor, name, _method_missing)

turu-snowflake/src/turu/snowflake/record/async_record_cursor.py

Lines changed: 8 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ class AsyncRecordCursor( # type: ignore[override]
1818
],
1919
):
2020
async def fetch_pandas_all(self, **kwargs) -> GenericPandasDataFrame:
21-
df = await self._sf_cursor.fetch_pandas_all(**kwargs)
21+
df = await self.__sf_cursor.fetch_pandas_all(**kwargs)
2222

2323
if isinstance(self._recorder, turu.core.record.CsvRecorder):
2424
if limit := self._recorder._options.get("limit"):
@@ -35,7 +35,7 @@ async def fetch_pandas_all(self, **kwargs) -> GenericPandasDataFrame:
3535
async def fetch_pandas_batches(
3636
self, **kwargs
3737
) -> AsyncIterator[GenericPandasDataFrame]:
38-
batches = self._sf_cursor.fetch_pandas_batches(**kwargs)
38+
batches = self.__sf_cursor.fetch_pandas_batches(**kwargs)
3939

4040
if isinstance(self._recorder, turu.core.record.CsvRecorder):
4141
if limit := self._recorder._options.get("limit"):
@@ -50,7 +50,7 @@ async def fetch_pandas_batches(
5050
yield batch
5151

5252
async def fetch_arrow_all(self) -> GenericPyArrowTable:
53-
table = await self._sf_cursor.fetch_arrow_all()
53+
table = await self.__sf_cursor.fetch_arrow_all()
5454

5555
if isinstance(self._recorder, turu.core.record.CsvRecorder):
5656
if limit := self._recorder._options.get("limit"):
@@ -65,7 +65,7 @@ async def fetch_arrow_all(self) -> GenericPyArrowTable:
6565
return table
6666

6767
async def fetch_arrow_batches(self) -> AsyncIterator[GenericPyArrowTable]:
68-
batches = self._sf_cursor.fetch_arrow_batches()
68+
batches = self.__sf_cursor.fetch_arrow_batches()
6969

7070
if isinstance(self._recorder, turu.core.record.CsvRecorder):
7171
if limit := self._recorder._options.get("limit"):
@@ -79,38 +79,12 @@ async def fetch_arrow_batches(self) -> AsyncIterator[GenericPyArrowTable]:
7979
async for batch in batches:
8080
yield batch
8181

82-
def use_warehouse(self, warehouse: str, /) -> "AsyncRecordCursor":
83-
"""Use a warehouse in cursor."""
84-
85-
self._sf_cursor.use_warehouse(warehouse)
86-
87-
return self
88-
89-
def use_database(self, database: str, /) -> "AsyncRecordCursor":
90-
"""Use a database in cursor."""
91-
92-
self._sf_cursor.use_database(database)
93-
94-
return self
95-
96-
def use_schema(self, schema: str, /) -> "AsyncRecordCursor":
97-
"""Use a schema in cursor."""
98-
99-
self._sf_cursor.use_schema(schema)
100-
101-
return self
102-
103-
def use_role(self, role: str, /) -> "AsyncRecordCursor":
104-
"""Use a role in cursor."""
105-
106-
self._sf_cursor.use_role(role)
107-
108-
return self
109-
11082
@property
111-
def _sf_cursor(
83+
def __sf_cursor(
11284
self,
11385
) -> turu.snowflake.async_cursor.AsyncCursor[
11486
GenericRowType, GenericPandasDataFrame, GenericPyArrowTable
11587
]:
116-
return cast(turu.snowflake.async_cursor.AsyncCursor, self._cursor)
88+
return cast(
89+
turu.snowflake.async_cursor.AsyncCursor, self.__record_taregt_cursor
90+
)

turu-snowflake/src/turu/snowflake/record/record_cursor.py

Lines changed: 6 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,14 @@
77
GenericPandasDataFrame,
88
GenericPyArrowTable,
99
)
10-
from typing_extensions import Self
1110

1211

1312
class RecordCursor( # type: ignore[override]
1413
turu.core.record.RecordCursor,
1514
Generic[GenericRowType, GenericPandasDataFrame, GenericPyArrowTable],
1615
):
1716
def fetch_pandas_all(self, **kwargs) -> GenericPandasDataFrame:
18-
df = self._sf_cursor.fetch_pandas_all(**kwargs)
17+
df = self.__sf_cursor.fetch_pandas_all(**kwargs)
1918

2019
if isinstance(self._recorder, turu.core.record.CsvRecorder):
2120
if limit := self._recorder._options.get("limit"):
@@ -30,7 +29,7 @@ def fetch_pandas_all(self, **kwargs) -> GenericPandasDataFrame:
3029
return df
3130

3231
def fetch_pandas_batches(self, **kwargs) -> Iterator[GenericPandasDataFrame]:
33-
batches = self._sf_cursor.fetch_pandas_batches(**kwargs)
32+
batches = self.__sf_cursor.fetch_pandas_batches(**kwargs)
3433

3534
if isinstance(self._recorder, turu.core.record.CsvRecorder):
3635
if limit := self._recorder._options.get("limit"):
@@ -44,7 +43,7 @@ def fetch_pandas_batches(self, **kwargs) -> Iterator[GenericPandasDataFrame]:
4443
return batches
4544

4645
def fetch_arrow_all(self) -> GenericPyArrowTable:
47-
table = self._sf_cursor.fetch_arrow_all()
46+
table = self.__sf_cursor.fetch_arrow_all()
4847

4948
if isinstance(self._recorder, turu.core.record.CsvRecorder):
5049
if limit := self._recorder._options.get("limit"):
@@ -59,7 +58,7 @@ def fetch_arrow_all(self) -> GenericPyArrowTable:
5958
return table
6059

6160
def fetch_arrow_batches(self) -> Iterator[GenericPyArrowTable]:
62-
batches = self._sf_cursor.fetch_arrow_batches()
61+
batches = self.__sf_cursor.fetch_arrow_batches()
6362

6463
if isinstance(self._recorder, turu.core.record.CsvRecorder):
6564
if limit := self._recorder._options.get("limit"):
@@ -72,38 +71,10 @@ def fetch_arrow_batches(self) -> Iterator[GenericPyArrowTable]:
7271

7372
return batches
7473

75-
def use_warehouse(self, warehouse: str, /) -> Self:
76-
"""Use a warehouse in cursor."""
77-
78-
self._sf_cursor.use_warehouse(warehouse)
79-
80-
return self
81-
82-
def use_database(self, database: str, /) -> Self:
83-
"""Use a database in cursor."""
84-
85-
self._sf_cursor.use_database(database)
86-
87-
return self
88-
89-
def use_schema(self, schema: str, /) -> Self:
90-
"""Use a schema in cursor."""
91-
92-
self._sf_cursor.use_schema(schema)
93-
94-
return self
95-
96-
def use_role(self, role: str, /) -> Self:
97-
"""Use a role in cursor."""
98-
99-
self._sf_cursor.use_role(role)
100-
101-
return self
102-
10374
@property
104-
def _sf_cursor(
75+
def __sf_cursor(
10576
self,
10677
) -> turu.snowflake.cursor.Cursor[
10778
GenericRowType, GenericPandasDataFrame, GenericPyArrowTable
10879
]:
109-
return cast(turu.snowflake.cursor.Cursor, self._cursor)
80+
return cast(turu.snowflake.cursor.Cursor, self.__record_taregt_cursor)

0 commit comments

Comments
 (0)