Skip to content

Commit 863afdc

Browse files
committed
Add automatic datetime field schema mismatch detection
Critical production safety feature: detects when users deploy the new datetime indexing code without running the required migration, preventing runtime query failures. Features: automatic detection during query execution with helpful warnings; manual schema checking via the 'om migrate-data check-schema' command; a programmatic API for application startup validation; detailed mismatch reporting with specific models and fields; and clear guidance on resolution steps. Detection scenario: the code expects NUMERIC datetime indexing (new format) while Redis still has TAG datetime indexing (old format); detecting this prevents cryptic syntax errors during queries. Usage: run 'om migrate-data check-schema' to check for mismatches and 'om migrate-data datetime' to fix detected mismatches. This addresses the critical deployment safety issue where users could deploy new code without running migrations, causing production query failures. Essential for a safe 1.0 rollout.
1 parent 784a759 commit 863afdc

File tree

4 files changed

+486
-1
lines changed

4 files changed

+486
-1
lines changed

aredis_om/model/cli/migrate_data.py

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -575,5 +575,64 @@ def clear_progress(migrations_dir: str, module: str, yes: bool):
575575
click.echo("✅ Saved migration progress cleared.")
576576

577577

578+
@migrate_data.command()
@click.option(
    "--migrations-dir",
    default="",
    help="Directory containing migration files (default: <root>/data-migrations)",
)
@click.option("--module", help="Python module containing migrations")
@handle_redis_errors
def check_schema(migrations_dir: str, module: str):
    """Check for datetime field schema mismatches between code and Redis."""
    # NOTE: the docstring above doubles as the command's help text, so it is
    # kept to a single line; details live in these comments instead.
    #
    # Flow: build a DataMigrator for the resolved migrations directory, ask a
    # DatetimeFieldDetector to compare the migrator's models against Redis,
    # print a report, and raise click.ClickException when any mismatch is found.
    import os

    from ...settings import get_root_migrations_dir
    from ..migrations.datetime_migration import DatetimeFieldDetector

    # An empty --migrations-dir falls back to <root>/data-migrations.
    if migrations_dir:
        target_dir = migrations_dir
    else:
        target_dir = os.path.join(get_root_migrations_dir(), "data-migrations")

    migrator = DataMigrator(migrations_dir=target_dir, module_name=module)

    def _echo_mismatch(entry):
        # One indented stanza per mismatching field, followed by a blank line.
        click.echo(f" Model: {entry['model']}")
        click.echo(f" Field: {entry['field']}")
        click.echo(f" Current Redis type: {entry['current_type']}")
        click.echo(f" Expected type: {entry['expected_type']}")
        click.echo(f" Index: {entry['index_name']}")
        click.echo()

    async def _run_check():
        click.echo("🔍 Checking for datetime field schema mismatches...")

        detector = DatetimeFieldDetector(migrator.redis)
        report = await detector.check_for_schema_mismatches(migrator.get_models())

        if not report["has_mismatches"]:
            click.echo(
                "✅ No schema mismatches detected - "
                "all datetime fields are properly indexed"
            )
            return

        found = report["mismatches"]
        click.echo(f"⚠️ Found {len(found)} datetime field schema mismatch(es):")
        click.echo()

        for entry in found:
            _echo_mismatch(entry)

        click.echo("🚨 CRITICAL ISSUE DETECTED:")
        click.echo(report["recommendation"])
        click.echo()
        click.echo("To fix this issue, run:")
        click.echo(" om migrate-data datetime")
        click.echo()
        click.echo(
            "This will convert your datetime fields from TAG to NUMERIC indexing,"
        )
        click.echo("enabling proper range queries and sorting.")

        # Raising ClickException reports the failure through click.
        raise click.ClickException("Schema mismatches detected")

    run_async(_run_check())
635+
636+
578637
if __name__ == "__main__":
579638
migrate_data()

aredis_om/model/migrations/datetime_migration.py

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,137 @@
2020
log = logging.getLogger(__name__)
2121

2222

23+
class SchemaMismatchError(Exception):
    """Signal that the deployed code and Redis disagree about a field's type.

    Carries no state beyond what ``Exception`` already provides.
    """
