Skip to content

Commit c45f1af

Browse files
committed
Add baseline infrastructure for emulating materialized views
1 parent 8876fb1 commit c45f1af

File tree

14 files changed

+709
-2
lines changed

14 files changed

+709
-2
lines changed

CHANGES.md

+1
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
deploy/start/resume and data import procedures using fluent API and CLI.
1111
- Cloud API: Added JWT authentication to client API and `ctk shell`.
1212
- Cloud API: Added `health` and `ping` subcommands to `ctk cluster`
13+
- Add baseline infrastructure for emulating materialized views.
1314

1415
**Breaking changes**
1516

cratedb_toolkit/materialized/__init__.py

Whitespace-only changes.

cratedb_toolkit/materialized/core.py

+66
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
# Copyright (c) 2023-2025, Crate.io Inc.
2+
# Distributed under the terms of the AGPLv3 license, see LICENSE.
3+
import logging
4+
5+
import sqlalchemy as sa
6+
7+
from cratedb_toolkit.materialized.model import MaterializedViewSettings
8+
from cratedb_toolkit.materialized.store import MaterializedViewStore
9+
from cratedb_toolkit.model import TableAddress
10+
11+
logger = logging.getLogger(__name__)
12+
13+
14+
class MaterializedViewManager:
    """
    The main application, implementing basic synthetic materialized views.

    A view is emulated by materializing its defining SQL statement into a
    staging table, and swapping that staging table with the live table.
    """

    def __init__(self, settings: "MaterializedViewSettings"):
        # Runtime context settings.
        self.settings = settings

        # Materialized view store API.
        self.store = MaterializedViewStore(settings=self.settings)

    @staticmethod
    def _split_name(name: str) -> tuple:
        """
        Split a name in `schema.table` notation into its two parts.

        :raises ValueError: When the name does not consist of exactly two
            dot-separated parts.
        """
        parts = name.split(".")
        if len(parts) != 2:
            raise ValueError(f"Materialized view name must use the format 'schema.table', got: {name!r}")
        table_schema, table_name = parts
        return table_schema, table_name

    def refresh(self, name: str):
        """
        Resolve a materialized view and refresh it.

        :param name: Name of the materialized view, in `schema.table` notation.
        :raises ValueError: When `name` is not in `schema.table` notation.
        :raises KeyError: Propagated from the store, when no definition exists
            for the given table.
        """
        logger.info(f"Refreshing materialized view: {name}")

        # Look up the view definition within the management table.
        table_schema, table_name = self._split_name(name)
        table_address = TableAddress(schema=table_schema, table=table_name)
        mview = self.store.get_by_table(table_address)
        logger.info(f"Loaded materialized view definition: {mview}")

        # Start from a clean slate: Remove any leftover staging table.
        sql_ddl = f"DROP TABLE IF EXISTS {mview.staging_table_fullname}"
        logger.info(f"Dropping materialized view (staging): {sql_ddl}")
        self.store.execute(sa.text(sql_ddl))

        # Materialize the view's SQL statement into the staging table.
        sql_ddl = f"CREATE TABLE IF NOT EXISTS {mview.staging_table_fullname} AS (\n{mview.sql}\n)"
        logger.info(f"Creating materialized view (staging): {sql_ddl}")
        self.store.execute(sa.text(sql_ddl))
        sql_refresh = f"REFRESH TABLE {mview.staging_table_fullname}"
        self.store.execute(sa.text(sql_refresh))

        # Make sure a live table exists. Presumably, this placeholder is needed
        # so the swap operation below has a target on the very first refresh.
        sql_ddl = f"CREATE TABLE IF NOT EXISTS {mview.table_fullname} (dummy INT)"
        logger.info(f"Creating materialized view (live): {sql_ddl}")
        self.store.execute(sa.text(sql_ddl))

        # TODO: Use `ALTER TABLE ... RENAME TO ...` after resolving issue.
        #       SQLParseException[Target table name must not include a schema]
        #       https://github.com/crate/crate/issues/14833
        #       CrateDB does not support renaming to a different schema, thus the target
        #       table identifier must not include a schema. This is an artificial limitation.
        #       Technically, it can be done.
        #       https://github.com/crate/crate/blob/5.3.3/server/src/main/java/io/crate/analyze/AlterTableAnalyzer.java#L97-L102
        sql_ddl = f"ALTER CLUSTER SWAP TABLE {mview.staging_table_fullname} TO {mview.table_fullname}"
        logger.info(f"Activating materialized view: {sql_ddl}")
        self.store.execute(sa.text(sql_ddl))
        sql_refresh = f"REFRESH TABLE {mview.table_fullname}"
        self.store.execute(sa.text(sql_refresh))

        # Clean up: Drop whatever is left under the staging table name after the swap.
        sql_ddl = f"DROP TABLE IF EXISTS {mview.staging_table_fullname}"
        logger.info(f"Dropping materialized view (staging): {sql_ddl}")
        self.store.execute(sa.text(sql_ddl))

