Skip to content

Commit 2097568

Browse files
authored
Merge pull request #6673 from Countly/feature/ingestion
Feature/ingestion
2 parents 3dd72ed + f1cfe7d commit 2097568

File tree

15 files changed

+145
-164
lines changed

15 files changed

+145
-164
lines changed

api/ingestor/usage.js

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -336,6 +336,7 @@ usage.processSession = function(ob) {
336336
if (idsplit[3] && idsplit[3].length === 13) {
337337
lasts = parseInt(idsplit[3]);
338338
}
339+
params.qstring.events = params.qstring.events || [];
339340
params.qstring.events.unshift({
340341
"_id": params.app_user.lsid,
341342
"key": "[CLY]_session",
@@ -388,7 +389,7 @@ usage.processSession = function(ob) {
388389
drill_updates2["sg.ended"] = "true";
389390
drill_updates2.lu = new Date();
390391
//if (drill_updates2.dur || drill_updates2.custom) {
391-
ob.drill_updates.push({"updateOne": {"filter": {"_id": params.app_user.lsid}, "update": {"$set": drill_updates2}}});
392+
//ob.drill_updates.push({"updateOne": {"filter": {"_id": params.app_user.lsid}, "update": {"$set": drill_updates2}}});
392393
//}
393394
var lasts2 = (params.qstring.end_session.ls * 1000);
394395
let idsplit = params.app_user.lsid.split("_");

api/parts/data/fetch.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1565,7 +1565,7 @@ fetch.getTotalUsersObjWithOptions = function(metric, params, options, callback)
15651565
"os": "p",
15661566
"platforms": "p",
15671567
"platform": "p",
1568-
"locale":"la",
1568+
"locale": "la",
15691569
"os_versions": "pv",
15701570
"platform_versions": "pv",
15711571
"resolutions": "r",

api/parts/queries/coreAggregation.js

Lines changed: 24 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -115,12 +115,14 @@ catch (error) {
115115
adapters: {
116116
mongodb: {
117117
handler: fetchAggregatedSegmentedEventDataMongo
118-
},
119-
clickhouse: {
120-
handler: clickHouseRunner.fetchAggregatedSegmentedEventDataClickhouse
121118
}
122119
}
123120
};
121+
if (clickHouseRunner && clickHouseRunner.fetchAggregatedSegmentedEventDataClickhouse) {
122+
queryDef.adapters.clickhouse = {
123+
handler: clickHouseRunner.fetchAggregatedSegmentedEventDataClickhouse
124+
};
125+
}
124126

125127
return common.queryRunner.executeQuery(queryDef, params, options);
126128
};
@@ -211,7 +213,25 @@ catch (error) {
211213
};
212214

213215

216+
agg.viewsTableData = async function(params, options) {
217+
if (!common.queryRunner) {
218+
throw new Error('QueryRunner not initialized. Ensure API server is fully started.');
219+
}
214220

215-
221+
const queryDef = {
222+
name: 'VIEWS_TABLE_DATA',
223+
adapters: {
224+
mongodb: {
225+
handler: mongodbRunner.getViewsTableData
226+
}
227+
}
228+
};
229+
if (clickHouseRunner && clickHouseRunner.getViewsTableData) {
230+
queryDef.adapters.clickhouse = {
231+
handler: clickHouseRunner.getViewsTableData
232+
};
233+
}
234+
return common.queryRunner.executeQuery(queryDef, params, options);
235+
};
216236
}(coreAggregator));
217237
module.exports = coreAggregator;

api/parts/queries/mongodbQueries.js

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,12 @@ var obb = {};
6464
pipeline.push({"$group": {"_id": null, "u": {"$sum": 1}, "d": {"$sum": "$d"}, "t": {"$sum": "$t"}, "n": {"$sum": "$n"}}});
6565

6666
}
67+
68+
//Union with the one having [CLY]_session_begin.
69+
var copy_pipeline = JSON.parse(JSON.stringify(pipeline));
70+
copy_pipeline[0].$match.e = "[CLY]_session_begin";
71+
pipeline.push({"$unionWith": {"coll": "drill_events", "pipeline": copy_pipeline}});
72+
pipeline.push({"$group": {"_id": "$_id", "u": {"$max": "$u"}, "d": {"$max": "$d"}, "t": {"$max": "$t"}, "n": {"$max": "$n"}}});
6773
var data = await common.drillDb.collection("drill_events").aggregate(pipeline).toArray();
6874
for (var z = 0; z < data.length; z++) {
6975
// data[z]._id = data[z]._id.replaceAll(/\:0/gi, ":");
@@ -158,5 +164,68 @@ var obb = {};
158164
//Group by hour
159165
};
160166

