11'use strict' ;
22
3+ const config = require ( '@zone-eu/wild-config' ) ;
34const crypto = require ( 'crypto' ) ;
45const Joi = require ( 'joi' ) ;
56const ObjectId = require ( 'mongodb' ) . ObjectId ;
7+ const log = require ( 'npmlog' ) ;
68const tools = require ( '../tools' ) ;
79const roles = require ( '../roles' ) ;
810const base32 = require ( 'base32.js' ) ;
@@ -16,6 +18,52 @@ const getMailboxCounterCb = (db, mailbox, type, callback) => {
1618 . catch ( err => callback ( err ) ) ;
1719} ;
1820
21+ const hasUpdatesStreamLogging = ! ! ( config . log && config . log . updateStream ) ;
22+
23+ const formatLogValue = value => String ( value ) ;
24+
25+ const logUpdatesStream = ( level , session , message , ...args ) => {
26+ if ( ! hasUpdatesStreamLogging ) {
27+ return ;
28+ }
29+
30+ log [ level ] ( 'API' , '[%s] ' + message , session . id , ...args ) ;
31+ } ;
32+
33+ const logUpdatesEvent = ( session , source , entry ) => {
34+ if ( ! hasUpdatesStreamLogging || ! entry ) {
35+ return ;
36+ }
37+
38+ if ( entry . command === 'COUNTERS' ) {
39+ return log . verbose (
40+ 'API' ,
41+ '[%s] action=updates-event source=%s user=%s event=%s eventId=%s mailbox=%s total=%s unseen=%s' ,
42+ session . id ,
43+ source ,
44+ formatLogValue ( session . user . id ) ,
45+ entry . command ,
46+ formatLogValue ( entry . _id ) ,
47+ formatLogValue ( entry . mailbox ) ,
48+ formatLogValue ( entry . total ) ,
49+ formatLogValue ( entry . unseen )
50+ ) ;
51+ }
52+
53+ log . verbose (
54+ 'API' ,
55+ '[%s] action=updates-event source=%s user=%s event=%s eventId=%s mailbox=%s message=%s modseq=%s' ,
56+ session . id ,
57+ source ,
58+ formatLogValue ( session . user . id ) ,
59+ formatLogValue ( entry . command ) ,
60+ formatLogValue ( entry . _id ) ,
61+ formatLogValue ( entry . mailbox ) ,
62+ formatLogValue ( entry . message ) ,
63+ formatLogValue ( entry . modseq )
64+ ) ;
65+ } ;
66+
1967module . exports = ( db , server , notifier ) => {
2068 server . get (
2169 {
@@ -116,6 +164,8 @@ module.exports = (db, server, notifier) => {
116164 }
117165 } ;
118166
167+ let remoteAddress = req . headers [ 'x-forwarded-for' ] || req . connection . remoteAddress || '' ;
168+ let opened = Date . now ( ) ;
119169 let closed = false ;
120170 let idleTimer = false ;
121171 let idleCounter = 0 ;
@@ -144,45 +194,109 @@ module.exports = (db, server, notifier) => {
144194 }
145195
146196 if ( message ) {
147- return res . write ( formatJournalData ( message ) ) ;
197+ try {
198+ res . write ( formatJournalData ( message ) ) ;
199+ logUpdatesEvent ( session , 'live' , message ) ;
200+ resetIdleComment ( ) ;
201+ } catch ( err ) {
202+ log . error (
203+ 'API' ,
204+ '[%s] action=updates-event-write-fail source=live user=%s error=%s' ,
205+ session . id ,
206+ session . user . id . toString ( ) ,
207+ err . stack || err
208+ ) ;
209+ }
210+ return ;
148211 }
149212
150213 journalReading = true ;
151- loadJournalStream ( db , req , res , user , lastEventId , ( err , info ) => {
152- if ( err ) {
153- // ignore?
154- }
155- lastEventId = info && info . lastEventId ;
156- journalReading = false ;
157- if ( info && info . processed ) {
158- resetIdleComment ( ) ;
159- }
160- } ) ;
214+ logUpdatesStream (
215+ 'verbose' ,
216+ session ,
217+ 'action=updates-replay-start user=%s lastEventId=%s' ,
218+ session . user . id . toString ( ) ,
219+ formatLogValue ( lastEventId )
220+ ) ;
221+
222+ loadJournalStream (
223+ db ,
224+ res ,
225+ user ,
226+ lastEventId ,
227+ ( err , info ) => {
228+ if ( err ) {
229+ logUpdatesStream (
230+ 'error' ,
231+ session ,
232+ 'action=updates-replay-error user=%s lastEventId=%s error=%s' ,
233+ session . user . id . toString ( ) ,
234+ formatLogValue ( lastEventId ) ,
235+ err . message
236+ ) ;
237+ }
238+
239+ lastEventId = info && info . lastEventId ;
240+ journalReading = false ;
241+
242+ logUpdatesStream (
243+ 'verbose' ,
244+ session ,
245+ 'action=updates-replay-complete user=%s processed=%s lastEventId=%s' ,
246+ session . user . id . toString ( ) ,
247+ formatLogValue ( info && info . processed ) ,
248+ formatLogValue ( lastEventId )
249+ ) ;
250+
251+ if ( info && info . processed ) {
252+ resetIdleComment ( ) ;
253+ }
254+ } ,
255+ ( source , entry ) => logUpdatesEvent ( session , source , entry )
256+ ) ;
161257 } ;
162258
163- let close = ( ) => {
259+ let close = reason => {
260+ if ( closed ) {
261+ return ;
262+ }
263+
164264 closed = true ;
165265 clearTimeout ( idleTimer ) ;
166266 notifier . removeListener ( session , journalReader ) ;
267+
268+ logUpdatesStream (
269+ 'info' ,
270+ session ,
271+ 'action=updates-close user=%s reason=%s duration=%s idle=%s lastEventId=%s' ,
272+ session . user . id . toString ( ) ,
273+ reason ,
274+ Date . now ( ) - opened ,
275+ idleCounter ,
276+ formatLogValue ( lastEventId )
277+ ) ;
167278 } ;
168279
169280 let setup = ( ) => {
170281 notifier . addListener ( session , journalReader ) ;
171282
172283 let finished = false ;
173- let done = ( ) => {
284+ let done = reason => {
174285 if ( finished ) {
175286 return ;
176287 }
177288 finished = true ;
178- return close ( ) ;
289+ return close ( reason ) ;
179290 } ;
180291
181292 // force close after 30 min, otherwise we might end with connections that never close
182- req . connection . setTimeout ( 30 * 60 * 1000 , done ) ;
183- req . connection . on ( 'end' , done ) ;
184- req . connection . on ( 'close' , done ) ;
185- req . connection . on ( 'error' , done ) ;
293+ req . connection . setTimeout ( 30 * 60 * 1000 , ( ) => done ( 'timeout' ) ) ;
294+ req . connection . on ( 'end' , ( ) => done ( 'end' ) ) ;
295+ req . connection . on ( 'close' , ( ) => done ( 'close' ) ) ;
296+ req . connection . on ( 'error' , err => {
297+ logUpdatesStream ( 'error' , session , 'action=updates-connection-error user=%s error=%s' , session . user . id . toString ( ) , err . message ) ;
298+ done ( 'error' ) ;
299+ } ) ;
186300 } ;
187301
188302 res . writeHead ( 200 , {
@@ -192,18 +306,57 @@ module.exports = (db, server, notifier) => {
192306 } ) ;
193307
194308 if ( lastEventId ) {
195- loadJournalStream ( db , req , res , user , lastEventId , ( err , info ) => {
196- if ( err ) {
197- res . write ( 'event: error\ndata: ' + err . message . split ( '\n' ) . join ( '\ndata: ' ) + '\n\n' ) ;
198- // ignore
199- }
200- setup ( ) ;
201- if ( info && info . processed ) {
202- resetIdleComment ( ) ;
203- } else {
204- sendIdleComment ( ) ;
205- }
206- } ) ;
309+ logUpdatesStream (
310+ 'info' ,
311+ session ,
312+ 'action=updates-open user=%s remote=%s sess=%s ip=%s lastEventId=%s replay=yes' ,
313+ session . user . id . toString ( ) ,
314+ remoteAddress ,
315+ formatLogValue ( req . params . sess ) ,
316+ formatLogValue ( req . params . ip ) ,
317+ lastEventId . toString ( )
318+ ) ;
319+
320+ logUpdatesStream ( 'verbose' , session , 'action=updates-replay-start user=%s lastEventId=%s' , session . user . id . toString ( ) , lastEventId . toString ( ) ) ;
321+
322+ loadJournalStream (
323+ db ,
324+ res ,
325+ user ,
326+ lastEventId ,
327+ ( err , info ) => {
328+ if ( err ) {
329+ logUpdatesStream (
330+ 'error' ,
331+ session ,
332+ 'action=updates-replay-error user=%s lastEventId=%s error=%s' ,
333+ session . user . id . toString ( ) ,
334+ formatLogValue ( lastEventId ) ,
335+ err . message
336+ ) ;
337+ res . write ( 'event: error\ndata: ' + err . message . split ( '\n' ) . join ( '\ndata: ' ) + '\n\n' ) ;
338+ }
339+
340+ lastEventId = info && info . lastEventId ;
341+
342+ logUpdatesStream (
343+ 'verbose' ,
344+ session ,
345+ 'action=updates-replay-complete user=%s processed=%s lastEventId=%s' ,
346+ session . user . id . toString ( ) ,
347+ formatLogValue ( info && info . processed ) ,
348+ formatLogValue ( lastEventId )
349+ ) ;
350+
351+ setup ( ) ;
352+ if ( info && info . processed ) {
353+ resetIdleComment ( ) ;
354+ } else {
355+ sendIdleComment ( ) ;
356+ }
357+ } ,
358+ ( source , entry ) => logUpdatesEvent ( session , source , entry )
359+ ) ;
207360 } else {
208361 let latest ;
209362 try {
@@ -215,6 +368,17 @@ module.exports = (db, server, notifier) => {
215368 lastEventId = latest . _id ;
216369 }
217370
371+ logUpdatesStream (
372+ 'info' ,
373+ session ,
374+ 'action=updates-open user=%s remote=%s sess=%s ip=%s lastEventId=%s replay=no' ,
375+ session . user . id . toString ( ) ,
376+ remoteAddress ,
377+ formatLogValue ( req . params . sess ) ,
378+ formatLogValue ( req . params . ip ) ,
379+ formatLogValue ( lastEventId )
380+ ) ;
381+
218382 setup ( ) ;
219383 sendIdleComment ( ) ;
220384 }
@@ -242,7 +406,9 @@ function formatJournalData(e) {
242406 return response . join ( '\n' ) + '\n\n' ;
243407}
244408
245- function loadJournalStream ( db , req , res , user , lastEventId , done ) {
409+ function loadJournalStream ( db , res , user , lastEventId , done , onEntry ) {
410+ onEntry = typeof onEntry === 'function' ? onEntry : ( ) => false ;
411+
246412 let query = { user } ;
247413 if ( lastEventId ) {
248414 query . _id = { $gt : lastEventId } ;
@@ -285,15 +451,16 @@ function loadJournalStream(db, req, res, user, lastEventId, done) {
285451 // ignore
286452 }
287453
288- res . write (
289- formatJournalData ( {
290- command : 'COUNTERS' ,
291- _id : lastEventId ,
292- mailbox,
293- total,
294- unseen
295- } )
296- ) ;
454+ let countersEntry = {
455+ command : 'COUNTERS' ,
456+ _id : lastEventId ,
457+ mailbox,
458+ total,
459+ unseen
460+ } ;
461+
462+ res . write ( formatJournalData ( countersEntry ) ) ;
463+ onEntry ( 'counters' , countersEntry ) ;
297464
298465 setImmediate ( emitCounters ) ;
299466 } ) ;
@@ -327,9 +494,9 @@ function loadJournalStream(db, req, res, user, lastEventId, done) {
327494 try {
328495 let data = formatJournalData ( e ) ;
329496 res . write ( data ) ;
497+ onEntry ( 'replay' , e ) ;
330498 } catch ( err ) {
331- console . error ( err ) ;
332- console . error ( e ) ;
499+ log . error ( 'API' , 'action=updates-event-write-fail user=%s event=%s error=%s' , user . toString ( ) , e . command , err . stack || err ) ;
333500 }
334501
335502 processed ++ ;
0 commit comments