|
| 1 | +"""Migration 001: Add generic task system columns and backfill from jira_key. |
| 2 | +
|
| 3 | +Additive only — no columns removed, no constraints changed. |
| 4 | +Idempotent — safe to run multiple times (skips rows where external_key is already set). |
| 5 | +
|
| 6 | +Usage: |
| 7 | + python -m memory_server.migrations.m001_generic_tasks |
| 8 | +""" |
| 9 | + |
| 10 | +import asyncio |
| 11 | +import json |
| 12 | +import os |
| 13 | +import sys |
| 14 | +from pathlib import Path |
| 15 | + |
| 16 | +import asyncpg |
| 17 | + |
# schema.sql lives in the directory above this module's directory;
# run_migration() executes its contents before backfilling.
SCHEMA_PATH = Path(__file__).parent.parent / "schema.sql"

# Prefix for the source_url written to backfilled tasks: "<base>/<jira_key>".
JIRA_BASE_URL = "https://issues.redhat.com/browse"

# Tables backfilled with one UPDATE each (external_key copied from jira_key,
# source_type set to 'jira'). Tables without a jira_key column are skipped.
SIMPLE_TABLES = [
    "bot_status",
    "bot_instances",
    "cycles",
    "slack_notifications",
    "memories",
]
| 29 | + |
| 30 | + |
def _build_dsn() -> str:
    """Return the PostgreSQL DSN used by this migration.

    ``DATABASE_URL`` is returned verbatim when it is set to a non-empty
    value. Otherwise the DSN is assembled from ``PGSQL_HOSTNAME``,
    ``PGSQL_PORT``, ``PGSQL_USER``, ``PGSQL_PASSWORD`` and
    ``PGSQL_DATABASE``, each falling back to a local test default.

    Returns:
        A ``postgresql://user:password@host:port/database`` URL whose user
        and password components are percent-encoded.
    """
    # Function-scope import so the block does not depend on a file-level one.
    from urllib.parse import quote

    url = os.environ.get("DATABASE_URL")
    if url:
        return url

    host = os.environ.get("PGSQL_HOSTNAME", "localhost")
    port = os.environ.get("PGSQL_PORT", "5432")
    database = os.environ.get("PGSQL_DATABASE", "devbot_migration_test")
    # Credentials are percent-encoded because a raw "@", ":" or "/" in them
    # would be read as URL structure and produce a wrong host or password.
    user = quote(os.environ.get("PGSQL_USER", "devbot_test"), safe="")
    password = quote(os.environ.get("PGSQL_PASSWORD", "devbot_test"), safe="")
    return f"postgresql://{user}:{password}@{host}:{port}/{database}"
| 41 | + |
| 42 | + |
def _build_artifacts(pr_number, pr_url, metadata) -> list[dict]:
    """Build the artifact list for a task from its PR columns and metadata.

    Args:
        pr_number: Value of the task's ``pr_number`` column, or ``None``.
        pr_url: Value of the task's ``pr_url`` column, or ``None``.
        metadata: Decoded task metadata. Anything other than a dict is
            treated as empty. Its ``"prs"`` entry, when it is a list,
            contributes one artifact per dict that has a non-empty string
            ``"url"``.

    Returns:
        A list of ``{"name", "url", "type"}`` dicts. The column-based PR comes
        first, followed by metadata PRs in order, de-duplicated by URL.
        Entries whose ``"host"`` is ``"gitlab"`` are typed ``merge_request``
        (named ``MR #n``); all others are ``pull_request`` (named ``PR #n``).
    """
    artifacts: list[dict] = []
    seen_urls: set[str] = set()

    if pr_number and pr_url:
        artifacts.append(
            {"name": f"PR #{pr_number}", "url": pr_url, "type": "pull_request"}
        )
        seen_urls.add(pr_url)

    meta = metadata if isinstance(metadata, dict) else {}
    prs = meta.get("prs")
    # JSON may carry ``"prs": null`` or a non-list value; there is nothing to
    # add then, and iterating it would raise.
    if not isinstance(prs, (list, tuple)):
        return artifacts

    for pr in prs:
        # Skip malformed entries instead of aborting the whole backfill.
        if not isinstance(pr, dict):
            continue
        url = pr.get("url")
        if not isinstance(url, str) or not url or url in seen_urls:
            continue
        seen_urls.add(url)

        number = pr.get("number")
        if number is None:
            # Covers both a missing key and an explicit JSON null.
            number = "?"
        is_merge_request = pr.get("host") == "gitlab"
        pr_type = "merge_request" if is_merge_request else "pull_request"
        prefix = "MR" if is_merge_request else "PR"
        artifacts.append({"name": f"{prefix} #{number}", "url": url, "type": pr_type})

    return artifacts