26+
27+
28+
class DatetimeFieldDetector:
    """Detect datetime fields that Redis indexes as TAG while code expects NUMERIC.

    The detector issues ``FT.INFO`` for each model's search index, parses the
    field definitions out of the reply, and compares them with the datetime
    fields declared on the model.
    """

    def __init__(self, redis):
        """Store the client whose ``execute_command`` coroutine runs ``FT.INFO``."""
        self.redis = redis

    async def check_for_schema_mismatches(self, models: List[Any]) -> Dict[str, Any]:
        """Report datetime fields indexed as TAG instead of NUMERIC.

        This detects the scenario where:
        1. User had old code with datetime fields indexed as TAG
        2. User deployed new code that expects NUMERIC indexing
        3. User hasn't run the migration yet

        Args:
            models: Model classes to inspect. Each must expose ``_meta`` with
                ``global_key_prefix`` and ``model_key_prefix`` (and optionally
                ``index_name``).

        Returns:
            Dict with keys ``has_mismatches`` (bool), ``mismatches`` (list of
            dicts with ``model``, ``field``, ``current_type``,
            ``expected_type``, ``index_name``), ``total_affected_models`` (int)
            and ``recommendation`` (str).

        Models whose index cannot be read are skipped rather than reported;
        this method does not raise for per-model failures.
        """
        mismatches = []

        for model in models:
            try:
                index_name = self._index_name_for(model)

                try:
                    index_info = await self.redis.execute_command(
                        "FT.INFO", index_name
                    )
                    current_schema = self._parse_index_schema(index_info)
                except Exception as e:
                    # Index doesn't exist or other error - skip this model,
                    # but leave a trace so a silent "no mismatches" result
                    # can be diagnosed.
                    log.debug(
                        "Skipping schema check for index %s: %s", index_name, e
                    )
                    continue

                datetime_fields = self._get_datetime_fields(model)

                for field_name, field_info in datetime_fields.items():
                    redis_field_type = current_schema.get(field_name, {}).get("type")

                    if (
                        redis_field_type == "TAG"
                        and field_info.get("expected_type") == "NUMERIC"
                    ):
                        mismatches.append(
                            {
                                "model": model.__name__,
                                "field": field_name,
                                "current_type": "TAG",
                                "expected_type": "NUMERIC",
                                "index_name": index_name,
                            }
                        )

            except Exception as e:
                log.warning(
                    "Could not check schema for model %s: %s", model.__name__, e
                )
                continue

        return {
            "has_mismatches": bool(mismatches),
            "mismatches": mismatches,
            "total_affected_models": len({m["model"] for m in mismatches}),
            "recommendation": self._get_recommendation(mismatches),
        }

    @staticmethod
    def _index_name_for(model) -> str:
        """Return the search index name to query for *model*."""
        meta = model._meta
        # NOTE(review): assumes `_meta.index_name`, when present, is the name
        # the index was created under - confirm against the model metaclass.
        explicit = getattr(meta, "index_name", None)
        if explicit:
            return explicit
        return f"{meta.global_key_prefix}:{meta.model_key_prefix}"

    @staticmethod
    def _as_text(value: Any) -> str:
        """Decode bytes replies to ``str``; stringify anything else."""
        return value.decode() if isinstance(value, bytes) else str(value)

    def _parse_index_schema(self, index_info: List) -> Dict[str, Dict[str, Any]]:
        """Parse FT.INFO output to extract field schema information.

        Args:
            index_info: Flat ``[key, value, key, value, ...]`` reply. A
                trailing key without a value is ignored.

        Returns:
            Mapping of field name to ``{"type": <str>, "raw_attr": <entry>}``.
        """
        text = self._as_text

        # FT.INFO returns a flat list of key-value pairs.
        info = {
            text(index_info[i]): index_info[i + 1]
            for i in range(0, len(index_info) - 1, 2)
        }

        schema = {}
        for attr in info.get("attributes") or []:
            if not isinstance(attr, (list, tuple)) or len(attr) < 3:
                continue

            # Labelled layout:
            #   identifier, <id>, attribute, <name>, type, <TYPE>, ...
            # All three labels are checked so that a field that is merely
            # *named* "identifier" is not misread.
            labelled = (
                len(attr) >= 6
                and text(attr[0]) == "identifier"
                and text(attr[2]) == "attribute"
                and text(attr[4]) == "type"
            )
            if labelled:
                field_name, field_type = text(attr[3]), text(attr[5])
            else:
                # Positional layout: <name>, <label>, <TYPE>, ...
                field_name, field_type = text(attr[0]), text(attr[2])

            schema[field_name] = {"type": field_type, "raw_attr": attr}

        return schema

    def _get_datetime_fields(self, model) -> Dict[str, Dict[str, Any]]:
        """Get datetime fields from a model and their expected types.

        Only fields whose ``annotation`` is exactly ``datetime.datetime`` or
        ``datetime.date`` are returned. Failures are logged and yield whatever
        was collected so far.
        """
        datetime_fields = {}

        try:
            # Get model fields in a compatible way
            if hasattr(model, "_get_model_fields"):
                model_fields = model._get_model_fields()
            elif hasattr(model, "model_fields"):
                model_fields = model.model_fields
            else:
                model_fields = getattr(model, "__fields__", {})

            for field_name, field_info in model_fields.items():
                field_type = getattr(field_info, "annotation", None)
                if field_type in (datetime.datetime, datetime.date):
                    datetime_fields[field_name] = {
                        "expected_type": "NUMERIC",  # New code expects NUMERIC
                        "field_info": field_info,
                    }

        except Exception as e:
            log.warning(
                "Could not analyze fields for model %s: %s", model.__name__, e
            )

        return datetime_fields

    def _get_recommendation(self, mismatches: List[Dict]) -> str:
        """Get recommendation based on detected mismatches.

        Affected model names are de-duplicated and sorted so the message is
        stable between runs.
        """
        if not mismatches:
            return "No schema mismatches detected."

        affected = ", ".join(sorted({m["model"] for m in mismatches}))
        return (
            f"CRITICAL: Found {len(mismatches)} datetime field(s) with schema mismatches. "
            "Your deployed code expects NUMERIC indexing but Redis has TAG indexing. "
            "Run 'om migrate-data datetime' to fix this before queries fail. "
            f"Affected models: {affected}"
        )
