|
15 | 15 | import re |
16 | 16 | import synapseclient |
17 | 17 | import synapseutils |
| 18 | +from time import sleep |
18 | 19 |
|
19 | 20 | from synapseclient import ( |
20 | 21 | Synapse, |
@@ -77,11 +78,12 @@ def __init__( |
77 | 78 | """ |
78 | 79 |
|
79 | 80 | self.syn = self.login(token, access_token, input_token) |
| 81 | + self.project_scope = project_scope |
80 | 82 | try: |
81 | 83 | self.storageFileview = CONFIG["synapse"]["master_fileview"] |
82 | | - if project_scope: |
| 84 | + if self.project_scope: |
83 | 85 | self.storageFileviewTable = self.syn.tableQuery( |
84 | | - f"SELECT * FROM {self.storageFileview} WHERE projectId IN {tuple(project_scope + [''])}" |
| 86 | + f"SELECT * FROM {self.storageFileview} WHERE projectId IN {tuple(self.project_scope + [''])}" |
85 | 87 | ).asDataFrame() |
86 | 88 | else: |
87 | 89 | # get data in administrative fileview for this pipeline |
@@ -712,9 +714,26 @@ def get_synapse_table(self, synapse_id: str) -> Tuple[pd.DataFrame, CsvFileTable |
712 | 714 |
|
713 | 715 | return df, results |
714 | 716 |
|
def _get_tables(self) -> List[dict]:
    """List the table entities found in the first project of the configured scope.

    Returns:
        List[dict]: one entry per table, exactly as yielded by
            ``self.syn.getChildren`` (``get_table_info`` reads the ``"name"``
            and ``"id"`` keys of each entry). Empty when no project scope
            was configured.
    """
    # project_scope may be None or empty: the constructor only narrows the
    # fileview query by project when a scope is given. Indexing it would
    # raise, so report "no known tables" instead.
    if not self.project_scope:
        return []

    # Only the first project in the scope is searched for tables.
    project = self.syn.get(self.project_scope[0])
    return list(self.syn.getChildren(project, includeTypes=["table"]))
| 720 | + |
def get_table_info(self) -> Dict:
    """Map the name of every table found by ``_get_tables`` to its id.

    Returns:
        Dict: ``{table_name: table_id}`` built from the ``"name"`` and
            ``"id"`` keys of each table entry. When no tables are found
            the placeholder mapping ``{None: None}`` is returned rather
            than an empty dict, so a membership test on a real table name
            is still ``False``.
    """
    tables = self._get_tables()
    if not tables:
        # Placeholder kept as-is for backward compatibility with callers.
        return {None: None}
    return {table["name"]: table["id"] for table in tables}
| 731 | + |
715 | 732 | @missing_entity_handler |
716 | | - def upload_format_manifest_table(self, se, manifest, datasetId, table_name, restrict, useSchemaLabel,): |
| 733 | + def upload_format_manifest_table(self, se, manifest, datasetId, table_name, restrict, useSchemaLabel): |
717 | 734 | # Rename the manifest columns to display names to match fileview |
| 735 | + table_info = self.get_table_info() |
| 736 | + |
718 | 737 | blacklist_chars = ['(', ')', '.', ' '] |
719 | 738 | manifest_columns = manifest.columns.tolist() |
720 | 739 |
|
@@ -748,8 +767,22 @@ def upload_format_manifest_table(self, se, manifest, datasetId, table_name, rest |
748 | 767 |
|
749 | 768 | # Put table manifest onto synapse |
750 | 769 | schema = Schema(name=table_name, columns=col_schema, parent=self.getDatasetProject(datasetId)) |
751 | | - table = self.syn.store(Table(schema, table_manifest), isRestricted=restrict) |
752 | | - manifest_table_id = table.schema.id |
| 770 | + if table_name not in table_info.keys(): |
| 771 | + manifest_table_id = self.make_synapse_table(table_to_load = table_manifest, |
| 772 | + dataset_id = datasetId, |
| 773 | + existingTableId = None, |
| 774 | + table_name = table_name, |
| 775 | + column_type_dictionary = col_schema, |
| 776 | + restrict = restrict, |
| 777 | + manipulation = 'replace') |
| 778 | + else: |
| 779 | + manifest_table_id = self.make_synapse_table(table_to_load = table_manifest, |
| 780 | + dataset_id = datasetId, |
| 781 | + existingTableId = table_info[table_name], |
| 782 | + table_name = table_name, |
| 783 | + column_type_dictionary = col_schema, |
| 784 | + restrict = restrict, |
| 785 | + manipulation = 'replace') |
753 | 786 |
|
754 | 787 |
|
755 | 788 | return manifest_table_id, manifest, table_manifest |
@@ -1036,7 +1069,7 @@ def associateMetadataWithFiles( |
1036 | 1069 | # If specified, upload manifest as a table and get the SynID and manifest |
1037 | 1070 | if manifest_record_type == 'table' or manifest_record_type == 'both': |
1038 | 1071 | manifest_synapse_table_id, manifest, table_manifest = self.upload_format_manifest_table( |
1039 | | - se, manifest, datasetId, table_name, restrict = restrict_manifest, useSchemaLabel=useSchemaLabel,) |
| 1072 | + se, manifest, datasetId, table_name, restrict = restrict_manifest, useSchemaLabel=useSchemaLabel) |
1040 | 1073 |
|
1041 | 1074 | # Iterate over manifest rows, create Synapse entities and store corresponding entity IDs in manifest if needed |
1042 | 1075 | # also set metadata for each synapse entity as Synapse annotations |
@@ -1317,20 +1350,73 @@ def getDatasetAnnotationsBatch( |
1317 | 1350 |
|
1318 | 1351 | return table |
1319 | 1352 |
|
1320 | | - def make_synapse_table(self, table_to_load, dataset_id, existingTableId, table_name, |
1321 | | - update_col = 'entityId', column_type_dictionary = {}, specify_schema=True, restrict = False): |
| 1353 | + def make_synapse_table(self, |
| 1354 | + table_to_load: pd.DataFrame, |
| 1355 | + dataset_id: str, table_name: str, |
| 1356 | + existingTableId: str = None, |
| 1357 | + update_col: str = 'entityId', |
| 1358 | + column_type_dictionary: Dict = {}, |
| 1359 | + specify_schema: bool = True, |
| 1360 | + restrict: bool = False, |
| 1361 | + manipulation: str = 'update') -> str: |
1322 | 1362 | ''' |
1323 | | - Record based data |
| 1363 | + Make a synapse table for record based data |
| 1364 | +
|
| 1365 | + Args: |
| 1366 | + table_to_load (pd.DataFrame): table to upload to synapse |
| 1367 | + dataset_id (str): synID for dataset related to manifest to be uploaded as table |
| 1368 | + existingTableId (str): Optional, synID of existing table to upload to |
| 1369 | + table_name (str): Name of the table that will be displayed on synapse |
| 1370 | + update_col (str): Optional, if updating a table by aligning on index, column to use as indices |
| 1371 | + column_type_dictionary (Dict): dictionary of column types |
| 1372 | + specify_schema (bool): specify a schema for the table at upload according to types in column_type_dictionary |
| 1373 | + restrict (bool): set to True if access restrictions need to be imposed on table when stored on synapse, False otherwise |
| 1374 | + manipulation (str): type of manipulation to do if a table exists already. Can be either "update" or "replace". |
| 1375 | + Defaults to "update" to preserve old behavior |
| 1376 | +
|
| 1377 | + Returns: |
| 1378 | + str: synId of table uploaded to synapse |
| 1379 | +
|
1324 | 1380 | ''' |
1325 | | - # create/update a table corresponding to this dataset in this dataset's parent project |
1326 | | - # update_col is the column in the table that has a unique code that will allow Synapse to |
1327 | | - # locate its position in the old and new table. |
1328 | 1381 | if existingTableId: |
1329 | 1382 | existing_table, existing_results = self.get_synapse_table(existingTableId) |
1330 | | - table_to_load = update_df(existing_table, table_to_load, update_col) |
| 1383 | + |
| 1384 | + manipulation = manipulation.lower() |
| 1385 | + if manipulation not in ['update', 'replace']: |
| 1386 | + raise NotImplementedError( |
| 1387 | + "Currently, only 'update' and 'replace' table operations are supported." |
| 1388 | + ) |
| 1389 | + |
| 1390 | + # create/update a table corresponding to this dataset in this dataset's parent project |
| 1391 | + # update_col is the column in the table that has a unique code that will allow Synapse to |
| 1392 | + # locate its position in the old and new table. |
| 1393 | + if manipulation == 'update': |
| 1394 | + table_to_load = update_df(existing_table, table_to_load, update_col) |
| 1395 | + |
| 1396 | + elif manipulation == 'replace': |
| 1397 | + # remove rows |
| 1398 | + self.syn.delete(existing_results) |
| 1399 | + |
| 1400 | + # wait for row deletion to finish on synapse before getting empty table |
| 1401 | + sleep(5) |
| 1402 | + # removes all current columns |
| 1403 | + current_table = self.syn.get(existingTableId) |
| 1404 | + current_columns = self.syn.getTableColumns(current_table) |
| 1405 | + for col in current_columns: |
| 1406 | + current_table.removeColumn(col) |
| 1407 | + |
| 1408 | + # adds new columns to schema |
| 1409 | + new_columns = as_table_columns(table_to_load) |
| 1410 | + for col in new_columns: |
| 1411 | + current_table.addColumn(col) |
| 1412 | + self.syn.store(current_table, isRestricted = restrict) |
| 1413 | + |
| 1414 | + # store table with existing etag data and impose restrictions as appropriate |
1331 | 1415 | self.syn.store(Table(existingTableId, table_to_load, etag = existing_results.etag), isRestricted = restrict) |
| 1416 | + |
1332 | 1417 | # remove system metadata from manifest |
1333 | 1418 | existing_table.drop(columns = ['ROW_ID', 'ROW_VERSION'], inplace = True) |
| 1419 | + return existingTableId |
1334 | 1420 | else: |
1335 | 1421 | datasetEntity = self.syn.get(dataset_id, downloadFile = False) |
1336 | 1422 | datasetName = datasetEntity.name |
@@ -1359,7 +1445,7 @@ def make_synapse_table(self, table_to_load, dataset_id, existingTableId, table_n |
1359 | 1445 | cols.append(Column(name=col, columnType='STRING', maximumSize=500)) |
1360 | 1446 | schema = Schema(name=table_name, columns=cols, parent=datasetParentProject) |
1361 | 1447 | table = Table(schema, table_to_load) |
1362 | | - table_id = self.syn.store(table, isRestricted = restrict) |
| 1448 | + table = self.syn.store(table, isRestricted = restrict) |
1363 | 1449 | return table.schema.id |
1364 | 1450 | else: |
1365 | 1451 | # For just uploading the tables to synapse using default |
|
0 commit comments