Skip to content

Commit

Permalink
read-input-tables and replace-output-tables APIs
Browse files Browse the repository at this point in the history
  • Loading branch information
vjterpstra committed Jan 11, 2022
1 parent 3749e4c commit 1edd738
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 20 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Added
- ScenarioDbManager - Edit cells in tables
- ScenarioDbManager - Duplicate, Rename and Delete scenario
- ScenarioDbManager.read_scenario_input_tables_from_db main API to read input for solve
- ScenarioDbManager.update_scenario_output_tables_in_db main API to store solve output

## [0.5.3.1] - 2021-12-30
### Changed
Expand Down
98 changes: 78 additions & 20 deletions dse_do_utils/scenariodbmanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,15 @@ def insert_table_in_db_bulk(self, df: pd.DataFrame, mgr, connection=None):
print(f"DataFrame insert/append of table '{table_name}'")
print(e)

def _delete_scenario_table_from_db(self, scenario_name, connection):
    """Remove every row of this DB table that belongs to the given scenario.

    Beware: callers must process the tables in the right 'inverse cascading'
    order, otherwise foreign-key violations can occur.

    :param scenario_name: Value matched against the table's ``scenario_name`` column.
    :param connection: Object whose ``execute`` method runs the DELETE statement.
    """
    sa_table = self.get_sa_table()  # A Table()
    delete_stmt = sa_table.delete().where(sa_table.c.scenario_name == scenario_name)
    connection.execute(delete_stmt)

@staticmethod
def sqlcol(df: pd.DataFrame) -> Dict:
dtypedict = {}
Expand Down Expand Up @@ -842,9 +851,14 @@ def _read_scenario_db_table_from_db_thread(self, scenario_table_name, db_table):

return inputs, outputs

def read_scenario_input_tables_from_db(self, scenario_name: str):
    """Convenience wrapper that loads all input tables of a scenario.

    Typically used at the start of an optimization model. Delegates to
    `read_scenario_tables_from_db`, requesting the '*' wildcard for the input
    tables and leaving the output table selection at its default.

    :param scenario_name: Name of the scenario to read.
    :return: Whatever `read_scenario_tables_from_db` returns for this request.
    """
    all_inputs_wildcard = ['*']
    return self.read_scenario_tables_from_db(scenario_name, input_table_names=all_inputs_wildcard)

