Skip to content

Commit bdd0910

Browse files
committed
Add baseline infrastructure for emulating materialized views
1 parent 4a9816a commit bdd0910

File tree

12 files changed

+683
-0
lines changed

12 files changed

+683
-0
lines changed

CHANGES.md

+2
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,8 @@
2020
when the repository does not exist. With previous versions of CrateDB, it was
2121
`RepositoryUnknownException`.
2222

23+
- Add baseline infrastructure for emulating materialized views.
24+
2325
## 2023/06/27 0.0.0
2426

2527
- Import "data retention" implementation from <https://github.com/crate/crate-airflow-tutorial>.

cratedb_toolkit/materialized/__init__.py

Whitespace-only changes.

cratedb_toolkit/materialized/core.py

+57
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
# Copyright (c) 2023, Crate.io Inc.
2+
# Distributed under the terms of the AGPLv3 license, see LICENSE.
3+
import logging
4+
5+
import sqlalchemy as sa
6+
7+
from cratedb_toolkit.materialized.model import MaterializedViewSettings
8+
from cratedb_toolkit.materialized.store import MaterializedViewStore
9+
from cratedb_toolkit.model import TableAddress
10+
11+
logger = logging.getLogger(__name__)
12+
13+
14+
class MaterializedViewManager:
    """
    The main application, implementing basic synthetic materialized views.

    A materialized view is emulated by running its defining SQL statement
    into a staging table, and then renaming the staging table to the name
    of the live table.
    """

    def __init__(self, settings: MaterializedViewSettings):
        # Runtime context settings.
        self.settings = settings

        # Materialized view store API.
        self.store = MaterializedViewStore(settings=self.settings)

    def refresh(self, name: str):
        """
        Resolve materialized view, and refresh it.

        :param name: Address of the materialized view, in `<schema>.<table>` format.
        :raises ValueError: When `name` is not in `<schema>.<table>` format.

        Errors raised while looking up the view definition in the store
        are propagated to the caller.
        """
        logger.info(f"Refreshing materialized view: {name}")

        # Reject malformed names with an explicit message, instead of
        # failing with an opaque tuple unpacking error.
        try:
            table_schema, table_name = name.split(".")
        except ValueError as ex:
            raise ValueError(
                f"Materialized view name must be given in '<schema>.<table>' format: {name}"
            ) from ex

        table_address = TableAddress(schema=table_schema, table=table_name)
        mview = self.store.get_by_table(table_address)
        logger.info(f"Loaded materialized view definition: {mview}")

        # Start from scratch: Remove any leftover staging table.
        sql_ddl = f"DROP TABLE IF EXISTS {mview.staging_table_fullname}"
        logger.info(f"Dropping materialized view (staging): {sql_ddl}")
        self.store.execute(sa.text(sql_ddl))

        # Populate the staging table from the SQL statement defining the view.
        # TODO: IF NOT EXISTS
        sql_ddl = f"CREATE TABLE {mview.staging_table_fullname} AS (\n{mview.sql}\n)"
        logger.info(f"Creating materialized view (staging): {sql_ddl}")
        self.store.execute(sa.text(sql_ddl))
        sql_refresh = f"REFRESH TABLE {mview.staging_table_fullname}"
        self.store.execute(sa.text(sql_refresh))

        # NOTE(review): Dropping the live table is disabled. Presumably, the
        # rename below fails when the live table already exists -- verify.
        # sql_ddl = f"DROP TABLE IF EXISTS {mview.table_fullname}"
        # logger.info(f"Dropping materialized view (live): {sql_ddl}")
        # self.store.execute(sa.text(sql_ddl))

        # Activate the new data by renaming the staging table to the live table.
        # The target identifier is quoted, in order to designate the same table
        # as the quoted `table_fullname` used by the subsequent `REFRESH TABLE`.
        # FIXME: SQLParseException[Target table name must not include a schema]
        sql_ddl = f'ALTER TABLE {mview.staging_table_fullname} RENAME TO "{mview.table_name}"'
        logger.info(f"Activating materialized view: {sql_ddl}")
        self.store.execute(sa.text(sql_ddl))
        sql_refresh = f"REFRESH TABLE {mview.table_fullname}"
        self.store.execute(sa.text(sql_refresh))