cratedb_toolkit/materialized/model.py

+92
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
# Copyright (c) 2023, Crate.io Inc.
2+
# Distributed under the terms of the AGPLv3 license, see LICENSE.
3+
import dataclasses
4+
import os
5+
import typing as t
6+
7+
from cratedb_toolkit.model import DatabaseAddress, TableAddress
8+
9+
10+
@dataclasses.dataclass
class MaterializedView:
    """
    Database representation of a single "materialized view" entity.

    The fields of this class have to stay in sync with the columns of the
    management table, as defined by the SQL DDL statement within `schema.sql`.
    """

    table_schema: t.Optional[str] = dataclasses.field(
        default=None, metadata={"help": "The target table schema"}
    )
    table_name: t.Optional[str] = dataclasses.field(
        default=None, metadata={"help": "The target table name"}
    )
    sql: t.Optional[str] = dataclasses.field(
        default=None, metadata={"help": "The SQL statement defining the emulated materialized view"}
    )

    id: t.Optional[str] = dataclasses.field(  # noqa: A003
        default=None, metadata={"help": "The materialized view identifier"}
    )

    @property
    def table_fullname(self) -> str:
        """
        The double-quoted, schema-qualified name of the live table.
        """
        return '"{}"."{}"'.format(self.table_schema, self.table_name)

    @property
    def staging_table_fullname(self) -> str:
        """
        The double-quoted name of the staging table, located in the `<schema>-staging` schema.
        """
        return '"{}-staging"."{}"'.format(self.table_schema, self.table_name)

    @classmethod
    def from_record(cls, record) -> "MaterializedView":
        """
        Build an instance from a mapping whose keys match the field names.
        """
        return cls(**record)

    def to_storage_dict(self, identifier: t.Optional[str] = None) -> t.Dict[str, str]:
        """
        Return representation suitable for storing into database table using SQLAlchemy.

        When an `identifier` is given, it takes precedence over the instance's own `id`.
        """
        record = dataclasses.asdict(self)
        if identifier is None:
            return record
        record["id"] = identifier
        return record
62+
63+
64+
def default_table_address():
    """
    Return the default address of the materialized view management table.

    The schema is taken from the `CRATEDB_EXT_SCHEMA` environment variable,
    falling back to `ext`. The table name is always `materialized_view`.
    """
    return TableAddress(
        schema=os.getenv("CRATEDB_EXT_SCHEMA", "ext"),
        table="materialized_view",
    )
