-
Notifications
You must be signed in to change notification settings - Fork 8
refactor: replace perspective.addListener('link-*') with SPARQL subscriptions #610
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: dev
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -141,8 +141,8 @@ | |
| // Community query is perspective-scoped (typically one per perspective — low cost). | ||
| // Use ChannelSummary — lightweight model without @HasMany relations. | ||
| // Property getters run by default (deepQuery=true) via batched VALUES queries. | ||
| const { data: communities, loading: communitiesLoading, error: communitiesError } = useLiveQuery(Community, perspective); | ||
|
Check warning on line 144 in app/src/composables/useCommunityService.ts
|
||
| const { data: allChannels, loading: channelsLoading, error: channelsError } = useLiveQuery(ChannelSummary, perspective); | ||
|
Check warning on line 145 in app/src/composables/useCommunityService.ts
|
||
|
|
||
| // Cache for conversation instances — populated during data fetching, looked up in computeds. | ||
| // Plain Map (not reactive) is sufficient: updates always precede the ref changes that trigger re-computation. | ||
|
|
@@ -163,8 +163,8 @@ | |
|
|
||
| const isAuthor = computed(() => communities.value[0]?.author === me.value.did); | ||
| const community = computed<Community>(() => communities.value[0]); | ||
| const pinnedChannels = computed(() => allChannels.value.filter((channel) => channel.isPinned)); | ||
|
Check warning on line 166 in app/src/composables/useCommunityService.ts
|
||
| const conversationChannels = computed(() => allChannels.value.filter((channel) => channel.isConversation)); | ||
|
Check warning on line 167 in app/src/composables/useCommunityService.ts
|
||
| const spaceChannels = computed(() => allChannels.value.filter((channel) => !channel.isConversation)); | ||
| const pinnedConversationsWithAgents = computed((): ChannelDataWithAgents[] => { | ||
| return pinnedConversations.value.map((data) => ({ | ||
|
|
@@ -485,41 +485,80 @@ | |
| perspective.addSyncStateChangeListener(syncStateListener); | ||
|
|
||
| // Track channel participants automatically. | ||
| // Uses addListener because participant tracking needs link.author metadata, | ||
| // which isn't available from SPARQL subscription results. | ||
| // Local dedup set avoids redundant addLinks RPCs — ChannelSummary doesn't | ||
| // carry participants, so the old channel.participants.includes() check was lost. | ||
| // | ||
| // Previously this used `perspective.addListener('link-added', ...)`, which | ||
| // is a perspective-scoped firehose — every link added anywhere in the | ||
| // perspective wakes every listener. We now drive the same logic from a | ||
| // targeted SPARQL subscription that only fires when the set of distinct | ||
| // `(channel, author)` pairs over the `flux://has_channel` predicate | ||
| // changes. The reified link's author is exposed via | ||
| // `?_r ad4m:author ?author`, so the subscription observes exactly the | ||
| // signal we used to read off `link.author`. | ||
| // | ||
| // The dedup set keeps `addLinks` RPCs idempotent (the addLink itself is | ||
| // also idempotent on the executor side, but the dedup avoids the round | ||
| // trip). Each subscription fire triggers a single `LinkQuery` refetch so | ||
| // the handler does not depend on the SPARQL binding shape — robust | ||
| // against any future result-shape changes. | ||
| const knownParticipants = new Set<string>(); | ||
| function handleParticipantTracking(link: any) { | ||
| if (link.data.predicate !== CHANNEL) return null; | ||
| if (!link.author) return null; | ||
|
|
||
| const channelId = link.data.source; | ||
| const channel = allChannels.value.find((c) => c.id === channelId); | ||
| if (!channel) return null; | ||
|
|
||
| const key = `${channelId}::${link.author}`; | ||
| if (knownParticipants.has(key)) return null; | ||
| knownParticipants.add(key); | ||
|
|
||
| perspective | ||
| .addLinks([{ source: channelId, predicate: 'flux://has_participant', target: link.author }]) | ||
| .catch((error) => { | ||
| knownParticipants.delete(key); | ||
| console.error('Failed to add participant to channel:', { | ||
| channelId, | ||
| author: link.author, | ||
| error, | ||
| }); | ||
| }); | ||
| let channelLinksSub: { dispose: () => void } | null = null; | ||
| let participantTrackingCancelled = false; | ||
|
|
||
| return null; | ||
| async function refreshParticipantsFromChannelLinks() { | ||
| try { | ||
| const links = await perspective.get(new LinkQuery({ predicate: CHANNEL })); | ||
| for (const link of links) { | ||
| if (!link.author) continue; | ||
| const channelId = link.data.source; | ||
| const channel = allChannels.value.find((c) => c.id === channelId); | ||
| if (!channel) continue; | ||
|
|
||
| const key = `${channelId}::${link.author}`; | ||
| if (knownParticipants.has(key)) continue; | ||
| knownParticipants.add(key); | ||
|
|
||
| perspective | ||
| .addLinks([{ source: channelId, predicate: 'flux://has_participant', target: link.author }]) | ||
| .catch((error) => { | ||
| knownParticipants.delete(key); | ||
| console.error('Failed to add participant to channel:', { | ||
| channelId, | ||
| author: link.author, | ||
| error, | ||
| }); | ||
| }); | ||
| } | ||
| } catch (error) { | ||
| console.error('Error refreshing channel participants:', error); | ||
| } | ||
| } | ||
|
Comment on lines
+507
to
534
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🏁 Script executed: #!/bin/bash
# Check for watch() calls on allChannels or initialization order dependencies
rg -n -A5 -B5 'watch.*allChannels|allChannels.*watch' app/src/composables/useCommunityService.ts
# Check if subscribeQuery is documented to fire initially
rg -n -C3 'subscribeQuery.*initial|subscribeQuery.*first' --type=ts
# Look for similar subscription patterns that might clarify initialization behavior
ast-grep --pattern 'await $_.subscribeQuery($$$)'Repository: coasys/flux Length of output: 1576 🏁 Script executed: #!/bin/bash
set -euo pipefail
FILE="app/src/composables/useCommunityService.ts"
echo "=== allChannels definition (~line 145) ==="
sed -n '120,190p' "$FILE" | nl -ba | sed -n '1,120p'
echo
echo "=== subscription IIFE / onResult region (~line 520-580) ==="
sed -n '500,590p' "$FILE" | nl -ba | sed -n '1,120p'
echo
echo "=== locate refreshParticipantsFromChannelLinks usage ==="
rg -n "refreshParticipantsFromChannelLinks" "$FILE"
echo
echo "=== search for any watch/allChannels coordination specifically for participants refresh ==="
rg -n "watch\\(.*allChannels|allChannels.*watch" "$FILE"
echo
echo "=== inspect lines around the skip in refresh (line ~513-514) ==="
sed -n '485,545p' "$FILE" | nl -ba | sed -n '1,120p'Repository: coasys/flux Length of output: 159 🏁 Script executed: #!/bin/bash
set -euo pipefail
FILE="app/src/composables/useCommunityService.ts"
awk_nr() { awk '{printf "%d:%s\n", NR,$0}'; }
echo "=== allChannels definition (~line 145) ==="
sed -n '120,190p' "$FILE" | awk_nr
echo
echo "=== subscription IIFE / onResult region (~line 500-590) ==="
sed -n '500,590p' "$FILE" | awk_nr
echo
echo "=== locate refreshParticipantsFromChannelLinks usage ==="
rg -n "refreshParticipantsFromChannelLinks" "$FILE" || true
echo
echo "=== search for any watch/allChannels coordination specifically for participants refresh ==="
rg -n "watch\\(.*allChannels|allChannels.*watch" "$FILE" || true
echo
echo "=== inspect lines around the skip in refresh (line ~485-545) ==="
sed -n '485,545p' "$FILE" | awk_nrRepository: coasys/flux Length of output: 10935 🌐 Web query:
💡 Result: In AD4M, subscriptions designed to provide real-time updates—including those created via ModelQueryBuilder's subscribe method and QuerySubscriptionProxy's onResult method—are designed to provide immediate feedback by firing with the current state as soon as the subscription is established [1][2][3]. When you initialize a subscription (e.g., via subscribeInfer, subscribeSurrealDB, or a ModelQueryBuilder), the system internally waits for the initial results to arrive from the backend before completing the initialization process [1][4][2]. Key details regarding this behavior include: 1. Immediate Execution: When you register a callback using onResult or the subscription parameter in a query builder, that callback is invoked immediately with the initial result set [1][2][3]. Subsequent calls occur whenever the query results change [1][3]. 2. Initialization Guarantees: Methods such as subscribeInfer and subscribeSurrealDB return a promise that resolves only after the subscription has been successfully initialized and the first result has been received [1][4]. This ensures the subscription is ready to receive updates before your application logic proceeds [1][4]. 3. Implementation: At the core, the AD4M client processes the initial result immediately upon receipt during the subscription setup to ensure fast user experience [5]. Always remember to call dispose on your subscriptions when they are no longer needed to prevent memory leaks and ensure the backend cleans up the associated resources [4][2][3][6]. Citations:
Guard participant refresh against
🤖 Prompt for AI Agents |
||
| perspective.addListener('link-added', handleParticipantTracking); | ||
|
|
||
| // Cleanup function to remove all listeners | ||
| (async () => { | ||
| try { | ||
| // SELECT every distinct `(source, author)` over the `flux://has_channel` | ||
| // predicate. The reifier metadata is queried via the RDF 1.2 `reifies` | ||
| // pattern that the rest of the model query pipeline uses. | ||
| const sub = await perspective.subscribeQuery(` | ||
| SELECT DISTINCT ?source ?author WHERE { | ||
| ?_r <http://www.w3.org/1999/02/22-rdf-syntax-ns#reifies> <<( ?source <${CHANNEL}> ?_target )>> . | ||
| ?_r <ad4m://ontology/author> ?author . | ||
| } | ||
| `); | ||
| if (participantTrackingCancelled) { | ||
| sub.dispose(); | ||
| return; | ||
| } | ||
| channelLinksSub = sub; | ||
| sub.onResult(() => refreshParticipantsFromChannelLinks()); | ||
| } catch (error) { | ||
| console.error('Failed to subscribe to channel-link author pairs:', error); | ||
| } | ||
| })(); | ||
|
|
||
| // Cleanup function to tear down active subscriptions | ||
| function cleanup() { | ||
| perspective.removeListener('link-added', handleParticipantTracking); | ||
| participantTrackingCancelled = true; | ||
| channelLinksSub?.dispose(); | ||
| } | ||
|
|
||
| getMembers(); | ||
|
|
||
This file was deleted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use the imported constant instead of hardcoding the predicate.
The predicate
'flux://has_participant'is hardcoded here but is defined as theFLUX_PARTICIPANTconstant in@coasys/flux-constants(confirmed by context snippet 3). Import and use the constant for consistency and maintainability.📦 Proposed fix
At the top of the file, update the import:
Then use the constant:
🤖 Prompt for AI Agents