Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
303 changes: 62 additions & 241 deletions plugins/push/api/send/platforms/a.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
const { ConnectionError, ERROR, SendError, PushError, FCM_SDK_ERRORS } = require('../data/error'),
const { ERROR, SendError, FCM_SDK_ERRORS } = require('../data/error'),
logger = require('../../../../../api/utils/log'),
{ Splitter } = require('./utils/splitter'),
{ util } = require('../std'),
Expand Down Expand Up @@ -56,7 +56,7 @@ class FCM extends Splitter {
* Standard constructor
* @param {string} log logger name
* @param {string} type type of connection: ap, at, id, ia, ip, ht, hp
* @param {Credentials} creds FCM server key
* @param {Credentials} creds FCM credentials
* @param {Object[]} messages initial array of messages to send
* @param {Object} options standard stream options
* @param {number} options.pool.pushes number of notifications which can be processed concurrently, this parameter is strictly set to 500
Expand All @@ -71,34 +71,20 @@ class FCM extends Splitter {
this.legacyApi = !creds._data.serviceAccountFile;

this.log = logger(log).sub(`${threadId}-a`);
if (this.legacyApi) {
this.opts = {
agent: this.agent,
hostname: 'fcm.googleapis.com',
port: 443,
path: '/fcm/send',
method: 'POST',
headers: {
'Accept': 'application/json',
'Content-Type': 'application/json',
'Authorization': `key=${creds._data.key}`,
},
};
}
else {
const serviceAccountJSON = FORGE.util.decode64(
creds._data.serviceAccountFile.substring(creds._data.serviceAccountFile.indexOf(',') + 1)
);
const serviceAccountObject = JSON.parse(serviceAccountJSON);
const appName = creds._data.hash; // using hash as the app name
const firebaseApp = firebaseAdmin.apps.find(app => app.name === appName)
? firebaseAdmin.app(appName)
: firebaseAdmin.initializeApp({
credential: firebaseAdmin.credential.cert(serviceAccountObject, this.agent),
httpAgent: this.agent
}, appName);
this.firebaseMessaging = firebaseApp.messaging();
}

const serviceAccountJSON = FORGE.util.decode64(
creds._data.serviceAccountFile.substring(creds._data.serviceAccountFile.indexOf(',') + 1)
);
const serviceAccountObject = JSON.parse(serviceAccountJSON);
const appName = creds._data.hash; // using hash as the app name
const firebaseApp = firebaseAdmin.apps.find(app => app.name === appName)
? firebaseAdmin.app(appName)
: firebaseAdmin.initializeApp({
credential: firebaseAdmin.credential.cert(serviceAccountObject, this.agent),
httpAgent: this.agent
}, appName);
this.firebaseMessaging = firebaseApp.messaging();


this.log.i('Initialized');
}
Expand Down Expand Up @@ -139,8 +125,6 @@ class FCM extends Splitter {
const one = Math.ceil(bytes / pushes.length);
let content = this.template(pushes[0].m).compile(pushes[0]);

let printBody = false;
const oks = [];
const errors = {};
/**
* Get an error for given code & message, create it if it doesn't exist yet
Expand All @@ -156,221 +140,66 @@ class FCM extends Splitter {
}
return errors[err];
};
if (!this.legacyApi) {
const tokens = pushes.map(p => p.t);

// new fcm api doesn't allow objects or arrays inside "data" property
if (content.data && typeof content.data === "object") {
for (let prop in content.data) {
if (content.data[prop] && typeof content.data[prop] === "object") {
content.data[prop] = JSON.stringify(content.data[prop]);
}
}
}

const messages = tokens.map(token => ({
token,
...content,
}));

return this.firebaseMessaging
// EXAMPLE RESPONSE of sendEach
// {
// "responses": [
// {
// "success": false,
// "error": {
// "code": "messaging/invalid-argument",
// "message": "The registration token is not a valid FCM registration token"
// }
// }
// ],
// "successCount": 0,
// "failureCount": 1
// }
.sendEach(messages)
.then(async result => {
const allPushIds = pushes.map(p => p._id);

if (!result.failureCount) {
this.send_results(allPushIds, bytes);
return;
}
const tokens = pushes.map(p => p.t);
const messages = tokens.map(token => ({
token,
...content,
}));

// array of successfully sent push._id:
const sentSuccessfully = [];

// check for each message
for (let i = 0; i < result.responses.length; i++) {
const { success, error } = result.responses[i];
if (success) {
sentSuccessfully.push(allPushIds[i]);
}
else {
const sdkError = FCM_SDK_ERRORS[error.code];
// check if the sdk error is mapped to an internal error.
// set to default if its not.
let internalErrorCode = sdkError?.mapTo ?? ERROR.DATA_PROVIDER;
let internalErrorMessage = sdkError?.message ?? "Invalid error message";
errorObject(internalErrorCode, internalErrorMessage)
.addAffected(pushes[i]._id, one);
}
}
// send results back:
for (let errorKey in errors) {
this.send_push_error(errors[errorKey]);
}
if (sentSuccessfully.length) {
this.send_results(sentSuccessfully, one * sentSuccessfully.length);
}
});
}

content.registration_ids = pushes.map(p => p.t);

// CONNECTION TEST PAYLOAD (invalid registration token)
return this.firebaseMessaging
// EXAMPLE RESPONSE of sendEach
// {
// "data": {
// "c.i": "663389aab53ebbf71a115edb",
// "message": "test"
// },
// "registration_ids": [
// "0.2124088209996502"
// ]
// "responses": [
// {
// "success": false,
// "error": {
// "code": "messaging/invalid-argument",
// "message": "The registration token is not a valid FCM registration token"
// }
// }
// ],
// "successCount": 0,
// "failureCount": 1
// }
// NORMAL PAYLOAD
// {
// "data": {
// "c.i": "663389a949c58657a8e625b3",
// "title": "qwer",
// "message": "qwer",
// "sound": "default"
// },
// "registration_ids": [
// "dw_CueiXThqYI9owrQC0Pb:APA91bHanJn9RM-ZYnC-3wCMld5Nk3QaVJppS4HOKrpdV8kCXq7pjQlJjcd8_1xq9G6XaceZfrFPxbfehJ4YCEfMsfQVhZW1WKhnY3TbtO7HIQfYfbj35-sx_-BHAhQ5eSDuiCOZWUDP"
// ]
// }
return this.sendRequest(JSON.stringify(content)).then(resp => {
// CONNECTION TEST RESPONSE (with error)
// {
// "multicast_id": 2829871343601014000,
// "success": 0,
// "failure": 1,
// "canonical_ids": 0,
// "results": [
// {
// "error": "InvalidRegistration"
// }
// ]
// }
// NORMAL SUCCESSFUL RESPONSE
// {
// "multicast_id": 5676989510572196000,
// "success": 1,
// "failure": 0,
// "canonical_ids": 0,
// "results": [
// {
// "message_id": "0:1714653611139550%68dc6e82f9fd7ecd"
// }
// ]
// }
try {
resp = JSON.parse(resp);
}
catch (error) {
this.log.e('Bad FCM response format: %j', resp, error);
throw PushError.deserialize(error, SendError);
}
.sendEach(messages)
.then(async result => {
const allPushIds = pushes.map(p => p._id);

if (resp.failure === 0 && resp.canonical_ids === 0) {
this.send_results(pushes.map(p => p._id), bytes);
return;
}
if (!result.failureCount) {
this.send_results(allPushIds, bytes);
return;
}

if (resp.results) {
resp.results.forEach((r, i) => {
if (r.message_id) {
if (r.registration_id) {
if (r.registration_id === 'BLACKLISTED') {
errorObject(ERROR.DATA_TOKEN_INVALID, 'Blacklisted').addAffected(pushes[i]._id, one);
printBody = true;
}
else {
oks.push([pushes[i]._id, r.registration_id]);
}
// oks.push([pushes[i]._id, r.registration_id], one); ???
}
else {
oks.push(pushes[i]._id);
}
}
else if (r.error === 'NotRegistered') {
this.log.d('Token %s expired (%s)', pushes[i].t, r.error);
errorObject(ERROR.DATA_TOKEN_EXPIRED, r.error).addAffected(pushes[i]._id, one);
}
else if (r.error === 'InvalidRegistration' || r.error === 'MismatchSenderId' || r.error === 'InvalidPackageName') {
this.log.d('Token %s is invalid (%s)', pushes[i].t, r.error);
errorObject(ERROR.DATA_TOKEN_INVALID, r.error).addAffected(pushes[i]._id, one);
// array of successfully sent push._id:
const sentSuccessfully = [];

// check for each message
for (let i = 0; i < result.responses.length; i++) {
const { success, error } = result.responses[i];
if (success) {
sentSuccessfully.push(allPushIds[i]);
}
// these are identical to "else" block:
// else if (r.error === 'InvalidParameters') { // still hasn't figured out why this error is thrown, therefore not critical yet
// printBody = true;
// errorObject(ERROR.DATA_PROVIDER, r.error).addAffected(pushes[i]._id, one);
// }
// else if (r.error === 'MessageTooBig' || r.error === 'InvalidDataKey' || r.error === 'InvalidTtl') {
// printBody = true;
// errorObject(ERROR.DATA_PROVIDER, r.error).addAffected(pushes[i]._id, one);
// }
else {
printBody = true;
errorObject(ERROR.DATA_PROVIDER, r.error).addAffected(pushes[i]._id, one);
const sdkError = FCM_SDK_ERRORS[error.code];
// check if the sdk error is mapped to an internal error.
// set to default if its not.
let internalErrorCode = sdkError?.mapTo ?? ERROR.DATA_PROVIDER;
let internalErrorMessage = sdkError?.message ?? "Invalid error message";
errorObject(internalErrorCode, internalErrorMessage)
.addAffected(pushes[i]._id, one);
}
});
let errored = 0;
for (let k in errors) {
errored += errors[k].affectedBytes;
this.send_push_error(errors[k]);
}
if (oks.length) {
this.send_results(oks, bytes - errored);
// send results back:
for (let errorKey in errors) {
this.send_push_error(errors[errorKey]);
}
if (printBody) {
this.log.e('Provider returned error %j for %j', resp, content);
}
}
}, ([code, error]) => {
this.log.w('FCM error %d / %j', code, error);
console.log("========== MAIN PROMISE ERROR");
if (code === 0) {
if (error.message === 'ECONNRESET' || error.code === 'ENOTFOUND' || error.code === 'ETIMEDOUT' ||
error.code === 'ECONNREFUSED' || error.code === 'ECONNABORTED' || error.code === 'EHOSTUNREACH' ||
error.code === 'EAI_AGAIN') {
this.log.w('FCM error %d / %j', bytes, pushes.map(p => p._id));
throw new ConnectionError(`FCM ${error.code}`, ERROR.CONNECTION_PROVIDER)
.setConnectionError(error.code, `${error.errno} ${error.code} ${error.syscall}`)
.addAffected(pushes.map(p => p._id), bytes);
if (sentSuccessfully.length) {
this.send_results(sentSuccessfully, one * sentSuccessfully.length);
}
let pe = PushError.deserialize(error, SendError);
pe.addAffected(pushes.map(p => p._id), bytes);
throw pe;
}
else if (code >= 500) {
throw new ConnectionError(`FCM Unavailable: ${code}`, ERROR.CONNECTION_PROVIDER).addAffected(pushes.map(p => p._id), bytes);
}
else if (code === 401) {
throw new ConnectionError(`FCM Unauthorized: ${code}`, ERROR.INVALID_CREDENTIALS).addAffected(pushes.map(p => p._id), bytes);
}
else if (code === 400) {
throw new ConnectionError(`FCM Bad message: ${code}`, ERROR.DATA_PROVIDER).addAffected(pushes.map(p => p._id), bytes);
}
else {
throw new ConnectionError(`FCM Bad response code: ${code}`, ERROR.EXCEPTION).addAffected(pushes.map(p => p._id), bytes);
}
});
});
});
}

}

/**
Expand Down Expand Up @@ -581,7 +410,6 @@ const CREDS = {
static get scheme() {
return Object.assign(super.scheme, {
serviceAccountFile: { required: false, type: "String" },
key: { required: false, type: 'String', 'min-length': 100},
hash: { required: false, type: 'String' },
});
}
Expand Down Expand Up @@ -621,9 +449,6 @@ const CREDS = {
}
this._data.hash = FORGE.md.sha256.create().update(serviceAccountJSON).digest().toHex();
}
else if (this._data.key) {
this._data.hash = FORGE.md.sha256.create().update(this._data.key).digest().toHex();
}
else {
return ["Updating FCM credentials requires a service-account.json file"];
}
Expand All @@ -635,16 +460,12 @@ const CREDS = {
* @returns {object} json without sensitive information
*/
get view() {
const fcmKey = this._data?.key
? `FCM server key "${this._data.key.substr(0, 10)} ... ${this._data.key.substr(this._data.key.length - 10)}"`
: "";
const serviceAccountFile = this._data?.serviceAccountFile
? "service-account.json"
: "";
return {
_id: this._id,
type: this._data?.type,
key: fcmKey,
serviceAccountFile,
hash: this._data?.hash,
};
Expand Down
Loading