diff --git a/CHANGELOG.md b/CHANGELOG.md
index 82b89b3..1c7e835 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -4,7 +4,9 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
-## [Unreleased]## [0.5.4.0b]
+## [Unreleased]## [0.5.4.1b]
+
+## [0.5.4.0] - 2022-01-11
### Changed
- ScenarioDbManager - Converted text SQL operations to SQLAlchemy operations to support any column-name (i.e. lower, upper, mixed, reserved words)
- Updated ScenarioDbManager.read_scenario_tables_from_db to selectively read tables from a scenario
diff --git a/VersioningReadMe.md b/VersioningReadMe.md
index 5b7e88f..248f238 100644
--- a/VersioningReadMe.md
+++ b/VersioningReadMe.md
@@ -20,7 +20,7 @@ Note that if you added/removed modules, you first need to re-run the sphinx comm
`python setup.py sdist bdist_wheel`
5. Upload to PyPI (from PyCharm terminal run):
-`make clean`
+`twine upload dist/* --verbose`
Enter username and password when prompted.
(For TestPyPI use: `twine upload --repository-url https://test.pypi.org/legacy/ dist/* --verbose`)
Before the twine upload, you can check the distribution with:
diff --git a/docs/doc_build/doctrees/dse_do_utils.doctree b/docs/doc_build/doctrees/dse_do_utils.doctree
index 5d4dc5e..f87f1dd 100644
Binary files a/docs/doc_build/doctrees/dse_do_utils.doctree and b/docs/doc_build/doctrees/dse_do_utils.doctree differ
diff --git a/docs/doc_build/doctrees/environment.pickle b/docs/doc_build/doctrees/environment.pickle
index 04c41f5..fbcfc9b 100644
Binary files a/docs/doc_build/doctrees/environment.pickle and b/docs/doc_build/doctrees/environment.pickle differ
diff --git a/docs/doc_build/html/.buildinfo b/docs/doc_build/html/.buildinfo
index 7ff3c5f..e47c0e5 100644
--- a/docs/doc_build/html/.buildinfo
+++ b/docs/doc_build/html/.buildinfo
@@ -1,4 +1,4 @@
# Sphinx build info version 1
# This file hashes the configuration used when building these files. When it is not found, a full rebuild will be done.
-config: effd2c6945fa34906f8d2c06e4196c89
+config: bd2b9a92071f1c40e294c4bb1fb76baa
tags: 645f666f9bcd5a90fca523b33c5a78b7
diff --git a/docs/doc_build/html/_modules/dse_do_utils.html b/docs/doc_build/html/_modules/dse_do_utils.html
index e8eb574..ccdd09f 100644
--- a/docs/doc_build/html/_modules/dse_do_utils.html
+++ b/docs/doc_build/html/_modules/dse_do_utils.html
@@ -6,7 +6,7 @@
- dse_do_utils — DSE DO Utils 0.5.2.0 documentation
+ dse_do_utils — DSE DO Utils 0.5.4.0 documentation
@@ -31,7 +31,7 @@
# - Make 'multi_scenario' the default option
# -----------------------------------------------------------------------------------fromabcimportABC
+frommultiprocessing.poolimportThreadPoolimportsqlalchemyimportpandasaspd
-fromtypingimportDict,List
+fromtypingimportDict,List,NamedTuple,Any,OptionalfromcollectionsimportOrderedDictimportrefromsqlalchemyimportexc
@@ -108,6 +109,7 @@
Source code for dse_do_utils.scenariodbmanager
reserved_table_names =['order','parameter']# TODO: add more reserved words for table namesifdb_table_nameinreserved_table_names:print(f"Warning: the db_table_name '{db_table_name}' is a reserved word. Do not use as table name.")
+ self._sa_column_by_name=None# Dict[str, sqlalchemy.Column] Will be generated dynamically first time it is needed.
[docs]defget_sa_table(self)->sqlalchemy.Table:
+ """Returns the SQLAlchemy Table"""
+ returnself.table_metadata
+
+
[docs]defget_sa_column(self,db_column_name)->Optional[sqlalchemy.Column]:
+ """Returns the SQLAlchemy column with the specified name.
+ Dynamically creates a dict/hashtable for more efficient access."""
+ # for c in self.columns_metadata:
+ # if isinstance(c, sqlalchemy.Column) and c.name == db_column_name:
+ # return c
+ ifself._sa_column_by_nameisNone:
+ self._sa_column_by_name={c.name:cforcinself.columns_metadataifisinstance(c,sqlalchemy.Column)}
+ returnself._sa_column_by_name.get(db_column_name)# returns None if npt found (?)
+
+
[docs]defcreate_table_metadata(self,metadata,multi_scenario:bool=False)->sqlalchemy.Table:"""If multi_scenario, then add a primary key 'scenario_name'."""columns_metadata=self.columns_metadataconstraints_metadata=self.constraints_metadata
@@ -177,6 +193,15 @@
Source code for dse_do_utils.scenariodbmanager
print(f"DataFrame insert/append of table '{table_name}'")print(e)
+ def_delete_scenario_table_from_db(self,scenario_name,connection):
+ """Delete all rows associated with the scenario in the DB table.
+ Beware: make sure this is done in the right 'inverse cascading' order to avoid FK violations.
+ """
+ # sql = f"DELETE FROM {db_table.db_table_name} WHERE scenario_name = '{scenario_name}'" # Old
+ t=self.get_sa_table()# A Table()
+ sql=t.delete().where(t.c.scenario_name==scenario_name)
+ connection.execute(sql)
+
[docs]classDbCellUpdate(NamedTuple):
+ scenario_name:str
+ table_name:str
+ row_index:List[Dict[str,Any]]# e.g. [{'column': 'col1', 'value': 1}, {'column': 'col2', 'value': 'pear'}]
+ column_name:str
+ current_value:Any
+ previous_value:Any# Not used for DB operation
+ row_idx:int# Not used for DB operation
print("Warning: the `Scenario` table should be the first in the input tables")returninput_db_tables
+
[docs]defget_scenario_db_table(self)->ScenarioDbTable:
+ """Scenario table must be the first in self.input_db_tables"""
+ db_table:ScenarioTable=list(self.input_db_tables.values())[0]
+ returndb_table
+
def_create_database_engine(self,credentials=None,schema:str=None,echo:bool=False):"""Creates a SQLAlchemy engine at initialization. If no credentials, creates an in-memory SQLite DB. Which can be used for schema validation of the data.
@@ -458,9 +498,9 @@
Source code for dse_do_utils.scenariodbmanager
with self.engine.begin()asconnection:self._create_schema_transaction(connection=connection)else:
- self._create_schema_transaction()
+ self._create_schema_transaction(self.engine)
- def_create_schema_transaction(self,connection=None):
+ def_create_schema_transaction(self,connection):"""(Re)creates a schema, optionally using a transaction Drops all tables and re-creates the schema in the DB."""# if self.schema is None:
@@ -469,10 +509,7 @@
Source code for dse_do_utils.scenariodbmanager
# self.drop_schema_transaction(self.schema)
# DROP SCHEMA isn't working properly, so back to dropping all tablesself._drop_all_tables_transaction(connection=connection)
- ifconnectionisNone:
- self.metadata.create_all(self.engine,checkfirst=True)
- else:
- self.metadata.create_all(connection,checkfirst=True)
+ self.metadata.create_all(connection,checkfirst=True)
[docs]defdrop_all_tables(self):"""Drops all tables in the current schema."""
@@ -480,9 +517,9 @@
Source code for dse_do_utils.scenariodbmanager
with self.engine.begin()asconnection:self._drop_all_tables_transaction(connection=connection)else:
- self._drop_all_tables_transaction()
+ self._drop_all_tables_transaction(self.engine)
- def_drop_all_tables_transaction(self,connection=None):
+ def_drop_all_tables_transaction(self,connection):"""Drops all tables as defined in db_tables (if exists) TODO: loop over tables as they exist in the DB. This will make sure that however the schema definition has changed, all tables will be cleared.
@@ -493,15 +530,18 @@
Source code for dse_do_utils.scenariodbmanager
However, the order is alphabetically, which causes FK constraint violation
Weirdly, this happens in SQLite, not in DB2! With or without transactions
+
+ TODO:
+ 1. Use SQLAlchemy to drop table, avoid text SQL
+ 2. Drop all tables without having to loop and know all tables
+ See: https://stackoverflow.com/questions/35918605/how-to-delete-a-table-in-sqlalchemy)
+ See https://docs.sqlalchemy.org/en/14/core/metadata.html#sqlalchemy.schema.MetaData.drop_all """forscenario_table_name,db_tableinreversed(self.db_tables.items()):db_table_name=db_table.db_table_namesql=f"DROP TABLE IF EXISTS {db_table_name}"# print(f"Dropping table {db_table_name}")
- ifconnectionisNone:
- r=self.engine.execute(sql)
- else:
- r=connection.execute(sql)
+ connection.execute(sql)def_drop_schema_transaction(self,schema:str,connection=None):"""NOT USED. Not working in DB2 Cloud.
@@ -509,6 +549,7 @@
Source code for dse_do_utils.scenariodbmanager
See: https://www.ibm.com/docs/en/db2/11.5?topic=procedure-admin-drop-schema-drop-schema
However, this doesn't work on DB2 cloud. TODO: find out if and how we can get this to work.
+ See https://docs.sqlalchemy.org/en/14/core/metadata.html#sqlalchemy.schema.MetaData.drop_all """# sql = f"DROP SCHEMA {schema} CASCADE" # Not allowed in DB2!sql=f"CALL SYSPROC.ADMIN_DROP_SCHEMA('{schema}', NULL, 'ERRORSCHEMA', 'ERRORTABLE')"
@@ -561,18 +602,20 @@
Source code for dse_do_utils.scenariodbmanager
if self.enable_transactions:print("Replacing scenario within transaction")withself.engine.begin()asconnection:
- self._replace_scenario_in_db_transaction(scenario_name=scenario_name,inputs=inputs,outputs=outputs,bulk=bulk,connection=connection)
+ self._replace_scenario_in_db_transaction(connection,scenario_name=scenario_name,inputs=inputs,outputs=outputs,bulk=bulk)else:
- self._replace_scenario_in_db_transaction(scenario_name=scenario_name,inputs=inputs,outputs=outputs,bulk=bulk)
- def_replace_scenario_in_db_transaction(self,scenario_name:str,inputs:Inputs={},outputs:Outputs={},
- bulk:bool=True,connection=None):
+ def_replace_scenario_in_db_transaction(self,connection,scenario_name:str,inputs:Inputs={},outputs:Outputs={},
+ bulk:bool=True):"""Replace a single full scenario in the DB. If doesn't exist, will insert. Only inserts tables with an entry defined in self.db_tables (i.e. no `auto_insert`). Will first delete all rows associated with a scenario_name. Will set/overwrite the scenario_name in all dfs, so no need to add in advance. Assumes schema has been created. Note: there is no difference between dfs in inputs or outputs, i.e. they are inserted the same way.
+
+ TODO: break-out in a delete and an insert. Then we can re-use the insert for the duplicate API """# Step 1: delete scenario if existsself._delete_scenario_from_db(scenario_name,connection=connection)
@@ -580,11 +623,10 @@
Source code for dse_do_utils.scenariodbmanager
inputs =ScenarioDbManager.add_scenario_name_to_dfs(scenario_name,inputs)outputs=ScenarioDbManager.add_scenario_name_to_dfs(scenario_name,outputs)# Step 3: insert scenario_name in scenario table
- sql=f"INSERT INTO SCENARIO (scenario_name) VALUES ('{scenario_name}')"
- ifconnectionisNone:
- self.engine.execute(sql)
- else:
- connection.execute(sql)
+ # sql = f"INSERT INTO SCENARIO (scenario_name) VALUES ('{scenario_name}')"
+ sa_scenario_table=self.get_scenario_db_table().get_sa_table()
+ sql_insert=sa_scenario_table.insert().values(scenario_name=scenario_name)
+ connection.execute(sql_insert)# Step 4: (bulk) insert scenarionum_caught_exceptions=self._insert_single_scenario_tables_in_db(inputs=inputs,outputs=outputs,bulk=bulk,connection=connection)# Throw exception if any exceptions caught in 'non-bulk' mode
@@ -592,34 +634,38 @@
Source code for dse_do_utils.scenariodbmanager
if num_caught_exceptions>0:raiseRuntimeError(f"Multiple ({num_caught_exceptions}) Integrity and/or Statement errors caught. See log. Raising exception to allow for rollback.")
- def_delete_scenario_from_db(self,scenario_name:str,connection=None):
- """Deletes all rows associated with a given scenario.
- Note that it only deletes rows from tables defined in the self.db_tables, i.e. will NOT delete rows in 'auto-inserted' tables!
- Must do a 'cascading' delete to ensure not violating FK constraints. In reverse order of how they are inserted.
- Also deletes entry in scenario table
- TODO: do within one session/cursor, so we don't have to worry about the order of the delete?
- """
- insp=sqlalchemy.inspect(self.engine)
- forscenario_table_name,db_tableinreversed(self.db_tables.items()):
- ifinsp.has_table(db_table.db_table_name,schema=self.schema):
- sql=f"DELETE FROM {db_table.db_table_name} WHERE scenario_name = '{scenario_name}'"
- ifconnectionisNone:
- self.engine.execute(sql)
- else:
- connection.execute(sql)
-
- # Delete scenario entry in scenario table:
- sql=f"DELETE FROM SCENARIO WHERE scenario_name = '{scenario_name}'"
- ifconnectionisNone:
- self.engine.execute(sql)
- else:
- connection.execute(sql)
+ # def _delete_scenario_from_db(self, scenario_name: str, connection=None):
+ # """Deletes all rows associated with a given scenario.
+ # Note that it only deletes rows from tables defined in the self.db_tables, i.e. will NOT delete rows in 'auto-inserted' tables!
+ # Must do a 'cascading' delete to ensure not violating FK constraints. In reverse order of how they are inserted.
+ # Also deletes entry in scenario table
+ # """
+ # insp = sqlalchemy.inspect(self.engine)
+ # for scenario_table_name, db_table in reversed(self.db_tables.items()):
+ # if insp.has_table(db_table.db_table_name, schema=self.schema):
+ #
+ # # sql = f"DELETE FROM {db_table.db_table_name} WHERE scenario_name = '{scenario_name}'"
+ # t: sqlalchemy.Table = db_table.get_sa_table() # A Table()
+ # sql = t.delete().where(t.c.scenario_name == scenario_name)
+ # if connection is None:
+ # self.engine.execute(sql)
+ # else:
+ # connection.execute(sql)
+ #
+ # # Delete scenario entry in scenario table:
+ # # sql = f"DELETE FROM SCENARIO WHERE scenario_name = '{scenario_name}'"
+ # t: sqlalchemy.Table = self.get_scenario_db_table().get_sa_table() # A Table()
+ # sql = t.delete().where(t.c.scenario_name == scenario_name)
+ # if connection is None:
+ # self.engine.execute(sql)
+ # else:
+ # connection.execute(sql)def_insert_single_scenario_tables_in_db(self,inputs:Inputs={},outputs:Outputs={},bulk:bool=True,connection=None)->int:"""Specifically for single scenario replace/insert. Does NOT insert into the `scenario` table.
- No `auto_insert`, i.e. only df matching db_tables.
+ No `auto_insert`, i.e. only df matching db_tables. TODO: verify if doesn't work with AutoScenarioDbTable """num_caught_exceptions=0dfs={**inputs,**outputs}# Combine all dfs in one dict
@@ -712,13 +758,19 @@
[docs]defget_scenarios_df(self)->pd.DataFrame:"""Return all scenarios in df. Result is indexed by `scenario_name`. Main API to get all scenarios. The API called by a cached procedure in the dse_do_dashboard.DoDashApp. """
- sql=f"SELECT * FROM SCENARIO"
- df=pd.read_sql(sql,con=self.engine).set_index(['scenario_name'])
+ # sql = f"SELECT * FROM SCENARIO"
+ sa_scenario_table=list(self.input_db_tables.values())[0].table_metadata
+ sql=sa_scenario_table.select()
+ ifself.enable_transactions:
+ withself.engine.begin()asconnection:
+ df=pd.read_sql(sql,con=connection).set_index(['scenario_name'])
+ else:
+ df=pd.read_sql(sql,con=self.engine).set_index(['scenario_name'])returndf
# error!
raiseValueError(f"Scenario table name '{scenario_table_name}' unknown. Cannot load data from DB.")
- df=self._read_scenario_db_table_from_db(scenario_name,db_table)
+ ifself.enable_transactions:
+ withself.engine.begin()asconnection:
+ df=self._read_scenario_db_table_from_db(scenario_name,db_table,connection)
+ else:
+ df=self._read_scenario_db_table_from_db(scenario_name,db_table,self.engine)returndf
-
[docs]defread_scenario_from_db(self,scenario_name:str)->(Inputs,Outputs):
+ # def read_scenario_from_db(self, scenario_name: str) -> (Inputs, Outputs):
+ # """Single scenario load.
+ # Main API to read a complete scenario.
+ # Reads all tables for a single scenario.
+ # Returns all tables in one dict"""
+ # inputs = {}
+ # for scenario_table_name, db_table in self.input_db_tables.items():
+ # inputs[scenario_table_name] = self._read_scenario_db_table_from_db(scenario_name, db_table)
+ #
+ # outputs = {}
+ # for scenario_table_name, db_table in self.output_db_tables.items():
+ # outputs[scenario_table_name] = self._read_scenario_db_table_from_db(scenario_name, db_table)
+ #
+ # return inputs, outputs
+
+
+ # def _read_scenario_from_db(self, scenario_name: str, connection) -> (Inputs, Outputs):
+ # """Single scenario load.
+ # Main API to read a complete scenario.
+ # Reads all tables for a single scenario.
+ # Returns all tables in one dict
+ # """
+ # inputs = {}
+ # for scenario_table_name, db_table in self.input_db_tables.items():
+ # # print(f"scenario_table_name = {scenario_table_name}")
+ # if scenario_table_name != 'Scenario': # Skip the Scenario table as an input
+ # inputs[scenario_table_name] = self._read_scenario_db_table_from_db(scenario_name, db_table, connection=connection)
+ #
+ # outputs = {}
+ # for scenario_table_name, db_table in self.output_db_tables.items():
+ # outputs[scenario_table_name] = self._read_scenario_db_table_from_db(scenario_name, db_table, connection=connection)
+ # # if scenario_table_name == 'kpis':
+ # # # print(f"kpis table columns = {outputs[scenario_table_name].columns}")
+ # # outputs[scenario_table_name] = outputs[scenario_table_name].rename(columns={'name': 'NAME'}) #HACK!!!!!
+ # return inputs, outputs
+
[docs]defread_scenario_from_db(self,scenario_name:str,multi_threaded:bool=False)->(Inputs,Outputs):"""Single scenario load. Main API to read a complete scenario. Reads all tables for a single scenario.
- Returns all tables in one dict"""
+ Returns all tables in one dict
+
+ Note: multi_threaded doesn't seem to lead to performance improvement.
+ Fixed: omit reading scenario table as an input.
+ """
+ # print(f"read_scenario_from_db.multi_threaded = {multi_threaded}")
+ ifmulti_threaded:
+ inputs,outputs=self._read_scenario_from_db_multi_threaded(scenario_name)
+ else:
+ ifself.enable_transactions:
+ withself.engine.begin()asconnection:
+ inputs,outputs=self._read_scenario_from_db(scenario_name,connection)
+ else:
+ inputs,outputs=self._read_scenario_from_db(scenario_name,self.engine)
+ returninputs,outputs
+
+ def_read_scenario_from_db(self,scenario_name:str,connection)->(Inputs,Outputs):
+ """Single scenario load.
+ Main API to read a complete scenario.
+ Reads all tables for a single scenario.
+ Returns all tables in one dict
+ """inputs={}forscenario_table_name,db_tableinself.input_db_tables.items():
- inputs[scenario_table_name]=self._read_scenario_db_table_from_db(scenario_name,db_table)
+ # print(f"scenario_table_name = {scenario_table_name}")
+ ifscenario_table_name!='Scenario':# Skip the Scenario table as an input
+ inputs[scenario_table_name]=self._read_scenario_db_table_from_db(scenario_name,db_table,connection=connection)outputs={}forscenario_table_name,db_tableinself.output_db_tables.items():
- outputs[scenario_table_name]=self._read_scenario_db_table_from_db(scenario_name,db_table)
+ outputs[scenario_table_name]=self._read_scenario_db_table_from_db(scenario_name,db_table,connection=connection)
+
+ returninputs,outputs
+
+ def_read_scenario_from_db_multi_threaded(self,scenario_name)->(Inputs,Outputs):
+ """Reads all tables from a scenario using multi-threading.
+ Does NOT seem to result in performance improvement!"""
+ classReadTableFunction(object):
+ def__init__(self,dbm):
+ self.dbm=dbm
+ def__call__(self,scenario_table_name,db_table):
+ returnself._read_scenario_db_table_from_db_thread(scenario_table_name,db_table)
+ def_read_scenario_db_table_from_db_thread(self,scenario_table_name,db_table):
+ withself.dbm.engine.begin()asconnection:
+ df=self.dbm._read_scenario_db_table_from_db(scenario_name,db_table,connection)
+ dict={scenario_table_name:df}
+ returndict
+
+ thread_number=8
+ pool=ThreadPool(thread_number)
+ thread_worker=ReadTableFunction(self)
+ # print("ThreadPool created")
+ all_tables=[(scenario_table_name,db_table)forscenario_table_name,db_tableinself.db_tables.items()ifscenario_table_name!='Scenario']
+ # print(all_tables)
+ all_results=pool.starmap(thread_worker,all_tables)
+ inputs={k:vforelementinall_resultsfork,vinelement.items()ifkinself.input_db_tables.keys()}
+ outputs={k:vforelementinall_resultsfork,vinelement.items()ifkinself.output_db_tables.keys()}
+ # print("All tables loaded")
+
+ returninputs,outputs
+
+
[docs]defread_scenario_input_tables_from_db(self,scenario_name:str):
+ """Convenience method to load all input tables.
+ Typically used at start if optimization model."""
+ returnself.read_scenario_tables_from_db(scenario_name,input_table_names=['*'])
+
[docs]defread_scenario_tables_from_db(self,scenario_name:str,
+ input_table_names:Optional[List[str]]=None,
+ output_table_names:Optional[List[str]]=None)->(Inputs,Outputs):
+ """Read selected set input and output tables from scenario.
+ If input_table_names/output_table_names contains a '*', then all input/output tables will be read.
+ If empty list or None, then no tables will be read.
+ """
+ ifself.enable_transactions:
+ withself.engine.begin()asconnection:
+ inputs,outputs=self._read_scenario_tables_from_db(connection,scenario_name,input_table_names,output_table_names)
+ else:
+ inputs,outputs=self._read_scenario_tables_from_db(self.engine,scenario_name,input_table_names,output_table_names)returninputs,outputs
- def_read_scenario_db_table_from_db(self,scenario_name:str,db_table:ScenarioDbTable)->pd.DataFrame:
+ def_read_scenario_tables_from_db(self,connection,scenario_name:str,
+ input_table_names:List[str]=None,
+ output_table_names:List[str]=None)->(Inputs,Outputs):
+ """Loads data for selected input and output tables.
+ If either list is names is ['*'], will load all tables as defined in db_tables configuration.
+ """
+ ifinput_table_namesisNone:# load no tables by default
+ input_table_names=[]
+ elif'*'ininput_table_names:
+ input_table_names=list(self.input_db_tables.keys())
+ if'Scenario'ininput_table_names:input_table_names.remove('Scenario')# Remove the scenario table
+
+ ifoutput_table_namesisNone:# load no tables by default
+ output_table_names=[]
+ elif'*'inoutput_table_names:
+ output_table_names=self.output_db_tables.keys()
+
+ inputs={}
+ forscenario_table_name,db_tableinself.input_db_tables.items():
+ ifscenario_table_nameininput_table_names:
+ inputs[scenario_table_name]=self._read_scenario_table_from_db(scenario_name,db_table,connection=connection)
+ outputs={}
+ forscenario_table_name,db_tableinself.output_db_tables.items():
+ ifscenario_table_nameinoutput_table_names:
+ outputs[scenario_table_name]=self._read_scenario_db_table_from_db(scenario_name,db_table,connection=connection)
+ returninputs,outputs
+
+ # def _read_scenario_db_table_from_db(self, scenario_name: str, db_table: ScenarioDbTable) -> pd.DataFrame:
+ # """Read one table from the DB.
+ # Removes the `scenario_name` column."""
+ # db_table_name = db_table.db_table_name
+ # sql = f"SELECT * FROM {db_table_name} WHERE scenario_name = '{scenario_name}'"
+ # df = pd.read_sql(sql, con=self.engine)
+ # if db_table_name != 'scenario':
+ # df = df.drop(columns=['scenario_name'])
+ #
+ # return df
+ def_read_scenario_db_table_from_db(self,scenario_name:str,db_table:ScenarioDbTable,connection)->pd.DataFrame:"""Read one table from the DB.
- Removes the `scenario_name` column."""
+ Removes the `scenario_name` column.
+
+ Modification: based on SQLAlchemy syntax. If doing the plain text SQL, then some column names not properly extracted
+ """db_table_name=db_table.db_table_name
- sql=f"SELECT * FROM {db_table_name} WHERE scenario_name = '{scenario_name}'"
- df=pd.read_sql(sql,con=self.engine)
+ # sql = f"SELECT * FROM {db_table_name} WHERE scenario_name = '{scenario_name}'" # Old
+ # db_table.table_metadata is a Table()
+ t:sqlalchemy.Table=db_table.get_sa_table()#table_metadata
+ sql=t.select().where(t.c.scenario_name==scenario_name)# This is NOT a simple string!
+ df=pd.read_sql(sql,con=connection)ifdb_table_name!='scenario':df=df.drop(columns=['scenario_name'])returndf
+ ############################################################################################
+ # Update scenario
+ ############################################################################################
+
[docs]defupdate_cell_changes_in_db(self,db_cell_updates:List[DbCellUpdate]):
+ """Update a set of cells in the DB.
+
+ :param db_cell_updates:
+ :return:
+ """
+ ifself.enable_transactions:
+ print("Update cells with transaction")
+ withself.engine.begin()asconnection:
+ self._update_cell_changes_in_db(db_cell_updates,connection=connection)
+ else:
+ self._update_cell_changes_in_db(db_cell_updates)
+
+ def_update_cell_changes_in_db(self,db_cell_updates:List[DbCellUpdate],connection=None):
+ """Update an ordered list of single value changes (cell) in the DB."""
+ fordb_cell_changeindb_cell_updates:
+ self._update_cell_change_in_db(db_cell_change,connection)
+
+ def_update_cell_change_in_db(self,db_cell_update:DbCellUpdate,connection):
+ """Update a single value (cell) change in the DB."""
+ # db_table_name = self.db_tables[db_cell_update.table_name].db_table_name
+ # column_change = f"{db_cell_update.column_name} = '{db_cell_update.current_value}'"
+ # scenario_condition = f"scenario_name = '{db_cell_update.scenario_name}'"
+ # pk_conditions = ' AND '.join([f"{pk['column']} = '{pk['value']}'" for pk in db_cell_update.row_index])
+ # old_sql = f"UPDATE {db_table_name} SET {column_change} WHERE {pk_conditions} AND {scenario_condition};"
+
+ db_table:ScenarioDbTable=self.db_tables[db_cell_update.table_name]
+ t:sqlalchemy.Table=db_table.get_sa_table()
+ pk_conditions=[(db_table.get_sa_column(pk['column'])==pk['value'])forpkindb_cell_update.row_index]
+ target_col:sqlalchemy.Column=db_table.get_sa_column(db_cell_update.column_name)
+ sql=t.update().where(sqlalchemy.and_((t.c.scenario_name==db_cell_update.scenario_name),*pk_conditions)).values({target_col:db_cell_update.current_value})
+ # print(f"_update_cell_change_in_db = {sql}")
+
+ connection.execute(sql)
+
+ ############################################################################################
+ # Update/Replace tables in scenario
+ ############################################################################################
+
[docs]defupdate_scenario_output_tables_in_db(self,scenario_name,outputs:Outputs):
+ """Main API to update output from a DO solve in the scenario.
+ Deletes ALL output tables. Then inserts the given set of tables.
+ Since this only touches the output tables, more efficient than replacing the whole scenario."""
+ ifself.enable_transactions:
+ withself.engine.begin()asconnection:
+ self._update_scenario_output_tables_in_db(scenario_name,outputs,connection)
+ else:
+ self._update_scenario_output_tables_in_db(scenario_name,outputs,self.engine)
+
+ def_update_scenario_output_tables_in_db(self,scenario_name,outputs:Outputs,connection):
+ """Deletes ALL output tables. Then inserts the given set of tables.
+ Note that if a defined output table is not included in the outputs, it will still be deleted from the scenario data."""
+ # 1. Add scenario name to dfs:
+ outputs=ScenarioDbManager.add_scenario_name_to_dfs(scenario_name,outputs)
+ # 2. Delete all output tables
+ forscenario_table_name,db_tableinreversed(self.output_db_tables.items()):# Note this INCLUDES the SCENARIO table!
+ if(scenario_table_name!='Scenario'):
+ db_table._delete_scenario_table_from_db(scenario_name,connection)
+ # 3. Insert new data
+ forscenario_table_name,db_tableinself.output_db_tables.items():# Note this INCLUDES the SCENARIO table!
+ if(scenario_table_name!='Scenario')anddb_table.db_table_nameinoutputs.keys():# If in given set of tables to replace
+ df=outputs[scenario_table_name]
+ db_table.insert_table_in_db_bulk(df=df,mgr=self,connection=connection)# The scenario_name is a column in the df
+
+
+
+ def_replace_scenario_tables_in_db(self,connection,scenario_name,inputs={},outputs={}):
+ """Untested
+ Replace only the tables listed in the inputs and outputs. But leave all other tables untouched.
+ Will first delete all given tables (in reverse cascading order), then insert the new ones (in cascading order)"""
+
+ # Add scenario name to dfs:
+ inputs=ScenarioDbManager.add_scenario_name_to_dfs(scenario_name,inputs)
+ outputs=ScenarioDbManager.add_scenario_name_to_dfs(scenario_name,outputs)
+ dfs={**inputs,**outputs}
+ # 1. Delete tables
+ forscenario_table_name,db_tableinreversed(self.db_tables.items()):# Note this INCLUDES the SCENARIO table!
+ if(scenario_table_name!='Scenario')anddb_table.db_table_nameindfs.keys():# If in given set of tables to replace
+ db_table._delete_scenario_table_from_db()
+ # 2. Insert new data
+ forscenario_table_name,db_tableinself.db_tables.items():# Note this INCLUDES the SCENARIO table!
+ if(scenario_table_name!='Scenario')anddb_table.db_table_nameindfs.keys():# If in given set of tables to replace
+ df=dfs[scenario_table_name]
+ db_table.insert_table_in_db_bulk(df=df,mgr=self,connection=connection)# The scenario_name is a column in the df
+
+ ############################################################################################
+ # CRUD operations on scenarios in DB:
+ # - Delete scenario
+ # - Duplicate scenario
+ # - Rename scenario
+ ############################################################################################
+
[docs]defdelete_scenario_from_db(self,scenario_name:str):
+ """Delete a scenario. Uses a transaction (when enabled)."""
+ ifself.enable_transactions:
+ print("Delete scenario within a transaction")
+ withself.engine.begin()asconnection:
+ self._delete_scenario_from_db(scenario_name=scenario_name,connection=connection)
+ else:
+ self._delete_scenario_from_db(scenario_name=scenario_name,connection=self.engine)
[docs]defduplicate_scenario_in_db(self,source_scenario_name:str,target_scenario_name:str):
+ """Duplicate a scenario. Uses a transaction (when enabled)."""
+ ifself.enable_transactions:
+ print("Duplicate scenario within a transaction")
+ withself.engine.begin()asconnection:
+ self._duplicate_scenario_in_db(connection,source_scenario_name,target_scenario_name)
+ else:
+ self._duplicate_scenario_in_db(self.engine,source_scenario_name,target_scenario_name)
+
+ def_duplicate_scenario_in_db(self,connection,source_scenario_name:str,target_scenario_name:str=None):
+ """Is fully done in DB using SQL in one SQL execute statement
+ :param source_scenario_name:
+ :param target_scenario_name:
+ :param connection:
+ :return:
+ """
+ iftarget_scenario_nameisNone:
+ new_scenario_name=self._find_free_duplicate_scenario_name(source_scenario_name)
+ elifself._check_free_scenario_name(target_scenario_name):
+ new_scenario_name=target_scenario_name
+ else:
+ raiseValueError(f"Target name for duplicate scenario '{target_scenario_name}' already exists.")
+
+ # inputs, outputs = self.read_scenario_from_db(source_scenario_name)
+ # self._replace_scenario_in_db_transaction(scenario_name=new_scenario_name, inputs=inputs, outputs=outputs,
+ # bulk=True, connection=connection)
+ self._duplicate_scenario_in_db_sql(connection,source_scenario_name,new_scenario_name)
+
+ def_duplicate_scenario_in_db_sql(self,connection,source_scenario_name:str,target_scenario_name:str=None):
+ """
+ :param source_scenario_name:
+ :param target_scenario_name:
+ :param connection:
+ :return:
+
+ See https://stackoverflow.com/questions/9879830/select-modify-and-insert-into-the-same-table
+
+ Problem: the table Parameter/parameters has a column 'value' (lower-case).
+ Almost all of the column names in the DFs are lower-case, as are the column names in the ScenarioDbTable.
+ Typically, the DB schema converts that the upper-case column names in the DB.
+ But probably because 'VALUE' is a reserved word, it does NOT do this for 'value'. But that means in order to refer to this column in SQL,
+ one needs to put "value" between double quotes.
+ Problem is that you CANNOT do that for other columns, since these are in upper-case in the DB.
+ Note that the kpis table uses upper case 'VALUE' and that seems to work fine
+
+ Resolution: use SQLAlchemy to construct the SQL. Do NOT create SQL expressions by text manipulation.
+ SQLAlchemy has the smarts to properly deal with these complex names.
+ """
+ iftarget_scenario_nameisNone:
+ new_scenario_name=self._find_free_duplicate_scenario_name(source_scenario_name)
+ elifself._check_free_scenario_name(target_scenario_name):
+ new_scenario_name=target_scenario_name
+ else:
+ raiseValueError(f"Target name for duplicate scenario '{target_scenario_name}' already exists.")
+
+ batch_sql=False# BEWARE: batch = True does NOT work!
+ sql_statements=[]
+
+ # 1. Insert scenario in scenario table
+ # sql_insert = f"INSERT INTO SCENARIO (scenario_name) VALUES ('{new_scenario_name}')" # Old SQL
+ # sa_scenario_table = list(self.input_db_tables.values())[0].get_sa_table() # Scenario table must be the first
+ sa_scenario_table=self.get_scenario_db_table().get_sa_table()
+ sql_insert=sa_scenario_table.insert().values(scenario_name=new_scenario_name)
+ # print(f"_duplicate_scenario_in_db_sql - Insert SQL = {sql_insert}")
+ ifbatch_sql:
+ sql_statements.append(sql_insert)
+ else:
+ connection.execute(sql_insert)
+
+ # 2. Do 'insert into select' to duplicate rows in each table
+ forscenario_table_name,db_tableinself.db_tables.items():
+ ifscenario_table_name=='Scenario':
+ continue
+
+ t:sqlalchemy.table=db_table.table_metadata# The table at hand
+ s:sqlalchemy.table=sa_scenario_table# The scenario table
+ # print("+++++++++++SQLAlchemy insert-select")
+ select_columns=[s.c.scenario_nameifc.name=='scenario_name'elsecforcint.columns]# Replace the t.c.scenario_name with s.c.scenario_name, so we get the new value
+ # print(f"select columns = {select_columns}")
+ select_sql=(sqlalchemy.select(select_columns)
+ .where(sqlalchemy.and_(t.c.scenario_name==source_scenario_name,s.c.scenario_name==target_scenario_name)))
+ target_columns=[cforcint.columns]
+ sql_insert=t.insert().from_select(target_columns,select_sql)
+ # print(f"sql_insert = {sql_insert}")
+
+ # sql_insert = f"INSERT INTO {db_table.db_table_name} ({target_columns_txt}) SELECT '{target_scenario_name}',{other_source_columns_txt} FROM {db_table.db_table_name} WHERE scenario_name = '{source_scenario_name}'"
+ ifbatch_sql:
+ sql_statements.append(sql_insert)
+ else:
+ connection.execute(sql_insert)
+ ifbatch_sql:
+ batch_sql=";\n".join(sql_statements)
+ print(batch_sql)
+ connection.execute(batch_sql)
+
+ def_find_free_duplicate_scenario_name(self,scenario_name:str,scenarios_df=None)->Optional[str]:
+ """Finds next free scenario name based on pattern '{scenario_name}_copy_n'.
+ Will try at maximum 20 attempts.
+ """
+ max_num_attempts=20
+ foriinrange(1,max_num_attempts+1):
+ new_name=f"{scenario_name}({i})"
+ free=self._check_free_scenario_name(new_name,scenarios_df)
+ iffree:
+ returnnew_name
+ raiseValueError(f"Cannot find free name for duplicate scenario. Tried {max_num_attempts}. Last attempt = {new_name}. Rename scenarios.")
+ returnNone
+
+ def_check_free_scenario_name(self,scenario_name,scenarios_df=None)->bool:
+ ifscenarios_dfisNone:
+ scenarios_df=self.get_scenarios_df()
+ free=(Falseifscenario_nameinscenarios_df.indexelseTrue)
+ returnfree
+
+ ##############################################
+
[docs]defrename_scenario_in_db(self,source_scenario_name:str,target_scenario_name:str):
+ """Rename a scenario. Uses a transaction (when enabled)."""
+ ifself.enable_transactions:
+ print("Rename scenario within a transaction")
+ withself.engine.begin()asconnection:
+ # self._rename_scenario_in_db(source_scenario_name, target_scenario_name, connection=connection)
+ self._rename_scenario_in_db_sql(connection,source_scenario_name,target_scenario_name)
+ else:
+ # self._rename_scenario_in_db(source_scenario_name, target_scenario_name)
+ self._rename_scenario_in_db_sql(self.engine,source_scenario_name,target_scenario_name)
+
+ def_rename_scenario_in_db_sql(self,connection,source_scenario_name:str,target_scenario_name:str=None):
+ """Rename scenario.
+ Uses 2 steps:
+ 1. Duplicate scenario
+ 2. Delete source scenario.
+
+ Problem is that we use scenario_name as a primary key. You should not change the value of primary keys in a DB.
+ Instead, first copy the data using a new scenario_name, i.e. duplicate a scenario. Next, delete the original scenario.
+
+ Long-term solution: use a scenario_seq sequence key as the PK. With scenario_name as a ordinary column in the scenario table.
+
+ Use of 'insert into select': https://stackoverflow.com/questions/9879830/select-modify-and-insert-into-the-same-table
+ """
+ # 1. Duplicate scenario
+ self._duplicate_scenario_in_db_sql(connection,source_scenario_name,target_scenario_name)
+ # 2. Delete scenario
+ self._delete_scenario_from_db(source_scenario_name,connection=connection)
+
+ def_delete_scenario_from_db(self,scenario_name:str,connection):
+ """Deletes all rows associated with a given scenario.
+ Note that it only deletes rows from tables defined in the self.db_tables, i.e. will NOT delete rows in 'auto-inserted' tables!
+ Must do a 'cascading' delete to ensure not violating FK constraints. In reverse order of how they are inserted.
+ Also deletes entry in scenario table
+ Uses SQLAlchemy syntax to generate SQL
+ TODO: check with 'auto-inserted' tables
+ TODO: batch all sql statements in single execute. Faster? And will that do the defer integrity checks?
+ """
+ # batch_sql=False # Batch=True does NOT work!
+ insp=sqlalchemy.inspect(connection)
+ tables_in_db=insp.get_table_names(schema=self.schema)
+ # sql_statements = []
+ forscenario_table_name,db_tableinreversed(self.db_tables.items()):# Note this INCLUDES the SCENARIO table!
+ ifdb_table.db_table_nameintables_in_db:
+ # # sql = f"DELETE FROM {db_table.db_table_name} WHERE scenario_name = '{scenario_name}'" # Old
+ # t = db_table.table_metadata # A Table()
+ # sql = t.delete().where(t.c.scenario_name == scenario_name)
+ # connection.execute(sql)
+ db_table._delete_scenario_table_from_db(scenario_name,connection)############################################################################################# Old Read scenario APIs
@@ -912,29 +1388,13 @@
[docs]defread_scenario_tables_from_db(self,scenario_name:str,
- input_table_names:List[str]=None,
- output_table_names:List[str]=None)->(Inputs,Outputs):
- """Loads data for selected input and output tables.
- If either list is names is None, will load all tables as defined in db_tables configuration.
- """
- ifinput_table_namesisNone:# load all tables by default
- input_table_names=list(self.input_db_tables.keys())
- if'Scenario'ininput_table_names:input_table_names.remove('Scenario')# Remove the scenario table
- ifoutput_table_namesisNone:# load all tables by default
- output_table_names=self.output_db_tables.keys()
- inputs={}
- forscenario_table_nameininput_table_names:
- inputs[scenario_table_name]=self.read_scenario_table_from_db(scenario_name,scenario_table_name)
- outputs={}
- forscenario_table_nameinoutput_table_names:
- outputs[scenario_table_name]=self.read_scenario_table_from_db(scenario_name,scenario_table_name)
- returninputs,outputs
[docs]defread_scenarios_from_db(self,scenario_names:List[str]=[])->(Inputs,Outputs):"""Multi scenario load.
- Reads all tables from set of scenarios"""
+ Reads all tables from set of scenarios
+ TODO: avoid use of text SQL. Use SQLAlchemy sql generation.
+ """where_scenarios=','.join([f"'{n}'"forninscenario_names])inputs={}
@@ -1063,7 +1523,7 @@
Return all scenarios in df. Result is indexed by scenario_name.
Main API to get all scenarios.
The API called by a cached procedure in the dse_do_dashboard.DoDashApp.
Loads data for selected input and output tables.
-If either list is names is None, will load all tables as defined in db_tables configuration.
+
Read selected set input and output tables from scenario.
+If input_table_names/output_table_names contains a '*', then all input/output tables will be read.
+If empty list or None, then no tables will be read.
Main API to update output from a DO solve in the scenario.
+Deletes ALL output tables. Then inserts the given set of tables.
+Since this only touches the output tables, more efficient than replacing the whole scenario.