From e5228179c19cf18d585bfcafe9ddbdf5e65de555 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bar=C4=B1=C5=9F=20Soner=20U=C5=9Fakl=C4=B1?= Date: Wed, 20 May 2020 19:13:47 -0400 Subject: [PATCH] Mongo intersection (#8322) * feat: intersection without aggregation * feat: intersection * feat: remove debug code --- src/database/mongo/sorted/intersect.js | 179 ++++++++++++++++++++----- 1 file changed, 149 insertions(+), 30 deletions(-) diff --git a/src/database/mongo/sorted/intersect.js b/src/database/mongo/sorted/intersect.js index 87f537665c..242dfd5c70 100644 --- a/src/database/mongo/sorted/intersect.js +++ b/src/database/mongo/sorted/intersect.js @@ -5,18 +5,54 @@ module.exports = function (module) { if (!Array.isArray(keys) || !keys.length) { return 0; } - - var pipeline = [ - { $match: { _key: { $in: keys } } }, - { $group: { _id: { value: '$value' }, count: { $sum: 1 } } }, - { $match: { count: keys.length } }, - { $group: { _id: null, count: { $sum: 1 } } }, - ]; - - const data = await module.client.collection('objects').aggregate(pipeline).toArray(); - return Array.isArray(data) && data.length ? data[0].count : 0; + const objects = module.client.collection('objects'); + const counts = await countSets(keys, 50000); + if (counts.minCount === 0) { + return 0; + } + let items = await objects.find({ _key: counts.smallestSet }, { + projection: { _id: 0, value: 1 }, + }).toArray(); + + items = items.map(i => i.value); + const otherSets = keys.filter(s => s !== counts.smallestSet); + if (otherSets.length === 1) { + return await objects.countDocuments({ + _key: otherSets[0], value: { $in: items }, + }); + } + items = await intersectValuesWithSets(items, otherSets); + return items.length; }; + async function intersectValuesWithSets(items, sets) { + for (let i = 0; i < sets.length; i++) { + /* eslint-disable no-await-in-loop */ + items = await module.client.collection('objects').find({ + _key: sets[i], value: { $in: items }, + }, { + projection: { _id: 0, value: 1 }, + }).toArray(); + items = items.map(i => i.value); + } + return items; + } + + async function countSets(sets, limit) { + const objects = module.client.collection('objects'); + const counts = await Promise.all( + sets.map(s => objects.countDocuments({ _key: s }, { + limit: limit || 25000, + })) + ); + const minCount = Math.min(...counts); + const index = counts.indexOf(minCount); + const smallestSet = sets[index]; + return { + minCount: minCount, + smallestSet: smallestSet, + }; + } module.getSortedSetIntersect = async function (params) { params.sort = 1; @@ -29,10 +65,100 @@ module.exports = function (module) { }; async function getSortedSetRevIntersect(params) { - var sets = params.sets; - var start = params.hasOwnProperty('start') ? params.start : 0; - var stop = params.hasOwnProperty('stop') ? params.stop : -1; - var weights = params.weights || []; + params.start = params.hasOwnProperty('start') ? params.start : 0; + params.stop = params.hasOwnProperty('stop') ? params.stop : -1; + params.weights = params.weights || []; + + params.limit = params.stop - params.start + 1; + if (params.limit <= 0) { + params.limit = 0; + } + params.counts = await countSets(params.sets); + if (params.counts.minCount === 0) { + return []; + } + + const simple = params.weights.filter(w => w === 1).length === 1 && params.limit !== 0; + if (params.counts.minCount < 25000 && simple) { + return await intersectSingle(params); + } else if (simple) { + return await intersectBatch(params); + } + return await intersectAggregate(params); + } + + async function intersectSingle(params) { + const objects = module.client.collection('objects'); + let items = await objects.find({ _key: params.counts.smallestSet }, { + projection: { _id: 0, value: 1 }, + }).toArray(); + if (!items.length) { + return []; + } + + const otherSets = params.sets.filter(s => s !== params.counts.smallestSet); + items = await intersectValuesWithSets(items.map(i => i.value), otherSets); + if (!items.length) { + return []; + } + const project = { _id: 0, value: 1 }; + if (params.withScores) { + project.score = 1; + } + const sortSet = params.sets[params.weights.indexOf(1)]; + let res = await objects + .find({ _key: sortSet, value: { $in: items } }, { projection: project }) + .sort({ score: params.sort }) + .skip(params.start) + .limit(params.limit) + .toArray(); + if (!params.withScores) { + res = res.map(i => i.value); + } + return res; + } + + async function intersectBatch(params) { + const project = { _id: 0, value: 1 }; + if (params.withScores) { + project.score = 1; + } + const sortSet = params.sets[params.weights.indexOf(1)]; + const batchSize = 10000; + const cursor = await module.client.collection('objects') + .find({ _key: sortSet }, { projection: project }) + .sort({ score: params.sort }) + .batchSize(batchSize); + + const otherSets = params.sets.filter(s => s !== sortSet); + let inters = []; + let done = false; + while (!done) { + /* eslint-disable no-await-in-loop */ + const items = []; + while (items.length < batchSize) { + const nextItem = await cursor.next(); + if (!nextItem) { + done = true; + break; + } + items.push(nextItem); + } + + const members = await Promise.all(otherSets.map(s => module.isSortedSetMembers(s, items.map(i => i.value)))); + inters = inters.concat(items.filter((item, idx) => members.every(arr => arr[idx]))); + if (inters.length >= params.stop) { + done = true; + inters = inters.slice(params.start, params.stop + 1); + } + } + if (!params.withScores) { + inters = inters.map(item => item.value); + } + return inters; + } + + async function intersectAggregate(params) { var aggregate = {}; if (params.aggregate) { @@ -40,15 +166,9 @@ module.exports = function (module) { } else { aggregate.$sum = '$score'; } + const pipeline = [{ $match: { _key: { $in: params.sets } } }]; - var limit = stop - start + 1; - if (limit <= 0) { - limit = 0; - } - - var pipeline = [{ $match: { _key: { $in: sets } } }]; - - weights.forEach(function (weight, index) { + params.weights.forEach(function (weight, index) { if (weight !== 1) { pipeline.push({ $project: { @@ -56,7 +176,7 @@ module.exports = function (module) { score: { $cond: { if: { - $eq: ['$_key', sets[index]], + $eq: ['$_key', params.sets[index]], }, then: { $multiply: ['$score', weight], @@ -70,25 +190,24 @@ module.exports = function (module) { }); pipeline.push({ $group: { _id: { value: '$value' }, totalScore: aggregate, count: { $sum: 1 } } }); - pipeline.push({ $match: { count: sets.length } }); + pipeline.push({ $match: { count: params.sets.length } }); pipeline.push({ $sort: { totalScore: params.sort } }); - if (start) { - pipeline.push({ $skip: start }); + if (params.start) { + pipeline.push({ $skip: params.start }); } - if (limit > 0) { - pipeline.push({ $limit: limit }); + if (params.limit > 0) { + pipeline.push({ $limit: params.limit }); } - var project = { _id: 0, value: '$_id.value' }; + const project = { _id: 0, value: '$_id.value' }; if (params.withScores) { project.score = '$totalScore'; } pipeline.push({ $project: project }); let data = await module.client.collection('objects').aggregate(pipeline).toArray(); - if (!params.withScores) { data = data.map(item => item.value); }