Skip to content

Commit 9e2d119

Browse files
authored
DH-4703 Set table-desription status (#183)
* DH-4703 Set table-desription status * DH-4703 Added script to update table_descriptions rows
1 parent c31ddfb commit 9e2d119

File tree

6 files changed

+111
-18
lines changed

6 files changed

+111
-18
lines changed

dataherald/api/fastapi.py

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,15 @@
66
from bson import json_util
77
from fastapi import BackgroundTasks, HTTPException
88
from overrides import override
9+
from sqlalchemy import MetaData, inspect
910

1011
from dataherald.api import API
1112
from dataherald.api.types import Query
1213
from dataherald.config import System
1314
from dataherald.context_store import ContextStore
1415
from dataherald.db import DB
1516
from dataherald.db_scanner import Scanner
16-
from dataherald.db_scanner.models.types import TableSchemaDetail
17+
from dataherald.db_scanner.models.types import TableDescriptionStatus, TableSchemaDetail
1718
from dataherald.db_scanner.repository.base import DBScannerRepository
1819
from dataherald.eval import Evaluator
1920
from dataherald.repositories.base import NLQueryResponseRepository
@@ -221,10 +222,36 @@ def list_table_descriptions(
221222
self, db_connection_id: str | None = None, table_name: str | None = None
222223
) -> list[TableSchemaDetail]:
223224
scanner_repository = DBScannerRepository(self.storage)
224-
return scanner_repository.find_by(
225+
table_descriptions = scanner_repository.find_by(
225226
{"db_connection_id": db_connection_id, "table_name": table_name}
226227
)
227228

229+
if db_connection_id:
230+
db_connection_repository = DatabaseConnectionRepository(self.storage)
231+
db_connection = db_connection_repository.find_by_id(db_connection_id)
232+
database = SQLDatabase.get_sql_engine(db_connection)
233+
inspector = inspect(database.engine)
234+
meta = MetaData(bind=database.engine)
235+
MetaData.reflect(meta, views=True)
236+
all_tables = inspector.get_table_names() + inspector.get_view_names()
237+
238+
for table_description in table_descriptions:
239+
if table_description.table_name not in all_tables:
240+
table_description.status = TableDescriptionStatus.DEPRECATED.value
241+
else:
242+
all_tables.remove(table_description.table_name)
243+
for table in all_tables:
244+
table_descriptions.append(
245+
TableSchemaDetail(
246+
table_name=table,
247+
status=TableDescriptionStatus.NOT_SYNCHRONIZED.value,
248+
db_connection_id=db_connection_id,
249+
columns=[],
250+
)
251+
)
252+
253+
return table_descriptions
254+
228255
@override
229256
def add_golden_records(
230257
self, golden_records: List[GoldenRecordRequest]

dataherald/db_scanner/models/types.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from datetime import datetime
2+
from enum import Enum
23
from typing import Any
34

45
from pydantic import BaseModel
@@ -19,13 +20,22 @@ class ColumnDetail(BaseModel):
1920
foreign_key: ForeignKeyDetail | None
2021

2122

23+
class TableDescriptionStatus(Enum):
24+
NOT_SYNCHRONIZED = "NOT_SYNCHRONIZED"
25+
SYNCHRONIZING = "SYNCHRONIZING"
26+
DEPRECATED = "DEPRECATED"
27+
SYNCHRONIZED = "SYNCHRONIZED"
28+
FAILED = "FAILED"
29+
30+
2231
class TableSchemaDetail(BaseModel):
2332
id: Any
2433
db_connection_id: str
2534
table_name: str
2635
description: str | None
2736
table_schema: str | None
28-
columns: list[ColumnDetail]
37+
columns: list[ColumnDetail] = []
2938
examples: list = []
3039
last_schema_sync: datetime | None
31-
status: str = "synchrinozed"
40+
status: str = TableDescriptionStatus.SYNCHRONIZED.value
41+
error_message: str | None

dataherald/db_scanner/sqlalchemy.py

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,11 @@
88
from sqlalchemy.sql import func
99

1010
from dataherald.db_scanner import Scanner
11-
from dataherald.db_scanner.models.types import ColumnDetail, TableSchemaDetail
11+
from dataherald.db_scanner.models.types import (
12+
ColumnDetail,
13+
TableDescriptionStatus,
14+
TableSchemaDetail,
15+
)
1216
from dataherald.db_scanner.repository.base import DBScannerRepository
1317
from dataherald.sql_database.base import SQLDatabase
1418

@@ -146,7 +150,7 @@ def scan_single_table(
146150
meta=meta, db_engine=db_engine, table=table, rows_number=3
147151
),
148152
last_schema_sync=datetime.now(),
149-
status="syncronized",
153+
status="SYNCHRONIZED",
150154
)
151155

152156
repository.save_table_info(object)
@@ -171,13 +175,32 @@ def scan(
171175
]
172176
if len(tables) == 0:
173177
raise ValueError("No table found")
174-
result = []
178+
179+
# persist tables to be scanned
175180
for table in tables:
176-
obj = self.scan_single_table(
177-
meta=meta,
178-
table=table,
179-
db_engine=db_engine,
180-
db_connection_id=db_connection_id,
181-
repository=repository,
181+
repository.save_table_info(
182+
TableSchemaDetail(
183+
db_connection_id=db_connection_id,
184+
table_name=table,
185+
status=TableDescriptionStatus.SYNCHRONIZING.value,
186+
)
182187
)
183-
result.append(obj)
188+
189+
for table in tables:
190+
try:
191+
self.scan_single_table(
192+
meta=meta,
193+
table=table,
194+
db_engine=db_engine,
195+
db_connection_id=db_connection_id,
196+
repository=repository,
197+
)
198+
except Exception as e:
199+
repository.save_table_info(
200+
TableSchemaDetail(
201+
db_connection_id=db_connection_id,
202+
table_name=table,
203+
status=TableDescriptionStatus.FAILED.value,
204+
error_message=f"{e}",
205+
)
206+
)
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
import dataherald.config
2+
from dataherald.config import System
3+
from dataherald.db import DB
4+
5+
if __name__ == "__main__":
6+
settings = dataherald.config.Settings()
7+
system = System(settings)
8+
system.start()
9+
storage = system.instance(DB)
10+
# Update table_descriptions status
11+
collection_rows = storage.find_all("table_descriptions")
12+
for collection_row in collection_rows:
13+
collection_row["status"] = "SYNCHRONIZED"
14+
# update object
15+
storage.update_or_create(
16+
"table_descriptions", {"_id": collection_row["_id"]}, collection_row
17+
)

dataherald/sql_generator/dataherald_sqlagent.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030

3131
from dataherald.context_store import ContextStore
3232
from dataherald.db import DB
33-
from dataherald.db_scanner.models.types import TableSchemaDetail
33+
from dataherald.db_scanner.models.types import TableDescriptionStatus, TableSchemaDetail
3434
from dataherald.db_scanner.repository.base import DBScannerRepository
3535
from dataherald.sql_database.base import SQLDatabase, SQLInjectionError
3636
from dataherald.sql_database.models.types import (
@@ -581,7 +581,8 @@ def generate_response(
581581
)
582582
repository = DBScannerRepository(storage)
583583
db_scan = repository.get_all_tables_by_db(
584-
db_connection_id=database_connection.id
584+
db_connection_id=database_connection.id,
585+
status=TableDescriptionStatus.SYNCHRONIZED.value,
585586
)
586587
if not db_scan:
587588
raise ValueError("No scanned tables found for database")

docs/api.list_table_description.rst

Lines changed: 17 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
List table descriptions
44
=======================
55

6-
Once you have scanned a db connection you can list the table descriptions by requesting this endpoint.
6+
This endpoint returns the database connection tables and includes a status field that indicates whether the tables have been scanned or not.
77

88
Request this ``GET`` endpoint::
99

@@ -15,7 +15,7 @@ Request this ``GET`` endpoint::
1515
:header: "Name", "Type", "Description"
1616
:widths: 20, 20, 60
1717

18-
"db_connection_id", "string", "Filter by connection id, ``Optional``"
18+
"db_connection_id", "string", "Filter by connection id, ``Optional``. By configuring this field, it establishes a connection with the database to fetch table names and subsequently merges this data with the pre-existing rows in our MongoDB."
1919
"table_name", "string", "Filter by table name, ``Optional``"
2020

2121
**Responses**
@@ -31,6 +31,8 @@ HTTP 200 code response
3131
"table_name": "string",
3232
"description": "string",
3333
"table_schema": "string",
34+
"status": "NOT_SYNCHRONIZED | SYNCHRONIZING | DEPRECATED | SYNCHRONIZED | FAILED"
35+
"error_message": "string",
3436
"columns": [
3537
{
3638
"name": "string",
@@ -51,6 +53,19 @@ HTTP 200 code response
5153
}
5254
]
5355
56+
.. csv-table::
57+
:header: "Name", "Type", "Description"
58+
:widths: 20, 20, 60
59+
60+
"status", "string", "It can be one of the next options:
61+
- `NOT_SYNCHRONIZED` if the table has not been scanned
62+
- `SYNCHRONIZING` while the sync schema process is running
63+
- `DEPRECATED` if there is a row in our `table-descriptions` collection that is no longer in the database, probably because the table/view was deleted or renamed
64+
- `SYNCHRONIZED` when we have scanned the table
65+
- `FAILED` if anything failed during the sync schema process, and the `error_message` field stores the error."
66+
"error_message", "string", "This field is set only if the async schema process fails"
67+
68+
5469
**Request example**
5570

5671
.. code-block:: rst

0 commit comments

Comments
 (0)