Skip to content

Commit 133fd3d

Browse files
authored
Merge pull request #6734 from Countly/feature/ingestion
Feature/ingestion
2 parents e25660f + e745adc commit 133fd3d

File tree

19 files changed

+386
-183
lines changed

19 files changed

+386
-183
lines changed

api/ingestor/requestProcessor.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -317,7 +317,7 @@ var processToDrill = async function(params, drill_updates, callback) {
317317
}
318318
var upWithMeta = fillUserProperties(dbAppUser, params.app?.ovveridden_types?.prop);
319319
dbEventObject[common.dbUserMap.device_id] = params.qstring.device_id;
320-
dbEventObject.lsid = dbAppUser.lsid;
320+
dbEventObject.lsid = events[i].lsid || dbAppUser.lsid;
321321
dbEventObject[common.dbEventMap.user_properties] = upWithMeta.up;
322322
dbEventObject.custom = upWithMeta.upCustom;
323323
dbEventObject.cmp = upWithMeta.upCampaign;
@@ -831,7 +831,7 @@ const validateAppForWriteAPI = (params, done) => {
831831
postfix: crypto.createHash('md5').update(params.app_user.did + "").digest('base64')[0],
832832
ended: "false"
833833
},
834-
up_extra,
834+
up_extra
835835
});
836836
}
837837
plugins.dispatch("/sdk/process_user", ob, function() { //

api/ingestor/usage.js

Lines changed: 66 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,63 @@ usage.returnRequestMetrics = function(params) {
304304
return params.collectedMetrics;
305305
};
306306

307+
usage.updateEndSessionParams = function(params, eventList, session_duration) {
308+
var user = params.app_user;
309+
if (!user || !eventList || !Array.isArray(eventList)) {
310+
return;
311+
}
312+
const up_extra = { av_prev: params.app_user.av, p_prev: params.app_user.p };
313+
if (params.app_user.hadFatalCrash) {
314+
up_extra.hadFatalCrash = params.app_user.hadFatalCrash;
315+
}
316+
if (params.app_user.hadAnyFatalCrash) {
317+
up_extra.hadAnyFatalCrash = params.app_user.hadAnyFatalCrash;
318+
}
319+
if (params.app_user.hadNonfatalCrash) {
320+
up_extra.hadNonfatalCrash = params.app_user.hadNonfatalCrash;
321+
}
322+
if (params.app_user.hadAnyNonfatalCrash) {
323+
up_extra.hadAnyNonfatalCrash = params.app_user.hadAnyNonfatalCrash;
324+
}
325+
326+
var drill_doc = {
327+
"key": "[CLY]_session",
328+
"lsid": user.lsid,
329+
"segmentation": user.lsparams,
330+
"dur": ((user.sd || 0) + (session_duration || 0)),
331+
"count": 1,
332+
"up_extra": up_extra
333+
};
334+
var lasts = (user.ls * 1000);
335+
let idsplit = user.lsid.split("_");
336+
if (idsplit[3] && idsplit[3].length === 13) {
337+
lasts = parseInt(idsplit[3]);
338+
}
339+
drill_doc._id = params.app_id + "_" + user.uid + "_" + user.lsid;
340+
drill_doc.timestamp = lasts;
341+
drill_doc.segmentation.ended = "true";
342+
eventList.push(drill_doc);
343+
344+
//Flush last view stored for user
345+
if (user.last_view) {
346+
user.last_view.segments = user.last_view.segments || {};
347+
user.last_view.segments.exit = 1;
348+
if (user.vc < 2) {
349+
user.last_view.segments.bounce = 1;
350+
}
351+
var lastViewDoc = {
352+
"key": "[CLY]_view", //Will be renamed to [CLY]_view_update before inserting to drill
353+
"name": user.last_view.name,
354+
"segmentation": user.last_view.segments,
355+
"dur": user.last_view.duration || 0,
356+
"_id": (user.last_view._idv ? (params.app_id + "_" + user.uid + '_' + user.last_view._idv + '_up') : (user.lvid + '_up')),
357+
"timestamp": user.last_view.ts,
358+
"_system_auto_added": true
359+
};
360+
eventList.push(lastViewDoc);
361+
}
362+
};
363+
307364
usage.processSession = function(ob) {
308365
var params = ob.params;
309366
var userProps = {};
@@ -333,43 +390,17 @@ usage.processSession = function(ob) {
333390
delete params.qstring.begin_session;//do not start a new session.
334391
}
335392
else {
336-
337393
if (params.app_user[common.dbUserMap.has_ongoing_session]) {
338394
params.qstring.end_session = {"lsid": ob.params.app_user.lsid, "ls": ob.params.app_user.ls, "sd": ob.params.app_user.sd};
339395
}
340396
userProps[common.dbUserMap.last_begin_session_timestamp] = params.time.timestamp;
341397
userProps.lsid = params.request_id;
342398

343399
if (params.app_user[common.dbUserMap.has_ongoing_session]) {
344-
var drill_updates = {};
345400
if (params.app_user.lsid) {
346-
if (params.app_user.sd > 0) {
347-
drill_updates.dur = params.app_user.sd;
348-
}
349-
if (params.app_user.custom && Object.keys(params.app_user.custom).length > 0) {
350-
drill_updates.custom = JSON.parse(JSON.stringify(params.app_user.custom));
351-
}
352401
try {
353-
var lasts = (params.app_user.ls * 1000);
354-
let idsplit = params.app_user.lsid.split("_");
355-
if (idsplit[3] && idsplit[3].length === 13) {
356-
lasts = parseInt(idsplit[3]);
357-
}
358402
params.qstring.events = params.qstring.events || [];
359-
const up_extra = { av_prev: params.app_user.av };
360-
if (params.app_user.hadFatalCrash) {
361-
up_extra.hadFatalCrash = params.app_user.hadFatalCrash;
362-
}
363-
if (params.app_user.hadAnyFatalCrash) {
364-
up_extra.hadAnyFatalCrash = params.app_user.hadAnyFatalCrash;
365-
}
366-
if (params.app_user.hadNonfatalCrash) {
367-
up_extra.hadNonfatalCrash = params.app_user.hadNonfatalCrash;
368-
}
369-
if (params.app_user.hadAnyNonfatalCrash) {
370-
up_extra.hadAnyNonfatalCrash = params.app_user.hadAnyNonfatalCrash;
371-
}
372-
403+
usage.updateEndSessionParams(params, params.qstring.events);
373404
if (!params.app_user.hadFatalCrash) {
374405
userProps.hadAnyFatalCrash = moment(params.time.timestamp).unix();
375406
}
@@ -384,15 +415,6 @@ usage.processSession = function(ob) {
384415
userProps.hadNonfatalCrash = false;
385416
}
386417

387-
params.qstring.events.unshift({
388-
"_id": params.app_user.lsid,
389-
"key": "[CLY]_session",
390-
"segmentation": params.app_user.lsparams || { ended: "true" },
391-
"dur": (drill_updates.dur || 0),
392-
"count": 1,
393-
"timestamp": lasts,
394-
up_extra,
395-
});
396418
}
397419
catch (ex) {
398420
log.e("Error adding previous session end event: " + ex);
@@ -417,7 +439,13 @@ usage.processSession = function(ob) {
417439
if (!update.$inc) {
418440
update.$inc = {};
419441
}
442+
if (!update.$unset) {
443+
update.$unset = {};
444+
}
445+
delete params.app_user.last_view;
446+
update.$unset.last_view = "";
420447
update.$inc.sc = 1;
448+
421449
}
422450
}
423451
else if (params.qstring.end_session && params.app_user && params.app_user[common.dbUserMap.has_ongoing_session]) {
@@ -426,53 +454,11 @@ usage.processSession = function(ob) {
426454
userProps[common.dbUserMap.last_end_session_timestamp] = params.time.timestamp;
427455
}
428456
else {
429-
var drill_updates2 = {};
430457
if (params.app_user.lsid) {
431-
drill_updates2.dur = (params.app_user.sd || 0) + (session_duration || 0);
432-
if (drill_updates2.dur === 0) {
433-
delete drill_updates2.dur;
434-
}
435-
if (params.app_user.custom && Object.keys(params.app_user.custom).length > 0) {
436-
drill_updates2.custom = JSON.parse(JSON.stringify(params.app_user.custom));
437-
}
438-
drill_updates2["sg.ended"] = "true";
439-
drill_updates2.lu = new Date();
440-
//if (drill_updates2.dur || drill_updates2.custom) {
441-
//ob.drill_updates.push({"updateOne": {"filter": {"_id": params.app_user.lsid}, "update": {"$set": drill_updates2}}});
442-
//}
443-
var lasts2 = (params?.app_user?.ls * 1000);
444-
let idsplit = params.app_user.lsid.split("_");
445-
if (idsplit[3] && idsplit[3].length === 13) {
446-
lasts2 = parseInt(idsplit[3]);
447-
}
448-
449458
params.qstring.events = params.qstring.events || [];
450-
const up_extra = { av_prev: params.app_user.av };
451-
if (params.app_user.hadFatalCrash) {
452-
up_extra.hadFatalCrash = params.app_user.hadFatalCrash;
453-
}
454-
if (params.app_user.hadAnyFatalCrash) {
455-
up_extra.hadAnyFatalCrash = params.app_user.hadAnyFatalCrash;
456-
}
457-
if (params.app_user.hadNonfatalCrash) {
458-
up_extra.hadNonfatalCrash = params.app_user.hadNonfatalCrash;
459-
}
460-
if (params.app_user.hadAnyNonfatalCrash) {
461-
up_extra.hadAnyNonfatalCrash = params.app_user.hadAnyNonfatalCrash;
462-
}
463-
464-
params.qstring.events.unshift({
465-
"_id": params.app_user.lsid,
466-
"key": "[CLY]_session",
467-
"segmentation": params.app_user.lsparams || { ended: "true" },
468-
"dur": (drill_updates2.dur || 0),
469-
"count": 1,
470-
"timestamp": lasts2,
471-
up_extra,
472-
});
473-
459+
console.log("Ending previous session" + params.app_user.lsid);
460+
usage.updateEndSessionParams(params, params.qstring.events, session_duration);
474461
}
475-
userProps.data = {};
476462
}
477463
if (params.app_user[common.dbUserMap.has_ongoing_session]) {
478464
if (!update.$unset) {

api/parts/data/fetch.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1451,7 +1451,9 @@ fetch.fetchTimeObj = function(collection, params, isCustomEvent, options) {
14511451
if (meta.sg[key].type !== "a") {
14521452
output.meta = output.meta || {};
14531453
output.meta.segments = output.meta.segments || [];
1454-
output.meta.segments.push(key);
1454+
if (output.meta.segments.indexOf(key) === -1) {
1455+
output.meta.segments.push(key);
1456+
}
14551457
}
14561458
}
14571459
}

api/parts/queries/mongodbQueries.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -241,7 +241,7 @@ var obb = {};
241241

242242
var pipeline = [];
243243
pipeline.push({"$match": match});
244-
pipeline.push({"$group": {"_id": {"u": "$uid", "sg": "$n" }, "t": {"$sum": 1}, "d": {"$sum": "$dur"}, "s": {"$sum": "$sg.visit"}, "e": {"$sum": "$sg.exit"}, "b": {"$sum": "$sg.bounce"}}});
244+
pipeline.push({"$group": {"_id": {"u": "$uid", "sg": "$n" }, "t": {"$sum": 1}, "d": {"$sum": "$dur"}, "s": {"$sum": "$sg.start"}, "e": {"$sum": "$sg.exit"}, "b": {"$sum": "$sg.bounce"}}});
245245
pipeline.push({"$addFields": {"u": 1}});
246246
//Union with cly action
247247
pipeline.push({
@@ -265,7 +265,7 @@ var obb = {};
265265
];
266266

267267
pipeline.push({"$unionWith": {"coll": "drill_events", "pipeline": pipeline2}});
268-
pipeline.push({"$group": {"_id": "$_id", "u": {"$max": "$u"}, "t": {"$max": "$t"}, "d": {"$max": "$d"}, "s": {"$max": "$s"}, "e": {"$max": "$e"}, "b": {"$max": "$b"}, "scr": {"$max": "$scr"}}});
268+
pipeline.push({"$group": {"_id": "$_id", "u": {"$max": "$u"}, "t": {"$max": "$t"}, "d": {"$sum": "$d"}, "s": {"$max": "$s"}, "e": {"$max": "$e"}, "b": {"$max": "$b"}, "scr": {"$max": "$scr"}}});
269269
pipeline.push({
270270
"$addFields": {
271271
"scr-calc": { $cond: [ { $or: [{$eq: ["$t", 0]}, {$eq: ['$scr', 0]}]}, 0, {'$divide': ['$scr', "$t"]}] },

api/utils/common.js

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2989,7 +2989,15 @@ common.mergeQuery = function(ob1, ob2) {
29892989
if (!ob1[key]) {
29902990
ob1[key] = ob2[key];
29912991
}
2992-
else if (key === "$set" || key === "$setOnInsert" || key === "$unset") {
2992+
else if (key === "$set" || key === "$setOnInsert") {
2993+
for (let val in ob2[key]) {
2994+
ob1[key][val] = ob2[key][val];
2995+
if (ob1.$unset && typeof ob1.$unset[val] !== "undefined") {
2996+
delete ob1.$unset[val];
2997+
}
2998+
}
2999+
}
3000+
else if (key === "$unset") {
29933001
for (let val in ob2[key]) {
29943002
ob1[key][val] = ob2[key][val];
29953003
}

api/utils/eventTransformer.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,9 @@ function transformToKafkaEventFormat(doc) {
5454
if (doc.sg && typeof doc.sg === 'object') {
5555
result.sg = doc.sg;
5656
}
57+
if (doc.up_extra && typeof doc.up_extra === 'object') {
58+
result.up_extra = doc.up_extra;
59+
}
5760

5861
// Optional date field
5962
if (doc.lu !== undefined && doc.lu !== null) {

plugins/crashes/api/aggregator.js

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -513,8 +513,10 @@ const recalculateStats = async function(currEvent) {
513513
cd: '$fullDocument.cd',
514514
e: '$fullDocument.e',
515515
n: '$fullDocument.n',
516-
sg: '$fullDocument.sg',
517516
ts: '$fullDocument.ts',
517+
up_extra: '$fullDocument.up_extra',
518+
up: '$fullDocument.up',
519+
sg: '$fullDocument.sg'
518520
},
519521
},
520522
],

plugins/crashes/tests.js

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3201,6 +3201,11 @@ describe('Testing Crashes', function() {
32013201
setTimeout(done, 200 * testUtils.testScalingFactor);
32023202
});
32033203
});
3204+
it('Trigger deletion job to run', function(done) {
3205+
testUtils.triggerJobToRun("api:deletionManagerJob", function() {
3206+
setTimeout(done, 5000);
3207+
});
3208+
});
32043209
});
32053210

32063211
describe('Verify reset metrics', function() {

plugins/reports/tests.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,9 @@ describe('Testing Reports', function() {
179179
setTimeout(done, 500 * testUtils.testScalingFactor);
180180
});
181181
});
182+
it('trigger job for database cleanup', function(done) {
183+
testUtils.triggerJobToRun("api:deletionManagerJob", done);
184+
});
182185
});
183186

184187
});

plugins/sources/tests.js

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,9 @@ describe('Testing Store metrics', function() {
105105
setTimeout(done, 100 * testUtils.testScalingFactor);
106106
});
107107
});
108+
it('trigger job for database cleanup', function(done) {
109+
testUtils.triggerJobToRun("api:deletionManagerJob", done);
110+
});
108111
});
109112
describe('verify empty sources', function() {
110113
it('should have no sources', function(done) {

0 commit comments

Comments
 (0)