Commit 027fc22

Alex Toker committed:
Timescaledb: add column nullability check on insert
1 parent daa1233 commit 027fc22

File tree

2 files changed: +431 −4 lines changed


integration/test_timescaledb.py

Lines changed: 360 additions & 0 deletions
@@ -306,3 +306,363 @@ async def test_timescaledb_async_emit(timescaledb):
        cursor.close()

    assert count == 2, "Async emission should insert exactly 2 rows"


def test_timescaledb_schema_validation_missing_required_column(timescaledb):
    """Test that validation properly catches missing required (non-nullable) columns"""
    (connection, _, _, _, _, _, table_name, dsn_url, _, _) = timescaledb

    # Create a table with a non-nullable column
    cursor = connection.cursor()
    validation_table = f"{table_name}_validation"
    with contextlib.suppress(psycopg2.Error):
        cursor.execute(f"DROP TABLE IF EXISTS {validation_table};")

    cursor.execute(f"""
        CREATE TABLE {validation_table} (
            time TIMESTAMPTZ NOT NULL,
            required_col INTEGER NOT NULL,
            optional_col VARCHAR(50)
        );
    """)
    cursor.execute(f"SELECT create_hypertable('{validation_table}', 'time');")
    cursor.close()

    time_format = "%d/%m/%y %H:%M:%S UTC%z"
    controller = build_flow([
        SyncEmitSource(),
        TimescaleDBTarget(
            dsn=dsn_url,
            table=validation_table,
            time_col="time",
            columns=["required_col", "optional_col"],
            time_format=time_format,
            max_events=10,
        ),
    ]).run()

    # Test data missing the required column
    invalid_data = {
        "time": "18/09/19 01:55:10 UTC+0000",
        "optional_col": "test_value",
        # Missing required_col (non-nullable)
    }

    # Should raise ValueError for the missing required column
    with pytest.raises(ValueError, match=r"Missing required non-nullable column 'required_col'"):
        controller.emit(invalid_data, None)
        controller.terminate()
        controller.await_termination()

    # Cleanup
    cursor = connection.cursor()
    cursor.execute(f"DROP TABLE IF EXISTS {validation_table} CASCADE;")
    cursor.close()


def test_timescaledb_schema_validation_missing_nullable_column(timescaledb):
    """Test that validation handles missing nullable columns correctly"""
    (connection, _, _, _, _, _, table_name, dsn_url, _, _) = timescaledb

    # Create a table with nullable and non-nullable columns
    cursor = connection.cursor()
    validation_table = f"{table_name}_nullable"
    with contextlib.suppress(psycopg2.Error):
        cursor.execute(f"DROP TABLE IF EXISTS {validation_table};")

    cursor.execute(f"""
        CREATE TABLE {validation_table} (
            time TIMESTAMPTZ NOT NULL,
            required_col INTEGER NOT NULL,
            nullable_col VARCHAR(50)
        );
    """)
    cursor.execute(f"SELECT create_hypertable('{validation_table}', 'time');")
    cursor.close()

    time_format = "%d/%m/%y %H:%M:%S UTC%z"
    controller = build_flow([
        SyncEmitSource(),
        TimescaleDBTarget(
            dsn=dsn_url,
            table=validation_table,
            time_col="time",
            columns=["required_col", "nullable_col"],
            time_format=time_format,
            max_events=10,
        ),
    ]).run()

    # Test data with a missing nullable column - should work fine
    valid_data = {
        "time": "18/09/19 01:55:10 UTC+0000",
        "required_col": 42,
        # Missing nullable_col - should be set to None
    }

    controller.emit(valid_data, None)
    controller.terminate()
    controller.await_termination()

    # Verify data was inserted with NULL for the missing nullable column
    cursor = connection.cursor()
    cursor.execute(f"SELECT required_col, nullable_col FROM {validation_table}")
    result = cursor.fetchone()
    assert result[0] == 42
    assert result[1] is None  # nullable_col should be None
    cursor.close()

    # Cleanup
    cursor = connection.cursor()
    cursor.execute(f"DROP TABLE IF EXISTS {validation_table} CASCADE;")
    cursor.close()


def test_timescaledb_schema_validation_table_not_found(timescaledb):
    """Test that schema validation raises an appropriate error for a non-existent table"""
    (_, _, _, _, _, _, _, dsn_url, _, _) = timescaledb

    non_existent_table = "non_existent_table_12345"

    controller = build_flow([
        SyncEmitSource(),
        TimescaleDBTarget(
            dsn=dsn_url,
            table=non_existent_table,
            time_col="time",
            columns=["col1"],
            max_events=10,
        ),
    ]).run()

    test_data = {
        "time": "2019-09-18 01:55:10+00:00",
        "col1": "test_value",
    }

    # Should raise ValueError for table not found
    with pytest.raises(ValueError, match=r"Table 'public\.non_existent_table_12345' not found or has no columns"):
        controller.emit(test_data, None)
        controller.terminate()
        controller.await_termination()


def test_timescaledb_schema_validation_with_schema_prefix(timescaledb):
    """Test that schema validation works correctly with the schema.table format"""
    (connection, _, _, _, _, _, table_name, dsn_url, _, _) = timescaledb

    # Create a schema and table
    cursor = connection.cursor()
    schema_name = "test_schema"
    with contextlib.suppress(psycopg2.Error):
        cursor.execute(f"CREATE SCHEMA IF NOT EXISTS {schema_name};")

    schema_table = f"{schema_name}.{table_name}_schema"
    with contextlib.suppress(psycopg2.Error):
        cursor.execute(f"DROP TABLE IF EXISTS {schema_table};")

    cursor.execute(f"""
        CREATE TABLE {schema_table} (
            time TIMESTAMPTZ NOT NULL,
            required_col INTEGER NOT NULL,
            optional_col VARCHAR(50)
        );
    """)
    cursor.execute(f"SELECT create_hypertable('{schema_table}', 'time');")
    cursor.close()

    time_format = "%d/%m/%y %H:%M:%S UTC%z"
    controller = build_flow([
        SyncEmitSource(),
        TimescaleDBTarget(
            dsn=dsn_url,
            table=schema_table,  # Using schema.table format
            time_col="time",
            columns=["required_col", "optional_col"],
            time_format=time_format,
            max_events=10,
        ),
    ]).run()

    # Test with a missing required column
    invalid_data = {
        "time": "18/09/19 01:55:10 UTC+0000",
        "optional_col": "test_value",
        # Missing required_col
    }

    # Should raise ValueError for the missing required column, resolving the schema.table format correctly
    with pytest.raises(ValueError, match=r"Missing required non-nullable column 'required_col'"):
        controller.emit(invalid_data, None)
        controller.terminate()
        controller.await_termination()

    # Cleanup
    cursor = connection.cursor()
    cursor.execute(f"DROP TABLE IF EXISTS {schema_table} CASCADE;")
    cursor.execute(f"DROP SCHEMA IF EXISTS {schema_name} CASCADE;")
    cursor.close()


@pytest.mark.asyncio
async def test_timescaledb_schema_caching(timescaledb):
    """Test that schema information is properly cached to avoid repeated queries"""
    (_, _, _, _, _, _, table_name, dsn_url, timestamp_precision, columns_config) = timescaledb

    time_formats = {
        "milliseconds": "%d/%m/%y %H:%M:%S.%f UTC%z",
        "microseconds": "%d/%m/%y %H:%M:%S.%f UTC%z",
    }
    time_format = time_formats.get(timestamp_precision, "%d/%m/%y %H:%M:%S UTC%z")

    target = TimescaleDBTarget(
        dsn=dsn_url,
        table=table_name,
        time_col="time",
        columns=columns_config,
        time_format=time_format,
        max_events=10,
    )

    # Initialize the target
    await target._async_init()

    # The first call to get the schema should query the database
    schema1 = await target._get_table_schema()
    assert isinstance(schema1, dict)
    assert len(schema1) > 0

    # The second call should return the cached result (same object)
    schema2 = await target._get_table_schema()
    assert schema1 is schema2  # Should be the same object (cached)

    # Verify the schema contains the expected information
    assert "time" in schema1
    assert "binary_col" in schema1
    assert schema1["time"]["nullable"] is False  # time column is NOT NULL
    assert schema1["binary_col"]["nullable"] is True  # binary_col allows NULL

    # Cleanup
    await target._terminate()


def test_timescaledb_validation_with_extra_columns(timescaledb):
    """Test that validation works when an event has columns not in the schema"""
    (connection, _, _, _, _, _, table_name, dsn_url, _, _) = timescaledb

    time_format = "%d/%m/%y %H:%M:%S UTC%z"

    # Test with extra columns that aren't in the target column configuration.
    # This tests the fallback behavior for columns not in the schema.
    controller = build_flow([
        SyncEmitSource(),
        TimescaleDBTarget(
            dsn=dsn_url,
            table=table_name,
            time_col="time",
            columns=["binary_col", "int_col"],  # Only specify columns that exist
            time_format=time_format,
            max_events=10,
        ),
    ]).run()

    # Test data with extra columns in the event data (should be ignored)
    test_data = {
        "time": "18/09/19 01:55:10 UTC+0000",
        "binary_col": b"test_binary",
        "int_col": 42,
        "extra_column_not_in_config": "this_column_is_extra",
        "another_extra_column": "also_extra",
    }

    # Should work fine - extra columns in the event data are ignored
    controller.emit(test_data, None)
    controller.terminate()
    controller.await_termination()

    # Verify data was inserted correctly for the configured columns
    cursor = connection.cursor()
    cursor.execute(f"SELECT binary_col, int_col FROM {table_name} ORDER BY int_col DESC LIMIT 1")
    result = cursor.fetchone()
    assert result[1] == 42  # int_col
    # PostgreSQL returns memoryview for binary data, convert to bytes for comparison
    binary_result = bytes(result[0]) if isinstance(result[0], memoryview) else result[0]
    assert binary_result == b"test_binary"  # binary_col
    cursor.close()


def test_timescaledb_validation_non_dict_data_type_error(timescaledb):
    """Test that validation properly rejects non-dictionary data types"""
    (_, _, _, _, _, _, table_name, dsn_url, _, columns_config) = timescaledb

    time_format = "%d/%m/%y %H:%M:%S UTC%z"
    controller = build_flow([
        SyncEmitSource(),
        TimescaleDBTarget(
            dsn=dsn_url,
            table=table_name,
            time_col="time",
            columns=columns_config,
            time_format=time_format,
            max_events=10,
        ),
    ]).run()

    # Test with non-dictionary data
    invalid_data = "this_is_not_a_dictionary"

    # Should raise TypeError for non-dictionary data (the error occurs in the parent Writer class)
    with pytest.raises(TypeError, match=r"string indices must be integers"):
        controller.emit(invalid_data, None)
        controller.terminate()
        controller.await_termination()


@pytest.mark.asyncio
async def test_timescaledb_validation_direct_emit_non_dict_error(timescaledb):
    """Test that TimescaleDBTarget._emit properly validates dictionary data types"""
    (_, _, _, _, _, _, table_name, dsn_url, timestamp_precision, columns_config) = timescaledb

    time_formats = {
        "milliseconds": "%d/%m/%y %H:%M:%S.%f UTC%z",
        "microseconds": "%d/%m/%y %H:%M:%S.%f UTC%z",
    }
    time_format = time_formats.get(timestamp_precision, "%d/%m/%y %H:%M:%S UTC%z")

    target = TimescaleDBTarget(
        dsn=dsn_url,
        table=table_name,
        time_col="time",
        columns=columns_config,
        time_format=time_format,
        max_events=10,
    )

    # Initialize the target
    await target._async_init()

    # Test with non-dictionary data directly in _emit
    non_dict_batch = ["this_is_not_a_dictionary", 123, None]

    # Should raise TypeError for non-dictionary data in our validation code
    with pytest.raises(TypeError, match=r"TimescaleDBTarget only supports dictionary data, got <class 'str'>"):
        await target._emit(non_dict_batch, None, None, None)

    # Cleanup
    await target._terminate()
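
The implementation side of this commit (the second changed file) is not shown on this page, but the tests pin down its observable behavior. Below is a minimal, hypothetical sketch of the schema lookup they imply: one `information_schema.columns` query that resolves an optional `schema.` prefix (defaulting to `public`), caches the result so repeated calls return the same dict object, and raises the exact `ValueError` that `test_timescaledb_schema_validation_table_not_found` matches. The `SchemaCache` name is illustrative, not the commit's actual code, and the real target is async; this sketch uses a synchronous psycopg2 connection for brevity.

```python
# Hypothetical sketch only: the commit's implementation file is not shown here.
# SchemaCache is an illustrative name; the real target's lookup is async.
import psycopg2


class SchemaCache:
    """Fetch {column: {"nullable": bool}} once per table and cache it."""

    def __init__(self, connection, table):
        self._connection = connection
        self._table = table
        self._schema = None

    def get(self):
        if self._schema is None:  # cached after the first call (schema1 is schema2)
            self._schema = self._fetch()
        return self._schema

    def _fetch(self):
        # Resolve an optional "schema.table" prefix, defaulting to "public"
        schema_name, _, table_name = self._table.rpartition(".")
        schema_name = schema_name or "public"
        cursor = self._connection.cursor()
        cursor.execute(
            """
            SELECT column_name, is_nullable
            FROM information_schema.columns
            WHERE table_schema = %s AND table_name = %s
            """,
            (schema_name, table_name),
        )
        rows = cursor.fetchall()
        cursor.close()
        if not rows:
            # Matches test_timescaledb_schema_validation_table_not_found
            raise ValueError(f"Table '{schema_name}.{table_name}' not found or has no columns")
        # information_schema reports is_nullable as the strings 'YES'/'NO'
        return {name: {"nullable": is_nullable == "YES"} for name, is_nullable in rows}
```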

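The per-event nullability check the remaining tests exercise can be sketched the same way, again with an illustrative name rather than the commit's actual API: reject non-dict events with a `TypeError`, reject events that omit a non-nullable configured column with a `ValueError`, and pass `None` through for missing nullable columns so they land as SQL `NULL`. Columns absent from the schema map fall through untouched, and extra keys in the event are simply never read, matching `test_timescaledb_validation_with_extra_columns`.

```python
# Hypothetical sketch of the nullability check the tests above assert;
# validate_event is an illustrative name, not the commit's actual API.
def validate_event(event, columns, schema):
    """Return the values for `columns`, enforcing NOT NULL constraints."""
    if not isinstance(event, dict):
        # Renders as "got <class 'str'>", as matched in
        # test_timescaledb_validation_direct_emit_non_dict_error
        raise TypeError(f"TimescaleDBTarget only supports dictionary data, got {type(event)}")

    values = []
    for col in columns:
        value = event.get(col)  # extra event keys are simply never read
        info = schema.get(col)  # columns unknown to the schema fall through
        if value is None and info is not None and not info["nullable"]:
            raise ValueError(f"Missing required non-nullable column '{col}'")
        values.append(value)  # a missing nullable column is inserted as NULL
    return values
```

For example, omitting `required_col` from the event would raise under a schema where it is NOT NULL, while omitting `nullable_col` would yield a `None` in the returned values, which the insert then writes as `NULL`.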