feat: test psql without defineProperty (#7815)

* feat: test psql without defineProperty

* feat: refactor psql

remove .bind calls, use module.pool.query directly
move requires to top of file
move promisify to bottom so .init etc are promisified

* feat: mongodb

move requires to bottom

* feat: redis
v1.18.x
Barış Soner Uşaklı 6 years ago committed by GitHub
parent 52a2e5d61d
commit af1f7249a7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -10,7 +10,6 @@ var _ = require('lodash');
var semver = require('semver');
var prompt = require('prompt');
var utils = require('../utils');
var client;
var mongoModule = module.exports;
@ -110,23 +109,13 @@ mongoModule.getConnectionOptions = function (mongo) {
mongoModule.init = function (callback) {
callback = callback || function () { };
mongoModule.connect(nconf.get('mongo'), function (err, _client) {
mongoModule.connect(nconf.get('mongo'), function (err, client) {
if (err) {
winston.error('NodeBB could not connect to your Mongo database. Mongo returned the following error', err);
return callback(err);
}
client = _client;
var db = client.db();
mongoModule.client = db;
require('./mongo/main')(db, mongoModule);
require('./mongo/hash')(db, mongoModule);
require('./mongo/sets')(db, mongoModule);
require('./mongo/sorted')(db, mongoModule);
require('./mongo/list')(db, mongoModule);
require('./mongo/transaction')(db, mongoModule);
mongoModule.async = require('../promisify')(mongoModule, ['client', 'sessionStore']);
mongoModule.client = client.db();
callback();
});
};
@ -280,7 +269,7 @@ function getCollectionStats(db, callback) {
mongoModule.close = function (callback) {
callback = callback || function () {};
client.close(function (err) {
mongoModule.client.close(function (err) {
callback(err);
});
};
@ -289,3 +278,12 @@ mongoModule.socketAdapter = function () {
var mongoAdapter = require('socket.io-adapter-mongo');
return mongoAdapter(mongoModule.getConnectionString());
};
require('./mongo/main')(mongoModule);
require('./mongo/hash')(mongoModule);
require('./mongo/sets')(mongoModule);
require('./mongo/sorted')(mongoModule);
require('./mongo/list')(mongoModule);
require('./mongo/transaction')(mongoModule);
mongoModule.async = require('../promisify')(mongoModule, ['client', 'sessionStore']);

@ -1,6 +1,6 @@
'use strict';
module.exports = function (db, module) {
module.exports = function (module) {
var helpers = require('./helpers');
var _ = require('lodash');
@ -16,11 +16,11 @@ module.exports = function (db, module) {
const writeData = helpers.serializeData(data);
if (isArray) {
var bulk = db.collection('objects').initializeUnorderedBulkOp();
var bulk = module.client.collection('objects').initializeUnorderedBulkOp();
key.forEach(key => bulk.find({ _key: key }).upsert().updateOne({ $set: writeData }));
await bulk.execute();
} else {
await db.collection('objects').updateOne({ _key: key }, { $set: writeData }, { upsert: true, w: 1 });
await module.client.collection('objects').updateOne({ _key: key }, { $set: writeData }, { upsert: true, w: 1 });
}
cache.delObjectCache(key);
@ -58,7 +58,7 @@ module.exports = function (db, module) {
return cachedData[key].hasOwnProperty(field) ? cachedData[key][field] : null;
}
field = helpers.fieldToString(field);
const item = await db.collection('objects').findOne({ _key: key }, { projection: { _id: 0, [field]: 1 } });
const item = await module.client.collection('objects').findOne({ _key: key }, { projection: { _id: 0, [field]: 1 } });
if (!item) {
return null;
}
@ -104,7 +104,7 @@ module.exports = function (db, module) {
if (unCachedKeys.length === 1) {
query._key = unCachedKeys[0];
}
let data = await db.collection('objects').find(query, { projection: { _id: 0 } }).toArray();
let data = await module.client.collection('objects').find(query, { projection: { _id: 0 } }).toArray();
data = data.map(helpers.deserializeData);
var map = helpers.toMap(data);
@ -142,7 +142,7 @@ module.exports = function (db, module) {
data[field] = 1;
});
const item = await db.collection('objects').findOne({ _key: key }, { projection: data });
const item = await module.client.collection('objects').findOne({ _key: key }, { projection: data });
const results = fields.map(f => !!item && item[f] !== undefined && item[f] !== null);
return results;
};
@ -166,7 +166,7 @@ module.exports = function (db, module) {
data[field] = '';
});
await db.collection('objects').updateOne({ _key: key }, { $unset: data });
await module.client.collection('objects').updateOne({ _key: key }, { $unset: data });
cache.delObjectCache(key);
};
@ -189,7 +189,7 @@ module.exports = function (db, module) {
increment[field] = value;
if (Array.isArray(key)) {
var bulk = db.collection('objects').initializeUnorderedBulkOp();
var bulk = module.client.collection('objects').initializeUnorderedBulkOp();
key.forEach(function (key) {
bulk.find({ _key: key }).upsert().update({ $inc: increment });
});
@ -199,7 +199,7 @@ module.exports = function (db, module) {
return result.map(data => data && data[field]);
}
const result = await db.collection('objects').findOneAndUpdate({ _key: key }, { $inc: increment }, { returnOriginal: false, upsert: true });
const result = await module.client.collection('objects').findOneAndUpdate({ _key: key }, { $inc: increment }, { returnOriginal: false, upsert: true });
cache.delObjectCache(key);
return result && result.value ? result.value[field] : null;
};

@ -1,6 +1,6 @@
'use strict';
module.exports = function (db, module) {
module.exports = function (module) {
var helpers = require('./helpers');
module.listPrepend = async function (key, value) {
@ -12,7 +12,7 @@ module.exports = function (db, module) {
const exists = await module.isObjectField(key, 'array');
if (exists) {
await db.collection('objects').updateOne({ _key: key }, { $push: { array: { $each: [value], $position: 0 } } }, { upsert: true, w: 1 });
await module.client.collection('objects').updateOne({ _key: key }, { $push: { array: { $each: [value], $position: 0 } } }, { upsert: true, w: 1 });
} else {
await module.listAppend(key, value);
}
@ -23,7 +23,7 @@ module.exports = function (db, module) {
return;
}
value = helpers.valueToString(value);
await db.collection('objects').updateOne({ _key: key }, { $push: { array: value } }, { upsert: true, w: 1 });
await module.client.collection('objects').updateOne({ _key: key }, { $push: { array: value } }, { upsert: true, w: 1 });
};
module.listRemoveLast = async function (key) {
@ -31,7 +31,7 @@ module.exports = function (db, module) {
return;
}
const value = await module.getListRange(key, -1, -1);
db.collection('objects').updateOne({ _key: key }, { $pop: { array: 1 } });
module.client.collection('objects').updateOne({ _key: key }, { $pop: { array: 1 } });
return (value && value.length) ? value[0] : null;
};
@ -41,7 +41,7 @@ module.exports = function (db, module) {
}
value = helpers.valueToString(value);
await db.collection('objects').updateOne({ _key: key }, { $pull: { array: value } });
await module.client.collection('objects').updateOne({ _key: key }, { $pull: { array: value } });
};
module.listTrim = async function (key, start, stop) {
@ -49,7 +49,7 @@ module.exports = function (db, module) {
return;
}
const value = await module.getListRange(key, start, stop);
await db.collection('objects').updateOne({ _key: key }, { $set: { array: value } });
await module.client.collection('objects').updateOne({ _key: key }, { $set: { array: value } });
};
module.getListRange = async function (key, start, stop) {
@ -57,7 +57,7 @@ module.exports = function (db, module) {
return;
}
const data = await db.collection('objects').findOne({ _key: key }, { array: 1 });
const data = await module.client.collection('objects').findOne({ _key: key }, { array: 1 });
if (!(data && data.array)) {
return [];
}
@ -66,7 +66,7 @@ module.exports = function (db, module) {
};
module.listLength = async function (key) {
const result = await db.collection('objects').aggregate([
const result = await module.client.collection('objects').aggregate([
{ $match: { _key: key } },
{ $project: { count: { $size: '$array' } } },
]).toArray();

@ -1,12 +1,12 @@
'use strict';
module.exports = function (db, module) {
module.exports = function (module) {
module.flushdb = async function () {
await db.dropDatabase();
await module.client.dropDatabase();
};
module.emptydb = async function () {
await db.collection('objects').deleteMany({});
await module.client.collection('objects').deleteMany({});
module.objectCache.resetObjectCache();
};
@ -15,7 +15,7 @@ module.exports = function (db, module) {
return;
}
if (Array.isArray(key)) {
const data = await db.collection('objects').find({ _key: { $in: key } }).toArray();
const data = await module.client.collection('objects').find({ _key: { $in: key } }).toArray();
var map = {};
data.forEach(function (item) {
map[item._key] = true;
@ -23,7 +23,7 @@ module.exports = function (db, module) {
return key.map(key => !!map[key]);
}
const item = await db.collection('objects').findOne({ _key: key });
const item = await module.client.collection('objects').findOne({ _key: key });
return item !== undefined && item !== null;
};
@ -31,7 +31,7 @@ module.exports = function (db, module) {
if (!key) {
return;
}
await db.collection('objects').deleteMany({ _key: key });
await module.client.collection('objects').deleteMany({ _key: key });
module.objectCache.delObjectCache(key);
};
@ -39,7 +39,7 @@ module.exports = function (db, module) {
if (!Array.isArray(keys) || !keys.length) {
return;
}
await db.collection('objects').deleteMany({ _key: { $in: keys } });
await module.client.collection('objects').deleteMany({ _key: { $in: keys } });
module.objectCache.delObjectCache(keys);
};
@ -48,7 +48,7 @@ module.exports = function (db, module) {
return;
}
const objectData = await db.collection('objects').findOne({ _key: key }, { projection: { _id: 0 } });
const objectData = await module.client.collection('objects').findOne({ _key: key }, { projection: { _id: 0 } });
// fallback to old field name 'value' for backwards compatibility #6340
var value = null;
@ -74,17 +74,17 @@ module.exports = function (db, module) {
if (!key) {
return;
}
const result = await db.collection('objects').findOneAndUpdate({ _key: key }, { $inc: { data: 1 } }, { returnOriginal: false, upsert: true });
const result = await module.client.collection('objects').findOneAndUpdate({ _key: key }, { $inc: { data: 1 } }, { returnOriginal: false, upsert: true });
return result && result.value ? result.value.data : null;
};
module.rename = async function (oldKey, newKey) {
await db.collection('objects').updateMany({ _key: oldKey }, { $set: { _key: newKey } });
await module.client.collection('objects').updateMany({ _key: oldKey }, { $set: { _key: newKey } });
module.objectCache.delObjectCache([oldKey, newKey]);
};
module.type = async function (key) {
const data = await db.collection('objects').findOne({ _key: key });
const data = await module.client.collection('objects').findOne({ _key: key });
if (!data) {
return null;
}

@ -1,6 +1,6 @@
'use strict';
module.exports = function (db, module) {
module.exports = function (module) {
var helpers = require('./helpers');
module.setAdd = async function (key, value) {
@ -10,7 +10,7 @@ module.exports = function (db, module) {
value = value.map(v => helpers.valueToString(v));
await db.collection('objects').updateOne({
await module.client.collection('objects').updateOne({
_key: key,
}, {
$addToSet: {
@ -35,7 +35,7 @@ module.exports = function (db, module) {
value = value.map(v => helpers.valueToString(v));
var bulk = db.collection('objects').initializeUnorderedBulkOp();
var bulk = module.client.collection('objects').initializeUnorderedBulkOp();
for (var i = 0; i < keys.length; i += 1) {
bulk.find({ _key: keys[i] }).upsert().updateOne({ $addToSet: {
@ -61,7 +61,7 @@ module.exports = function (db, module) {
value = value.map(v => helpers.valueToString(v));
await db.collection('objects').updateMany({ _key: Array.isArray(key) ? { $in: key } : key }, { $pullAll: { members: value } });
await module.client.collection('objects').updateMany({ _key: Array.isArray(key) ? { $in: key } : key }, { $pullAll: { members: value } });
};
module.setsRemove = async function (keys, value) {
@ -70,7 +70,7 @@ module.exports = function (db, module) {
}
value = helpers.valueToString(value);
await db.collection('objects').updateMany({ _key: { $in: keys } }, { $pull: { members: value } });
await module.client.collection('objects').updateMany({ _key: { $in: keys } }, { $pull: { members: value } });
};
module.isSetMember = async function (key, value) {
@ -79,7 +79,7 @@ module.exports = function (db, module) {
}
value = helpers.valueToString(value);
const item = await db.collection('objects').findOne({ _key: key, members: value }, { projection: { _id: 0, members: 0 } });
const item = await module.client.collection('objects').findOne({ _key: key, members: value }, { projection: { _id: 0, members: 0 } });
return item !== null && item !== undefined;
};
@ -89,7 +89,7 @@ module.exports = function (db, module) {
}
values = values.map(v => helpers.valueToString(v));
const result = await db.collection('objects').findOne({ _key: key }, { projection: { _id: 0, _key: 0 } });
const result = await module.client.collection('objects').findOne({ _key: key }, { projection: { _id: 0, _key: 0 } });
const membersSet = new Set(result && Array.isArray(result.members) ? result.members : []);
return values.map(v => membersSet.has(v));
};
@ -100,7 +100,7 @@ module.exports = function (db, module) {
}
value = helpers.valueToString(value);
const result = await db.collection('objects').find({ _key: { $in: sets }, members: value }, { projection: { _id: 0, members: 0 } }).toArray();
const result = await module.client.collection('objects').find({ _key: { $in: sets }, members: value }, { projection: { _id: 0, members: 0 } }).toArray();
var map = {};
result.forEach(function (item) {
@ -115,7 +115,7 @@ module.exports = function (db, module) {
return [];
}
const data = await db.collection('objects').findOne({ _key: key }, { projection: { _id: 0, _key: 0 } });
const data = await module.client.collection('objects').findOne({ _key: key }, { projection: { _id: 0, _key: 0 } });
return data ? data.members : [];
};
@ -123,7 +123,7 @@ module.exports = function (db, module) {
if (!Array.isArray(keys) || !keys.length) {
return [];
}
const data = await db.collection('objects').find({ _key: { $in: keys } }, { projection: { _id: 0 } }).toArray();
const data = await module.client.collection('objects').find({ _key: { $in: keys } }, { projection: { _id: 0 } }).toArray();
var sets = {};
data.forEach(function (set) {
@ -137,7 +137,7 @@ module.exports = function (db, module) {
if (!key) {
return 0;
}
const data = await db.collection('objects').findOne({ _key: key }, { projection: { _id: 0 } });
const data = await module.client.collection('objects').findOne({ _key: key }, { projection: { _id: 0 } });
return data ? data.members.length : 0;
};
@ -148,7 +148,7 @@ module.exports = function (db, module) {
};
module.setRemoveRandom = async function (key) {
const data = await db.collection('objects').findOne({ _key: key });
const data = await module.client.collection('objects').findOne({ _key: key });
if (!data) {
return;
}

@ -2,15 +2,15 @@
var utils = require('../../utils');
module.exports = function (db, module) {
module.exports = function (module) {
var helpers = require('./helpers');
const util = require('util');
const sleep = util.promisify(setTimeout);
require('./sorted/add')(db, module);
require('./sorted/remove')(db, module);
require('./sorted/union')(db, module);
require('./sorted/intersect')(db, module);
require('./sorted/add')(module);
require('./sorted/remove')(module);
require('./sorted/union')(module);
require('./sorted/intersect')(module);
module.getSortedSetRange = async function (key, start, stop) {
return await getSortedSetRange(key, start, stop, '-inf', '+inf', 1, false);
@ -81,7 +81,7 @@ module.exports = function (db, module) {
limit = 0;
}
let data = await db.collection('objects').find(query, { projection: fields })
let data = await module.client.collection('objects').find(query, { projection: fields })
.sort({ score: sort })
.skip(start)
.limit(limit)
@ -135,7 +135,7 @@ module.exports = function (db, module) {
query.score.$lte = max;
}
const count = await db.collection('objects').countDocuments(query);
const count = await module.client.collection('objects').countDocuments(query);
return count || 0;
};
@ -143,7 +143,7 @@ module.exports = function (db, module) {
if (!key) {
return 0;
}
const count = await db.collection('objects').countDocuments({ _key: key });
const count = await module.client.collection('objects').countDocuments({ _key: key });
return parseInt(count, 10) || 0;
};
@ -160,7 +160,7 @@ module.exports = function (db, module) {
return 0;
}
const count = await db.collection('objects').countDocuments({ _key: Array.isArray(keys) ? { $in: keys } : keys });
const count = await module.client.collection('objects').countDocuments({ _key: Array.isArray(keys) ? { $in: keys } : keys });
return parseInt(count, 10) || 0;
};
@ -182,7 +182,7 @@ module.exports = function (db, module) {
return null;
}
return await db.collection('objects').countDocuments({
return await module.client.collection('objects').countDocuments({
$or: [
{
_key: key,
@ -243,7 +243,7 @@ module.exports = function (db, module) {
return null;
}
value = helpers.valueToString(value);
const result = await db.collection('objects').findOne({ _key: key, value: value }, { projection: { _id: 0, _key: 0, value: 0 } });
const result = await module.client.collection('objects').findOne({ _key: key, value: value }, { projection: { _id: 0, _key: 0, value: 0 } });
return result ? result.score : null;
};
@ -252,7 +252,7 @@ module.exports = function (db, module) {
return [];
}
value = helpers.valueToString(value);
const result = await db.collection('objects').find({ _key: { $in: keys }, value: value }, { projection: { _id: 0, value: 0 } }).toArray();
const result = await module.client.collection('objects').find({ _key: { $in: keys }, value: value }, { projection: { _id: 0, value: 0 } }).toArray();
var map = {};
result.forEach(function (item) {
if (item) {
@ -271,7 +271,7 @@ module.exports = function (db, module) {
return [];
}
values = values.map(helpers.valueToString);
const result = await db.collection('objects').find({ _key: key, value: { $in: values } }, { projection: { _id: 0, _key: 0 } }).toArray();
const result = await module.client.collection('objects').find({ _key: key, value: { $in: values } }, { projection: { _id: 0, _key: 0 } }).toArray();
var valueToScore = {};
result.forEach(function (item) {
@ -288,7 +288,7 @@ module.exports = function (db, module) {
return;
}
value = helpers.valueToString(value);
const result = await db.collection('objects').findOne({ _key: key, value: value }, { projection: { _id: 0, _key: 0, score: 0 } });
const result = await module.client.collection('objects').findOne({ _key: key, value: value }, { projection: { _id: 0, _key: 0, score: 0 } });
return !!result;
};
@ -297,7 +297,7 @@ module.exports = function (db, module) {
return;
}
values = values.map(helpers.valueToString);
const results = await db.collection('objects').find({ _key: key, value: { $in: values } }, { projection: { _id: 0, _key: 0, score: 0 } }).toArray();
const results = await module.client.collection('objects').find({ _key: key, value: { $in: values } }, { projection: { _id: 0, _key: 0, score: 0 } }).toArray();
var isMember = {};
results.forEach(function (item) {
@ -314,7 +314,7 @@ module.exports = function (db, module) {
return [];
}
value = helpers.valueToString(value);
const results = await db.collection('objects').find({ _key: { $in: keys }, value: value }, { projection: { _id: 0, score: 0 } }).toArray();
const results = await module.client.collection('objects').find({ _key: { $in: keys }, value: value }, { projection: { _id: 0, score: 0 } }).toArray();
var isMember = {};
results.forEach(function (item) {
@ -331,7 +331,7 @@ module.exports = function (db, module) {
return [];
}
const data = await db.collection('objects').find({
const data = await module.client.collection('objects').find({
_key: keys.length === 1 ? keys[0] : { $in: keys },
}, { projection: { _id: 0, score: 0 } }).sort({ score: 1 }).toArray();
@ -353,7 +353,7 @@ module.exports = function (db, module) {
data.score = parseFloat(increment);
try {
const result = await db.collection('objects').findOneAndUpdate({ _key: key, value: value }, { $inc: data }, { returnOriginal: false, upsert: true });
const result = await module.client.collection('objects').findOneAndUpdate({ _key: key, value: value }, { $inc: data }, { returnOriginal: false, upsert: true });
return result && result.value ? result.value.score : null;
} catch (err) {
// if there is duplicate key error retry the upsert
@ -386,7 +386,7 @@ module.exports = function (db, module) {
count = count !== undefined ? count : 0;
buildLexQuery(query, min, max);
const data = await db.collection('objects').find(query, { projection: { _id: 0, _key: 0, score: 0 } })
const data = await module.client.collection('objects').find(query, { projection: { _id: 0, _key: 0, score: 0 } })
.sort({ value: sort })
.skip(start)
.limit(count === -1 ? 0 : count)
@ -399,7 +399,7 @@ module.exports = function (db, module) {
var query = { _key: key };
buildLexQuery(query, min, max);
await db.collection('objects').deleteMany(query);
await module.client.collection('objects').deleteMany(query);
};
function buildLexQuery(query, min, max) {
@ -432,7 +432,7 @@ module.exports = function (db, module) {
if (!options.withScores) {
project.score = 0;
}
var cursor = await db.collection('objects').find({ _key: setKey }, { projection: project })
var cursor = await module.client.collection('objects').find({ _key: setKey }, { projection: project })
.sort({ score: 1 })
.batchSize(options.batch);

@ -1,6 +1,6 @@
'use strict';
module.exports = function (db, module) {
module.exports = function (module) {
var helpers = require('../helpers');
var utils = require('../../../utils');
@ -17,7 +17,7 @@ module.exports = function (db, module) {
value = helpers.valueToString(value);
try {
await db.collection('objects').updateOne({ _key: key, value: value }, { $set: { score: parseFloat(score) } }, { upsert: true, w: 1 });
await module.client.collection('objects').updateOne({ _key: key, value: value }, { $set: { score: parseFloat(score) } }, { upsert: true, w: 1 });
} catch (err) {
if (err && err.message.startsWith('E11000 duplicate key error')) {
return await module.sortedSetAdd(key, score, value);
@ -40,7 +40,7 @@ module.exports = function (db, module) {
}
values = values.map(helpers.valueToString);
var bulk = db.collection('objects').initializeUnorderedBulkOp();
var bulk = module.client.collection('objects').initializeUnorderedBulkOp();
for (var i = 0; i < scores.length; i += 1) {
bulk.find({ _key: key, value: values[i] }).upsert().updateOne({ $set: { score: parseFloat(scores[i]) } });
}
@ -62,7 +62,7 @@ module.exports = function (db, module) {
value = helpers.valueToString(value);
var bulk = db.collection('objects').initializeUnorderedBulkOp();
var bulk = module.client.collection('objects').initializeUnorderedBulkOp();
for (var i = 0; i < keys.length; i += 1) {
bulk.find({ _key: keys[i], value: value }).upsert().updateOne({ $set: { score: parseFloat(isArrayOfScores ? scores[i] : scores) } });
}
@ -73,7 +73,7 @@ module.exports = function (db, module) {
if (!Array.isArray(data) || !data.length) {
return;
}
var bulk = db.collection('objects').initializeUnorderedBulkOp();
var bulk = module.client.collection('objects').initializeUnorderedBulkOp();
data.forEach(function (item) {
bulk.find({ _key: item[0], value: String(item[2]) }).upsert().updateOne({ $set: { score: parseFloat(item[1]) } });
});

@ -1,6 +1,6 @@
'use strict';
module.exports = function (db, module) {
module.exports = function (module) {
module.sortedSetIntersectCard = async function (keys) {
if (!Array.isArray(keys) || !keys.length) {
return 0;
@ -13,7 +13,7 @@ module.exports = function (db, module) {
{ $group: { _id: null, count: { $sum: 1 } } },
];
const data = await db.collection('objects').aggregate(pipeline).toArray();
const data = await module.client.collection('objects').aggregate(pipeline).toArray();
return Array.isArray(data) && data.length ? data[0].count : 0;
};
@ -87,7 +87,7 @@ module.exports = function (db, module) {
}
pipeline.push({ $project: project });
let data = await db.collection('objects').aggregate(pipeline).toArray();
let data = await module.client.collection('objects').aggregate(pipeline).toArray();
if (!params.withScores) {
data = data.map(item => item.value);

@ -1,6 +1,6 @@
'use strict';
module.exports = function (db, module) {
module.exports = function (module) {
var helpers = require('../helpers');
module.sortedSetRemove = async function (key, value) {
@ -18,7 +18,7 @@ module.exports = function (db, module) {
value = helpers.valueToString(value);
}
await db.collection('objects').deleteMany({
await module.client.collection('objects').deleteMany({
_key: Array.isArray(key) ? { $in: key } : key,
value: isValueArray ? { $in: value } : value,
});
@ -30,7 +30,7 @@ module.exports = function (db, module) {
}
value = helpers.valueToString(value);
await db.collection('objects').deleteMany({ _key: { $in: keys }, value: value });
await module.client.collection('objects').deleteMany({ _key: { $in: keys }, value: value });
};
module.sortedSetsRemoveRangeByScore = async function (keys, min, max) {
@ -49,14 +49,14 @@ module.exports = function (db, module) {
query.score.$lte = parseFloat(max);
}
await db.collection('objects').deleteMany(query);
await module.client.collection('objects').deleteMany(query);
};
module.sortedSetRemoveBulk = async function (data) {
if (!Array.isArray(data) || !data.length) {
return;
}
var bulk = db.collection('objects').initializeUnorderedBulkOp();
var bulk = module.client.collection('objects').initializeUnorderedBulkOp();
data.forEach(item => bulk.find({ _key: item[0], value: String(item[1]) }).remove());
await bulk.execute();
};

@ -1,12 +1,12 @@
'use strict';
module.exports = function (db, module) {
module.exports = function (module) {
module.sortedSetUnionCard = async function (keys) {
if (!Array.isArray(keys) || !keys.length) {
return 0;
}
const data = await db.collection('objects').aggregate([
const data = await module.client.collection('objects').aggregate([
{ $match: { _key: { $in: keys } } },
{ $group: { _id: { value: '$value' } } },
{ $group: { _id: null, count: { $sum: 1 } } },
@ -61,7 +61,7 @@ module.exports = function (db, module) {
}
pipeline.push({ $project: project });
let data = await db.collection('objects').aggregate(pipeline).toArray();
let data = await module.client.collection('objects').aggregate(pipeline).toArray();
if (!params.withScores) {
data = data.map(item => item.value);
}

@ -1,8 +1,8 @@
'use strict';
module.exports = function (db, module) {
module.exports = function (module) {
// TODO
module.transaction = function (perform, callback) {
perform(db, callback);
perform(module.client, callback);
};
};

@ -81,37 +81,11 @@ postgresModule.init = function (callback) {
}
postgresModule.pool = db;
Object.defineProperty(postgresModule, 'client', {
get: function () {
return db;
},
configurable: true,
});
var wrappedDB = {
connect: function () {
return postgresModule.pool.connect.apply(postgresModule.pool, arguments);
},
query: function () {
return postgresModule.client.query.apply(postgresModule.client, arguments);
},
};
postgresModule.client = db;
checkUpgrade(client, function (err) {
release();
if (err) {
return callback(err);
}
require('./postgres/main')(wrappedDB, postgresModule);
require('./postgres/hash')(wrappedDB, postgresModule);
require('./postgres/sets')(wrappedDB, postgresModule);
require('./postgres/sorted')(wrappedDB, postgresModule);
require('./postgres/list')(wrappedDB, postgresModule);
require('./postgres/transaction')(db, postgresModule);
postgresModule.async = require('../promisify')(postgresModule, ['client', 'sessionStore', 'pool', 'transaction']);
callback();
callback(err);
});
});
};
@ -455,3 +429,12 @@ postgresModule.socketAdapter = function () {
pubClient: postgresModule.pool,
});
};
require('./postgres/main')(postgresModule);
require('./postgres/hash')(postgresModule);
require('./postgres/sets')(postgresModule);
require('./postgres/sorted')(postgresModule);
require('./postgres/list')(postgresModule);
require('./postgres/transaction')(postgresModule);
postgresModule.async = require('../promisify')(postgresModule, ['client', 'sessionStore', 'pool', 'transaction']);

@ -1,6 +1,6 @@
'use strict';
module.exports = function (db, module) {
module.exports = function (module) {
var helpers = require('./helpers');
module.setObject = async function (key, data) {
@ -13,11 +13,10 @@ module.exports = function (db, module) {
}
await module.transaction(async function (client) {
var query = client.query.bind(client);
const dataString = JSON.stringify(data);
async function setOne(key) {
await helpers.ensureLegacyObjectType(client, key, 'hash');
await query({
await client.query({
name: 'setObject',
text: `
INSERT INTO "legacy_hash" ("_key", "data")
@ -41,11 +40,10 @@ module.exports = function (db, module) {
}
await module.transaction(async function (client) {
var query = client.query.bind(client);
const valueString = JSON.stringify(value);
async function setOne(key) {
await helpers.ensureLegacyObjectType(client, key, 'hash');
await query({
await client.query({
name: 'setObjectField',
text: `
INSERT INTO "legacy_hash" ("_key", "data")
@ -69,7 +67,7 @@ module.exports = function (db, module) {
return null;
}
const res = await db.query({
const res = await module.pool.query({
name: 'getObject',
text: `
SELECT h."data"
@ -90,7 +88,7 @@ SELECT h."data"
return [];
}
const res = await db.query({
const res = await module.pool.query({
name: 'getObjects',
text: `
SELECT h."data"
@ -112,7 +110,7 @@ SELECT h."data"
return null;
}
const res = await db.query({
const res = await module.pool.query({
name: 'getObjectField',
text: `
SELECT h."data"->>$2::TEXT f
@ -133,7 +131,7 @@ SELECT h."data"->>$2::TEXT f
return null;
}
const res = await db.query({
const res = await module.pool.query({
name: 'getObjectFields',
text: `
SELECT (SELECT jsonb_object_agg(f, d."value")
@ -165,7 +163,7 @@ SELECT (SELECT jsonb_object_agg(f, d."value")
return [];
}
const res = await db.query({
const res = await module.pool.query({
name: 'getObjectsFields',
text: `
SELECT (SELECT jsonb_object_agg(f, d."value")
@ -190,7 +188,7 @@ SELECT (SELECT jsonb_object_agg(f, d."value")
return;
}
const res = await db.query({
const res = await module.pool.query({
name: 'getObjectKeys',
text: `
SELECT ARRAY(SELECT jsonb_object_keys(h."data")) k
@ -216,7 +214,7 @@ SELECT ARRAY(SELECT jsonb_object_keys(h."data")) k
return;
}
const res = await db.query({
const res = await module.pool.query({
name: 'isObjectField',
text: `
SELECT (h."data" ? $2::TEXT AND h."data"->>$2::TEXT IS NOT NULL) b
@ -253,7 +251,7 @@ SELECT (h."data" ? $2::TEXT AND h."data"->>$2::TEXT IS NOT NULL) b
return;
}
await db.query({
await module.pool.query({
name: 'deleteObjectFields',
text: `
UPDATE "legacy_hash"
@ -281,14 +279,13 @@ UPDATE "legacy_hash"
}
return await module.transaction(async function (client) {
var query = client.query.bind(client);
if (Array.isArray(key)) {
await helpers.ensureLegacyObjectsType(client, key, 'hash');
} else {
await helpers.ensureLegacyObjectType(client, key, 'hash');
}
const res = await query(Array.isArray(key) ? {
const res = await client.query(Array.isArray(key) ? {
name: 'incrObjectFieldByMulti',
text: `
INSERT INTO "legacy_hash" ("_key", "data")

@ -1,6 +1,6 @@
'use strict';
module.exports = function (db, module) {
module.exports = function (module) {
var helpers = require('./helpers');
module.listPrepend = async function (key, value) {
@ -9,9 +9,8 @@ module.exports = function (db, module) {
}
await module.transaction(async function (client) {
var query = client.query.bind(client);
await helpers.ensureLegacyObjectType(client, key, 'list');
await query({
await client.query({
name: 'listPrepend',
text: `
INSERT INTO "legacy_list" ("_key", "array")
@ -29,9 +28,8 @@ DO UPDATE SET "array" = ARRAY[$2::TEXT] || "legacy_list"."array"`,
}
await module.transaction(async function (client) {
var query = client.query.bind(client);
await helpers.ensureLegacyObjectType(client, key, 'list');
await query({
await client.query({
name: 'listAppend',
text: `
INSERT INTO "legacy_list" ("_key", "array")
@ -48,7 +46,7 @@ DO UPDATE SET "array" = "legacy_list"."array" || ARRAY[$2::TEXT]`,
return;
}
const res = await db.query({
const res = await module.pool.query({
name: 'listRemoveLast',
text: `
WITH A AS (
@ -75,7 +73,7 @@ RETURNING A."array"[array_length(A."array", 1)] v`,
return;
}
await db.query({
await module.pool.query({
name: 'listRemoveAll',
text: `
UPDATE "legacy_list" l
@ -95,7 +93,7 @@ UPDATE "legacy_list" l
stop += 1;
await db.query(stop > 0 ? {
await module.pool.query(stop > 0 ? {
name: 'listTrim',
text: `
UPDATE "legacy_list" l
@ -133,7 +131,7 @@ UPDATE "legacy_list" l
stop += 1;
const res = await db.query(stop > 0 ? {
const res = await module.pool.query(stop > 0 ? {
name: 'getListRange',
text: `
SELECT ARRAY(SELECT m.m
@ -167,7 +165,7 @@ SELECT ARRAY(SELECT m.m
};
module.listLength = async function (key) {
const res = await db.query({
const res = await module.pool.query({
name: 'listLength',
text: `
SELECT array_length(l."array", 1) l

@ -1,17 +1,15 @@
'use strict';
module.exports = function (db, module) {
var helpers = require('./helpers');
var query = db.query.bind(db);
module.exports = function (module) {
const helpers = require('./helpers');
module.flushdb = async function () {
await query(`DROP SCHEMA "public" CASCADE`);
await query(`CREATE SCHEMA "public"`);
await module.pool.query(`DROP SCHEMA "public" CASCADE`);
await module.pool.query(`CREATE SCHEMA "public"`);
};
module.emptydb = async function () {
await query(`DELETE FROM "legacy_object"`);
await module.pool.query(`DELETE FROM "legacy_object"`);
};
module.exists = async function (key) {
@ -20,7 +18,7 @@ module.exports = function (db, module) {
}
if (Array.isArray(key)) {
const res = await query({
const res = await module.pool.query({
name: 'existsArray',
text: `
SELECT o."_key" k
@ -32,7 +30,7 @@ module.exports = function (db, module) {
return res.rows.some(r => r.k === k);
});
}
const res = await query({
const res = await module.pool.query({
name: 'exists',
text: `
SELECT EXISTS(SELECT *
@ -49,7 +47,7 @@ module.exports = function (db, module) {
return;
}
await query({
await module.pool.query({
name: 'delete',
text: `
DELETE FROM "legacy_object"
@ -63,7 +61,7 @@ DELETE FROM "legacy_object"
return;
}
await query({
await module.pool.query({
name: 'deleteAll',
text: `
DELETE FROM "legacy_object"
@ -77,7 +75,7 @@ DELETE FROM "legacy_object"
return;
}
const res = await query({
const res = await module.pool.query({
name: 'get',
text: `
SELECT s."data" t
@ -99,9 +97,8 @@ SELECT s."data" t
}
await module.transaction(async function (client) {
var query = client.query.bind(client);
await helpers.ensureLegacyObjectType(client, key, 'string');
await query({
await client.query({
name: 'set',
text: `
INSERT INTO "legacy_string" ("_key", "data")
@ -119,9 +116,8 @@ DO UPDATE SET "data" = $2::TEXT`,
}
return await module.transaction(async function (client) {
var query = client.query.bind(client);
await helpers.ensureLegacyObjectType(client, key, 'string');
const res = await query({
const res = await client.query({
name: 'increment',
text: `
INSERT INTO "legacy_string" ("_key", "data")
@ -137,15 +133,14 @@ RETURNING "data" d`,
module.rename = async function (oldKey, newKey) {
await module.transaction(async function (client) {
var query = client.query.bind(client);
await query({
await client.query({
name: 'deleteRename',
text: `
DELETE FROM "legacy_object"
WHERE "_key" = $1::TEXT`,
values: [newKey],
});
await query({
await client.query({
name: 'rename',
text: `
UPDATE "legacy_object"
@ -157,7 +152,7 @@ WHERE "_key" = $1::TEXT`,
};
module.type = async function (key) {
const res = await query({
const res = await module.pool.query({
name: 'type',
text: `
SELECT "type"::TEXT t
@ -171,7 +166,7 @@ SELECT "type"::TEXT t
};
async function doExpire(key, date) {
await query({
await module.pool.query({
name: 'expire',
text: `
UPDATE "legacy_object"

@ -2,7 +2,7 @@
var _ = require('lodash');
module.exports = function (db, module) {
module.exports = function (module) {
var helpers = require('./helpers');
module.setAdd = async function (key, value) {
@ -11,9 +11,8 @@ module.exports = function (db, module) {
}
await module.transaction(async function (client) {
var query = client.query.bind(client);
await helpers.ensureLegacyObjectType(client, key, 'set');
await query({
await client.query({
name: 'setAdd',
text: `
INSERT INTO "legacy_set" ("_key", "member")
@ -38,9 +37,8 @@ DO NOTHING`,
keys = _.uniq(keys);
await module.transaction(async function (client) {
var query = client.query.bind(client);
await helpers.ensureLegacyObjectsType(client, keys, 'set');
await query({
await client.query({
name: 'setsAdd',
text: `
INSERT INTO "legacy_set" ("_key", "member")
@ -63,7 +61,7 @@ DO NOTHING`,
value = [value];
}
await db.query({
await module.pool.query({
name: 'setRemove',
text: `
DELETE FROM "legacy_set"
@ -78,7 +76,7 @@ DELETE FROM "legacy_set"
return;
}
await db.query({
await module.pool.query({
name: 'setsRemove',
text: `
DELETE FROM "legacy_set"
@ -93,7 +91,7 @@ DELETE FROM "legacy_set"
return false;
}
const res = await db.query({
const res = await module.pool.query({
name: 'isSetMember',
text: `
SELECT 1
@ -116,7 +114,7 @@ SELECT 1
values = values.map(helpers.valueToString);
const res = await db.query({
const res = await module.pool.query({
name: 'isSetMembers',
text: `
SELECT s."member" m
@ -141,7 +139,7 @@ SELECT s."member" m
value = helpers.valueToString(value);
const res = await db.query({
const res = await module.pool.query({
name: 'isMemberOfSets',
text: `
SELECT o."_key" k
@ -164,7 +162,7 @@ SELECT o."_key" k
return [];
}
const res = await db.query({
const res = await module.pool.query({
name: 'getSetMembers',
text: `
SELECT s."member" m
@ -184,7 +182,7 @@ SELECT s."member" m
return [];
}
const res = await db.query({
const res = await module.pool.query({
name: 'getSetsMembers',
text: `
SELECT o."_key" k,
@ -208,7 +206,7 @@ SELECT o."_key" k,
return 0;
}
const res = await db.query({
const res = await module.pool.query({
name: 'setCount',
text: `
SELECT COUNT(*) c
@ -224,7 +222,7 @@ SELECT COUNT(*) c
};
module.setsCount = async function (keys) {
const res = await db.query({
const res = await module.pool.query({
name: 'setsCount',
text: `
SELECT o."_key" k,
@ -244,7 +242,7 @@ SELECT o."_key" k,
};
module.setRemoveRandom = async function (key) {
const res = await db.query({
const res = await module.pool.query({
name: 'setRemoveRandom',
text: `
WITH A AS (

@ -1,18 +1,16 @@
'use strict';
module.exports = function (db, module) {
module.exports = function (module) {
var helpers = require('./helpers');
const util = require('util');
var Cursor = require('pg-cursor');
Cursor.prototype.readAsync = util.promisify(Cursor.prototype.read);
const sleep = util.promisify(setTimeout);
var query = db.query.bind(db);
require('./sorted/add')(db, module);
require('./sorted/remove')(db, module);
require('./sorted/union')(db, module);
require('./sorted/intersect')(db, module);
require('./sorted/add')(module);
require('./sorted/remove')(module);
require('./sorted/union')(module);
require('./sorted/intersect')(module);
module.getSortedSetRange = async function (key, start, stop) {
return await getSortedSetRange(key, start, stop, 1, false);
@ -60,7 +58,7 @@ module.exports = function (db, module) {
limit = null;
}
const res = await query({
const res = await module.pool.query({
name: 'getSortedSetRangeWithScores' + (sort > 0 ? 'Asc' : 'Desc'),
text: `
SELECT z."value",
@ -125,7 +123,7 @@ OFFSET $2::INTEGER`,
max = null;
}
const res = await query({
const res = await module.pool.query({
name: 'getSortedSetRangeByScoreWithScores' + (sort > 0 ? 'Asc' : 'Desc'),
text: `
SELECT z."value",
@ -164,7 +162,7 @@ OFFSET $2::INTEGER`,
max = null;
}
const res = await query({
const res = await module.pool.query({
name: 'sortedSetCount',
text: `
SELECT COUNT(*) c
@ -186,7 +184,7 @@ SELECT COUNT(*) c
return 0;
}
const res = await query({
const res = await module.pool.query({
name: 'sortedSetCard',
text: `
SELECT COUNT(*) c
@ -206,7 +204,7 @@ SELECT COUNT(*) c
return [];
}
const res = await query({
const res = await module.pool.query({
name: 'sortedSetsCard',
text: `
SELECT o."_key" k,
@ -249,7 +247,7 @@ SELECT o."_key" k,
async function getSortedSetRank(sort, keys, values) {
values = values.map(helpers.valueToString);
const res = await query({
const res = await module.pool.query({
name: 'getSortedSetRank' + sort,
text: `
SELECT (SELECT r
@ -310,7 +308,7 @@ SELECT (SELECT r
value = helpers.valueToString(value);
const res = await query({
const res = await module.pool.query({
name: 'sortedSetScore',
text: `
SELECT z."score" s
@ -335,7 +333,7 @@ SELECT z."score" s
value = helpers.valueToString(value);
const res = await query({
const res = await module.pool.query({
name: 'sortedSetsScore',
text: `
SELECT o."_key" k,
@ -364,7 +362,7 @@ SELECT o."_key" k,
}
values = values.map(helpers.valueToString);
const res = await query({
const res = await module.pool.query({
name: 'sortedSetScores',
text: `
SELECT z."value" v,
@ -391,7 +389,7 @@ SELECT z."value" v,
value = helpers.valueToString(value);
const res = await query({
const res = await module.pool.query({
name: 'isSortedSetMember',
text: `
SELECT 1
@ -414,7 +412,7 @@ SELECT 1
values = values.map(helpers.valueToString);
const res = await query({
const res = await module.pool.query({
name: 'isSortedSetMembers',
text: `
SELECT z."value" v
@ -439,7 +437,7 @@ SELECT z."value" v
value = helpers.valueToString(value);
const res = await query({
const res = await module.pool.query({
name: 'isMemberOfSortedSets',
text: `
SELECT o."_key" k
@ -462,7 +460,7 @@ SELECT o."_key" k
return [];
}
const res = await query({
const res = await module.pool.query({
name: 'getSortedSetsMembers',
text: `
SELECT o."_key" k,
@ -490,9 +488,8 @@ SELECT o."_key" k,
increment = parseFloat(increment);
return await module.transaction(async function (client) {
var query = client.query.bind(client);
await helpers.ensureLegacyObjectType(client, key, 'zset');
const res = await query({
const res = await client.query({
name: 'sortedSetIncrBy',
text: `
INSERT INTO "legacy_zset" ("_key", "value", "score")
@ -517,7 +514,7 @@ RETURNING "score" s`,
module.sortedSetLexCount = async function (key, min, max) {
var q = buildLexQuery(key, min, max);
const res = await query({
const res = await module.pool.query({
name: 'sortedSetLexCount' + q.suffix,
text: `
SELECT COUNT(*) c
@ -539,7 +536,7 @@ SELECT COUNT(*) c
var q = buildLexQuery(key, min, max);
q.values.push(start);
q.values.push(count <= 0 ? null : count);
const res = await query({
const res = await module.pool.query({
name: 'sortedSetLex' + (sort > 0 ? 'Asc' : 'Desc') + q.suffix,
text: `
SELECT z."value" v
@ -559,7 +556,7 @@ OFFSET $` + (q.values.length - 1) + `::INTEGER`,
module.sortedSetRemoveRangeByLex = async function (key, min, max) {
var q = buildLexQuery(key, min, max);
await query({
await module.pool.query({
name: 'sortedSetRemoveRangeByLex' + q.suffix,
text: `
DELETE FROM "legacy_zset" z
@ -614,7 +611,7 @@ DELETE FROM "legacy_zset" z
}
module.processSortedSet = async function (setKey, process, options) {
const client = await db.connect();
const client = await module.pool.connect();
var batchSize = (options || {}).batch || 100;
var cursor = client.query(new Cursor(`
SELECT z."value", z."score"
@ -645,7 +642,7 @@ SELECT z."value", z."score"
try {
await process(rows);
} catch (err) {
await query.close();
await client.release();
throw err;
}
if (options.interval) {

@ -1,6 +1,6 @@
'use strict';
module.exports = function (db, module) {
module.exports = function (module) {
var helpers = require('../helpers');
var utils = require('../../../utils');
@ -19,10 +19,8 @@ module.exports = function (db, module) {
score = parseFloat(score);
await module.transaction(async function (client) {
var query = client.query.bind(client);
await helpers.ensureLegacyObjectType(client, key, 'zset');
await query({
await client.query({
name: 'sortedSetAdd',
text: `
INSERT INTO "legacy_zset" ("_key", "value", "score")
@ -52,9 +50,8 @@ module.exports = function (db, module) {
helpers.removeDuplicateValues(values, scores);
await module.transaction(async function (client) {
var query = client.query.bind(client);
await helpers.ensureLegacyObjectType(client, key, 'zset');
await query({
await client.query({
name: 'sortedSetAddBulk',
text: `
INSERT INTO "legacy_zset" ("_key", "value", "score")
@ -84,9 +81,8 @@ DO UPDATE SET "score" = EXCLUDED."score"`,
scores = isArrayOfScores ? scores.map(score => parseFloat(score)) : parseFloat(scores);
await module.transaction(async function (client) {
var query = client.query.bind(client);
await helpers.ensureLegacyObjectsType(client, keys, 'zset');
await query({
await client.query({
name: isArrayOfScores ? 'sortedSetsAddScores' : 'sortedSetsAdd',
text: isArrayOfScores ? `
INSERT INTO "legacy_zset" ("_key", "value", "score")
@ -117,9 +113,8 @@ INSERT INTO "legacy_zset" ("_key", "value", "score")
values.push(item[2]);
});
await module.transaction(async function (client) {
var query = client.query.bind(client);
await helpers.ensureLegacyObjectsType(client, keys, 'zset');
await query({
await client.query({
name: 'sortedSetAddBulk2',
text: `
INSERT INTO "legacy_zset" ("_key", "value", "score")

@ -1,12 +1,12 @@
'use strict';
module.exports = function (db, module) {
module.exports = function (module) {
module.sortedSetIntersectCard = async function (keys) {
if (!Array.isArray(keys) || !keys.length) {
return 0;
}
const res = await db.query({
const res = await module.pool.query({
name: 'sortedSetIntersectCard',
text: `
WITH A AS (SELECT z."value" v,
@ -55,7 +55,7 @@ SELECT COUNT(*) c
limit = null;
}
const res = await db.query({
const res = await module.pool.query({
name: 'getSortedSetIntersect' + aggregate + (params.sort > 0 ? 'Asc' : 'Desc') + 'WithScores',
text: `
WITH A AS (SELECT z."value",

@ -1,6 +1,6 @@
'use strict';
module.exports = function (db, module) {
module.exports = function (module) {
var helpers = require('../helpers');
module.sortedSetRemove = async function (key, value) {
@ -20,7 +20,7 @@ module.exports = function (db, module) {
value = [value];
}
value = value.map(helpers.valueToString);
await db.query({
await module.pool.query({
name: 'sortedSetRemove',
text: `
DELETE FROM "legacy_zset"
@ -37,7 +37,7 @@ DELETE FROM "legacy_zset"
value = helpers.valueToString(value);
await db.query({
await module.pool.query({
name: 'sortedSetsRemove',
text: `
DELETE FROM "legacy_zset"
@ -59,7 +59,7 @@ DELETE FROM "legacy_zset"
max = null;
}
await db.query({
await module.pool.query({
name: 'sortedSetsRemoveRangeByScore',
text: `
DELETE FROM "legacy_zset"

@ -1,12 +1,12 @@
'use strict';
module.exports = function (db, module) {
module.exports = function (module) {
module.sortedSetUnionCard = async function (keys) {
if (!Array.isArray(keys) || !keys.length) {
return 0;
}
const res = await db.query({
const res = await module.pool.query({
name: 'sortedSetUnionCard',
text: `
SELECT COUNT(DISTINCT z."value") c
@ -49,7 +49,7 @@ SELECT COUNT(DISTINCT z."value") c
limit = null;
}
const res = await db.query({
const res = await module.pool.query({
name: 'getSortedSetUnion' + aggregate + (params.sort > 0 ? 'Asc' : 'Desc') + 'WithScores',
text: `
WITH A AS (SELECT z."value",

@ -1,6 +1,6 @@
'use strict';
module.exports = function (db, module) {
module.exports = function (module) {
module.transaction = async function (perform, txClient) {
let res;
if (txClient) {
@ -15,7 +15,7 @@ module.exports = function (db, module) {
return res;
}
// see https://node-postgres.com/features/transactions#a-pooled-client-with-async-await
const client = await db.connect();
const client = await module.pool.connect();
try {
await client.query('BEGIN');

@ -7,7 +7,6 @@ var nconf = require('nconf');
var semver = require('semver');
var session = require('express-session');
var redis = require('redis');
var redisClient;
var redisModule = module.exports;
@ -49,23 +48,14 @@ redisModule.getConnectionOptions = function (redis) {
redisModule.init = function (callback) {
callback = callback || function () { };
redisClient = redisModule.connect(nconf.get('redis'), function (err) {
redisModule.client = redisModule.connect(nconf.get('redis'), function (err) {
if (err) {
winston.error('NodeBB could not connect to your Redis database. Redis returned the following error', err);
return callback(err);
}
redisModule.client = redisClient;
require('./redis/promisify')(redisClient);
require('./redis/promisify')(redisModule.client);
require('./redis/main')(redisClient, redisModule);
require('./redis/hash')(redisClient, redisModule);
require('./redis/sets')(redisClient, redisModule);
require('./redis/sorted')(redisClient, redisModule);
require('./redis/list')(redisClient, redisModule);
require('./redis/transaction')(redisClient, redisModule);
redisModule.async = require('../promisify')(redisModule, ['client', 'sessionStore', 'connect']);
callback();
});
};
@ -161,7 +151,7 @@ redisModule.checkCompatibilityVersion = function (version, callback) {
redisModule.close = function (callback) {
callback = callback || function () {};
redisClient.quit(function (err) {
redisModule.client.quit(function (err) {
callback(err);
});
};
@ -221,3 +211,12 @@ redisModule.socketAdapter = function () {
subClient: sub,
});
};
require('./redis/main')(redisModule);
require('./redis/hash')(redisModule);
require('./redis/sets')(redisModule);
require('./redis/sorted')(redisModule);
require('./redis/list')(redisModule);
require('./redis/transaction')(redisModule);
redisModule.async = require('../promisify')(redisModule, ['client', 'sessionStore', 'connect']);

@ -1,6 +1,6 @@
'use strict';
module.exports = function (redisClient, module) {
module.exports = function (module) {
var helpers = require('./helpers');
const _ = require('lodash');
@ -28,11 +28,11 @@ module.exports = function (redisClient, module) {
return;
}
if (Array.isArray(key)) {
const batch = redisClient.batch();
const batch = module.client.batch();
key.forEach(k => batch.hmset(k, data));
await helpers.execBatch(batch);
} else {
await redisClient.async.hmset(key, data);
await module.client.async.hmset(key, data);
}
cache.delObjectCache(key);
@ -43,11 +43,11 @@ module.exports = function (redisClient, module) {
return;
}
if (Array.isArray(key)) {
const batch = redisClient.batch();
const batch = module.client.batch();
key.forEach(k => batch.hset(k, field, value));
await helpers.execBatch(batch);
} else {
await redisClient.async.hset(key, field, value);
await module.client.async.hset(key, field, value);
}
cache.delObjectCache(key);
@ -75,7 +75,7 @@ module.exports = function (redisClient, module) {
if (cachedData[key]) {
return cachedData[key].hasOwnProperty(field) ? cachedData[key][field] : null;
}
return await redisClient.async.hget(key, field);
return await module.client.async.hget(key, field);
};
module.getObjectFields = async function (key, fields) {
@ -98,11 +98,11 @@ module.exports = function (redisClient, module) {
let data = [];
if (unCachedKeys.length > 1) {
const batch = redisClient.batch();
const batch = module.client.batch();
unCachedKeys.forEach(k => batch.hgetall(k));
data = await helpers.execBatch(batch);
} else if (unCachedKeys.length === 1) {
data = [await redisClient.async.hgetall(unCachedKeys[0])];
data = [await module.client.async.hgetall(unCachedKeys[0])];
}
unCachedKeys.forEach(function (key, i) {
@ -126,20 +126,20 @@ module.exports = function (redisClient, module) {
};
module.getObjectKeys = async function (key) {
return await redisClient.async.hkeys(key);
return await module.client.async.hkeys(key);
};
module.getObjectValues = async function (key) {
return await redisClient.async.hvals(key);
return await module.client.async.hvals(key);
};
module.isObjectField = async function (key, field) {
const exists = await redisClient.async.hexists(key, field);
const exists = await module.client.async.hexists(key, field);
return exists === 1;
};
module.isObjectFields = async function (key, fields) {
const batch = redisClient.batch();
const batch = module.client.batch();
fields.forEach(f => batch.hexists(String(key), String(f)));
const results = await helpers.execBatch(batch);
return Array.isArray(results) ? helpers.resultsToBool(results) : null;
@ -149,12 +149,12 @@ module.exports = function (redisClient, module) {
if (key === undefined || key === null || field === undefined || field === null) {
return;
}
await redisClient.async.hdel(key, field);
await module.client.async.hdel(key, field);
cache.delObjectCache(key);
};
module.deleteObjectFields = async function (key, fields) {
await redisClient.async.hdel(key, fields);
await module.client.async.hdel(key, fields);
cache.delObjectCache(key);
};
@ -173,11 +173,11 @@ module.exports = function (redisClient, module) {
}
let result;
if (Array.isArray(key)) {
var batch = redisClient.batch();
var batch = module.client.batch();
key.forEach(k => batch.hincrby(k, field, value));
result = await helpers.execBatch(batch);
} else {
result = await redisClient.async.hincrby(key, field, value);
result = await module.client.async.hincrby(key, field, value);
}
cache.delObjectCache(key);
return Array.isArray(result) ? result.map(value => parseInt(value, 10)) : parseInt(result, 10);

@ -1,49 +1,49 @@
'use strict';
module.exports = function (redisClient, module) {
module.exports = function (module) {
module.listPrepend = async function (key, value) {
if (!key) {
return;
}
await redisClient.async.lpush(key, value);
await module.client.async.lpush(key, value);
};
module.listAppend = async function (key, value) {
if (!key) {
return;
}
await redisClient.async.rpush(key, value);
await module.client.async.rpush(key, value);
};
module.listRemoveLast = async function (key) {
if (!key) {
return;
}
return await redisClient.async.rpop(key);
return await module.client.async.rpop(key);
};
module.listRemoveAll = async function (key, value) {
if (!key) {
return;
}
await redisClient.async.lrem(key, 0, value);
await module.client.async.lrem(key, 0, value);
};
module.listTrim = async function (key, start, stop) {
if (!key) {
return;
}
await redisClient.async.ltrim(key, start, stop);
await module.client.async.ltrim(key, start, stop);
};
module.getListRange = async function (key, start, stop) {
if (!key) {
return;
}
return await redisClient.async.lrange(key, start, stop);
return await module.client.async.lrange(key, start, stop);
};
module.listLength = async function (key) {
return await redisClient.async.llen(key);
return await module.client.async.llen(key);
};
};

@ -1,10 +1,10 @@
'use strict';
module.exports = function (redisClient, module) {
module.exports = function (module) {
var helpers = require('./helpers');
module.flushdb = async function () {
await redisClient.async.send_command('flushdb', []);
await module.client.async.send_command('flushdb', []);
};
module.emptydb = async function () {
@ -14,17 +14,17 @@ module.exports = function (redisClient, module) {
module.exists = async function (key) {
if (Array.isArray(key)) {
const batch = redisClient.batch();
const batch = module.client.batch();
key.forEach(key => batch.exists(key));
const data = await helpers.execBatch(batch);
return data.map(exists => exists === 1);
}
const exists = await redisClient.async.exists(key);
const exists = await module.client.async.exists(key);
return exists === 1;
};
module.delete = async function (key) {
await redisClient.async.del(key);
await module.client.async.del(key);
module.objectCache.delObjectCache(key);
};
@ -32,25 +32,25 @@ module.exports = function (redisClient, module) {
if (!Array.isArray(keys) || !keys.length) {
return;
}
await redisClient.async.del(keys);
await module.client.async.del(keys);
module.objectCache.delObjectCache(keys);
};
module.get = async function (key) {
return await redisClient.async.get(key);
return await module.client.async.get(key);
};
module.set = async function (key, value) {
await redisClient.async.set(key, value);
await module.client.async.set(key, value);
};
module.increment = async function (key) {
return await redisClient.async.incr(key);
return await module.client.async.incr(key);
};
module.rename = async function (oldKey, newKey) {
try {
await redisClient.async.rename(oldKey, newKey);
await module.client.async.rename(oldKey, newKey);
} catch (err) {
if (err && err.message !== 'ERR no such key') {
throw err;
@ -61,23 +61,23 @@ module.exports = function (redisClient, module) {
};
module.type = async function (key) {
const type = await redisClient.async.type(key);
const type = await module.client.async.type(key);
return type !== 'none' ? type : null;
};
module.expire = async function (key, seconds) {
await redisClient.async.expire(key, seconds);
await module.client.async.expire(key, seconds);
};
module.expireAt = async function (key, timestamp) {
await redisClient.async.expireat(key, timestamp);
await module.client.async.expireat(key, timestamp);
};
module.pexpire = async function (key, ms) {
await redisClient.async.pexpire(key, ms);
await module.client.async.pexpire(key, ms);
};
module.pexpireAt = async function (key, timestamp) {
await redisClient.async.pexpireat(key, timestamp);
await module.client.async.pexpireat(key, timestamp);
};
};

@ -1,6 +1,6 @@
'use strict';
module.exports = function (redisClient, module) {
module.exports = function (module) {
var helpers = require('./helpers');
module.setAdd = async function (key, value) {
@ -10,14 +10,14 @@ module.exports = function (redisClient, module) {
if (!value.length) {
return;
}
await redisClient.async.sadd(key, value);
await module.client.async.sadd(key, value);
};
module.setsAdd = async function (keys, value) {
if (!Array.isArray(keys) || !keys.length) {
return;
}
const batch = redisClient.batch();
const batch = module.client.batch();
keys.forEach(k => batch.sadd(String(k), String(value)));
await helpers.execBatch(batch);
};
@ -30,58 +30,58 @@ module.exports = function (redisClient, module) {
key = [key];
}
var batch = redisClient.batch();
var batch = module.client.batch();
key.forEach(k => batch.srem(String(k), value));
await helpers.execBatch(batch);
};
module.setsRemove = async function (keys, value) {
var batch = redisClient.batch();
var batch = module.client.batch();
keys.forEach(k => batch.srem(String(k), value));
await helpers.execBatch(batch);
};
module.isSetMember = async function (key, value) {
const result = await redisClient.async.sismember(key, value);
const result = await module.client.async.sismember(key, value);
return result === 1;
};
module.isSetMembers = async function (key, values) {
const batch = redisClient.batch();
const batch = module.client.batch();
values.forEach(v => batch.sismember(String(key), String(v)));
const results = await helpers.execBatch(batch);
return results ? helpers.resultsToBool(results) : null;
};
module.isMemberOfSets = async function (sets, value) {
const batch = redisClient.batch();
const batch = module.client.batch();
sets.forEach(s => batch.sismember(String(s), String(value)));
const results = await helpers.execBatch(batch);
return results ? helpers.resultsToBool(results) : null;
};
module.getSetMembers = async function (key) {
return await redisClient.async.smembers(key);
return await module.client.async.smembers(key);
};
module.getSetsMembers = async function (keys) {
const batch = redisClient.batch();
const batch = module.client.batch();
keys.forEach(k => batch.smembers(String(k)));
return await helpers.execBatch(batch);
};
module.setCount = async function (key) {
return await redisClient.async.scard(key);
return await module.client.async.scard(key);
};
module.setsCount = async function (keys) {
const batch = redisClient.batch();
const batch = module.client.batch();
keys.forEach(k => batch.scard(String(k)));
return await helpers.execBatch(batch);
};
module.setRemoveRandom = async function (key) {
return await redisClient.async.spop(key);
return await module.client.async.spop(key);
};
return module;

@ -1,14 +1,14 @@
'use strict';
module.exports = function (redisClient, module) {
module.exports = function (module) {
var _ = require('lodash');
var utils = require('../../utils');
var helpers = require('./helpers');
require('./sorted/add')(redisClient, module);
require('./sorted/remove')(redisClient, module);
require('./sorted/union')(redisClient, module);
require('./sorted/intersect')(redisClient, module);
require('./sorted/add')(module);
require('./sorted/remove')(module);
require('./sorted/union')(module);
require('./sorted/intersect')(module);
module.getSortedSetRange = async function (key, start, stop) {
return await sortedSetRange('zrange', key, start, stop, false);
@ -31,7 +31,7 @@ module.exports = function (redisClient, module) {
if (!key.length) {
return [];
}
const batch = redisClient.batch();
const batch = module.client.batch();
key.forEach((key) => {
batch[method]([key, start, stop, 'WITHSCORES']);
});
@ -59,7 +59,7 @@ module.exports = function (redisClient, module) {
params.push('WITHSCORES');
}
const data = await redisClient.async[method](params);
const data = await module.client.async[method](params);
if (!withScores) {
return data;
}
@ -71,11 +71,11 @@ module.exports = function (redisClient, module) {
}
module.getSortedSetRangeByScore = async function (key, start, count, min, max) {
return await redisClient.async.zrangebyscore([key, min, max, 'LIMIT', start, count]);
return await module.client.async.zrangebyscore([key, min, max, 'LIMIT', start, count]);
};
module.getSortedSetRevRangeByScore = async function (key, start, count, max, min) {
return await redisClient.async.zrevrangebyscore([key, max, min, 'LIMIT', start, count]);
return await module.client.async.zrevrangebyscore([key, max, min, 'LIMIT', start, count]);
};
module.getSortedSetRangeByScoreWithScores = async function (key, start, count, min, max) {
@ -87,7 +87,7 @@ module.exports = function (redisClient, module) {
};
async function sortedSetRangeByScoreWithScores(method, key, start, count, min, max) {
const data = await redisClient.async[method]([key, min, max, 'WITHSCORES', 'LIMIT', start, count]);
const data = await module.client.async[method]([key, min, max, 'WITHSCORES', 'LIMIT', start, count]);
const objects = [];
for (var i = 0; i < data.length; i += 2) {
objects.push({ value: data[i], score: parseFloat(data[i + 1]) });
@ -96,18 +96,18 @@ module.exports = function (redisClient, module) {
}
module.sortedSetCount = async function (key, min, max) {
return await redisClient.async.zcount(key, min, max);
return await module.client.async.zcount(key, min, max);
};
module.sortedSetCard = async function (key) {
return await redisClient.async.zcard(key);
return await module.client.async.zcard(key);
};
module.sortedSetsCard = async function (keys) {
if (!Array.isArray(keys) || !keys.length) {
return [];
}
var batch = redisClient.batch();
var batch = module.client.batch();
keys.forEach(k => batch.zcard(String(k)));
return await helpers.execBatch(batch);
};
@ -125,15 +125,15 @@ module.exports = function (redisClient, module) {
};
module.sortedSetRank = async function (key, value) {
return await redisClient.async.zrank(key, value);
return await module.client.async.zrank(key, value);
};
module.sortedSetRevRank = async function (key, value) {
return await redisClient.async.zrevrank(key, value);
return await module.client.async.zrevrank(key, value);
};
module.sortedSetsRanks = async function (keys, values) {
const batch = redisClient.batch();
const batch = module.client.batch();
for (var i = 0; i < values.length; i += 1) {
batch.zrank(keys[i], String(values[i]));
}
@ -141,7 +141,7 @@ module.exports = function (redisClient, module) {
};
module.sortedSetsRevRanks = async function (keys, values) {
const batch = redisClient.batch();
const batch = module.client.batch();
for (var i = 0; i < values.length; i += 1) {
batch.zrevrank(keys[i], String(values[i]));
}
@ -149,7 +149,7 @@ module.exports = function (redisClient, module) {
};
module.sortedSetRanks = async function (key, values) {
const batch = redisClient.batch();
const batch = module.client.batch();
for (var i = 0; i < values.length; i += 1) {
batch.zrank(key, String(values[i]));
}
@ -157,7 +157,7 @@ module.exports = function (redisClient, module) {
};
module.sortedSetRevRanks = async function (key, values) {
const batch = redisClient.batch();
const batch = module.client.batch();
for (var i = 0; i < values.length; i += 1) {
batch.zrevrank(key, String(values[i]));
}
@ -169,7 +169,7 @@ module.exports = function (redisClient, module) {
return null;
}
const score = await redisClient.async.zscore(key, value);
const score = await module.client.async.zscore(key, value);
return score === null ? score : parseFloat(score);
};
@ -177,7 +177,7 @@ module.exports = function (redisClient, module) {
if (!Array.isArray(keys) || !keys.length) {
return [];
}
const batch = redisClient.batch();
const batch = module.client.batch();
keys.forEach(key => batch.zscore(String(key), String(value)));
const scores = await helpers.execBatch(batch);
return scores.map(d => (d === null ? d : parseFloat(d)));
@ -187,7 +187,7 @@ module.exports = function (redisClient, module) {
if (!values.length) {
return [];
}
const batch = redisClient.batch();
const batch = module.client.batch();
values.forEach(value => batch.zscore(String(key), String(value)));
const scores = await helpers.execBatch(batch);
return scores.map(d => (d === null ? d : parseFloat(d)));
@ -199,7 +199,7 @@ module.exports = function (redisClient, module) {
};
module.isSortedSetMembers = async function (key, values) {
const batch = redisClient.batch();
const batch = module.client.batch();
values.forEach(v => batch.zscore(key, String(v)));
const results = await helpers.execBatch(batch);
return results.map(utils.isNumber);
@ -209,7 +209,7 @@ module.exports = function (redisClient, module) {
if (!Array.isArray(keys) || !keys.length) {
return [];
}
const batch = redisClient.batch();
const batch = module.client.batch();
keys.forEach(k => batch.zscore(k, String(value)));
const results = await helpers.execBatch(batch);
return results.map(utils.isNumber);
@ -219,13 +219,13 @@ module.exports = function (redisClient, module) {
if (!Array.isArray(keys) || !keys.length) {
return [];
}
var batch = redisClient.batch();
var batch = module.client.batch();
keys.forEach(k => batch.zrange(k, 0, -1));
return await helpers.execBatch(batch);
};
module.sortedSetIncrBy = async function (key, increment, value) {
const newValue = await redisClient.async.zincrby(key, increment, value);
const newValue = await module.client.async.zincrby(key, increment, value);
return parseFloat(newValue);
};
@ -266,6 +266,6 @@ module.exports = function (redisClient, module) {
if (count) {
args.push('LIMIT', start, count);
}
return await redisClient.async[method](args);
return await module.client.async[method](args);
}
};

@ -1,6 +1,6 @@
'use strict';
module.exports = function (redisClient, module) {
module.exports = function (module) {
const helpers = require('../helpers');
const utils = require('../../../utils');
@ -14,7 +14,7 @@ module.exports = function (redisClient, module) {
if (!utils.isNumber(score)) {
throw new Error('[[error:invalid-score, ' + score + ']]');
}
await redisClient.async.zadd(key, score, String(value));
await module.client.async.zadd(key, score, String(value));
};
async function sortedSetAddMulti(key, scores, values) {
@ -34,7 +34,7 @@ module.exports = function (redisClient, module) {
for (var i = 0; i < scores.length; i += 1) {
args.push(scores[i], String(values[i]));
}
await redisClient.async.zadd(args);
await module.client.async.zadd(args);
}
module.sortedSetsAdd = async function (keys, scores, value) {
@ -50,7 +50,7 @@ module.exports = function (redisClient, module) {
throw new Error('[[error:invalid-data]]');
}
var batch = redisClient.batch();
var batch = module.client.batch();
for (var i = 0; i < keys.length; i += 1) {
if (keys[i]) {
batch.zadd(keys[i], isArrayOfScores ? scores[i] : scores, String(value));
@ -63,7 +63,7 @@ module.exports = function (redisClient, module) {
if (!Array.isArray(data) || !data.length) {
return;
}
var batch = redisClient.batch();
var batch = module.client.batch();
data.forEach(function (item) {
batch.zadd(item[0], item[1], item[2]);
});

@ -1,7 +1,7 @@
'use strict';
module.exports = function (redisClient, module) {
module.exports = function (module) {
const helpers = require('../helpers');
module.sortedSetIntersectCard = async function (keys) {
if (!Array.isArray(keys) || !keys.length) {
@ -11,7 +11,7 @@ module.exports = function (redisClient, module) {
var interParams = [tempSetName, keys.length].concat(keys);
var multi = redisClient.multi();
var multi = module.client.multi();
multi.zinterstore(interParams);
multi.zcard(tempSetName);
multi.del(tempSetName);
@ -51,7 +51,7 @@ module.exports = function (redisClient, module) {
rangeParams.push('WITHSCORES');
}
var multi = redisClient.multi();
var multi = module.client.multi();
multi.zinterstore(interParams);
multi[params.method](rangeParams);
multi.del(tempSetName);

@ -1,7 +1,7 @@
'use strict';
module.exports = function (redisClient, module) {
module.exports = function (module) {
var helpers = require('../helpers');
module.sortedSetRemove = async function (key, value) {
@ -17,11 +17,11 @@ module.exports = function (redisClient, module) {
}
if (Array.isArray(key)) {
const batch = redisClient.batch();
const batch = module.client.batch();
key.forEach(k => batch.zrem(k, value));
await helpers.execBatch(batch);
} else {
await redisClient.async.zrem(key, value);
await module.client.async.zrem(key, value);
}
};
@ -30,7 +30,7 @@ module.exports = function (redisClient, module) {
};
module.sortedSetsRemoveRangeByScore = async function (keys, min, max) {
var batch = redisClient.batch();
var batch = module.client.batch();
keys.forEach(k => batch.zremrangebyscore(k, min, max));
await helpers.execBatch(batch);
};
@ -39,7 +39,7 @@ module.exports = function (redisClient, module) {
if (!Array.isArray(data) || !data.length) {
return;
}
const batch = redisClient.batch();
const batch = module.client.batch();
data.forEach(item => batch.zrem(item[0], item[1]));
await helpers.execBatch(batch);
};

@ -1,14 +1,14 @@
'use strict';
module.exports = function (redisClient, module) {
module.exports = function (module) {
const helpers = require('../helpers');
module.sortedSetUnionCard = async function (keys) {
var tempSetName = 'temp_' + Date.now();
if (!keys.length) {
return 0;
}
var multi = redisClient.multi();
var multi = module.client.multi();
multi.zunionstore([tempSetName, keys.length].concat(keys));
multi.zcard(tempSetName);
multi.del(tempSetName);
@ -38,7 +38,7 @@ module.exports = function (redisClient, module) {
rangeParams.push('WITHSCORES');
}
var multi = redisClient.multi();
var multi = module.client.multi();
multi.zunionstore([tempSetName, params.sets.length].concat(params.sets));
multi[params.method](rangeParams);
multi.del(tempSetName);

@ -1,8 +1,8 @@
'use strict';
module.exports = function (db, module) {
module.exports = function (module) {
// TODO
module.transaction = function (perform, callback) {
perform(db, callback);
perform(module.client, callback);
};
};

Loading…
Cancel
Save