Skip to content

Commit 8cd7c05

Browse files
linglpLingling Peng
andauthored
[SYNPY-1781] Implement CSV Import to Grid Session (#1360)
* add two async job; tested the basic of import * update protocol * add comment to explain the error * add test for preview request class * add test for GridCsvImportRequest class * correct errors; add unit test for import_csv * add integration test * edit test; fix typo; fix log * update test * add back if csv_table_descriptor: * remove slash * add docstring to indicate response class * fix GridCsvImportRequest class; make field required; fix test; use constant * use delete none keys * turn schema to required; fix delete_none_key * get a default to csv_descriptor * make upload_file_handle_id required; add default to import_csv_async * change to list * fix to_synapse_request * fix protocol * clean up grid * fix test * one liner fix * raise value error when there is no suggested columns from preview * add import_csv to curator.md * update protocol * edit test; fix consistency in docstring; make code cleaner * remove unnecessary line * update test * fix type hint * fix otel method * use path parameter; not file_handle_id * fix import_csv * no need to commit the file * fix integration test --------- Co-authored-by: Lingling Peng <lpeng@w290.local>
1 parent 273935a commit 8cd7c05

7 files changed

Lines changed: 745 additions & 3 deletions

File tree

docs/reference/experimental/async/curator.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ at your own risk.
5656
members:
5757
- create_async
5858
- export_to_record_set_async
59+
- import_csv_async
5960
---
6061
[](){ #query-reference-async }
6162
::: synapseclient.models.Query

docs/reference/experimental/sync/curator.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ at your own risk.
5656
members:
5757
- create
5858
- export_to_record_set
59+
- import_csv
5960
---
6061
[](){ #query-reference }
6162
::: synapseclient.models.Query

synapseclient/core/constants/concrete_types.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,3 +149,7 @@
149149
LIST_GRID_SESSIONS_RESPONSE = (
150150
"org.sagebionetworks.repo.model.grid.ListGridSessionsResponse"
151151
)
152+
GRID_CSV_IMPORT_REQUEST = "org.sagebionetworks.repo.model.grid.GridCsvImportRequest"
153+
UPLOAD_TO_TABLE_PREVIEW_REQUEST = (
154+
"org.sagebionetworks.repo.model.table.UploadToTablePreviewRequest"
155+
)

synapseclient/models/curation.py

Lines changed: 316 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
data or metadata in Synapse.
66
"""
77

8+
import os
89
from dataclasses import dataclass, field, replace
910
from typing import Any, AsyncGenerator, Dict, Generator, Optional, Protocol, Union
1011

@@ -22,21 +23,25 @@
2223
)
2324
from synapseclient.core.async_utils import (
2425
async_to_sync,
26+
otel_trace_method,
2527
skip_async_to_sync,
2628
wrap_async_generator_to_sync_generator,
2729
)
2830
from synapseclient.core.constants.concrete_types import (
2931
CREATE_GRID_REQUEST,
3032
FILE_BASED_METADATA_TASK_PROPERTIES,
33+
GRID_CSV_IMPORT_REQUEST,
3134
GRID_RECORD_SET_EXPORT_REQUEST,
3235
LIST_GRID_SESSIONS_REQUEST,
3336
LIST_GRID_SESSIONS_RESPONSE,
3437
RECORD_BASED_METADATA_TASK_PROPERTIES,
38+
UPLOAD_TO_TABLE_PREVIEW_REQUEST,
3539
)
40+
from synapseclient.core.upload.upload_functions_async import upload_synapse_s3
3641
from synapseclient.core.utils import delete_none_keys, merge_dataclass_entities
3742
from synapseclient.models.mixins.asynchronous_job import AsynchronousCommunicator
3843
from synapseclient.models.recordset import ValidationSummary
39-
from synapseclient.models.table_components import Query
44+
from synapseclient.models.table_components import Column, CsvTableDescriptor, Query
4045

4146

4247
@dataclass
@@ -989,6 +994,158 @@ def to_synapse_request(self) -> Dict[str, Any]:
989994
return request_dict
990995

991996

997+
@dataclass
998+
class GridCsvImportRequest(AsynchronousCommunicator):
999+
"""
1000+
A request to import a CSV file into a grid. Currently supports only grid
1001+
created from a record set.
1002+
1003+
This request is modeled from: <https://rest-docs.synapse.org/rest/org/sagebionetworks/repo/model/grid/GridCsvImportRequest.html>
1004+
1005+
The response is modeled from: <https://rest-docs.synapse.org/rest/org/sagebionetworks/repo/model/grid/GridCsvImportResponse.html>
1006+
"""
1007+
1008+
session_id: str
1009+
"""The grid session ID."""
1010+
1011+
file_handle_id: str
1012+
"""The id of the file handle that contains the CSV data."""
1013+
1014+
schema: list[Column]
1015+
"""The list of ColumnModel that describe the CSV file. Currently this is required."""
1016+
1017+
concrete_type: str = GRID_CSV_IMPORT_REQUEST
1018+
"""The concrete type for this request."""
1019+
1020+
csv_descriptor: CsvTableDescriptor = field(default_factory=CsvTableDescriptor)
1021+
"""The description of a csv for upload or download."""
1022+
1023+
# Response fields (populated by fill_from_dict)
1024+
total_count: Optional[int] = field(default=None, compare=False)
1025+
"""The total number of rows in the CSV."""
1026+
1027+
created_count: Optional[int] = field(default=None, compare=False)
1028+
"""The number of rows that were created."""
1029+
1030+
updated_count: Optional[int] = field(default=None, compare=False)
1031+
"""The number of rows that were updated."""
1032+
1033+
def fill_from_dict(
1034+
self, synapse_response: Dict[str, Any]
1035+
) -> "GridCsvImportRequest":
1036+
"""
1037+
Converts a response from the REST API into this dataclass.
1038+
1039+
Arguments:
1040+
synapse_response: The response from the REST API.
1041+
1042+
Returns:
1043+
The GridCsvImportRequest object.
1044+
"""
1045+
self.session_id = synapse_response.get("sessionId", self.session_id)
1046+
self.total_count = synapse_response.get("totalCount", None)
1047+
self.created_count = synapse_response.get("createdCount", None)
1048+
self.updated_count = synapse_response.get("updatedCount", None)
1049+
return self
1050+
1051+
def to_synapse_request(self) -> Dict[str, Any]:
1052+
"""
1053+
Converts this dataclass to a dictionary suitable for a Synapse REST API request.
1054+
1055+
Returns:
1056+
A dictionary representation of this object for API requests.
1057+
"""
1058+
request_dict = {
1059+
"concreteType": self.concrete_type,
1060+
"sessionId": self.session_id,
1061+
"fileHandleId": self.file_handle_id,
1062+
"csvDescriptor": self.csv_descriptor.to_synapse_request(),
1063+
"schema": [col.to_synapse_request() for col in self.schema],
1064+
}
1065+
delete_none_keys(request_dict)
1066+
return request_dict
1067+
1068+
1069+
@dataclass
1070+
class UploadToTablePreviewRequest(AsynchronousCommunicator):
1071+
"""
1072+
Request for a preview of an upload to a Table.
1073+
1074+
This request is modeled from: <https://rest-docs.synapse.org/rest/org/sagebionetworks/repo/model/table/UploadToTablePreviewRequest.html>
1075+
1076+
This response is modeled from: <https://rest-docs.synapse.org/rest/org/sagebionetworks/repo/model/table/UploadToTablePreviewResult.html>
1077+
"""
1078+
1079+
upload_file_handle_id: str
1080+
"""The ID of the file handle for a type of UPLOAD"""
1081+
1082+
concrete_type: str = UPLOAD_TO_TABLE_PREVIEW_REQUEST
1083+
"""The concrete type for this request."""
1084+
1085+
lines_to_skip: Optional[int] = None
1086+
"""The number of lines to skip from the start of the file. The default value of 0 will be used if this is not provided by the caller."""
1087+
1088+
csv_table_descriptor: CsvTableDescriptor = field(default_factory=CsvTableDescriptor)
1089+
"""The description of a csv for upload or download."""
1090+
1091+
do_full_file_scan: Optional[bool] = None
1092+
"""When set to true the full file will be scanned for a schema suggestions. A full scan is more accurate but can take more time. When set to false only a sub-set of the first rows will be scanned, which can be faster but is less accurate. The default value is false."""
1093+
1094+
# Response fields (populated by fill_from_dict)
1095+
suggested_columns: Optional[list[Column]] = field(default=None, compare=False)
1096+
"""The suggested columns for the table based on the file scan."""
1097+
1098+
sample_rows: Optional[list[list[Optional[str]]]] = field(
1099+
default=None, compare=False
1100+
)
1101+
"""A sample of the rows in the file."""
1102+
1103+
rows_scanned: Optional[int] = field(default=None, compare=False)
1104+
"""The number of rows scanned from the file."""
1105+
1106+
def fill_from_dict(
1107+
self, synapse_response: Dict[str, Any]
1108+
) -> "UploadToTablePreviewRequest":
1109+
"""
1110+
Converts a response from the REST API into this dataclass.
1111+
1112+
Arguments:
1113+
synapse_response: The response from the REST API.
1114+
1115+
Returns:
1116+
The UploadToTablePreviewRequest object.
1117+
"""
1118+
suggested_columns_data = synapse_response.get("suggestedColumns", None)
1119+
if suggested_columns_data is not None:
1120+
self.suggested_columns = [
1121+
Column().fill_from_dict(col) for col in suggested_columns_data
1122+
]
1123+
1124+
sample_rows_data = synapse_response.get("sampleRows", None)
1125+
if sample_rows_data is not None:
1126+
self.sample_rows = [row.get("values", []) for row in sample_rows_data]
1127+
1128+
self.rows_scanned = synapse_response.get("rowsScanned", None)
1129+
return self
1130+
1131+
def to_synapse_request(self) -> Dict[str, Any]:
1132+
"""
1133+
Converts this dataclass to a dictionary suitable for a Synapse REST API request.
1134+
1135+
Returns:
1136+
A dictionary representation of this object for API requests.
1137+
"""
1138+
request_dict = {
1139+
"concreteType": self.concrete_type,
1140+
"uploadFileHandleId": self.upload_file_handle_id,
1141+
"linesToSkip": self.lines_to_skip,
1142+
"doFullFileScan": self.do_full_file_scan,
1143+
"csvTableDescriptor": self.csv_table_descriptor.to_synapse_request(),
1144+
}
1145+
delete_none_keys(request_dict)
1146+
return request_dict
1147+
1148+
9921149
@dataclass
9931150
class GridRecordSetExportRequest(AsynchronousCommunicator):
9941151
"""
@@ -1427,6 +1584,53 @@ def list(
14271584
```
14281585
"""
14291586

1587+
def import_csv(
1588+
self,
1589+
path: str,
1590+
*,
1591+
timeout: int = 120,
1592+
csv_table_descriptor: Optional[CsvTableDescriptor] = None,
1593+
synapse_client: Optional[Synapse] = None,
1594+
) -> "Grid":
1595+
"""
1596+
Import a CSV file into this grid session. Previews the file to determine
1597+
the column schema, then imports the data. Currently supports only grids
1598+
created from a record set.
1599+
1600+
Arguments:
1601+
path: Local path to the CSV file to import.
1602+
csv_table_descriptor: The description of the CSV format (delimiter,
1603+
quote character, etc.). If not provided, the default CSV format
1604+
will be used.
1605+
timeout: The number of seconds to wait for each async job to complete
1606+
or progress before raising a SynapseTimeoutError. Defaults to 120.
1607+
synapse_client: If not passed in and caching was not disabled by
1608+
`Synapse.allow_client_caching(False)` this will use the last created
1609+
instance from the Synapse class constructor.
1610+
1611+
Returns:
1612+
The Grid object.
1613+
1614+
Raises:
1615+
ValueError: If session_id is not provided.
1616+
1617+
Example: Import a CSV file into a grid session
1618+
&nbsp;
1619+
1620+
```python
1621+
from synapseclient import Synapse
1622+
from synapseclient.models import Grid
1623+
1624+
syn = Synapse()
1625+
syn.login()
1626+
1627+
grid = Grid(session_id="abc-123-def")
1628+
grid = grid.import_csv(path="/local/path/to/data.csv")
1629+
print(f"Import complete for session: {grid.session_id}")
1630+
```
1631+
"""
1632+
return self
1633+
14301634

14311635
@dataclass
14321636
@async_to_sync
@@ -1789,6 +1993,9 @@ def list(
17891993
synapse_client=synapse_client,
17901994
)
17911995

1996+
@otel_trace_method(
1997+
method_to_trace_name=lambda self, **kwargs: f"Grid_Delete: ID: {self.session_id}"
1998+
)
17921999
async def delete_async(self, *, synapse_client: Optional[Synapse] = None) -> None:
17932000
"""
17942001
Delete the grid session.
@@ -1838,3 +2045,111 @@ async def main():
18382045
await delete_grid_session(
18392046
session_id=self.session_id, synapse_client=synapse_client
18402047
)
2048+
2049+
@otel_trace_method(
2050+
method_to_trace_name=lambda self, **kwargs: f"Grid_ImportCsv: ID: {self.session_id}"
2051+
)
2052+
async def import_csv_async(
2053+
self,
2054+
path: str,
2055+
*,
2056+
timeout: int = 120,
2057+
csv_table_descriptor: Optional[CsvTableDescriptor] = None,
2058+
synapse_client: Optional[Synapse] = None,
2059+
) -> "Grid":
2060+
"""
2061+
Import a CSV file into this grid session. Previews the file to determine
2062+
the column schema, then imports the data. Currently supports only grids
2063+
created from a record set.
2064+
2065+
Arguments:
2066+
path: Local path to the CSV file to import.
2067+
csv_table_descriptor: The description of the CSV format (delimiter,
2068+
quote character, etc.). If not provided, the default CSV format
2069+
will be used.
2070+
timeout: The number of seconds to wait for each async job to complete
2071+
or progress before raising a SynapseTimeoutError. Defaults to 120.
2072+
synapse_client: If not passed in and caching was not disabled by
2073+
`Synapse.allow_client_caching(False)` this will use the last created
2074+
instance from the Synapse class constructor.
2075+
2076+
Returns:
2077+
The Grid object.
2078+
2079+
Raises:
2080+
ValueError: If session_id is not provided.
2081+
2082+
Example: Import a CSV file into a grid session asynchronously
2083+
&nbsp;
2084+
2085+
```python
2086+
import asyncio
2087+
from synapseclient import Synapse
2088+
from synapseclient.models import Grid
2089+
2090+
syn = Synapse()
2091+
syn.login()
2092+
2093+
async def main():
2094+
grid = Grid(session_id="abc-123-def")
2095+
grid = await grid.import_csv_async(path="/local/path/to/data.csv")
2096+
print(f"Import complete for session: {grid.session_id}")
2097+
2098+
asyncio.run(main())
2099+
```
2100+
"""
2101+
2102+
if not self.session_id:
2103+
raise ValueError(
2104+
"session_id is required to import a CSV into a GridSession"
2105+
)
2106+
2107+
if not os.path.isfile(path):
2108+
raise ValueError(f"Path '{path}' is not a valid file.")
2109+
2110+
trace.get_current_span().set_attributes(
2111+
{
2112+
"synapse.session_id": self.session_id,
2113+
}
2114+
)
2115+
2116+
client = Synapse.get_client(synapse_client=synapse_client)
2117+
file_handle = await upload_synapse_s3(syn=client, file_path=path)
2118+
file_handle_id = file_handle["id"]
2119+
2120+
effective_descriptor = csv_table_descriptor or CsvTableDescriptor()
2121+
2122+
upload_to_table_preview = UploadToTablePreviewRequest(
2123+
csv_table_descriptor=effective_descriptor,
2124+
upload_file_handle_id=file_handle_id,
2125+
)
2126+
2127+
preview_response = await upload_to_table_preview.send_job_and_wait_async(
2128+
timeout=timeout, synapse_client=synapse_client
2129+
)
2130+
if not preview_response.suggested_columns:
2131+
raise ValueError(
2132+
f"CSV preview for file handle {file_handle_id} returned no suggested "
2133+
f"columns (rows scanned: {preview_response.rows_scanned}). The file may "
2134+
f"be empty, contain only a header row, or use a separator different "
2135+
f"from the configured csv_table_descriptor "
2136+
f"(separator={repr(effective_descriptor.separator)})."
2137+
)
2138+
2139+
import_request = GridCsvImportRequest(
2140+
session_id=self.session_id,
2141+
file_handle_id=file_handle_id,
2142+
schema=preview_response.suggested_columns,
2143+
csv_descriptor=effective_descriptor,
2144+
)
2145+
import_response = await import_request.send_job_and_wait_async(
2146+
timeout=timeout, synapse_client=synapse_client
2147+
)
2148+
client.logger.info(
2149+
f"CSV import to grid session {self.session_id} completed successfully, "
2150+
f"total count: {import_response.total_count}, "
2151+
f"total created: {import_response.created_count}, "
2152+
f"total updated: {import_response.updated_count}"
2153+
)
2154+
2155+
return self

0 commit comments

Comments
 (0)