Skip to content

Commit 678007c

Browse files
authored
feat: local to cloud handoff (#1618)
Now let's do it the other way around as well, local to cloud MERGE THIS PR IN THE MONOREPO BEFORE MERGING THIS ONE: PostHog/posthog#54540
1 parent 47acdc8 commit 678007c

19 files changed

Lines changed: 793 additions & 329 deletions

File tree

apps/code/src/main/services/git/service.ts

Lines changed: 61 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -111,13 +111,12 @@ function toUnifiedDiffPatch(
111111
@injectable()
112112
export class GitService extends TypedEventEmitter<GitServiceEvents> {
113113
private lastFetchTime = new Map<string, number>();
114-
private llmGateway: LlmGatewayService;
115114

116115
constructor(
117-
@inject(MAIN_TOKENS.LlmGatewayService) llmGateway: LlmGatewayService,
116+
@inject(MAIN_TOKENS.LlmGatewayService)
117+
private readonly llmGateway: LlmGatewayService,
118118
) {
119119
super();
120-
this.llmGateway = llmGateway;
121120
}
122121

123122
private async getStateSnapshot(
@@ -1123,85 +1122,71 @@ export class GitService extends TypedEventEmitter<GitServiceEvents> {
11231122

11241123
const [owner, repoName] = parts;
11251124

1126-
try {
1127-
const repoResult = await execGh([
1128-
"api",
1129-
`repos/${owner}/${repoName}`,
1130-
"--jq",
1131-
".default_branch",
1132-
]);
1125+
const repoResult = await execGh([
1126+
"api",
1127+
`repos/${owner}/${repoName}`,
1128+
"--jq",
1129+
".default_branch",
1130+
]);
11331131

1134-
if (repoResult.exitCode !== 0 || !repoResult.stdout.trim()) {
1135-
return [];
1136-
}
1137-
const defaultBranch = repoResult.stdout.trim();
1132+
if (repoResult.exitCode !== 0 || !repoResult.stdout.trim()) {
1133+
return [];
1134+
}
1135+
const defaultBranch = repoResult.stdout.trim();
11381136

1139-
const result = await execGh([
1140-
"api",
1141-
`repos/${owner}/${repoName}/compare/${defaultBranch}...${branch}`,
1142-
]);
1137+
const result = await execGh([
1138+
"api",
1139+
`repos/${owner}/${repoName}/compare/${defaultBranch}...${branch}`,
1140+
]);
11431141

1144-
if (result.exitCode !== 0) {
1145-
throw new Error(
1146-
`Failed to fetch branch files: ${result.stderr || result.error || "Unknown error"}`,
1147-
);
1142+
if (result.exitCode !== 0) {
1143+
throw new Error(
1144+
`Failed to fetch branch files: ${result.stderr || result.error || "Unknown error"}`,
1145+
);
1146+
}
1147+
1148+
const response = JSON.parse(result.stdout) as {
1149+
files?: Array<{
1150+
filename: string;
1151+
status: string;
1152+
previous_filename?: string;
1153+
additions: number;
1154+
deletions: number;
1155+
patch?: string;
1156+
}>;
1157+
};
1158+
const files = response.files;
1159+
1160+
if (!files) return [];
1161+
1162+
return files.map((f) => {
1163+
let status: ChangedFile["status"];
1164+
switch (f.status) {
1165+
case "added":
1166+
status = "added";
1167+
break;
1168+
case "removed":
1169+
status = "deleted";
1170+
break;
1171+
case "renamed":
1172+
status = "renamed";
1173+
break;
1174+
default:
1175+
status = "modified";
1176+
break;
11481177
}
11491178

1150-
const response = JSON.parse(result.stdout) as {
1151-
files?: Array<{
1152-
filename: string;
1153-
status: string;
1154-
previous_filename?: string;
1155-
additions: number;
1156-
deletions: number;
1157-
patch?: string;
1158-
}>;
1179+
return {
1180+
path: f.filename,
1181+
status,
1182+
originalPath: f.previous_filename,
1183+
linesAdded: f.additions,
1184+
linesRemoved: f.deletions,
1185+
patch: f.patch
1186+
? toUnifiedDiffPatch(f.patch, f.filename, f.previous_filename, status)
1187+
: undefined,
11591188
};
1160-
const files = response.files;
1161-
1162-
if (!files) return [];
1163-
1164-
return files.map((f) => {
1165-
let status: ChangedFile["status"];
1166-
switch (f.status) {
1167-
case "added":
1168-
status = "added";
1169-
break;
1170-
case "removed":
1171-
status = "deleted";
1172-
break;
1173-
case "renamed":
1174-
status = "renamed";
1175-
break;
1176-
default:
1177-
status = "modified";
1178-
break;
1179-
}
1180-
1181-
return {
1182-
path: f.filename,
1183-
status,
1184-
originalPath: f.previous_filename,
1185-
linesAdded: f.additions,
1186-
linesRemoved: f.deletions,
1187-
patch: f.patch
1188-
? toUnifiedDiffPatch(
1189-
f.patch,
1190-
f.filename,
1191-
f.previous_filename,
1192-
status,
1193-
)
1194-
: undefined,
1195-
};
1196-
});
1197-
} catch (error) {
1198-
log.warn("Failed to fetch branch changed files", {
1199-
repo,
1200-
branch,
1201-
error,
1202-
});
1203-
throw error;
1204-
}
1189+
});
12051190
}
12061191

12071192
public async generateCommitMessage(

apps/code/src/main/services/handoff/handoff-saga.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@ function createDeps(overrides: Partial<HandoffSagaDeps> = {}): HandoffSagaDeps {
7373
getTaskRun: vi.fn().mockResolvedValue({
7474
log_url: "https://logs.example.com/run-1.ndjson",
7575
}),
76+
updateTaskRun: vi.fn().mockResolvedValue({}),
7677
}),
7778
applyTreeSnapshot: vi.fn().mockResolvedValue(undefined),
7879
applyGitCheckpoint: vi.fn().mockResolvedValue(undefined),
@@ -97,7 +98,6 @@ function createResumeState(
9798
conversation: [],
9899
latestSnapshot: null,
99100
latestGitCheckpoint: null,
100-
snapshotApplied: false,
101101
interrupted: false,
102102
logEntryCount: 0,
103103
...overrides,

apps/code/src/main/services/handoff/handoff-saga.ts

Lines changed: 10 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -6,29 +6,18 @@ import {
66
} from "@posthog/agent/resume";
77
import type * as AgentTypes from "@posthog/agent/types";
88
import { Saga, type SagaLogger } from "@posthog/shared";
9-
import type { WorkspaceMode } from "../../db/repositories/workspace-repository";
109
import type { SessionResponse } from "../agent/schemas";
11-
import type { HandoffStep } from "./schemas";
12-
13-
export interface HandoffSagaInput {
14-
taskId: string;
15-
runId: string;
16-
repoPath: string;
17-
apiHost: string;
18-
teamId: number;
19-
sessionId?: string;
20-
adapter?: "claude" | "codex";
21-
localGitState?: AgentTypes.HandoffLocalGitState;
22-
}
10+
import type { HandoffBaseDeps, HandoffExecuteInput } from "./schemas";
11+
12+
export type HandoffSagaInput = HandoffExecuteInput;
2313

2414
export interface HandoffSagaOutput {
2515
sessionId: string;
2616
snapshotApplied: boolean;
2717
conversationTurns: number;
2818
}
2919

30-
export interface HandoffSagaDeps {
31-
createApiClient(apiHost: string, teamId: number): PostHogAPIClient;
20+
export interface HandoffSagaDeps extends HandoffBaseDeps {
3221
applyTreeSnapshot(
3322
snapshot: AgentTypes.TreeSnapshotEvent,
3423
repoPath: string,
@@ -44,7 +33,6 @@ export interface HandoffSagaDeps {
4433
apiClient: PostHogAPIClient,
4534
localGitState?: AgentTypes.HandoffLocalGitState,
4635
): Promise<void>;
47-
updateWorkspaceMode(taskId: string, mode: WorkspaceMode): void;
4836
reconnectSession(params: {
4937
taskId: string;
5038
taskRunId: string;
@@ -63,9 +51,7 @@ export interface HandoffSagaDeps {
6351
localGitState?: AgentTypes.HandoffLocalGitState,
6452
): Promise<void>;
6553
seedLocalLogs(runId: string, logUrl: string): Promise<void>;
66-
killSession(taskRunId: string): Promise<void>;
6754
setPendingContext(taskRunId: string, context: string): void;
68-
onProgress(step: HandoffStep, message: string): void;
6955
}
7056

7157
export class HandoffSaga extends Saga<HandoffSagaInput, HandoffSagaOutput> {
@@ -97,6 +83,12 @@ export class HandoffSaga extends Saga<HandoffSagaInput, HandoffSagaOutput> {
9783

9884
const apiClient = this.deps.createApiClient(apiHost, teamId);
9985

86+
await this.readOnlyStep("update_run_environment", async () => {
87+
await apiClient.updateTaskRun(taskId, runId, {
88+
environment: "local",
89+
});
90+
});
91+
10092
const { resumeState, cloudLogUrl } = await this.readOnlyStep(
10193
"fetch_and_rebuild",
10294
async () => {
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
import type * as AgentTypes from "@posthog/agent/types";
2+
import { Saga, type SagaLogger } from "@posthog/shared";
3+
import type { HandoffBaseDeps, HandoffToCloudExecuteInput } from "./schemas";
4+
5+
export type HandoffToCloudSagaInput = HandoffToCloudExecuteInput;
6+
7+
export interface HandoffToCloudSagaOutput {
8+
checkpointCaptured: boolean;
9+
snapshotCaptured: boolean;
10+
flushedLogEntryCount: number;
11+
}
12+
13+
export interface HandoffToCloudSagaDeps extends HandoffBaseDeps {
14+
captureGitCheckpoint(
15+
localGitState?: AgentTypes.HandoffLocalGitState,
16+
): Promise<AgentTypes.GitCheckpointEvent | null>;
17+
captureTreeSnapshot(): Promise<AgentTypes.TreeSnapshotEvent | null>;
18+
persistCheckpointToLog(
19+
checkpoint: AgentTypes.GitCheckpointEvent,
20+
): Promise<void>;
21+
persistSnapshotToLog(snapshot: AgentTypes.TreeSnapshotEvent): Promise<void>;
22+
flushLocalLogs(): Promise<number>;
23+
resumeRunInCloud(): Promise<void>;
24+
}
25+
26+
export class HandoffToCloudSaga extends Saga<
27+
HandoffToCloudSagaInput,
28+
HandoffToCloudSagaOutput
29+
> {
30+
readonly sagaName = "HandoffToCloudSaga";
31+
private deps: HandoffToCloudSagaDeps;
32+
33+
constructor(deps: HandoffToCloudSagaDeps, logger?: SagaLogger) {
34+
super(logger);
35+
this.deps = deps;
36+
}
37+
38+
protected async execute(
39+
input: HandoffToCloudSagaInput,
40+
): Promise<HandoffToCloudSagaOutput> {
41+
const { taskId, runId } = input;
42+
43+
let checkpointCaptured = false;
44+
let snapshotCaptured = false;
45+
46+
this.deps.onProgress(
47+
"capturing_checkpoint",
48+
"Capturing local git state...",
49+
);
50+
51+
const checkpoint = await this.readOnlyStep("capture_git_checkpoint", () =>
52+
this.deps.captureGitCheckpoint(input.localGitState),
53+
);
54+
55+
let persistedNotificationCount = 0;
56+
57+
if (checkpoint) {
58+
await this.readOnlyStep("persist_checkpoint_to_log", () =>
59+
this.deps.persistCheckpointToLog(checkpoint),
60+
);
61+
checkpointCaptured = true;
62+
persistedNotificationCount++;
63+
}
64+
65+
this.deps.onProgress("capturing_snapshot", "Capturing local file state...");
66+
67+
const snapshot = await this.readOnlyStep("capture_tree_snapshot", () =>
68+
this.deps.captureTreeSnapshot(),
69+
);
70+
71+
if (snapshot) {
72+
await this.readOnlyStep("persist_snapshot_to_log", () =>
73+
this.deps.persistSnapshotToLog(snapshot),
74+
);
75+
snapshotCaptured = true;
76+
persistedNotificationCount++;
77+
}
78+
79+
const localLogLineCount = await this.readOnlyStep("flush_local_logs", () =>
80+
this.deps.flushLocalLogs(),
81+
);
82+
const flushedLogEntryCount = localLogLineCount + persistedNotificationCount;
83+
84+
this.deps.onProgress("starting_cloud_run", "Starting cloud sandbox...");
85+
86+
await this.step({
87+
name: "start_cloud_run",
88+
execute: () => this.deps.resumeRunInCloud(),
89+
rollback: async () => {},
90+
});
91+
92+
this.deps.onProgress("stopping_agent", "Stopping local agent...");
93+
94+
await this.readOnlyStep("stop_local_agent", () =>
95+
this.deps.killSession(runId),
96+
);
97+
98+
await this.step({
99+
name: "update_workspace",
100+
execute: async () => {
101+
this.deps.updateWorkspaceMode(taskId, "cloud");
102+
},
103+
rollback: async () => {
104+
this.deps.updateWorkspaceMode(taskId, "local");
105+
},
106+
});
107+
108+
this.deps.onProgress("complete", "Handoff to cloud complete");
109+
110+
return { checkpointCaptured, snapshotCaptured, flushedLogEntryCount };
111+
}
112+
}

0 commit comments

Comments
 (0)