Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 14 additions & 2 deletions server/lib/amp-api.ts
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,19 @@ interface ListThreadsResult {
threads: AmpThreadSummary[];
}

export async function listThreads(limit = 500): Promise<AmpThreadSummary[]> {
const result = await callAmpInternalAPI<ListThreadsResult>('listThreads', { limit });
/** Maximum threads the Amp API returns in a single call. */
export const AMP_API_MAX = 500;

/**
* Fetch threads from the Amp internal API.
*
* The API's `offset` parameter is silently ignored — it always returns the
* same set of threads regardless of offset. So we make a single call with
* limit capped at 500 (the API maximum).
*/
export async function listThreads(): Promise<AmpThreadSummary[]> {
const result = await callAmpInternalAPI<ListThreadsResult>('listThreads', {
limit: AMP_API_MAX,
});
return result.threads;
}
33 changes: 10 additions & 23 deletions server/lib/threadCrud.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,32 +14,13 @@
type ThreadFile,
} from './threadTypes.js';

interface GetThreadsOptions {
limit?: number;
cursor?: string | null;
}

export async function getThreads({
limit = 50,
cursor = null,
}: GetThreadsOptions = {}): Promise<ThreadsResult> {
export async function getThreads(): Promise<ThreadsResult> {
const allThreads = await listAllThreads();

// Apply cursor-based pagination (same contract as before)
let startIndex = 0;
if (cursor) {
const cursorIndex = allThreads.findIndex((t) => t.id === cursor);
if (cursorIndex !== -1) startIndex = cursorIndex + 1;
}

const sliced = allThreads.slice(startIndex, startIndex + limit);
const hasMore = startIndex + limit < allThreads.length;
const lastThread = sliced[sliced.length - 1];

return {
threads: sliced,
nextCursor: lastThread && hasMore ? lastThread.id : null,
hasMore,
threads: allThreads,
nextCursor: null,
hasMore: false,
};
}

Expand Down Expand Up @@ -121,6 +102,10 @@
}

async function cleanupThreadFiles(threadId: string): Promise<void> {
// Delete the local thread JSON file so it doesn't resurface via local scan
const threadFile = join(THREADS_DIR, `${threadId}.json`);
await rm(threadFile, { force: true }).catch(() => {});

// Delete artifacts directory
const threadArtifactsDir = join(ARTIFACTS_DIR, threadId);
await rm(threadArtifactsDir, { recursive: true, force: true }).catch(() => {});
Expand All @@ -139,9 +124,11 @@
try {
await runAmp(['threads', 'delete', threadId]);
await cleanupThreadFiles(threadId);
console.log(`[threads] Deleted ${threadId}`);

Check warning on line 127 in server/lib/threadCrud.ts

View workflow job for this annotation

GitHub Actions / check

Unexpected console statement. Only these console methods are allowed: warn, error, debug
return { success: true };
} catch (e) {
const error = e as Error;
console.error(`[threads] Delete failed for ${threadId}: ${error.message}`);
return { success: false, error: error.message };
}
}
Expand Down
110 changes: 104 additions & 6 deletions server/lib/threadProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,22 +47,120 @@ function parseRepoFromUrl(url: string | undefined | null): string | null {
return match?.[1] ?? url;
}

// ── List all threads via API ────────────────────────────────────────────
// ── List all threads via API + local supplement ─────────────────────────

