@@ -33,11 +33,24 @@ class DrainClaimLostError extends Error {
3333const makeBatchKey = ( entries : DrainQueueEntry [ ] ) : string =>
3434 `${ entries [ 0 ] ?. event . id ?? "empty" } :${ entries . at ( - 1 ) ?. event . id ?? "empty" } ` ;
3535
36- const getDrainableEntryIds = ( entries : DrainQueueEntry [ ] ) : Set < string > => {
36+ type PreparedDrainEntry = {
37+ entry : DrainQueueEntry ;
38+ recallText : string ;
39+ } ;
40+
41+ const prepareDrainEntries = (
42+ entries : DrainQueueEntry [ ] ,
43+ ) : PreparedDrainEntry [ ] =>
44+ entries . map ( ( entry ) => ( {
45+ entry,
46+ recallText : getDrainEntryRecallText ( entry ) ,
47+ } ) ) ;
48+
49+ const getDrainableEntryIds = ( entries : PreparedDrainEntry [ ] ) : Set < string > => {
3750 const drainableEntryIds = new Set < string > ( ) ;
3851 for ( const entry of entries ) {
3952 if ( shouldDrainEntry ( entry ) ) {
40- drainableEntryIds . add ( entry . event . id ) ;
53+ drainableEntryIds . add ( entry . entry . event . id ) ;
4154 }
4255 }
4356 return drainableEntryIds ;
@@ -46,40 +59,44 @@ const getDrainableEntryIds = (entries: DrainQueueEntry[]): Set<string> => {
4659const getDrainEntryRecallText = ( entry : DrainQueueEntry ) : string =>
4760 sanitizeMemoryInput ( getSessionEventRecallText ( entry . event ) ) ;
4861
49- const buildGraphitiEpisodeBody = ( entry : DrainQueueEntry ) : string => {
50- const refs = entry . event . refs ?. length
51- ? `\nRefs: ${ entry . event . refs . join ( ", " ) } `
62+ const buildGraphitiEpisodeBody = ( entry : PreparedDrainEntry ) : string => {
63+ const refs = entry . entry . event . refs ?. length
64+ ? `\nRefs: ${ entry . entry . event . refs . join ( ", " ) } `
5265 : "" ;
53- const keywords = entry . event . keywords ?. length
54- ? `\nKeywords: ${ entry . event . keywords . join ( ", " ) } `
66+ const keywords = entry . entry . event . keywords ?. length
67+ ? `\nKeywords: ${ entry . entry . event . keywords . join ( ", " ) } `
5568 : "" ;
5669 return sanitizeMemoryInput (
5770 [
58- `Category: ${ entry . event . category } ` ,
59- `Role: ${ entry . event . role } ` ,
60- `Summary: ${ entry . event . summary } ` ,
61- entry . event . detail ? `Detail: ${ entry . event . detail } ` : "" ,
62- entry . event . continuityText
63- ? `Continuity: ${ entry . event . continuityText } `
64- : getDrainEntryRecallText ( entry ) ,
71+ `Category: ${ entry . entry . event . category } ` ,
72+ `Role: ${ entry . entry . event . role } ` ,
73+ `Summary: ${ entry . entry . event . summary } ` ,
74+ entry . entry . event . detail ? `Detail: ${ entry . entry . event . detail } ` : "" ,
75+ entry . entry . event . continuityText
76+ ? `Continuity: ${ entry . entry . event . continuityText } `
77+ : entry . recallText ,
6578 keywords ,
6679 refs ,
6780 ] . filter ( Boolean ) . join ( "\n" ) ,
6881 ) ;
6982} ;
7083
71- const shouldDrainEntry = ( entry : DrainQueueEntry ) : boolean => {
72- const text = getDrainEntryRecallText ( entry ) ;
84+ const shouldDrainEntry = ( entry : PreparedDrainEntry ) : boolean => {
85+ const text = entry . recallText ;
7386 if ( ! text ) return false ;
7487 if ( looksLikeToolTranscript ( text ) ) return false ;
7588 if ( looksLikeOperationalChatter ( text ) ) return false ;
7689 if ( looksTranscriptHeavy ( text ) ) return false ;
7790 if (
78- entry . event . role === "assistant" && entry . event . category !== "discovery"
91+ entry . entry . event . role === "assistant" &&
92+ entry . entry . event . category !== "discovery"
7993 ) {
8094 return false ;
8195 }
82- if ( entry . event . category === "message" && entry . event . role !== "user" ) {
96+ if (
97+ entry . entry . event . category === "message" &&
98+ entry . entry . event . role !== "user"
99+ ) {
83100 return false ;
84101 }
85102 return true ;
@@ -175,9 +192,10 @@ export class BatchDrainService {
175192 }
176193
177194 const batch = claimed . entries ;
195+ const preparedBatch = prepareDrainEntries ( batch ) ;
178196 const batchKey = makeBatchKey ( batch ) ;
179197 const eventIds = batch . map ( ( entry ) => entry . event . id ) ;
180- const drainableEntryIds = getDrainableEntryIds ( batch ) ;
198+ const drainableEntryIds = getDrainableEntryIds ( preparedBatch ) ;
181199 if ( drainableEntryIds . size === 0 ) {
182200 await this . events . markBatchSuccess ( groupId , claimed . claimToken , batch ) ;
183201 await this . redis . deleteKey ( drainRetryKey ( groupId , batchKey ) ) ;
@@ -241,12 +259,13 @@ export class BatchDrainService {
241259 let checkpointedCount = 0 ;
242260
243261 try {
244- for ( const entry of batch ) {
262+ for ( const preparedEntry of preparedBatch ) {
263+ const entry = preparedEntry . entry ;
245264 if ( drainableEntryIds . has ( entry . event . id ) ) {
246265 await assertClaimOwnership ( ) ;
247266 await graphiti . addMemory ( {
248267 name : `${ entry . event . category } :${ entry . event . id } ` ,
249- episodeBody : buildGraphitiEpisodeBody ( entry ) ,
268+ episodeBody : buildGraphitiEpisodeBody ( preparedEntry ) ,
250269 groupId,
251270 source : "text" ,
252271 sourceDescription : `session-event:${ entry . event . category } ` ,
0 commit comments