Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,10 @@ describe('AgentInboundHandler', () => {
subscriberResolver,
startCodeService,
channelEndpointRepository,
handleManagedAgentSetupInbound,
managedAgentService,
agentRepository,
subscriberRepository,
};
}

Expand Down Expand Up @@ -290,6 +294,61 @@ describe('AgentInboundHandler', () => {
});
expect(bridgeExecutor.execute.firstCall.args[0].storedAttachments).to.deep.equal(storedAttachments);
});

it('should show typing before managed-agent setup gate when acknowledgeOnReceived is enabled', async () => {
const setupInbound = sinon.stub().resolves(true);
const logger = makeLogger();
const subscriberResolver = { resolve: sinon.stub().resolves('sub-1') };
const conversationService = {
createOrGetConversation: sinon.stub().resolves(conversation),
getPrimaryChannel: sinon.stub().callsFake((conv) => conv.channels[0]),
persistInboundMessage: sinon.stub().resolves({ _id: 'activity1' }),
persistAgentMessage: sinon.stub().resolves({ _id: 'agent-activity1' }),
setFirstPlatformMessageId: sinon.stub().resolves(undefined),
findByPlatformThread: sinon.stub().resolves(conversation),
getHistory: sinon.stub().resolves([]),
};
const managedAgentService = { dispatch: sinon.stub().resolves(undefined) };
const handleManagedAgentSetupInbound = { execute: setupInbound };
const subscriberRepository = {
findBySubscriberId: sinon.stub().resolves({ subscriberId: 'sub-1' }),
};
const agentRepository = {
findOne: sinon.stub().resolves({
_id: 'agent1',
runtime: 'managed',
managedRuntime: { providerId: 'anthropic', _integrationId: 'int1', externalAgentId: 'ext1' },
}),
};
const handler = new AgentInboundHandler(
logger as any,
subscriberResolver as any,
conversationService as any,
{ execute: sinon.stub().resolves(undefined) } as any,
managedAgentService as any,
{ execute: sinon.stub().resolves(undefined) } as any,
handleManagedAgentSetupInbound as any,
{ registerInboundCallbacks: sinon.stub() } as any,
agentRepository as any,
subscriberRepository as any,
{ findOne: sinon.stub().resolves(null) } as any,
{ track: sinon.stub() } as any,
{ storeInbound: sinon.stub().resolves([]) } as any,
{ consumeIfMatches: sinon.stub().resolves({ status: 'missing' }) } as any,
{ findByPlatformIdentity: sinon.stub().resolves(null) } as any,
{ execute: sinon.stub().resolves({ created: true }) } as any
);

const thread = makeSlackDmThread();
const message = makeSlackDmMessage();
const slackConfig = { ...config, acknowledgeOnReceived: true };

await handler.handle('agent1', slackConfig as any, thread as any, message as any, AgentEventEnum.ON_MESSAGE);

expect(thread.startTyping.calledOnceWith('Thinking...')).to.equal(true);
expect(setupInbound.calledOnce).to.equal(true);
expect(managedAgentService.dispatch.called).to.equal(false);
});
});

