Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions plugins/push/api/api.js
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ plugins.register('/master', function() {
common.dbUniqueMap.users.push(common.dbMap['messaging-enabled'] = DBMAP.MESSAGING_ENABLED);
fields(platforms, true).forEach(f => common.dbUserMap[f] = f);
PUSH.cache = common.cache.cls(PUSH_CACHE_GROUP);
setTimeout(() => {
const jobManager = require('../../../api/parts/jobs');
jobManager.job("push:clear-stats").replace().schedule("at 3:00 am every 7 days");
}, 10000);
});

plugins.register('/master/runners', runners => {
Expand Down Expand Up @@ -361,6 +365,7 @@ plugins.register('/i/app_users/export', ({app_id, uids, export_commands, dbargs,
* @apiDefine PushMessageBody
*
* @apiBody {ObjectID} app Application ID
* @apiBody {Boolean} saveStats Store each individual push records into push_stats for debugging
* @apiBody {String[]} platforms Array of platforms to send to
* @apiBody {String="draft"} [status] Message status, only set to draft when creating or editing a draft message, don't set otherwise
* @apiBody {Object} filter={} User profile filter to limit recipients of this message
Expand Down Expand Up @@ -410,6 +415,7 @@ plugins.register('/i/app_users/export', ({app_id, uids, export_commands, dbargs,
*
* @apiSuccess {ObjectID} _id Message ID
* @apiSuccess {ObjectID} app Application ID
* @apiSuccess {Boolean} saveStats Store each individual push records into push_stats for debugging
* @apiSuccess {String[]} platforms Array of platforms to send to
* @apiSuccess {Number} state Message state, for internal use
* @apiSuccess {String="created", "inactive", "draft", "scheduled", "sending", "sent", "stopped", "failed"} [status] Message status: "created" is for messages yet to be scheduled (put into queue), "inactive" - cannot be scheduled (approval required for push approver plugin), "draft", "scheduled", "sending", "sent", "stopped" - automated message has been stopped, "failed" - failed to send all notifications
Expand Down
27 changes: 27 additions & 0 deletions plugins/push/api/jobs/clear-stats.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
'use strict';
/**
* @typedef {import("mongodb").Db} MongoDb
*/

const { Job } = require('../../../../api/parts/jobs/job.js');
const log = require('../../../../api/utils/log.js')('job:push:clear-stats');

const EXPIRY = 30 * 24 * 60 * 60 * 1000; // 30 days

/**
* Clears push_stats collection
*/
class ClearStatsJob extends Job {
/**
* Clears push_stats based on EXPIRY and MAX_RECORDS
* @param {MongoDb} db - db connection
*/
async run(db) {
log.d('Clearing push_stats');
await db.collection("push_stats").deleteMany({
d: { $lte: new Date(Date.now() - EXPIRY) }
});
}
}

module.exports = ClearStatsJob;
98 changes: 69 additions & 29 deletions plugins/push/api/jobs/util/resultor.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,23 @@
const { FRAME, FRAME_NAME } = require('../../send/proto'),
{ DoFinish } = require('./do_finish'),
{ ERROR, TriggerKind, State, Status, PushError, Result } = require('../../send/data');
/**
* @typedef {import("mongodb").ObjectId} ObjectId
*/

/**
* PushStat object (collection: push_stats)
* @typedef {Object} PushStat
* @property {ObjectId} a - application id
* @property {ObjectId} m - message id from "messages" collection
* @property {string} u - uid from app_users{appId}
* @property {string} t - token from "push_{appId}" collection
* @property {string=} r - id returned from provider
* @property {Date} d - date this message sent to this user
* @property {string=} e - error message
* @property {string} p - platform: "a" for android, "i" for ios and "h" for huawei
* @property {string} f - token type: "p" for production
*/

/**
* Stream responsible for handling sending results:
Expand Down Expand Up @@ -34,7 +51,10 @@ class Resultor extends DoFinish {
this.fatalErrors = {}; // {mid: []}
this.toDelete = []; // [push id, push id, ...]
this.count = 0; // number of results cached
this.last = null; // time of last data from
this.last = null; // time of last data from

/** @type {PushStat[]} */
this.pushStats = [];

this.data.on('app', app => {
this.changed[app._id] = {};
Expand Down Expand Up @@ -118,11 +138,17 @@ class Resultor extends DoFinish {
if (id < 0) {
return;
}
let {p, m, pr} = this.data.pushes[id],
const p = this.data.pushes[id];
let {p: platform, m, pr} = p,
msg = this.data.message(m),
result,
rp, rl;

// additional fields to keep this in push_stats
if (msg && msg.saveStats) {
this.pushStats.push({ a: p.a, m: p.m, p: p.p, f: p.f, u: p.u, t: p.t, d: new Date, r: null, e: results.toString() });
}

if (msg) {
result = msg.result;
result.lastRun.processed++;
Expand All @@ -131,7 +157,7 @@ class Resultor extends DoFinish {
else {
result = this.noMessage[m] || (this.noMessage[m] = new Result());
}
rp = result.sub(p, undefined, PLATFORM[p].parent);
rp = result.sub(platform, undefined, PLATFORM[platform].parent);
rl = rp.sub(pr.la || 'default');

result.processed++;
Expand All @@ -141,8 +167,8 @@ class Resultor extends DoFinish {
rl.recordError(results.message, 1);
rl.processed++;

if (PLATFORM[p].parent) {
rp = result.sub(PLATFORM[p].parent),
if (PLATFORM[platform].parent) {
rp = result.sub(PLATFORM[platform].parent),
rl = rp.sub(pr.la || 'default');
rp.recordError(results.message, 1);
rp.processed++;
Expand All @@ -159,29 +185,39 @@ class Resultor extends DoFinish {
}
else {
results.forEach(res => {
let id, token;
if (typeof res === 'string') {
this.log.d('Ok for %s', id);
id = res;
}
else {
let id, resultId, token;

if (Array.isArray(res)) {
this.log.d('New token for %s', id);
id = res[0];
token = res[1];
}
else {
id = res;
}

if (typeof id !== "string") {
resultId = id.r;
id = id.p;
}

let p = this.data.pushes[id];
if (!p) { // 2 or more resultors on one pool
return;
}

this.data.decSending(p.m);

let m = this.data.message(p.m),
let msg = this.data.message(p.m),
result, rp, rl;

if (m) {
result = m.result;
// additional fields to keep this in push_stats
if (msg && msg.saveStats) {
this.pushStats.push({ a: p.a, m: p.m, p: p.p, f: p.f, u: p.u, t: p.t, d: new Date, r: resultId, e: null });
}

this.data.decSending(p.m);

if (msg) {
result = msg.result;
result.lastRun.processed++;
}
else {
Expand Down Expand Up @@ -220,14 +256,6 @@ class Resultor extends DoFinish {
});
this.log.d('Added %d results', results.length);
}

// // in case no more data is expected, we can safely close the stream
// if (this.check()) {
// for (let _ in this.state.pushes) {
// return;
// }
// this.do_flush(() => this.end());
// }
}
else if (frame & FRAME.ERROR) {
let error = results.messageError(),
Expand All @@ -241,28 +269,35 @@ class Resultor extends DoFinish {
return;
}
this.log.d('Error %d %s for %s', results.type, results.name, id);
let {m, p, pr} = this.data.pushes[id],
const p = this.data.pushes[id];
let {m, p: platform, pr} = p,
result, rp, rl;
let msg = this.data.message(m);

// additional fields to keep this in push_stats
if (msg && msg.saveStats) {
this.pushStats.push({ a: p.a, m: p.m, p: p.p, f: p.f, u: p.u, t: p.t, d: new Date, r: null, e: results.toString() });
}

mids[m] = (mids[m] || 0) + 1;
delete this.data.pushes[id];
this.toDelete.push(id);

let msg = this.data.message(m);
if (msg) {
result = msg.result;
}
else {
result = this.noMessage[m] || (this.noMessage[m] = new Result());
}

rp = result.sub(p, undefined, PLATFORM[p].parent);
rp = result.sub(platform, undefined, PLATFORM[platform].parent);
rl = rp.sub(pr.la || 'default');

rp.processed++;
rl.processed++;

if (PLATFORM[p].parent) {
rp = result.sub(PLATFORM[p].parent),
if (PLATFORM[platform].parent) {
rp = result.sub(PLATFORM[platform].parent),
rl = rp.sub(pr.la || 'default');
rp.processed++;
rl.processed++;
Expand Down Expand Up @@ -514,6 +549,11 @@ class Resultor extends DoFinish {
}
}

if (this.pushStats.length) {
promises.push(this.db.collection("push_stats").insertMany(this.pushStats));
this.pushStats = [];
}

Promise.all(promises).then(() => {
this.log.d('do_flush done');
callback();
Expand Down
30 changes: 27 additions & 3 deletions plugins/push/api/send/data/message.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,14 @@ class Message extends Mongoable {

/**
* Validation scheme of this class
*
*
* @returns {object} validateArgs scheme
*/
static get scheme() {
return {
_id: { required: false, type: 'ObjectID' },
app: { required: true, type: 'ObjectID' },
saveStats: { required: false, type: 'Boolean' },
platforms: { required: true, type: 'String[]', in: () => require('../platforms').platforms },
state: { type: 'Number' },
status: { type: 'String', in: Object.values(Status) },
Expand Down Expand Up @@ -148,9 +149,32 @@ class Message extends Mongoable {
}
}

/**
* Getter for message.saveStats
*
* @returns {boolean} saveStats
*/
get saveStats() {
return this._data.saveStats;
}

/**
* Setter for message.saveStats
*
* @param {boolean} value value to set
*/
set saveStats(value) {
if (typeof value !== "boolean") {
this._data.saveStats = false;
}
else {
this._data.saveStats = value;
}
}

/**
* Getter for platforms
*
*
* @returns {string[]|undefined} platforms array
*/
get platforms() {
Expand All @@ -159,7 +183,7 @@ class Message extends Mongoable {

/**
* Setter for platforms
*
*
* @param {string[]|undefined} arr platforms array
*/
set platforms(arr) {
Expand Down
12 changes: 3 additions & 9 deletions plugins/push/api/send/platforms/a.js
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ class FCM extends Splitter {
return errors[err];
};

const messages = pushes.map(p => p.t).map((token) => ({
const messages = pushes.map(({ t: token }) => ({
token,
...content,
}));
Expand All @@ -178,20 +178,14 @@ class FCM extends Splitter {
.sendEach(messages)
.then(async result => {
const allPushIds = pushes.map(p => p._id);

if (!result.failureCount) {
this.send_results(allPushIds, bytes);
return;
}

// array of successfully sent push._id:
const sentSuccessfully = [];

// check for each message
for (let i = 0; i < result.responses.length; i++) {
const { success, error } = result.responses[i];
const { success, error, messageId } = result.responses[i];
if (success) {
sentSuccessfully.push(allPushIds[i]);
sentSuccessfully.push({ p: allPushIds[i], r: messageId });
}
else {
const sdkError = FCM_SDK_ERRORS[error.code];
Expand Down
4 changes: 2 additions & 2 deletions plugins/push/api/send/platforms/i.js
Original file line number Diff line number Diff line change
Expand Up @@ -679,7 +679,6 @@ class APN extends Base {
}
}
// =======0========000=================0========000=================0========0
console.log(JSON.stringify(reqHeaders, null, 2), JSON.stringify(content, null, 2));
let stream = this.session.request(reqHeaders),
status,
data = '';
Expand Down Expand Up @@ -708,7 +707,8 @@ class APN extends Base {
status = headers[':status'];
// self.log.d('%d: status %d: %j', i, status, self.session.state);
if (status === 200) {
oks.push(p._id);
const apnsUniqueId = headers["apns-unique-id"];
oks.push({ p: p._id, r: apnsUniqueId });
stream.destroy();
streamDone();
}
Expand Down
3 changes: 3 additions & 0 deletions plugins/push/frontend/public/javascripts/countly.models.js
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,7 @@
isCohorts: typeof countlyCohorts !== 'undefined',
_id: null,
demo: false,
saveStats: false,
name: "",
platforms: [],
message: {
Expand Down Expand Up @@ -1413,6 +1414,7 @@
return {
_id: dto._id || null,
demo: this.mapDemo(dto),
saveStats: dto.saveStats || false,
status: this.mapStatus(dto),
createdAt: dto.info && dto.info.created ? moment(dto.info.created).format("dddd, Do MMMM YYYY h:mm") : null,
name: dto.info && dto.info.title,
Expand Down Expand Up @@ -1995,6 +1997,7 @@
mapModelToBaseDto: function(pushNotificationModel, options) {
var resultDto = {
app: countlyCommon.ACTIVE_APP_ID,
saveStats: pushNotificationModel.saveStats || false,
platforms: this.mapPlatforms(pushNotificationModel.platforms),
};
if (pushNotificationModel._id) {
Expand Down
3 changes: 3 additions & 0 deletions plugins/push/frontend/public/localization/push.properties
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,9 @@ push-notification.testing-tooltip = Sends the push notification to applications'
push-notification.send-to-test-users = Send to test users
push-notification.confirmation-uppercase = CONFIRMATION
push-notification.confirmation-uppercase-description = CONFIRMATION description
push-notification.save-push-stats = Keep individual push records for each user
push-notification.debugging = DEBUGGING
push-notification.push-stats-warning = This options enables the storage of each push record inside "push_stats" collection for every message per device for debug purposes. Please enable this option with caution since it can fill up the database quickly.
push-notification.i-am-ready-to-send = I am ready to send this message to real-users
push-notification.was-successfully-saved = Push notification message was successfully saved
push-notification.was-successfully-sent-to-test-users = Push notification message was successfully sent to test users
Expand Down
Loading
Loading