cratedb_toolkit/materialized/model.py

+98
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
# Copyright (c) 2023, Crate.io Inc.
2+
# Distributed under the terms of the AGPLv3 license, see LICENSE.
3+
import dataclasses
4+
import os
5+
import typing as t
6+
7+
from cratedb_toolkit.model import DatabaseAddress, TableAddress
8+
9+
10+
@dataclasses.dataclass
class MaterializedView:
    """
    Database representation of an emulated "materialized view" entity.

    The attributes defined here need to be kept in sync with the table
    definition per SQL DDL statement within `schema.sql`.
    """

    table_schema: t.Optional[str] = dataclasses.field(
        metadata={"help": "The target table schema"},
        default=None,
    )
    table_name: t.Optional[str] = dataclasses.field(
        metadata={"help": "The target table name"},
        default=None,
    )
    sql: t.Optional[str] = dataclasses.field(
        metadata={"help": "The SQL statement defining the emulated materialized view"},
        default=None,
    )

    id: t.Optional[str] = dataclasses.field(  # noqa: A003
        metadata={"help": "The materialized view identifier"},
        default=None,
    )

    @property
    def table_fullname(self) -> str:
        """
        The quoted, schema-qualified name of the live table.
        """
        return '"{}"."{}"'.format(self.table_schema, self.table_name)

    @property
    def staging_table_fullname(self) -> str:
        """
        The quoted, schema-qualified name of the staging table.

        The staging table lives in the same schema as the live table,
        and carries a `-staging` suffix on its name.
        """
        # FIXME: SQLParseException[Target table name must not include a schema]
        # TODO: CrateDB does not support renaming to a different schema, thus the target
        #       table identifier must not include a schema. This is an artificial limitation.
        #       Technically, it can be done.
        #       https://github.com/crate/crate/blob/5.3.3/server/src/main/java/io/crate/analyze/AlterTableAnalyzer.java#L97-L102
        #       Alternative, using a dedicated staging schema:
        #       `"{table_schema}-staging"."{table_name}"`
        return '"{}"."{}-staging"'.format(self.table_schema, self.table_name)

    @classmethod
    def from_record(cls, record) -> "MaterializedView":
        """
        Create an instance from a mapping of attribute names to values.
        """
        return cls(**record)

    def to_storage_dict(self, identifier: t.Optional[str] = None) -> t.Dict[str, str]:
        """
        Return representation suitable for storing into database table using SQLAlchemy.

        When `identifier` is given, it replaces the value of the `id` attribute.
        """
        payload = dataclasses.asdict(self)
        if identifier is None:
            return payload
        payload["id"] = identifier
        return payload
68+
69+
70+
def default_table_address():
    """
    Return the default address of the materialized view management table.

    The schema is read from the `CRATEDB_EXT_SCHEMA` environment variable,
    falling back to `ext`. The table name is `materialized_view`.
    """
    return TableAddress(
        schema=os.getenv("CRATEDB_EXT_SCHEMA", "ext"),
        table="materialized_view",
    )
76+
77+
78+
@dataclasses.dataclass
class MaterializedViewSettings:
    """
    Container for all configuration and runtime settings.
    """

    # Address of the database to connect to.
    database: DatabaseAddress = dataclasses.field(
        default_factory=lambda: DatabaseAddress.from_string("crate://localhost/")
    )

    # Address of the materialized view management table.
    materialized_table: TableAddress = dataclasses.field(default_factory=default_table_address)

    # When enabled, statements are not invoked for real.
    dry_run: t.Optional[bool] = False

    def to_dict(self):
        """
        Return the settings as a dictionary.

        The `materialized_table` item holds the original address object instead
        of its serialized form, so consumers are able to access its attributes.
        """
        return {
            **dataclasses.asdict(self),
            "materialized_table": self.materialized_table,
        }
