feat: refactor getSortedSetRange to allow big arrays

v1.18.x
Barış Soner Uşaklı 5 years ago
parent 7e7ea7a697
commit b602c04463

@ -0,0 +1,31 @@
'use strict';
const helpers = module.exports;
helpers.mergeBatch = function (batchData, start, stop, sort) {
function getFirst() {
let selectedArray = batchData[0];
for (let i = 1; i < batchData.length; i++) {
if (batchData[i].length && (
!selectedArray.length ||
(sort === 1 && batchData[i][0].score < selectedArray[0].score) ||
(sort === -1 && batchData[i][0].score > selectedArray[0].score)
)) {
selectedArray = batchData[i];
}
}
return selectedArray.length ? selectedArray.shift() : null;
}
let item = null;
const result = [];
var st = process.hrtime();
do {
item = getFirst(batchData);
if (item) {
result.push(item);
}
} while (item && (result.length < (stop - start + 1) || stop === -1));
process.profile('meeerge', st);
return result;
};

@ -1,9 +1,11 @@
'use strict';
var utils = require('../../utils');
const utils = require('../../utils');
module.exports = function (module) {
var helpers = require('./helpers');
const helpers = require('./helpers');
const dbHelpers = require('../helpers');
const util = require('util');
const sleep = util.promisify(setTimeout);
@ -62,37 +64,54 @@ module.exports = function (module) {
fields.score = 0;
}
var reverse = false;
let reverse = false;
if (start === 0 && stop < -1) {
reverse = true;
sort *= -1;
start = Math.abs(stop + 1);
stop = -1;
} else if (start < 0 && stop > start) {
var tmp1 = Math.abs(stop + 1);
const tmp1 = Math.abs(stop + 1);
stop = Math.abs(start + 1);
start = tmp1;
}
var limit = stop - start + 1;
let limit = stop - start + 1;
if (limit <= 0) {
limit = 0;
}
let data = await module.client.collection('objects').find(query, { projection: fields })
.sort({ score: sort })
.skip(start)
.limit(limit)
.toArray();
let result = [];
async function doQuery(fields) {
return await module.client.collection('objects').find(query, { projection: fields })
.sort({ score: sort })
.skip(start)
.limit(limit)
.toArray();
}
if (isArray && key.length > 100) {
const batches = [];
const batch = require('../../batch');
await batch.processArray(key, async currentBatch => batches.push(currentBatch), { batch: 100 });
const batchData = await Promise.all(batches.map(async (batch) => {
query._key = { $in: batch };
return await doQuery({ _id: 0, _key: 0 });
}));
result = dbHelpers.mergeBatch(batchData, start, stop, sort);
} else {
result = await doQuery(fields);
}
if (reverse) {
data.reverse();
result.reverse();
}
if (!withScores) {
data = data.map(item => item.value);
result = result.map(item => item.value);
}
return data;
return result;
}
module.getSortedSetRangeByScore = async function (key, start, count, min, max) {

@ -3,6 +3,7 @@
module.exports = function (module) {
const utils = require('../../utils');
const helpers = require('./helpers');
const dbHelpers = require('../helpers');
require('./sorted/add')(module);
require('./sorted/remove')(module);
@ -26,20 +27,6 @@ module.exports = function (module) {
};
async function sortedSetRange(method, key, start, stop, withScores) {
function getFirst(mapped) {
let selectedArray = mapped[0];
for (let i = 1; i < mapped.length; i++) {
if (mapped[i].length && (
!selectedArray.length ||
(method === 'zrange' && mapped[i][0].score < selectedArray[0].score) ||
(method === 'zrevrange' && mapped[i][0].score > selectedArray[0].score)
)) {
selectedArray = mapped[i];
}
}
return selectedArray.length ? selectedArray.shift() : null;
}
if (Array.isArray(key)) {
if (!key.length) {
return [];
@ -48,17 +35,9 @@ module.exports = function (module) {
key.forEach(key => batch[method]([key, start, stop, 'WITHSCORES']));
const data = await helpers.execBatch(batch);
const mapped = data.map(setData => helpers.zsetToObjectArray(setData));
let objects = [];
const count = stop - start + 1;
let item = null;
do {
item = getFirst(mapped);
if (item) {
objects.push(item);
}
} while (item && (objects.length < count || stop === -1));
const batchData = data.map(setData => helpers.zsetToObjectArray(setData));
let objects = dbHelpers.mergeBatch(batchData, start, stop, method === 'zrange' ? 1 : -1);
if (!withScores) {
objects = objects.map(item => item.value);

@ -251,6 +251,35 @@ describe('Sorted Set methods', function () {
const data = await db.getSortedSetRevRange(['dupezset3', 'dupezset4'], 0, 1);
assert.deepStrictEqual(data, ['value5', 'value3']);
});
it('should work with big arrays (length > 100) ', async function () {
for (let i = 0; i < 400; i++) {
/* eslint-disable no-await-in-loop */
const bulkAdd = [];
for (let k = 0; k < 100; k++) {
bulkAdd.push(['testzset' + i, 1000000 + k + (i * 100), k + (i * 100)]);
}
await db.sortedSetAddBulk(bulkAdd);
}
const keys = [];
for (let i = 0; i < 400; i++) {
keys.push('testzset' + i);
}
let data = await db.getSortedSetRevRange(keys, 0, 3);
assert.deepStrictEqual(data, ['39999', '39998', '39997', '39996']);
data = await db.getSortedSetRevRangeWithScores(keys, 0, 3);
assert.deepStrictEqual(data, [
{ value: '39999', score: 1039999 },
{ value: '39998', score: 1039998 },
{ value: '39997', score: 1039997 },
{ value: '39996', score: 1039996 },
]);
data = await db.getSortedSetRevRange(keys, 0, -1);
assert.equal(data.length, 40000);
});
});
describe('getSortedSetRevRange()', function () {

Loading…
Cancel
Save