Skip to content

Commit 16648c8

Browse files
committed
fix(telegram): buffered streaming mode for Pi token-level chunks
Pi's text_delta events can drop newlines and spaces from the model's output. The assembled transcript from agent_end.messages preserves the correct formatting. This change suppresses text_delta streaming and emits the full assembled text instead. For Telegram specifically, adds a 'buffered' streaming mode that coalesces chunks via a 3-second debounce timer. This prevents each token from becoming a separate Telegram message. - Pi event bridge: suppress text_delta, emit assembled text at agent_end - Telegram adapter: buffered mode with debounce, short-buffer skip - Whitespace-only message guard for Telegram - Shutdown flush to avoid losing in-flight buffered text - Tests for buffered mode and updated streaming tail tests Note: Pi's assembled transcript escapes nested markdown (e.g. **bold and *italic*** becomes **\*\*bold and \*italic\*\*\*), which is a Pi SDK bug that cannot be fixed in Archon.
1 parent 94b79fe commit 16648c8

10 files changed

Lines changed: 189 additions & 82 deletions

File tree

bun.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

packages/adapters/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
"@slack/bolt": "^4.6.0",
2424
"discord.js": "^14.16.0",
2525
"grammy": "^1.36.0",
26-
"telegramify-markdown": "^1.3.0"
26+
"telegramify-markdown": "^1.3.3"
2727
},
2828
"peerDependencies": {
2929
"typescript": "^5.0.0"

packages/adapters/src/chat/telegram/adapter.test.ts

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,47 @@ describe('TelegramAdapter', () => {
166166
});
167167
});
168168

