Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 32 additions & 2 deletions mcpgateway/bootstrap_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
from filelock import FileLock
from sqlalchemy import create_engine, or_, text
from sqlalchemy.engine import Connection
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session

# First-Party
Expand Down Expand Up @@ -741,9 +742,11 @@ def _like_safe(v: str) -> str:

# Track names claimed within this batch to catch intra-batch duplicates
batch_assigned: set[str] = set()
assigned_count = 0

for resource in unassigned:
original_value = getattr(resource, field)
added_to_batch = None # Track what we add to batch_assigned for cleanup on rollback

if original_value is not None:
taken = existing_taken | batch_assigned
Expand All @@ -757,17 +760,44 @@ def _like_safe(v: str) -> str:
)
setattr(resource, field, new_value)
batch_assigned.add(new_value)
added_to_batch = new_value
else:
batch_assigned.add(original_value)
added_to_batch = original_value

resource.team_id = personal_team.id
resource.owner_email = admin_user.email
resource.visibility = "public" # Make visible to all users
if hasattr(resource, "federation_source") and not resource.federation_source:
resource.federation_source = "mcpgateway-0.7.0-migration"

db.commit()
total_assigned += len(unassigned)
# Per-row commit with race-condition handling (issue #4993)
# If another worker assigned this resource concurrently, gracefully skip it
try:
db.commit()
assigned_count += 1
except IntegrityError:
# Another worker assigned this resource first - rollback and continue
db.rollback()

# Expunge the resource from session to prevent re-flush on subsequent commits
# Without this, the rolled-back resource remains "dirty" and SQLAlchemy will
# attempt to flush it again on every subsequent commit in this loop
db.expunge(resource)

# Clean up batch_assigned to prevent false conflicts in subsequent iterations
# Use the tracked value since resource state is unpredictable after expunge
if added_to_batch is not None:
batch_assigned.discard(added_to_batch)

logger.debug(
f"Skipping {SecurityValidator.sanitize_log_message(resource_name)} "
f"'{SecurityValidator.sanitize_log_message(str(original_value))}' "
f"- already assigned by concurrent worker"
)
continue

total_assigned += assigned_count

except Exception as e:
logger.error(f"Failed to assign {SecurityValidator.sanitize_log_message(resource_name)}: {SecurityValidator.sanitize_log_message(str(e))}")
Expand Down
204 changes: 204 additions & 0 deletions scripts/test_issue_4993_fix.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
#!/bin/bash
# Testing script for Issue #4993 fix
# Tests that concurrent bootstrap processes no longer race on orphaned resource assignments
#
# Prerequisites:
# - PostgreSQL running and accessible
# - Virtual environment activated (source .venv/bin/activate)
# - DATABASE_URL environment variable set (or defaults to local PostgreSQL)
#
# Usage:
# bash scripts/test_issue_4993_fix.sh

set -e

echo "=========================================="
echo "Issue #4993 Fix - Race Condition Test"
echo "=========================================="
echo ""

# Refuse to run outside a virtual environment so that `python` below resolves
# to the interpreter the caller prepared for this project.
if [ -z "$VIRTUAL_ENV" ]; then
    echo "❌ Error: Virtual environment not activated"
    echo "Please run: source .venv/bin/activate"
    exit 1
fi

# Test parameters. Defined before the banner so the banner prints the values
# that are actually used instead of hard-coded copies that can drift.
NUM_TESTS=5
WORKERS_PER_TEST=30
ORPHANED_TOOLS=30

# Result counters updated by the test loop and read by the final summary.
race_detected=0
total_conflicts=0

# Default DATABASE_URL if not set
DEFAULT_DB_URL="postgresql+psycopg://postgres:mysecretpassword@localhost:5432/mcp"
DB_URL="${DATABASE_URL:-$DEFAULT_DB_URL}"
# Strip the "+psycopg" driver marker so the URL can be handed to
# psycopg.connect() directly.
DB_URL_CLEAN="${DB_URL/+psycopg/}"
# Never print credentials: mask the password part of "user:password@host".
# URLs without a password are left unchanged.
DB_URL_DISPLAY=$(printf '%s\n' "$DB_URL" | sed -E 's#(://[^:/@]+):[^@]*@#\1:***@#')

