add reverse support to db.processSortedSet (#11826)

isekai-main
Barış Soner Uşaklı 2 years ago committed by GitHub
parent 8ab9c72c6b
commit 9b1cc57604
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -23,6 +23,7 @@ exports.processSortedSet = async function (setKey, process, options) {
} }
options.batch = options.batch || DEFAULT_BATCH_SIZE; options.batch = options.batch || DEFAULT_BATCH_SIZE;
options.reverse = options.reverse || false;
// use the fast path if possible // use the fast path if possible
if (db.processSortedSet && typeof options.doneIf !== 'function' && !utils.isNumber(options.alwaysStartAt)) { if (db.processSortedSet && typeof options.doneIf !== 'function' && !utils.isNumber(options.alwaysStartAt)) {
@ -38,10 +39,10 @@ exports.processSortedSet = async function (setKey, process, options) {
if (process && process.constructor && process.constructor.name !== 'AsyncFunction') { if (process && process.constructor && process.constructor.name !== 'AsyncFunction') {
process = util.promisify(process); process = util.promisify(process);
} }
const method = options.reverse ? 'getSortedSetRevRange' : 'getSortedSetRange';
while (true) { while (true) {
/* eslint-disable no-await-in-loop */ /* eslint-disable no-await-in-loop */
const ids = await db[`getSortedSetRange${options.withScores ? 'WithScores' : ''}`](setKey, start, stop); const ids = await db[`${method}${options.withScores ? 'WithScores' : ''}`](setKey, start, stop);
if (!ids.length || options.doneIf(start, stop, ids)) { if (!ids.length || options.doneIf(start, stop, ids)) {
return; return;
} }

@ -563,12 +563,12 @@ module.exports = function (module) {
let done = false; let done = false;
const ids = []; const ids = [];
const project = { _id: 0, _key: 0 }; const project = { _id: 0, _key: 0 };
const sort = options.reverse ? -1 : 1;
if (!options.withScores) { if (!options.withScores) {
project.score = 0; project.score = 0;
} }
const cursor = await module.client.collection('objects').find({ _key: setKey }, { projection: project }) const cursor = await module.client.collection('objects').find({ _key: setKey }, { projection: project })
.sort({ score: 1 }) .sort({ score: sort })
.batchSize(options.batch); .batchSize(options.batch);
if (processFn && processFn.constructor && processFn.constructor.name !== 'AsyncFunction') { if (processFn && processFn.constructor && processFn.constructor.name !== 'AsyncFunction') {

@ -664,6 +664,7 @@ SELECT z."value",
module.processSortedSet = async function (setKey, process, options) { module.processSortedSet = async function (setKey, process, options) {
const client = await module.pool.connect(); const client = await module.pool.connect();
const batchSize = (options || {}).batch || 100; const batchSize = (options || {}).batch || 100;
const sort = options.reverse ? 'DESC' : 'ASC';
const cursor = client.query(new Cursor(` const cursor = client.query(new Cursor(`
SELECT z."value", z."score" SELECT z."value", z."score"
FROM "legacy_object_live" o FROM "legacy_object_live" o
@ -671,7 +672,7 @@ SELECT z."value", z."score"
ON o."_key" = z."_key" ON o."_key" = z."_key"
AND o."type" = z."type" AND o."type" = z."type"
WHERE o."_key" = $1::TEXT WHERE o."_key" = $1::TEXT
ORDER BY z."score" ASC, z."value" ASC`, [setKey])); ORDER BY z."score" ${sort}, z."value" ${sort}`, [setKey]));
if (process && process.constructor && process.constructor.name !== 'AsyncFunction') { if (process && process.constructor && process.constructor.name !== 'AsyncFunction') {
process = util.promisify(process); process = util.promisify(process);

@ -83,6 +83,7 @@ module.exports = function (Messaging) {
notifications.push(notification, uids); notifications.push(notification, uids);
}, { }, {
reverse: true,
batch: 500, batch: 500,
interval: 1000, interval: 1000,
}); });

Loading…
Cancel
Save