| 65 | + |
| 66 | + |
def _rows_affected(status) -> int:
    """Return the row count from a command status tag such as ``"UPDATE 3"``."""
    return int(status.split()[-1]) if status else 0


async def run_migration(conn: asyncpg.Connection) -> dict:
    """Apply ``schema.sql`` and backfill the generic task columns from ``jira_key``.

    The schema file is executed first. Inside a single transaction the
    function then:

    1. fills ``external_key``, ``source_type``, ``source_url`` and
       ``artifacts`` for every ``tasks`` row that has a ``jira_key`` but no
       ``external_key``;
    2. copies ``jira_key`` into ``external_key`` (with ``source_type`` set to
       ``'jira'``) for each table in ``SIMPLE_TABLES`` that has a
       ``jira_key`` column.

    Rows whose ``external_key`` is already set are never touched, so the
    function can be run repeatedly.

    Args:
        conn: An open asyncpg connection.

    Returns:
        A dict mapping ``"tasks"`` and every name in ``SIMPLE_TABLES`` to the
        number of rows updated in that table (0 for skipped tables).
    """
    await conn.execute(SCHEMA_PATH.read_text())

    stats = {"tasks": 0}

    # One transaction for the whole backfill: a failure part-way through
    # rolls back instead of leaving some tables migrated and others not.
    async with conn.transaction():
        rows = await conn.fetch(
            "SELECT id, jira_key, pr_number, pr_url, metadata "
            "FROM tasks WHERE external_key IS NULL AND jira_key IS NOT NULL"
        )
        for row in rows:
            meta = row["metadata"]
            # The metadata column may come back as JSON text rather than
            # as a decoded object.
            if isinstance(meta, str):
                meta = json.loads(meta)

            artifacts = _build_artifacts(row["pr_number"], row["pr_url"], meta)
            jira_key = row["jira_key"]
            # An empty-string key yields no source URL.
            source_url = f"{JIRA_BASE_URL}/{jira_key}" if jira_key else None

            # Re-check external_key in the UPDATE so a value written by
            # another session after the SELECT above is not overwritten.
            status = await conn.execute(
                "UPDATE tasks SET external_key = $1, source_type = $2, "
                "source_url = $3, artifacts = $4 "
                "WHERE id = $5 AND external_key IS NULL",
                jira_key,
                "jira",
                source_url,
                json.dumps(artifacts),
                row["id"],
            )
            stats["tasks"] += _rows_affected(status)

        for table in SIMPLE_TABLES:
            # Only look at schemas on the search path: the UPDATE below uses
            # the unqualified table name, so a same-named table elsewhere
            # must not count as a match.
            has_jira_key = await conn.fetchval(
                "SELECT EXISTS ("
                " SELECT 1 FROM information_schema.columns "
                " WHERE table_schema::text = ANY (current_schemas(false)::text[]) "
                " AND table_name = $1 AND column_name = 'jira_key'"
                ")",
                table,
            )
            if not has_jira_key:
                stats[table] = 0
                continue

            # Table names come from the SIMPLE_TABLES constant, never from
            # external input, so interpolating them is safe.
            status = await conn.execute(
                f"UPDATE {table} SET external_key = jira_key, source_type = 'jira' "  # noqa: S608
                "WHERE external_key IS NULL AND jira_key IS NOT NULL"
            )
            stats[table] = _rows_affected(status)

    return stats
| 116 | + |
| 117 | + |
async def main():
    """Connect using the DSN from ``_build_dsn``, run the migration, print the counts.

    The connection is closed whether or not the migration succeeds.
    """
    connection = await asyncpg.connect(_build_dsn())
    try:
        backfilled = await run_migration(connection)
        # Header first, then one line per table in the order reported.
        report = ["Migration 001 complete:"]
        report.extend(
            f" {name}: {rows} rows backfilled" for name, rows in backfilled.items()
        )
        print("\n".join(report))
    finally:
        await connection.close()
| 128 | + |
| 129 | + |
# Script entry point: run the migration when executed directly or via
# ``python -m``; importing the module has no side effects.
if __name__ == "__main__":
    asyncio.run(main())
0 commit comments