Skip to content

Commit 0588cc9

Browse files
HexaFieldclaude
andcommitted
feat(api): thread AbortSignal through querySparql wrappers
Adds opt-in cancellation to every public API method that wraps a querySparql call. Callers pass `{ signal }` from an AbortController and the executor receives a `request.cancel` over the WebSocket, short- circuiting the JSON reply. Aborted calls reject with `DOMException ('Aborted', 'AbortError')` — wrappers re-throw this so callers can distinguish cancellation from real failures (non-AbortError exceptions are still logged + swallowed for graceful degradation). Wired through: - Channel: allItems, unprocessedItems, totalItemCount, recentConversations, pinnedConversations - Conversation: stats, topics, subgroups, subgroupsData - ConversationSubgroup: stats, topics, itemsData, topicsWithRelevance - SemanticRelationship: itemEmbedding, allConversationEmbeddings, allSubgroupEmbeddings, allItemEmbeddings, allItemEmbeddingsByType - Topic: linkedConversations, linkedSubgroups Structural `AbortOptions { signal?: AbortSignal }` type lives in `packages/api/src/shared/abort.ts` and is re-exported from the package index. Structurally compatible with ad4m's `CallOptions` — no version bump required to consume the underlying executor cancellation support shipped in coasys/ad4m#855. Tests - Forwarding: Channel.allItems passes the signal to perspective.querySparql. - Re-throw: Channel.allItems propagates DOMException AbortError instead of swallowing it. Backwards compatible: all new parameters are optional, existing call sites are unchanged. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent bc3bae2 commit 0588cc9

8 files changed

Lines changed: 125 additions & 43 deletions

File tree

packages/api/src/channel/channel.test.ts

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,33 @@ describe('Channel.allItems()', () => {
239239
expect(typed.timestamp).toBe('2026-04-20T10:02:00.000Z');
240240
expect(voice.timestamp < typed.timestamp).toBe(true);
241241
});
242+
243+
// ── AbortSignal handling ───────────────────────────────────────────
244+
//
245+
// Two contracts the wrapper must honour:
246+
// 1. Forward the `signal` to perspective.querySparql so the executor
247+
// receives `request.cancel`.
248+
// 2. Re-throw AbortError instead of swallowing it (other errors are
249+
// swallowed for graceful degradation). Without this, callers
250+
// can't distinguish cancellation from real failures.
251+
252+
it('forwards the AbortSignal to perspective.querySparql', async () => {
253+
const perspective = createMockPerspective(async () => []);
254+
const channel = new Channel(perspective as any, 'channel-1');
255+
const controller = new AbortController();
256+
await channel.allItems({ signal: controller.signal });
257+
expect(perspective.querySparql).toHaveBeenCalledWith(
258+
expect.any(String),
259+
{ signal: controller.signal },
260+
);
261+
});
262+
263+
it('re-throws AbortError instead of swallowing it', async () => {
264+
const perspective = createMockPerspective();
265+
perspective.querySparql.mockRejectedValueOnce(new DOMException('Aborted', 'AbortError'));
266+
const channel = new Channel(perspective as any, 'channel-1');
267+
await expect(channel.allItems()).rejects.toMatchObject({ name: 'AbortError' });
268+
});
242269
});
243270

244271
// ---------------------------------------------------------------------------

