feat: async/await redis connection

v1.18.x
Barış Soner Uşaklı 4 years ago
parent 33bf1b0e2c
commit fdfbc90255

@ -1,9 +1,8 @@
'use strict'; 'use strict';
const async = require('async');
const winston = require('winston');
const nconf = require('nconf'); const nconf = require('nconf');
const semver = require('semver'); const semver = require('semver');
const util = require('util');
const session = require('express-session'); const session = require('express-session');
const connection = require('./redis/connection'); const connection = require('./redis/connection');
@ -36,112 +35,77 @@ redisModule.questions = [
]; ];
redisModule.init = function (callback) { redisModule.init = async function () {
callback = callback || function () { }; redisModule.client = await connection.connect(nconf.get('redis'));
redisModule.client = connection.connect(nconf.get('redis'), function (err) { require('./redis/promisify')(redisModule.client);
if (err) {
winston.error('NodeBB could not connect to your Redis database. Redis returned the following error\n' + err.stack);
return callback(err);
}
require('./redis/promisify')(redisModule.client);
callback();
});
}; };
redisModule.createSessionStore = function (options, callback) { redisModule.createSessionStore = async function (options) {
const meta = require('../meta'); const meta = require('../meta');
const sessionStore = require('connect-redis')(session); const sessionStore = require('connect-redis')(session);
const client = connection.connect(options); const client = await connection.connect(options);
const store = new sessionStore({ const store = new sessionStore({
client: client, client: client,
ttl: meta.getSessionTTLSeconds(), ttl: meta.getSessionTTLSeconds(),
}); });
return store;
if (typeof callback === 'function') {
callback(null, store);
}
};
redisModule.createIndices = function (callback) {
setImmediate(callback);
}; };
redisModule.checkCompatibility = function (callback) { redisModule.checkCompatibility = async function () {
async.waterfall([ const info = await redisModule.info(redisModule.client);
function (next) { redisModule.checkCompatibilityVersion(info.redis_version);
redisModule.info(redisModule.client, next);
},
function (info, next) {
redisModule.checkCompatibilityVersion(info.redis_version, next);
},
], callback);
}; };
redisModule.checkCompatibilityVersion = function (version, callback) { redisModule.checkCompatibilityVersion = function (version) {
if (semver.lt(version, '2.8.9')) { if (semver.lt(version, '2.8.9')) {
return callback(new Error('Your Redis version is not new enough to support NodeBB, please upgrade Redis to v2.8.9 or higher.')); throw new Error('Your Redis version is not new enough to support NodeBB, please upgrade Redis to v2.8.9 or higher.');
} }
callback();
}; };
redisModule.close = function (callback) { redisModule.close = async function () {
callback = callback || function () {}; await redisModule.client.async.quit();
redisModule.client.quit(function (err) {
callback(err);
});
}; };
redisModule.info = function (cxn, callback) { redisModule.info = async function (cxn) {
async.waterfall([ if (!cxn) {
function (next) { cxn = await connection.connect(nconf.get('redis'));
if (cxn) { }
return setImmediate(next, null, cxn); redisModule.client = redisModule.client || cxn;
} const infoAsync = util.promisify(cb => cxn.info(cb));
connection.connect(nconf.get('redis'), next); const data = await infoAsync();
}, const lines = data.toString().split('\r\n').sort();
function (cxn, next) { const redisData = {};
redisModule.client = redisModule.client || cxn; lines.forEach(function (line) {
const parts = line.split(':');
cxn.info(next); if (parts[1]) {
}, redisData[parts[0]] = parts[1];
function (data, next) { }
var lines = data.toString().split('\r\n').sort(); });
var redisData = {};
lines.forEach(function (line) { const keyInfo = redisData['db' + nconf.get('redis:database')];
var parts = line.split(':'); if (keyInfo) {
if (parts[1]) { const split = keyInfo.split(',');
redisData[parts[0]] = parts[1]; redisData.keys = (split[0] || '').replace('keys=', '');
} redisData.expires = (split[1] || '').replace('expires=', '');
}); redisData.avg_ttl = (split[2] || '').replace('avg_ttl=', '');
}
const keyInfo = redisData['db' + nconf.get('redis:database')];
if (keyInfo) { redisData.instantaneous_input = (redisData.instantaneous_input_kbps / 1024).toFixed(3);
const split = keyInfo.split(','); redisData.instantaneous_output = (redisData.instantaneous_output_kbps / 1024).toFixed(3);
redisData.keys = (split[0] || '').replace('keys=', '');
redisData.expires = (split[1] || '').replace('expires=', ''); redisData.total_net_input = (redisData.total_net_input_bytes / (1024 * 1024 * 1024)).toFixed(3);
redisData.avg_ttl = (split[2] || '').replace('avg_ttl=', ''); redisData.total_net_output = (redisData.total_net_output_bytes / (1024 * 1024 * 1024)).toFixed(3);
}
redisData.used_memory_human = (redisData.used_memory / (1024 * 1024 * 1024)).toFixed(3);
redisData.instantaneous_input = (redisData.instantaneous_input_kbps / 1024).toFixed(3); redisData.raw = JSON.stringify(redisData, null, 4);
redisData.instantaneous_output = (redisData.instantaneous_output_kbps / 1024).toFixed(3); redisData.redis = true;
return redisData;
redisData.total_net_input = (redisData.total_net_input_bytes / (1024 * 1024 * 1024)).toFixed(3);
redisData.total_net_output = (redisData.total_net_output_bytes / (1024 * 1024 * 1024)).toFixed(3);
redisData.used_memory_human = (redisData.used_memory / (1024 * 1024 * 1024)).toFixed(3);
redisData.raw = JSON.stringify(redisData, null, 4);
redisData.redis = true;
next(null, redisData);
},
], callback);
}; };
redisModule.socketAdapter = function () { redisModule.socketAdapter = function () {
var redisAdapter = require('socket.io-redis'); const redisAdapter = require('socket.io-redis');
var pub = connection.connect(nconf.get('redis')); const pub = connection.connect(nconf.get('redis'));
var sub = connection.connect(nconf.get('redis')); const sub = connection.connect(nconf.get('redis'));
return redisAdapter({ return redisAdapter({
key: 'db:' + nconf.get('redis:database') + ':adapter_key', key: 'db:' + nconf.get('redis:database') + ':adapter_key',
pubClient: pub, pubClient: pub,

@ -9,63 +9,48 @@ const connection = module.exports;
connection.getConnectionOptions = function (redis) { connection.getConnectionOptions = function (redis) {
redis = redis || nconf.get('redis'); redis = redis || nconf.get('redis');
let connOptions = {}; const connOptions = {};
if (redis.password) { if (redis.password) {
connOptions.auth_pass = redis.password; connOptions.auth_pass = redis.password;
} }
if (redis.hasOwnProperty('database')) {
connOptions = _.merge(connOptions, redis.options || {}); connOptions.db = redis.database;
return connOptions;
};
connection.connect = function (options, callback) {
callback = callback || function () {};
options = options || nconf.get('redis');
var redis_socket_or_host = options.host;
var cxn;
var callbackCalled = false;
const connOptions = connection.getConnectionOptions(options);
if (redis_socket_or_host && redis_socket_or_host.indexOf('/') >= 0) {
/* If redis.host contains a path name character, use the unix dom sock connection. ie, /tmp/redis.sock */
cxn = redis.createClient(options.host, connOptions);
} else {
/* Else, connect over tcp/ip */
cxn = redis.createClient(options.port, options.host, connOptions);
} }
return _.merge(connOptions, redis.options || {});
};
cxn.on('error', function (err) { connection.connect = async function (options) {
winston.error(err.stack); return new Promise(function (resolve, reject) {
if (!callbackCalled) { options = options || nconf.get('redis');
callbackCalled = true; const redis_socket_or_host = options.host;
callback(err); const connOptions = connection.getConnectionOptions(options);
let cxn;
if (redis_socket_or_host && String(redis_socket_or_host).indexOf('/') >= 0) {
/* If redis.host contains a path name character, use the unix dom sock connection. ie, /tmp/redis.sock */
cxn = redis.createClient(options.host, connOptions);
} else {
/* Else, connect over tcp/ip */
cxn = redis.createClient(options.port, options.host, connOptions);
} }
});
cxn.on('ready', function () { const dbIdx = parseInt(options.database, 10);
if (!callbackCalled) { if (!(dbIdx >= 0)) {
callbackCalled = true; throw new Error('[[error:no-database-selected]]');
callback(null, cxn);
} }
});
if (options.password) { cxn.on('error', function (err) {
cxn.auth(options.password); winston.error(err.stack);
} reject(err);
});
var dbIdx = parseInt(options.database, 10); cxn.on('ready', function () {
if (dbIdx >= 0) { resolve(cxn);
cxn.select(dbIdx, function (err) {
if (err) {
winston.error('NodeBB could not select Redis database. Redis returned the following error\n' + err.stack);
throw err;
}
}); });
} else {
callbackCalled = true;
return callback(new Error('[[error:no-database-selected]]'));
}
return cxn; if (options.password) {
cxn.auth(options.password);
}
});
}; };
require('../../promisify')(connection);

@ -4,6 +4,7 @@ const util = require('util');
module.exports = function (redisClient) { module.exports = function (redisClient) {
redisClient.async = { redisClient.async = {
quit: util.promisify(redisClient.quit).bind(redisClient),
send_command: util.promisify(redisClient.send_command).bind(redisClient), send_command: util.promisify(redisClient.send_command).bind(redisClient),
exists: util.promisify(redisClient.exists).bind(redisClient), exists: util.promisify(redisClient.exists).bind(redisClient),

@ -165,7 +165,9 @@ async function completeConfigSetup(config) {
nconf.overrides(config); nconf.overrides(config);
const db = require('./database'); const db = require('./database');
await db.init(); await db.init();
await db.createIndices(); if (db.hasOwnProperty('createIndices')) {
await db.createIndices();
}
// Sanity-check/fix url/port // Sanity-check/fix url/port
if (!/^http(?:s)?:\/\//.test(config.url)) { if (!/^http(?:s)?:\/\//.test(config.url)) {

@ -135,7 +135,9 @@ before(async function () {
await db.init(); await db.init();
await db.createIndices(); if (db.hasOwnProperty('createIndices')) {
await db.createIndices();
}
await setupMockDefaults(); await setupMockDefaults();
await db.initSessionStore(); await db.initSessionStore();

Loading…
Cancel
Save