/**
* Fetch all threads using a hybrid approach:
* 1. API call — returns up to 500 most-recent threads (the API's max)
* 2. Local file scan — picks up any threads on disk not already in the API
* response (handles >500 threads or threads not yet synced to server)
*
* Returns threads sorted by lastUpdatedDate descending.
*/
export async function listAllThreads(): Promise<Thread[]> {
const summaries = await listThreads(500);
const summaries = await listThreads();
const threads = summaries.map(toThread);
const apiIds = new Set(threads.map((t) => t.id));

// Always supplement with local thread files not in the API response.
// The API returns at most 500 most-recent threads, so older threads
// only appear via their local files on disk.
const localExtras = await getLocalOnlyThreads(apiIds);
threads.push(...localExtras);

// The listThreads API doesn't return relationship data.
// Enrich from two sources:
// 1. Local thread files (relationships[] array + handoff tool blocks)
// 2. Shared thread ID prefix heuristic (handoff batches share first 4 UUID segments)
// Enrich handoff relationships from local files + batch heuristic
await enrichRelationships(threads);
enrichBatchSiblings(threads);

// Sort by last updated descending (API threads are already sorted, but
// local extras may interleave)
threads.sort(
(a, b) =>
new Date(b.lastUpdatedDate || 0).getTime() - new Date(a.lastUpdatedDate || 0).getTime(),
);

return threads;
}

/**
* Scan local thread files and return Thread objects for IDs not in `knownIds`.
* Reads only the minimal fields needed (title, timestamps, env) — does NOT
* parse full message content.
*/
async function getLocalOnlyThreads(knownIds: Set<string>): Promise<Thread[]> {
let files: string[];
try {
files = await readdir(THREADS_DIR);
} catch {
return [];
}

const extras: Thread[] = [];
const jsonFiles = files.filter((f) => f.startsWith('T-') && f.endsWith('.json'));

await Promise.all(
jsonFiles.map(async (file) => {
const threadId = file.replace('.json', '');
if (knownIds.has(threadId)) return;

try {
const content = await readFile(join(THREADS_DIR, file), 'utf-8');
const data = JSON.parse(content) as ThreadFile;
extras.push(localFileToThread(threadId, data));
} catch {
// Skip unreadable files
}
}),
);

return extras;
}

/**
* Convert a local ThreadFile into the shared Thread type.
* Mirrors toThread() but works from the on-disk JSON shape.
*/
function localFileToThread(id: string, data: ThreadFile): Thread {
const tree = data.env?.initial?.trees?.[0];
const repoUrl = tree?.repository?.url;

let handoffParentId: string | null = null;
const handoffChildIds: string[] = [];
for (const rel of data.relationships || []) {
if (isHandoffRelationship(rel)) {
if (rel.role === 'child') {
handoffParentId = rel.threadID;
} else {
handoffChildIds.push(rel.threadID);
}
}
}

const createdMs = data.created || 0;
const lastDate = new Date(createdMs);

return {
id,
title: data.title || id,
lastUpdated: formatRelativeTime(lastDate),
lastUpdatedDate: lastDate.toISOString(),
visibility: normalizeVisibility(data.visibility ?? data.meta?.visibility),
messages: (data.messages || []).length,
workspace: tree?.displayName || null,
workspacePath: parseFileUri(tree?.uri) || null,
repo: parseRepoFromUrl(repoUrl),
handoffParentId,
handoffChildIds: [...new Set(handoffChildIds)],
agentMode: data.agentMode,
archived: false,
model: undefined,
cost: undefined,
contextPercent: undefined,
maxContextTokens: undefined,
touchedFiles: undefined,
};
}