packages/api/src/channel/index.ts

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { Ad4mModel, HasMany, HasManyMethods, Flag, Literal, LinkQuery, Model, Property, PerspectiveProxy, parseLit, parseSparqlCount, CountBinding } from '@coasys/ad4m';
2+
import type { AbortOptions } from '../shared/abort';
23
import { community } from '@coasys/flux-constants';
34
import { EntryType } from '@coasys/flux-types';
45
import { SynergyGroup, SynergyItem, ItemType, icons } from '@coasys/flux-utils';
@@ -95,7 +96,7 @@ export class Channel extends Ad4mModel {
9596
@HasMany(() => Post)
9697
posts: Post[] = [];
9798

98-
async allItems(): Promise<SynergyItem[]> {
99+
async allItems(options?: AbortOptions): Promise<SynergyItem[]> {
99100
// Get all items (messages, posts, tasks) in the channel
100101
try {
101102
const sparqlQuery = `
@@ -115,7 +116,7 @@ export class Channel extends Ad4mModel {
115116
ORDER BY ?timestamp
116117
`;
117118

118-
const sparqlResult = await this.perspective.querySparql<ChannelItemBinding[]>(sparqlQuery);
119+
const sparqlResult = await this.perspective.querySparql<ChannelItemBinding[]>(sparqlQuery, options);
119120

120121
const mapped = (sparqlResult || []).map((binding) => {
121122
let text = '';
@@ -145,12 +146,14 @@ export class Channel extends Ad4mModel {
145146
// Re-sort by effective timestamp since transcriptStart may differ from link timestamp
146147
return mapped.sort((a, b) => a.timestamp.localeCompare(b.timestamp));
147148
} catch (error) {
149+
// Re-throw AbortError so callers can distinguish cancellation from real failures
150+
if (error instanceof DOMException && error.name === 'AbortError') throw error;
148151
console.error('Error getting all channel items:', error);
149152
return [];
150153
}
151154
}
152155

153-
async unprocessedItems(): Promise<SynergyItem[]> {
156+
async unprocessedItems(options?: AbortOptions): Promise<SynergyItem[]> {
154157
// Get all unprocessed items in the channel using set-difference approach
155158
// instead of FILTER NOT EXISTS (which is O(N²) in Oxigraph)
156159
try {
@@ -182,8 +185,8 @@ export class Channel extends Ad4mModel {
182185
// an item to appear in allItems but not processedSet (or vice-versa).
183186
// The final VALUES query re-verifies channel membership to mitigate this.
184187
const [allItemsResult, processedResult] = await Promise.all([
185-
this.perspective.querySparql<IdBinding[]>(allItemsQuery),
186-
this.perspective.querySparql<IdBinding[]>(processedQuery),
188+
this.perspective.querySparql<IdBinding[]>(allItemsQuery, options),
189+
this.perspective.querySparql<IdBinding[]>(processedQuery, options),
187190
]);
188191

189192
const processedSet = new Set((processedResult || []).map((r) => r.id));
@@ -213,7 +216,7 @@ export class Channel extends Ad4mModel {
213216
ORDER BY ?timestamp
214217
`;
215218

216-
const sparqlResult = await this.perspective.querySparql<ChannelItemBinding[]>(dataQuery);
219+
const sparqlResult = await this.perspective.querySparql<ChannelItemBinding[]>(dataQuery, options);
217220

218221
// Deduplicate by id
219222
const itemMap = new Map<string, ChannelItemBinding>();
@@ -251,12 +254,13 @@ export class Channel extends Ad4mModel {
251254
// Re-sort by effective timestamp since transcriptStart may differ from link timestamp
252255
return mapped.sort((a, b) => a.timestamp.localeCompare(b.timestamp));
253256
} catch (error) {
257+
if (error instanceof DOMException && error.name === 'AbortError') throw error;
254258
console.error('Error getting channel items:', error);
255259
return [];
256260
}
257261
}
258262

259-
async totalItemCount(): Promise<number> {
263+
async totalItemCount(options?: AbortOptions): Promise<number> {
260264
// Find the total number of items in the channel
261265
try {
262266
// SPARQL migration
@@ -268,9 +272,10 @@ export class Channel extends Ad4mModel {
268272
}
269273
`;
270274

271-
const sparqlResult = await this.perspective.querySparql<CountBinding[]>(sparqlQuery);
275+
const sparqlResult = await this.perspective.querySparql<CountBinding[]>(sparqlQuery, options);
272276
return parseSparqlCount(sparqlResult);
273277
} catch (error) {
278+
if (error instanceof DOMException && error.name === 'AbortError') throw error;
274279
console.error('Error getting total item count:', error);
275280
return 0;
276281
}
@@ -289,6 +294,7 @@ export class Channel extends Ad4mModel {
289294
static async recentConversations(
290295
perspective: PerspectiveProxy,
291296
limit: number = 20,
297+
options?: AbortOptions,
292298
): Promise<{ channelId: string; conversationId?: string; lastActivity?: string }[]> {
293299
// Step 1: Find conversation channels + their conversation child (fast, no reifier joins)
294300
const sparql = `
@@ -303,7 +309,7 @@ export class Channel extends Ad4mModel {
303309
`;
304310

305311
try {
306-
const results = await perspective.querySparql<RecentConversationBinding[]>(sparql);
312+
const results = await perspective.querySparql<RecentConversationBinding[]>(sparql, options);
307313

308314
// Filter to only conversation channels and dedup
309315
const channelMap = new Map<string, { channelId: string; conversationId?: string; lastActivity?: string }>();
@@ -326,6 +332,7 @@ export class Channel extends Ad4mModel {
326332
Array.from(channelMap.entries()).map(async ([channelId, entry]) => {
327333
const links = await perspective.get(
328334
new LinkQuery({ source: channelId, predicate: 'ad4m://has_child' }),
335+
options,
329336
);
330337
// Find the most recent link timestamp
331338
let latest = '';
@@ -342,6 +349,7 @@ export class Channel extends Ad4mModel {
342349
.slice(0, limit);
343350
return sorted;
344351
} catch (error) {
352+
if (error instanceof DOMException && error.name === 'AbortError') throw error;
345353
console.error('Error in Channel.recentConversations():', error);
346354
return [];
347355
}
@@ -353,6 +361,7 @@ export class Channel extends Ad4mModel {
353361
*/
354362
static async pinnedConversations(
355363
perspective: PerspectiveProxy,
364+
options?: AbortOptions,
356365
): Promise<{ channelId: string; conversationId?: string }[]> {
357366
const sparql = `
358367
SELECT ?channelId ?conversationId WHERE {
@@ -367,7 +376,7 @@ export class Channel extends Ad4mModel {
367376
`;
368377

369378
try {
370-
const results = await perspective.querySparql<PinnedConversationBinding[]>(sparql);
379+
const results = await perspective.querySparql<PinnedConversationBinding[]>(sparql, options);
371380
// Deduplicate by channelId
372381
const seen = new Map<string, { channelId: string; conversationId?: string }>();
373382
for (const r of results || []) {
@@ -380,6 +389,7 @@ export class Channel extends Ad4mModel {
380389
}
381390
return Array.from(seen.values());
382391
} catch (error) {
392+
if (error instanceof DOMException && error.name === 'AbortError') throw error;
383393
console.error('Error in Channel.pinnedConversations():', error);
384394
return [];
385395
}

packages/api/src/conversation-subgroup/index.ts

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { Model, Ad4mModel, Flag, HasMany, Property, Literal, parseLit } from '@coasys/ad4m';
2+
import type { AbortOptions } from '../shared/abort';
23
import Topic, { TopicWithRelevance } from '../topic';
34
import SemanticRelationship from '../semantic-relationship';
45
import { SynergyTopic, SynergyItem, ItemType, icons } from '@coasys/flux-utils';
@@ -46,7 +47,7 @@ export default class ConversationSubgroup extends Ad4mModel {
4647
@HasMany({ through: FLUX_PARTICIPANT })
4748
participants: string[] = [];
4849

49-
async stats(): Promise<{ totalItems: number; participants: string[] }> {
50+
async stats(options?: AbortOptions): Promise<{ totalItems: number; participants: string[] }> {
5051
// find the total item count and the dids of participants in the subgroup
5152
try {
5253
// SPARQL migration
@@ -66,20 +67,21 @@ export default class ConversationSubgroup extends Ad4mModel {
6667
`;
6768

6869
const [itemsResult, participantsResult] = await Promise.all([
69-
this.perspective.querySparql<ItemIdBinding[]>(itemsQuery),
70-
this.perspective.querySparql<DidBinding[]>(participantsQuery),
70+
this.perspective.querySparql<ItemIdBinding[]>(itemsQuery, options),
71+
this.perspective.querySparql<DidBinding[]>(participantsQuery, options),
7172
]);
7273

7374
const totalItems = itemsResult?.length || 0;
7475
const participants = (participantsResult || []).map((r) => r.did).filter(Boolean);
7576
return { totalItems, participants };
7677
} catch (error) {
78+
if (error instanceof DOMException && error.name === 'AbortError') throw error;
7779
console.error('Error getting subgroup stats:', error);
7880
return { totalItems: 0, participants: [] };
7981
}
8082
}
8183

82-
async topics(): Promise<SynergyTopic[]> {
84+
async topics(options?: AbortOptions): Promise<SynergyTopic[]> {
8385
// find the subgroups topics
8486
try {
8587
// SPARQL migration
@@ -93,7 +95,7 @@ export default class ConversationSubgroup extends Ad4mModel {
9395
}
9496
`;
9597

96-
const sparqlResult = await this.perspective.querySparql<TopicBinding[]>(sparqlQuery);
98+
const sparqlResult = await this.perspective.querySparql<TopicBinding[]>(sparqlQuery, options);
9799

98100
// Deduplicate by topicBase
99101
const uniqueTopics = new Map<string, { topicBase: string; topicName: string }>();
@@ -114,12 +116,13 @@ export default class ConversationSubgroup extends Ad4mModel {
114116
}),
115117
);
116118
} catch (error) {
119+
if (error instanceof DOMException && error.name === 'AbortError') throw error;
117120
console.error('Error getting subgroup topics:', error);
118121
return [];
119122
}
120123
}
121124

122-
async itemsData(): Promise<SynergyItem[]> {
125+
async itemsData(options?: AbortOptions): Promise<SynergyItem[]> {
123126
// find the necissary data to render the subgroups items in timeline components
124127
try {
125128
// SPARQL migration
@@ -145,7 +148,7 @@ export default class ConversationSubgroup extends Ad4mModel {
145148
ORDER BY ?timestamp
146149
`;
147150

148-
const sparqlResult = await this.perspective.querySparql<SubgroupItemBinding[]>(sparqlQuery);
151+
const sparqlResult = await this.perspective.querySparql<SubgroupItemBinding[]>(sparqlQuery, options);
149152

150153
// Collect items — keep duplicate IDs so the view can detect and clean them up
151154
const items: SubgroupItem[] = [];
@@ -221,12 +224,13 @@ export default class ConversationSubgroup extends Ad4mModel {
221224
};
222225
});
223226
} catch (error) {
227+
if (error instanceof DOMException && error.name === 'AbortError') throw error;
224228
console.error('Error getting subgroup items:', error);
225229
return [];
226230
}
227231
}
228232

229-
async topicsWithRelevance(): Promise<TopicWithRelevance[]> {
233+
async topicsWithRelevance(options?: AbortOptions): Promise<TopicWithRelevance[]> {
230234
try {
231235
// SPARQL migration
232236
const sparqlQuery = `
@@ -240,7 +244,7 @@ export default class ConversationSubgroup extends Ad4mModel {
240244
}
241245
`;
242246

243-
const sparqlResult = await this.perspective.querySparql<TopicRelevanceBinding[]>(sparqlQuery);
247+
const sparqlResult = await this.perspective.querySparql<TopicRelevanceBinding[]>(sparqlQuery, options);
244248

245249
// Deduplicate by topicBase
246250
const uniqueTopics = new Map<string, { topicBase: string; topicName: string; relevance: string }>();
@@ -261,6 +265,7 @@ export default class ConversationSubgroup extends Ad4mModel {
261265
relevance: parseInt(relevance, 10) || 0,
262266
}));
263267
} catch (error) {
268+
if (error instanceof DOMException && error.name === 'AbortError') throw error;
264269
console.error('Error getting subgroup topics with relevance:', error);
265270
return [];
266271
}

packages/api/src/conversation/index.ts

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { Ad4mModel, Ad4mClient, Flag, HasMany, HasManyMethods, Link, Literal, Model, Property, parseLit } from '@coasys/ad4m';
22

33
import { getProfile, Topic } from '@coasys/flux-api';
4+
import type { AbortOptions } from '../shared/abort';
45
import { ProcessingState, Profile } from '@coasys/flux-types';
56
import { SynergyGroup, SynergyItem, SynergyTopic } from '@coasys/flux-utils';
67
import ConversationSubgroup from '../conversation-subgroup';
@@ -53,7 +54,7 @@ export class Conversation extends Ad4mModel {
5354
@HasMany(() => ConversationSubgroup)
5455
subgroupEntities: ConversationSubgroup[] = [];
5556

56-
async stats(): Promise<{ totalSubgroups: number; participants: string[] }> {
57+
async stats(options?: AbortOptions): Promise<{ totalSubgroups: number; participants: string[] }> {
5758
// find the total subgroup count and the dids of participants in the conversation
5859
try {
5960
// SPARQL migration
@@ -71,20 +72,21 @@ export class Conversation extends Ad4mModel {
7172
`;
7273

7374
const [subgroupsResult, participantsResult] = await Promise.all([
74-
this.perspective.querySparql<SgBinding[]>(subgroupsQuery),
75-
this.perspective.querySparql<DidBinding[]>(participantsQuery),
75+
this.perspective.querySparql<SgBinding[]>(subgroupsQuery, options),
76+
this.perspective.querySparql<DidBinding[]>(participantsQuery, options),
7677
]);
7778

7879
const totalSubgroups = subgroupsResult?.length || 0;
7980
const participants = (participantsResult || []).map((r) => r.did).filter(Boolean);
8081
return { totalSubgroups, participants };
8182
} catch (error) {
83+
if (error instanceof DOMException && error.name === 'AbortError') throw error;
8284
console.error('Error getting conversation stats:', error);
8385
return { totalSubgroups: 0, participants: [] };
8486
}
8587
}
8688

87-
async topics(): Promise<SynergyTopic[]> {
89+
async topics(options?: AbortOptions): Promise<SynergyTopic[]> {
8890
// find the conversations topics (via its subgroups)
8991
try {
9092
// SPARQL migration
@@ -103,7 +105,7 @@ export class Conversation extends Ad4mModel {
103105
}
104106
`;
105107

106-
const sparqlResult = await this.perspective.querySparql<TopicBinding[]>(sparqlQuery);
108+
const sparqlResult = await this.perspective.querySparql<TopicBinding[]>(sparqlQuery, options);
107109

108110
// Deduplicate by topicBase
109111
const uniqueTopics = new Map<string, { topicBase: string; topicName: string }>();
@@ -124,20 +126,21 @@ export class Conversation extends Ad4mModel {
124126
}),
125127
);
126128
} catch (error) {
129+
if (error instanceof DOMException && error.name === 'AbortError') throw error;
127130
console.error('Error getting conversation topics:', error);
128131
return [];
129132
}
130133
}
131134

132-
async subgroups(): Promise<ConversationSubgroup[]> {
135+
async subgroups(options?: AbortOptions): Promise<ConversationSubgroup[]> {
133136
// find the conversations subgroup entities — use parent-scoped query
134137
// instead of this.get() which fetches all Conversation triples
135138
return ConversationSubgroup.findAll(this.perspective, {
136139
parent: { model: Conversation, id: this.id },
137-
}) as unknown as Promise<ConversationSubgroup[]>;
140+
}, options) as unknown as Promise<ConversationSubgroup[]>;
138141
}
139142

140-
async subgroupsData(): Promise<SynergyGroup[]> {
143+
async subgroupsData(options?: AbortOptions): Promise<SynergyGroup[]> {
141144
// find the necissary data to render the conversations subgroups in timeline components (include timestamps for the first and last item in each subgroup)
142145
try {
143146
// SPARQL migration
@@ -154,7 +157,7 @@ export class Conversation extends Ad4mModel {
154157
ORDER BY ?timestamp
155158
`;
156159

157-
const sparqlResult = await this.perspective.querySparql<SubgroupRowBinding[]>(sparqlQuery);
160+
const sparqlResult = await this.perspective.querySparql<SubgroupRowBinding[]>(sparqlQuery, options);
158161

159162
// Deduplicate by id
160163
const subgroupMap = new Map<string, SubgroupRow>();
@@ -189,7 +192,7 @@ export class Conversation extends Ad4mModel {
189192
}
190193
`;
191194

192-
const batchResults = await this.perspective.querySparql<SubgroupTimestampBinding[]>(batchTimestampQuery);
195+
const batchResults = await this.perspective.querySparql<SubgroupTimestampBinding[]>(batchTimestampQuery, options);
193196

194197
// Group timestamps by subgroup ID
195198
const timestampsBySg = new Map<string, number[]>();
@@ -222,6 +225,7 @@ export class Conversation extends Ad4mModel {
222225
// Sort by actual content start time, not link creation time
223226
return subgroups.sort((a, b) => Number(a.start) - Number(b.start));
224227
} catch (error) {
228+
if (error instanceof DOMException && error.name === 'AbortError') throw error;
225229
console.error('Error getting conversation subgroups:', error);
226230
return [];
227231
}

packages/api/src/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import TaskColumn from './task-column';
2828
import TaskBoard from './task-board';
2929
import updateProfile from './updateProfile';
3030
export * from './npmApi';
31+
export type { AbortOptions } from './shared/abort';
3132

3233
export {
3334
App,

0 commit comments

Comments
 (0)