echo "Configuration:"
echo " Database: ${DB_URL_DISPLAY}"
echo " Test workers: ${WORKERS_PER_TEST} per run"
echo " Test iterations: ${NUM_TESTS}"
echo " Orphaned tools per test: ${ORPHANED_TOOLS}"
echo ""

# Verify PostgreSQL connection.
# The URL is passed through the environment rather than spliced into the
# Python source, so quotes or other special characters in it cannot break
# (or inject code into) the probe.
echo "Checking PostgreSQL connection..."
if ! DB_URL_CLEAN="$DB_URL_CLEAN" python -c "
import os
import sys

import psycopg

try:
    conn = psycopg.connect(os.environ['DB_URL_CLEAN'])
    conn.close()
    print('✅ PostgreSQL connection successful')
except Exception as e:
    print(f'❌ Failed to connect to PostgreSQL: {e}')
    sys.exit(1)
" 2>/dev/null; then
    echo "❌ Cannot connect to PostgreSQL"
    echo "Please ensure PostgreSQL is running and DATABASE_URL is correct"
    exit 1
fi

echo ""

# Main test loop. Each iteration:
#   1. seeds the database with orphaned tools (team_id IS NULL),
#   2. launches the bootstrap module in many concurrent processes,
#   3. waits for every worker and records its exit status,
#   4. inspects worker logs and database state for evidence of a race.
# A run is counted in race_detected at most once, however many symptoms it shows.
for test_num in $(seq 1 "$NUM_TESTS"); do
    echo "=========================================="
    echo "Test Run $test_num/$NUM_TESTS"
    echo "=========================================="

    run_failed=0

    # Setup: Insert orphaned tools.
    # The heredoc delimiter is quoted so the shell performs no expansion inside
    # the Python source; inputs are passed through the environment instead.
    echo "[1/4] Setting up $ORPHANED_TOOLS orphaned tools..."
    DB_URL_CLEAN="$DB_URL_CLEAN" ORPHANED_TOOLS="$ORPHANED_TOOLS" python << 'SETUP'
import os
import uuid

import psycopg

num_tools = int(os.environ["ORPHANED_TOOLS"])
conn = psycopg.connect(os.environ["DB_URL_CLEAN"])
cur = conn.cursor()

# Clean up previous test tools
cur.execute("DELETE FROM tools WHERE name LIKE 'issue4993-test-tool%'")

# Insert orphaned tools (no team_id column is supplied in the INSERT)
for i in range(num_tools):
    tool_id = uuid.uuid4().hex
    tool_name = f'issue4993-test-tool-{i:03d}'
    cur.execute("""
        INSERT INTO tools (
            id, original_name, name, description, input_schema,
            custom_name, custom_name_slug, integration_type, request_type,
            enabled, deprecated, reachable, jsonpath_filter, tags, version, visibility,
            created_at, updated_at
        ) VALUES (%s, %s, %s, %s, '{}'::jsonb, %s, %s, %s, %s, %s, %s, %s, %s,
                  '[]'::jsonb, %s, %s, NOW(), NOW())
    """, (tool_id, tool_name, tool_name, f'Test tool {i}', tool_name, tool_name,
          'MCP', 'SSE', True, False, True, '', 1, 'private'))

