@@ -5,37 +5,79 @@ const db = require('../db');
55const consts = require ( '../consts' ) ;
66const tools = require ( '../tools' ) ;
77
8+ let getState = async ( task , data ) => {
9+ const [ mailboxData , taskData ] = await Promise . all ( [
10+ db . database . collection ( 'mailboxes' ) . findOne (
11+ {
12+ _id : data . mailbox ,
13+ user : data . user
14+ } ,
15+ {
16+ projection : {
17+ retention : true ,
18+ retentionCounter : true
19+ }
20+ }
21+ ) ,
22+ db . database . collection ( 'tasks' ) . findOne (
23+ {
24+ _id : task . _id
25+ } ,
26+ {
27+ projection : {
28+ 'data.retentionCounter' : true
29+ }
30+ }
31+ )
32+ ] ) ;
33+
34+ return {
35+ mailboxData,
36+ mailboxRetentionCounter : mailboxData ?. retentionCounter || 0 ,
37+ taskRetentionCounter : taskData ?. data ?. retentionCounter || 0
38+ } ;
39+ } ;
40+
841let run = async ( task , data ) => {
942 let processed = 0 ;
1043 let passes = 0 ;
1144 let rerun = false ;
45+ let activeRetentionCounter = data ?. retentionCounter || 0 ;
1246
1347 do {
1448 passes ++ ;
1549 rerun = false ;
16-
17- const passStart = new Date ( ) ;
1850 let lastUid = 0 ;
1951 let hasMore = true ;
2052
2153 while ( hasMore ) {
22- const mailboxData = await db . database . collection ( 'mailboxes' ) . findOne (
23- {
24- _id : data . mailbox ,
25- user : data . user
26- } ,
27- {
28- projection : {
29- retention : true
30- }
31- }
32- ) ;
54+ const state = await getState ( task , data ) ;
55+ const mailboxData = state . mailboxData ;
3356
3457 if ( ! mailboxData ) {
3558 log . verbose ( 'Tasks' , 'task=mailbox-retention id=%s user=%s mailbox=%s status=missing-mailbox' , task . _id , data . user , data . mailbox ) ;
3659 return { processed, passes } ;
3760 }
3861
62+ if ( state . taskRetentionCounter < state . mailboxRetentionCounter ) {
63+ log . verbose (
64+ 'Tasks' ,
65+ 'task=mailbox-retention id=%s user=%s mailbox=%s status=stale taskRetentionCounter=%s mailboxRetentionCounter=%s' ,
66+ task . _id ,
67+ data . user ,
68+ data . mailbox ,
69+ state . taskRetentionCounter ,
70+ state . mailboxRetentionCounter
71+ ) ;
72+ return { processed, passes } ;
73+ }
74+
75+ if ( state . taskRetentionCounter > activeRetentionCounter ) {
76+ activeRetentionCounter = state . taskRetentionCounter ;
77+ rerun = true ;
78+ break ;
79+ }
80+
3981 const messages = await db . database
4082 . collection ( 'messages' )
4183 . find ( {
@@ -60,6 +102,25 @@ let run = async (task, data) => {
60102 continue ;
61103 }
62104
105+ const latestTaskData = await db . database . collection ( 'tasks' ) . findOne (
106+ {
107+ _id : task . _id
108+ } ,
109+ {
110+ projection : {
111+ 'data.retentionCounter' : true
112+ }
113+ }
114+ ) ;
115+
116+ const latestTaskRetentionCounter = latestTaskData ?. data ?. retentionCounter || 0 ;
117+
118+ if ( latestTaskRetentionCounter > activeRetentionCounter ) {
119+ activeRetentionCounter = latestTaskRetentionCounter ;
120+ rerun = true ;
121+ break ;
122+ }
123+
63124 const operations = messages . map ( messageData => {
64125 // Messages get a fresh ObjectId when they are copied or moved, so its timestamp tracks arrival to this mailbox.
65126 const retentionState = tools . getMessageRetentionState ( mailboxData , messageData . _id . getTimestamp ( ) . getTime ( ) ) ;
@@ -98,21 +159,44 @@ let run = async (task, data) => {
98159 lastUid = messages [ messages . length - 1 ] . uid ;
99160 }
100161
101- const taskData = await db . database . collection ( 'tasks' ) . findOne (
102- {
103- _id : task . _id
104- } ,
105- {
106- projection : {
107- updated : true
108- }
162+ if ( ! rerun ) {
163+ const state = await getState ( task , data ) ;
164+
165+ if ( ! state . mailboxData ) {
166+ log . verbose ( 'Tasks' , 'task=mailbox-retention id=%s user=%s mailbox=%s status=missing-mailbox' , task . _id , data . user , data . mailbox ) ;
167+ return { processed, passes } ;
168+ }
169+
170+ if ( state . taskRetentionCounter < state . mailboxRetentionCounter ) {
171+ log . verbose (
172+ 'Tasks' ,
173+ 'task=mailbox-retention id=%s user=%s mailbox=%s status=stale taskRetentionCounter=%s mailboxRetentionCounter=%s' ,
174+ task . _id ,
175+ data . user ,
176+ data . mailbox ,
177+ state . taskRetentionCounter ,
178+ state . mailboxRetentionCounter
179+ ) ;
180+ return { processed, passes } ;
109181 }
110- ) ;
111182
112- rerun = ! ! ( taskData && taskData . updated && taskData . updated > passStart ) ;
183+ if ( state . taskRetentionCounter > activeRetentionCounter ) {
184+ activeRetentionCounter = state . taskRetentionCounter ;
185+ rerun = true ;
186+ }
187+ }
113188 } while ( rerun ) ;
114189
115- log . verbose ( 'Tasks' , 'task=mailbox-retention id=%s user=%s mailbox=%s processed=%s passes=%s' , task . _id , data . user , data . mailbox , processed , passes ) ;
190+ log . verbose (
191+ 'Tasks' ,
192+ 'task=mailbox-retention id=%s user=%s mailbox=%s processed=%s passes=%s retentionCounter=%s' ,
193+ task . _id ,
194+ data . user ,
195+ data . mailbox ,
196+ processed ,
197+ passes ,
198+ activeRetentionCounter
199+ ) ;
116200
117201 return { processed, passes } ;
118202} ;
0 commit comments