From 3d85992757e924a474f6dfaa1f4843f0f00678c8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bar=C4=B1=C5=9F=20Soner=20U=C5=9Fakl=C4=B1?= Date: Fri, 23 Jun 2017 18:18:34 -0400 Subject: [PATCH] modify db.processSortedSet so it works with intervals --- src/batch.js | 11 ++++++----- src/database/mongo/sorted.js | 15 +++++++++++---- src/user/digest.js | 30 +++++++++++++++--------------- 3 files changed, 32 insertions(+), 24 deletions(-) diff --git a/src/batch.js b/src/batch.js index 204ae79940..fac78cd021 100644 --- a/src/batch.js +++ b/src/batch.js @@ -30,17 +30,18 @@ exports.processSortedSet = function (setKey, process, options, callback) { }); } + options.batch = options.batch || DEFAULT_BATCH_SIZE; + // use the fast path if possible if (db.processSortedSet && typeof options.doneIf !== 'function' && !utils.isNumber(options.alwaysStartAt)) { - return db.processSortedSet(setKey, process, options.batch || DEFAULT_BATCH_SIZE, callback); + return db.processSortedSet(setKey, process, options, callback); } // custom done condition options.doneIf = typeof options.doneIf === 'function' ? options.doneIf : function () {}; - var batch = options.batch || DEFAULT_BATCH_SIZE; var start = 0; - var stop = batch; + var stop = options.batch; var done = false; async.whilst( @@ -60,8 +61,8 @@ exports.processSortedSet = function (setKey, process, options, callback) { if (err) { return next(err); } - start += utils.isNumber(options.alwaysStartAt) ? options.alwaysStartAt : batch + 1; - stop = start + batch; + start += utils.isNumber(options.alwaysStartAt) ? options.alwaysStartAt : options.batch + 1; + stop = start + options.batch; if (options.interval) { setTimeout(next, options.interval); diff --git a/src/database/mongo/sorted.js b/src/database/mongo/sorted.js index 4168199bb5..5d461043fe 100644 --- a/src/database/mongo/sorted.js +++ b/src/database/mongo/sorted.js @@ -467,13 +467,13 @@ module.exports = function (db, module) { } } - module.processSortedSet = function (setKey, process, batch, callback) { + module.processSortedSet = function (setKey, process, options, callback) { var done = false; var ids = []; var cursor = db.collection('objects').find({ _key: setKey }) .sort({ score: 1 }) .project({ _id: 0, value: 1 }) - .batchSize(batch); + .batchSize(options.batch); async.whilst( function () { @@ -490,13 +490,20 @@ module.exports = function (db, module) { ids.push(item.value); } - if (ids.length < batch && (!done || ids.length === 0)) { + if (ids.length < options.batch && (!done || ids.length === 0)) { return next(null); } process(ids, function (err) { + if (err) { + return next(err); + } ids = []; - return next(err); + if (options.interval) { + setTimeout(next, options.interval); + } else { + next(); + } }); }); }, diff --git a/src/user/digest.js b/src/user/digest.js index 8d515cf734..b9cb689622 100644 --- a/src/user/digest.js +++ b/src/user/digest.js @@ -73,21 +73,21 @@ Digest.getSubscribers = function (interval, callback) { var subs = []; batch.processSortedSet('users:joindate', function (uids, next) { - user.getMultipleUserSettings(uids, function (err, settings) { - if (err) { - return next(err); - } - - settings.forEach(function (hash) { - if (hash.dailyDigestFreq === interval) { - subs.push(hash.uid); - } - }); - - next(); - }); - }, { interval: 1000 }, function () { - next(null, subs); + async.waterfall([ + function (next) { + user.getMultipleUserSettings(uids, next); + }, + function (settings, next) { + settings.forEach(function (hash) { + if (hash.dailyDigestFreq === interval) { + subs.push(hash.uid); + } + }); + next(); + }, + ], next); + }, { interval: 1000 }, function (err) { + next(err, subs); }); }, function (subscribers, next) {