Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 103 additions & 0 deletions plugins/plugin-discord/__tests__/staleness.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import type { Content } from "@elizaos/core";
import { describe, expect, it } from "vitest";
import {
applyDiscordStalenessGuard,
getDiscordStalenessConfig,
recordDiscordChannelMessageSeen,
} from "../staleness";

function mockMessage(channelId = "channel-1") {
return {
id: "message-1",
channel: { id: channelId },
} as never;
}

describe("Discord staleness guard", () => {
it("is disabled by default and parses scoped settings", () => {
const settings = new Map<string, unknown>([
["DISCORD_STALENESS_BEHAVIOR", "skip"],
["DISCORD_STALENESS_THRESHOLD", "4"],
]);

expect(getDiscordStalenessConfig((key) => settings.get(key))).toEqual({
enabled: false,
behavior: "skip",
threshold: 4,
});

settings.set("DISCORD_STALENESS_ENABLED", "true");
expect(getDiscordStalenessConfig((key) => settings.get(key))).toEqual({
enabled: true,
behavior: "skip",
threshold: 4,
});
});

it("allows responses when the newer-message delta is within threshold", () => {
const owner = {};
const start = recordDiscordChannelMessageSeen(owner, "channel-1", "a");
recordDiscordChannelMessageSeen(owner, "channel-1", "b");
const content: Content = { text: "hello" };

expect(
applyDiscordStalenessGuard({
config: { enabled: true, behavior: "skip", threshold: 1 },
owner,
message: mockMessage(),
startSequence: start,
content,
}),
).toMatchObject({ shouldSend: true, stale: false });
expect(content.text).toBe("hello");
});

it("skips stale responses when configured to skip", () => {
const owner = {};
const start = recordDiscordChannelMessageSeen(owner, "channel-1", "a");
recordDiscordChannelMessageSeen(owner, "channel-1", "b");
recordDiscordChannelMessageSeen(owner, "channel-1", "c");
const content: Content = { text: "hello" };

expect(
applyDiscordStalenessGuard({
config: { enabled: true, behavior: "skip", threshold: 1 },
owner,
message: mockMessage(),
startSequence: start,
content,
}),
).toMatchObject({
shouldSend: false,
stale: true,
messagesSinceTurnStart: 2,
});
});

it("tags stale responses once when configured to tag", () => {
const owner = {};
const start = recordDiscordChannelMessageSeen(owner, "channel-1", "a");
recordDiscordChannelMessageSeen(owner, "channel-1", "b");
recordDiscordChannelMessageSeen(owner, "channel-1", "c");
const content: Content = { text: "hello" };

const first = applyDiscordStalenessGuard({
config: { enabled: true, behavior: "tag", threshold: 1 },
owner,
message: mockMessage(),
startSequence: start,
content,
});
const second = applyDiscordStalenessGuard({
config: { enabled: true, behavior: "tag", threshold: 1 },
owner,
message: mockMessage(),
startSequence: start,
content,
});

expect(first).toMatchObject({ shouldSend: true, stale: true });
expect(second).toMatchObject({ shouldSend: true, stale: true });
expect(content.text).toBe("(catching up:) hello");
});
});
9 changes: 9 additions & 0 deletions plugins/plugin-discord/discord-events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
handleAutocomplete as handleBuiltinAutocomplete,
handleSlashCommand as handleBuiltinSlashCommand,
} from "./slash-commands";
import { recordDiscordChannelMessageSeen } from "./staleness";
import {
DiscordEventTypes,
type DiscordListenChannelPayload,
Expand Down Expand Up @@ -249,145 +250,153 @@
service.channelDebouncer = channelDebouncer;

// ── messageCreate ──────────────────────────────────────────────────
service.client.on("messageCreate", async (message) => {
const clientUser = service.client?.user;
if (
(clientUser && message.author.id === clientUser.id) ||
(message.author.bot && service.discordSettings.shouldIgnoreBotMessages)
) {
service.runtime.logger.debug(
{
src: "plugin:discord",
agentId: service.runtime.agentId,
authorId: message.author.id,
isBot: message.author.bot,
},
"Ignoring message from bot or self",
);
return;
}

if (service.messageManager) {
recordDiscordChannelMessageSeen(
service.messageManager,
message.channel.id,
message.id,
);
}

if (listenCids.includes(message.channel.id) && message) {
const newMessage = await service.buildMemoryFromMessage(message);

if (!newMessage) {
service.runtime.logger.warn(
{
src: "plugin:discord",
agentId: service.runtime.agentId,
messageId: message.id,
},
"Failed to build memory from listen channel message",
);
return;
}

const listenPayload: DiscordListenChannelPayload = {
runtime: service.runtime,
message: newMessage,
source: "discord",
};
service.runtime.emitEvent(
DiscordEventTypes.LISTEN_CHANNEL_MESSAGE,
listenPayload,
);
}

// Skip if channel restrictions are set and this channel is not allowed
if (
service.allowedChannelIds &&
!service.isChannelAllowed(message.channel.id)
) {
const channel = service.client
? await service.client.channels.fetch(message.channel.id)
: null;

const notInChannelsPayload: DiscordNotInChannelsPayload = {
runtime: service.runtime,
message: message,
source: "discord",
};
service.runtime.emitEvent(
DiscordEventTypes.NOT_IN_CHANNELS_MESSAGE,
notInChannelsPayload,
);

if (!channel) {
service.runtime.logger.error(
{
src: "plugin:discord",
agentId: service.runtime.agentId,
channelId: message.channel.id,
},
"Channel not found",
);
return;
}
if (channel.isThread()) {
if (!channel.parentId || !service.isChannelAllowed(channel.parentId)) {
service.runtime.logger.debug(
{
src: "plugin:discord",
agentId: service.runtime.agentId,
parentChannelId: channel.parentId,
},
"Thread not in allowed channel",
);
return;
}
} else {
if (
channel?.isTextBased &&
typeof channel.isTextBased === "function" &&
channel.isTextBased()
) {
service.runtime.logger.debug(
{
src: "plugin:discord",
agentId: service.runtime.agentId,
channelId: channel.id,
},
"Channel not allowed",
);
}
return;
}
}

try {
if (!service.messageManager) {
return;
}

const channelType = message.channel.type as DiscordChannelType;
const isDm =
channelType === DiscordChannelType.DM ||
channelType === DiscordChannelType.GroupDM;

if (isDm) {
if (service.messageDebouncer) {
service.messageDebouncer.enqueue(message);
} else {
await service.messageManager.handleMessage(message);
}
} else if (service.channelDebouncer) {
service.channelDebouncer.enqueue(message);
} else if (service.messageDebouncer) {
service.messageDebouncer.enqueue(message);
} else {
await service.messageManager.handleMessage(message);
}
} catch (error) {
service.runtime.logger.error(
{
src: "plugin:discord",
agentId: service.runtime.agentId,
error: error instanceof Error ? error.message : String(error),
},
"Error handling message",
);
}
});

Check notice on line 399 in plugins/plugin-discord/discord-events.ts

View check run for this annotation

codefactor.io / CodeFactor

plugins/plugin-discord/discord-events.ts#L253-L399

Complex Method

// ── messageReactionAdd ─────────────────────────────────────────────
service.client.on("messageReactionAdd", async (reaction, user) => {
Expand Down Expand Up @@ -476,205 +485,205 @@
});

// ── interactionCreate ──────────────────────────────────────────────
service.client.on("interactionCreate", async (interaction) => {
if (interaction.isAutocomplete()) {
try {
await handleBuiltinAutocomplete(interaction);
} catch (error) {
service.runtime.logger.error(
{
src: "plugin:discord",
agentId: service.runtime.agentId,
error: error instanceof Error ? error.message : String(error),
},
"Error handling Discord autocomplete interaction",
);
}
return;
}

const isSlashCommand = interaction.isCommand();
const isModalSubmit = interaction.isModalSubmit();
const isComponent = interaction.isMessageComponent();

const bypassChannelRestriction =
isSlashCommand &&
service.allowAllSlashCommands.has(interaction.commandName ?? "");

service.runtime.logger.debug(
{
src: "plugin:discord",
agentId: service.runtime.agentId,
interactionType: interaction.type,
commandName: isSlashCommand ? interaction.commandName : undefined,
channelId: interaction.channelId,
inGuild: interaction.inGuild(),
bypassChannelRestriction,
},
"[DiscordService] interactionCreate received",
);

const isFollowUpInteraction = Boolean(
interaction.isModalSubmit() ||
interaction.isMessageComponent() ||
interaction.isAutocomplete(),
);

if (
!isFollowUpInteraction &&
service.allowedChannelIds &&
interaction.channelId &&
!service.isChannelAllowed(interaction.channelId) &&
!bypassChannelRestriction
) {
if (isSlashCommand && interaction.isCommand()) {
try {
await interaction.reply({
content: "This command is not available in this channel.",
ephemeral: true,
});
} catch (responseError) {
service.runtime.logger.debug(
{
src: "plugin:discord",
agentId: service.runtime.agentId,
error:
responseError instanceof Error
? responseError.message
: String(responseError),
},
"Could not send channel restriction response",
);
}
}
service.runtime.logger.debug(
{
src: "plugin:discord",
agentId: service.runtime.agentId,
channelId: interaction.channelId,
allowedChannelIds: service.allowedChannelIds,
isSlashCommand,
isModalSubmit,
isComponent,
bypassChannelRestriction,
},
"[DiscordService] interactionCreate ignored (channel not allowed)",
);
return;
}

// Run custom validator if provided for slash commands
if (isSlashCommand && interaction.commandName) {
const command = service.slashCommands.find(
(cmd) => cmd.name === interaction.commandName,
);
if (command?.validator) {
try {
const isValid = await command.validator(interaction, service.runtime);
if (!isValid) {
if (!interaction.replied) {
try {
const errorMessage =
"You do not have permission to use this command.";
if (interaction.deferred) {
await interaction.editReply({ content: errorMessage });
} else {
await interaction.reply({
content: errorMessage,
ephemeral: true,
});
}
} catch (responseError) {
service.runtime.logger.debug(
{
src: "plugin:discord",
agentId: service.runtime.agentId,
commandName: interaction.commandName,
error:
responseError instanceof Error
? responseError.message
: String(responseError),
},
"Could not send validator rejection response (may have already responded)",
);
}
}
service.runtime.logger.debug(
{
src: "plugin:discord",
agentId: service.runtime.agentId,
commandName: interaction.commandName,
},
"[DiscordService] interactionCreate ignored (custom validator returned false)",
);
return;
}
} catch (error) {
if (!interaction.replied) {
try {
const errorMessage =
"An error occurred while validating this command.";
if (interaction.deferred) {
await interaction.editReply({ content: errorMessage });
} else {
await interaction.reply({
content: errorMessage,
ephemeral: true,
});
}
} catch (responseError) {
service.runtime.logger.debug(
{
src: "plugin:discord",
agentId: service.runtime.agentId,
commandName: interaction.commandName,
error:
responseError instanceof Error
? responseError.message
: String(responseError),
},
"Could not send validator error response (may have already responded)",
);
}
}
service.runtime.logger.error(
{
src: "plugin:discord",
agentId: service.runtime.agentId,
commandName: interaction.commandName,
error: error instanceof Error ? error.message : String(error),
},
"[DiscordService] Custom validator threw error",
);
return;
}
}
}

try {
await service.handleInteractionCreate(interaction);
if (interaction.isChatInputCommand()) {
const entityId = service.resolveDiscordEntityId(interaction.user.id);
const roomId = createUniqueUuid(
service.runtime,
interaction.channelId || interaction.user.username,
);
await handleBuiltinSlashCommand(interaction, service.runtime, {
entityId,
roomId,
});
}
} catch (error) {
service.runtime.logger.error(
{
src: "plugin:discord",
agentId: service.runtime.agentId,
error: error instanceof Error ? error.message : String(error),
},
"Error handling interaction",
);
}
});

Check notice on line 686 in plugins/plugin-discord/discord-events.ts

View check run for this annotation

codefactor.io / CodeFactor

plugins/plugin-discord/discord-events.ts#L488-L686

Complex Method

// ── userStream (voice) ─────────────────────────────────────────────
service.client.on(
Expand Down Expand Up @@ -707,104 +716,104 @@

if (isAuditLogEnabled) {
// channelUpdate
service.client.on("channelUpdate", async (oldChannel, newChannel) => {
try {
let channel = newChannel;
if (channel.partial) {
channel = await channel.fetch();
}

if (!("permissionOverwrites" in oldChannel) || !("guild" in channel)) {
return;
}

const guildChannel = channel as GuildChannel;
const oldGuildChannel = oldChannel as GuildChannel;
const oldOverwrites = oldGuildChannel.permissionOverwrites.cache;
const newOverwrites = guildChannel.permissionOverwrites.cache;

const allIds = new Set([
...oldOverwrites.keys(),
...newOverwrites.keys(),
]);

for (const id of allIds) {
const oldOw = oldOverwrites.get(id);
const newOw = newOverwrites.get(id);
const { changes, action } = diffOverwrites(oldOw, newOw);

if (changes.length === 0) {
continue;
}

const auditAction =
action === "DELETE"
? AuditLogEvent.ChannelOverwriteDelete
: action === "CREATE"
? AuditLogEvent.ChannelOverwriteCreate
: AuditLogEvent.ChannelOverwriteUpdate;

const audit = await fetchAuditEntry(
guildChannel.guild,
auditAction,
guildChannel.id,
service.runtime,
);

const clientUser = service.client?.user;
if (
audit?.executorId &&
clientUser &&
audit.executorId === clientUser.id
) {
continue;
}

const oldOwType =
oldOw && oldOw.type !== undefined ? oldOw.type : null;
const newOwType =
newOw && newOw.type !== undefined ? newOw.type : null;
const targetType =
(oldOwType ?? newOwType ?? 1) === 0 ? "role" : "user";
let targetName: string;
if (targetType === "role") {
const role = guildChannel.guild.roles.cache.get(id);
targetName = role?.name ?? "Unknown";
} else {
const user = service.client
? await service.client.users.fetch(id).catch(() => null)
: null;
targetName = user?.tag ?? "Unknown";
}

service.runtime.emitEvent(
DiscordEventTypes.CHANNEL_PERMISSIONS_CHANGED,
{
runtime: service.runtime,
source: "discord",
guild: {
id: guildChannel.guild.id,
name: guildChannel.guild.name,
},
channel: { id: guildChannel.id, name: guildChannel.name },
target: { type: targetType, id, name: targetName },
action,
changes,
audit,
} as EventPayload,
);
}
} catch (err) {
service.runtime.logger.error(
{
src: "plugin:discord",
agentId: service.runtime.agentId,
error: err instanceof Error ? err.message : String(err),
},
"Error in channelUpdate handler",
);
}
});

Check notice on line 816 in plugins/plugin-discord/discord-events.ts

View check run for this annotation

codefactor.io / CodeFactor

plugins/plugin-discord/discord-events.ts#L719-L816

Complex Method

// roleUpdate
service.client.on("roleUpdate", async (oldRole, newRole) => {
Expand Down
43 changes: 43 additions & 0 deletions plugins/plugin-discord/messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,12 @@
import { buildDiscordWorldMetadata } from "./identity";
import { formatInboundEnvelope } from "./inbound-envelope";
import { stripReasoningTags } from "./reasoning-tags";
import {
applyDiscordStalenessGuard,
type DiscordStalenessConfig,
getDiscordChannelMessageSequence,
getDiscordStalenessConfig,
} from "./staleness";
import {
createStatusReactionController,
type StatusReactionScope,
Expand Down Expand Up @@ -123,6 +129,7 @@
private statusReactionScope: StatusReactionScope;
private envelopeEnabled: boolean;
private draftStreamingEnabled: boolean;
private stalenessConfig: DiscordStalenessConfig;
private recentlyProcessedMessageIds = new Map<string, number>();
private static readonly PROCESSED_MESSAGE_TTL_MS = 2 * 60 * 1000;
/**
Expand Down Expand Up @@ -170,6 +177,9 @@
) as string | undefined;
this.draftStreamingEnabled =
draftStreamSetting === "true" || draftStreamSetting === "1";
this.stalenessConfig = getDiscordStalenessConfig((key) =>
this.runtime.getSetting(key),
);
}

/**
Expand Down Expand Up @@ -310,642 +320,675 @@
*
* @param {DiscordMessage} message - The Discord message to be handled
*/
async handleMessage(message: DiscordMessage) {
// this filtering is already done in setupEventListeners
/*
if (
(this.discordSettings.allowedChannelIds && this.discordSettings.allowedChannelIds.length) &&
!this.discordSettings.allowedChannelIds.some((id: string) => id === message.channel.id)
) {
return;
}
*/

const clientUser = this.client.user;
if (
message.interaction ||
(clientUser && message.author.id === clientUser.id)
) {
return;
}

if (this.discordSettings.shouldIgnoreBotMessages && message.author?.bot) {
return;
}

if (message.id && !this.markMessageAsProcessing(message.id)) {
this.runtime.logger.debug(
{
src: "plugin:discord",
agentId: this.runtime.agentId,
messageId: message.id,
},
"Skipping duplicate Discord message",
);
return;
}

// DM policy check - applies access control policies for direct messages
if (message.channel.type === DiscordChannelType.DM) {
const userId = message.author.id;
if (this.discordSettings.shouldIgnoreDirectMessages) {
const staticallyAllowed =
this.discordSettings.allowFrom?.includes(userId) === true;
const dynamicallyAllowed = await isInAllowlist(
this.runtime,
"discord",
userId,
);
if (!staticallyAllowed && !dynamicallyAllowed) {
return;
}
}

const accessCheck = await this.checkDmAccess(message);
if (!accessCheck.allowed) {
// If a reply message was generated (new pairing request), send it
if (accessCheck.replyMessage) {
try {
await message.author.send(accessCheck.replyMessage);
} catch (err) {
this.runtime.logger.warn(
{
src: "plugin:discord",
agentId: this.runtime.agentId,
userId: message.author.id,
error: err instanceof Error ? err.message : String(err),
},
"Failed to send pairing reply",
);
}
}
return;
}
}

const isBotMentioned = !!(
clientUser?.id && message.mentions.users?.has(clientUser.id)
);
const isReplyToBot =
!!message.reference?.messageId &&
message.mentions.repliedUser?.id === clientUser?.id;
const mentionedOtherUsers = message.mentions.users
? Array.from(message.mentions.users.values()).some(
(user) => user.id !== clientUser?.id && user.id !== message.author.id,
)
: false;
const isReplyToOtherUser =
!!message.reference?.messageId &&
!!message.mentions.repliedUser?.id &&
message.mentions.repliedUser.id !== clientUser?.id &&
message.mentions.repliedUser.id !== message.author.id;
const isInThread = message.channel.isThread();
const isDM = message.channel.type === DiscordChannelType.DM;
const strictModeEnabled =
this.discordSettings.shouldRespondOnlyToMentions === true;
const replyToMode = normalizeReplyToMode(this.discordSettings.replyToMode);
const outboundReplyToMessageId =
!isDM && replyToMode !== "off" && (isBotMentioned || isReplyToBot)
? message.id
: undefined;
const strictModeShouldProcess = isDM || isBotMentioned || isReplyToBot;

const userName = message.author.bot
? `${message.author.username}#${message.author.discriminator}`
: message.author.username;
const name =
message.member?.displayName ??
message.author.globalName ??
message.author.displayName ??
message.author.username;
const channelId = message.channel.id;
const roomId = createUniqueUuid(this.runtime, channelId);
const roomName =
message.guild &&
"name" in message.channel &&
typeof message.channel.name === "string"
? message.channel.name
: name || userName;

// Determine channel type and server ID for ensureConnection
// messageServerId is a Discord snowflake string, converted to UUID when needed
let type: ChannelType;
let messageServerId: string | undefined;

if (message.guild) {
const guild = await message.guild.fetch();
type = await this.getChannelType(message.channel as Channel);
if (type === null) {
// usually a forum type post
this.runtime.logger.warn(
{
src: "plugin:discord",
agentId: this.runtime.agentId,
channelId: message.channel.id,
},
"Null channel type",
);
}
messageServerId = guild.id;
} else {
type = ChannelType.DM;
messageServerId = message.channel.id;
}

try {
let { processedContent, attachments } =
await this.processMessage(message);
// Audio attachments already processed in processMessage via attachmentManager

if (this.envelopeEnabled && processedContent) {
try {
const envelope = await formatInboundEnvelope(
message,
processedContent,
);
processedContent = envelope.formattedContent;
} catch {
// Envelope formatting is best-effort only.
}
}

if (!processedContent && !attachments?.length) {
// Only process messages that are not empty
return;
}

// Users often mention a teammate and then ask the bot by name in the
// same message. Only short-circuit these messages when the bot is not
// also clearly addressed.
const explicitlyAddressesBotByName = textMentionsAnyName(
processedContent,
[
this.runtime.character.name,
this.runtime.character.username,
clientUser?.globalName,
clientUser?.username,
],
);
const ignoresOtherTarget =
!isDM &&
!isBotMentioned &&
!isReplyToBot &&
!explicitlyAddressesBotByName &&
(mentionedOtherUsers || isReplyToOtherUser);

// Use the service's buildMemoryFromMessage method with pre-processed content
const newMessage = await this.discordService.buildMemoryFromMessage(
message,
{
processedContent,
processedAttachments: attachments,
extraContent: {
mentionContext: {
isMention: isBotMentioned,
isReply: isReplyToBot,
isThread: isInThread,
mentionType: isBotMentioned
? "platform_mention"
: isReplyToBot
? "reply"
: isInThread
? "thread"
: "none",
},
},
extraMetadata: {
// Reply attribution for cross-agent filtering
// WHY: When user replies to another bot's message, we need to know
// so other agents can ignore it (only the replied-to agent should respond)
replyToAuthor: message.mentions.repliedUser
? {
id: message.mentions.repliedUser.id,
displayName:
message.mentions.repliedUser.globalName ??
message.mentions.repliedUser.username,
username: message.mentions.repliedUser.username,
isBot: message.mentions.repliedUser.bot,
}
: undefined,
replyToMessageId: message.reference?.messageId
? createUniqueUuid(this.runtime, message.reference.messageId)
: undefined,
replyToExternalMessageId: message.reference?.messageId,
replyToSenderId: message.mentions.repliedUser?.id,
replyToSenderName:
message.mentions.repliedUser?.globalName ??
message.mentions.repliedUser?.username,
replyToSenderUserName: message.mentions.repliedUser?.username,
},
},
);

if (!newMessage) {
this.runtime.logger.warn(
{
src: "plugin:discord",
agentId: this.runtime.agentId,
messageId: message.id,
},
"Failed to build memory from message",
);
return;
}

await this.runtime.ensureConnection({
entityId: newMessage.entityId,
roomId,
roomName,
userName,
name,
source: "discord",
channelId: message.channel.id,
// Convert Discord snowflake to UUID (see service.ts header for why stringToUuid not asUUID)
messageServerId: messageServerId
? stringToUuid(messageServerId)
: undefined,
type,
worldId: createUniqueUuid(this.runtime, messageServerId ?? roomId),
worldName: message.guild?.name,
// Preserve the raw Discord user id in source metadata for role and allowlist checks.
userId: message.author.id as unknown as UUID,
metadata: buildDiscordWorldMetadata(
this.runtime,
message.guild?.ownerId ?? undefined,
),
});

if (
!this.discordSettings.autoReply ||
lifeOpsPassiveConnectorsEnabled(this.runtime)
) {
await this.persistInboundMemory(newMessage);
this.runtime.logger.debug(
{
src: "plugin:discord",
agentId: this.runtime.agentId,
channelId: message.channel.id,
},
"Auto-reply disabled; message ingested without response",
);
return;
}

if (ignoresOtherTarget) {
await this.persistInboundMemory(newMessage);
this.runtime.logger.debug(
{
src: "plugin:discord",
agentId: this.runtime.agentId,
channelId: message.channel.id,
},
"Ignoring message that targets another mentioned user",
);
return;
}

if (strictModeEnabled && !strictModeShouldProcess) {
await this.persistInboundMemory(newMessage);
this.runtime.logger.debug(
{
src: "plugin:discord",
agentId: this.runtime.agentId,
channelId: message.channel.id,
},
"Strict mode: ignoring message (no mention or reply)",
);
return;
}

if (strictModeEnabled) {
this.runtime.logger.debug(
{
src: "plugin:discord",
agentId: this.runtime.agentId,
channelId: message.channel.id,
},
"Strict mode: processing message",
);
}

const canSendResult = canSendMessage(message.channel);
if (!canSendResult.canSend) {
await this.persistInboundMemory(newMessage);
return this.runtime.logger.warn(
{
src: "plugin:discord",
agentId: this.runtime.agentId,
channelId: message.channel.id,
reason: canSendResult.reason,
},
"Cannot send message to channel",
);
}

const messageId = newMessage.id;
const stalenessStartSequence = getDiscordChannelMessageSequence(
this,
message.channel.id,
);
const channel = message.channel as TextChannel;
const typingController = createTypingController(channel);
const clientUserId = this.client.user?.id;
const useReactions = shouldShowStatusReaction(
this.statusReactionScope,
message,
clientUserId,
);
const statusReactions = useReactions
? createStatusReactionController(message)
: null;
const draftStream = this.draftStreamingEnabled
? createDraftStreamController({
log: (entry) =>
this.runtime.logger.debug(
{ src: "plugin:discord", agentId: this.runtime.agentId },
entry,
),
warn: (entry) =>
this.runtime.logger.warn(
{ src: "plugin:discord", agentId: this.runtime.agentId },
entry,
),
})
: null;
let typingStarted = false;
let responseEmitted = false;
let generationTimedOut = false;
const generationTimeoutMs = Math.max(
30_000,
Number.parseInt(
String(
this.runtime.getSetting("DISCORD_GENERATION_TIMEOUT_MS") ??
this.runtime.getSetting("MESSAGE_TIMEOUT_MS") ??
"120000",
),
10,
) || 120_000,
);

const finalizePendingDraft = async () => {
if (draftStream?.isStarted() && !draftStream.isDone()) {
await draftStream.finalize("");
}
};

const abortPendingDraft = async () => {
if (draftStream?.isStarted() && !draftStream.isDone()) {
await draftStream.abort(
"An error occurred while generating the response.",
);
}
};

const sendFailureReply = async (text: string) => {
try {
await channel.send({
content: text,
...(outboundReplyToMessageId && replyToMode !== "off"
? {
reply: {
messageReference: outboundReplyToMessageId,
},
}
: {}),
});
responseEmitted = true;
} catch (sendError) {
this.runtime.logger.warn(
{
src: "plugin:discord",
agentId: this.runtime.agentId,
error:
sendError instanceof Error
? sendError.message
: String(sendError),
},
"Failed to send Discord failure reply",
);
}
};

if (draftStream) {
await draftStream.start(channel, outboundReplyToMessageId, replyToMode);
}
// Typing indicator is deferred until the runtime actually invokes the
// handler callback (see the `typingStarted` guard further down). This
// avoids showing "Eliza is typing…" for messages the agent decides to
// IGNORE/NONE, and lines up with the message-service preamble that
// fires the callback the moment we commit to responding.

statusReactions?.setQueued();
statusReactions?.setThinking();

const callback: HandlerCallback = async (content: Content) => {
try {
if (generationTimedOut) {
return [];
}
// target is set but not addressed to us handling
if (
content.target &&
typeof content.target === "string" &&
content.target.toLowerCase() !== "discord"
) {
return [];
}

const stalenessDecision = applyDiscordStalenessGuard({
config: this.stalenessConfig,
owner: this,
message,
startSequence: stalenessStartSequence,
content,
});
if (stalenessDecision.stale) {
this.runtime.logger.warn(
{
src: "plugin:discord",
agentId: this.runtime.agentId,
channelId: message.channel.id,
messageId: message.id,
messagesSinceTurnStart:
stalenessDecision.messagesSinceTurnStart,
threshold: this.stalenessConfig.threshold,
behavior: stalenessDecision.behavior,
},
"Discord response completed after newer channel messages arrived",
);
}
if (!stalenessDecision.shouldSend) {
typingController.stop();
statusReactions?.setDone();
await finalizePendingDraft();
return [];
}
Comment on lines +768 to +795
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Partial response sent when channel becomes busy mid-multi-chunk reply

stalenessStartSequence is captured once before the callback is defined, and applyDiscordStalenessGuard is evaluated on every individual callback invocation. For a slow-reasoning agent that fires the callback multiple times (multi-part response), if the channel moves from within-threshold to beyond-threshold between the first and second invocations, the first chunk is already sent to Discord and cannot be unsent, but subsequent chunks are suppressed. The user receives a visibly truncated message — exactly the worst-case outcome in a busy channel, which is precisely the scenario this feature targets.

Since Discord messages cannot be retracted after sending, the only robust mitigation for behavior=skip would be to buffer all callback output until reasoning completes and gate the send on a final staleness evaluation, rather than checking per-chunk.


if (message.id && !content.inReplyTo) {
content.inReplyTo = createUniqueUuid(this.runtime, message.id);
}

if (typeof content.text === "string" && content.text.length > 0) {
content.text = stripReasoningTags(content.text);
}

const textContent = normalizeDiscordMessageText(content.text);
const hasText = textContent.trim().length > 0;
const attachmentCount = Array.isArray(content.attachments)
? content.attachments.filter((media) => Boolean(media?.url)).length
: 0;

if (!hasText && attachmentCount === 0) {
return [];
}

if (!typingStarted) {
typingStarted = true;
typingController.start();
}

// Dedup: error when the runtime emits identical text
// twice in response to the same inbound message (e.g.
// post-action continuation repeating action output).
if (hasText && content.inReplyTo) {
const dedupKey = `${content.inReplyTo}::${textContent.replace(/\s+/g, " ").trim()}`;
const callbackDedup = message as DiscordMessage & {
_elizaSentReplyKeys?: Set<string>;
};
callbackDedup._elizaSentReplyKeys ??= new Set();
if (callbackDedup._elizaSentReplyKeys.has(dedupKey)) {
this.runtime.logger.debug(
{
src: "plugin:discord",
agentId: this.runtime.agentId,
messageId: message.id,
textPreview: textContent
.replace(/\s+/g, " ")
.trim()
.slice(0, 200),
},
"Suppressing duplicate callback reply with identical text",
);
return [];
}
callbackDedup._elizaSentReplyKeys.add(dedupKey);
}

const files: AttachmentBuilder[] = [];
if (content.attachments && content.attachments.length > 0) {
for (const media of content.attachments) {
if (media.url) {
const fileName = getAttachmentFileName(media);
files.push(
new AttachmentBuilder(media.url, { name: fileName }),
);
}
}
}

let messages: DiscordMessage[] = [];
if (draftStream?.isStarted() && !draftStream.isDone()) {
if (hasText || files.length === 0) {
messages = await draftStream.finalize(textContent);
} else {
await finalizePendingDraft();
}

if (files.length > 0) {
try {
const attachmentMessage = await channel.send({
files,
...(outboundReplyToMessageId &&
(replyToMode === "all" || !hasText)
? {
reply: {
messageReference: outboundReplyToMessageId,
},
}
: {}),
});
messages.push(attachmentMessage);
} catch (error) {
this.runtime.logger.warn(
{
src: "plugin:discord",
agentId: this.runtime.agentId,
error:
error instanceof Error ? error.message : String(error),
},
"Failed to send Discord attachments after draft finalize",
);
}
}
} else if (content && content.channelType === "DM") {
const user = await this.client.users.fetch(message.author.id);
if (!user) {
this.runtime.logger.warn(
{
src: "plugin:discord",
agentId: this.runtime.agentId,
entityId: message.author.id,
},
"User not found for DM",
);
return [];
}

const dmMessage = await user.send({
content: textContent,
files: files.length > 0 ? files : undefined,
});
messages = [dmMessage];
} else {
if (!message.id) {
this.runtime.logger.warn(
{ src: "plugin:discord", agentId: this.runtime.agentId },
"Cannot send message: message.id is missing",
);
return [];
}
messages = await sendMessageInChunks(
channel,
textContent,
outboundReplyToMessageId ?? "",
files,
undefined,
this.runtime,
replyToMode,
);
}

const attemptedSend = hasText || attachmentCount > 0;
if (attemptedSend && messages.length === 0) {
throw new Error(
"Discord response callback completed without sending any messages",
);
}

const memories: Memory[] = [];
for (const m of messages) {
const actions = content.actions;
// Only attach files to the memory for the message that actually carries them
const hasAttachments = m.attachments?.size > 0;

const memory: Memory = {
id: createUniqueUuid(this.runtime, m.id),
entityId: this.runtime.agentId,
agentId: this.runtime.agentId,
content: {
...content,
source: "discord",
text: m.content || textContent || " ",
actions,
inReplyTo: messageId,
url: m.url,
channelType: type,
// Only include attachments for the message chunk that actually has them
attachments:
hasAttachments && content.attachments
? content.attachments
: undefined,
},
roomId,
createdAt: m.createdTimestamp,
};
memories.push(memory);
}

for (const m of memories) {
await this.runtime.createMemory(m, "messages");
}

responseEmitted = memories.length > 0;
typingController.stop();
statusReactions?.setDone();

return memories;
} catch (error) {
this.runtime.logger.error(
{
src: "plugin:discord",
agentId: this.runtime.agentId,
error: error instanceof Error ? error.message : String(error),
},
"Error handling message callback",
);
typingController.stop();
statusReactions?.setError();
await abortPendingDraft();
throw error;
}
};

Check notice on line 991 in plugins/plugin-discord/messages.ts

View check run for this annotation

codefactor.io / CodeFactor

plugins/plugin-discord/messages.ts#L754-L991

Complex Method

const messagingAPI = getMessagingAPI(this.runtime);
const messageService = getMessageService(this.runtime);
Expand Down Expand Up @@ -1061,185 +1104,185 @@
* @param {DiscordMessage} message The message to process
* @returns {Promise<{ processedContent: string; attachments: Media[] }>} Processed content and media attachments
*/
async processMessage(
message: DiscordMessage,
): Promise<{ processedContent: string; attachments: Media[] }> {
let processedContent = message.content;
const attachments: Media[] = [];

if (message.embeds?.length) {
for (const i in message.embeds) {
const embed = message.embeds[i];
// type: rich
processedContent += `\nEmbed #${parseInt(i, 10) + 1}:\n`;
processedContent += ` Title:${embed.title ?? "(none)"}\n`;
processedContent += ` Description:${embed.description ?? "(none)"}\n`;
}
}
const mentionRegex = /<@!?(\d+)>/g;
processedContent = processedContent.replace(
mentionRegex,
(match, entityId) => {
const user = message.mentions.users.get(entityId);
if (user) {
return `${user.username} (@${entityId})`;
}
return match;
},
);

const codeBlockRegex = /```([\s\S]*?)```/g;
let match: RegExpExecArray | null = codeBlockRegex.exec(processedContent);
while (match !== null) {
const fullMatch = match[0];
const codeBlock = match[1];
const lines = codeBlock.split("\n");
const title = lines[0];
const description = lines.slice(0, 3).join("\n");
const attachmentId =
`code-${Date.now()}-${Math.floor(Math.random() * 1000)}`.slice(-5);
attachments.push({
id: attachmentId,
url: "",
title: title || "Code Block",
source: "Code",
description,
text: codeBlock,
});
processedContent = processedContent.replace(
fullMatch,
`Code Block (${attachmentId})`,
);
match = codeBlockRegex.exec(processedContent);
}

if (message.attachments.size > 0) {
attachments.push(
...(await this.attachmentManager.processAttachments(
message.attachments,
)),
);
}

// Extract and clean URLs from the message content
const urls = extractUrls(processedContent, this.runtime);

for (const url of urls) {
// Use string literal type for getService, assume methods exist at runtime
const videoService = this.runtime.getService(ServiceType.VIDEO) as
| ({
isVideoUrl?: (url: string) => boolean;
processVideo?: (
url: string,
runtime: IAgentRuntime,
) => Promise<{
title: string;
description: string;
text: string;
}>;
} & Service)
| null;
if (videoService?.isVideoUrl(url)) {
try {
const videoInfo = await videoService.processVideo(url, this.runtime);

attachments.push({
id: `youtube-${Date.now()}`,
url,
title: videoInfo.title,
source: "YouTube",
description: videoInfo.description,
text: videoInfo.text,
});
} catch (error) {
// Handle video processing errors gracefully - the URL is still preserved in the message
const errorMsg =
error instanceof Error ? error.message : String(error);
this.runtime.logger.warn(
`Failed to process video ${url}: ${errorMsg}`,
);
}
} else {
try {
const fetched = await fetchKnowledgeFromUrl(url);
attachments.push(fetchedUrlToAttachment(url, fetched));
continue;
} catch (error) {
const errorMsg =
error instanceof Error ? error.message : String(error);
this.runtime.logger.debug(
{
src: "plugin:discord",
agentId: this.runtime.agentId,
url,
error: errorMsg,
},
"Direct URL enrichment failed; trying browser service fallback",
);
}

const browserService = this.runtime.getService(ServiceType.BROWSER) as
| ({
getPageContent?: (
url: string,
runtime: IAgentRuntime,
) => Promise<{ title?: string; description?: string }>;
} & Service)
| null;
if (!browserService) {
this.runtime.logger.debug(
{ src: "plugin:discord", agentId: this.runtime.agentId },
"Skipping URL enrichment because browser service is unavailable",
);
continue;
}

try {
this.runtime.logger.debug(
`Fetching page content for cleaned URL: "${url}"`,
);
const { title, description: summary } =
await browserService.getPageContent(url, this.runtime);

attachments.push({
id: webpageAttachmentId(url),
url,
title: title || "Web Page",
source: "Web",
description: summary,
text: summary,
contentType: ContentType.LINK,
});
} catch (error) {
// Silently handle browser errors (certificate issues, timeouts, dead sites, etc.)
// The URL is still preserved in the message content, just without scraped metadata
const errorMsg =
error instanceof Error ? error.message : String(error);
const errorString = String(error);

// Check for common expected failures that don't need logging
const isExpectedFailure =
errorMsg.includes("ERR_CERT") ||
errorString.includes("ERR_CERT") ||
errorMsg.includes("Timeout") ||
errorString.includes("Timeout") ||
errorMsg.includes("ERR_NAME_NOT_RESOLVED") ||
errorString.includes("ERR_NAME_NOT_RESOLVED") ||
errorMsg.includes("ERR_HTTP_RESPONSE_CODE_FAILURE") ||
errorString.includes("ERR_HTTP_RESPONSE_CODE_FAILURE");

if (!isExpectedFailure) {
this.runtime.logger.warn(
`Failed to fetch page content for ${url}: ${errorMsg}`,
);
}
// Expected failures are silently handled - no logging needed
}
}
}

return { processedContent, attachments };
}

Check notice on line 1285 in plugins/plugin-discord/messages.ts

View check run for this annotation

codefactor.io / CodeFactor

plugins/plugin-discord/messages.ts#L1107-L1285

Complex Method

/**
* Asynchronously fetches the bot's username and discriminator from Discord API.
Expand Down
156 changes: 156 additions & 0 deletions plugins/plugin-discord/staleness.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
import type { Content } from "@elizaos/core";
import type { Message as DiscordMessage } from "discord.js";

export type DiscordStalenessBehavior = "tag" | "skip" | "ignore";

export interface DiscordStalenessConfig {
enabled: boolean;
behavior: DiscordStalenessBehavior;
threshold: number;
}

const DEFAULT_THRESHOLD = 2;
const channelSequences = new WeakMap<object, Map<string, number>>();
const lastMessageIds = new WeakMap<object, Map<string, string | undefined>>();

function parseBoolean(value: unknown, fallback: boolean): boolean {
if (value === undefined || value === null) {
return fallback;
}
return String(value).trim().toLowerCase() === "true";
}

function parseNonNegativeInteger(value: unknown, fallback: number): number {
const parsed = Number.parseInt(String(value ?? ""), 10);
return Number.isFinite(parsed) && parsed >= 0 ? parsed : fallback;
}

function parseBehavior(value: unknown): DiscordStalenessBehavior {
const normalized = String(value ?? "tag")
.trim()
.toLowerCase();
return normalized === "skip" ||
normalized === "ignore" ||
normalized === "tag"
? normalized
: "tag";
}

function ensureSequenceMap(owner: object): Map<string, number> {
let map = channelSequences.get(owner);
if (!map) {
map = new Map<string, number>();
channelSequences.set(owner, map);
}
return map;
}

export function getDiscordStalenessConfig(
getSetting: (key: string) => unknown,
): DiscordStalenessConfig {
return {
enabled: parseBoolean(getSetting("DISCORD_STALENESS_ENABLED"), false),
behavior: parseBehavior(getSetting("DISCORD_STALENESS_BEHAVIOR")),
threshold: parseNonNegativeInteger(
getSetting("DISCORD_STALENESS_THRESHOLD"),
DEFAULT_THRESHOLD,
),
};
}

export function recordDiscordChannelMessageSeen(
owner: object | undefined,
channelId: string | undefined,
messageId?: string,
): number {
if (!owner || !channelId) {
return 0;
}
const sequences = ensureSequenceMap(owner);
const next = (sequences.get(channelId) ?? 0) + 1;
sequences.set(channelId, next);

let lastIds = lastMessageIds.get(owner);
if (!lastIds) {
lastIds = new Map<string, string | undefined>();
lastMessageIds.set(owner, lastIds);
}
lastIds.set(channelId, messageId);

return next;
}

export function getDiscordChannelMessageSequence(
owner: object | undefined,
channelId: string | undefined,
): number {
if (!owner || !channelId) {
return 0;
}
return ensureSequenceMap(owner).get(channelId) ?? 0;
}

export interface DiscordStalenessDecision {
shouldSend: boolean;
stale: boolean;
messagesSinceTurnStart: number;
behavior: DiscordStalenessBehavior;
}

export function applyDiscordStalenessGuard(options: {
config: DiscordStalenessConfig;
owner: object | undefined;
message: DiscordMessage;
startSequence: number;
content: Content;
}): DiscordStalenessDecision {
const { config, owner, message, startSequence, content } = options;
if (!config.enabled || config.behavior === "ignore") {
return {
shouldSend: true,
stale: false,
messagesSinceTurnStart: 0,
behavior: config.behavior,
};
}

const currentSequence = getDiscordChannelMessageSequence(
owner,
message.channel?.id,
);
const messagesSinceTurnStart = Math.max(0, currentSequence - startSequence);
const stale = messagesSinceTurnStart > config.threshold;
if (!stale) {
return {
shouldSend: true,
stale: false,
messagesSinceTurnStart,
behavior: config.behavior,
};
}

if (config.behavior === "skip") {
return {
shouldSend: false,
stale: true,
messagesSinceTurnStart,
behavior: config.behavior,
};
}

if (
config.behavior === "tag" &&
typeof content.text === "string" &&
content.text.trim().length > 0 &&
!/^(\s*\(catching up:\))/i.test(content.text)
) {
content.text = `(catching up:) ${content.text}`;
}

return {
shouldSend: true,
stale: true,
messagesSinceTurnStart,
behavior: config.behavior,
};
}
Loading