Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions packages/api/src/channel/channel.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,33 @@ describe('Channel.allItems()', () => {
expect(typed.timestamp).toBe('2026-04-20T10:02:00.000Z');
expect(voice.timestamp < typed.timestamp).toBe(true);
});

// ── AbortSignal handling ───────────────────────────────────────────
//
// Two contracts the wrapper must honour:
// 1. Forward the `signal` to perspective.querySparql so the executor
// receives `request.cancel`.
// 2. Re-throw AbortError instead of swallowing it (other errors are
// swallowed for graceful degradation). Without this, callers
// can't distinguish cancellation from real failures.

it('forwards the AbortSignal to perspective.querySparql', async () => {
const perspective = createMockPerspective(async () => []);
const channel = new Channel(perspective as any, 'channel-1');
const controller = new AbortController();
await channel.allItems({ signal: controller.signal });
expect(perspective.querySparql).toHaveBeenCalledWith(
expect.any(String),
{ signal: controller.signal },
);
});

it('re-throws AbortError instead of swallowing it', async () => {
const perspective = createMockPerspective();
perspective.querySparql.mockRejectedValueOnce(new DOMException('Aborted', 'AbortError'));
const channel = new Channel(perspective as any, 'channel-1');
await expect(channel.allItems()).rejects.toMatchObject({ name: 'AbortError' });
});
});