+36
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
# Copyright (c) 2021-2023, Crate.io Inc.
2+
# Distributed under the terms of the AGPLv3 license, see LICENSE.
3+
import logging
4+
from importlib.resources import read_text
5+
6+
from cratedb_toolkit.materialized.model import MaterializedViewSettings
7+
from cratedb_toolkit.util.database import run_sql
8+
9+
logger = logging.getLogger(__name__)
10+
11+
12+
def setup_schema(settings: MaterializedViewSettings):
    """
    Create the materialized view management table within the database.

    In dry-run mode, the SQL statement is only logged, and not executed.

    TODO: Refactor to `store` module.
    """

    logger.info(
        f"Installing materialized view management table at "
        f"database '{settings.database.safe}', table {settings.materialized_table}"
    )

    # Load the SQL DDL template, and fill in the settings.
    template = read_text("cratedb_toolkit.materialized", "schema.sql")
    sql = template.format_map(settings.to_dict())

    if not settings.dry_run:
        # Create the table, and make it visible to subsequent statements.
        run_sql(settings.database.dburi, sql)
        run_sql(settings.database.dburi, f"REFRESH TABLE {settings.materialized_table.fullname}")
        return

    logger.info(f"Pretending to execute SQL statement:\n{sql}")
+14
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
-- Set up the materialized view management database table schema.
2+
CREATE TABLE IF NOT EXISTS {materialized_table.fullname} (
3+
4+
"id" TEXT NOT NULL PRIMARY KEY,
5+
6+
-- Target: The database table to be populated.
7+
"table_schema" TEXT, -- The target table schema.
8+
"table_name" TEXT, -- The target table name.
9+
10+
-- The SQL statement defining the emulated materialized view.
11+
"sql" TEXT
12+
13+
)
14+
CLUSTERED INTO 1 SHARDS;

cratedb_toolkit/materialized/store.py

