@@ -33,6 +33,16 @@ class DrainClaimLostError extends Error {
3333const makeBatchKey = ( entries : DrainQueueEntry [ ] ) : string =>
3434 `${ entries [ 0 ] ?. event . id ?? "empty" } :${ entries . at ( - 1 ) ?. event . id ?? "empty" } ` ;
3535
36+ const getDrainableEntryIds = ( entries : DrainQueueEntry [ ] ) : Set < string > => {
37+ const drainableEntryIds = new Set < string > ( ) ;
38+ for ( const entry of entries ) {
39+ if ( shouldDrainEntry ( entry ) ) {
40+ drainableEntryIds . add ( entry . event . id ) ;
41+ }
42+ }
43+ return drainableEntryIds ;
44+ } ;
45+
3646const shouldDrainEntry = ( entry : DrainQueueEntry ) : boolean => {
3747 const text = sanitizeMemoryInput ( getSessionEventRecallText ( entry . event ) ) ;
3848 if ( ! text ) return false ;
@@ -135,8 +145,9 @@ export class BatchDrainService {
135145
136146 const batch = claimed . entries ;
137147 const batchKey = makeBatchKey ( batch ) ;
138- const semanticBatch = batch . filter ( shouldDrainEntry ) ;
139- if ( semanticBatch . length === 0 ) {
148+ const eventIds = batch . map ( ( entry ) => entry . event . id ) ;
149+ const drainableEntryIds = getDrainableEntryIds ( batch ) ;
150+ if ( drainableEntryIds . size === 0 ) {
140151 await this . events . markBatchSuccess ( groupId , claimed . claimToken , batch ) ;
141152 await this . redis . deleteKey ( drainRetryKey ( groupId , batchKey ) ) ;
142153 return { status : "success" , drained : 0 } ;
@@ -149,19 +160,7 @@ export class BatchDrainService {
149160 }
150161
151162 let lostClaim = false ;
152- const refreshClaimHeartbeat = async ( ) : Promise < void > => {
153- try {
154- const refreshed = await this . events . refreshClaimLease (
155- groupId ,
156- claimed . claimToken ,
157- claimed . lockTtlSeconds ,
158- ) ;
159- if ( ! refreshed ) lostClaim = true ;
160- } catch {
161- lostClaim = true ;
162- }
163- } ;
164- const confirmClaimOwnership = async ( ) : Promise < boolean > => {
163+ const refreshClaimOwnership = async ( ) : Promise < boolean > => {
165164 if ( lostClaim ) return false ;
166165 try {
167166 const refreshed = await this . events . refreshClaimLease (
@@ -175,6 +174,11 @@ export class BatchDrainService {
175174 }
176175 return ! lostClaim ;
177176 } ;
177+ const refreshClaimHeartbeat = async ( ) : Promise < void > => {
178+ await refreshClaimOwnership ( ) ;
179+ } ;
180+ const confirmClaimOwnership = ( ) : Promise < boolean > =>
181+ refreshClaimOwnership ( ) ;
178182 const assertClaimOwnership = async ( ) : Promise < void > => {
179183 if ( ! await confirmClaimOwnership ( ) ) {
180184 throw new DrainClaimLostError ( ) ;
@@ -187,8 +191,8 @@ export class BatchDrainService {
187191
188192 try {
189193 for ( const entry of batch ) {
190- await assertClaimOwnership ( ) ;
191- if ( shouldDrainEntry ( entry ) ) {
194+ if ( drainableEntryIds . has ( entry . event . id ) ) {
195+ await assertClaimOwnership ( ) ;
192196 await graphiti . addMemory ( {
193197 name : `${ entry . event . category } :${ entry . event . id } ` ,
194198 episodeBody : entry . episodeBody ,
@@ -208,13 +212,13 @@ export class BatchDrainService {
208212 await assertClaimOwnership ( ) ;
209213 await this . events . markBatchSuccess ( groupId , claimed . claimToken , batch ) ;
210214 await this . redis . deleteKey ( drainRetryKey ( groupId , batchKey ) ) ;
211- return { status : "success" , drained : semanticBatch . length } ;
215+ return { status : "success" , drained : drainableEntryIds . size } ;
212216 } catch ( err ) {
213217 const lostOwnership = err instanceof DrainClaimLostError ;
214218 if ( lostOwnership ) {
215219 logger . warn ( "Drain claim heartbeat lost ownership" , {
216220 groupId,
217- eventIds : batch . map ( ( entry ) => entry . event . id ) ,
221+ eventIds,
218222 } ) ;
219223 }
220224 const attempts = ( retryState ?. attempts ?? 0 ) + 1 ;
@@ -223,7 +227,7 @@ export class BatchDrainService {
223227 if ( ! lostOwnership ) {
224228 logger . warn ( "Drain claim heartbeat lost ownership" , {
225229 groupId,
226- eventIds : batch . map ( ( entry ) => entry . event . id ) ,
230+ eventIds,
227231 } ) ;
228232 }
229233 await this . redis . deleteKey ( drainRetryKey ( groupId , batchKey ) ) ;
0 commit comments