Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 28 additions & 2 deletions back/shared/pLimit.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import {
import config from '../config';
import { credentials } from '@grpc/grpc-js';
import { ApiClient } from '../protos/api';
import { CrontabModel } from '../data/cron';

class TaskLimit {
private dependenyLimit = new PQueue({ concurrency: 1 });
Expand Down Expand Up @@ -131,13 +132,38 @@ class TaskLimit {
let runs = this.queuedCrons.get(cron.id);
const result = runs?.length ? [...runs, fn] : [fn];
const repeatTimes = this.repeatCronNotifyMap.get(cron.id) || 0;
if (result?.length > 5) {

// Check instance mode from database to determine queue limit
let maxQueueSize = 10; // Default for multi-instance mode (increased from 5)
let isSingleInstanceMode = false;
try {
const cronRecord = await CrontabModel.findOne({
where: { id: Number(cron.id) },
});

// Default to single instance mode (0) for backward compatibility
// allow_multiple_instances is 1 for multi-instance, 0 or null/undefined for single instance
isSingleInstanceMode = cronRecord?.allow_multiple_instances !== 1;

if (isSingleInstanceMode) {
// For single instance mode, allow up to 2 queued tasks
// This allows the new task to be queued while the old one is being killed
maxQueueSize = 2;
}
} catch (error) {
Logger.error(
`[schedule][检查实例模式失败] 任务ID: ${cron.id}, 错误: ${error}`,
);
}

if (result?.length > maxQueueSize) {
if (repeatTimes < 3) {
this.repeatCronNotifyMap.set(cron.id, repeatTimes + 1);
const modeStr = isSingleInstanceMode ? '单实例' : '多实例';
this.client.systemNotify(
{
title: '任务重复运行',
content: `任务:${cron.name},命令:${cron.command},定时:${cron.schedule},处于运行中的超过 5 个,请检查定时设置`,
content: `任务:${cron.name}(${modeStr}模式),命令:${cron.command},定时:${cron.schedule},处于运行中的超过 ${maxQueueSize} 个,请检查定时设置`,
},
(err, res) => {
if (err) {
Expand Down
30 changes: 27 additions & 3 deletions back/shared/runCron.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,12 @@ export function runCron(cmd: string, cron: ICron): Promise<number | void> {
});

// Default to single instance mode (0) for backward compatibility
const allowSingleInstances =
existingCron?.allow_multiple_instances === 0;
// allow_multiple_instances is 1 for multi-instance, 0 or null/undefined for single instance
const isSingleInstanceMode =
existingCron?.allow_multiple_instances !== 1;

if (
allowSingleInstances &&
isSingleInstanceMode &&
existingCron &&
existingCron.pid &&
(existingCron.status === CrontabStatus.running ||
Expand Down Expand Up @@ -49,6 +50,18 @@ export function runCron(cmd: string, cron: ICron): Promise<number | void> {
);
const cp = spawn(cmd, { shell: '/bin/bash' });

// Update status to running after spawning the process
try {
await CrontabModel.update(
{ status: CrontabStatus.running, pid: cp.pid },
{ where: { id: Number(cron.id) } },
);
} catch (error) {
Logger.error(
`[schedule][更新任务状态失败] 任务ID: ${cron.id}, 错误: ${error}`,
);
}

cp.stderr.on('data', (data) => {
Logger.info(
'[schedule][执行任务失败] 命令: %s, 错误信息: %j',
Expand All @@ -66,6 +79,17 @@ export function runCron(cmd: string, cron: ICron): Promise<number | void> {

cp.on('exit', async (code) => {
taskLimit.removeQueuedCron(cron.id);
// Update status to idle after task completes
try {
await CrontabModel.update(
{ status: CrontabStatus.idle, pid: undefined },
{ where: { id: Number(cron.id) } },
);
} catch (error) {
Logger.error(
`[schedule][更新任务状态失败] 任务ID: ${cron.id}, 错误: ${error}`,
);
}
Logger.info(
'[schedule][执行任务结束] 参数: %s, 退出码: %j',
JSON.stringify({
Expand Down