// ---------------------------------------------------------------------------
Expand Down
30 changes: 20 additions & 10 deletions packages/api/src/channel/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Ad4mModel, HasMany, HasManyMethods, Flag, Literal, LinkQuery, Model, Property, PerspectiveProxy, parseLit, parseSparqlCount, CountBinding } from '@coasys/ad4m';
import type { AbortOptions } from '../shared/abort';
import { community } from '@coasys/flux-constants';
import { EntryType } from '@coasys/flux-types';
import { SynergyGroup, SynergyItem, ItemType, icons } from '@coasys/flux-utils';
Expand Down Expand Up @@ -95,7 +96,7 @@ export class Channel extends Ad4mModel {
@HasMany(() => Post)
posts: Post[] = [];

async allItems(): Promise<SynergyItem[]> {
async allItems(options?: AbortOptions): Promise<SynergyItem[]> {
// Get all items (messages, posts, tasks) in the channel
try {
const sparqlQuery = `
Expand All @@ -115,7 +116,7 @@ export class Channel extends Ad4mModel {
ORDER BY ?timestamp
`;

const sparqlResult = await this.perspective.querySparql<ChannelItemBinding[]>(sparqlQuery);
const sparqlResult = await this.perspective.querySparql<ChannelItemBinding[]>(sparqlQuery, options);

const mapped = (sparqlResult || []).map((binding) => {
let text = '';
Expand Down Expand Up @@ -145,12 +146,14 @@ export class Channel extends Ad4mModel {
// Re-sort by effective timestamp since transcriptStart may differ from link timestamp
return mapped.sort((a, b) => a.timestamp.localeCompare(b.timestamp));
} catch (error) {
// Re-throw AbortError so callers can distinguish cancellation from real failures
if (error instanceof DOMException && error.name === 'AbortError') throw error;
console.error('Error getting all channel items:', error);
return [];
}
}

async unprocessedItems(): Promise<SynergyItem[]> {
async unprocessedItems(options?: AbortOptions): Promise<SynergyItem[]> {
// Get all unprocessed items in the channel using set-difference approach
// instead of FILTER NOT EXISTS (which is O(N²) in Oxigraph)
try {
Expand Down Expand Up @@ -182,8 +185,8 @@ export class Channel extends Ad4mModel {
// an item to appear in allItems but not processedSet (or vice-versa).
// The final VALUES query re-verifies channel membership to mitigate this.
const [allItemsResult, processedResult] = await Promise.all([
this.perspective.querySparql<IdBinding[]>(allItemsQuery),
this.perspective.querySparql<IdBinding[]>(processedQuery),
this.perspective.querySparql<IdBinding[]>(allItemsQuery, options),
this.perspective.querySparql<IdBinding[]>(processedQuery, options),
]);

const processedSet = new Set((processedResult || []).map((r) => r.id));
Expand Down Expand Up @@ -213,7 +216,7 @@ export class Channel extends Ad4mModel {
ORDER BY ?timestamp
`;

const sparqlResult = await this.perspective.querySparql<ChannelItemBinding[]>(dataQuery);
const sparqlResult = await this.perspective.querySparql<ChannelItemBinding[]>(dataQuery, options);

// Deduplicate by id
const itemMap = new Map<string, ChannelItemBinding>();
Expand Down Expand Up @@ -251,12 +254,13 @@ export class Channel extends Ad4mModel {
// Re-sort by effective timestamp since transcriptStart may differ from link timestamp
return mapped.sort((a, b) => a.timestamp.localeCompare(b.timestamp));
} catch (error) {
if (error instanceof DOMException && error.name === 'AbortError') throw error;
console.error('Error getting channel items:', error);
return [];
}
}

async totalItemCount(): Promise<number> {
async totalItemCount(options?: AbortOptions): Promise<number> {
// Find the total number of items in the channel
try {
// SPARQL migration
Expand All @@ -268,9 +272,10 @@ export class Channel extends Ad4mModel {
}
`;

const sparqlResult = await this.perspective.querySparql<CountBinding[]>(sparqlQuery);
const sparqlResult = await this.perspective.querySparql<CountBinding[]>(sparqlQuery, options);
return parseSparqlCount(sparqlResult);
} catch (error) {
if (error instanceof DOMException && error.name === 'AbortError') throw error;
console.error('Error getting total item count:', error);
return 0;
}
Expand All @@ -289,6 +294,7 @@ export class Channel extends Ad4mModel {
static async recentConversations(
perspective: PerspectiveProxy,
limit: number = 20,
options?: AbortOptions,
): Promise<{ channelId: string; conversationId?: string; lastActivity?: string }[]> {
// Step 1: Find conversation channels + their conversation child (fast, no reifier joins)
const sparql = `
Expand All @@ -303,7 +309,7 @@ export class Channel extends Ad4mModel {
`;

try {
const results = await perspective.querySparql<RecentConversationBinding[]>(sparql);
const results = await perspective.querySparql<RecentConversationBinding[]>(sparql, options);

// Filter to only conversation channels and dedup
const channelMap = new Map<string, { channelId: string; conversationId?: string; lastActivity?: string }>();
Expand All @@ -326,6 +332,7 @@ export class Channel extends Ad4mModel {
Array.from(channelMap.entries()).map(async ([channelId, entry]) => {
const links = await perspective.get(
new LinkQuery({ source: channelId, predicate: 'ad4m://has_child' }),
options,
);
// Find the most recent link timestamp
let latest = '';
Expand All @@ -342,6 +349,7 @@ export class Channel extends Ad4mModel {
.slice(0, limit);
return sorted;
} catch (error) {
if (error instanceof DOMException && error.name === 'AbortError') throw error;
console.error('Error in Channel.recentConversations():', error);
return [];
}
Expand All @@ -353,6 +361,7 @@ export class Channel extends Ad4mModel {
*/
static async pinnedConversations(
perspective: PerspectiveProxy,
options?: AbortOptions,
): Promise<{ channelId: string; conversationId?: string }[]> {
const sparql = `
SELECT ?channelId ?conversationId WHERE {
Expand All @@ -367,7 +376,7 @@ export class Channel extends Ad4mModel {
`;

try {
const results = await perspective.querySparql<PinnedConversationBinding[]>(sparql);
const results = await perspective.querySparql<PinnedConversationBinding[]>(sparql, options);
// Deduplicate by channelId
const seen = new Map<string, { channelId: string; conversationId?: string }>();
for (const r of results || []) {
Expand All @@ -380,6 +389,7 @@ export class Channel extends Ad4mModel {
}
return Array.from(seen.values());
} catch (error) {
if (error instanceof DOMException && error.name === 'AbortError') throw error;
console.error('Error in Channel.pinnedConversations():', error);
return [];
}
Expand Down
23 changes: 14 additions & 9 deletions packages/api/src/conversation-subgroup/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { Model, Ad4mModel, Flag, HasMany, Property, Literal, parseLit } from '@coasys/ad4m';
import type { AbortOptions } from '../shared/abort';
import Topic, { TopicWithRelevance } from '../topic';
import SemanticRelationship from '../semantic-relationship';
import { SynergyTopic, SynergyItem, ItemType, icons } from '@coasys/flux-utils';
Expand Down Expand Up @@ -46,7 +47,7 @@ export default class ConversationSubgroup extends Ad4mModel {
@HasMany({ through: FLUX_PARTICIPANT })
participants: string[] = [];

async stats(): Promise<{ totalItems: number; participants: string[] }> {
async stats(options?: AbortOptions): Promise<{ totalItems: number; participants: string[] }> {
// find the total item count and the dids of participants in the subgroup
try {
// SPARQL migration
Expand All @@ -66,20 +67,21 @@ export default class ConversationSubgroup extends Ad4mModel {
`;

const [itemsResult, participantsResult] = await Promise.all([
this.perspective.querySparql<ItemIdBinding[]>(itemsQuery),
this.perspective.querySparql<DidBinding[]>(participantsQuery),
this.perspective.querySparql<ItemIdBinding[]>(itemsQuery, options),
this.perspective.querySparql<DidBinding[]>(participantsQuery, options),
]);

const totalItems = itemsResult?.length || 0;
const participants = (participantsResult || []).map((r) => r.did).filter(Boolean);
return { totalItems, participants };
} catch (error) {
if (error instanceof DOMException && error.name === 'AbortError') throw error;
console.error('Error getting subgroup stats:', error);
return { totalItems: 0, participants: [] };
}
}

async topics(): Promise<SynergyTopic[]> {
async topics(options?: AbortOptions): Promise<SynergyTopic[]> {
// find the subgroups topics
try {
// SPARQL migration
Expand All @@ -93,7 +95,7 @@ export default class ConversationSubgroup extends Ad4mModel {
}
`;

const sparqlResult = await this.perspective.querySparql<TopicBinding[]>(sparqlQuery);
const sparqlResult = await this.perspective.querySparql<TopicBinding[]>(sparqlQuery, options);

// Deduplicate by topicBase
const uniqueTopics = new Map<string, { topicBase: string; topicName: string }>();
Expand All @@ -114,12 +116,13 @@ export default class ConversationSubgroup extends Ad4mModel {
}),
);
} catch (error) {
if (error instanceof DOMException && error.name === 'AbortError') throw error;
console.error('Error getting subgroup topics:', error);
return [];
}
}

async itemsData(): Promise<SynergyItem[]> {
async itemsData(options?: AbortOptions): Promise<SynergyItem[]> {
// find the necissary data to render the subgroups items in timeline components
try {
// SPARQL migration
Expand All @@ -145,7 +148,7 @@ export default class ConversationSubgroup extends Ad4mModel {
ORDER BY ?timestamp
`;

const sparqlResult = await this.perspective.querySparql<SubgroupItemBinding[]>(sparqlQuery);
const sparqlResult = await this.perspective.querySparql<SubgroupItemBinding[]>(sparqlQuery, options);

// Collect items — keep duplicate IDs so the view can detect and clean them up
const items: SubgroupItem[] = [];
Expand Down Expand Up @@ -221,12 +224,13 @@ export default class ConversationSubgroup extends Ad4mModel {
};
});
} catch (error) {
if (error instanceof DOMException && error.name === 'AbortError') throw error;
console.error('Error getting subgroup items:', error);
return [];
}
}

async topicsWithRelevance(): Promise<TopicWithRelevance[]> {
async topicsWithRelevance(options?: AbortOptions): Promise<TopicWithRelevance[]> {
try {
// SPARQL migration
const sparqlQuery = `
Expand All @@ -240,7 +244,7 @@ export default class ConversationSubgroup extends Ad4mModel {
}
`;

const sparqlResult = await this.perspective.querySparql<TopicRelevanceBinding[]>(sparqlQuery);
const sparqlResult = await this.perspective.querySparql<TopicRelevanceBinding[]>(sparqlQuery, options);

// Deduplicate by topicBase
const uniqueTopics = new Map<string, { topicBase: string; topicName: string; relevance: string }>();
Expand All @@ -261,6 +265,7 @@ export default class ConversationSubgroup extends Ad4mModel {
relevance: parseInt(relevance, 10) || 0,
}));
} catch (error) {
if (error instanceof DOMException && error.name === 'AbortError') throw error;
console.error('Error getting subgroup topics with relevance:', error);
return [];
}
Expand Down
24 changes: 14 additions & 10 deletions packages/api/src/conversation/index.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Ad4mModel, Ad4mClient, Flag, HasMany, HasManyMethods, Link, Literal, Model, Property, parseLit } from '@coasys/ad4m';

import { getProfile, Topic } from '@coasys/flux-api';
import type { AbortOptions } from '../shared/abort';
import { ProcessingState, Profile } from '@coasys/flux-types';
import { SynergyGroup, SynergyItem, SynergyTopic } from '@coasys/flux-utils';
import ConversationSubgroup from '../conversation-subgroup';
Expand Down Expand Up @@ -53,7 +54,7 @@ export class Conversation extends Ad4mModel {
@HasMany(() => ConversationSubgroup)
subgroupEntities: ConversationSubgroup[] = [];

async stats(): Promise<{ totalSubgroups: number; participants: string[] }> {
async stats(options?: AbortOptions): Promise<{ totalSubgroups: number; participants: string[] }> {
// find the total subgroup count and the dids of participants in the conversation
try {
// SPARQL migration
Expand All @@ -71,20 +72,21 @@ export class Conversation extends Ad4mModel {
`;

const [subgroupsResult, participantsResult] = await Promise.all([
this.perspective.querySparql<SgBinding[]>(subgroupsQuery),
this.perspective.querySparql<DidBinding[]>(participantsQuery),
this.perspective.querySparql<SgBinding[]>(subgroupsQuery, options),
this.perspective.querySparql<DidBinding[]>(participantsQuery, options),
]);

const totalSubgroups = subgroupsResult?.length || 0;
const participants = (participantsResult || []).map((r) => r.did).filter(Boolean);
return { totalSubgroups, participants };
} catch (error) {
if (error instanceof DOMException && error.name === 'AbortError') throw error;
console.error('Error getting conversation stats:', error);
return { totalSubgroups: 0, participants: [] };
}
}

async topics(): Promise<SynergyTopic[]> {
async topics(options?: AbortOptions): Promise<SynergyTopic[]> {
// find the conversations topics (via its subgroups)
try {
// SPARQL migration
Expand All @@ -103,7 +105,7 @@ export class Conversation extends Ad4mModel {
}
`;

const sparqlResult = await this.perspective.querySparql<TopicBinding[]>(sparqlQuery);
const sparqlResult = await this.perspective.querySparql<TopicBinding[]>(sparqlQuery, options);

// Deduplicate by topicBase
const uniqueTopics = new Map<string, { topicBase: string; topicName: string }>();
Expand All @@ -124,20 +126,21 @@ export class Conversation extends Ad4mModel {
}),
);
} catch (error) {
if (error instanceof DOMException && error.name === 'AbortError') throw error;
console.error('Error getting conversation topics:', error);
return [];
}
}

async subgroups(): Promise<ConversationSubgroup[]> {
async subgroups(options?: AbortOptions): Promise<ConversationSubgroup[]> {
// find the conversations subgroup entities — use parent-scoped query
// instead of this.get() which fetches all Conversation triples
return ConversationSubgroup.findAll(this.perspective, {
parent: { model: Conversation, id: this.id },
}) as unknown as Promise<ConversationSubgroup[]>;
}, options) as unknown as Promise<ConversationSubgroup[]>;
}
Comment on lines +135 to 141

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
# Search for the findAll method definition in Ad4mModel or its implementation
# to confirm the signature accepts options as third parameter

# Check local Ad4m model implementations
rg -n -A 10 'static\s+(async\s+)?findAll' --type=ts

# Check if there's any documentation or type definition for findAll
rg -n -B 2 -A 5 'findAll.*perspective.*where' --type=ts

Repository: coasys/flux

Length of output: 3622


Fix ConversationSubgroup.findAll options placement (AbortOptions as 3rd arg likely ignored)

  • In packages/api/src/conversation/conversation.test.ts, findAll is mocked as static async findAll(perspective, opts?) (2 params). The call in packages/api/src/conversation/index.ts passes AbortOptions as a 3rd argument (this.perspective, { parent: ... }, options), which won’t match that API and will likely not propagate abort/cancellation.
  • Adjust the ConversationSubgroup.findAll call to pass AbortOptions in the parameter position actually supported by the real Ad4mModel.findAll implementation (or through its expected cancellation mechanism).
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@packages/api/src/conversation/index.ts` around lines 135 - 141, The call in
Conversation.subgroups is passing AbortOptions as a third arg to
ConversationSubgroup.findAll which the mocked/real API treats as a single
options object; fix by passing the abort/cancellation options inside the same
options object passed as the second parameter—i.e. call
ConversationSubgroup.findAll(this.perspective, { parent: { model: Conversation,
id: this.id }, ...(options ?? {}) }) so the abort signal is propagated; update
the subgroups method accordingly (referencing ConversationSubgroup.findAll and
the subgroups method).


async subgroupsData(): Promise<SynergyGroup[]> {
async subgroupsData(options?: AbortOptions): Promise<SynergyGroup[]> {
// find the necissary data to render the conversations subgroups in timeline components (include timestamps for the first and last item in each subgroup)
try {
// SPARQL migration
Expand All @@ -154,7 +157,7 @@ export class Conversation extends Ad4mModel {
ORDER BY ?timestamp
`;

const sparqlResult = await this.perspective.querySparql<SubgroupRowBinding[]>(sparqlQuery);
const sparqlResult = await this.perspective.querySparql<SubgroupRowBinding[]>(sparqlQuery, options);

// Deduplicate by id
const subgroupMap = new Map<string, SubgroupRow>();
Expand Down Expand Up @@ -189,7 +192,7 @@ export class Conversation extends Ad4mModel {
}
`;

const batchResults = await this.perspective.querySparql<SubgroupTimestampBinding[]>(batchTimestampQuery);
const batchResults = await this.perspective.querySparql<SubgroupTimestampBinding[]>(batchTimestampQuery, options);

// Group timestamps by subgroup ID
const timestampsBySg = new Map<string, number[]>();
Expand Down Expand Up @@ -222,6 +225,7 @@ export class Conversation extends Ad4mModel {
// Sort by actual content start time, not link creation time
return subgroups.sort((a, b) => Number(a.start) - Number(b.start));
} catch (error) {
if (error instanceof DOMException && error.name === 'AbortError') throw error;
console.error('Error getting conversation subgroups:', error);
return [];
}
Expand Down
1 change: 1 addition & 0 deletions packages/api/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import TaskColumn from './task-column';
import TaskBoard from './task-board';
import updateProfile from './updateProfile';
export * from './npmApi';
export type { AbortOptions } from './shared/abort';

export {
App,
Expand Down
Loading
Loading