diff --git a/src/socket.io/helpers.js b/src/socket.io/helpers.js index 3e8d947aea..b808c73929 100644 --- a/src/socket.io/helpers.js +++ b/src/socket.io/helpers.js @@ -14,10 +14,31 @@ var privileges = require('../privileges'); var notifications = require('../notifications'); var plugins = require('../plugins'); var utils = require('../utils'); +var batch = require('../batch'); var SocketHelpers = module.exports; SocketHelpers.notifyNew = function (uid, type, result) { + async.waterfall([ + function (next) { + user.getUidsFromSet('users:online', 0, -1, next); + }, + function (uids, next) { + uids = uids.filter(toUid => parseInt(toUid, 10) !== uid); + batch.processArray(uids, function (uids, next) { + notifyUids(uid, uids, type, result, next); + }, { + interval: 1000, + }, next); + }, + ], function (err) { + if (err) { + return winston.error(err.stack); + } + }); +}; + +function notifyUids(uid, uids, type, result, callback) { let watchStateUids; let categoryWatchStates; let topicFollowState; @@ -26,10 +47,6 @@ SocketHelpers.notifyNew = function (uid, type, result) { const cid = post.topic.cid; async.waterfall([ function (next) { - user.getUidsFromSet('users:online', 0, -1, next); - }, - function (uids, next) { - uids = uids.filter(toUid => parseInt(toUid, 10) !== uid); privileges.topics.filterUids('topics:read', tid, uids, next); }, function (uids, next) { @@ -48,23 +65,21 @@ SocketHelpers.notifyNew = function (uid, type, result) { function (uids, next) { plugins.fireHook('filter:sockets.sendNewPostToUids', { uidsTo: uids, uidFrom: uid, type: type }, next); }, - ], function (err, data) { - if (err) { - return winston.error(err.stack); - } - - post.ip = undefined; + function (data, next) { + post.ip = undefined; - data.uidsTo.forEach(function (toUid) { - post.categoryWatchState = categoryWatchStates[toUid]; - post.topic.isFollowing = topicFollowState[toUid]; - websockets.in('uid_' + toUid).emit('event:new_post', result); - if (result.topic && type === 'newTopic') { - websockets.in('uid_' + toUid).emit('event:new_topic', result.topic); - } - }); - }); -}; + data.uidsTo.forEach(function (toUid) { + post.categoryWatchState = categoryWatchStates[toUid]; + post.topic.isFollowing = topicFollowState[toUid]; + websockets.in('uid_' + toUid).emit('event:new_post', result); + if (result.topic && type === 'newTopic') { + websockets.in('uid_' + toUid).emit('event:new_topic', result.topic); + } + }); + setImmediate(next); + }, + ], callback); +} function getWatchStates(uids, tid, cid, callback) { async.parallel({