describe('Telegram /start subscriber-link handling', () => {
Expand Down
40 changes: 20 additions & 20 deletions apps/api/src/app/agents/services/agent-inbound-handler.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,26 @@ export class AgentInboundHandler implements OnModuleInit {

const isManagedAgent = agent?.runtime === 'managed' && agent.managedRuntime;

if (config.acknowledgeOnReceived) {
const supportsTyping = PLATFORMS_WITH_TYPING_INDICATOR.has(config.platform);

if (supportsTyping) {
await thread.startTyping('Thinking...');
} else if (isFirstMessage && message.id) {
thread
.createSentMessageFromMessage(message)
.addReaction(ACKNOWLEDGE_FALLBACK_EMOJI)
.catch((err) => {
this.logger.warn(err, `[agent:${agentId}] Failed to add ack reaction to first message`);
captureAgentWarning(err, {
component: 'agent-inbound-handler',
operation: 'add-ack-reaction',
agentId,
});
});
}
}
Comment on lines +391 to +409
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Guard the typing call so a typing failure can't skip the setup gate.

await thread.startTyping('Thinking...') is awaited with no error handling, and it now runs before the managed-agent parking logic (Line 413). If startTyping rejects (delivery/network error), the rejection propagates out of handle, so the message is never parked and the setup card is never shown — the opposite of the UX this PR is trying to deliver. Note the fallback reaction branch already swallows its errors via .catch(...), and ChatSdkService.startTypingInConversation routes failures through .catch(toDeliveryError); the typing branch here should be equally resilient.

🛡️ Proposed fix to make typing non-fatal
       if (supportsTyping) {
-        await thread.startTyping('Thinking...');
+        await thread.startTyping('Thinking...').catch((err) => {
+          this.logger.warn(err, `[agent:${agentId}] Failed to start typing indicator`);
+          captureAgentWarning(err, {
+            component: 'agent-inbound-handler',
+            operation: 'start-typing',
+            agentId,
+          });
+        });
       } else if (isFirstMessage && message.id) {
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if (config.acknowledgeOnReceived) {
const supportsTyping = PLATFORMS_WITH_TYPING_INDICATOR.has(config.platform);
if (supportsTyping) {
await thread.startTyping('Thinking...');
} else if (isFirstMessage && message.id) {
thread
.createSentMessageFromMessage(message)
.addReaction(ACKNOWLEDGE_FALLBACK_EMOJI)
.catch((err) => {
this.logger.warn(err, `[agent:${agentId}] Failed to add ack reaction to first message`);
captureAgentWarning(err, {
component: 'agent-inbound-handler',
operation: 'add-ack-reaction',
agentId,
});
});
}
}
if (config.acknowledgeOnReceived) {
const supportsTyping = PLATFORMS_WITH_TYPING_INDICATOR.has(config.platform);
if (supportsTyping) {
await thread.startTyping('Thinking...').catch((err) => {
this.logger.warn(err, `[agent:${agentId}] Failed to start typing indicator`);
captureAgentWarning(err, {
component: 'agent-inbound-handler',
operation: 'start-typing',
agentId,
});
});
} else if (isFirstMessage && message.id) {
thread
.createSentMessageFromMessage(message)
.addReaction(ACKNOWLEDGE_FALLBACK_EMOJI)
.catch((err) => {
this.logger.warn(err, `[agent:${agentId}] Failed to add ack reaction to first message`);
captureAgentWarning(err, {
component: 'agent-inbound-handler',
operation: 'add-ack-reaction',
agentId,
});
});
}
}
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@apps/api/src/app/agents/services/agent-inbound-handler.service.ts` around
lines 391 - 409, The awaited call to thread.startTyping('Thinking...') can throw
and currently aborts subsequent setup/parking logic; make typing non-fatal by
catching errors instead of allowing the rejection to propagate: replace the
direct awaited call in agent-inbound-handler.service with a resilient form
(e.g., call thread.startTyping('Thinking...').catch(toDeliveryError) or wrap the
await in try/catch), and on error log a warning and call captureAgentWarning
with component 'agent-inbound-handler' and operation 'start-typing' so failures
are recorded but the code continues to the managed-agent parking/setup steps
(keep the existing acknowledge fallback branch behavior for first messages
intact).


// Subscriber still owes MCP OAuth: hold this message, show the setup card, skip dispatch.
// After OAuth completes, CompleteManagedAgentSetup replays the held message.
if (isManagedAgent && subscriber && message.id) {
Expand All @@ -410,26 +430,6 @@ export class AgentInboundHandler implements OnModuleInit {
}
}

if (config.acknowledgeOnReceived) {
const supportsTyping = PLATFORMS_WITH_TYPING_INDICATOR.has(config.platform);

if (supportsTyping) {
await thread.startTyping('Thinking...');
} else if (isFirstMessage && message.id) {
thread
.createSentMessageFromMessage(message)
.addReaction(ACKNOWLEDGE_FALLBACK_EMOJI)
.catch((err) => {
this.logger.warn(err, `[agent:${agentId}] Failed to add ack reaction to first message`);
captureAgentWarning(err, {
component: 'agent-inbound-handler',
operation: 'add-ack-reaction',
agentId,
});
});
}
}

try {
if (isManagedAgent) {
await this.managedAgentService.dispatch(
Expand Down
36 changes: 33 additions & 3 deletions apps/api/src/app/agents/services/chat-sdk.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,11 @@ export class ChatSdkService implements OnModuleDestroy {
dispose: (cached, key) => {
cached.chat.shutdown().catch((err) => {
this.logger.error(err, `Failed to shut down evicted Chat instance ${key}`);
captureAgentException(err, { component: 'chat-sdk', operation: 'shutdown-evicted', extra: { instanceKey: key } });
captureAgentException(err, {
component: 'chat-sdk',
operation: 'shutdown-evicted',
extra: { instanceKey: key },
});
});
},
});
Expand Down Expand Up @@ -348,6 +352,24 @@ export class ChatSdkService implements OnModuleDestroy {
return { messageId: sent.id, platformThreadId: sent.threadId };
}

async startTypingInConversation(
agentId: string,
integrationIdentifier: string,
platformThreadId: string,
status = 'Thinking...'
): Promise<void> {
const config = await this.agentConfigResolver.resolve(agentId, integrationIdentifier);
const instanceKey = `${agentId}:${integrationIdentifier}`;
const chat = await this.getOrCreate(instanceKey, agentId, config.platform, config);
const thread = chat.thread(platformThreadId);

if (typeof thread.startTyping !== 'function') {
return;
}

await thread.startTyping(status).catch(toDeliveryError);
}

async sendDirectMessage(
agentId: string,
integrationIdentifier: string,
Expand Down Expand Up @@ -1311,13 +1333,21 @@ export class ChatSdkService implements OnModuleDestroy {
warn: (msg: string, ctx?: Record<string, unknown>) => {
this.logger.warn(ctx ?? {}, msg);
if (ctx?.err) {
captureAgentWarning(ctx.err, { component: 'chat-sdk', operation: 'chat-state-warn', extra: { message: msg } });
captureAgentWarning(ctx.err, {
component: 'chat-sdk',
operation: 'chat-state-warn',
extra: { message: msg },
});
}
},
error: (msg: string, ctx?: Record<string, unknown>) => {
this.logger.error(ctx ?? {}, msg);
if (ctx?.err) {
captureAgentException(ctx.err, { component: 'chat-sdk', operation: 'chat-state-error', extra: { message: msg } });
captureAgentException(ctx.err, {
component: 'chat-sdk',
operation: 'chat-state-error',
extra: { message: msg },
});
}
},
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@ import {
SubscriberRepository,
} from '@novu/dal';

import { PLATFORMS_WITH_TYPING_INDICATOR } from '../../dtos/agent-platform.enum';
import { AgentConfigResolver, type ResolvedAgentConfig } from '../../services/agent-config-resolver.service';
import { ChatSdkService } from '../../services/chat-sdk.service';
import { ManagedAgentService } from '../../services/managed-agent.service';
import { GenerateMcpOAuthUrl } from '../generate-mcp-oauth-url/generate-mcp-oauth-url.usecase';
import { HandleAgentReplyCommand } from '../handle-agent-reply/handle-agent-reply.command';
Expand All @@ -40,6 +42,7 @@ export class CompleteManagedAgentSetup {
private readonly managedAgentService: ManagedAgentService,
private readonly generateMcpOAuthUrl: GenerateMcpOAuthUrl,
private readonly handleAgentReply: HandleAgentReply,
private readonly chatSdkService: ChatSdkService,
private readonly logger: PinoLogger
) {
this.logger.setContext(this.constructor.name);
Expand Down Expand Up @@ -267,10 +270,15 @@ export class CompleteManagedAgentSetup {
}): Promise<void> {
const { conversation, pending, agent, config, subscriber, mcps } = params;

const platformThreadId = conversation.channels?.[0]?.platformThreadId;
const willShowTypingBeforeReplay =
config.acknowledgeOnReceived && PLATFORMS_WITH_TYPING_INDICATOR.has(config.platform) && !!platformThreadId;

if (pending.setupMessageId) {
const resolvedCard = await buildSetupCardForMcps({
mcps,
resolved: true,
showProcessingHint: !willShowTypingBeforeReplay,
environmentId: config.environmentId,
organizationId: config.organizationId,
agentIdentifier: config.agentIdentifier,
Expand Down Expand Up @@ -311,6 +319,8 @@ export class CompleteManagedAgentSetup {

delete conversation.pendingManagedAgentSetup;

await this.showTypingBeforeSetupReplay(conversation, agent, config);

await this.managedAgentService.replayParkedInboundTurn({
conversation,
config,
Expand All @@ -319,4 +329,25 @@ export class CompleteManagedAgentSetup {
agent,
});
}

private async showTypingBeforeSetupReplay(
conversation: ConversationEntity,
agent: Pick<AgentEntity, '_id'>,
config: ResolvedAgentConfig
): Promise<void> {
const platformThreadId = conversation.channels?.[0]?.platformThreadId;

if (!config.acknowledgeOnReceived || !PLATFORMS_WITH_TYPING_INDICATOR.has(config.platform) || !platformThreadId) {
return;
}

try {
await this.chatSdkService.startTypingInConversation(agent._id, config.integrationIdentifier, platformThreadId);
} catch (err) {
this.logger.warn(
err,
`Failed to show typing before managed-agent setup replay for conversation ${conversation._id}`
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import { listOAuthMcps } from './list-oauth-mcps.helper';
import { ManagedAgentSetupInboundCommand } from './managed-agent-setup-inbound.command';
import { isOAuthMcpPending, type OAuthMcp } from './oauth-mcp.types';
import { buildSetupCardForMcps } from './setup-card.builder';
import { SETUP_GATE_NUDGE_MARKDOWN } from './setup-card.helpers';

/**
* Inbound gate for managed agents: park the user turn and post/edit a setup
Expand Down Expand Up @@ -126,6 +127,9 @@ export class HandleManagedAgentSetupInbound {
};

if (pendingState.setupMessageId) {
// If a setup card was already posted previously, edit the existing card
// to update its contents (for example, to refresh the list of available MCPs
// or to provide refreshed OAuth URLs).
await this.handleAgentReply.execute(
HandleAgentReplyCommand.create({
...replyCommandBase,
Expand All @@ -136,6 +140,14 @@ export class HandleManagedAgentSetupInbound {
})
);

// Post a nudge message to the user to complete the setup.
await this.handleAgentReply.execute(
HandleAgentReplyCommand.create({
...replyCommandBase,
reply: { markdown: SETUP_GATE_NUDGE_MARKDOWN },
})
);

return;
}

Expand Down Expand Up @@ -186,6 +198,7 @@ export class HandleManagedAgentSetupInbound {
const card = await buildSetupCardForMcps({
mcps,
resolved: true,
showProcessingHint: false,
environmentId: command.environmentId,
organizationId: command.organizationId,
agentIdentifier: command.agentIdentifier,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { buildSetupCard, type SetupCardRow } from './setup-card.helpers';
export async function buildSetupCardForMcps(params: {
mcps: OAuthMcp[];
resolved?: boolean;
showProcessingHint?: boolean;
environmentId: string;
organizationId: string;
agentIdentifier: string;
Expand Down Expand Up @@ -54,5 +55,9 @@ export async function buildSetupCardForMcps(params: {
}
}

return buildSetupCard({ mcps: rows, resolved: params.resolved });
return buildSetupCard({
mcps: rows,
resolved: params.resolved,
showProcessingHint: params.showProcessingHint,
});
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,12 @@ export interface SetupCardRow extends OAuthMcp {
const SETUP_REQUIRED_TEXT =
'Connect the tools below to continue. Your message will be handled automatically once setup is complete.';

const SETUP_COMPLETE_TEXT = 'All tools connected. Working on your message…';
const SETUP_COMPLETE_TEXT_CELEBRATION = "You're all set!";

const SETUP_COMPLETE_TEXT_WITH_PROCESSING_HINT = 'All tools connected. Your message will run automatically.';

export const SETUP_GATE_NUDGE_MARKDOWN =
'Please finish connecting your tools using the card above. Your latest message will run automatically once setup is complete.';

function isErrorStatus(status: OAuthMcp['status']): boolean {
return (
Expand Down Expand Up @@ -55,14 +60,21 @@ function buildMcpRowBlocks(mcp: SetupCardRow): Record<string, unknown>[] {
return buildPendingRowBlocks(mcp);
}

export function buildSetupCard(params: { mcps: SetupCardRow[]; resolved?: boolean }): Record<string, unknown> {
export function buildSetupCard(params: {
mcps: SetupCardRow[];
resolved?: boolean;
showProcessingHint?: boolean;
}): Record<string, unknown> {
const title = params.resolved ? 'Setup complete' : 'Connect your tools';

if (params.resolved) {
const showProcessingHint = params.showProcessingHint !== false;
const body = showProcessingHint ? SETUP_COMPLETE_TEXT_WITH_PROCESSING_HINT : SETUP_COMPLETE_TEXT_CELEBRATION;

return {
type: 'card',
title,
children: [{ type: 'text', content: SETUP_COMPLETE_TEXT }],
children: [{ type: 'text', content: body }],
};
}

Expand Down
Loading