2 changes: 2 additions & 0 deletions .gitignore
@@ -79,4 +79,6 @@ Server/tasks/sleep_quality/Cargo.lock

.venv_test/
Server/tasks/sleep_quality/target
Server/tasks/synthid/target
Server/cert/

5 changes: 5 additions & 0 deletions Server/.env_dev
@@ -55,8 +55,13 @@ CELERY_WORKER_COUNT_USECASE_QUEUE=2
CELERY_WORKER_CONCURRENCY_AD_QUEUE=2
CELERY_WORKER_COUNT_AD_QUEUE=1

# SynthID Worker Configuration
CELERY_WORKER_CONCURRENCY_SYNTHID_QUEUE=1
CELERY_WORKER_COUNT_SYNTHID_QUEUE=1

# Container names
REDIS_CONTAINER_NAME=dev_container_redis_bd
FASTAPI_CONTAINER_NAME=dev_container_fastapi_app
CELERY_USECASE_CONTAINER_NAME=dev_container_celery_usecases
CELERY_ADS_CONTAINER_NAME=dev_container_celery_ads
CELERY_SYNTHID_CONTAINER_NAME=dev_container_celery_synthid
5 changes: 5 additions & 0 deletions Server/.env_prod
@@ -53,8 +53,13 @@ CELERY_WORKER_COUNT_USECASE_QUEUE=2
CELERY_WORKER_CONCURRENCY_AD_QUEUE=2
CELERY_WORKER_COUNT_AD_QUEUE=1

# SynthID Worker Configuration
CELERY_WORKER_CONCURRENCY_SYNTHID_QUEUE=1
CELERY_WORKER_COUNT_SYNTHID_QUEUE=1

# Container names
REDIS_CONTAINER_NAME=prod_container_redis_bd
FASTAPI_CONTAINER_NAME=prod_container_fastapi_app
CELERY_USECASE_CONTAINER_NAME=prod_container_celery_usecases
CELERY_ADS_CONTAINER_NAME=prod_container_celery_ads
CELERY_SYNTHID_CONTAINER_NAME=prod_container_celery_synthid
5 changes: 5 additions & 0 deletions Server/.env_staging
@@ -55,8 +55,13 @@ CELERY_WORKER_COUNT_USECASE_QUEUE=2
CELERY_WORKER_CONCURRENCY_AD_QUEUE=2
CELERY_WORKER_COUNT_AD_QUEUE=1

# SynthID Worker Configuration
CELERY_WORKER_CONCURRENCY_SYNTHID_QUEUE=1
CELERY_WORKER_COUNT_SYNTHID_QUEUE=1

# Container names
REDIS_CONTAINER_NAME=staging_container_redis_bd
FASTAPI_CONTAINER_NAME=staging_container_fastapi_app
CELERY_USECASE_CONTAINER_NAME=staging_container_celery_usecases
CELERY_ADS_CONTAINER_NAME=staging_container_celery_ads
CELERY_SYNTHID_CONTAINER_NAME=staging_container_celery_synthid
21 changes: 20 additions & 1 deletion Server/Makefile
@@ -32,7 +32,7 @@ machine ?= c5.4xlarge
TESTS = ad_targeting weight_stats sleep_quality endpoints

.PHONY: check_certificates certificates
.PHONY: docker_build docker_run docker_build_run
.PHONY: docker_build docker_run docker_build_run clean_restart shutdown restart
.PHONY: tests_build tests_run
.PHONY: clean_files benchmark

@@ -153,3 +153,22 @@ check_certificates:
else \
echo "✅ Certificate files are present in $$HOST_CERTS_PATH."; \
fi'

clean_restart:
@echo "🧹 Stopping celery workers and clearing queues for environment '$(environment)' with prefix '$(PREFIX)'..."
@bash -c 'set -a && source $(ENV_FILE) && set +a && docker-compose -p $(PREFIX) stop service_celery_usecases service_celery_usecases_2 2>/dev/null || true'
@echo "🗑️ Clearing Redis queues..."
@bash -c 'set -a && source $(ENV_FILE) && set +a && docker-compose -p $(PREFIX) exec -T service_redis redis-cli FLUSHALL 2>/dev/null || true'
@echo "🚀 Restarting celery workers..."
@bash -c 'set -a && source $(ENV_FILE) && set +a && docker-compose -p $(PREFIX) up -d service_celery_usecases service_celery_usecases_2'

shutdown:
@echo "🛑 Shutting down all services for environment '$(environment)' with prefix '$(PREFIX)'..."
@bash -c 'set -a && source $(ENV_FILE) && set +a && docker-compose -p $(PREFIX) down'
@echo "✅ Environment '$(environment)' has been shut down successfully."

restart:
@echo "🔄 Restarting all services for environment '$(environment)' without rebuilding..."
$(MAKE) shutdown environment=$(environment)
$(MAKE) docker_run environment=$(environment)
@echo "✅ Environment '$(environment)' has been restarted successfully."
20 changes: 20 additions & 0 deletions Server/docker-compose.yml
@@ -93,3 +93,23 @@ services:
NVIDIA_DRIVER_CAPABILITIES: compute,utility
restart: $RESTART_POLICY

service_celery_synthid:
env_file:
- $ENV_FILE # Load the environment variables
image: $FINAL_IMAGE_NAME:latest
volumes:
- $HOST_CERTS_PATH:/project/certs:ro
- ./$SHARED_DIR:/project/$SHARED_DIR
depends_on:
- service_redis
- service_fastapi
runtime: $DOCKER_RUNTIME
environment:
RUN_TYPE: "synthid_worker"
DOMAIN_NAME: $DOMAIN_NAME
CELERY_BROKER_URL: $BROKER_URL
CELERY_RESULT_BACKEND: $BACKEND_URL
NVIDIA_VISIBLE_DEVICES: all
NVIDIA_DRIVER_CAPABILITIES: compute,utility
restart: $RESTART_POLICY

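Once the stack is up, a quick way to confirm the new service is wired to its queue is to watch the queue lengths in the Redis broker. A minimal sketch in Python (the host/port/db are assumptions — substitute the broker values from the env files):

import redis

# Connect to the Celery broker (db 0 is an assumption; see CELERY_BROKER_URL).
r = redis.Redis(host="localhost", port=6379, db=0)
for queue in ("usecases", "ads", "synthid_queue"):
    # Each Celery queue is a plain Redis list; its length is the backlog.
    print(queue, r.llen(queue))

An empty `synthid_queue` while synthid tasks still complete means the dedicated worker is consuming it.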
6 changes: 4 additions & 2 deletions Server/scripts/docker_run.sh
@@ -23,8 +23,10 @@ else
fi

echo "🚀 [$COMPOSE_PROJECT_NAME]: launching Docker containers using '$DOCKER_COMPOSE_NAME'..."
docker-compose -p "$COMPOSE_PROJECT_NAME" up -d --scale service_celery_usecases="$CELERY_WORKER_COUNT_USECASE_QUEUE"
--scale service_celery_ads="$CELERY_WORKER_COUNT_AD_QUEUE"
docker-compose -p "$COMPOSE_PROJECT_NAME" up -d \
--scale service_celery_usecases="$CELERY_WORKER_COUNT_USECASE_QUEUE" \
--scale service_celery_ads="$CELERY_WORKER_COUNT_AD_QUEUE" \
--scale service_celery_synthid="$CELERY_WORKER_COUNT_SYNTHID_QUEUE"

if [[ "$1" != "ci" ]]; then
echo "[MODE=$1] Following logs..."
9 changes: 9 additions & 0 deletions Server/scripts/entrypoint.sh
@@ -47,6 +47,15 @@ case "$RUN_TYPE" in
--concurrency="$CELERY_WORKER_CONCURRENCY_AD_QUEUE"
;;

synthid_worker)
# Start a Celery worker dedicated to the synthid queue (concurrency from env)
echo "🚀 Starting Celery Worker for synthid tasks..."
exec celery -A task_executor.celery_app worker \
--loglevel="$CELERY_LOGLEVEL" \
--queues="synthid_queue" \
--concurrency="$CELERY_WORKER_CONCURRENCY_SYNTHID_QUEUE"
;;

*)
# Invalid RUN_TYPE
echo "RUN_TYPE='$RUN_TYPE' is not valid!"
126 changes: 99 additions & 27 deletions Server/server.py
@@ -31,13 +31,33 @@
UploadFile,
)
from fastapi.responses import JSONResponse, StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
import json

from utils import *
from task_executor import *

# Known Celery queues in the system
KNOWN_CELERY_QUEUES = ["usecases", "ads", "synthid_queue"]

# Instantiate FastAPI app
app = FastAPI()

# Configure CORS
origins = [
"https://zama-fhe-private-synthid.static.hf.space",
"http://localhost",
"http://localhost:8000",
]

app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)

logger.info(f"🚀 FastAPI server running at {URL}:{FASTAPI_HOST_PORT_HTTPS}")


@@ -239,9 +259,15 @@ async def start_task(

try:
task_logger.debug(f"START_TASK: Attempting to submit Celery task for UID={get_id_prefix(uid)}, task_name={task_name}, Binary={binary}.")
task = run_binary_task.delay(binary, uid, task_name)

# Determine the queue for the task
task_config = use_cases.get(task_name, {})
queue_name = task_config.get("queue", "usecases") # Default to 'usecases' if not specified

task = run_binary_task.apply_async(args=[binary, uid, task_name], queue=queue_name)

task_logger.info(
f"🚀 Task submitted [task_id=`{get_id_prefix(task.id)}` - UID=`{get_id_prefix(uid)}`] for task_name=`{task_name}`. Celery task ID: {task.id}"
f"🚀 Task submitted to queue '{queue_name}' [task_id=`{get_id_prefix(task.id)}` - UID=`{get_id_prefix(uid)}`] for task_name=`{task_name}`. Celery task ID: {task.id}"
)
task_logger.debug(f"START_TASK: Completed for UID={get_id_prefix(uid)}, task_name={task_name}. Celery Task ID: {task.id}")
return JSONResponse({"task_id": task.id})
@@ -367,25 +393,30 @@ def get_task_status(task_id: str = Depends(get_task_id), uid: str = Depends(get_
return response

task_info = {"task_id": task_id, "uid": uid}
# Check if the task is in the Redis broker queue
try:
queued_tasks = redis_bd_broker.lrange("usecases", 0, -1)
total_tasks = len(queued_tasks)
task_logger.debug(f"Pending tasks in Redis broker: {total_tasks}")
for position, task in enumerate(queued_tasks):
task_data = json.loads(task)
if task_id == task_data["headers"]["id"]:
response = {
**STATUS_TEMPLATES["queued"].copy(),
**task_info,
"logger_msg": STATUS_TEMPLATES['queued']['logger_msg'].format(get_id_prefix(task_id), get_id_prefix(uid), position + 1, total_tasks),
}
logger.info(response["logger_msg"])
return response
except Exception as e:
logger.error("❌ Failed to check Redis broker bd: %s", str(e))

# Check if the task is in any of the known Redis broker queues
is_task_queued = False
for queue_name in KNOWN_CELERY_QUEUES:
try:
queued_tasks_in_current_queue = redis_bd_broker.lrange(queue_name, 0, -1)
total_tasks_in_queue = len(queued_tasks_in_current_queue)
task_logger.debug(f"Pending tasks in Redis broker (queue: {queue_name}): {total_tasks_in_queue}")
for position, task_str in enumerate(queued_tasks_in_current_queue):
task_data = json.loads(task_str)
if task_id == task_data["headers"]["id"]:
response = {
**STATUS_TEMPLATES["queued"].copy(),
**task_info,
"details": f"Task is in the Redis broker queue '{queue_name}', waiting to be picked up. Position: {position + 1}/{total_tasks_in_queue}",
"logger_msg": f"📥 [task_id=`{get_id_prefix(task_id)}` - uid=`{get_id_prefix(uid)}`] is queued in '{queue_name}' and waiting. Position: `{position + 1} / {total_tasks_in_queue}`",
}
logger.info(response["logger_msg"])
return response # Task found in this queue
except Exception as e:
logger.error(f"❌ Failed to check Redis broker for queue '{queue_name}': {str(e)}")
# Continue to check other queues

# Check if the task is marked as completed in the Redis backend queue
# Note: Redis only stores task statuses for a limited period of time (Time To Live)
try:
key = f"celery-task-meta-{task_id}"
if redis_bd_backend.exists(key):
@@ -400,6 +431,21 @@
try:
result = AsyncResult(task_id, app=celery_app)
status = result.state.lower()

# Check content of result if Celery task is SUCCESS
if status == 'success':
task_outcome = result.result
if isinstance(task_outcome, dict) and task_outcome.get('status') == 'error':
logger.error(
f"❌ [task_id=`{get_id_prefix(task_id)}` - uid=`{get_id_prefix(uid)}`] Celery task succeeded, but binary execution failed. Detail: {task_outcome.get('detail')}"
)
response_template = STATUS_TEMPLATES["failure"].copy()
response_template["details"] = f"Binary execution failed: {task_outcome.get('detail', 'Unknown error in binary.')}"
response_template["logger_msg"] = f"❌ [task_id=`{get_id_prefix(task_id)}` - uid=`{get_id_prefix(uid)}`] binary execution failed. Detail: {task_outcome.get('detail')}"
response = {**response_template, **task_info}
logger.info(response['logger_msg'])
return response

except Exception as e:
logger.error(f"❌ Failed to get task status for `%s`: `%s`", task_id, str(e))

@@ -431,11 +477,15 @@ def get_task_status(task_id: str = Depends(get_task_id), uid: str = Depends(get_
return response

# Case where the status is none of 'completed', 'started', 'unknown', or 'queued'
response = {**STATUS_TEMPLATES[status].copy(), **task_info}
response = {**STATUS_TEMPLATES.get(status, STATUS_TEMPLATES["unknown"]).copy(), **task_info}

if status != 'revoked':
if status != 'revoked' and 'logger_msg' in response and "{}" in response['logger_msg']:
response['logger_msg'] = response['logger_msg'].format(get_id_prefix(task_id), get_id_prefix(uid))

if 'logger_msg' in response:
logger.info(response['logger_msg'])
else:
logger.info(f"ℹ️ Task status for [task_id=`{get_id_prefix(task_id)}` - uid=`{get_id_prefix(uid)}`]: {response.get('status')}, Details: {response.get('details')}")

return response

@@ -540,7 +590,6 @@ def build_stream_response(task_id, uid, task_name, output_files_config, response

stream_headers = {
"Content-Disposition": f"attachment; filename={output_file_path.name}",
"stderr": stderr_output,
**response
}

@@ -709,9 +758,32 @@ def get_logs(lines: int = 10) -> Response:

# Get Celery queue information
try:
usecases_queue_length = redis_bd_broker.llen("usecases")
completed_tasks = len(redis_bd_backend.keys("celery-task-meta-*"))
queue_info = f"Queue Status:\nQueued tasks: {usecases_queue_length}\nCompleted in last hour: {completed_tasks}"
# Check all known queues for total queued tasks
total_queued_tasks = 0
for queue_name in KNOWN_CELERY_QUEUES:
queue_length = redis_bd_broker.llen(queue_name)
total_queued_tasks += queue_length

# Count tasks completed in the last hour
completed_tasks = 0
one_hour_ago = datetime.datetime.now(datetime.timezone.utc) - datetime.timedelta(hours=1)
for key in redis_bd_backend.keys("celery-task-meta-*"):
try:
raw = redis_bd_backend.get(key)
data = json.loads(raw)
if data.get('status') == 'SUCCESS':
# Get the date_done from the task result
date_done = data.get('date_done')
if date_done:
# Parse the ISO-format timestamp (Celery stores date_done in UTC)
completion_time = datetime.datetime.fromisoformat(date_done.replace('Z', '+00:00'))
if completion_time.tzinfo is None:
completion_time = completion_time.replace(tzinfo=datetime.timezone.utc)
if completion_time > one_hour_ago:
completed_tasks += 1
except Exception as e:
logger.error(f"Error processing task metadata: {str(e)}")
continue

queue_info = f"Queue Status:\nQueued tasks: {total_queued_tasks}\nCompleted in last hour: {completed_tasks}"
except Exception as e:
queue_info = f"Failed to get queue information: {str(e)}"

@@ -855,7 +927,7 @@ def get_logs(lines: int = 10) -> Response:
<div class="queue-stats">
<div class="stat-item">
<div>Queued Tasks</div>
<div class="stat-value">{usecases_queue_length}</div>
<div class="stat-value">{total_queued_tasks}</div>
</div>
<div class="stat-item">
<div>Completed (Last Hour)</div>
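For reference, the multi-queue scan added to `get_task_status` above depends on the shape of the messages Celery's Redis transport pushes onto each queue list — in particular, that the task id lives under `headers.id`. A rough illustration of that shape (a field subset only; the exact layout varies by Celery/kombu version, so treat it as an assumption rather than a contract):

queued_message_example = {
    "body": "…base64-encoded (args, kwargs, embed) payload…",
    "headers": {
        "id": "d1c06fae-0000-0000-0000-000000000000",  # matched against task_id in the scan
        "task": "task_executor.run_binary_task",
    },
    "properties": {
        "delivery_info": {"routing_key": "synthid_queue"},
    },
}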
8 changes: 6 additions & 2 deletions Server/task_executor.py
@@ -35,10 +35,14 @@
result_expires=60 * 60 * 24 * 30,
# `task_acks_late`: Redispatch unfinished tasks
task_acks_late=True,
# `task_acks_on_failure_or_timeout`: Avoid marking a task as acknowledged if it crashes
# `task_acks_on_failure_or_timeout`: Avoid marking a task as "acknowledged" if it crashes
task_acks_on_failure_or_timeout=False,
# `broker_transport_options`: seconds before an unacknowledged task becomes available to other workers again
broker_transport_options={"visibility_timeout": 60 * 1},
broker_transport_options={
"visibility_timeout": 600, # Increased to 10 minutes (from 60 seconds)
"fanout_patterns": True, # Recommended for Redis
"fanout_prefix": True, # Recommended for Redis
},
# `worker_prefetch_multiplier`: how many tasks a Celery worker prefetches before starting them
worker_prefetch_multiplier=1,
task_reject_on_worker_lost=True,
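The jump from 60 to 600 seconds matters because of the interaction with `task_acks_late=True`: a late-acked message is invisible only for `visibility_timeout` seconds, after which Redis re-delivers it to another worker even though the first one is still running it. A back-of-the-envelope sketch with illustrative numbers (the 300 s runtime is an assumption, not a measurement):

visibility_timeout = 60  # previous setting, in seconds
task_runtime = 300       # a long-running FHE binary

# With acks_late, the ack only happens on completion, so the broker
# re-delivers the message roughly once per timeout window until then.
duplicate_deliveries = task_runtime // visibility_timeout
print(f"~{duplicate_deliveries} duplicate deliveries with the old timeout")  # ~5

The rule of thumb is to keep `visibility_timeout` comfortably above the slowest expected task.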
7 changes: 7 additions & 0 deletions Server/tasks.yaml
@@ -38,3 +38,10 @@ tasks:
output_files:
- filename: "{uid}.fetch_ad.output.fheencrypted"
response_type: stream

synthid:
binary: synthid
output_files:
- filename: "{uid}.synthid.output.fheencrypted"
response_type: stream
queue: synthid_queue
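The new `queue` key closes the loop with the routing logic in `server.py`: tasks without it keep landing on `usecases`, while `synthid` is pinned to the worker started with `--queues=synthid_queue` in `entrypoint.sh`. A condensed sketch of the round trip, assuming `tasks.yaml` parses into a top-level `tasks` mapping and that `run_binary_task(binary, uid, task_name)` keeps the signature used in `server.py`:

import yaml
from task_executor import run_binary_task  # the shared Celery task

with open("tasks.yaml") as f:
    use_cases = yaml.safe_load(f)["tasks"]

queue_name = use_cases.get("synthid", {}).get("queue", "usecases")  # -> "synthid_queue"
result = run_binary_task.apply_async(args=["synthid", "example-uid", "synthid"], queue=queue_name)
print(result.id, "->", queue_name)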
19 changes: 19 additions & 0 deletions Server/tasks/synthid/Cargo.toml
@@ -0,0 +1,19 @@
[package]
name = "synthid"
version = "0.1.0"
edition = "2021"

[dependencies]
bincode = "1.3"
serde = { version = "1.0", features = ["derive"] }
tfhe = { git = "https://github.com/zama-ai/tfhe-rs.git", rev = "1ec21a5e0b7c12165aa7e556c01e730c3117765a", features = ["integer"] }

[profile.release]
opt-level = 3
lto = "thin"
codegen-units = 1
panic = "abort"

[[bin]]
name = "synthid"
path = "src/main.rs"