autogpt_platform/backend/backend/executor/cluster_lock_test.py (21 additions, 21 deletions)
@@ -188,14 +188,14 @@ class TestClusterLockExpiry:
     def test_lock_natural_expiry(self, redis_client, lock_key, owner_id):
         """Test lock expires naturally via Redis TTL."""
         lock = ClusterLock(
-            redis_client, lock_key, owner_id, timeout=2
-        )  # 2 second timeout
+            redis_client, lock_key, owner_id, timeout=1
+        )  # 1 second timeout

         lock.try_acquire()
         assert redis_client.exists(lock_key) == 1

         # Wait for expiry
-        time.sleep(3)
+        time.sleep(1.2)
         assert redis_client.exists(lock_key) == 0

         # New lock with same key should succeed
@@ -205,18 +205,18 @@ def test_lock_natural_expiry(self, redis_client, lock_key, owner_id):
     def test_lock_refresh_prevents_expiry(self, redis_client, lock_key, owner_id):
         """Test refreshing prevents lock from expiring."""
         lock = ClusterLock(
-            redis_client, lock_key, owner_id, timeout=3
-        )  # 3 second timeout
+            redis_client, lock_key, owner_id, timeout=2
+        )  # 2 second timeout

         lock.try_acquire()

         # Wait and refresh before expiry
-        time.sleep(1)
+        time.sleep(0.5)
         lock._last_refresh = 0  # Force refresh past rate limit
         assert lock.refresh() is True

         # Wait beyond original timeout
-        time.sleep(2.5)
+        time.sleep(1.8)
         assert redis_client.exists(lock_key) == 1  # Should still exist
@@ -249,16 +249,16 @@ def try_acquire_lock(thread_id):
         assert len(successful_acquisitions) == 1

     def test_sequential_lock_reuse(self, redis_client, lock_key):
-        """Test lock can be reused after natural expiry."""
+        """Test lock can be reused after release."""
         owners = [str(uuid.uuid4()) for _ in range(3)]

         for i, owner_id in enumerate(owners):
-            lock = ClusterLock(redis_client, lock_key, owner_id, timeout=1)  # 1 second
+            lock = ClusterLock(redis_client, lock_key, owner_id, timeout=60)

             assert lock.try_acquire() == owner_id
-            time.sleep(1.5)  # Wait for expiry
+            lock.release()  # Release immediately instead of waiting for expiry

-            # Verify lock expired
+            # Verify lock released
             assert redis_client.exists(lock_key) == 0

     def test_refresh_during_concurrent_access(self, redis_client, lock_key):
@@ -445,18 +445,18 @@ def test_long_running_execution_with_refresh(
     ):
         """Test lock maintains ownership during long execution with periodic refresh."""
         lock = ClusterLock(
-            redis_client, lock_key, owner_id, timeout=30
-        )  # 30 second timeout, refresh interval = max(30//10, 1) = 3 seconds
+            redis_client, lock_key, owner_id, timeout=5
+        )  # 5 second timeout, refresh interval = max(5//10, 1) = 1 second

         def long_execution_with_refresh():
             """Simulate long-running execution with periodic refresh."""
             assert lock.try_acquire() == owner_id

-            # Simulate 10 seconds of work with refreshes every 2 seconds
-            # This respects rate limiting - actual refreshes will happen at 0s, 3s, 6s, 9s
+            # Simulate 1 second of work with refreshes
             try:
-                for i in range(5):  # 5 iterations * 2 seconds = 10 seconds total
-                    time.sleep(2)
+                for i in range(2):  # 2 iterations * 0.5 seconds = 1 second total
+                    time.sleep(0.5)
                     lock._last_refresh = 0  # Force refresh past rate limit
                     refresh_success = lock.refresh()
                     assert refresh_success is True, f"Refresh failed at iteration {i}"
                 return "completed"
@@ -471,7 +471,7 @@ def test_graceful_degradation_pattern(self, redis_client, lock_key):
         """Test graceful degradation when Redis becomes unavailable."""
         owner_id = str(uuid.uuid4())
         lock = ClusterLock(
-            redis_client, lock_key, owner_id, timeout=3
+            redis_client, lock_key, owner_id, timeout=1
         )  # Use shorter timeout

         # Normal operation
@@ -484,7 +484,7 @@ def test_graceful_degradation_pattern(self, redis_client, lock_key):
         lock.redis = redis.Redis(
             host="invalid_host",
             port=1234,
-            socket_connect_timeout=1,
+            socket_connect_timeout=0.5,
             decode_responses=False,
         )
@@ -495,8 +495,8 @@ def test_graceful_degradation_pattern(self, redis_client, lock_key):

         # Restore Redis and verify can acquire again
         lock.redis = original_redis
-        # Wait for original lock to expire (use longer wait for 3s timeout)
-        time.sleep(4)
+        # Wait for original lock to expire
+        time.sleep(1.2)

         new_lock = ClusterLock(redis_client, lock_key, owner_id, timeout=60)
         assert new_lock.try_acquire() == owner_id
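For context on what these timing changes exercise, here is a minimal sketch of the lock interface the tests assume: acquisition via Redis SET NX PX, rate-limited refresh, and owner-checked release. The class name, the max(timeout // 10, 1) refresh interval (taken from the comment in the diff above), and the overall structure are illustrative assumptions, not the actual backend.executor implementation.

import time

import redis


class ClusterLockSketch:
    """Illustrative stand-in for ClusterLock, not the real implementation."""

    def __init__(self, redis_client: redis.Redis, key: str, owner_id: str, timeout: int = 60):
        self.redis = redis_client
        self.key = key
        self.owner_id = owner_id
        self.timeout = timeout
        # Matches the comment in the diff: refresh at most once per max(timeout // 10, 1) seconds.
        self.refresh_interval = max(timeout // 10, 1)
        self._last_refresh = 0.0

    def try_acquire(self) -> str | None:
        # SET NX PX is atomic: exactly one owner can hold the key until it expires.
        if self.redis.set(self.key, self.owner_id, nx=True, px=self.timeout * 1000):
            self._last_refresh = time.monotonic()
            return self.owner_id
        return None

    def refresh(self) -> bool:
        # Rate limit: the tests reset _last_refresh to 0 to bypass this check.
        if time.monotonic() - self._last_refresh < self.refresh_interval:
            return True
        try:
            # decode_responses=False (as in the tests), so GET returns bytes.
            if self.redis.get(self.key) == self.owner_id.encode():
                self._last_refresh = time.monotonic()
                return bool(self.redis.pexpire(self.key, self.timeout * 1000))
            return False
        except redis.RedisError:
            return False  # graceful degradation: report failure instead of raising

    def release(self) -> None:
        if self.redis.get(self.key) == self.owner_id.encode():
            self.redis.delete(self.key)

A production implementation would make the get-then-pexpire and get-then-delete pairs atomic, for example with a Lua script; the sketch keeps them separate for readability.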
autogpt_platform/backend/backend/util/cache_test.py (13 additions, 13 deletions)
@@ -432,7 +432,7 @@ def test_ttl_functionality(self):
         """Test TTL functionality with sync function."""
         call_count = 0

-        @cached(maxsize=10, ttl_seconds=1)  # Short TTL
+        @cached(maxsize=10, ttl_seconds=0.3)  # Short TTL
         def ttl_function(x: int) -> int:
             nonlocal call_count
             call_count += 1
@@ -449,7 +449,7 @@ def ttl_function(x: int) -> int:
         assert call_count == 1

         # Wait for TTL to expire
-        time.sleep(1.1)
+        time.sleep(0.35)

         # Third call after expiration - should call function again
         result3 = ttl_function(3)
@@ -461,7 +461,7 @@ async def test_async_ttl_functionality(self):
         """Test TTL functionality with async function."""
         call_count = 0

-        @cached(maxsize=10, ttl_seconds=1)  # Short TTL
+        @cached(maxsize=10, ttl_seconds=0.3)  # Short TTL
         async def async_ttl_function(x: int) -> int:
             nonlocal call_count
             call_count += 1
@@ -479,7 +479,7 @@ async def async_ttl_function(x: int) -> int:
         assert call_count == 1

         # Wait for TTL to expire
-        await asyncio.sleep(1.1)
+        await asyncio.sleep(0.35)

         # Third call after expiration - should call function again
         result3 = await async_ttl_function(3)
@@ -761,16 +761,16 @@ def ttl_refresh_function(x: int) -> int:
         assert result1 == 30
         assert call_count == 1

-        # Wait 1 second
-        time.sleep(1)
+        # Wait 0.5 seconds
+        time.sleep(0.5)

         # Second call - should refresh TTL and use cache
         result2 = ttl_refresh_function(3)
         assert result2 == 30
         assert call_count == 1

-        # Wait another 1.5 seconds (total 2.5s from first call, 1.5s from second)
-        time.sleep(1.5)
+        # Wait another 1.0 seconds (total 1.5s from first call, 1.0s from second)
+        time.sleep(1.0)

         # Third call - TTL should have been refreshed, so still cached
         result3 = ttl_refresh_function(3)
@@ -792,7 +792,7 @@ def test_shared_cache_without_ttl_refresh(self):
         """Test that TTL doesn't refresh when refresh_ttl_on_get=False."""
         call_count = 0

-        @cached(ttl_seconds=2, shared_cache=True, refresh_ttl_on_get=False)
+        @cached(ttl_seconds=1, shared_cache=True, refresh_ttl_on_get=False)
         def no_ttl_refresh_function(x: int) -> int:
             nonlocal call_count
             call_count += 1
@@ -806,16 +806,16 @@ def no_ttl_refresh_function(x: int) -> int:
         assert result1 == 40
         assert call_count == 1

-        # Wait 1 second
-        time.sleep(1)
+        # Wait 0.4 seconds
+        time.sleep(0.4)

         # Second call - should use cache but NOT refresh TTL
         result2 = no_ttl_refresh_function(4)
         assert result2 == 40
         assert call_count == 1

-        # Wait another 1.1 seconds (total 2.1s from first call)
-        time.sleep(1.1)
+        # Wait another 0.7 seconds (total 1.1s from first call)
+        time.sleep(0.7)

         # Third call - should have expired
         result3 = no_ttl_refresh_function(4)
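The two shared-cache tests above hinge on whether a cache hit pushes the expiry forward. Below is a minimal sketch of that distinction, assuming a plain in-process store; the real @cached in backend.util.cache also handles maxsize, shared_cache, and async functions, all omitted here.

import functools
import time


def cached_sketch(ttl_seconds: float, refresh_ttl_on_get: bool = True):
    """Illustrative TTL cache decorator, not backend.util.cache.cached."""

    def decorator(func):
        store = {}  # maps the args tuple to (value, expires_at)

        @functools.wraps(func)
        def wrapper(*args):
            now = time.monotonic()
            entry = store.get(args)
            if entry is not None and entry[1] > now:
                if refresh_ttl_on_get:
                    # A hit pushes the expiry forward, so hot keys stay cached.
                    store[args] = (entry[0], now + ttl_seconds)
                return entry[0]
            value = func(*args)
            store[args] = (value, now + ttl_seconds)
            return value

        return wrapper

    return decorator

With refresh_ttl_on_get=True, repeated hits inside the TTL window keep an entry alive indefinitely; with False, the entry expires ttl_seconds after it was stored no matter how often it is read. That is exactly the distinction the shortened waits above probe.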
autogpt_platform/backend/backend/util/retry_test.py (2 additions, 2 deletions)
@@ -41,10 +41,10 @@ def test_function():
 async def test_conn_retry_async_function():
     retry_count = 0

-    @conn_retry("Test", "Test function", max_retry=2, max_wait=0.1)
+    @conn_retry("Test", "Test function", max_retry=2, max_wait=0.01)
     async def test_function():
         nonlocal retry_count
-        await asyncio.sleep(1)
+        await asyncio.sleep(0.01)
         retry_count -= 1
         if retry_count > 0:
             raise ValueError("Test error")
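The speedup in this file comes from max_wait capping the delay between attempts. Below is a hedged sketch of a conn_retry-style decorator under that assumption; the real decorator lives in backend.util.retry, and its exact backoff schedule and logging are assumptions here.

import asyncio
import functools
import logging

logger = logging.getLogger(__name__)


def conn_retry_sketch(prefix: str, action: str, max_retry: int = 2, max_wait: float = 0.1):
    """Illustrative async retry decorator with capped exponential backoff."""

    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(max_retry + 1):
                try:
                    return await func(*args, **kwargs)
                except Exception as exc:
                    if attempt == max_retry:
                        raise  # out of retries, propagate the last error
                    # Exponential backoff capped at max_wait, hence the test
                    # speedup when max_wait drops from 0.1 to 0.01.
                    wait = min(max_wait, 0.01 * 2**attempt)
                    logger.warning("%s: %s failed (%s); retrying in %.3fs", prefix, action, exc, wait)
                    await asyncio.sleep(wait)

        return wrapper

    return decorator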
autogpt_platform/backend/backend/util/service_test.py (8 additions, 8 deletions)
@@ -506,7 +506,7 @@ def __init__(self):
         self.cleanup_completed = False

     @expose
-    async def slow_endpoint(self, duration: int = 5) -> dict:
+    async def slow_endpoint(self, duration: float = 1.0) -> dict:
         """Endpoint that takes time to complete"""
         start_time = time.time()
         self.request_log.append(f"slow_endpoint started at {start_time}")
@@ -575,7 +575,7 @@ async def wait_until_service_ready(base_url: str, timeout: float = 10):
 async def send_slow_request(base_url: str) -> dict:
     """Send a slow request and return the result"""
     async with httpx.AsyncClient(timeout=30) as client:
-        response = await client.post(f"{base_url}/slow_endpoint", json={"duration": 5})
+        response = await client.post(f"{base_url}/slow_endpoint", json={"duration": 1.0})
         assert response.status_code == 200
         return response.json()
@@ -589,14 +589,14 @@ async def test_graceful_shutdown(test_service):
     slow_task = asyncio.create_task(send_slow_request(test_service_url))

     # Give the slow request time to start
-    await asyncio.sleep(1)
+    await asyncio.sleep(0.2)

     # Send SIGTERM to the service process
     shutdown_start_time = time.time()
     service.process.terminate()  # This sends SIGTERM

     # Wait a moment for shutdown to start
-    await asyncio.sleep(0.5)
+    await asyncio.sleep(0.2)

     # Try to send a new request - should be rejected or connection refused
     try:
@@ -612,18 +612,18 @@ async def test_graceful_shutdown(test_service):
     # The slow request should still complete successfully
     slow_result = await slow_task
     assert slow_result["message"] == "completed"
-    assert 4.9 < slow_result["duration"] < 5.5  # Should have taken ~5 seconds
+    assert 0.9 < slow_result["duration"] < 1.5  # Should have taken ~1 second

     # Wait for the service to fully shut down
-    service.process.join(timeout=15)
+    service.process.join(timeout=10)
     shutdown_end_time = time.time()

     # Verify the service actually terminated
     assert not service.process.is_alive()

-    # Verify shutdown took reasonable time (slow request - 1s + cleanup)
+    # Verify shutdown took reasonable time
     shutdown_duration = shutdown_end_time - shutdown_start_time
-    assert 4 <= shutdown_duration <= 6  # ~5s request - 1s + buffer
+    assert 0.5 <= shutdown_duration <= 3  # ~1s request + buffer

     print(f"Shutdown took {shutdown_duration:.2f} seconds")
     print(f"Slow request completed in: {slow_result['duration']:.2f} seconds")
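The assertions above encode a specific shutdown contract: after SIGTERM, new requests are refused while in-flight ones run to completion. Below is a minimal sketch of that pattern for an asyncio service, offered as an assumed illustration rather than the implementation under test.

import asyncio
import signal


class GracefulShutdownSketch:
    """Refuse new work after SIGTERM, drain in-flight tasks, then exit."""

    def __init__(self) -> None:
        self.shutting_down = asyncio.Event()
        self.in_flight: set[asyncio.Task] = set()

    def install(self) -> None:
        # asyncio-safe SIGTERM handler: just flip the event.
        asyncio.get_running_loop().add_signal_handler(signal.SIGTERM, self.shutting_down.set)

    def submit(self, coro) -> asyncio.Task:
        if self.shutting_down.is_set():
            # New requests after SIGTERM are rejected, which the test observes
            # as an error response or a refused connection.
            raise ConnectionRefusedError("service is shutting down")
        task = asyncio.create_task(coro)
        self.in_flight.add(task)
        task.add_done_callback(self.in_flight.discard)
        return task

    async def drain(self) -> None:
        await self.shutting_down.wait()
        # In-flight work (like the ~1s slow_endpoint call) finishes normally,
        # so total shutdown time is roughly the remaining request time plus
        # cleanup, which is the 0.5-3s window the assertion checks.
        await asyncio.gather(*self.in_flight, return_exceptions=True)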
autogpt_platform/backend/docker-compose.test.yaml (1 addition, 7 deletions)
@@ -6,6 +6,7 @@ networks:

 volumes:
   supabase-config:
+  clamav-data:

 x-agpt-services:
   &agpt-services
@@ -19,10 +19,6 @@ x-supabase-services:
     - app-network
     - shared-network

-
-volumes:
-  clamav-data:
-
 services:

   db:
@@ -85,6 +82,3 @@ services:
       interval: 30s
       timeout: 10s
       retries: 3
-networks:
-  app-network-test:
-    driver: bridge
autogpt_platform/backend/poetry.lock (37 additions, 1 deletion)

Generated lockfile; diff not rendered by default.
autogpt_platform/backend/pyproject.toml (1 addition, 0 deletions)
@@ -97,6 +97,7 @@ pytest-watcher = "^0.4.2"
 requests = "^2.32.5"
 ruff = "^0.14.5"
 # NOTE: please insert new dependencies in their alphabetical location
+pytest-xdist = "^3.8.0"

 [build-system]
 requires = ["poetry-core"]
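pytest-xdist, the new dev dependency, runs tests in parallel worker processes, which pairs well with the shortened sleeps above since the remaining waits can overlap across workers. A sketch of the typical invocation, expressed programmatically; the target path is illustrative.

import sys

import pytest

# "-n auto" is provided by pytest-xdist and spawns one worker per CPU core,
# roughly equivalent to `poetry run pytest -n auto` from the backend directory.
sys.exit(pytest.main(["-n", "auto", "backend"]))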