conn.commit()
cur.execute("SELECT COUNT(*) FROM tools WHERE team_id IS NULL AND name LIKE 'issue4993-test-tool%'")
count = cur.fetchone()[0]
print(f"✓ Created {count} orphaned tools")
conn.close()
SETUP

    # Launch concurrent workers
    echo "[2/4] Launching $WORKERS_PER_TEST concurrent workers..."
    rm -f test_issue4993_*.log 2>/dev/null || true

    start_time=$(date +%s)
    worker_pids=()
    for i in $(seq 1 "$WORKERS_PER_TEST"); do
        python -m mcpgateway.bootstrap_db > "test_issue4993_$i.log" 2>&1 &
        worker_pids+=("$!")
    done

    # Wait for each worker by PID. A bare `wait` discards exit codes, which
    # would let a run where every worker crashed look like a clean pass.
    failed_workers=0
    for pid in "${worker_pids[@]}"; do
        if ! wait "$pid"; then
            failed_workers=$((failed_workers + 1))
        fi
    done
    end_time=$(date +%s)
    duration=$((end_time - start_time))

    echo "[3/4] All workers completed in ${duration}s"

    # Analyze results
    echo "[4/4] Analyzing results..."

    if [ "$failed_workers" -gt 0 ]; then
        echo " ❌ $failed_workers worker(s) exited with a non-zero status"
        run_failed=1
    fi

    # Check for name conflicts (evidence of race condition).
    # `grep -l` prints one line per matching file, so the count stays correct
    # even when only a single log file exists.
    conflicts=$(grep -l "Name conflict" test_issue4993_*.log 2>/dev/null | wc -l | tr -d ' ')

    if [ "$conflicts" -gt 0 ]; then
        echo " ❌ RACE DETECTED: $conflicts worker(s) saw name conflicts"
        run_failed=1
        total_conflicts=$((total_conflicts + conflicts))

        # Show which workers saw conflicts
        echo " Workers with conflicts:"
        grep -l "Name conflict" test_issue4993_*.log 2>/dev/null | head -5 | while IFS= read -r log; do
            count=$(grep -c "Name conflict" "$log")
            echo " - $log: $count conflicts"
        done
    else
        echo " ✅ No race condition detected"
    fi

    # Verify advisory lock usage
    lock_acquisitions=$(grep -l "Acquired lock for bootstrap helpers" test_issue4993_*.log 2>/dev/null | wc -l | tr -d ' ')
    if [ "$lock_acquisitions" -gt 0 ]; then
        echo " ✅ Advisory lock used by $lock_acquisitions worker(s)"
    fi

    # Check database state. The Python snippet exits non-zero when renamed
    # tools are found (or when the check itself fails), and the run is then
    # counted as failed instead of only printing a warning.
    if ! DB_URL_CLEAN="$DB_URL_CLEAN" python << 'CHECK'
import os
import sys

import psycopg

conn = psycopg.connect(os.environ["DB_URL_CLEAN"])
cur = conn.cursor()

cur.execute("SELECT COUNT(*) FROM tools WHERE name LIKE 'issue4993-test-tool%' AND team_id IS NULL")
orphaned = cur.fetchone()[0]

cur.execute("SELECT COUNT(*) FROM tools WHERE name LIKE 'issue4993-test-tool%' AND team_id IS NOT NULL")
assigned = cur.fetchone()[0]

# Check for renamed tools (indicates race condition occurred)
cur.execute("SELECT COUNT(*) FROM tools WHERE name ~ 'issue4993-test-tool-[0-9]+-[0-9]+'")
renamed = cur.fetchone()[0]

print(f" Database state: {orphaned} orphaned, {assigned} assigned, {renamed} renamed")

if orphaned > 0:
    print(f" ⚠️ WARNING: {orphaned} tools remain orphaned")
if renamed > 0:
    print(f" ⚠️ WARNING: {renamed} tools were renamed (race occurred)")

conn.close()

# Report renamed tools to the calling shell loop through the exit status.
sys.exit(1 if renamed > 0 else 0)
CHECK
    then
        run_failed=1
    fi

    if [ "$run_failed" -ne 0 ]; then
        race_detected=$((race_detected + 1))
    fi

    echo ""
done

# Cleanup test logs, but only after a clean result. On failure the worker logs
# still on disk (those of the most recent run) are the evidence needed to
# investigate, so they are left in place.
if [ "$race_detected" -eq 0 ]; then
    echo "Cleaning up test logs..."
    rm -f test_issue4993_*.log
else
    echo "Keeping test_issue4993_*.log for inspection"
fi

# Final summary
echo "=========================================="
echo "Test Summary"
echo "=========================================="
echo "Total test runs: $NUM_TESTS"
echo "Races detected: $race_detected out of $NUM_TESTS"
echo "Total workers with conflicts: $total_conflicts"
echo ""

# Exit status mirrors the result so the script can gate CI or other automation.
if [ "$race_detected" -eq 0 ]; then
    echo "✅✅✅ SUCCESS! No race conditions detected!"
    echo ""
    echo "The fix is working correctly. Multiple concurrent workers"
    echo "are properly serialized through the advisory lock mechanism."
    exit 0
else
    echo "❌ Race conditions detected in $race_detected test run(s)"
    echo ""
    echo "This indicates the race condition is not fully resolved."
    echo "Please review the fix implementation."
    exit 1
fi
Loading
Loading