70+
71+
72+
@dataclasses.dataclass
class MaterializedViewSettings:
    """
    Container for all configuration and runtime settings.
    """

    # Database connection URI.
    database: DatabaseAddress = dataclasses.field(
        default_factory=lambda: DatabaseAddress.from_string("crate://localhost/")
    )

    # The address of the materialized view table.
    materialized_table: TableAddress = dataclasses.field(default_factory=default_table_address)

    # Only pretend to invoke statements.
    dry_run: t.Optional[bool] = False

    def to_dict(self):
        """
        Return the settings as a dictionary, suitable as SQL template variables.

        The `materialized_table` entry is put back as the original object after
        the `dataclasses.asdict` conversion, so that templates can access its
        attributes, like `{materialized_table.fullname}`.
        """
        return {
            **dataclasses.asdict(self),
            "materialized_table": self.materialized_table,
        }
+36
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
# Copyright (c) 2021-2025, Crate.io Inc.
2+
# Distributed under the terms of the AGPLv3 license, see LICENSE.
3+
import logging
4+
from importlib.resources import read_text
5+
6+
from cratedb_toolkit.materialized.model import MaterializedViewSettings
7+
from cratedb_toolkit.util.database import run_sql
8+
9+
logger = logging.getLogger(__name__)
10+
11+
12+
def setup_schema(settings: MaterializedViewSettings):
    """
    Install the materialized view management table.

    The SQL DDL statement is read from the `schema.sql` package resource and
    rendered using the settings as template variables. In dry-run mode, the
    rendered statement is only logged, and nothing is executed.

    TODO: Refactor to `store` module.
    """

    logger.info(
        f"Installing materialized view management table at "
        f"database '{settings.database.safe}', table {settings.materialized_table}"
    )

    # Load the SQL DDL template, and fill in the template variables.
    template = read_text("cratedb_toolkit.materialized", "schema.sql")
    sql = template.format_map(settings.to_dict())

    if not settings.dry_run:
        # Create the table, and refresh it afterwards.
        run_sql(settings.database.dburi, sql)
        run_sql(settings.database.dburi, f"REFRESH TABLE {settings.materialized_table.fullname}")
        return

    logger.info(f"Pretending to execute SQL statement:\n{sql}")
+14
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
-- Set up the materialized view management database table schema.
2+
CREATE TABLE IF NOT EXISTS {materialized_table.fullname} (
3+
4+
"id" TEXT NOT NULL PRIMARY KEY,
5+
6+
-- Target: The database table to be populated.
7+
"table_schema" TEXT, -- The target table schema.
8+
"table_name" TEXT, -- The target table name.
9+
10+
-- The SQL statement defining the emulated materialized view.
11+
"sql" TEXT
12+
13+
)
14+
CLUSTERED INTO 1 SHARDS;

cratedb_toolkit/materialized/store.py

