modify db.processSortedSet so it works with intervals

v1.18.x
Barış Soner Uşaklı 8 years ago
parent b4b68498cd
commit 3d85992757

@ -30,17 +30,18 @@ exports.processSortedSet = function (setKey, process, options, callback) {
}); });
} }
options.batch = options.batch || DEFAULT_BATCH_SIZE;
// use the fast path if possible // use the fast path if possible
if (db.processSortedSet && typeof options.doneIf !== 'function' && !utils.isNumber(options.alwaysStartAt)) { if (db.processSortedSet && typeof options.doneIf !== 'function' && !utils.isNumber(options.alwaysStartAt)) {
return db.processSortedSet(setKey, process, options.batch || DEFAULT_BATCH_SIZE, callback); return db.processSortedSet(setKey, process, options, callback);
} }
// custom done condition // custom done condition
options.doneIf = typeof options.doneIf === 'function' ? options.doneIf : function () {}; options.doneIf = typeof options.doneIf === 'function' ? options.doneIf : function () {};
var batch = options.batch || DEFAULT_BATCH_SIZE;
var start = 0; var start = 0;
var stop = batch; var stop = options.batch;
var done = false; var done = false;
async.whilst( async.whilst(
@ -60,8 +61,8 @@ exports.processSortedSet = function (setKey, process, options, callback) {
if (err) { if (err) {
return next(err); return next(err);
} }
start += utils.isNumber(options.alwaysStartAt) ? options.alwaysStartAt : batch + 1; start += utils.isNumber(options.alwaysStartAt) ? options.alwaysStartAt : options.batch + 1;
stop = start + batch; stop = start + options.batch;
if (options.interval) { if (options.interval) {
setTimeout(next, options.interval); setTimeout(next, options.interval);

@ -467,13 +467,13 @@ module.exports = function (db, module) {
} }
} }
module.processSortedSet = function (setKey, process, batch, callback) { module.processSortedSet = function (setKey, process, options, callback) {
var done = false; var done = false;
var ids = []; var ids = [];
var cursor = db.collection('objects').find({ _key: setKey }) var cursor = db.collection('objects').find({ _key: setKey })
.sort({ score: 1 }) .sort({ score: 1 })
.project({ _id: 0, value: 1 }) .project({ _id: 0, value: 1 })
.batchSize(batch); .batchSize(options.batch);
async.whilst( async.whilst(
function () { function () {
@ -490,13 +490,20 @@ module.exports = function (db, module) {
ids.push(item.value); ids.push(item.value);
} }
if (ids.length < batch && (!done || ids.length === 0)) { if (ids.length < options.batch && (!done || ids.length === 0)) {
return next(null); return next(null);
} }
process(ids, function (err) { process(ids, function (err) {
if (err) {
return next(err);
}
ids = []; ids = [];
return next(err); if (options.interval) {
setTimeout(next, options.interval);
} else {
next();
}
}); });
}); });
}, },

@ -73,21 +73,21 @@ Digest.getSubscribers = function (interval, callback) {
var subs = []; var subs = [];
batch.processSortedSet('users:joindate', function (uids, next) { batch.processSortedSet('users:joindate', function (uids, next) {
user.getMultipleUserSettings(uids, function (err, settings) { async.waterfall([
if (err) { function (next) {
return next(err); user.getMultipleUserSettings(uids, next);
} },
function (settings, next) {
settings.forEach(function (hash) { settings.forEach(function (hash) {
if (hash.dailyDigestFreq === interval) { if (hash.dailyDigestFreq === interval) {
subs.push(hash.uid); subs.push(hash.uid);
} }
}); });
next();
next(); },
}); ], next);
}, { interval: 1000 }, function () { }, { interval: 1000 }, function (err) {
next(null, subs); next(err, subs);
}); });
}, },
function (subscribers, next) { function (subscribers, next) {

Loading…
Cancel
Save