+157
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
# Copyright (c) 2021-2023, Crate.io Inc.
2+
# Distributed under the terms of the AGPLv3 license, see LICENSE.
3+
import logging
4+
import typing as t
5+
import uuid
6+
7+
import sqlalchemy as sa
8+
from sqlalchemy import MetaData, Result, Table
9+
from sqlalchemy.orm import Session
10+
11+
from cratedb_toolkit.exception import TableNotFound
12+
from cratedb_toolkit.materialized.model import MaterializedView, MaterializedViewSettings
13+
from cratedb_toolkit.model import TableAddress
14+
from cratedb_toolkit.util.database import DatabaseAdapter
15+
16+
logger = logging.getLogger(__name__)
17+
18+
19+
class MaterializedViewStore:
    """
    A wrapper around the materialized view management table.
    """

    def __init__(self, settings: MaterializedViewSettings):
        """
        Connect to the database, and reflect the management table definition.
        """
        self.settings = settings

        logger.info(
            f"Connecting to database {self.settings.database.safe}, "
            f"table {self.settings.materialized_table.fullname}"
        )

        # Set up generic database adapter.
        self.database: DatabaseAdapter = DatabaseAdapter(dburi=self.settings.database.dburi)

        # Set up SQLAlchemy Core adapter for materialized view management table.
        # The table definition is reflected from the database (`autoload_with`).
        metadata = MetaData(schema=self.settings.materialized_table.schema)
        self.table = Table(self.settings.materialized_table.table, metadata, autoload_with=self.database.engine)

    def create(self, mview: MaterializedView, ignore: t.Optional[str] = None):
        """
        Create a new materialized view, and return its identifier.

        :param mview: The materialized view definition to store.
        :param ignore: When starting with `DuplicateKey`, do not raise an
                       error if a definition for the same table already exists.
        :return: The identifier of the newly created record.
        :raises ValueError: When table schema or table name are not defined, or
                            when the definition already exists and is not ignored.

        TODO: Generalize, see `RetentionPolicyStore`.
        """

        # TODO: Sanity check, whether target table already exists?

        ignore = ignore or ""

        # Sanity checks.
        if mview.table_schema is None:
            raise ValueError("Table schema needs to be defined")
        if mview.table_name is None:
            raise ValueError("Table name needs to be defined")
        # NOTE(review): When duplicates are ignored, another record for the same
        # table is inserted below -- confirm whether this is intended.
        if self.exists(mview) and not ignore.startswith("DuplicateKey"):
            raise ValueError(f"Materialized view '{mview.table_schema}.{mview.table_name}' already exists")

        table = self.table
        # TODO: Add UUID as converter to CrateDB driver?
        identifier = str(uuid.uuid4())
        data = mview.to_storage_dict(identifier=identifier)
        insertable = sa.insert(table).values(**data).returning(table.c.id)
        cursor = self.execute(insertable)
        identifier = cursor.one()[0]
        self.synchronize()
        return identifier

    def retrieve(self):
        """
        Retrieve all records from database table.

        :return: List of records, each one converted to a dictionary.

        TODO: Add filtering capabilities.
        TODO: Generalize, see `RetentionPolicyStore`.
        """

        # Run SELECT statement, and return result.
        selectable = sa.select(self.table)
        return self.query(selectable)

    def get_by_table(self, table_address: TableAddress) -> MaterializedView:
        """
        Retrieve materialized view definition by address of its target table.

        :param table_address: Schema and name of the target table.
        :return: The materialized view definition. When multiple records
                 match, the first one returned by the database is used.
        :raises KeyError: When no definition exists for the given table.
        """
        table = self.table
        selectable = sa.select(table).where(
            table.c.table_schema == table_address.schema,
            table.c.table_name == table_address.table,
        )
        logger.info(f"View definition DQL: {selectable}")

        # Check for an empty result explicitly, instead of translating an
        # `IndexError`, which would leak as implicit exception context.
        records = self.query(selectable)
        if not records:
            raise KeyError(
                f"Synthetic materialized table definition does not exist: {table_address.schema}.{table_address.table}"
            )
        return MaterializedView.from_record(records[0])

    def delete(self, identifier: str) -> int:
        """
        Delete materialized view by identifier.

        :param identifier: The identifier of the record to delete.
        :return: Number of deleted records. When it is zero, a warning is logged.

        TODO: Generalize, see `RetentionPolicyStore`.
        """
        table = self.table
        constraint = table.c.id == identifier
        deletable = sa.delete(table).where(constraint)
        result = self.execute(deletable)
        self.synchronize()
        if result.rowcount == 0:
            logger.warning(f"Materialized view not found with id: {identifier}")
        return result.rowcount

    def execute(self, statement) -> Result:
        """
        Execute SQL statement, and return result object.

        The statement is executed and committed within a dedicated session.

        TODO: Generalize, see `RetentionPolicyStore`.
        """
        # NOTE(review): The result object is handed out after the session has
        # been closed. Verify rows remain fetchable with the driver in use.
        with Session(self.database.engine) as session:
            result = session.execute(statement)
            session.commit()
            return result

    def query(self, statement) -> t.List[t.Dict]:
        """
        Execute SQL statement, fetch result rows, and return them converted to dictionaries.

        TODO: Generalize, see `RetentionPolicyStore`.
        """
        cursor = self.execute(statement)
        rows = cursor.mappings().fetchall()
        return [dict(row.items()) for row in rows]

    def exists(self, mview: MaterializedView):
        """
        Check if a materialized view definition for the same target table already exists.

        :param mview: The materialized view, identified by table schema and table name.
        :return: `True` when at least one matching record exists, `False` otherwise.

        TODO: Generalize, see `RetentionPolicyStore`.
        """
        table = self.table
        selectable = sa.select(table).where(
            table.c.table_schema == mview.table_schema,
            table.c.table_name == mview.table_name,
        )
        result = self.query(selectable)
        return bool(result)

    def synchronize(self):
        """
        Synchronize data by issuing `REFRESH TABLE` statement.

        This is invoked after write operations on the management table.
        """
        sql = f"REFRESH TABLE {self.settings.materialized_table.fullname};"
        self.database.run_sql(sql)

0 commit comments

Comments
 (0)