Skip to content

Commit ef76833

Browse files
committed
fix: harden startup and hot-tier fallback
1 parent 21ba068 commit ef76833

17 files changed

Lines changed: 680 additions & 162 deletions

src/handlers/chat.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ export interface ChatHandlerDeps {
1818
drainTriggerSize: number;
1919
}
2020

21-
export function createChatHandler(deps: ChatHandlerDeps) {
21+
export function createChatHandler(deps: ChatHandlerDeps): ChatMessageHook {
2222
const { sessionManager, redisEvents, graphitiAsync, drainTriggerSize } = deps;
2323

2424
return async ({ sessionID }: ChatMessageInput, output: ChatMessageOutput) => {

src/handlers/compacting.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@ export interface CompactingHandlerDeps {
1010
sessionManager: SessionManager;
1111
}
1212

13-
export function createCompactingHandler(deps: CompactingHandlerDeps) {
13+
export function createCompactingHandler(
14+
deps: CompactingHandlerDeps,
15+
): CompactingHook {
1416
const { sessionManager } = deps;
1517

1618
return async (

src/handlers/event.test.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -792,17 +792,17 @@ describe("event handler", () => {
792792
let providerCalls = 0;
793793
const sdkClient = {
794794
provider: {
795-
list: async () => {
795+
list: () => {
796796
providerCalls += 1;
797797
if (providerCalls === 1) {
798798
throw new Error("transient provider failure");
799799
}
800-
return {
800+
return Promise.resolve({
801801
data: [{
802802
id: "openai",
803803
models: [{ id: "gpt-5", limit: { context: 123_456 } }],
804804
}],
805-
};
805+
});
806806
},
807807
},
808808
};

src/handlers/event.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ const getCompactionSummary = (value: unknown): string => {
7474
return typeof summary === "string" ? summary : "";
7575
};
7676

77-
export function createEventHandler(deps: EventHandlerDeps) {
77+
export function createEventHandler(deps: EventHandlerDeps): EventHook {
7878
const {
7979
sessionManager,
8080
redisEvents,

src/handlers/messages.test.ts

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -588,4 +588,25 @@ describe("messages handler", () => {
588588
"follow up from child",
589589
);
590590
});
591+
592+
it("swallows missing-session resolution failures so startup does not throw", async () => {
593+
const sessionManager = new MockSessionManager();
594+
sessionManager.resolveSessionState = () => {
595+
throw new Error("Session not found");
596+
};
597+
const handler = createMessagesHandler({
598+
sessionManager: sessionManager as never,
599+
});
600+
601+
const output = {
602+
messages: [{
603+
info: { role: "user", sessionID: "session-1" },
604+
parts: [{ type: "text", text: "startup prompt" }],
605+
}],
606+
};
607+
608+
await handler({} as never, output as never);
609+
610+
assertEquals(output.messages[0].parts[0].text, "startup prompt");
611+
});
591612
});

src/handlers/messages.ts

Lines changed: 61 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,9 @@ const scrubPromptMemoryText = (text: string): string => {
4747
.trim();
4848
};
4949

50-
export function createMessagesHandler(deps: MessagesHandlerDeps) {
50+
export function createMessagesHandler(
51+
deps: MessagesHandlerDeps,
52+
): MessagesTransformHook {
5153
const { sessionManager } = deps;
5254

5355
return async (
@@ -59,62 +61,73 @@ export function createMessagesHandler(deps: MessagesHandlerDeps) {
5961
if (!lastUserEntry) return;
6062

6163
const sourceSessionID = lastUserEntry.info.sessionID;
62-
const {
63-
state,
64-
resolved,
65-
canonicalSessionId,
66-
} = await sessionManager.resolveSessionState(sourceSessionID);
67-
if (!resolved || !canonicalSessionId) return;
68-
if (!state?.isMain) return;
6964

70-
let rewroteExistingMemory = false;
71-
for (const entry of output.messages) {
72-
for (const part of entry.parts) {
73-
if (isTextPart(part)) {
74-
if (
75-
!rewroteExistingMemory &&
76-
(part.text.includes("<session_memory") ||
77-
part.text.includes("<memory") ||
78-
part.text.includes("<persistent_memory"))
79-
) {
80-
rewroteExistingMemory = true;
65+
try {
66+
const {
67+
state,
68+
resolved,
69+
canonicalSessionId,
70+
} = await sessionManager.resolveSessionState(sourceSessionID);
71+
if (!resolved || !canonicalSessionId) return;
72+
if (!state?.isMain) return;
73+
74+
let rewroteExistingMemory = false;
75+
for (const entry of output.messages) {
76+
for (const part of entry.parts) {
77+
if (isTextPart(part)) {
78+
if (
79+
!rewroteExistingMemory &&
80+
(part.text.includes("<session_memory") ||
81+
part.text.includes("<memory") ||
82+
part.text.includes("<persistent_memory"))
83+
) {
84+
rewroteExistingMemory = true;
85+
}
86+
part.text = scrubPromptMemoryText(part.text);
8187
}
82-
part.text = scrubPromptMemoryText(part.text);
8388
}
8489
}
85-
}
8690

87-
const recallQuery = sanitizeMemoryInput(
88-
stripInjectedMemoryBlocks(
89-
getTransformMessage(input) ?? getLatestUserText(output) ?? "",
90-
),
91-
) || undefined;
92-
const prepared = state.pendingInjection ??
93-
await sessionManager.prepareInjection(
94-
canonicalSessionId,
95-
recallQuery,
96-
);
97-
if (!prepared) return;
91+
const recallQuery = sanitizeMemoryInput(
92+
stripInjectedMemoryBlocks(
93+
getTransformMessage(input) ?? getLatestUserText(output) ?? "",
94+
),
95+
) || undefined;
96+
const prepared = state.pendingInjection ??
97+
await sessionManager.prepareInjection(
98+
canonicalSessionId,
99+
recallQuery,
100+
);
101+
if (!prepared) return;
98102

99-
const textPart = lastUserEntry.parts.find(isTextPart);
100-
if (!textPart) return;
101-
const effectiveUserText = sanitizeMemoryInput(
102-
stripInjectedMemoryBlocks(textPart.text),
103-
);
104-
if (!effectiveUserText) {
103+
const textPart = lastUserEntry.parts.find(isTextPart);
104+
if (!textPart) return;
105+
const effectiveUserText = sanitizeMemoryInput(
106+
stripInjectedMemoryBlocks(textPart.text),
107+
);
108+
if (!effectiveUserText) {
109+
if (state.pendingInjection === prepared) {
110+
state.pendingInjection = undefined;
111+
}
112+
return;
113+
}
114+
textPart.text = `${prepared.envelope}\n\n${effectiveUserText}`;
115+
logger.info("Injected canonical session_memory block", {
116+
sessionID: canonicalSessionId,
117+
sourceSessionID,
118+
rewroteExistingMemory,
119+
});
105120
if (state.pendingInjection === prepared) {
106121
state.pendingInjection = undefined;
107122
}
108-
return;
109-
}
110-
textPart.text = `${prepared.envelope}\n\n${effectiveUserText}`;
111-
logger.info("Injected canonical session_memory block", {
112-
sessionID: canonicalSessionId,
113-
sourceSessionID,
114-
rewroteExistingMemory,
115-
});
116-
if (state.pendingInjection === prepared) {
117-
state.pendingInjection = undefined;
123+
} catch (error) {
124+
logger.warn(
125+
"Unable to prepare local session memory for messages transform",
126+
{
127+
sessionID: sourceSessionID,
128+
error,
129+
},
130+
);
118131
}
119132
};
120133
}

src/index.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ const defaultGraphitiDependencies: GraphitiDependencies = {
7878
makeUserGroupId,
7979
};
8080

81-
export const graphiti = ((
81+
export const graphiti: Plugin = (
8282
input: PluginInput,
8383
dependencies: GraphitiDependencies = defaultGraphitiDependencies,
8484
) => {
@@ -183,4 +183,4 @@ export const graphiti = ((
183183
sessionManager,
184184
}),
185185
});
186-
}) satisfies Plugin;
186+
};

src/services/batch-drain.test.ts

Lines changed: 38 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -29,16 +29,17 @@ class FakeRedisRuntime {
2929
Set<(...args: unknown[]) => void>
3030
>();
3131

32-
async connect(): Promise<void> {
32+
connect(): Promise<void> {
3333
this.emit("ready");
34+
return Promise.resolve();
3435
}
3536

36-
async ping(): Promise<"PONG"> {
37-
return "PONG";
37+
ping(): Promise<"PONG"> {
38+
return Promise.resolve("PONG");
3839
}
3940

40-
async quit(): Promise<"OK"> {
41-
return "OK";
41+
quit(): Promise<"OK"> {
42+
return Promise.resolve("OK");
4243
}
4344

4445
private ensureList(key: string): string[] {
@@ -54,86 +55,88 @@ class FakeRedisRuntime {
5455
return list;
5556
}
5657

57-
async lpush(key: string, value: string): Promise<number> {
58+
lpush(key: string, value: string): Promise<number> {
5859
const list = this.ensureList(key);
5960
list.unshift(value);
60-
return list.length;
61+
return Promise.resolve(list.length);
6162
}
6263

63-
async rpush(key: string, value: string): Promise<number> {
64+
rpush(key: string, value: string): Promise<number> {
6465
const list = this.ensureList(key);
6566
list.push(value);
66-
return list.length;
67+
return Promise.resolve(list.length);
6768
}
6869

69-
async lmove(
70+
lmove(
7071
source: string,
7172
destination: string,
7273
sourceSide: "LEFT" | "RIGHT",
7374
destinationSide: "LEFT" | "RIGHT",
7475
): Promise<string | null> {
7576
const sourceList = this.lists.get(source) ?? [];
7677
const value = sourceSide === "LEFT" ? sourceList.shift() : sourceList.pop();
77-
if (value === undefined) return null;
78+
if (value === undefined) return Promise.resolve(null);
7879
const destinationList = this.ensureList(destination);
7980
if (destinationSide === "LEFT") destinationList.unshift(value);
8081
else destinationList.push(value);
81-
return value;
82+
return Promise.resolve(value);
8283
}
8384

84-
async lrange(key: string, start: number, stop: number): Promise<string[]> {
85+
lrange(key: string, start: number, stop: number): Promise<string[]> {
8586
const list = this.lists.get(key) ?? [];
8687
const normalizedStop = stop < 0 ? list.length + stop : stop;
87-
return list.slice(start, normalizedStop + 1);
88+
return Promise.resolve(list.slice(start, normalizedStop + 1));
8889
}
8990

90-
async llen(key: string): Promise<number> {
91-
return (this.lists.get(key) ?? []).length;
91+
llen(key: string): Promise<number> {
92+
return Promise.resolve((this.lists.get(key) ?? []).length);
9293
}
9394

94-
async ltrim(key: string, start: number, stop: number): Promise<void> {
95+
ltrim(key: string, start: number, stop: number): Promise<void> {
9596
const list = this.lists.get(key) ?? [];
9697
const normalizedStop = stop < 0 ? list.length + stop : stop;
9798
this.lists.set(key, list.slice(start, normalizedStop + 1));
99+
return Promise.resolve();
98100
}
99101

100-
async lindex(key: string, index: number): Promise<string | null> {
101-
return this.lists.get(key)?.[index] ?? null;
102+
lindex(key: string, index: number): Promise<string | null> {
103+
return Promise.resolve(this.lists.get(key)?.[index] ?? null);
102104
}
103105

104-
async lset(key: string, index: number, value: string): Promise<void> {
106+
lset(key: string, index: number, value: string): Promise<void> {
105107
const list = this.lists.get(key);
106108
if (!list || index < 0 || index >= list.length) {
107-
throw new Error("ERR index out of range");
109+
return Promise.reject(new Error("ERR index out of range"));
108110
}
109111
list[index] = value;
112+
return Promise.resolve();
110113
}
111114

112-
async get(key: string): Promise<string | null> {
113-
return this.values.get(key) ?? null;
115+
get(key: string): Promise<string | null> {
116+
return Promise.resolve(this.values.get(key) ?? null);
114117
}
115118

116-
async set(
119+
set(
117120
key: string,
118121
value: string,
119122
...args: Array<string | number>
120123
): Promise<"OK" | null> {
121124
const onlyIfAbsent = args.includes("NX");
122-
if (onlyIfAbsent && this.values.has(key)) return null;
125+
if (onlyIfAbsent && this.values.has(key)) return Promise.resolve(null);
123126
this.values.set(key, value);
124-
return "OK";
127+
return Promise.resolve("OK");
125128
}
126129

127-
async expire(_key: string, _ttlSeconds: number): Promise<number> {
128-
return 1;
130+
expire(_key: string, _ttlSeconds: number): Promise<number> {
131+
return Promise.resolve(1);
129132
}
130133

131-
async del(key: string): Promise<number> {
134+
del(key: string): Promise<number> {
132135
const deleted = this.values.delete(key) || this.lists.delete(key);
133-
return deleted ? 1 : 0;
136+
return Promise.resolve(deleted ? 1 : 0);
134137
}
135138

136-
async eval(
139+
eval(
137140
script: string,
138141
_numKeys: number,
139142
...args: string[]
@@ -142,19 +145,19 @@ class FakeRedisRuntime {
142145
script.includes("redis.call('GET', KEYS[1]) == ARGV[1]") &&
143146
script.includes("redis.call('EXPIRE', KEYS[1], ARGV[2])")
144147
) {
145-
return this.values.get(args[0]) === args[1] ? 1 : 0;
148+
return Promise.resolve(this.values.get(args[0]) === args[1] ? 1 : 0);
146149
}
147150

148151
if (
149152
script.includes("redis.call('GET', KEYS[1]) == ARGV[1]") &&
150153
script.includes("redis.call('DEL', KEYS[1])")
151154
) {
152-
if (this.values.get(args[0]) !== args[1]) return 0;
155+
if (this.values.get(args[0]) !== args[1]) return Promise.resolve(0);
153156
this.values.delete(args[0]);
154-
return 1;
157+
return Promise.resolve(1);
155158
}
156159

157-
throw new Error("unsupported eval script");
160+
return Promise.reject(new Error("unsupported eval script"));
158161
}
159162

160163
on(event: RedisEvent, listener: (...args: unknown[]) => void): void {

0 commit comments

Comments
 (0)