/**
* Scan local thread files for handoff relationships and merge them into
* the thread list. Checks two sources:
Expand Down
4 changes: 1 addition & 3 deletions server/routes/threads.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,7 @@ export async function handleThreadRoutes(

if (pathname === '/api/threads') {
try {
const limit = parseInt(url.searchParams.get('limit') || '50', 10);
const cursor = url.searchParams.get('cursor') || null;
const result = await getThreads({ limit, cursor });
const result = await getThreads();

// Trigger background actual cost fetching for visible threads
enqueueCostFetch(result.threads.map((t: { id: string }) => t.id));
Expand Down
5 changes: 3 additions & 2 deletions src/hooks/useThreadActions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,8 @@ export function useThreadActions({
} catch (err) {
console.error('Bulk archive error:', err);
showError('Some threads failed to archive');
refetch();
}
refetch();
},
[refetch, removeThread, showError],
);
Expand All @@ -171,12 +171,13 @@ export function useThreadActions({
if (failures.length > 0) {
console.error(`${failures.length} delete(s) failed`);
showError(`${failures.length} thread(s) failed to delete`);
refetch();
}
} catch (err) {
console.error('Bulk delete error:', err);
showError('Some threads failed to delete');
refetch();
}
refetch();
},
[refetch, removeThread, showError],
);
Expand Down
66 changes: 20 additions & 46 deletions src/hooks/useThreads.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,47 +8,31 @@ const AUTO_REFRESH_INTERVAL_MS = 30000;
export function useThreads() {
const [threads, setThreads] = useState<Thread[]>([]);
const [loading, setLoading] = useState(true);
const [loadingMore, setLoadingMore] = useState(false);
const [error, setError] = useState<string | null>(null);
const [hasMore, setHasMore] = useState(false);
const cursorRef = useRef<string | null>(null);
const autoRefreshRef = useRef<number | null>(null);

const fetchThreads = useCallback(async (append = false) => {
if (append) {
setLoadingMore(true);
} else {
setLoading(true);
}
const fetchThreads = useCallback(async () => {
setLoading(true);
setError(null);

try {
const cursor = append ? cursorRef.current : null;
const data = await apiGet<ThreadsResult>(
`/api/threads?limit=50${cursor ? `&cursor=${cursor}` : ''}`,
);
const data = await apiGet<ThreadsResult>('/api/threads');

if (append) {
setThreads((prev) => [...prev, ...data.threads]);
} else {
// Stabilize reference: skip setState if thread list hasn't meaningfully changed,
// preventing downstream re-renders (e.g., useFilters label re-fetch) on every poll
setThreads((prev) => {
if (prev.length !== data.threads.length) return data.threads;
const changed = prev.some((t, i) => {
const next = data.threads[i];
return (
!next ||
t.id !== next.id ||
t.title !== next.title ||
t.lastUpdated !== next.lastUpdated
);
});
return changed ? data.threads : prev;
// Stabilize reference: skip setState if thread list hasn't meaningfully changed,
// preventing downstream re-renders (e.g., useFilters label re-fetch) on every poll
setThreads((prev) => {
if (prev.length !== data.threads.length) return data.threads;
const changed = prev.some((t, i) => {
const next = data.threads[i];
return (
!next ||
t.id !== next.id ||
t.title !== next.title ||
t.lastUpdated !== next.lastUpdated
);
});
}
cursorRef.current = data.nextCursor;
setHasMore(data.hasMore);
return changed ? data.threads : prev;
});
} catch (err) {
if (err instanceof ApiError) {
console.error(`[useThreads] API error ${err.status}: ${err.message}`);
Expand All @@ -59,16 +43,9 @@ export function useThreads() {
}
} finally {
setLoading(false);
setLoadingMore(false);
}
}, []);

const loadMore = useCallback(() => {
if (hasMore && !loadingMore) {
void fetchThreads(true);
}
}, [hasMore, loadingMore, fetchThreads]);

const removeThread = useCallback((threadId: string) => {
setThreads((prev) => prev.filter((t) => t.id !== threadId));
}, []);
Expand All @@ -79,7 +56,7 @@ export function useThreads() {
const startPolling = () => {
if (autoRefreshRef.current !== null) return;
autoRefreshRef.current = window.setInterval(() => {
void fetchThreads(false);
void fetchThreads();
}, AUTO_REFRESH_INTERVAL_MS);
};

Expand All @@ -94,7 +71,7 @@ export function useThreads() {
if (document.hidden) {
stopPolling();
} else {
void fetchThreads(false);
void fetchThreads();
startPolling();
}
};
Expand All @@ -111,11 +88,8 @@ export function useThreads() {
return {
threads,
loading,
loadingMore,
error,
hasMore,
refetch: () => fetchThreads(false),
loadMore,
refetch: fetchThreads,
removeThread,
};
}
Loading