|
| 1 | +"""fix_hash_2 |
| 2 | +
|
| 3 | +Revision ID: f6a1859a4d3b |
| 4 | +Revises: 9e8c841d1a30 |
| 5 | +Create Date: 2025-05-02 05:40:06.640752 |
| 6 | +
|
| 7 | +""" |
| 8 | +import logging |
| 9 | +from alembic import op |
| 10 | +import sqlalchemy as sa |
| 11 | +from sqlalchemy.sql import table |
| 12 | +from sqlalchemy import select |
| 13 | + |
| 14 | +from redash.query_runner import BaseQueryRunner, get_query_runner |
| 15 | + |
| 16 | + |
| 17 | +# revision identifiers, used by Alembic. |
| 18 | +revision = 'f6a1859a4d3b' |
| 19 | +down_revision = '9e8c841d1a30' |
| 20 | +branch_labels = None |
| 21 | +depends_on = None |
| 22 | + |
| 23 | +def update_query_hash(record): |
| 24 | + should_apply_auto_limit = record['options'].get("apply_auto_limit", False) if record['options'] else False |
| 25 | + query_runner = get_query_runner(record['type'], {}) if record['type'] else BaseQueryRunner({}) |
| 26 | + query_text = record['query'] |
| 27 | + |
| 28 | + parameters_dict = {p["name"]: p.get("value") for p in record['options'].get('parameters', [])} if record.options else {} |
| 29 | + if any(parameters_dict): |
| 30 | + print(f"Query {record['query_id']} has parameters. Hash might be incorrect.") |
| 31 | + |
| 32 | + return query_runner.gen_query_hash(query_text, should_apply_auto_limit) |
| 33 | + |
| 34 | + |
| 35 | +def upgrade(): |
| 36 | + conn = op.get_bind() |
| 37 | + |
| 38 | + metadata = sa.MetaData(bind=conn) |
| 39 | + queries = sa.Table("queries", metadata, autoload=True) |
| 40 | + data_sources = sa.Table("data_sources", metadata, autoload=True) |
| 41 | + |
| 42 | + joined_table = queries.outerjoin(data_sources, queries.c.data_source_id == data_sources.c.id) |
| 43 | + |
| 44 | + query = select([ |
| 45 | + queries.c.id.label("query_id"), |
| 46 | + queries.c.query, |
| 47 | + queries.c.query_hash, |
| 48 | + queries.c.options, |
| 49 | + data_sources.c.id.label("data_source_id"), |
| 50 | + data_sources.c.type |
| 51 | + ]).select_from(joined_table) |
| 52 | + |
| 53 | + for record in conn.execute(query): |
| 54 | + new_hash = update_query_hash(record) |
| 55 | + if new_hash == record['query_hash']: |
| 56 | + print(f"Hash for query {record['query_id']} is not changed from {record['query_hash']}") |
| 57 | + continue |
| 58 | + print(f"Updating hash for query {record['query_id']} from {record['query_hash']} to {new_hash}") |
| 59 | + conn.execute( |
| 60 | + queries.update() |
| 61 | + .where(queries.c.id == record['query_id']) |
| 62 | + .values(query_hash=new_hash)) |
| 63 | + |
| 64 | + |
| 65 | +def downgrade(): |
| 66 | + pass |
0 commit comments