Skip to content

Commit 8d277f4

Browse files
alxtkr77Alex Toker
andauthored
Timescaledb: handle schemas (#584)
* Timescaledb: handle schemas * Fix docstring and make table name mandatory --------- Co-authored-by: Alex Toker <alext@mckinsey.com>
1 parent 72c6ce1 commit 8d277f4

File tree

1 file changed

+9
-4
lines changed

1 file changed

+9
-4
lines changed

storey/timescaledb_target.py

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,8 @@ class TimescaleDBTarget(_Batching, _Writer):
3636
If not provided, timestamps will be parsed according to ISO-8601 format. Common formats include:
3737
"%Y-%m-%d %H:%M:%S", "%d/%m/%y %H:%M:%S UTC%z", etc.
3838
:param table: Name of the TimescaleDB hypertable where events will be written. The table must exist and be
39-
configured as a hypertable before writing data. If not specified, the table name should be provided through
40-
other means (e.g., via batching configuration).
39+
configured as a hypertable before writing data. If the table name contains a '.', it will be interpreted
40+
as <schema>.<table> format.
4141
:param max_events: Maximum number of events to write in a single batch. If None (default), all events will be
4242
written on flow termination, or after flush_after_seconds (if flush_after_seconds is set). Larger batches
4343
improve write performance but increase memory usage.
@@ -76,8 +76,8 @@ def __init__(
7676
dsn: str,
7777
time_col: str,
7878
columns: list[str],
79+
table: str,
7980
time_format: Optional[str] = None,
80-
table: Optional[str] = None,
8181
**kwargs,
8282
) -> None:
8383

@@ -112,6 +112,9 @@ def __init__(
112112
self._dsn = dsn
113113
self._pool = None # Connection pool will be created lazily during first use
114114
self._column_names = self._get_column_names()
115+
self._schema = None
116+
if "." in self._table:
117+
self._schema, self._table = self._table.split(".", 1)
115118

116119
def _init(self):
117120
"""Initialize the target (called synchronously).
@@ -193,7 +196,9 @@ async def _emit(self, batch, batch_key, batch_time, batch_events, last_event_tim
193196
async with self._pool.acquire() as conn:
194197
# Use PostgreSQL's COPY protocol for optimal performance
195198
# This is significantly faster than individual INSERT statements
196-
await conn.copy_records_to_table(self._table, records=records, columns=self._column_names)
199+
await conn.copy_records_to_table(
200+
self._table, schema_name=self._schema, records=records, columns=self._column_names
201+
)
197202

198203
async def _terminate(self):
199204
"""Terminate and cleanup resources.

0 commit comments

Comments
 (0)