Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions apps/kagura/src/agent/prompt/processors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,8 +102,12 @@ export const codingWorkflowProcessor: PromptProcessor = {
'- Before editing files in a Git repository, inspect git status plus branch/upstream state.',
'- Before implementation work where freshness matters, fetch the relevant remote and check whether the active branch or its base/upstream branch has received new commits.',
'- If the upstream/base branch moved, sync it first and rebase your working branch onto the updated remote base before continuing, unless the user explicitly instructs otherwise.',
'- For a new Slack thread, unless the user explicitly names a branch, pull request, commit, or asks to continue existing work, base implementation work on the repository default branch, usually origin/main.',
'- Do not assume the current workspace branch is the right base. It may be left over from another Slack thread. Before editing, inspect the current branch, upstream, default branch, and worktree state.',
'- For resumed turns in the same Slack thread, preserve the established session worktree unless the user asks to rebase or restart from the default branch.',
'- If the repository is hosted on GitHub, prefer using a git worktree for implementation work when branch isolation would reduce risk or keep concurrent tasks separate.',
`- When creating or reusing git worktrees, keep them under the centralized worktree root ${env.WORKTREE_ROOT_DIR} instead of scattering ad-hoc worktree folders across the filesystem.`,
'- Multiple Slack threads working in the same repository must use isolated worktrees instead of sharing the repository root working tree.',
'- When using a git worktree, inspect the original/source workspace for ignored environment files and local config that may be required by the task; if such files exist there, copy the necessary ones into the worktree before proceeding.',
'- For read-only analysis, inspect only the files and metadata needed to answer; do not fetch, rebase, or otherwise change repository state unless freshness is central to the request.',
);
Expand Down
19 changes: 19 additions & 0 deletions apps/kagura/src/application.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ import { SlackUserInputBridge } from '~/slack/interaction/user-input-bridge.js';
import { startSlackAppWithRetry } from '~/slack/network-guard.js';
import { createReviewPanelServer, type ReviewPanelServer } from '~/web/review-panel.js';
import { WorkspaceResolver } from '~/workspace/resolver.js';
import {
GitThreadWorkspaceManager,
ThreadWorktreeCleanupScheduler,
} from '~/workspace/thread-worktree.js';