152+
153+
23154
class ConversionFailureMode(Enum):
24155
"""How to handle datetime conversion failures."""
25156

aredis_om/model/model.py

Lines changed: 65 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1603,7 +1603,23 @@ async def execute(
16031603

16041604
# If the offset is greater than 0, we're paginating through a result set,
16051605
# so append the new results to results already in the cache.
1606-
raw_result = await self.model.db().execute_command(*args)
1606+
try:
1607+
raw_result = await self.model.db().execute_command(*args)
1608+
except Exception as e:
1609+
error_msg = str(e).lower()
1610+
1611+
# Check if this might be a datetime field schema mismatch
1612+
if "syntax error" in error_msg and self._has_datetime_fields():
1613+
log.warning(
1614+
f"Query failed with syntax error on model with datetime fields. "
1615+
f"This might indicate a schema mismatch where datetime fields are "
1616+
f"indexed as TAG but code expects NUMERIC. "
1617+
f"Run 'om migrate-data check-schema' to verify and "
1618+
f"'om migrate-data datetime' to fix."
1619+
)
1620+
1621+
# Re-raise the original exception
1622+
raise
16071623
if return_raw_result:
16081624
return raw_result
16091625
count = raw_result[0]
@@ -1806,6 +1822,21 @@ async def get_item(self, item: int):
18061822
result = await query.execute()
18071823
return result[0]
18081824

1825+
def _has_datetime_fields(self) -> bool:
1826+
"""Check if the model has any datetime fields."""
1827+
try:
1828+
import datetime
1829+
model_fields = self.model._get_model_fields()
1830+
1831+
for field_name, field_info in model_fields.items():
1832+
field_type = getattr(field_info, "annotation", None)
1833+
if field_type in (datetime.datetime, datetime.date):
1834+
return True
1835+
1836+
return False
1837+
except Exception:
1838+
return False
1839+
18091840

18101841
class PrimaryKeyCreator(Protocol):
18111842
def create_pk(self, *args, **kwargs) -> str:
@@ -2226,6 +2257,39 @@ def _get_model_fields(cls):
22262257
else:
22272258
return cls.__fields__
22282259

2260+
@classmethod
async def check_datetime_schema_compatibility(cls) -> Dict[str, Any]:
    """Check if this model's datetime fields have compatible schema in Redis.

    This detects if the model was deployed with new datetime indexing code
    but the migration hasn't been run yet.

    Returns:
        Dict with ``has_mismatches``, ``mismatches``,
        ``total_affected_models`` and ``recommendation``. When the check
        itself fails, the same keys are returned with empty values plus an
        ``error`` key holding the exception text, so callers can read the
        result without first testing which shape they received.

    This method does not raise; failures are logged at debug level.
    """
    try:
        # Imported lazily, inside the guarded region, so an import failure is
        # reported through the returned dict like any other error.
        from .migrations.datetime_migration import DatetimeFieldDetector

        detector = DatetimeFieldDetector(cls.db())
        result = await detector.check_for_schema_mismatches([cls])

        if result["has_mismatches"]:
            log.warning(
                f"Schema mismatch detected for {cls.__name__}: "
                f"{result['recommendation']}"
            )

        return result

    except Exception as e:
        log.debug(
            f"Could not check datetime schema compatibility for {cls.__name__}: {e}"
        )
        return {
            "has_mismatches": False,
            # Mirror the keys of a successful result so the two shapes agree.
            "mismatches": [],
            "total_affected_models": 0,
            "error": str(e),
            "recommendation": "Could not check schema compatibility",
        }
2292+
22292293
def __init__(__pydantic_self__, **data: Any) -> None:
22302294
if PYDANTIC_V2:
22312295
is_indexed = __pydantic_self__.model_config.get("index") is True

0 commit comments

Comments
 (0)