"use strict";

var async = require('async');

/**
 * Installs sorted-set operations on `module`, backed by the 'objects'
 * collection of `db`. In the code visible here, each sorted-set member is
 * addressed as a document matched by `{_key, value}` carrying a numeric
 * `score` field.
 *
 * NOTE(review): this file appears to have been flattened onto two physical
 * lines and part of its text is missing (see the notes further down).
 * Restore it from the upstream copy before relying on it.
 *
 * @param {Object} db - handle exposing `collection(name)`.
 * @param {Object} module - object that receives the methods; must already
 *     expose `module.helpers.mongo`.
 */
module.exports = function(db, module) {
	var helpers = module.helpers.mongo;

	/**
	 * Adds one member to a sorted set, or overwrites its score if the
	 * `{_key, value}` document already exists (upsert).
	 *
	 * When both `score` and `value` are arrays the call is delegated to
	 * sortedSetAddBulk.
	 *
	 * @param {string} key - set name; a falsy key makes this a no-op that
	 *     invokes `callback()` with no arguments.
	 * @param {*} score - stored as `parseInt(score, 10)`.
	 * @param {*} value - passed through `helpers.valueToString` before use.
	 * @param {Function} [callback] - called with `(err)` only; defaults to
	 *     `helpers.noop`.
	 */
	module.sortedSetAdd = function(key, score, value, callback) {
		callback = callback || helpers.noop;
		if (!key) {
			return callback();
		}
		if (Array.isArray(score) && Array.isArray(value)) {
			return sortedSetAddBulk(key, score, value, callback);
		}

		value = helpers.valueToString(value);

		db.collection('objects').update({_key: key, value: value}, {$set: {score: parseInt(score, 10)}}, {upsert:true, w: 1}, function(err) {
			callback(err);
		});
	};

	/**
	 * Bulk variant of sortedSetAdd for parallel `scores` / `values` arrays.
	 *
	 * Visible behavior: calls `callback()` with no arguments when either
	 * array is empty, calls back with Error('[[error:invalid-data]]') when
	 * the lengths differ, converts every value with
	 * `helpers.valueToString`, and opens an unordered bulk operation on
	 * the 'objects' collection.
	 *
	 * NOTE(review): the remainder of this function is missing from the
	 * text; what the loop queues and how the bulk op is executed cannot be
	 * seen from here.
	 */
	function sortedSetAddBulk(key, scores, values, callback) {
		if (!scores.length || !values.length) {
			return callback();
		}
		if (scores.length !== values.length) {
			return callback(new Error('[[error:invalid-data]]'));
		}

		values = values.map(helpers.valueToString);

		var bulk = db.collection('objects').initializeUnorderedBulkOp();

		/*
		 * NOTE(review): the text is truncated on the next line. It jumps
		 * from the loop header of sortedSetAddBulk into the tail of a
		 * different function that builds an aggregation `pipeline`;
		 * everything in between is missing. The `0) {` fragment is
		 * presumably the end of a `limit > 0` style check - confirm
		 * against upstream.
		 */
		for(var i=0; i 0) {
			pipeline.push({ $limit: limit });
		}

		// Reduce each aggregation result to a single `value` field taken from `_id.value`.
		pipeline.push({ $project: { _id: 0, value: '$_id.value' }});

		db.collection('objects').aggregate(pipeline, function(err, data) {
			// On error, or when no data comes back, only `err` is forwarded.
			if (err || !data) {
				return callback(err);
			}

			data = data.map(function(item) {
				return item.value;
			});
			callback(null, data);
		});
	}

	/**
	 * Increments the score of a member, creating the document if it does
	 * not exist (`upsert: true`), and requests the updated document
	 * (`new: true`).
	 *
	 * NOTE(review): `value` is converted with `helpers.fieldToString`
	 * here, whereas sortedSetAdd uses `helpers.valueToString` - confirm
	 * that the difference is intended.
	 *
	 * @param {string} key - set name; a falsy key makes this a no-op that
	 *     invokes `callback()` with no arguments.
	 * @param {*} increment - applied via `$inc` as `parseInt(increment, 10)`.
	 * @param {*} value - member whose score is incremented.
	 * @param {Function} [callback] - defaults to `helpers.noop`.
	 */
	module.sortedSetIncrBy = function(key, increment, value, callback) {
		callback = callback || helpers.noop;
		if (!key) {
			return callback();
		}
		var data = {};
		value = helpers.fieldToString(value);
		data.score = parseInt(increment, 10);

		db.collection('objects').findAndModify({_key: key, value: value}, {}, {$inc: data}, {new: true, upsert: true}, function(err, result) {
			/*
			 * NOTE(review): with the original line breaks lost, the `//`
			 * comment below runs to the end of its physical line and
			 * swallows code. The swallowed text reads as: on an error whose
			 * message starts with 'E11000 duplicate key error', retry by
			 * calling module.sortedSetIncrBy again with the same arguments
			 * (no retry limit is visible); otherwise call back with `err`
			 * and `result.value.score`, or null when there is no
			 * `result.value`. Re-split this line per upstream.
			 */
			// if there is duplicate key error retry the upsert // https://github.com/NodeBB/NodeBB/issues/4467 // https://jira.mongodb.org/browse/SERVER-14322 // https://docs.mongodb.org/manual/reference/command/findAndModify/#upsert-and-unique-index if (err && err.message.startsWith('E11000 duplicate key error')) { return module.sortedSetIncrBy(key, increment, value, callback); } callback(err, result && result.value ? 
			result.value.score : null);
		});
	};

	/**
	 * Returns members of a set ordered by `value` ascending, optionally
	 * bounded on `value`.
	 *
	 * Unlike the setters above, `callback` gets no `helpers.noop` default
	 * here, so it must be supplied.
	 *
	 * @param {string} key - set name, matched against `_key`.
	 * @param {string} min - inclusive lower bound (`$gte`); '-' means no
	 *     lower bound.
	 * @param {string} max - inclusive upper bound (`$lte`); '+' means no
	 *     upper bound.
	 * @param {number} start - number of matching documents to skip.
	 * @param {number} count - maximum number of results; -1 is translated
	 *     to a limit of 0 (presumably "no limit" for the driver - confirm).
	 * @param {Function} callback - called with `(err)` on failure, or
	 *     `(err, values)` where `values` is the array of `value` fields.
	 */
	module.getSortedSetRangeByLex = function(key, min, max, start, count, callback) {
		var query = {_key: key};
		if (min !== '-') {
			query.value = {$gte: min};
		}
		if (max !== '+') {
			// Keep a lower bound set above, if any, and add the upper bound to it.
			query.value = query.value || {};
			query.value.$lte = max;
		}

		db.collection('objects').find(query, {_id: 0, value: 1})
			.sort({value: 1})
			.skip(start)
			.limit(count === -1 ? 0 : count)
			.toArray(function(err, data) {
				if (err) {
					return callback(err);
				}
				data = data.map(function(item) {
					return item && item.value;
				});
				callback(err, data);
			});
	};

	/**
	 * Walks a set in ascending `score` order and hands its members to
	 * `process` in arrays of up to `batch` values.
	 *
	 * @param {string} setKey - set name, matched against `_key`.
	 * @param {Function} process - called as `process(ids, done)`; an error
	 *     given to `done` is passed on to `next`.
	 * @param {number} batch - batch length; also used as the cursor's
	 *     `batchSize`.
	 * @param {Function} callback - final callback given to `async.whilst`.
	 */
	module.processSortedSet = function(setKey, process, batch, callback) {
		var done = false;
		var ids = [];
		var cursor = db.collection('objects').find({_key: setKey})
			.sort({score: 1})
			.project({_id: 0, value: 1})
			.batchSize(batch);

		async.whilst(
			function() {
				return !done;
			},
			function(next) {
				cursor.next(function(err, item) {
					if (err) {
						return next(err);
					}
					// A null item marks the end of the cursor.
					if (item === null) {
						done = true;
					} else {
						ids.push(item.value);
					}

					// Keep accumulating until the batch is full. At the end of
					// the cursor, flush a partial batch but skip an empty one.
					if (ids.length < batch && (!done || ids.length === 0)) {
						return next(null);
					}
					process(ids, function(err) {
						// Start a fresh array for the next batch.
						ids = [];
						return next(err);
					});
				});
			},
			callback
		);
	};
};