99import pandas
1010import synapseclient
1111from synapseclient import Column
12- from synapseclient .models import Table
1312from utilities import *
13+ # Import Table from models AFTER utilities to override the legacy Table import
14+ from synapseclient .models import Table
1415
1516
1617def set_up (args : argparse .Namespace ):
@@ -49,9 +50,9 @@ def _get_dd_info(syn: synapseclient.Synapse, version: str) -> tuple[str, str]:
4950 """
5051 Get the PRISSMM non-PHI data dictionary Synapse ID and its associated cohort by version number
5152 """
52- prissmm_info = syn . tableQuery (
53- "SELECT id, name, cohort FROM syn22684834 WHERE name='%s'" % version
54- ). asDataFrame ()
53+ prissmm_info = Table ( id = "syn22684834" ). query (
54+ query = "SELECT id, name, cohort FROM syn22684834 WHERE name='%s'" % version
55+ )
5556 # TODO: error message if the version does not exist
5657 for file_info in syn .getChildren (prissmm_info ["id" ][0 ]):
5758 if file_info ["name" ] == "Data Dictionary non-PHI" :
@@ -253,11 +254,11 @@ def update_by_data_dictionary(args: argparse.Namespace) -> None:
253254 names = ["variable" , "instrument" , "type" , "label" , "choices" , "validation" ],
254255 )
255256 # extract curated data from data element catalog table
256- curated_var_catalog = syn . tableQuery (
257- "SELECT variable, synColSize, numCols \
257+ curated_var_catalog = Table ( id = catalog_id ). query (
258+ query = "SELECT variable, synColSize, numCols \
258259 FROM %s WHERE dataType='curated'"
259260 % catalog_id
260- ). asDataFrame ()
261+ )
261262 curated_var_catalog .index = curated_var_catalog .index .map (str )
262263 curated_var_catalog ["index" ] = curated_var_catalog .index
263264 vars_to_add_df , vars_to_rm_df , vars_to_update_df = _update_by_data_dictionary (
@@ -268,25 +269,33 @@ def update_by_data_dictionary(args: argparse.Namespace) -> None:
268269 # update: variable, synColSize, numCols
269270 if not dry_run :
270271 table_schema = syn .get (catalog_id )
271- results = syn .tableQuery ("select * from %s" % catalog_id )
272272 saved_to_table = False
273+
273274 if not vars_to_update_df .empty :
275+ # Use upsert_rows with 'variable' as the primary key
276+ # This will update existing rows based on the 'variable' column
274277 vars_to_update_df = vars_to_update_df [
275278 ["synColSize" , "numCols" , "colLabels" ]
276279 ]
277- vars_to_update_df = syn .store (
278- Table (table_schema , vars_to_update_df , etag = results .etag )
280+ Table (id = catalog_id ).upsert_rows (
281+ values = vars_to_update_df ,
282+ primary_keys = ["variable" ]
279283 )
280284 saved_to_table = True
281285
282286 if not vars_to_add_df .empty :
283287 vars_to_add_df = _create_new_row (vars_to_add_df , cohort )
284288 # add the new cohort_dd column to the table schema
285- if "%s_dd" % cohort not in results .asDataFrame ().columns :
289+ # Check if column exists by querying
290+ existing_df = Table (id = catalog_id ).query (
291+ query = "select * from %s limit 1" % catalog_id
292+ )
293+ if "%s_dd" % cohort not in existing_df .columns :
286294 new_col = syn .store (Column (name = "%s_dd" % cohort , columnType = "BOOLEAN" ))
287295 table_schema .addColumn (new_col )
288296 syn .store (table_schema )
289- syn .store (Table (table_schema , vars_to_add_df ))
297+ # Use store_rows for new rows
298+ Table (id = catalog_id ).store_rows (values = vars_to_add_df )
290299 saved_to_table = True
291300
292301 if saved_to_table :
@@ -486,30 +495,30 @@ def _update_by_release_scope(sor_formatted, data_element_catalog, logger):
def update_by_release_scope(args):
    """
    Sync the data element catalog with the current BPC release scope.

    Reads the scope-of-release (SOR) file and the current-release info table,
    diffs them against the data element catalog, and (unless ``dry_run``)
    writes the additions/updates back to the catalog table.

    Args:
        args: argparse.Namespace forwarded to ``set_up`` (expected to carry
            dry-run flag, catalog ID, and SOR ID — TODO confirm exact fields
            against ``set_up``).
    """
    dry_run, syn, logger, catalog_id, sor_id = set_up(args)
    sor = download_bpc_sor(syn, logger, sor_id)
    # Current release versions/types per cohort (syn27628075 is the
    # release-info table; "current is true" limits to active releases).
    release_info = Table(id="syn27628075").query(
        query="SELECT cohort, release_version, release_type \
               FROM syn27628075 \
               WHERE current is true"
    )
    sor_formatted = format_bpc_sor(sor, release_info, logger)
    # Full snapshot of the catalog to diff against the formatted SOR.
    data_element_catalog = Table(id=catalog_id).query(
        query="SELECT * FROM %s" % catalog_id
    )
    vars_to_add_df, vars_to_rm_df, vars_to_update_df = _update_by_release_scope(
        sor_formatted, data_element_catalog, logger
    )
    # FIX: vars_to_rm_df was previously computed and silently discarded;
    # removal is still not automated, but at least surface it in the log so
    # stale variables are visible instead of vanishing without a trace.
    if not vars_to_rm_df.empty:
        logger.info(
            "%d variable(s) are out of release scope and may need manual "
            "removal from the data element catalog",
            len(vars_to_rm_df),
        )
    if not dry_run:
        if not vars_to_update_df.empty:
            # Upsert keyed on 'variable': existing catalog rows matching the
            # key are updated in place; presumably vars_to_update_df retains
            # the 'variable' column — verify against _update_by_release_scope.
            logger.info("Updating existing variables in the data element catalog...")
            Table(id=catalog_id).upsert_rows(
                values=vars_to_update_df,
                primary_keys=["variable"],
            )
        if not vars_to_add_df.empty:
            logger.info("Adding new variables to the data element catalog...")
            # store_rows appends rows that do not yet exist in the table.
            Table(id=catalog_id).store_rows(values=vars_to_add_df)
513522
514523
515524def main ():
0 commit comments