167+
168+
agg.getViewsTableData = async function(options) {
169+
var match = options.dbFilter || {};
170+
if (options.appID) {
171+
match.a = options.appID + "";
172+
}
173+
match.e = "[CLY]_view";
174+
if (options.period) {
175+
match.ts = countlyCommon.getPeriodRange(options.period, "UTC", options.periodOffset);
176+
}
177+
178+
var pipeline = [];
179+
pipeline.push({"$match": match});
180+
pipeline.push({"$group": {"_id": {"u": "$uid", "sg": "$n" }, "t": {"$sum": 1}, "d": {"$sum": "$dur"}, "s": {"$sum": "$sg.visit"}, "e": {"$sum": "$sg.exit"}, "b": {"$sum": "$sg.bounce"}}});
181+
pipeline.push({"$addFields": {"u": 1}});
182+
//Union with cly action
183+
pipeline.push({
184+
"$unionWith": {
185+
"coll": "drill_events",
186+
"pipeline": [
187+
{"$match": {"e": "[CLY]_action", "sg.type": "scroll", "ts": match.ts, "a": match.a}},
188+
{"$group": {"_id": {"u": "$uid", "sg": "$n"}, "scr": {"$sum": "$sg.scr"}}}
189+
]
190+
}
191+
});
192+
pipeline.push({"$group": {"_id": "$_id.sg", "u": {"$sum": "$u"}, "t": {"$sum": "$t"}, "d": {"$sum": "$d"}, "s": {"$sum": "$s"}, "e": {"$sum": "$e"}, "b": {"$sum": "$b"}, "scr": {"$sum": "$scr"}}});
193+
194+
//Union with data from view updates and group by _id.sg
195+
var match2 = JSON.parse(JSON.stringify(match));
196+
match2.e = "[CLY]_view_update";
197+
var pipeline2 = [
198+
{"$match": match2},
199+
{"$group": {"_id": {"u": "$uid", "sg": "$n"}, "t": {"$sum": 0}, "d": {"$sum": "$dur"}, "s": {"$sum": 0}, "e": {"$sum": "$sg.exit"}, "b": {"$sum": "$sg.bounce"}, "scr": {"$sum": 0}, "u": {"$sum": 1}}}, //t and scr are 0 as they are not tracked in view update
200+
{"$group": {"_id": "$_id.sg", "u": {"$sum": "$u"}, "t": {"$sum": "$t"}, "d": {"$sum": "$d"}, "s": {"$sum": "$s"}, "e": {"$sum": "$e"}, "b": {"$sum": "$b"}, "scr": {"$sum": "$scr"}}}
201+
];
202+
203+
pipeline.push({"$unionWith": {"coll": "drill_events", "pipeline": pipeline2}});
204+
pipeline.push({"$group": {"_id": "$_id", "u": {"$max": "$u"}, "t": {"$max": "$t"}, "d": {"$max": "$d"}, "s": {"$max": "$s"}, "e": {"$max": "$e"}, "b": {"$max": "$b"}, "scr": {"$max": "$scr"}}});
205+
pipeline.push({
206+
"$addFields": {
207+
"scr-calc": { $cond: [ { $or: [{$eq: ["$t", 0]}, {$eq: ['$scr', 0]}]}, 0, {'$divide': ['$scr', "$t"]}] },
208+
"d-calc": { $cond: [ { $or: [{$eq: ["$t", 0]}, {$eq: ['$d', 0]}]}, 0, {'$divide': ['$d', "$t"]}] },
209+
"br": { $cond: [ { $or: [{$eq: ["$s", 0]}, {$eq: ['$b', 0]}]}, 0, {'$divide': [{"$min": ['$b', "$s"]}, "$s"]}] },
210+
"b": {"$min": [{"$ifNull": ["$b", 0]}, {"$ifNull": ["$s", 0]}]},
211+
"view": "$_id"
212+
}
213+
});
214+
var data = await common.drillDb.collection("drill_events").aggregate(pipeline).toArray();
215+
for (var z = 0; z < data.length; z++) {
216+
data[z]._id = data[z]._id.replaceAll(/\:0/gi, ":");
217+
if (data[z].sg) {
218+
data[z].sg = data[z].sg.replaceAll(/\./gi, ":");
219+
}
220+
}
221+
return {
222+
_queryMeta: {
223+
adapter: 'mongodb',
224+
query: pipeline || 'MongoDB event segmentation aggregation pipeline',
225+
},
226+
data: data
227+
};
228+
};
229+
161230
}(obb));
162231
module.exports = obb;

