Skip to content

Commit b57a430

Browse files
fullsend-ai-coder[bot]claudegabemontero
authored
feat(boost): add streaming chat endpoint and normalized event pipeline (#3302) (#3555)
* feat(boost): add streaming chat endpoint and normalized event pipeline (#3302) Implement the SSE streaming endpoint, synchronous chat route, and supporting infrastructure that connects AI providers to the frontend via NormalizedStreamEvent. New files: - chat/routes.ts: POST /chat (sync) and POST /chat/stream (SSE) endpoints with permission gating (boost.chat.create + boost.admin fallback), input validation, and error-resilient streaming - chat/ConversationAgentCache.ts: session-scoped cacheService mapping conversation IDs to provider IDs (task 1.8, 24h TTL) - chat/RateLimiter.ts: per-window rate limiter backed by cacheService (task 1.9, configurable max/window) - chat/index.ts: barrel exports Modified files: - plugin.ts: wire chat routes, ConversationAgentCache, RateLimiter into plugin init; add /chat auth policy - index.ts: export new public API surface All caches use Backstage cacheService per Decision 3 (design.md). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> * fix: address review feedback on streaming chat pipeline - Track provider-sent done event to prevent duplicate done in SSE stream - Replace double-cast (as unknown as string) with JSON.stringify/parse in RateLimiter - Tighten SSE test assertion from toBeGreaterThanOrEqual(1) to toBe(1) Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: gabemontero <gmontero@redhat.com> * feat(#3302): stub full NormalizedStreamEvent union and add missing test coverage - Add 8 stub event interfaces to NormalizedStreamEvent union per normalized-streaming spec: reasoning, rag_result, handoff, approval, form, auth, artifact, citation - Add test for provider-without-done dedup path in streaming route - Add clarifying comments for sync endpoint limitation and anonymous rate-limit bucket Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: gabemontero <gmontero@redhat.com> * fix(#3302): handle auto-deserializing cache backends in RateLimiter RateLimiter.consume() now handles both string and pre-parsed object returns from cacheService.get(). Some cache backends (e.g., Keyv with JSON serializer) auto-deserialize stored JSON into objects, which would bypass the typeof raw === 'string' check and reset the rate-limit window on every request. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com> Signed-off-by: gabemontero <gmontero@redhat.com> --------- Signed-off-by: gabemontero <gmontero@redhat.com> Co-authored-by: fullsend-code <278716306+fullsend-ai-coder[bot]@users.noreply.github.com> Co-authored-by: Claude Opus 4.6 <noreply@anthropic.com> Co-authored-by: gabemontero <gmontero@redhat.com>
1 parent 8ba3661 commit b57a430

13 files changed

Lines changed: 1554 additions & 1 deletion

File tree

workspaces/boost/plugins/boost-backend/report.api.md

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,16 @@ export type BoostConfigKey = keyof typeof boostConfigFields;
180180
const boostPlugin: BackendFeature;
181181
export default boostPlugin;
182182

183+
// @public
184+
export interface ChatRoutesOptions {
185+
conversationAgentCache: ConversationAgentCache;
186+
httpAuth: HttpAuthService;
187+
logger: LoggerService;
188+
permissions: PermissionsService;
189+
providerManager: ProviderManager;
190+
rateLimiter: RateLimiter;
191+
}
192+
183193
// @public
184194
export interface ConfigFieldMeta<T extends z.ZodTypeAny = z.ZodTypeAny> {
185195
configScope: ConfigScope;
@@ -191,12 +201,29 @@ export interface ConfigFieldMeta<T extends z.ZodTypeAny = z.ZodTypeAny> {
191201
// @public
192202
export type ConfigScope = 'yaml-only' | 'db-overridable' | 'db-only';
193203

204+
// @public
205+
export class ConversationAgentCache {
206+
constructor(options: ConversationAgentCacheOptions);
207+
delete(conversationId: string): Promise<void>;
208+
get(conversationId: string): Promise<string | undefined>;
209+
set(conversationId: string, providerId: string): Promise<void>;
210+
}
211+
212+
// @public
213+
export interface ConversationAgentCacheOptions {
214+
cache: CacheService;
215+
logger: LoggerService;
216+
}
217+
194218
// @public
195219
export function createAgentResourceLoader(): ResourceLoader;
196220

197221
// @public
198222
export function createAgentRoutes(options: AgentRoutesOptions): Router;
199223

224+
// @public
225+
export function createChatRoutes(options: ChatRoutesOptions): Router;
226+
200227
// @public
201228
export function createKagentiAdminRoutes(
202229
options: KagentiAdminRoutesOptions,
@@ -288,6 +315,24 @@ export class ProviderManager {
288315
switchProvider(providerId: string): void;
289316
}
290317

318+
// @public
319+
export class RateLimiter {
320+
constructor(options: RateLimiterOptions);
321+
consume(identity: string): Promise<{
322+
allowed: boolean;
323+
remaining: number;
324+
retryAfterMs?: number;
325+
}>;
326+
}
327+
328+
// @public
329+
export interface RateLimiterOptions {
330+
cache: CacheService;
331+
logger: LoggerService;
332+
maxRequests?: number;
333+
windowMs?: number;
334+
}
335+
291336
// @public
292337
export type ResourceLoader = (req: Request_2) => Promise<
293338
| {
Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
/*
2+
* Copyright Red Hat, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import type {
18+
CacheService,
19+
LoggerService,
20+
} from '@backstage/backend-plugin-api';
21+
import { ConversationAgentCache } from './ConversationAgentCache';
22+
23+
function createMockLogger(): LoggerService {
24+
return {
25+
info: jest.fn(),
26+
warn: jest.fn(),
27+
error: jest.fn(),
28+
debug: jest.fn(),
29+
child: jest.fn().mockReturnThis(),
30+
};
31+
}
32+
33+
function createMockCache(): CacheService {
34+
const store = new Map<string, unknown>();
35+
const cache: CacheService = {
36+
get: jest.fn(async (key: string) => store.get(key)) as CacheService['get'],
37+
set: jest.fn(async (key: string, value: unknown) => {
38+
store.set(key, value);
39+
}),
40+
delete: jest.fn(async (key: string) => {
41+
store.delete(key);
42+
}),
43+
withOptions: jest.fn().mockReturnThis(),
44+
};
45+
return cache;
46+
}
47+
48+
describe('ConversationAgentCache', () => {
49+
let cache: CacheService;
50+
let conversationCache: ConversationAgentCache;
51+
52+
beforeEach(() => {
53+
cache = createMockCache();
54+
conversationCache = new ConversationAgentCache({
55+
cache,
56+
logger: createMockLogger(),
57+
});
58+
});
59+
60+
it('stores and retrieves a conversation-agent mapping', async () => {
61+
await conversationCache.set('conv-1', 'llamastack');
62+
const result = await conversationCache.get('conv-1');
63+
expect(result).toBe('llamastack');
64+
});
65+
66+
it('returns undefined for unknown conversation', async () => {
67+
const result = await conversationCache.get('unknown');
68+
expect(result).toBeUndefined();
69+
});
70+
71+
it('deletes a conversation-agent mapping', async () => {
72+
await conversationCache.set('conv-1', 'llamastack');
73+
await conversationCache.delete('conv-1');
74+
const result = await conversationCache.get('conv-1');
75+
expect(result).toBeUndefined();
76+
});
77+
78+
it('uses cacheService withOptions for namespace isolation', () => {
79+
expect(cache.withOptions).toHaveBeenCalledWith({
80+
defaultTtl: 24 * 60 * 60 * 1000,
81+
});
82+
});
83+
});
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
/*
2+
* Copyright Red Hat, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import type {
18+
CacheService,
19+
LoggerService,
20+
} from '@backstage/backend-plugin-api';
21+
22+
/**
23+
* Options for creating a ConversationAgentCache.
24+
*
25+
* @public
26+
*/
27+
export interface ConversationAgentCacheOptions {
28+
/** The Backstage cache service. */
29+
cache: CacheService;
30+
/** The Backstage logger service. */
31+
logger: LoggerService;
32+
}
33+
34+
/**
35+
* Session-scoped cache that maps conversation IDs to the agent (provider)
36+
* ID that is handling the conversation. Uses Backstage cacheService with
37+
* namespace isolation per Decision 3 (design.md).
38+
*
39+
* @public
40+
*/
41+
export class ConversationAgentCache {
42+
private readonly cache: CacheService;
43+
private readonly logger: LoggerService;
44+
45+
/** Cache TTL for conversation-agent mappings: 24 hours. */
46+
private static readonly TTL_MS = 24 * 60 * 60 * 1000;
47+
48+
constructor(options: ConversationAgentCacheOptions) {
49+
this.cache = options.cache.withOptions({
50+
defaultTtl: ConversationAgentCache.TTL_MS,
51+
});
52+
this.logger = options.logger;
53+
}
54+
55+
/**
56+
* Associates a conversation with a provider (agent) ID.
57+
*
58+
* @param conversationId - The conversation identifier.
59+
* @param providerId - The provider identifier handling this conversation.
60+
*/
61+
async set(conversationId: string, providerId: string): Promise<void> {
62+
const key = `conversation-agent:${conversationId}`;
63+
await this.cache.set(key, providerId);
64+
this.logger.debug(
65+
`Mapped conversation ${conversationId} to provider ${providerId}`,
66+
);
67+
}
68+
69+
/**
70+
* Retrieves the provider ID for a conversation.
71+
*
72+
* @param conversationId - The conversation identifier.
73+
* @returns The provider ID or undefined if not cached.
74+
*/
75+
async get(conversationId: string): Promise<string | undefined> {
76+
const key = `conversation-agent:${conversationId}`;
77+
const value = await this.cache.get(key);
78+
return typeof value === 'string' ? value : undefined;
79+
}
80+
81+
/**
82+
* Removes the conversation-agent mapping.
83+
*
84+
* @param conversationId - The conversation identifier.
85+
*/
86+
async delete(conversationId: string): Promise<void> {
87+
const key = `conversation-agent:${conversationId}`;
88+
await this.cache.delete(key);
89+
this.logger.debug(
90+
`Removed conversation-agent mapping for ${conversationId}`,
91+
);
92+
}
93+
}
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
/*
2+
* Copyright Red Hat, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
import type {
18+
CacheService,
19+
LoggerService,
20+
} from '@backstage/backend-plugin-api';
21+
import { RateLimiter } from './RateLimiter';
22+
23+
function createMockLogger(): LoggerService {
24+
return {
25+
info: jest.fn(),
26+
warn: jest.fn(),
27+
error: jest.fn(),
28+
debug: jest.fn(),
29+
child: jest.fn().mockReturnThis(),
30+
};
31+
}
32+
33+
function createMockCache(): CacheService {
34+
const store = new Map<string, unknown>();
35+
const cache: CacheService = {
36+
get: jest.fn(async (key: string) => store.get(key)) as CacheService['get'],
37+
set: jest.fn(async (key: string, value: unknown) => {
38+
store.set(key, value);
39+
}),
40+
delete: jest.fn(async (key: string) => {
41+
store.delete(key);
42+
}),
43+
withOptions: jest.fn().mockReturnThis(),
44+
};
45+
return cache;
46+
}
47+
48+
describe('RateLimiter', () => {
49+
let cache: CacheService;
50+
let limiter: RateLimiter;
51+
52+
beforeEach(() => {
53+
cache = createMockCache();
54+
limiter = new RateLimiter({
55+
cache,
56+
logger: createMockLogger(),
57+
maxRequests: 3,
58+
windowMs: 60_000,
59+
});
60+
});
61+
62+
it('allows requests within the limit', async () => {
63+
const r1 = await limiter.consume('user-1');
64+
expect(r1.allowed).toBe(true);
65+
expect(r1.remaining).toBe(2);
66+
67+
const r2 = await limiter.consume('user-1');
68+
expect(r2.allowed).toBe(true);
69+
expect(r2.remaining).toBe(1);
70+
});
71+
72+
it('denies requests exceeding the limit', async () => {
73+
await limiter.consume('user-1');
74+
await limiter.consume('user-1');
75+
await limiter.consume('user-1');
76+
77+
const r4 = await limiter.consume('user-1');
78+
expect(r4.allowed).toBe(false);
79+
expect(r4.remaining).toBe(0);
80+
expect(r4.retryAfterMs).toBeGreaterThan(0);
81+
});
82+
83+
it('tracks separate limits per identity', async () => {
84+
await limiter.consume('user-1');
85+
await limiter.consume('user-1');
86+
await limiter.consume('user-1');
87+
88+
// user-2 should still be allowed
89+
const r = await limiter.consume('user-2');
90+
expect(r.allowed).toBe(true);
91+
expect(r.remaining).toBe(2);
92+
});
93+
94+
it('handles cache backends that auto-deserialize JSON into objects', async () => {
95+
const objectCache = createMockCache();
96+
const origSet = objectCache.set as jest.Mock;
97+
const origGet = objectCache.get as jest.Mock;
98+
99+
// Intercept set to store parsed objects instead of raw strings
100+
origSet.mockImplementation(async (key: string, value: unknown) => {
101+
const parsed = typeof value === 'string' ? JSON.parse(value) : value;
102+
(origGet as jest.Mock).mockImplementation(async (k: string) =>
103+
k === key ? parsed : undefined,
104+
);
105+
});
106+
107+
const objLimiter = new RateLimiter({
108+
cache: objectCache,
109+
logger: createMockLogger(),
110+
maxRequests: 3,
111+
windowMs: 60_000,
112+
});
113+
114+
const r1 = await objLimiter.consume('user-obj');
115+
expect(r1.allowed).toBe(true);
116+
expect(r1.remaining).toBe(2);
117+
118+
const r2 = await objLimiter.consume('user-obj');
119+
expect(r2.allowed).toBe(true);
120+
expect(r2.remaining).toBe(1);
121+
});
122+
123+
it('uses cacheService withOptions for namespace isolation', () => {
124+
expect(cache.withOptions).toHaveBeenCalledWith({
125+
defaultTtl: 60_000,
126+
});
127+
});
128+
});

0 commit comments

Comments
 (0)