+158
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
# Copyright (c) 2021-2025, Crate.io Inc.
2+
# Distributed under the terms of the AGPLv3 license, see LICENSE.
3+
import logging
4+
import typing as t
5+
import uuid
6+
7+
import sqlalchemy as sa
8+
from sqlalchemy import CursorResult, MetaData, Table
9+
from sqlalchemy.orm import Session
10+
11+
from cratedb_toolkit.materialized.model import MaterializedView, MaterializedViewSettings
12+
from cratedb_toolkit.model import TableAddress
13+
from cratedb_toolkit.util.database import DatabaseAdapter
14+
15+
logger = logging.getLogger(__name__)
16+
17+
18+
class MaterializedViewStore:
    """
    Access layer for the materialized view management table.
    """

    def __init__(self, settings: MaterializedViewSettings):
        self.settings = settings

        management_table = self.settings.materialized_table
        if management_table.table is None:
            raise ValueError("Unable to create MaterializedViewStore without table name")

        logger.info(
            f"Connecting to database {self.settings.database.safe}, table {self.settings.materialized_table.fullname}"
        )

        # Generic database adapter.
        self.database: DatabaseAdapter = DatabaseAdapter(dburi=self.settings.database.dburi)

        # SQLAlchemy Core representation of the management table,
        # loaded from the database using `autoload_with`.
        self.table = Table(
            management_table.table,
            MetaData(schema=management_table.schema),
            autoload_with=self.database.engine,
        )

    def create(self, mview: MaterializedView, ignore: t.Optional[str] = None):
        """
        Store a new materialized view definition, and return its identifier.

        A `ValueError` is raised when the table schema or table name is missing,
        or when a definition for the same table already exists, unless `ignore`
        starts with `DuplicateKey`.

        TODO: Generalize, see `RetentionPolicyStore`.
        """

        # TODO: Sanity check, whether target table already exists?

        ignore_label = ignore if ignore else ""

        # Sanity checks.
        if mview.table_schema is None:
            raise ValueError("Table schema needs to be defined")
        if mview.table_name is None:
            raise ValueError("Table name needs to be defined")
        # NOTE(review): With `ignore="DuplicateKey"`, an existing definition does not
        # prevent the insert below, so another record is added. Confirm this is intended.
        if self.exists(mview) and not ignore_label.startswith("DuplicateKey"):
            raise ValueError(f"Materialized view '{mview.table_schema}.{mview.table_name}' already exists")

        # TODO: Add UUID as converter to CrateDB driver?
        record = mview.to_storage_dict(identifier=str(uuid.uuid4()))
        statement = sa.insert(self.table).values(**record).returning(self.table.c.id)
        new_identifier = self.execute(statement).one()[0]
        self.synchronize()
        return new_identifier

    def retrieve(self):
        """
        Return all records of the management table, as a list of dictionaries.

        TODO: Add filtering capabilities.
        TODO: Generalize, see `RetentionPolicyStore`.
        """
        return self.query(sa.select(self.table))

    def get_by_table(self, table_address: TableAddress) -> MaterializedView:
        """
        Return the materialized view definition for the given table address.

        A `KeyError` is raised when no matching record exists. When multiple
        records match, the first one is used.
        """
        columns = self.table.c
        selectable = sa.select(self.table).where(
            columns.table_schema == table_address.schema,
            columns.table_name == table_address.table,
        )
        logger.info(f"View definition DQL: {selectable}")
        try:
            first_record = self.query(selectable)[0]
        except IndexError as ex:
            raise KeyError(
                f"Synthetic materialized table definition does not exist: {table_address.schema}.{table_address.table}"
            ) from ex
        return MaterializedView.from_record(first_record)

    def delete(self, identifier: str) -> int:
        """
        Delete materialized view definition by identifier, and return the number of deleted records.

        A warning is logged when no record matched the identifier.

        TODO: Generalize, see `RetentionPolicyStore`.
        """
        statement = sa.delete(self.table).where(self.table.c.id == identifier)
        result = self.execute(statement)
        self.synchronize()
        deleted = result.rowcount
        if deleted == 0:
            logger.warning(f"Materialized view not found with id: {identifier}")
        return deleted

    def execute(self, statement) -> CursorResult:
        """
        Run the statement within a dedicated session, commit, and return the result object.

        TODO: Generalize, see `RetentionPolicyStore`.
        """
        with Session(self.database.engine) as session:
            outcome = session.execute(statement)
            session.commit()
        return outcome

    def query(self, statement) -> t.List[t.Dict]:
        """
        Run the statement, and return all result rows as a list of dictionaries.

        TODO: Generalize, see `RetentionPolicyStore`.
        """
        mappings = self.execute(statement).mappings()
        return [dict(row.items()) for row in mappings.fetchall()]

    def exists(self, mview: MaterializedView):
        """
        Check whether a materialized view definition for the same table already exists.

        TODO: Generalize, see `RetentionPolicyStore`.
        """
        columns = self.table.c
        selectable = sa.select(self.table).where(
            columns.table_schema == mview.table_schema,
            columns.table_name == mview.table_name,
        )
        return bool(self.query(selectable))

    def synchronize(self):
        """
        Issue a `REFRESH TABLE` statement on the management table.
        """
        self.database.run_sql(f"REFRESH TABLE {self.settings.materialized_table.fullname};")

0 commit comments

Comments
 (0)