Skip to content

Commit a3ac31e

Browse files
authored
Merge pull request #945 from Sage-Bionetworks/develop-table-schema-fix
Fixing parsing of table schema parameters on table replace
2 parents d2ace93 + 3aad72c commit a3ac31e

File tree

1 file changed

+78
-21
lines changed

1 file changed

+78
-21
lines changed

schematic/store/synapse.py

Lines changed: 78 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -138,8 +138,11 @@ def wrapper(*args, **kwargs):
138138
return method(*args, **kwargs)
139139
except(SynapseHTTPError) as ex:
140140
str_message = str(ex).replace("\n","")
141-
logging.warning(str_message)
142-
return None
141+
if 'missing' in str_message:
142+
logging.warning(str_message)
143+
return None
144+
else:
145+
raise ex
143146
return wrapper
144147

145148

@@ -1103,9 +1106,8 @@ def associateMetadataWithFiles(
11031106
# Load manifest to synapse as a CSV File
11041107
manifest_synapse_file_id = self.uplodad_manifest_file(manifest, metadataManifestPath, datasetId, restrict_manifest, component_name = component_name)
11051108

1106-
# Get annotations for the file manifest.
1109+
# Set annotations for the file manifest.
11071110
manifest_annotations = self.format_manifest_annotations(manifest, manifest_synapse_file_id)
1108-
11091111
self.syn.set_annotations(manifest_annotations)
11101112

11111113
logger.info("Associated manifest file with dataset on Synapse.")
@@ -1122,7 +1124,7 @@ def associateMetadataWithFiles(
11221124
restrict = restrict_manifest
11231125
)
11241126

1125-
# Get annotations for the table manifest
1127+
# Set annotations for the table manifest
11261128
manifest_annotations = self.format_manifest_annotations(manifest, manifest_synapse_table_id)
11271129
self.syn.set_annotations(manifest_annotations)
11281130

@@ -1350,6 +1352,18 @@ def getDatasetAnnotationsBatch(
13501352

13511353
return table
13521354

1355+
def _get_table_schema_by_cname(self, table_schema):
1356+
1357+
# assume no duplicate column names in the table
1358+
table_schema_by_cname = {}
1359+
1360+
for col_record in table_schema:
1361+
1362+
#TODO clean up dictionary for compactness (e.g. remove redundant 'name' key)
1363+
table_schema_by_cname[col_record["name"]] = col_record
1364+
1365+
return table_schema_by_cname
1366+
13531367
def make_synapse_table(self,
13541368
table_to_load: pd.DataFrame,
13551369
dataset_id: str, table_name: str,
@@ -1392,34 +1406,76 @@ def make_synapse_table(self,
13921406
# locate its position in the old and new table.
13931407
if manipulation == 'update':
13941408
table_to_load = update_df(existing_table, table_to_load, update_col)
1395-
1409+
# store table with existing etag data and impose restrictions as appropriate
1410+
self.syn.store(Table(existingTableId, table_to_load, etag = existing_results.etag), isRestricted = restrict)
1411+
13961412
elif manipulation == 'replace':
13971413
# remove rows
13981414
self.syn.delete(existing_results)
1399-
14001415
# wait for row deletion to finish on synapse before getting empty table
1401-
sleep(5)
1416+
sleep(1)
1417+
14021418
# removes all current columns
14031419
current_table = self.syn.get(existingTableId)
14041420
current_columns = self.syn.getTableColumns(current_table)
14051421
for col in current_columns:
14061422
current_table.removeColumn(col)
14071423

1408-
# adds new columns to schema
1409-
new_columns = as_table_columns(table_to_load)
1410-
for col in new_columns:
1411-
current_table.addColumn(col)
1412-
self.syn.store(current_table, isRestricted = restrict)
1413-
1414-
# store table with existing etag data and impose restrictions as appropriate
1415-
self.syn.store(Table(existingTableId, table_to_load, etag = existing_results.etag), isRestricted = restrict)
1424+
if not table_name:
1425+
table_name = datasetName + 'table'
1426+
1427+
# Process columns according to manifest entries
1428+
table_schema_by_cname = self._get_table_schema_by_cname(column_type_dictionary)
1429+
datasetParentProject = self.getDatasetProject(dataset_id)
1430+
if specify_schema:
1431+
if column_type_dictionary == {}:
1432+
logger.error("Did not provide a column_type_dictionary.")
1433+
#create list of columns:
1434+
cols = []
1435+
1436+
for col in table_to_load.columns:
1437+
1438+
if col in table_schema_by_cname:
1439+
col_type = table_schema_by_cname[col]['columnType']
1440+
max_size = table_schema_by_cname[col]['maximumSize']
1441+
max_list_len = 250
1442+
if max_size and max_list_len:
1443+
cols.append(Column(name=col, columnType=col_type,
1444+
maximumSize=max_size, maximumListLength=max_list_len))
1445+
elif max_size:
1446+
cols.append(Column(name=col, columnType=col_type,
1447+
maximumSize=max_size))
1448+
else:
1449+
cols.append(Column(name=col, columnType=col_type))
1450+
else:
1451+
1452+
#TODO add a warning that the given col was not found and its max size is set to 100
1453+
cols.append(Column(name=col, columnType='STRING', maximumSize=100))
1454+
1455+
# adds new columns to schema
1456+
for col in cols:
1457+
current_table.addColumn(col)
1458+
self.syn.store(current_table, isRestricted = restrict)
1459+
1460+
# wait for synapse store to finish
1461+
sleep(1)
1462+
1463+
# build schema and table from columns and store with necessary restrictions
1464+
schema = Schema(name=table_name, columns=cols, parent=datasetParentProject)
1465+
schema.id = existingTableId
1466+
table = Table(schema, table_to_load, etag = existing_results.etag)
1467+
table = self.syn.store(table, isRestricted = restrict)
1468+
else:
1469+
logging.error("Must specify a schema for table replacements")
14161470

14171471
# remove system metadata from manifest
14181472
existing_table.drop(columns = ['ROW_ID', 'ROW_VERSION'], inplace = True)
14191473
return existingTableId
14201474
else:
14211475
datasetEntity = self.syn.get(dataset_id, downloadFile = False)
14221476
datasetName = datasetEntity.name
1477+
table_schema_by_cname = self._get_table_schema_by_cname(column_type_dictionary)
1478+
14231479
if not table_name:
14241480
table_name = datasetName + 'table'
14251481
datasetParentProject = self.getDatasetProject(dataset_id)
@@ -1429,10 +1485,10 @@ def make_synapse_table(self,
14291485
#create list of columns:
14301486
cols = []
14311487
for col in table_to_load.columns:
1432-
if col in column_type_dictionary:
1433-
col_type = column_type_dictionary[col]['column_type']
1434-
max_size = column_type_dictionary[col]['maximum_size']
1435-
max_list_len = column_type_dictionary[col]['maximum_list_length']
1488+
if col in table_schema_by_cname:
1489+
col_type = table_schema_by_cname[col]['columnType']
1490+
max_size = table_schema_by_cname[col]['maximumSize']
1491+
max_list_len = 250
14361492
if max_size and max_list_len:
14371493
cols.append(Column(name=col, columnType=col_type,
14381494
maximumSize=max_size, maximumListLength=max_list_len))
@@ -1442,7 +1498,8 @@ def make_synapse_table(self,
14421498
else:
14431499
cols.append(Column(name=col, columnType=col_type))
14441500
else:
1445-
cols.append(Column(name=col, columnType='STRING', maximumSize=500))
1501+
#TODO add a warning that the given col was not found and its max size is set to 100
1502+
cols.append(Column(name=col, columnType='STRING', maximumSize=100))
14461503
schema = Schema(name=table_name, columns=cols, parent=datasetParentProject)
14471504
table = Table(schema, table_to_load)
14481505
table = self.syn.store(table, isRestricted = restrict)

0 commit comments

Comments
 (0)