export interface RuntimeApplication {
dispatchThreadConversation: (input: ConversationDispatchInput) => Promise<void>;
Expand Down Expand Up @@ -150,6 +154,18 @@ export function createApplication(options?: RuntimeApplicationOptions): RuntimeA
repoRootDir: env.REPO_ROOT_DIR,
scanDepth: env.REPO_SCAN_DEPTH,
});
const threadWorkspaceManager = new GitThreadWorkspaceManager({
worktreeRootDir: env.WORKTREE_ROOT_DIR,
});
const threadWorktreeCleanup = env.WORKTREE_CLEANUP_ENABLED
? new ThreadWorktreeCleanupScheduler({
intervalMs: env.WORKTREE_CLEANUP_INTERVAL_MS,
logger: logger.withTag('worktree:cleanup'),
manager: threadWorkspaceManager,
retentionMs: env.WORKTREE_CLEANUP_RETENTION_MS,
sessionStore,
})
: undefined;
const statusProbe = env.SLACK_E2E_ENABLED
? new FileSlackStatusProbe(options?.statusProbePath ?? env.SLACK_E2E_STATUS_PROBE_PATH)
: undefined;
Expand Down Expand Up @@ -216,6 +232,7 @@ export function createApplication(options?: RuntimeApplicationOptions): RuntimeA
sessionStore,
providerRegistry,
threadExecutionRegistry,
threadWorkspaceManager,
userInputBridge,
workspaceResolver,
...(statusProbe ? { statusProbe } : {}),
Expand Down Expand Up @@ -267,6 +284,7 @@ export function createApplication(options?: RuntimeApplicationOptions): RuntimeA
slackAppStarted = true;
}, logger.withTag('slack:socket'));
memoryReconciler.start();
threadWorktreeCleanup?.start();
slackApp.startA2ASummaryPoller?.();
logger.info('Slack Socket Mode application started.');
void slackApp.recoverPendingExecutions?.().catch((error) => {
Expand All @@ -287,6 +305,7 @@ export function createApplication(options?: RuntimeApplicationOptions): RuntimeA
slackAppStarted = false;
}
memoryReconciler.stop();
threadWorktreeCleanup?.stop();
await providerRegistry.drain();
a2aCoordinatorStore.close?.();
sqlite.close();
Expand Down
23 changes: 23 additions & 0 deletions apps/kagura/src/env/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,14 @@ const appConfigSchema = z
repoRootDir: z.string().optional(),
repoScanDepth: z.number().int().min(0).optional(),
worktreeRootDir: z.string().optional(),
worktreeCleanup: z
.object({
enabled: z.boolean().optional(),
intervalMs: z.number().int().positive().optional(),
retentionMs: z.number().int().positive().optional(),
})
.strict()
.optional(),
reviewPanel: z
.object({
assetsDir: z.string().optional(),
Expand Down Expand Up @@ -227,6 +235,9 @@ export const env = createEnv({
REPO_ROOT_DIR: z.string().min(1),
REPO_SCAN_DEPTH: z.coerce.number().int().min(0).default(2),
WORKTREE_ROOT_DIR: z.string().min(1).default(resolveDefaultWorktreeRootDir()),
WORKTREE_CLEANUP_ENABLED: booleanStringSchema.default(true),
WORKTREE_CLEANUP_INTERVAL_MS: z.coerce.number().int().positive().default(21_600_000),
WORKTREE_CLEANUP_RETENTION_MS: z.coerce.number().int().positive().default(604_800_000),
KAGURA_MEMORY_RECONCILER_ENABLED: booleanStringSchema.default(false),
KAGURA_MEMORY_RECONCILER_BASE_URL: z.string().url().default('https://api.openai.com/v1'),
KAGURA_MEMORY_RECONCILER_API_KEY: z.string().min(1).optional(),
Expand Down Expand Up @@ -297,6 +308,18 @@ export const env = createEnv({
REPO_ROOT_DIR: envOrConfig('REPO_ROOT_DIR', configString(appConfig.repoRootDir)),
REPO_SCAN_DEPTH: envOrConfig('REPO_SCAN_DEPTH', configNumber(appConfig.repoScanDepth)),
WORKTREE_ROOT_DIR: envOrConfig('WORKTREE_ROOT_DIR', configString(appConfig.worktreeRootDir)),
WORKTREE_CLEANUP_ENABLED: envOrConfig(
'WORKTREE_CLEANUP_ENABLED',
configBoolean(appConfig.worktreeCleanup?.enabled),
),
WORKTREE_CLEANUP_INTERVAL_MS: envOrConfig(
'WORKTREE_CLEANUP_INTERVAL_MS',
configNumber(appConfig.worktreeCleanup?.intervalMs),
),
WORKTREE_CLEANUP_RETENTION_MS: envOrConfig(
'WORKTREE_CLEANUP_RETENTION_MS',
configNumber(appConfig.worktreeCleanup?.retentionMs),
),
KAGURA_MEMORY_RECONCILER_ENABLED: envOrConfig(
'KAGURA_MEMORY_RECONCILER_ENABLED',
configBoolean(appConfig.memory?.reconciler?.enabled),
Expand Down
8 changes: 8 additions & 0 deletions apps/kagura/src/session/sqlite-session-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,14 @@ export class SqliteSessionStore implements SessionStore {
return this.rowToRecord(row);
}

listAll(): SessionRecord[] {
return this.db
.select()
.from(sessions)
.all()
.map((row) => this.rowToRecord(row));
}

upsert(record: SessionRecord): SessionRecord {
this.db
.insert(sessions)
Expand Down
3 changes: 3 additions & 0 deletions apps/kagura/src/slack/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import type { MemoryStore } from '~/memory/types.js';
import type { ReviewSessionStore } from '~/review/types.js';
import type { SessionStore } from '~/session/types.js';
import type { WorkspaceResolver } from '~/workspace/resolver.js';
import type { ThreadWorkspaceManager } from '~/workspace/thread-worktree.js';

import { registerSlashCommands } from './commands/register.js';
import { SlackThreadContextLoader } from './context/thread-context-loader.js';
Expand Down Expand Up @@ -77,6 +78,7 @@ export interface SlackApplicationDependencies {
sessionStore: SessionStore;
statusProbe?: SlackStatusProbe;
threadExecutionRegistry: ThreadExecutionRegistry;
threadWorkspaceManager?: ThreadWorkspaceManager | undefined;
userInputBridge: SlackUserInputBridge;
workspaceResolver: WorkspaceResolver;
}
Expand Down Expand Up @@ -140,6 +142,7 @@ export function createSlackApp(
reviewPanelBaseUrl: deps.reviewPanelBaseUrl,
reviewSessionStore: deps.reviewSessionStore,
threadExecutionRegistry: deps.threadExecutionRegistry,
threadWorkspaceManager: deps.threadWorkspaceManager,
userInputBridge: deps.userInputBridge,
workspaceResolver: deps.workspaceResolver,
};
Expand Down
56 changes: 55 additions & 1 deletion apps/kagura/src/slack/ingress/conversation-pipeline.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { execFileSync } from 'node:child_process';
import { randomUUID } from 'node:crypto';

import type { AgentExecutionEvent, AgentExecutor } from '~/agent/types.js';
import { appRuntimeConfig } from '~/env/server.js';
import { appRuntimeConfig, env } from '~/env/server.js';
import { redact } from '~/logger/redact.js';
import { runtimeError, runtimeInfo, runtimeWarn } from '~/logger/runtime.js';
import {
Expand All @@ -13,6 +13,7 @@ import {
import type { SessionRecord } from '~/session/types.js';
import { formatClaudeExecutionFailureReply } from '~/util/error-detail.js';
import { enrichResolvedWorkspace } from '~/workspace/resolver.js';
import { GitThreadWorkspaceManager } from '~/workspace/thread-worktree.js';

import type { ThreadExecutionStopReason } from '../execution/thread-execution-registry.js';
import type { SlackWebClientLike } from '../types.js';
Expand Down Expand Up @@ -241,6 +242,58 @@ export async function resolveWorkspaceStep(
return CONTINUE;
}

let defaultThreadWorkspaceManager: GitThreadWorkspaceManager | undefined;

function getThreadWorkspaceManager(ctx: ConversationPipelineContext) {
if (ctx.deps.threadWorkspaceManager) {
return ctx.deps.threadWorkspaceManager;
}

defaultThreadWorkspaceManager ??= new GitThreadWorkspaceManager({
worktreeRootDir: env.WORKTREE_ROOT_DIR,
});
return defaultThreadWorkspaceManager;
}

export async function ensureThreadWorkspaceStep(
ctx: ConversationPipelineContext,
): Promise<PipelineStepResult> {
const { workspace } = ctx;
if (!workspace) {
return CONTINUE;
}

const existingWorkspacePath = ctx.existingSession?.workspacePath;
const existingWorkspaceStillResolved =
existingWorkspacePath &&
existingWorkspacePath === workspace.workspacePath &&
workspace.workspacePath !== workspace.repo.repoPath;

if (existingWorkspaceStillResolved && ctx.options.forceNewSession !== true) {
return CONTINUE;
}

try {
const nextWorkspace = getThreadWorkspaceManager(ctx).ensureThreadWorkspace({
channelId: ctx.message.channel,
threadTs: ctx.threadTs,
workspace,
});
ctx.workspace = enrichResolvedWorkspace(nextWorkspace);
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
runtimeWarn(
ctx.deps.logger,
'Failed to ensure thread worktree for thread %s workspace %s: %s',
ctx.threadTs,
workspace.workspaceLabel,
redact(message),
);
}

return CONTINUE;
}

export async function resolveSessionStep(
ctx: ConversationPipelineContext,
): Promise<PipelineStepResult> {
Expand Down Expand Up @@ -699,6 +752,7 @@ export const DEFAULT_CONVERSATION_STEPS: PipelineStep[] = [
handleStopKeywordStep,
stopActiveExecutionsStep,
resolveWorkspaceStep,
ensureThreadWorkspaceStep,
resolveSessionStep,
prepareThreadContext,
executeAgent,
Expand Down
2 changes: 2 additions & 0 deletions apps/kagura/src/slack/ingress/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import type { ContextMemories, MemoryStore } from '~/memory/types.js';
import type { ReviewSessionStore } from '~/review/types.js';
import type { SessionRecord, SessionStore } from '~/session/types.js';
import type { WorkspaceResolver } from '~/workspace/resolver.js';
import type { ThreadWorkspaceManager } from '~/workspace/thread-worktree.js';
import type { ResolvedWorkspace } from '~/workspace/types.js';

import type {
Expand Down Expand Up @@ -45,6 +46,7 @@ export interface SlackIngressDependencies {
sessionStore: SessionStore;
threadContextLoader: SlackThreadContextLoader;
threadExecutionRegistry: ThreadExecutionRegistry;
threadWorkspaceManager?: ThreadWorkspaceManager | undefined;
userInputBridge: SlackUserInputBridge;
workspaceResolver: WorkspaceResolver;
}
Expand Down
70 changes: 49 additions & 21 deletions apps/kagura/src/slack/ingress/workspace-resolution.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import fs from 'node:fs';

import type { ChannelPreferenceStore } from '~/channel-preference/types.js';
import type { SessionRecord } from '~/session/types.js';
import type { WorkspaceResolver } from '~/workspace/resolver.js';
Expand Down Expand Up @@ -30,28 +32,54 @@ export function resolveWorkspaceForConversation(
existingSession.workspaceRepoPath &&
existingSession.workspaceLabel
) {
const workspaceBranch = resolveWorkspaceBranch(existingSession.workspacePath);
return {
status: 'unique',
workspace: {
input: existingSession.workspacePath,
matchKind:
existingSession.workspacePath === existingSession.workspaceRepoPath ? 'repo' : 'path',
repo: {
aliases: [],
id: existingSession.workspaceRepoId,
label: existingSession.workspaceRepoId,
name:
existingSession.workspaceRepoId.split('/').at(-1) ?? existingSession.workspaceRepoId,
repoPath: existingSession.workspaceRepoPath,
relativePath: existingSession.workspaceRepoId,
if (fs.existsSync(existingSession.workspacePath)) {
const workspaceBranch = resolveWorkspaceBranch(existingSession.workspacePath);
return {
status: 'unique',
workspace: {
input: existingSession.workspacePath,
matchKind:
existingSession.workspacePath === existingSession.workspaceRepoPath ? 'repo' : 'path',
repo: {
aliases: [],
id: existingSession.workspaceRepoId,
label: existingSession.workspaceRepoId,
name:
existingSession.workspaceRepoId.split('/').at(-1) ?? existingSession.workspaceRepoId,
repoPath: existingSession.workspaceRepoPath,
relativePath: existingSession.workspaceRepoId,
},
source: existingSession.workspaceSource ?? 'manual',
...(workspaceBranch ? { workspaceBranch } : {}),
workspaceLabel: existingSession.workspaceLabel,
workspacePath: existingSession.workspacePath,
},
source: existingSession.workspaceSource ?? 'manual',
...(workspaceBranch ? { workspaceBranch } : {}),
workspaceLabel: existingSession.workspaceLabel,
workspacePath: existingSession.workspacePath,
},
};
};
}

if (fs.existsSync(existingSession.workspaceRepoPath)) {
const workspaceBranch = resolveWorkspaceBranch(existingSession.workspaceRepoPath);
return {
status: 'unique',
workspace: {
input: existingSession.workspaceRepoPath,
matchKind: 'repo',
repo: {
aliases: [],
id: existingSession.workspaceRepoId,
label: existingSession.workspaceRepoId,
name:
existingSession.workspaceRepoId.split('/').at(-1) ?? existingSession.workspaceRepoId,
repoPath: existingSession.workspaceRepoPath,
relativePath: existingSession.workspaceRepoId,
},
source: existingSession.workspaceSource ?? 'manual',
...(workspaceBranch ? { workspaceBranch } : {}),
workspaceLabel: existingSession.workspaceRepoId,
workspacePath: existingSession.workspaceRepoPath,
},
};
}
}

const preference = channelPreferenceStore.get(channelId);
Expand Down
Loading