Skip to content

Commit 14b29d0

Browse files
committed
queue
1 parent 5fe033d commit 14b29d0

File tree

4 files changed

+195
-86
lines changed

4 files changed

+195
-86
lines changed

app.py

Lines changed: 72 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -170,39 +170,64 @@ def validate_tts_request(body: Dict[str, Any]) -> Tuple[Optional[Dict[str, Any]]
170170
logger.error(f"Error validating request: {str(e)}")
171171
return None, "Invalid request format", 400
172172

173-
def get_queue_size() -> int:
174-
"""Get the current queue size from Celery"""
173+
def get_queue_details() -> Dict[str, Any]:
    """Get detailed queue counts from Celery workers.

    Queries the workers through ``celery.control.inspect`` for their active,
    reserved and scheduled tasks and sums the per-worker task lists.

    Returns:
        A dict with integer counts under ``'active'``, ``'reserved'``,
        ``'scheduled'`` and ``'total_reported_by_workers'`` (the sum of the
        three), plus ``'error'``: ``None`` on success, otherwise a message
        describing why inspection failed. All counts are 0 whenever
        ``'error'`` is set, so callers never see partial data.
    """
    details: Dict[str, Any] = {
        'active': 0,
        'reserved': 0,
        'scheduled': 0,
        'total_reported_by_workers': 0,
        'error': None,
    }

    try:
        # Bound the wait for worker replies so a dead broker or worker pool
        # cannot stall the calling request indefinitely.
        inspector = celery.control.inspect(timeout=1.0)
        replies = {
            'active': inspector.active(),
            'reserved': inspector.reserved(),
            'scheduled': inspector.scheduled(),
        }

        # The inspector object itself is never falsy, so it cannot signal a
        # connection problem. Unreachable workers show up as an empty reply
        # (None) to every query instead; report that explicitly rather than
        # presenting it as an idle queue.
        if not any(replies.values()):
            details['error'] = "Could not connect to Celery workers for inspection."
            return details

        # Each reply maps worker name -> list of tasks; a query that got no
        # reply counts as 0. Counts stay in a local until everything has
        # succeeded so a failure cannot leave partial numbers in `details`.
        counts = {
            state: sum(len(tasks) for tasks in reply.values()) if reply else 0
            for state, reply in replies.items()
        }
    except Exception as e:
        logger.error(f"Error calculating queue details: {str(e)}")
        details['error'] = f"Failed to inspect Celery workers: {str(e)}"
        return details

    details.update(counts)
    details['total_reported_by_workers'] = sum(counts.values())
    return details
193211

194212
@app.route('/v1/audio/speech', methods=['POST'])
195213
def openai_speech() -> Response:
196214
"""Handle POST requests to /v1/audio/speech (OpenAI compatible API)"""
197215
try:
198-
# Check queue size from Celery
199-
current_size = get_queue_size()
200-
if current_size >= MAX_QUEUE_SIZE:
201-
logger.warning(f"Queue is full. Current size: {current_size}, Max size: {MAX_QUEUE_SIZE}")
216+
# Check queue size from Celery worker reports
217+
queue_details = get_queue_details()
218+
current_total = queue_details['total_reported_by_workers']
219+
220+
# Check for inspection errors
221+
if queue_details['error']:
222+
logger.warning(f"Could not determine queue size due to inspection error: {queue_details['error']}. Allowing request.")
223+
# Optionally, you could reject here, but allowing might be safer if inspection is flaky
224+
225+
elif current_total >= MAX_QUEUE_SIZE:
226+
logger.warning(f"Queue is full based on worker reports. Current total: {current_total}, Max size: {MAX_QUEUE_SIZE}")
202227
return jsonify({
203228
"error": "Queue is full. Please try again later.",
204-
"queue_size": current_size,
205-
"max_queue_size": MAX_QUEUE_SIZE
229+
"queue_details": queue_details, # Provide detailed counts
230+
"max_queue_size_limit": MAX_QUEUE_SIZE
206231
}), 429
207232

208233
# Read and validate JSON data
@@ -262,19 +287,34 @@ def openai_speech() -> Response:
262287

263288
@app.route('/api/queue-size', methods=['GET'])
def queue_size() -> Response:
    """Handle GET requests to /api/queue-size with detailed counts.

    Responds with the per-state task counts from ``get_queue_details()``,
    the configured ``MAX_QUEUE_SIZE`` and the inspection error (or null).
    The status is 200 when inspection succeeded and 500 when it reported an
    error or when this handler itself failed.
    """
    try:
        details = get_queue_details()
        inspection_error = details['error']

        payload = {
            "active_tasks": details['active'],
            "reserved_tasks": details['reserved'],
            "scheduled_tasks": details['scheduled'],
            "total_reported_by_workers": details['total_reported_by_workers'],
            "max_queue_size_limit": MAX_QUEUE_SIZE,
            "error": inspection_error
        }

        # An inspection failure still returns the (zeroed) payload, but with
        # a 500 so clients can tell it apart from a genuinely idle queue.
        if inspection_error:
            return jsonify(payload), 500
        return jsonify(payload), 200

    except Exception as exc:
        # Failures of the handler itself, as opposed to inspection errors
        # that get_queue_details() already reports through 'error'.
        logger.error(f"Error in /api/queue-size endpoint: {str(exc)}")
        fallback = dict.fromkeys(
            ("active_tasks", "reserved_tasks", "scheduled_tasks", "total_reported_by_workers"),
            0,
        )
        fallback["max_queue_size_limit"] = MAX_QUEUE_SIZE
        fallback["error"] = "Failed to process queue status request"
        return jsonify(fallback), 500
279319

280320
@app.route('/api/voice-sample/<voice>', methods=['GET'])

static/index.html

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -59,19 +59,28 @@ <h3>Queue Status</h3>
5959
<div class="status-indicator" id="status-indicator"></div>
6060
</div>
6161
<div class="queue-stats">
62-
<div class="stat-item">
63-
<span class="stat-label">Active Requests:</span>
64-
<span class="stat-value" id="queue-size">0</span>
62+
<div class="stat-item">
63+
<span class="stat-label">Processing Now:</span>
64+
<span class="stat-value" id="processing-tasks">0</span>
65+
</div>
66+
<div class="stat-item">
67+
<span class="stat-label">Waiting in Queue:</span>
68+
<span class="stat-value" id="waiting-tasks">0</span>
69+
</div>
70+
<div class="stat-item">
71+
<span class="stat-label">Total Reported:</span>
72+
<span class="stat-value" id="total-tasks">0</span>
6573
</div>
6674
<div class="stat-item">
67-
<span class="stat-label">Maximum Capacity:</span>
75+
<span class="stat-label">Max Capacity:</span>
6876
<span class="stat-value" id="max-queue-size">-</span>
6977
</div>
7078
</div>
7179
<div class="queue-progress-container">
7280
<div class="queue-progress-bar" id="queue-progress-bar"></div>
7381
</div>
7482
<div class="queue-load-text" id="queue-load-text">No Load</div>
83+
<div class="queue-error-text" id="queue-error-text" style="display: none; color: red; margin-top: 5px;"></div>
7584
</div>
7685
</div>
7786
</section>
@@ -396,10 +405,14 @@ <h3>Queue System</h3>
396405

397406
<h4>Queue Status Endpoint</h4>
398407
<pre><code class="language-http">GET /api/queue-size</code></pre>
399-
<p>Returns JSON with queue information:</p>
408+
<p>Returns JSON with detailed queue information:</p>
400409
<pre><code class="language-json">{
401-
"queue_size": 0, // Current number of requests in queue
402-
"max_queue_size": 100 // Maximum queue capacity
410+
"active_tasks": 1, // Tasks currently being processed
411+
"reserved_tasks": 0, // Tasks fetched by workers, about to start
412+
"scheduled_tasks": 2, // Tasks scheduled for later execution
413+
"total_reported_by_workers": 3, // Total tasks known by active workers (active + reserved + scheduled)
414+
"max_queue_size_limit": 100, // Maximum queue capacity
415+
"error": null // Error message if inspection failed, otherwise null
403416
}</code></pre>
404417

405418
<div class="response-codes">

static/index_zh.html

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -61,8 +61,16 @@ <h3>队列状态</h3>
6161
</div>
6262
<div class="queue-stats">
6363
<div class="stat-item">
64-
<span class="stat-label">活动请求:</span>
65-
<span class="stat-value" id="queue-size">0</span>
64+
<span class="stat-label">正在处理:</span>
65+
<span class="stat-value" id="processing-tasks">0</span>
66+
</div>
67+
<div class="stat-item">
68+
<span class="stat-label">队列等待:</span>
69+
<span class="stat-value" id="waiting-tasks">0</span>
70+
</div>
71+
<div class="stat-item">
72+
<span class="stat-label">报告总数:</span>
73+
<span class="stat-value" id="total-tasks">0</span>
6674
</div>
6775
<div class="stat-item">
6876
<span class="stat-label">最大容量:</span>
@@ -73,6 +81,7 @@ <h3>队列状态</h3>
7381
<div class="queue-progress-bar" id="queue-progress-bar"></div>
7482
</div>
7583
<div class="queue-load-text" id="queue-load-text">无负载</div>
84+
<div class="queue-error-text" id="queue-error-text" style="display: none; color: red; margin-top: 5px;"></div>
7685
</div>
7786
</div>
7887
</section>
@@ -397,10 +406,14 @@ <h3>队列系统</h3>
397406

398407
<h4>队列状态端点</h4>
399408
<pre><code class="language-http">GET /api/queue-size</code></pre>
400-
<p>返回包含队列信息的 JSON:</p>
409+
<p>返回包含详细队列信息的 JSON:</p>
401410
<pre><code class="language-json">{
402-
"queue_size": 0, // 当前队列中的请求数
403-
"max_queue_size": 100 // 队列最大容量
411+
"active_tasks": 1, // 当前正在处理的任务数
412+
"reserved_tasks": 0, // 已被工作节点获取、即将开始的任务数
413+
"scheduled_tasks": 2, // 已计划稍后执行的任务数
414+
"total_reported_by_workers": 3, // 活动工作节点已知的总任务数 (活动 + 预留 + 计划)
415+
"max_queue_size_limit": 100, // 队列最大容量
416+
"error": null // 如果检查失败则包含错误信息,否则为 null
404417
}</code></pre>
405418

406419
<div class="response-codes">

static/script.js

Lines changed: 85 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -28,12 +28,16 @@ const translations = {
2828
availableVoices: "Available Voices",
2929
apiReference: "API Reference",
3030
queueStatus: "Queue Status",
31-
activeRequests: "Active Requests",
31+
processingNow: "Processing Now",
32+
waitingInQueue: "Waiting in Queue",
33+
totalReported: "Total Reported",
3234
maxCapacity: "Maximum Capacity",
3335
noLoad: "No Load",
3436
lowLoad: "Low Load",
3537
mediumLoad: "Medium Load",
36-
highLoad: "High Load"
38+
highLoad: "High Load",
39+
error: "Error",
40+
queueError: "Queue status unavailable"
3741
},
3842
zh: {
3943
title: "OpenAI TTS API 文档",
@@ -47,12 +51,16 @@ const translations = {
4751
availableVoices: "可用语音",
4852
apiReference: "API 参考",
4953
queueStatus: "队列状态",
50-
activeRequests: "活动请求",
54+
processingNow: "正在处理",
55+
waitingInQueue: "队列等待",
56+
totalReported: "报告总数",
5157
maxCapacity: "最大容量",
5258
noLoad: "无负载",
5359
lowLoad: "低负载",
5460
mediumLoad: "中负载",
55-
highLoad: "高负载"
61+
highLoad: "高负载",
62+
error: "错误",
63+
queueError: "队列状态不可用"
5664
}
5765
};
5866

@@ -121,60 +129,95 @@ function updateLastUpdate() {
121129

122130
// Function to update queue size with visual indicators
123131
async function updateQueueSize() {
    // Resolve every widget up front; any of them may be absent from the page.
    const byId = (id) => document.getElementById(id);
    const processingEl = byId('processing-tasks');
    const waitingEl = byId('waiting-tasks');
    const totalEl = byId('total-tasks');
    const maxEl = byId('max-queue-size');
    const progressBar = byId('queue-progress-bar');
    const indicator = byId('status-indicator');
    const loadText = byId('queue-load-text');
    const errorTextEl = byId('queue-error-text');

    try {
        const response = await fetch('/api/queue-size');
        // Parse the body before checking the status: a 500 from this
        // endpoint still carries a JSON payload with the error message.
        const data = await response.json();

        const failed = !response.ok || data.error;
        if (failed) {
            // Error reported by the API itself (e.g. worker inspection failed).
            console.error('Error fetching queue size:', data.error || `HTTP error! status: ${response.status}`);
            showQueueErrorState(data.error || `HTTP ${response.status}`);
            return;
        }

        // A good response supersedes whatever error was shown before.
        if (errorTextEl) {
            errorTextEl.textContent = '';
            errorTextEl.style.display = 'none';
        }

        // "Waiting" is everything the workers hold but have not started yet.
        const waitingTasks = data.reserved_tasks + data.scheduled_tasks;

        const readouts = [
            [processingEl, data.active_tasks],
            [waitingEl, waitingTasks],
            [totalEl, data.total_reported_by_workers],
            [maxEl, data.max_queue_size_limit],
        ];
        for (const [element, value] of readouts) {
            if (element) {
                element.textContent = value;
            }
        }

        // Load is the reported total relative to the limit; a missing or
        // zero limit falls back to 1 so the division stays finite.
        const maxLimit = data.max_queue_size_limit || 1;
        const loadPercentage = (data.total_reported_by_workers / maxLimit) * 100;

        if (progressBar) {
            const cappedWidth = Math.min(loadPercentage, 100);
            progressBar.style.width = `${cappedWidth}%`;
        }

        // The load status is only refreshed when all three widgets it
        // concerns are present on the page.
        const canShowStatus = indicator && progressBar && loadText;
        if (canShowStatus) {
            updateLoadStatus(loadPercentage);
        }

    } catch (error) {
        // Network failures, unparseable JSON, or an exception while rendering.
        console.error('Failed to fetch or parse queue size:', error);
        showQueueErrorState(translations[currentLang].queueError || 'Queue status unavailable');
    }
}
189+
190+
// Function to display error state in the queue status UI
191+
function showQueueErrorState(errorMessage) {
    // Puts the queue-status panel into its error presentation: counters show
    // '?', the progress bar is emptied, load styling is stripped, and
    // `errorMessage` is revealed in the dedicated error line. Widgets that
    // are missing from the page are skipped.
    const counterElements = ['processing-tasks', 'waiting-tasks', 'total-tasks', 'max-queue-size']
        .map((id) => document.getElementById(id));
    const progressBar = document.getElementById('queue-progress-bar');
    const indicator = document.getElementById('status-indicator');
    const loadText = document.getElementById('queue-load-text');
    const errorTextEl = document.getElementById('queue-error-text');

    // No trustworthy numbers are available, so blank out every counter.
    counterElements.forEach((element) => {
        if (element) {
            element.textContent = '?';
        }
    });

    if (progressBar) {
        progressBar.style.width = '0%';
        progressBar.classList.remove('progress-low', 'progress-medium', 'progress-high');
    }

    if (indicator) {
        // Swap whichever load level was showing for the error marker.
        indicator.classList.remove('indicator-low', 'indicator-medium', 'indicator-high');
        indicator.classList.add('indicator-error');
    }

    if (loadText) {
        loadText.classList.remove('low-load', 'medium-load', 'high-load');
        loadText.textContent = translations[currentLang].error || 'Error';
    }

    if (errorTextEl) {
        errorTextEl.textContent = errorMessage;
        errorTextEl.style.display = 'block';
    }
}
180223

0 commit comments

Comments
 (0)