169+
describe('buffered mode', () => {
170+
let adapter: TelegramAdapter;
171+
let mockSendMessage: Mock<() => Promise<void>>;
172+
173+
beforeEach(() => {
174+
adapter = new TelegramAdapter('fake-token-for-testing', 'buffered');
175+
mockSendMessage = mock(() => Promise.resolve());
176+
(adapter.getBot().api as unknown as { sendMessage: Mock<() => Promise<void>> }).sendMessage =
177+
mockSendMessage;
178+
});
179+
180+
test('should report stream mode to orchestrator (not batch)', () => {
181+
expect(adapter.getStreamingMode()).toBe('stream');
182+
});
183+
184+
test('should coalesce rapid chunks into a single message', async () => {
185+
// Rapid-fire chunks within the debounce window (total > BUFFER_MIN_FLUSH_LENGTH)
186+
await adapter.sendMessage('12345', 'Hello from the buffered');
187+
await adapter.sendMessage('12345', ' Telegram adapter test');
188+
await adapter.sendMessage('12345', ' with enough content to flush!');
189+
190+
// Nothing sent yet — still buffering
191+
expect(mockSendMessage).not.toHaveBeenCalled();
192+
193+
// Wait for debounce timer to fire (3000ms)
194+
await new Promise(resolve => setTimeout(resolve, 3100));
195+
196+
// Should have sent one coalesced message
197+
expect(mockSendMessage).toHaveBeenCalledTimes(1);
198+
const call = mockSendMessage.mock.calls[0];
199+
expect(call[0]).toBe(12345);
200+
expect(call[1]).toContain('Hello from the buffered');
201+
});
202+
203+
test('should skip whitespace-only chunks without flushing', async () => {
204+
await adapter.sendMessage('12345', '\n');
205+
await new Promise(resolve => setTimeout(resolve, 3100));
206+
expect(mockSendMessage).not.toHaveBeenCalled();
207+
});
208+
});
209+
169210
describe('getConversationId', () => {
170211
test('should return chat.id as string for private chat', () => {
171212
const adapter = new TelegramAdapter('fake-token-for-testing');

packages/adapters/src/chat/telegram/adapter.ts

Lines changed: 108 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,32 @@ function getLog(): ReturnType<typeof createLogger> {
1919

2020
const MAX_LENGTH = 4096;
2121

22+
/** Streaming mode for the Telegram adapter. */
23+
export type TelegramStreamingMode = 'stream' | 'batch' | 'buffered';
24+
25+
/** Buffered mode: debounce interval (ms). */
26+
const BUFFER_FLUSH_MS = 3000;
27+
/** Buffered mode: skip flushing buffers shorter than this (likely a single token before a thinking pause). */
28+
const BUFFER_MIN_FLUSH_LENGTH = 50;
29+
30+
/** State for a single chat's buffer. */
31+
interface BufferState {
32+
text: string;
33+
timer: ReturnType<typeof setTimeout> | null;
34+
}
35+
2236
export class TelegramAdapter implements IPlatformAdapter {
2337
private bot: Bot;
24-
private streamingMode: 'stream' | 'batch';
38+
private mode: TelegramStreamingMode;
2539
private allowedUserIds: number[];
2640
private messageHandler: ((ctx: TelegramMessageContext) => Promise<void>) | null = null;
41+
// Buffered mode: per-chat accumulation state
42+
private buffers = new Map<string, BufferState>();
2743

28-
constructor(token: string, mode: 'stream' | 'batch' = 'stream') {
44+
constructor(token: string, mode: TelegramStreamingMode = 'stream') {
2945
// grammY does not impose a handler timeout by default (unlike Telegraf's 90s limit)
3046
this.bot = new Bot(token);
31-
this.streamingMode = mode;
47+
this.mode = mode;
3248

3349
// Parse Telegram user whitelist (optional - empty = open access)
3450
// Support both TELEGRAM_ALLOWED_USER_IDS and TELEGRAM_ALLOWED_USERS
@@ -54,14 +70,93 @@ export class TelegramAdapter implements IPlatformAdapter {
5470
* (paragraphs rarely have formatting that spans across them)
5571
*/
5672
async sendMessage(chatId: string, message: string, _metadata?: MessageMetadata): Promise<void> {
73+
// Telegram rejects whitespace-only messages (400: text must be non-empty).
74+
// Reasoning models (e.g. GLM-4.5-Air via Pi) can emit newline-only chunks
75+
// during streaming — skip silently.
76+
if (!message.trim()) return;
77+
78+
// Buffered mode: accumulate chunks and flush on debounce timer or size threshold.
79+
// Recommended for providers that emit token-level chunks (e.g. Pi/z.ai with
80+
// GLM-4.5-Air) where each token would otherwise become a separate Telegram message.
81+
if (this.mode === 'buffered') {
82+
this.bufferChunk(chatId, message);
83+
return;
84+
}
85+
86+
await this.deliverMessage(chatId, message);
87+
}
88+
89+
/**
90+
* Accumulate a chunk into the per-chat buffer. Flushes when
91+
* BUFFER_FLUSH_MS elapses with no new chunks (end of AI response).
92+
* Long responses are split at paragraph boundaries by deliverMessage.
93+
*/
94+
private bufferChunk(chatId: string, chunk: string): void {
95+
let state = this.buffers.get(chatId);
96+
if (!state) {
97+
state = { text: '', timer: null };
98+
this.buffers.set(chatId, state);
99+
}
100+
101+
// Append chunk and reset debounce timer
102+
state.text += chunk;
103+
if (state.timer) clearTimeout(state.timer);
104+
105+
state.timer = setTimeout(() => {
106+
const current = this.buffers.get(chatId);
107+
if (current && current.text.trim().length > 0) {
108+
this.flushBuffer(chatId, current);
109+
}
110+
}, BUFFER_FLUSH_MS);
111+
}
112+
113+
/** Flush a buffered chat's accumulated text and clean up state. */
114+
private flushBuffer(chatId: string, state: BufferState, force = false): void {
115+
if (state.timer) {
116+
clearTimeout(state.timer);
117+
state.timer = null;
118+
}
119+
const text = state.text;
120+
this.buffers.delete(chatId);
121+
122+
if (!text.trim()) return;
123+
124+
// Skip very short buffers (likely a single token before a thinking pause).
125+
// Will be accumulated with subsequent chunks. Force flush on shutdown.
126+
if (!force && text.trim().length < BUFFER_MIN_FLUSH_LENGTH) {
127+
getLog().debug({ chatId, textLength: text.trim().length }, 'telegram.buffer_skip_short');
128+
// Re-buffer: put the text back for the next accumulation cycle
129+
const existing = this.buffers.get(chatId);
130+
if (existing) {
131+
existing.text = text + existing.text;
132+
} else {
133+
this.buffers.set(chatId, { text, timer: null });
134+
}
135+
return;
136+
}
137+
138+
getLog().debug({ chatId, textLength: text.length }, 'telegram.buffer_flush');
139+
// Fire-and-forget — errors are logged inside deliverMessage/sendFormattedChunk
140+
void this.deliverMessage(chatId, text).catch((err: unknown) => {
141+
getLog().error({ err, chatId }, 'telegram.buffered_flush_failed');
142+
});
143+
}
144+
145+
/** Flush all pending buffers — called during shutdown to avoid losing in-flight text. */
146+
private flushAllBuffers(): void {
147+
for (const [chatId, state] of this.buffers) {
148+
this.flushBuffer(chatId, state, true);
149+
}
150+
}
151+
152+
/** Send a complete (non-buffered) message to Telegram. */
153+
private async deliverMessage(chatId: string, message: string): Promise<void> {
57154
const id = parseInt(chatId);
58155
getLog().debug({ chatId, messageLength: message.length }, 'telegram.send_message');
59156

60157
if (message.length <= MAX_LENGTH) {
61-
// Short message: try MarkdownV2 formatting
62158
await this.sendFormattedChunk(id, message);
63159
} else {
64-
// Long message: split by paragraphs, format each chunk
65160
getLog().debug({ messageLength: message.length }, 'telegram.message_splitting');
66161
const chunks = splitIntoParagraphChunks(message, MAX_LENGTH - 200);
67162

@@ -71,9 +166,7 @@ export class TelegramAdapter implements IPlatformAdapter {
71166
}
72167
}
73168

74-
/**
75-
* Send a single chunk with MarkdownV2 formatting, with fallback to plain text
76-
*/
169+
/** Send a single chunk with MarkdownV2 formatting, with fallback to plain text. */
77170
private async sendFormattedChunk(id: number, chunk: string): Promise<void> {
78171
// If chunk is still too long after paragraph splitting, fall back to plain text
79172
if (chunk.length > MAX_LENGTH) {
@@ -122,10 +215,12 @@ export class TelegramAdapter implements IPlatformAdapter {
122215
}
123216

124217
/**
125-
* Get the configured streaming mode
218+
* Get the configured streaming mode.
219+
* Buffered mode reports 'stream' to the orchestrator — chunks are sent
220+
* one at a time and coalesced inside the adapter.
126221
*/
127222
getStreamingMode(): 'stream' | 'batch' {
128-
return this.streamingMode;
223+
return this.mode === 'batch' ? 'batch' : 'stream';
129224
}
130225

131226
/**
@@ -239,9 +334,11 @@ export class TelegramAdapter implements IPlatformAdapter {
239334
}
240335

241336
/**
242-
* Stop the bot gracefully
337+
* Stop the bot gracefully.
338+
* Flushes any pending buffered messages before stopping so in-flight text is not lost.
243339
*/
244340
stop(): void {
341+
this.flushAllBuffers();
245342
this.bot.stop();
246343
getLog().info('telegram.bot_stopped');
247344
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
export { TelegramAdapter } from './adapter';
1+
export { TelegramAdapter, type TelegramStreamingMode } from './adapter';

packages/adapters/src/index.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
// Chat adapters
2-
export { TelegramAdapter } from './chat/telegram';
2+
export { TelegramAdapter, type TelegramStreamingMode } from './chat/telegram';
33
export { SlackAdapter } from './chat/slack';
44

55
// Forge adapters

packages/providers/src/community/pi/event-bridge.test.ts

Lines changed: 14 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -652,11 +652,10 @@ describe('streaming tail completion', () => {
652652
} as unknown as AgentSessionEvent;
653653
}
654654

655-
test('emits corrective assistant chunk when streaming truncated', async () => {
655+
test('emits assembled text from agent_end when streaming differs', async () => {
656656
const streamed = 'The repo is cloned. Let me register it.\n\n/register-project';
657657
const full =
658658
'The repo is cloned. Let me register it.\n\n/register-project SaberEngine "/path/to/repo"';
659-
const tail = full.slice(streamed.length);
660659

661660
let listener: ((event: AgentSessionEvent) => void) | undefined;
662661
const mockSession = {
@@ -666,7 +665,6 @@ describe('streaming tail completion', () => {
666665
return () => {};
667666
},
668667
prompt: async () => {
669-
listener?.({ type: 'turn_start' } as AgentSessionEvent);
670668
listener?.(makeTextDeltaEvent(streamed));
671669
listener?.(makeAgentEndEvent(full));
672670
},
@@ -679,14 +677,14 @@ describe('streaming tail completion', () => {
679677
chunks.push(chunk);
680678
}
681679

680+
// text_deltas are suppressed; assembled text emitted at agent_end
682681
const assistantChunks = chunks.filter(c => c.type === 'assistant');
683-
expect(assistantChunks).toHaveLength(2);
684-
expect(assistantChunks[0].content).toBe(streamed);
685-
expect(assistantChunks[1].content).toBe(tail);
682+
expect(assistantChunks).toHaveLength(1);
683+
expect(assistantChunks[0].content).toBe(full);
686684
expect(chunks[chunks.length - 1].type).toBe('result');
687685
});
688686

689-
test('does not emit corrective chunk when streaming is complete', async () => {
687+
test('emits assembled text once when streaming matches', async () => {
690688
const full = 'complete text no truncation';
691689

692690
let listener: ((event: AgentSessionEvent) => void) | undefined;
@@ -697,7 +695,6 @@ describe('streaming tail completion', () => {
697695
return () => {};
698696
},
699697
prompt: async () => {
700-
listener?.({ type: 'turn_start' } as AgentSessionEvent);
701698
listener?.(makeTextDeltaEvent(full));
702699
listener?.(makeAgentEndEvent(full));
703700
},
@@ -715,7 +712,7 @@ describe('streaming tail completion', () => {
715712
expect(assistantChunks[0].content).toBe(full);
716713
});
717714

718-
test('does not emit corrective chunk when assembled text does not start with streamed (mismatch)', async () => {
715+
test('emits assembled text even when it differs from streamed text', async () => {
719716
let listener: ((event: AgentSessionEvent) => void) | undefined;
720717
const mockSession = {
721718
sessionId: 'session-1',
@@ -724,7 +721,6 @@ describe('streaming tail completion', () => {
724721
return () => {};
725722
},
726723
prompt: async () => {
727-
listener?.({ type: 'turn_start' } as AgentSessionEvent);
728724
listener?.(makeTextDeltaEvent('different content'));
729725
listener?.(makeAgentEndEvent('assembled is completely different'));
730726
},
@@ -739,10 +735,11 @@ describe('streaming tail completion', () => {
739735

740736
const assistantChunks = chunks.filter(c => c.type === 'assistant');
741737
expect(assistantChunks).toHaveLength(1);
742-
expect(assistantChunks[0].content).toBe('different content');
738+
// Assembled text replaces streamed text
739+
expect(assistantChunks[0].content).toBe('assembled is completely different');
743740
});
744741

745-
test('resets per-turn text on turn_start so only final turn is checked', async () => {
742+
test('emits final assembled text from multi-turn session', async () => {
746743
let listener: ((event: AgentSessionEvent) => void) | undefined;
747744
const mockSession = {
748745
sessionId: 'session-1',
@@ -751,11 +748,9 @@ describe('streaming tail completion', () => {
751748
return () => {};
752749
},
753750
prompt: async () => {
754-
listener?.({ type: 'turn_start' } as AgentSessionEvent);
755751
listener?.(makeTextDeltaEvent('turn one text'));
756-
listener?.({ type: 'turn_start' } as AgentSessionEvent); // second turn resets counter
757752
listener?.(makeTextDeltaEvent('turn two'));
758-
listener?.(makeAgentEndEvent('turn two')); // last assistant msg matches turn 2
753+
listener?.(makeAgentEndEvent('turn two')); // last assistant msg
759754
},
760755
abort: async () => {},
761756
dispose: () => {},
@@ -766,13 +761,13 @@ describe('streaming tail completion', () => {
766761
chunks.push(chunk);
767762
}
768763

764+
// text_deltas suppressed; only assembled text emitted
769765
const assistantChunks = chunks.filter(c => c.type === 'assistant');
770-
expect(assistantChunks).toHaveLength(2);
771-
expect(assistantChunks[0].content).toBe('turn one text');
772-
expect(assistantChunks[1].content).toBe('turn two');
766+
expect(assistantChunks).toHaveLength(1);
767+
expect(assistantChunks[0].content).toBe('turn two');
773768
});
774769

775-
test('corrective chunk is added to assistantBuffer when wantsStructured', async () => {
770+
test('assembled text is used for structured output parsing', async () => {
776771
const streamed = '{"partial":';
777772
const full = '{"partial":true}';
778773

@@ -784,7 +779,6 @@ describe('streaming tail completion', () => {
784779
return () => {};
785780
},
786781
prompt: async () => {
787-
listener?.({ type: 'turn_start' } as AgentSessionEvent);
788782
listener?.(makeTextDeltaEvent(streamed));
789783
listener?.(makeAgentEndEvent(full));
790784
},

0 commit comments

Comments
 (0)