From b1c07f5e6c1622fdc77aa8d26eb1b4ce3e7b4c95 Mon Sep 17 00:00:00 2001 From: Rakhi Dutta Date: Tue, 2 Jun 2026 12:32:44 +0530 Subject: [PATCH 1/7] fix(bootstrap): Serialize fast-path bootstrap helpers with advisory lock to prevent race conditions (fixes #4993) Signed-off-by: Rakhi Dutta --- mcpgateway/bootstrap_db.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/mcpgateway/bootstrap_db.py b/mcpgateway/bootstrap_db.py index b52edc77e6..6d61fb51a8 100644 --- a/mcpgateway/bootstrap_db.py +++ b/mcpgateway/bootstrap_db.py @@ -856,18 +856,19 @@ async def main() -> None: try: # Fast path: if the schema is already at the current Alembic head, - # skip the migration advisory lock entirely. This is critical for - # deployments behind a transaction-pooling connection pooler — the - # session-scoped advisory lock can be orphaned across pgbouncer's - # backend handoffs, which would otherwise make N-th pod startup - # spin indefinitely. Replicas 2..N take this branch on normal - # restarts. + # skip the migration step but still acquire the advisory lock to + # serialize bootstrap_resource_assignments across concurrent workers. + # This prevents race conditions when multiple pods restart simultaneously + # and attempt to assign the same orphaned resources (issue #4993). + # Replicas 2..N take this branch on normal restarts. with engine.connect() as probe_conn: probe_conn.commit() if alembic_at_head(probe_conn, cfg): - logger.info("Schema already at Alembic head; skipping migration lock") - await _run_post_migration_bootstrap(probe_conn) - probe_conn.commit() + logger.info("Schema already at Alembic head; skipping migration, acquiring lock for bootstrap") + with advisory_lock(probe_conn): + logger.info("Acquired lock for bootstrap helpers") + await _run_post_migration_bootstrap(probe_conn) + probe_conn.commit() logger.info("Database ready") return From c8bfa18bc2341ebf03eadf4603033f0f02656b38 Mon Sep 17 00:00:00 2001 From: Rakhi Dutta Date: Tue, 2 Jun 2026 12:43:38 +0530 Subject: [PATCH 2/7] test: Add automated test script for issue #4993 race condition fix Signed-off-by: Rakhi Dutta --- scripts/test_issue_4993_fix.sh | 204 +++++++++++++++++++++++++++++++++ 1 file changed, 204 insertions(+) create mode 100755 scripts/test_issue_4993_fix.sh diff --git a/scripts/test_issue_4993_fix.sh b/scripts/test_issue_4993_fix.sh new file mode 100755 index 0000000000..929807a9a5 --- /dev/null +++ b/scripts/test_issue_4993_fix.sh @@ -0,0 +1,204 @@ +#!/bin/bash +# Testing script for Issue #4993 fix +# Tests that concurrent bootstrap processes no longer race on orphaned resource assignments +# +# Prerequisites: +# - PostgreSQL running and accessible +# - Virtual environment activated (source .venv/bin/activate) +# - DATABASE_URL environment variable set (or defaults to local PostgreSQL) +# +# Usage: +# bash scripts/test_issue_4993_fix.sh + +set -e + +echo "==========================================" +echo "Issue #4993 Fix - Race Condition Test" +echo "==========================================" +echo "" + +# Check if we're in a virtual environment +if [ -z "$VIRTUAL_ENV" ]; then + echo "❌ Error: Virtual environment not activated" + echo "Please run: source .venv/bin/activate" + exit 1 +fi + +# Default DATABASE_URL if not set +DEFAULT_DB_URL="postgresql+psycopg://postgres:mysecretpassword@localhost:5432/mcp" +DB_URL="${DATABASE_URL:-$DEFAULT_DB_URL}" +DB_URL_CLEAN="${DB_URL/+psycopg/}" + +echo "Configuration:" +echo " Database: ${DB_URL}" +echo " Test workers: 30 per run" +echo " Test iterations: 5" +echo " Orphaned tools per test: 30" +echo "" + +# Verify PostgreSQL connection +echo "Checking PostgreSQL connection..." +if ! python -c " +import psycopg +try: + conn = psycopg.connect('${DB_URL_CLEAN}') + conn.close() + print('✅ PostgreSQL connection successful') +except Exception as e: + print(f'❌ Failed to connect to PostgreSQL: {e}') + exit(1) +" 2>/dev/null; then + echo "❌ Cannot connect to PostgreSQL" + echo "Please ensure PostgreSQL is running and DATABASE_URL is correct" + exit 1 +fi + +echo "" + +# Test parameters +NUM_TESTS=5 +WORKERS_PER_TEST=30 +ORPHANED_TOOLS=30 + +race_detected=0 +total_conflicts=0 + +for test_num in $(seq 1 $NUM_TESTS); do + echo "==========================================" + echo "Test Run $test_num/$NUM_TESTS" + echo "==========================================" + + # Setup: Insert orphaned tools + echo "[1/4] Setting up $ORPHANED_TOOLS orphaned tools..." + python << SETUP +import psycopg +import uuid +import os + +num_tools = ${ORPHANED_TOOLS} +conn = psycopg.connect('${DB_URL_CLEAN}') +cur = conn.cursor() + +# Clean up previous test tools +cur.execute("DELETE FROM tools WHERE name LIKE 'issue4993-test-tool%'") + +# Insert orphaned tools +for i in range(num_tools): + tool_id = uuid.uuid4().hex + tool_name = f'issue4993-test-tool-{i:03d}' + cur.execute(""" + INSERT INTO tools ( + id, original_name, name, description, input_schema, + custom_name, custom_name_slug, integration_type, request_type, + enabled, deprecated, reachable, jsonpath_filter, tags, version, visibility, + created_at, updated_at + ) VALUES (%s, %s, %s, %s, '{}'::jsonb, %s, %s, %s, %s, %s, %s, %s, %s, + '[]'::jsonb, %s, %s, NOW(), NOW()) + """, (tool_id, tool_name, tool_name, f'Test tool {i}', tool_name, tool_name, + 'MCP', 'SSE', True, False, True, '', 1, 'private')) + +conn.commit() +cur.execute("SELECT COUNT(*) FROM tools WHERE team_id IS NULL AND name LIKE 'issue4993-test-tool%'") +count = cur.fetchone()[0] +print(f"✓ Created {count} orphaned tools") +conn.close() +SETUP + + # Launch concurrent workers + echo "[2/4] Launching $WORKERS_PER_TEST concurrent workers..." + rm -f test_issue4993_*.log 2>/dev/null || true + + start_time=$(date +%s) + for i in $(seq 1 $WORKERS_PER_TEST); do + (python -m mcpgateway.bootstrap_db > test_issue4993_$i.log 2>&1) & + done + + # Wait for all workers + wait + end_time=$(date +%s) + duration=$((end_time - start_time)) + + echo "[3/4] All workers completed in ${duration}s" + + # Analyze results + echo "[4/4] Analyzing results..." + + # Check for name conflicts (evidence of race condition) + conflicts=$(grep -c "Name conflict" test_issue4993_*.log 2>/dev/null | grep -v ":0$" | wc -l | tr -d ' ') + + if [ "$conflicts" -gt 0 ]; then + echo " ❌ RACE DETECTED: $conflicts worker(s) saw name conflicts" + race_detected=$((race_detected + 1)) + total_conflicts=$((total_conflicts + conflicts)) + + # Show which workers saw conflicts + echo " Workers with conflicts:" + grep -l "Name conflict" test_issue4993_*.log 2>/dev/null | head -5 | while read log; do + count=$(grep -c "Name conflict" "$log") + echo " - $log: $count conflicts" + done + else + echo " ✅ No race condition detected" + fi + + # Verify advisory lock usage + lock_acquisitions=$(grep -c "Acquired lock for bootstrap helpers" test_issue4993_*.log 2>/dev/null | grep -v ":0$" | wc -l | tr -d ' ') + if [ "$lock_acquisitions" -gt 0 ]; then + echo " ✅ Advisory lock used by $lock_acquisitions worker(s)" + fi + + # Check database state + python << CHECK +import psycopg +conn = psycopg.connect('${DB_URL_CLEAN}') +cur = conn.cursor() + +cur.execute("SELECT COUNT(*) FROM tools WHERE name LIKE 'issue4993-test-tool%' AND team_id IS NULL") +orphaned = cur.fetchone()[0] + +cur.execute("SELECT COUNT(*) FROM tools WHERE name LIKE 'issue4993-test-tool%' AND team_id IS NOT NULL") +assigned = cur.fetchone()[0] + +# Check for renamed tools (indicates race condition occurred) +cur.execute("SELECT COUNT(*) FROM tools WHERE name ~ 'issue4993-test-tool-[0-9]+-[0-9]+'") +renamed = cur.fetchone()[0] + +print(f" Database state: {orphaned} orphaned, {assigned} assigned, {renamed} renamed") + +if orphaned > 0: + print(f" ⚠️ WARNING: {orphaned} tools remain orphaned") +if renamed > 0: + print(f" ⚠️ WARNING: {renamed} tools were renamed (race occurred)") + +conn.close() +CHECK + + echo "" +done + +# Cleanup test logs +echo "Cleaning up test logs..." +rm -f test_issue4993_*.log + +# Final summary +echo "==========================================" +echo "Test Summary" +echo "==========================================" +echo "Total test runs: $NUM_TESTS" +echo "Races detected: $race_detected out of $NUM_TESTS" +echo "Total workers with conflicts: $total_conflicts" +echo "" + +if [ "$race_detected" -eq 0 ]; then + echo "✅✅✅ SUCCESS! No race conditions detected!" + echo "" + echo "The fix is working correctly. Multiple concurrent workers" + echo "are properly serialized through the advisory lock mechanism." + exit 0 +else + echo "❌ Race conditions detected in $race_detected test run(s)" + echo "" + echo "This indicates the race condition is not fully resolved." + echo "Please review the fix implementation." + exit 1 +fi From 46d97a4129241501995cb09e173140a43091c0f9 Mon Sep 17 00:00:00 2001 From: Rakhi Dutta Date: Tue, 2 Jun 2026 13:14:10 +0530 Subject: [PATCH 3/7] test cases fix Signed-off-by: Rakhi Dutta --- tests/unit/mcpgateway/test_bootstrap_db.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/tests/unit/mcpgateway/test_bootstrap_db.py b/tests/unit/mcpgateway/test_bootstrap_db.py index 59b6c6798f..894ebb8268 100644 --- a/tests/unit/mcpgateway/test_bootstrap_db.py +++ b/tests/unit/mcpgateway/test_bootstrap_db.py @@ -1739,7 +1739,7 @@ async def test_main_skip_migration_exception_reraises(self, mock_settings): @pytest.mark.asyncio async def test_main_fast_path_already_at_head(self, mock_settings): - """skip_migration=False, already at head: skips advisory lock, commits (lines 820, 827).""" + """skip_migration=False, already at head: skips migration but still acquires lock for bootstrap (lines 867, 868).""" mock_settings.mcpgateway_skip_migrations = False mock_engine, mock_conn = self._make_engine_cm() @@ -1748,6 +1748,7 @@ async def test_main_fast_path_already_at_head(self, mock_settings): patch("importlib.resources.files") as mock_files, patch("mcpgateway.bootstrap_db.Config", return_value=MagicMock(attributes={})), patch("mcpgateway.bootstrap_db.alembic_at_head", return_value=True), + patch("mcpgateway.bootstrap_db.advisory_lock") as mock_advisory_lock, patch("mcpgateway.bootstrap_db.normalize_team_visibility", return_value=0), patch("mcpgateway.bootstrap_db.bootstrap_admin_user", new=AsyncMock()) as mock_admin, patch("mcpgateway.bootstrap_db.bootstrap_default_roles", new=AsyncMock()) as mock_roles, @@ -1756,8 +1757,10 @@ async def test_main_fast_path_already_at_head(self, mock_settings): patch("mcpgateway.bootstrap_db.logger") as mock_logger, ): mock_files.return_value.joinpath.return_value = "alembic.ini" + mock_advisory_lock.return_value.__enter__ = Mock(return_value=None) + mock_advisory_lock.return_value.__exit__ = Mock(return_value=None) await main() - mock_logger.info.assert_any_call("Schema already at Alembic head; skipping migration lock") + mock_logger.info.assert_any_call("Schema already at Alembic head; skipping migration, acquiring lock for bootstrap") mock_conn.commit.assert_called() mock_admin.assert_called_once() mock_roles.assert_called_once() From 151593d4fa3cb047bd44e59141daa91a15675731 Mon Sep 17 00:00:00 2001 From: Rakhi Dutta Date: Tue, 2 Jun 2026 14:33:07 +0530 Subject: [PATCH 4/7] fix(bootstrap): replace advisory lock with idempotent per-row commits for resource assignments Signed-off-by: Rakhi Dutta --- mcpgateway/bootstrap_db.py | 39 +++++++++++++++------- tests/unit/mcpgateway/test_bootstrap_db.py | 7 ++-- 2 files changed, 29 insertions(+), 17 deletions(-) diff --git a/mcpgateway/bootstrap_db.py b/mcpgateway/bootstrap_db.py index 6d61fb51a8..3b304bb2fe 100644 --- a/mcpgateway/bootstrap_db.py +++ b/mcpgateway/bootstrap_db.py @@ -47,6 +47,7 @@ from filelock import FileLock from sqlalchemy import create_engine, or_, text from sqlalchemy.engine import Connection +from sqlalchemy.exc import IntegrityError from sqlalchemy.orm import Session # First-Party @@ -741,6 +742,7 @@ def _like_safe(v: str) -> str: # Track names claimed within this batch to catch intra-batch duplicates batch_assigned: set[str] = set() + assigned_count = 0 for resource in unassigned: original_value = getattr(resource, field) @@ -766,8 +768,22 @@ def _like_safe(v: str) -> str: if hasattr(resource, "federation_source") and not resource.federation_source: resource.federation_source = "mcpgateway-0.7.0-migration" - db.commit() - total_assigned += len(unassigned) + # Per-row commit with race-condition handling (issue #4993) + # If another worker assigned this resource concurrently, gracefully skip it + try: + db.commit() + assigned_count += 1 + except IntegrityError as ie: + # Another worker assigned this resource first - rollback and continue + db.rollback() + logger.debug( + f"Skipping {SecurityValidator.sanitize_log_message(resource_name)} " + f"'{SecurityValidator.sanitize_log_message(str(getattr(resource, field)))}' " + f"- already assigned by concurrent worker: {SecurityValidator.sanitize_log_message(str(ie))}" + ) + continue + + total_assigned += assigned_count except Exception as e: logger.error(f"Failed to assign {SecurityValidator.sanitize_log_message(resource_name)}: {SecurityValidator.sanitize_log_message(str(e))}") @@ -856,19 +872,18 @@ async def main() -> None: try: # Fast path: if the schema is already at the current Alembic head, - # skip the migration step but still acquire the advisory lock to - # serialize bootstrap_resource_assignments across concurrent workers. - # This prevents race conditions when multiple pods restart simultaneously - # and attempt to assign the same orphaned resources (issue #4993). - # Replicas 2..N take this branch on normal restarts. + # skip the migration advisory lock entirely. This is critical for + # deployments behind a transaction-pooling connection pooler — the + # session-scoped advisory lock can be orphaned across pgbouncer's + # backend handoffs, which would otherwise make N-th pod startup + # spin indefinitely. Replicas 2..N take this branch on normal + # restarts. with engine.connect() as probe_conn: probe_conn.commit() if alembic_at_head(probe_conn, cfg): - logger.info("Schema already at Alembic head; skipping migration, acquiring lock for bootstrap") - with advisory_lock(probe_conn): - logger.info("Acquired lock for bootstrap helpers") - await _run_post_migration_bootstrap(probe_conn) - probe_conn.commit() + logger.info("Schema already at Alembic head; skipping migration lock") + await _run_post_migration_bootstrap(probe_conn) + probe_conn.commit() logger.info("Database ready") return diff --git a/tests/unit/mcpgateway/test_bootstrap_db.py b/tests/unit/mcpgateway/test_bootstrap_db.py index 894ebb8268..59b6c6798f 100644 --- a/tests/unit/mcpgateway/test_bootstrap_db.py +++ b/tests/unit/mcpgateway/test_bootstrap_db.py @@ -1739,7 +1739,7 @@ async def test_main_skip_migration_exception_reraises(self, mock_settings): @pytest.mark.asyncio async def test_main_fast_path_already_at_head(self, mock_settings): - """skip_migration=False, already at head: skips migration but still acquires lock for bootstrap (lines 867, 868).""" + """skip_migration=False, already at head: skips advisory lock, commits (lines 820, 827).""" mock_settings.mcpgateway_skip_migrations = False mock_engine, mock_conn = self._make_engine_cm() @@ -1748,7 +1748,6 @@ async def test_main_fast_path_already_at_head(self, mock_settings): patch("importlib.resources.files") as mock_files, patch("mcpgateway.bootstrap_db.Config", return_value=MagicMock(attributes={})), patch("mcpgateway.bootstrap_db.alembic_at_head", return_value=True), - patch("mcpgateway.bootstrap_db.advisory_lock") as mock_advisory_lock, patch("mcpgateway.bootstrap_db.normalize_team_visibility", return_value=0), patch("mcpgateway.bootstrap_db.bootstrap_admin_user", new=AsyncMock()) as mock_admin, patch("mcpgateway.bootstrap_db.bootstrap_default_roles", new=AsyncMock()) as mock_roles, @@ -1757,10 +1756,8 @@ async def test_main_fast_path_already_at_head(self, mock_settings): patch("mcpgateway.bootstrap_db.logger") as mock_logger, ): mock_files.return_value.joinpath.return_value = "alembic.ini" - mock_advisory_lock.return_value.__enter__ = Mock(return_value=None) - mock_advisory_lock.return_value.__exit__ = Mock(return_value=None) await main() - mock_logger.info.assert_any_call("Schema already at Alembic head; skipping migration, acquiring lock for bootstrap") + mock_logger.info.assert_any_call("Schema already at Alembic head; skipping migration lock") mock_conn.commit.assert_called() mock_admin.assert_called_once() mock_roles.assert_called_once() From 8e7b15f4db5cefd047b00a25eac94edd4607efc3 Mon Sep 17 00:00:00 2001 From: Rakhi Dutta Date: Tue, 2 Jun 2026 14:51:54 +0530 Subject: [PATCH 5/7] fix(bootstrap): clean up batch_assigned after rollback to prevent false conflicts in per-row commits Signed-off-by: Rakhi Dutta --- mcpgateway/bootstrap_db.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/mcpgateway/bootstrap_db.py b/mcpgateway/bootstrap_db.py index 3b304bb2fe..ffbebfc96d 100644 --- a/mcpgateway/bootstrap_db.py +++ b/mcpgateway/bootstrap_db.py @@ -773,13 +773,19 @@ def _like_safe(v: str) -> str: try: db.commit() assigned_count += 1 - except IntegrityError as ie: + except IntegrityError: # Another worker assigned this resource first - rollback and continue db.rollback() + + # Clean up batch_assigned to prevent false conflicts in subsequent iterations + if original_value is not None: + final_value = getattr(resource, field) + batch_assigned.discard(final_value) + logger.debug( f"Skipping {SecurityValidator.sanitize_log_message(resource_name)} " f"'{SecurityValidator.sanitize_log_message(str(getattr(resource, field)))}' " - f"- already assigned by concurrent worker: {SecurityValidator.sanitize_log_message(str(ie))}" + f"- already assigned by concurrent worker" ) continue From 4c4c014f6b32e9b55262abcf339a321d507f402e Mon Sep 17 00:00:00 2001 From: Rakhi Dutta Date: Tue, 2 Jun 2026 15:06:21 +0530 Subject: [PATCH 6/7] fix(bootstrap): handle race condition with per-row commits, session cleanup, and conflict tracking Signed-off-by: Rakhi Dutta --- mcpgateway/bootstrap_db.py | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/mcpgateway/bootstrap_db.py b/mcpgateway/bootstrap_db.py index ffbebfc96d..9a38184764 100644 --- a/mcpgateway/bootstrap_db.py +++ b/mcpgateway/bootstrap_db.py @@ -746,6 +746,7 @@ def _like_safe(v: str) -> str: for resource in unassigned: original_value = getattr(resource, field) + added_to_batch = None # Track what we add to batch_assigned for cleanup on rollback if original_value is not None: taken = existing_taken | batch_assigned @@ -759,8 +760,10 @@ def _like_safe(v: str) -> str: ) setattr(resource, field, new_value) batch_assigned.add(new_value) + added_to_batch = new_value else: batch_assigned.add(original_value) + added_to_batch = original_value resource.team_id = personal_team.id resource.owner_email = admin_user.email @@ -776,15 +779,20 @@ def _like_safe(v: str) -> str: except IntegrityError: # Another worker assigned this resource first - rollback and continue db.rollback() - + + # Expunge the resource from session to prevent re-flush on subsequent commits + # Without this, the rolled-back resource remains "dirty" and SQLAlchemy will + # attempt to flush it again on every subsequent commit in this loop + db.expunge(resource) + # Clean up batch_assigned to prevent false conflicts in subsequent iterations - if original_value is not None: - final_value = getattr(resource, field) - batch_assigned.discard(final_value) - + # Use the tracked value since resource state is unpredictable after expunge + if added_to_batch is not None: + batch_assigned.discard(added_to_batch) + logger.debug( f"Skipping {SecurityValidator.sanitize_log_message(resource_name)} " - f"'{SecurityValidator.sanitize_log_message(str(getattr(resource, field)))}' " + f"'{SecurityValidator.sanitize_log_message(str(original_value))}' " f"- already assigned by concurrent worker" ) continue From f188e7605215af11f2eb635054a3a04dc9303c62 Mon Sep 17 00:00:00 2001 From: Rakhi Dutta Date: Tue, 2 Jun 2026 15:12:44 +0530 Subject: [PATCH 7/7] test cases added Signed-off-by: Rakhi Dutta --- tests/unit/mcpgateway/test_bootstrap_db.py | 206 +++++++++++++++++++++ 1 file changed, 206 insertions(+) diff --git a/tests/unit/mcpgateway/test_bootstrap_db.py b/tests/unit/mcpgateway/test_bootstrap_db.py index 59b6c6798f..523eb610d5 100644 --- a/tests/unit/mcpgateway/test_bootstrap_db.py +++ b/tests/unit/mcpgateway/test_bootstrap_db.py @@ -1451,6 +1451,212 @@ async def test_resource_assignments_exception(self, mock_settings, mock_db_sessi mock_logger.error.assert_called_with("Failed to bootstrap resource assignments: Database error") + @pytest.mark.asyncio + async def test_resource_assignments_integrity_error_rollback(self, mock_settings, mock_db_session, mock_admin_user, mock_personal_team, mock_conn): + """Test IntegrityError handling with rollback and expunge (issue #4993).""" + from sqlalchemy.exc import IntegrityError + + mock_admin_user.get_personal_team.return_value = mock_personal_team + + # Mock unassigned tool + mock_tool = Mock() + mock_tool.team_id = None + mock_tool.owner_email = None + mock_tool.visibility = None + mock_tool.name = "test-tool" + + def mock_query_handler(model): + query = MagicMock() + query.filter.return_value = query + + if hasattr(model, "__name__") and model.__name__ == "EmailUser": + query.first.return_value = mock_admin_user + elif hasattr(model, "__name__") and model.__name__ == "Tool": + query.all.return_value = [mock_tool] + else: + query.all.return_value = [] + + return query + + # First commit raises IntegrityError (concurrent worker assigned it first) + mock_db_session.query.side_effect = mock_query_handler + mock_db_session.commit.side_effect = IntegrityError("duplicate key", None, None) + + with patch("mcpgateway.bootstrap_db.settings", mock_settings): + with patch("mcpgateway.bootstrap_db.Session", return_value=mock_db_session): + with patch("mcpgateway.db.EmailUser", Mock(__name__="EmailUser")): + with patch("mcpgateway.db.Tool", Mock(__name__="Tool")): + with patch("mcpgateway.db.Server", Mock(__name__="Server")): + with patch("mcpgateway.db.Resource", Mock(__name__="Resource")): + with patch("mcpgateway.db.Prompt", Mock(__name__="Prompt")): + with patch("mcpgateway.db.Gateway", Mock(__name__="Gateway")): + with patch("mcpgateway.db.A2AAgent", Mock(__name__="A2AAgent")): + with patch("mcpgateway.bootstrap_db.logger") as mock_logger: + await bootstrap_resource_assignments(mock_conn) + + # Verify rollback and expunge were called + mock_db_session.rollback.assert_called() + mock_db_session.expunge.assert_called_with(mock_tool) + + # Verify debug log for skipped resource + mock_logger.debug.assert_called() + debug_call_args = mock_logger.debug.call_args[0][0] + assert "Skipping" in debug_call_args + assert "already assigned by concurrent worker" in debug_call_args + + @pytest.mark.asyncio + async def test_resource_assignments_integrity_error_with_rename(self, mock_settings, mock_db_session, mock_admin_user, mock_personal_team, mock_conn): + """Test IntegrityError with renamed resource cleans up batch_assigned correctly.""" + from sqlalchemy.exc import IntegrityError + + mock_admin_user.get_personal_team.return_value = mock_personal_team + + # Mock two unassigned tools with same name + mock_tool1 = Mock() + mock_tool1.team_id = None + mock_tool1.owner_email = None + mock_tool1.visibility = None + mock_tool1.name = "duplicate-tool" + + mock_tool2 = Mock() + mock_tool2.team_id = None + mock_tool2.owner_email = None + mock_tool2.visibility = None + mock_tool2.name = "other-tool" + + # Mock existing tool with same base name (will trigger rename) + mock_existing = Mock() + mock_existing.name = "duplicate-tool" + mock_existing.team_id = mock_personal_team.id + mock_existing.owner_email = mock_admin_user.email + + call_count = {"query": 0} + + def mock_query_handler(model): + query = MagicMock() + query.filter.return_value = query + + if hasattr(model, "__name__") and model.__name__ == "EmailUser": + query.first.return_value = mock_admin_user + elif hasattr(model, "__name__") and model.__name__ == "Tool": + call_count["query"] += 1 + if call_count["query"] == 1: + # First query: return unassigned tools + query.all.return_value = [mock_tool1, mock_tool2] + else: + # Second query: return existing tool for conflict detection + return query + else: + query.all.return_value = [] + + return query + + commit_count = {"count": 0} + + def mock_commit_side_effect(): + commit_count["count"] += 1 + if commit_count["count"] == 1: + # First commit (tool1) fails with IntegrityError + raise IntegrityError("duplicate key", None, None) + # Second commit (tool2) succeeds + + mock_db_session.query.side_effect = mock_query_handler + mock_db_session.commit.side_effect = mock_commit_side_effect + + with patch("mcpgateway.bootstrap_db.settings", mock_settings): + with patch("mcpgateway.bootstrap_db.Session", return_value=mock_db_session): + with patch("mcpgateway.db.EmailUser", Mock(__name__="EmailUser")): + with patch("mcpgateway.db.Tool", Mock(__name__="Tool")): + with patch("mcpgateway.db.Server", Mock(__name__="Server")): + with patch("mcpgateway.db.Resource", Mock(__name__="Resource")): + with patch("mcpgateway.db.Prompt", Mock(__name__="Prompt")): + with patch("mcpgateway.db.Gateway", Mock(__name__="Gateway")): + with patch("mcpgateway.db.A2AAgent", Mock(__name__="A2AAgent")): + with patch("mcpgateway.bootstrap_db.logger") as mock_logger: + await bootstrap_resource_assignments(mock_conn) + + # Verify rollback and expunge were called for failed resource + assert mock_db_session.rollback.call_count >= 1 + assert mock_db_session.expunge.call_count >= 1 + + # Verify second tool was processed (commit called twice) + assert commit_count["count"] == 2 + + @pytest.mark.asyncio + async def test_resource_assignments_multiple_integrity_errors(self, mock_settings, mock_db_session, mock_admin_user, mock_personal_team, mock_conn): + """Test multiple IntegrityErrors in same batch are handled independently.""" + from sqlalchemy.exc import IntegrityError + + mock_admin_user.get_personal_team.return_value = mock_personal_team + + # Mock three unassigned tools + mock_tool1 = Mock() + mock_tool1.team_id = None + mock_tool1.owner_email = None + mock_tool1.visibility = None + mock_tool1.name = "tool-1" + + mock_tool2 = Mock() + mock_tool2.team_id = None + mock_tool2.owner_email = None + mock_tool2.visibility = None + mock_tool2.name = "tool-2" + + mock_tool3 = Mock() + mock_tool3.team_id = None + mock_tool3.owner_email = None + mock_tool3.visibility = None + mock_tool3.name = "tool-3" + + def mock_query_handler(model): + query = MagicMock() + query.filter.return_value = query + + if hasattr(model, "__name__") and model.__name__ == "EmailUser": + query.first.return_value = mock_admin_user + elif hasattr(model, "__name__") and model.__name__ == "Tool": + query.all.return_value = [mock_tool1, mock_tool2, mock_tool3] + else: + query.all.return_value = [] + + return query + + commit_count = {"count": 0} + + def mock_commit_side_effect(): + commit_count["count"] += 1 + if commit_count["count"] in [1, 3]: + # First and third commits fail (tool-1 and tool-3) + raise IntegrityError("duplicate key", None, None) + # Second commit succeeds (tool-2) + + mock_db_session.query.side_effect = mock_query_handler + mock_db_session.commit.side_effect = mock_commit_side_effect + + with patch("mcpgateway.bootstrap_db.settings", mock_settings): + with patch("mcpgateway.bootstrap_db.Session", return_value=mock_db_session): + with patch("mcpgateway.db.EmailUser", Mock(__name__="EmailUser")): + with patch("mcpgateway.db.Tool", Mock(__name__="Tool")): + with patch("mcpgateway.db.Server", Mock(__name__="Server")): + with patch("mcpgateway.db.Resource", Mock(__name__="Resource")): + with patch("mcpgateway.db.Prompt", Mock(__name__="Prompt")): + with patch("mcpgateway.db.Gateway", Mock(__name__="Gateway")): + with patch("mcpgateway.db.A2AAgent", Mock(__name__="A2AAgent")): + with patch("mcpgateway.bootstrap_db.logger") as mock_logger: + await bootstrap_resource_assignments(mock_conn) + + # Verify rollback called twice (for tool-1 and tool-3) + assert mock_db_session.rollback.call_count == 2 + + # Verify expunge called twice + assert mock_db_session.expunge.call_count == 2 + + # Verify all three commits were attempted + assert commit_count["count"] == 3 + + # Verify success message logged with 1 assigned resource + mock_logger.info.assert_any_call("Successfully assigned 1 orphaned resources to admin team") + class TestMain: """Test main function."""