bin/scripts/data-cleanup/remove_old_crashes_sync.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ Promise.all(
4646
pluginManager.dbConnection("countly"),
4747
pluginManager.dbConnection("countly_drill")
4848
])
49-
.spread(async function(db, drillDb) {
49+
.then(async function([db, drillDb]) {
5050
try {
5151
const apps = (APP_ID.length) ? [{_id: APP_ID}] : await db.collection('apps').find({}, { _id: 1 }).toArray();
5252

bin/scripts/data-cleanup/remove_old_drill_meta_collections.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ Promise.all(
99
pluginManager.dbConnection("countly"),
1010
pluginManager.dbConnection("countly_drill")
1111
])
12-
.spread(async function(countlyDB, countlyDrillDB) {
12+
.then(async function([countlyDB, countlyDrillDB]) {
1313

1414
countlyDrillDB.collections(function(err, colls) {
1515
if (err) {

bin/scripts/modify-data/delete/delete_user_properties.js

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ var unset = {};
2222
unset[PROPERTY] = "";
2323

2424
var Promise = require("bluebird");
25-
Promise.all([plugins.dbConnection("countly"), plugins.dbConnection("countly_drill")]).spread(function(db, dbDrill) {
25+
Promise.all([plugins.dbConnection("countly"), plugins.dbConnection("countly_drill")]).then(async function([db, dbDrill]) {
2626
console.log("Deleting property from app users");
2727
db.collection('app_users' + APP_ID).updateMany({}, {$unset: unset}, function(err,) {
2828
if (err) {

plugins/density/tests.js

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ var APP_ID = "";
99
var DEVICE_ID = "1234567890";
1010
var drill_db = "";
1111

12+
var session_event = "[CLY]_session_begin";
1213
describe('Testing Density metrics', function() {
1314
describe('Empty density', function() {
1415
it('should have no densities', function(done) {
@@ -44,11 +45,12 @@ describe('Testing Density metrics', function() {
4445
setTimeout(done, 2000);
4546
});
4647
});
48+
4749
});
4850
//{"2015":{"1":{"15":{"400dpi":{"t":1,"n":1,"u":1}},"400dpi":{"u":1,"t":1,"n":1}},"400dpi":{"u":1,"t":1,"n":1},"w3":{"400dpi":{"u":1}}},"meta":{"density":["400dpi"]}}
4951
describe('Verify density', function() {
5052
it('should have density', function(done) {
51-
testUtils.validateTotalsInDrillData(drill_db, {app_id: APP_ID, event: "[CLY]_session", query: {"up.dnst": "400dpi"}, values: {u: 1, t: 1, n: 1}}, done);
53+
testUtils.validateTotalsInDrillData(drill_db, {app_id: APP_ID, event: session_event, query: {"up.dnst": "400dpi"}, values: {u: 1, t: 1, n: 1}}, done);
5254
});
5355
});
5456
describe('write bulk density', function() {
@@ -82,7 +84,7 @@ describe('Testing Density metrics', function() {
8284

8385
testUtils.validateBreakdownTotalsInDrillData(drill_db, {
8486
app_id: APP_ID,
85-
event: "[CLY]_session",
87+
event: session_event,
8688
breakdownKeys: ["up.dnst"],
8789
values: {
8890
"100dpi": {"n": 1, "t": 1, "u": 1},
@@ -115,7 +117,7 @@ describe('Testing Density metrics', function() {
115117
});
116118
});
117119
it('should have density and platform(stored density has to pe affected by platform)', function(done) {
118-
testUtils.validateTotalsInDrillData(drill_db, {app_id: APP_ID, event: "[CLY]_session", query: {"up.dnst": "a400dpi", "up.p": "Android"}, values: {u: 1, t: 1, n: 1}}, done);
120+
testUtils.validateTotalsInDrillData(drill_db, {app_id: APP_ID, event: session_event, query: {"up.dnst": "a400dpi", "up.p": "Android"}, values: {u: 1, t: 1, n: 1}}, done);
119121
});
120122

121123
});
@@ -137,7 +139,7 @@ describe('Testing Density metrics', function() {
137139
});
138140
describe('verify empty density', function() {
139141
it('should have no densities', function(done) {
140-
testUtils.validateTotalsInDrillData(drill_db, {app_id: APP_ID, event: "[CLY]_session", query: {}, values: {u: 0, t: 0, n: 0}}, done);
142+
testUtils.validateTotalsInDrillData(drill_db, {app_id: APP_ID, event: session_event, query: {}, values: {u: 0, t: 0, n: 0}}, done);
141143
});
142144
});
143145
});

plugins/server-stats/api/aggregator.js

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ var plugins = require('../../pluginManager.js'),
44
var log = common.log('server-stats:aggregator');
55
//const { DataBatchReader } = require('../../../api/parts/data/dataBatchReader');
66
const UnifiedEventSource = require('../../../api/eventSource/UnifiedEventSource.js');
7-
const internalEventsSkipped = ["[CLY]_orientation", "[CLY]_session_update"];
7+
const internalEventsSkipped = ["[CLY]_orientation", "[CLY]_session", "[CLY]_property_update", "[CLY]_view_update"];
88
(function() {
99
/** Data batch reader option for data point counting*/
1010
/*plugins.register("/aggregator", function() {
@@ -65,7 +65,7 @@ const internalEventsSkipped = ["[CLY]_orientation", "[CLY]_session_update"];
6565
if (internalEventsSkipped.includes(events[k].e)) {
6666
continue;
6767
}
68-
else if (events[k].e === "[CLY]_session") {
68+
else if (events[k].e === "[CLY]_session_begin") {
6969
stats.updateDataPoints(common.manualWriteBatcher, events[k].a, 1, 0, false, token);
7070
}
7171
else if (events[k].e in stats.internalEventsEnum) {

plugins/server-stats/api/parts/stats.js

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ const moment = require("moment");
33
const internalEventsEnum =
44
{
55
"[CLY]_session": "s",
6+
"[CLY]_session_begin": "s", //Using only this one when increasing session count
67
"[CLY]_view": "v",
78
"[CLY]_nps": "n",
89
"[CLY]_crash": "c",

0 commit comments

Comments
 (0)