feat: #7743 notifications

v1.18.x
Barış Soner Uşaklı 6 years ago
parent cd2e68cb2d
commit 6f738c2b44

@ -34,18 +34,12 @@ Notifications.privilegedTypes = [
'notificationType_new-user-flag', 'notificationType_new-user-flag',
]; ];
Notifications.getAllNotificationTypes = function (callback) { Notifications.getAllNotificationTypes = async function () {
async.waterfall([ const results = await plugins.fireHook('filter:user.notificationTypes', {
function (next) {
plugins.fireHook('filter:user.notificationTypes', {
types: Notifications.baseTypes.slice(), types: Notifications.baseTypes.slice(),
privilegedTypes: Notifications.privilegedTypes.slice(), privilegedTypes: Notifications.privilegedTypes.slice(),
}, next); });
}, return results.types.concat(results.privilegedTypes);
function (results, next) {
next(null, results.types.concat(results.privilegedTypes));
},
], callback);
}; };
Notifications.startJobs = function () { Notifications.startJobs = function () {
@ -53,31 +47,22 @@ Notifications.startJobs = function () {
new cron('*/30 * * * *', Notifications.prune, null, true); new cron('*/30 * * * *', Notifications.prune, null, true);
}; };
Notifications.get = function (nid, callback) { Notifications.get = async function (nid) {
Notifications.getMultiple([nid], function (err, notifications) { const notifications = await Notifications.getMultiple([nid]);
callback(err, Array.isArray(notifications) && notifications.length ? notifications[0] : null); return Array.isArray(notifications) && notifications.length ? notifications[0] : null;
});
}; };
Notifications.getMultiple = function (nids, callback) { Notifications.getMultiple = async function (nids) {
if (!Array.isArray(nids) || !nids.length) { if (!Array.isArray(nids) || !nids.length) {
return setImmediate(callback, null, []); return [];
} }
var notifications;
async.waterfall([
function (next) {
const keys = nids.map(nid => 'notifications:' + nid); const keys = nids.map(nid => 'notifications:' + nid);
db.getObjects(keys, next); const notifications = await db.getObjects(keys);
},
function (_notifications, next) {
notifications = _notifications;
const userKeys = notifications.map(n => n && n.from); const userKeys = notifications.map(n => n && n.from);
User.getUsersFields(userKeys, ['username', 'userslug', 'picture'], next); const usersData = await User.getUsersFields(userKeys, ['username', 'userslug', 'picture']);
},
function (usersData, next) {
notifications.forEach(function (notification, index) { notifications.forEach(function (notification, index) {
if (notification) { if (notification) {
notification.datetimeISO = utils.toISOString(notification.datetime); notification.datetimeISO = utils.toISOString(notification.datetime);
@ -97,89 +82,50 @@ Notifications.getMultiple = function (nids, callback) {
} }
} }
}); });
next(null, notifications); return notifications;
},
], callback);
}; };
Notifications.filterExists = function (nids, callback) { Notifications.filterExists = async function (nids) {
async.waterfall([ const exists = await db.isSortedSetMembers('notifications', nids);
function (next) { return nids.filter((nid, idx) => exists[idx]);
db.isSortedSetMembers('notifications', nids, next);
},
function (exists, next) {
nids = nids.filter((notifId, idx) => exists[idx]);
next(null, nids);
},
], callback);
}; };
Notifications.findRelated = function (mergeIds, set, callback) { Notifications.findRelated = async function (mergeIds, set) {
// A related notification is one in a zset that has the same mergeId // A related notification is one in a zset that has the same mergeId
var nids; const nids = await db.getSortedSetRevRange(set, 0, -1);
async.waterfall([
async.apply(db.getSortedSetRevRange, set, 0, -1),
function (_nids, next) {
nids = _nids;
var keys = nids.map(nid => 'notifications:' + nid); const keys = nids.map(nid => 'notifications:' + nid);
db.getObjectsFields(keys, ['mergeId'], next); let sets = await db.getObjectsFields(keys, ['mergeId']);
},
function (sets, next) {
sets = sets.map(set => String(set.mergeId)); sets = sets.map(set => String(set.mergeId));
var mergeSet = new Set(mergeIds.map(id => String(id))); var mergeSet = new Set(mergeIds.map(id => String(id)));
next(null, nids.filter((nid, idx) => mergeSet.has(sets[idx]))); return nids.filter((nid, idx) => mergeSet.has(sets[idx]));
},
], callback);
}; };
Notifications.create = function (data, callback) { Notifications.create = async function (data) {
if (!data.nid) { if (!data.nid) {
return callback(new Error('[[error:no-notification-id]]')); throw new Error('[[error:no-notification-id]]');
} }
data.importance = data.importance || 5; data.importance = data.importance || 5;
async.waterfall([ const oldNotif = await db.getObject('notifications:' + data.nid);
function (next) { if (oldNotif && parseInt(oldNotif.pid, 10) === parseInt(data.pid, 10) && parseInt(oldNotif.importance, 10) > parseInt(data.importance, 10)) {
db.getObject('notifications:' + data.nid, next); return null;
},
function (oldNotification, next) {
if (oldNotification) {
if (parseInt(oldNotification.pid, 10) === parseInt(data.pid, 10) && parseInt(oldNotification.importance, 10) > parseInt(data.importance, 10)) {
return callback(null, null);
} }
} const now = Date.now();
var now = Date.now();
data.datetime = now; data.datetime = now;
async.parallel([ await Promise.all([
function (next) { db.sortedSetAdd('notifications', now, data.nid),
db.sortedSetAdd('notifications', now, data.nid, next); db.setObject('notifications:' + data.nid, data),
}, ]);
function (next) { return data;
db.setObject('notifications:' + data.nid, data, next);
},
], function (err) {
next(err, data);
});
},
], callback);
}; };
Notifications.push = function (notification, uids, callback) { Notifications.push = async function (notification, uids) {
callback = callback || function () {};
if (!notification || !notification.nid) { if (!notification || !notification.nid) {
return callback(); return;
}
if (!Array.isArray(uids)) {
uids = [uids];
} }
uids = Array.isArray(uids) ? _.uniq(uids) : [uids];
uids = _.uniq(uids);
if (!uids.length) { if (!uids.length) {
return callback(); return;
} }
setTimeout(function () { setTimeout(function () {
@ -191,55 +137,35 @@ Notifications.push = function (notification, uids, callback) {
} }
}); });
}, 1000); }, 1000);
callback();
}; };
function pushToUids(uids, notification, callback) { async function pushToUids(uids, notification) {
function sendNotification(uids, callback) { async function sendNotification(uids) {
if (!uids.length) { if (!uids.length) {
return callback(); return;
} }
var oneWeekAgo = Date.now() - 604800000; const oneWeekAgo = Date.now() - 604800000;
var unreadKeys = []; const unreadKeys = uids.map(uid => 'uid:' + uid + ':notifications:unread');
var readKeys = []; const readKeys = uids.map(uid => 'uid:' + uid + ':notifications:read');
async.waterfall([ await db.sortedSetsAdd(unreadKeys, notification.datetime, notification.nid);
function (next) { await db.sortedSetsRemove(readKeys, notification.nid);
uids.forEach(function (uid) { await db.sortedSetsRemoveRangeByScore(unreadKeys, '-inf', oneWeekAgo);
unreadKeys.push('uid:' + uid + ':notifications:unread'); await db.sortedSetsRemoveRangeByScore(readKeys, '-inf', oneWeekAgo);
readKeys.push('uid:' + uid + ':notifications:read'); const websockets = require('./socket.io');
});
db.sortedSetsAdd(unreadKeys, notification.datetime, notification.nid, next);
},
function (next) {
db.sortedSetsRemove(readKeys, notification.nid, next);
},
function (next) {
db.sortedSetsRemoveRangeByScore(unreadKeys, '-inf', oneWeekAgo, next);
},
function (next) {
db.sortedSetsRemoveRangeByScore(readKeys, '-inf', oneWeekAgo, next);
},
function (next) {
var websockets = require('./socket.io');
if (websockets.server) { if (websockets.server) {
uids.forEach(function (uid) { uids.forEach(function (uid) {
websockets.in('uid_' + uid).emit('event:new_notification', notification); websockets.in('uid_' + uid).emit('event:new_notification', notification);
}); });
} }
next();
},
], callback);
} }
function sendEmail(uids, callback) { async function sendEmail(uids) {
// Update CTA messaging (as not all notification types need custom text) // Update CTA messaging (as not all notification types need custom text)
if (['new-reply', 'new-chat'].includes(notification.type)) { if (['new-reply', 'new-chat'].includes(notification.type)) {
notification['cta-type'] = notification.type; notification['cta-type'] = notification.type;
} }
async.eachLimit(uids, 3, function (uid, next) { await async.eachLimit(uids, 3, function (uid, next) {
emailer.send('notification', uid, { emailer.send('notification', uid, {
path: notification.path, path: notification.path,
subject: utils.stripHTMLTags(notification.subject || '[[notifications:new_notification]]'), subject: utils.stripHTMLTags(notification.subject || '[[notifications:new_notification]]'),
@ -248,19 +174,15 @@ function pushToUids(uids, notification, callback) {
notification: notification, notification: notification,
showUnsubscribe: true, showUnsubscribe: true,
}, next); }, next);
}, callback); });
} }
function getUidsBySettings(uids, callback) { async function getUidsBySettings(uids) {
var uidsToNotify = []; const uidsToNotify = [];
var uidsToEmail = []; const uidsToEmail = [];
async.waterfall([ const usersSettings = await User.getMultipleUserSettings(uids);
function (next) {
User.getMultipleUserSettings(uids, next);
},
function (usersSettings, next) {
usersSettings.forEach(function (userSettings) { usersSettings.forEach(function (userSettings) {
var setting = userSettings['notificationType_' + notification.type] || 'notification'; const setting = userSettings['notificationType_' + notification.type] || 'notification';
if (setting === 'notification' || setting === 'notificationemail') { if (setting === 'notification' || setting === 'notificationemail') {
uidsToNotify.push(userSettings.uid); uidsToNotify.push(userSettings.uid);
@ -270,215 +192,130 @@ function pushToUids(uids, notification, callback) {
uidsToEmail.push(userSettings.uid); uidsToEmail.push(userSettings.uid);
} }
}); });
next(null, { uidsToNotify: uidsToNotify, uidsToEmail: uidsToEmail }); return { uidsToNotify: uidsToNotify, uidsToEmail: uidsToEmail };
},
], callback);
} }
async.waterfall([
function (next) {
// Remove uid from recipients list if they have blocked the user triggering the notification // Remove uid from recipients list if they have blocked the user triggering the notification
User.blocks.filterUids(notification.from, uids, next); uids = await User.blocks.filterUids(notification.from, uids);
}, const data = await plugins.fireHook('filter:notification.push', { notification: notification, uids: uids });
function (uids, next) {
plugins.fireHook('filter:notification.push', { notification: notification, uids: uids }, next);
},
function (data, next) {
if (!data || !data.notification || !data.uids || !data.uids.length) { if (!data || !data.notification || !data.uids || !data.uids.length) {
return callback(); return;
} }
notification = data.notification; notification = data.notification;
let results = { uidsToNotify: data.uids, uidsToEmail: [] };
if (notification.type) { if (notification.type) {
getUidsBySettings(data.uids, next); results = await getUidsBySettings(data.uids);
} else { }
next(null, { uidsToNotify: data.uids, uidsToEmail: [] }); await Promise.all([
} sendNotification(results.uidsToNotify),
}, sendEmail(results.uidsToEmail),
function (results, next) { ]);
async.parallel([
function (next) {
sendNotification(results.uidsToNotify, next);
},
function (next) {
sendEmail(results.uidsToEmail, next);
},
], function (err) {
next(err, results);
});
},
function (results, next) {
plugins.fireHook('action:notification.pushed', { plugins.fireHook('action:notification.pushed', {
notification: notification, notification: notification,
uids: results.uidsToNotify, uids: results.uidsToNotify,
uidsNotified: results.uidsToNotify, uidsNotified: results.uidsToNotify,
uidsEmailed: results.uidsToEmail, uidsEmailed: results.uidsToEmail,
}); });
next();
},
], callback);
} }
Notifications.pushGroup = function (notification, groupName, callback) { Notifications.pushGroup = async function (notification, groupName) {
callback = callback || function () {};
if (!notification) { if (!notification) {
return callback(); return;
} }
async.waterfall([ const members = await groups.getMembers(groupName, 0, -1);
function (next) { await Notifications.push(notification, members);
groups.getMembers(groupName, 0, -1, next);
},
function (members, next) {
Notifications.push(notification, members, next);
},
], callback);
}; };
Notifications.pushGroups = function (notification, groupNames, callback) { Notifications.pushGroups = async function (notification, groupNames) {
callback = callback || function () {};
if (!notification) { if (!notification) {
return callback(); return;
} }
async.waterfall([ let groupMembers = await groups.getMembersOfGroups(groupNames);
function (next) { groupMembers = _.uniq(_.flatten(groupMembers));
groups.getMembersOfGroups(groupNames, next); await Notifications.push(notification, groupMembers);
},
function (groupMembers, next) {
var members = _.uniq(_.flatten(groupMembers));
Notifications.push(notification, members, next);
},
], callback);
}; };
Notifications.rescind = function (nid, callback) { Notifications.rescind = async function (nid) {
callback = callback || function () {}; await Promise.all([
db.sortedSetRemove('notifications', nid),
async.parallel([ db.delete('notifications:' + nid),
async.apply(db.sortedSetRemove, 'notifications', nid), ]);
async.apply(db.delete, 'notifications:' + nid),
], function (err) {
callback(err);
});
}; };
Notifications.markRead = function (nid, uid, callback) { Notifications.markRead = async function (nid, uid) {
callback = callback || function () {};
if (parseInt(uid, 10) <= 0 || !nid) { if (parseInt(uid, 10) <= 0 || !nid) {
return setImmediate(callback); return;
} }
Notifications.markReadMultiple([nid], uid, callback); await Notifications.markReadMultiple([nid], uid);
}; };
Notifications.markUnread = function (nid, uid, callback) { Notifications.markUnread = async function (nid, uid) {
callback = callback || function () {};
if (parseInt(uid, 10) <= 0 || !nid) { if (parseInt(uid, 10) <= 0 || !nid) {
return setImmediate(callback); return;
} }
async.waterfall([ const notification = await db.getObject('notifications:' + nid);
function (next) {
db.getObject('notifications:' + nid, next);
},
function (notification, next) {
if (!notification) { if (!notification) {
return callback(new Error('[[error:no-notification]]')); throw new Error('[[error:no-notification]]');
} }
notification.datetime = notification.datetime || Date.now(); notification.datetime = notification.datetime || Date.now();
async.parallel([ await Promise.all([
async.apply(db.sortedSetRemove, 'uid:' + uid + ':notifications:read', nid), db.sortedSetRemove('uid:' + uid + ':notifications:read', nid),
async.apply(db.sortedSetAdd, 'uid:' + uid + ':notifications:unread', notification.datetime, nid), db.sortedSetAdd('uid:' + uid + ':notifications:unread', notification.datetime, nid),
], next); ]);
},
], function (err) {
callback(err);
});
}; };
Notifications.markReadMultiple = function (nids, uid, callback) { Notifications.markReadMultiple = async function (nids, uid) {
callback = callback || function () {};
nids = nids.filter(Boolean); nids = nids.filter(Boolean);
if (!Array.isArray(nids) || !nids.length) { if (!Array.isArray(nids) || !nids.length) {
return callback(); return;
} }
let notificationKeys = nids.map(nid => 'notifications:' + nid); let notificationKeys = nids.map(nid => 'notifications:' + nid);
let mergeIds = await db.getObjectsFields(notificationKeys, ['mergeId']);
async.waterfall([
async.apply(db.getObjectsFields, notificationKeys, ['mergeId']),
function (mergeIds, next) {
// Isolate mergeIds and find related notifications // Isolate mergeIds and find related notifications
mergeIds = _.uniq(mergeIds.map(set => set.mergeId)); mergeIds = _.uniq(mergeIds.map(set => set.mergeId));
Notifications.findRelated(mergeIds, 'uid:' + uid + ':notifications:unread', next); const relatedNids = await Notifications.findRelated(mergeIds, 'uid:' + uid + ':notifications:unread');
},
function (relatedNids, next) {
notificationKeys = _.union(nids, relatedNids).map(nid => 'notifications:' + nid); notificationKeys = _.union(nids, relatedNids).map(nid => 'notifications:' + nid);
db.getObjectsFields(notificationKeys, ['nid', 'datetime'], next); let notificationData = await db.getObjectsFields(notificationKeys, ['nid', 'datetime']);
},
function (notificationData, next) {
notificationData = notificationData.filter(n => n && n.nid); notificationData = notificationData.filter(n => n && n.nid);
nids = notificationData.map(n => n.nid); nids = notificationData.map(n => n.nid);
async.parallel([
function (next) {
db.sortedSetRemove('uid:' + uid + ':notifications:unread', nids, next);
},
function (next) {
const datetimes = notificationData.map(n => (n && n.datetime) || Date.now()); const datetimes = notificationData.map(n => (n && n.datetime) || Date.now());
db.sortedSetAdd('uid:' + uid + ':notifications:read', datetimes, nids, next); await Promise.all([
}, db.sortedSetRemove('uid:' + uid + ':notifications:unread', nids),
], next); db.sortedSetAdd('uid:' + uid + ':notifications:read', datetimes, nids),
}, ]);
], function (err) {
callback(err);
});
}; };
Notifications.markAllRead = function (uid, callback) { Notifications.markAllRead = async function (uid) {
async.waterfall([ const nids = await db.getSortedSetRevRange('uid:' + uid + ':notifications:unread', 0, 99);
function (next) { await Notifications.markReadMultiple(nids, uid);
db.getSortedSetRevRange('uid:' + uid + ':notifications:unread', 0, 99, next);
},
function (nids, next) {
Notifications.markReadMultiple(nids, uid, next);
},
], callback);
}; };
Notifications.prune = function (callback) { Notifications.prune = async function () {
callback = callback || function () {};
async.waterfall([
function (next) {
const week = 604800000; const week = 604800000;
const cutoffTime = Date.now() - week; const cutoffTime = Date.now() - week;
db.getSortedSetRangeByScore('notifications', 0, 500, '-inf', cutoffTime, next); const nids = await db.getSortedSetRangeByScore('notifications', 0, 500, '-inf', cutoffTime);
},
function (nids, next) {
if (!nids.length) { if (!nids.length) {
return callback(); return;
} }
try {
async.parallel([ await Promise.all([
function (next) { db.sortedSetRemove('notifications', nids),
db.sortedSetRemove('notifications', nids, next); db.deleteAll(nids.map(nid => 'notifications:' + nid)),
}, ]);
function (next) { } catch (err) {
const keys = nids.map(nid => 'notifications:' + nid);
db.deleteAll(keys, next);
},
], next);
},
], function (err) {
if (err) { if (err) {
winston.error('Encountered error pruning notifications', err); winston.error('Encountered error pruning notifications', err);
} }
callback(err); }
});
}; };
Notifications.merge = function (notifications, callback) { Notifications.merge = async function (notifications) {
// When passed a set of notification objects, merge any that can be merged // When passed a set of notification objects, merge any that can be merged
var mergeIds = [ var mergeIds = [
'notifications:upvoted_your_post_in', 'notifications:upvoted_your_post_in',
@ -575,11 +412,10 @@ Notifications.merge = function (notifications, callback) {
return notifications; return notifications;
}, notifications); }, notifications);
plugins.fireHook('filter:notifications.merge', { const data = await plugins.fireHook('filter:notifications.merge', {
notifications: notifications, notifications: notifications,
}, function (err, data) {
callback(err, data.notifications);
}); });
return data && data.notifications;
}; };
Notifications.async = require('./promisify')(Notifications); Notifications.async = require('./promisify')(Notifications);

Loading…
Cancel
Save