fix: redis pubsub not being required correctly

split connection logic into separate module
v1.18.x
Barış Soner Uşaklı 6 years ago
parent 186321e646
commit 8d4f20865f

@ -2,18 +2,19 @@
'use strict';
var winston = require('winston');
var async = require('async');
var nconf = require('nconf');
var session = require('express-session');
var _ = require('lodash');
var semver = require('semver');
var prompt = require('prompt');
var utils = require('../utils');
const winston = require('winston');
const async = require('async');
const nconf = require('nconf');
const session = require('express-session');
const semver = require('semver');
const prompt = require('prompt');
const utils = require('../utils');
let client;
var mongoModule = module.exports;
const connection = require('./mongo/connection');
const mongoModule = module.exports;
function isUriNotSpecified() {
return !prompt.history('mongo:uri').value;
@ -60,58 +61,10 @@ mongoModule.questions = [
},
];
mongoModule.getConnectionString = function (mongo) {
mongo = mongo || nconf.get('mongo');
var usernamePassword = '';
var uri = mongo.uri || '';
if (mongo.username && mongo.password) {
usernamePassword = nconf.get('mongo:username') + ':' + encodeURIComponent(nconf.get('mongo:password')) + '@';
} else if (!uri.includes('@') || !uri.slice(uri.indexOf('://') + 3, uri.indexOf('@'))) {
winston.warn('You have no mongo username/password setup!');
}
// Sensible defaults for Mongo, if not set
if (!mongo.host) {
mongo.host = '127.0.0.1';
}
if (!mongo.port) {
mongo.port = 27017;
}
const dbName = mongo.database;
if (dbName === undefined || dbName === '') {
winston.warn('You have no database name, using "nodebb"');
mongo.database = 'nodebb';
}
var hosts = mongo.host.split(',');
var ports = mongo.port.toString().split(',');
var servers = [];
for (var i = 0; i < hosts.length; i += 1) {
servers.push(hosts[i] + ':' + ports[i]);
}
return uri || 'mongodb://' + usernamePassword + servers.join() + '/' + mongo.database;
};
mongoModule.getConnectionOptions = function (mongo) {
mongo = mongo || nconf.get('mongo');
var connOptions = {
poolSize: 10,
reconnectTries: 3600,
reconnectInterval: 1000,
autoReconnect: true,
connectTimeoutMS: 90000,
useNewUrlParser: true,
};
return _.merge(connOptions, mongo.options || {});
};
mongoModule.init = function (callback) {
callback = callback || function () { };
mongoModule.connect(nconf.get('mongo'), function (err, _client) {
connection.connect(nconf.get('mongo'), function (err, _client) {
if (err) {
winston.error('NodeBB could not connect to your Mongo database. Mongo returned the following error', err);
return callback(err);
@ -122,19 +75,8 @@ mongoModule.init = function (callback) {
});
};
mongoModule.connect = function (options, callback) {
callback = callback || function () { };
var mongoClient = require('mongodb').MongoClient;
var connString = mongoModule.getConnectionString(options);
var connOptions = mongoModule.getConnectionOptions(options);
mongoClient.connect(connString, connOptions, callback);
};
mongoModule.createSessionStore = function (options, callback) {
mongoModule.connect(options, function (err, client) {
connection.connect(options, function (err, client) {
if (err) {
return callback(err);
}
@ -193,7 +135,7 @@ mongoModule.info = function (db, callback) {
if (db) {
return setImmediate(next, null, db);
}
mongoModule.connect(nconf.get('mongo'), function (err, client) {
connection.connect(nconf.get('mongo'), function (err, client) {
next(err, client ? client.db() : undefined);
});
},
@ -278,7 +220,7 @@ mongoModule.close = function (callback) {
mongoModule.socketAdapter = function () {
var mongoAdapter = require('socket.io-adapter-mongo');
return mongoAdapter(mongoModule.getConnectionString());
return mongoAdapter(connection.getConnectionString());
};
require('./mongo/main')(mongoModule);

@ -0,0 +1,67 @@
'use strict';
const nconf = require('nconf');
const winston = require('winston');
const _ = require('lodash');
const connection = module.exports;
connection.getConnectionString = function (mongo) {
mongo = mongo || nconf.get('mongo');
var usernamePassword = '';
var uri = mongo.uri || '';
if (mongo.username && mongo.password) {
usernamePassword = nconf.get('mongo:username') + ':' + encodeURIComponent(nconf.get('mongo:password')) + '@';
} else if (!uri.includes('@') || !uri.slice(uri.indexOf('://') + 3, uri.indexOf('@'))) {
winston.warn('You have no mongo username/password setup!');
}
// Sensible defaults for Mongo, if not set
if (!mongo.host) {
mongo.host = '127.0.0.1';
}
if (!mongo.port) {
mongo.port = 27017;
}
const dbName = mongo.database;
if (dbName === undefined || dbName === '') {
winston.warn('You have no database name, using "nodebb"');
mongo.database = 'nodebb';
}
var hosts = mongo.host.split(',');
var ports = mongo.port.toString().split(',');
var servers = [];
for (var i = 0; i < hosts.length; i += 1) {
servers.push(hosts[i] + ':' + ports[i]);
}
return uri || 'mongodb://' + usernamePassword + servers.join() + '/' + mongo.database;
};
connection.getConnectionOptions = function (mongo) {
mongo = mongo || nconf.get('mongo');
var connOptions = {
poolSize: 10,
reconnectTries: 3600,
reconnectInterval: 1000,
autoReconnect: true,
connectTimeoutMS: 90000,
useNewUrlParser: true,
};
return _.merge(connOptions, mongo.options || {});
};
connection.connect = function (options, callback) {
callback = callback || function () { };
const mongoClient = require('mongodb').MongoClient;
const connString = connection.getConnectionString(options);
const connOptions = connection.getConnectionOptions(options);
mongoClient.connect(connString, connOptions, callback);
};

@ -1,7 +1,7 @@
'use strict';
var mubsub = require('mubsub-nbb');
var db = require('../mongo');
var client = mubsub(db.getConnectionString(), db.getConnectionOptions());
const mubsub = require('mubsub-nbb');
const connection = require('./connection');
const client = mubsub(connection.getConnectionString(), connection.getConnectionOptions());
module.exports = client.channel('pubsub');

@ -1,13 +1,14 @@
'use strict';
var winston = require('winston');
var async = require('async');
var nconf = require('nconf');
var session = require('express-session');
var _ = require('lodash');
var semver = require('semver');
const winston = require('winston');
const async = require('async');
const nconf = require('nconf');
const session = require('express-session');
const semver = require('semver');
var postgresModule = module.exports;
const connection = require('./postgres/connection');
const postgresModule = module.exports;
postgresModule.questions = [
{
@ -39,38 +40,12 @@ postgresModule.questions = [
},
];
postgresModule.getConnectionOptions = function (postgres) {
postgres = postgres || nconf.get('postgres');
// Sensible defaults for PostgreSQL, if not set
if (!postgres.host) {
postgres.host = '127.0.0.1';
}
if (!postgres.port) {
postgres.port = 5432;
}
const dbName = postgres.database;
if (dbName === undefined || dbName === '') {
winston.warn('You have no database name, using "nodebb"');
postgres.database = 'nodebb';
}
var connOptions = {
host: postgres.host,
port: postgres.port,
user: postgres.username,
password: postgres.password,
database: postgres.database,
};
return _.merge(connOptions, postgres.options || {});
};
postgresModule.init = function (callback) {
callback = callback || function () { };
var Pool = require('pg').Pool;
const Pool = require('pg').Pool;
var connOptions = postgresModule.getConnectionOptions();
const connOptions = connection.getConnectionOptions();
const db = new Pool(connOptions);
@ -90,17 +65,6 @@ postgresModule.init = function (callback) {
});
};
postgresModule.connect = function (options, callback) {
var Pool = require('pg').Pool;
var connOptions = postgresModule.getConnectionOptions(options);
const db = new Pool(connOptions);
db.connect(function (err) {
callback(err, db);
});
};
function checkUpgrade(client, callback) {
client.query(`
@ -330,7 +294,7 @@ postgresModule.createSessionStore = function (options, callback) {
callback(null, store);
}
postgresModule.connect(options, function (err, db) {
connection.connect(options, function (err, db) {
if (err) {
return callback(err);
}
@ -401,7 +365,7 @@ postgresModule.info = function (db, callback) {
if (db) {
setImmediate(next, null, db);
} else {
postgresModule.connect(nconf.get('postgres'), next);
connection.connect(nconf.get('postgres'), next);
}
},
function (db, next) {
@ -425,7 +389,7 @@ postgresModule.close = function (callback) {
postgresModule.socketAdapter = function () {
var postgresAdapter = require('socket.io-adapter-postgres');
return postgresAdapter(postgresModule.getConnectionOptions(), {
return postgresAdapter(connection.getConnectionOptions(), {
pubClient: postgresModule.pool,
});
};

@ -0,0 +1,45 @@
'use strict';
const nconf = require('nconf');
const winston = require('winston');
const _ = require('lodash');
const connection = module.exports;
connection.getConnectionOptions = function (postgres) {
postgres = postgres || nconf.get('postgres');
// Sensible defaults for PostgreSQL, if not set
if (!postgres.host) {
postgres.host = '127.0.0.1';
}
if (!postgres.port) {
postgres.port = 5432;
}
const dbName = postgres.database;
if (dbName === undefined || dbName === '') {
winston.warn('You have no database name, using "nodebb"');
postgres.database = 'nodebb';
}
var connOptions = {
host: postgres.host,
port: postgres.port,
user: postgres.username,
password: postgres.password,
database: postgres.database,
};
return _.merge(connOptions, postgres.options || {});
};
connection.connect = function (options, callback) {
const Pool = require('pg').Pool;
const connOptions = connection.getConnectionOptions(options);
const db = new Pool(connOptions);
db.connect(function (err) {
callback(err, db);
});
};

@ -1,15 +1,15 @@
'use strict';
var util = require('util');
var winston = require('winston');
var EventEmitter = require('events').EventEmitter;
var pg = require('pg');
var db = require('../postgres');
const util = require('util');
const winston = require('winston');
const EventEmitter = require('events').EventEmitter;
const pg = require('pg');
const connection = require('./connection');
var PubSub = function () {
var self = this;
const PubSub = function () {
const self = this;
var subClient = new pg.Client(db.getConnectionOptions());
const subClient = new pg.Client(connection.getConnectionOptions());
subClient.connect(function (err) {
if (err) {
@ -41,6 +41,7 @@ var PubSub = function () {
util.inherits(PubSub, EventEmitter);
PubSub.prototype.publish = function (event, data) {
const db = require('../postgres');
db.pool.query({
name: 'pubSubPublish',
text: `SELECT pg_notify('pubsub', $1::TEXT)`,

@ -1,14 +1,14 @@
'use strict';
var _ = require('lodash');
var async = require('async');
var winston = require('winston');
var nconf = require('nconf');
var semver = require('semver');
var session = require('express-session');
var redis = require('redis');
const async = require('async');
const winston = require('winston');
const nconf = require('nconf');
const semver = require('semver');
const session = require('express-session');
var redisModule = module.exports;
const connection = require('./redis/connection');
const redisModule = module.exports;
redisModule.questions = [
{
@ -35,20 +35,10 @@ redisModule.questions = [
},
];
redisModule.getConnectionOptions = function (redis) {
redis = redis || nconf.get('redis');
let connOptions = {};
if (redis.password) {
connOptions.auth_pass = redis.password;
}
connOptions = _.merge(connOptions, redis.options || {});
return connOptions;
};
redisModule.init = function (callback) {
callback = callback || function () { };
redisModule.client = redisModule.connect(nconf.get('redis'), function (err) {
redisModule.client = connection.connect(nconf.get('redis'), function (err) {
if (err) {
winston.error('NodeBB could not connect to your Redis database. Redis returned the following error', err);
return callback(err);
@ -60,63 +50,10 @@ redisModule.init = function (callback) {
});
};
redisModule.connect = function (options, callback) {
callback = callback || function () {};
options = options || nconf.get('redis');
var redis_socket_or_host = options.host;
var cxn;
var callbackCalled = false;
const connOptions = redisModule.getConnectionOptions(options);
if (redis_socket_or_host && redis_socket_or_host.indexOf('/') >= 0) {
/* If redis.host contains a path name character, use the unix dom sock connection. ie, /tmp/redis.sock */
cxn = redis.createClient(options.host, connOptions);
} else {
/* Else, connect over tcp/ip */
cxn = redis.createClient(options.port, options.host, connOptions);
}
cxn.on('error', function (err) {
winston.error(err.stack);
if (!callbackCalled) {
callbackCalled = true;
callback(err);
}
});
cxn.on('ready', function () {
if (!callbackCalled) {
callbackCalled = true;
callback(null, cxn);
}
});
if (options.password) {
cxn.auth(options.password);
}
var dbIdx = parseInt(options.database, 10);
if (dbIdx >= 0) {
cxn.select(dbIdx, function (err) {
if (err) {
winston.error('NodeBB could not select Redis database. Redis returned the following error', err);
throw err;
}
});
} else {
callbackCalled = true;
return callback(new Error('[[error:no-database-selected]]'));
}
return cxn;
};
redisModule.createSessionStore = function (options, callback) {
const meta = require('../meta');
const sessionStore = require('connect-redis')(session);
const client = redisModule.connect(options);
const client = connection.connect(options);
const store = new sessionStore({
client: client,
ttl: meta.getSessionTTLSeconds(),
@ -162,7 +99,7 @@ redisModule.info = function (cxn, callback) {
if (cxn) {
return setImmediate(next, null, cxn);
}
redisModule.connect(nconf.get('redis'), next);
connection.connect(nconf.get('redis'), next);
},
function (cxn, next) {
redisModule.client = redisModule.client || cxn;
@ -203,8 +140,8 @@ redisModule.info = function (cxn, callback) {
redisModule.socketAdapter = function () {
var redisAdapter = require('socket.io-redis');
var pub = redisModule.connect(nconf.get('redis'));
var sub = redisModule.connect(nconf.get('redis'));
var pub = connection.connect(nconf.get('redis'));
var sub = connection.connect(nconf.get('redis'));
return redisAdapter({
key: 'db:' + nconf.get('redis:database') + ':adapter_key',
pubClient: pub,
@ -219,4 +156,4 @@ require('./redis/sorted')(redisModule);
require('./redis/list')(redisModule);
require('./redis/transaction')(redisModule);
redisModule.async = require('../promisify')(redisModule, ['client', 'sessionStore', 'connect']);
redisModule.async = require('../promisify')(redisModule, ['client', 'sessionStore']);

@ -0,0 +1,71 @@
'use strict';
const nconf = require('nconf');
const redis = require('redis');
const winston = require('winston');
const _ = require('lodash');
const connection = module.exports;
connection.getConnectionOptions = function (redis) {
redis = redis || nconf.get('redis');
let connOptions = {};
if (redis.password) {
connOptions.auth_pass = redis.password;
}
connOptions = _.merge(connOptions, redis.options || {});
return connOptions;
};
connection.connect = function (options, callback) {
callback = callback || function () {};
options = options || nconf.get('redis');
var redis_socket_or_host = options.host;
var cxn;
var callbackCalled = false;
const connOptions = connection.getConnectionOptions(options);
if (redis_socket_or_host && redis_socket_or_host.indexOf('/') >= 0) {
/* If redis.host contains a path name character, use the unix dom sock connection. ie, /tmp/redis.sock */
cxn = redis.createClient(options.host, connOptions);
} else {
/* Else, connect over tcp/ip */
cxn = redis.createClient(options.port, options.host, connOptions);
}
cxn.on('error', function (err) {
winston.error(err.stack);
if (!callbackCalled) {
callbackCalled = true;
callback(err);
}
});
cxn.on('ready', function () {
if (!callbackCalled) {
callbackCalled = true;
callback(null, cxn);
}
});
if (options.password) {
cxn.auth(options.password);
}
var dbIdx = parseInt(options.database, 10);
if (dbIdx >= 0) {
cxn.select(dbIdx, function (err) {
if (err) {
winston.error('NodeBB could not select Redis database. Redis returned the following error', err);
throw err;
}
});
} else {
callbackCalled = true;
return callback(new Error('[[error:no-database-selected]]'));
}
return cxn;
};

@ -4,14 +4,13 @@ var nconf = require('nconf');
var util = require('util');
var winston = require('winston');
var EventEmitter = require('events').EventEmitter;
const connection = require('./connection');
var channelName;
var PubSub = function () {
var self = this;
var db = require('../redis');
var subClient = db.connect();
this.pubClient = db.connect();
var subClient = connection.connect();
this.pubClient = connection.connect();
channelName = 'db:' + nconf.get('redis:database') + ':pubsub_channel';
subClient.subscribe(channelName);

Loading…
Cancel
Save