def read_scenario_tables_from_db(self, scenario_name: str,
input_table_names: List[str] = None,
output_table_names: List[str] = None) -> (Inputs, Outputs):
input_table_names: Optional[List[str]] = None,
output_table_names: Optional[List[str]] = None) -> (Inputs, Outputs):
"""Read selected set input and output tables from scenario.
If input_table_names/output_table_names contains a '*', then all input/output tables will be read.
If empty list or None, then no tables will be read.
Expand Down Expand Up @@ -948,6 +962,61 @@ def _update_cell_change_in_db(self, db_cell_update: DbCellUpdate, connection):

connection.execute(sql)

############################################################################################
# Update/Replace tables in scenario
############################################################################################
def update_scenario_output_tables_in_db(self, scenario_name, outputs: Outputs):
    """Main API to store the output of a DO solve in the scenario.

    Deletes ALL output tables of the scenario, then inserts the given set of
    tables. Because only the output tables are touched, this is more efficient
    than replacing the whole scenario.

    When ``self.enable_transactions`` is truthy the work runs inside the
    ``self.engine.begin()`` context; otherwise the engine itself is handed
    down as the connection.

    :param scenario_name: Name of the scenario whose output tables are replaced.
    :param outputs: The output tables to insert.
    """
    if not self.enable_transactions:
        # No transaction handling requested: execute directly on the engine.
        self._update_scenario_output_tables_in_db(scenario_name, outputs, self.engine)
        return
    with self.engine.begin() as connection:
        self._update_scenario_output_tables_in_db(scenario_name, outputs, connection)

def _update_scenario_output_tables_in_db(self, scenario_name, outputs: Outputs, connection):
    """Delete ALL output tables of the scenario, then insert the given set of tables.

    Note that a defined output table that is not included in `outputs` is
    still deleted from the scenario data.

    :param scenario_name: Name of the scenario whose output tables are replaced.
    :param outputs: Output tables to insert, looked up by scenario table name.
    :param connection: Connection (or engine) used to execute the statements.
    """
    # 1. Add scenario name to dfs:
    outputs = ScenarioDbManager.add_scenario_name_to_dfs(scenario_name, outputs)
    # 2. Delete all output tables, in reverse order so dependent tables go first.
    for scenario_table_name, db_table in reversed(self.output_db_tables.items()):  # Note this INCLUDES the SCENARIO table!
        if scenario_table_name != 'Scenario':
            # The delete helper requires both the scenario name and the connection.
            db_table._delete_scenario_table_from_db(scenario_name, connection)
    # 3. Insert new data
    for scenario_table_name, db_table in self.output_db_tables.items():  # Note this INCLUDES the SCENARIO table!
        # Test membership with the same key that is used for the lookup below.
        if scenario_table_name != 'Scenario' and scenario_table_name in outputs:
            df = outputs[scenario_table_name]
            db_table.insert_table_in_db_bulk(df=df, mgr=self, connection=connection)  # The scenario_name is a column in the df

def replace_scenario_tables_in_db(self, scenario_name, inputs=None, outputs=None):
    """Untested.

    Replace the given input and output tables of a scenario.

    When ``self.enable_transactions`` is truthy the work runs inside the
    ``self.engine.begin()`` context; otherwise the engine itself is handed
    down as the connection.

    :param scenario_name: Name of the scenario to update.
    :param inputs: Input tables to replace. ``None`` means no input tables.
    :param outputs: Output tables to replace. ``None`` means no output tables.
    """
    # Use fresh dicts per call instead of shared mutable default arguments.
    inputs = {} if inputs is None else inputs
    outputs = {} if outputs is None else outputs
    if self.enable_transactions:
        with self.engine.begin() as connection:
            self._replace_scenario_tables_in_db(connection, scenario_name, inputs, outputs)
    else:
        self._replace_scenario_tables_in_db(self.engine, scenario_name, inputs, outputs)

def _replace_scenario_tables_in_db(self, connection, scenario_name, inputs=None, outputs=None):
    """Untested.

    Replace only the tables listed in the inputs and outputs, leaving all
    other tables untouched. First deletes all given tables (in reverse
    cascading order), then inserts the new ones (in cascading order).

    :param connection: Connection (or engine) used to execute the statements.
    :param scenario_name: Name of the scenario to update.
    :param inputs: Input tables to replace. ``None`` means no input tables.
    :param outputs: Output tables to replace. ``None`` means no output tables.
    """
    # Use fresh dicts per call instead of shared mutable default arguments.
    inputs = {} if inputs is None else inputs
    outputs = {} if outputs is None else outputs

    # Add scenario name to dfs:
    inputs = ScenarioDbManager.add_scenario_name_to_dfs(scenario_name, inputs)
    outputs = ScenarioDbManager.add_scenario_name_to_dfs(scenario_name, outputs)
    dfs = {**inputs, **outputs}
    # 1. Delete tables
    for scenario_table_name, db_table in reversed(self.db_tables.items()):  # Note this INCLUDES the SCENARIO table!
        # Test membership with the same key that is used for the lookup in step 2.
        if scenario_table_name != 'Scenario' and scenario_table_name in dfs:
            # The delete helper requires both the scenario name and the connection.
            db_table._delete_scenario_table_from_db(scenario_name, connection)
    # 2. Insert new data
    for scenario_table_name, db_table in self.db_tables.items():  # Note this INCLUDES the SCENARIO table!
        if scenario_table_name != 'Scenario' and scenario_table_name in dfs:
            df = dfs[scenario_table_name]
            db_table.insert_table_in_db_bulk(df=df, mgr=self, connection=connection)  # The scenario_name is a column in the df

############################################################################################
# CRUD operations on scenarios in DB:
# - Delete scenario
Expand Down Expand Up @@ -1117,28 +1186,17 @@ def _delete_scenario_from_db(self, scenario_name: str, connection):
TODO: check with 'auto-inserted' tables
TODO: batch all sql statements in single execute. Faster? And will that do the defer integrity checks?
"""
batch_sql=False # Batch=True does NOT work!
# batch_sql=False # Batch=True does NOT work!
insp = sqlalchemy.inspect(connection)
tables_in_db = insp.get_table_names(schema=self.schema)
sql_statements = []
# sql_statements = []
for scenario_table_name, db_table in reversed(self.db_tables.items()): # Note this INCLUDES the SCENARIO table!
if db_table.db_table_name in tables_in_db:
# sql = f"DELETE FROM {db_table.db_table_name} WHERE scenario_name = '{scenario_name}'" # Old
t = db_table.table_metadata # A Table()
sql = t.delete().where(t.c.scenario_name == scenario_name)
if batch_sql:
sql_statements.append(sql)
else:
connection.execute(sql)

# Because the scenario table has already been included in above loop, no need to do separately
# Delete scenario entry in scenario table:
# sql = f"DELETE FROM SCENARIO WHERE scenario_name = '{scenario_name}'"
# sql_statements.append(sql)
if batch_sql:
batch_sql = ";\n".join(sql_statements)
# print(batch_sql)
connection.execute(batch_sql)
# # sql = f"DELETE FROM {db_table.db_table_name} WHERE scenario_name = '{scenario_name}'" # Old
# t = db_table.table_metadata # A Table()
# sql = t.delete().where(t.c.scenario_name == scenario_name)
# connection.execute(sql)
db_table._delete_scenario_table_from_db(scenario_name, connection)

############################################################################################
# Old Read scenario APIs
Expand Down

0 comments on commit 1edd738

Please sign in to comment.