From e85aabbe74d7838185d0b102bf37a20c1a1b62a1 Mon Sep 17 00:00:00 2001 From: Ben Lubar Date: Thu, 18 Jan 2018 12:02:56 -0600 Subject: [PATCH] Allow running as a cluster without Redis (#6233) * [database/*] Allow databases other than Redis to provide pubsub for clustering if Redis is not present * [pubsub] Delay messages sent before the database is ready until the database is ready. * [pubsub] Restore old behavior of not using the database in non-clustered NodeBB instances. See comment: https://github.com/NodeBB/NodeBB/pull/6233#issuecomment-357814968 --- install/package.json | 2 + src/database/mongo.js | 22 +++++--- src/database/mongo/hash.js | 19 ------- src/database/mongo/pubsub.js | 34 +++++++++++++ src/database/redis.js | 12 +++++ src/database/redis/pubsub.js | 40 +++++++++++++++ src/pubsub.js | 98 ++++++++++++++++++++++-------------- src/socket.io/index.js | 18 +------ 8 files changed, 165 insertions(+), 80 deletions(-) create mode 100644 src/database/mongo/pubsub.js create mode 100644 src/database/redis/pubsub.js diff --git a/install/package.json b/install/package.json index e06f51139e..baa45377fa 100644 --- a/install/package.json +++ b/install/package.json @@ -58,6 +58,7 @@ "mongodb": "2.2.33", "morgan": "^1.9.0", "mousetrap": "^1.6.1", + "mubsub": "^1.4.0", "nconf": "^0.9.1", "nodebb-plugin-composer-default": "6.0.8", "nodebb-plugin-dbsearch": "2.0.9", @@ -89,6 +90,7 @@ "serve-favicon": "^2.4.5", "sitemap": "^1.13.0", "socket.io": "2.0.4", + "socket.io-adapter-mongo": "^2.0.1", "socket.io-client": "2.0.4", "socket.io-redis": "5.2.0", "socketio-wildcard": "2.0.0", diff --git a/src/database/mongo.js b/src/database/mongo.js index 5bcab76a6f..4042cf6025 100644 --- a/src/database/mongo.js +++ b/src/database/mongo.js @@ -61,11 +61,7 @@ mongoModule.questions = [ mongoModule.helpers = mongoModule.helpers || {}; mongoModule.helpers.mongo = require('./mongo/helpers'); -mongoModule.init = function (callback) { - callback = callback || function () { }; - - var mongoClient = require('mongodb').MongoClient; - +function getConnectionString() { var usernamePassword = ''; if (nconf.get('mongo:username') && nconf.get('mongo:password')) { usernamePassword = nconf.get('mongo:username') + ':' + encodeURIComponent(nconf.get('mongo:password')) + '@'; @@ -92,7 +88,15 @@ mongoModule.init = function (callback) { servers.push(hosts[i] + ':' + ports[i]); } - var connString = nconf.get('mongo:uri') || 'mongodb://' + usernamePassword + servers.join() + '/' + nconf.get('mongo:database'); + return nconf.get('mongo:uri') || 'mongodb://' + usernamePassword + servers.join() + '/' + nconf.get('mongo:database'); +} + +mongoModule.init = function (callback) { + callback = callback || function () { }; + + var mongoClient = require('mongodb').MongoClient; + + var connString = getConnectionString(); var connOptions = { poolSize: 10, @@ -118,6 +122,7 @@ mongoModule.init = function (callback) { require('./mongo/sets')(db, mongoModule); require('./mongo/sorted')(db, mongoModule); require('./mongo/list')(db, mongoModule); + require('./mongo/pubsub')(db, mongoModule); callback(); }); }; @@ -262,3 +267,8 @@ mongoModule.close = function (callback) { callback = callback || function () {}; db.close(callback); }; + +mongoModule.socketAdapter = function () { + var mongoAdapter = require('socket.io-adapter-mongo'); + return mongoAdapter(getConnectionString()); +}; diff --git a/src/database/mongo/hash.js b/src/database/mongo/hash.js index aef54bf124..84d66660c0 100644 --- a/src/database/mongo/hash.js +++ b/src/database/mongo/hash.js @@ -5,7 +5,6 @@ module.exports = function (db, module) { var LRU = require('lru-cache'); var _ = require('lodash'); - var pubsub = require('../../pubsub'); var cache = LRU({ max: 10000, @@ -17,24 +16,6 @@ module.exports = function (db, module) { cache.hits = 0; module.objectCache = cache; - pubsub.on('mongo:hash:cache:del', function (key) { - cache.del(key); - }); - - pubsub.on('mongo:hash:cache:reset', function () { - cache.reset(); - }); - - module.delObjectCache = function (key) { - pubsub.publish('mongo:hash:cache:del', key); - cache.del(key); - }; - - module.resetObjectCache = function () { - pubsub.publish('mongo:hash:cache:reset'); - cache.reset(); - }; - module.setObject = function (key, data, callback) { callback = callback || helpers.noop; if (!key || !data) { diff --git a/src/database/mongo/pubsub.js b/src/database/mongo/pubsub.js new file mode 100644 index 0000000000..157d4652bd --- /dev/null +++ b/src/database/mongo/pubsub.js @@ -0,0 +1,34 @@ +'use strict'; + +var nconf = require('nconf'); + +module.exports = function (db, mongoModule) { + var pubsub; + + if (!nconf.get('redis')) { + var mubsub = require('mubsub'); + var client = mubsub(db); + pubsub = client.channel('pubsub'); + mongoModule.pubsub = pubsub; + } else { + pubsub = require('../../pubsub'); + } + + pubsub.on('mongo:hash:cache:del', function (key) { + mongoModule.objectCache.del(key); + }); + + pubsub.on('mongo:hash:cache:reset', function () { + mongoModule.objectCache.reset(); + }); + + mongoModule.delObjectCache = function (key) { + pubsub.publish('mongo:hash:cache:del', key); + mongoModule.objectCache.del(key); + }; + + mongoModule.resetObjectCache = function () { + pubsub.publish('mongo:hash:cache:reset'); + mongoModule.objectCache.reset(); + }; +}; diff --git a/src/database/redis.js b/src/database/redis.js index 70a8ffbd53..55037393cb 100644 --- a/src/database/redis.js +++ b/src/database/redis.js @@ -50,6 +50,7 @@ redisModule.init = function (callback) { require('./redis/sets')(redisClient, redisModule); require('./redis/sorted')(redisClient, redisModule); require('./redis/list')(redisClient, redisModule); + require('./redis/pubsub')(redisClient, redisModule); callback(); }); @@ -169,5 +170,16 @@ redisModule.info = function (cxn, callback) { ], callback); }; +redisModule.socketAdapter = function () { + var redisAdapter = require('socket.io-redis'); + var pub = redisModule.connect(); + var sub = redisModule.connect(); + return redisAdapter({ + key: 'db:' + nconf.get('redis:database') + ':adapter_key', + pubClient: pub, + subClient: sub, + }); +}; + redisModule.helpers = redisModule.helpers || {}; redisModule.helpers.redis = require('./redis/helpers'); diff --git a/src/database/redis/pubsub.js b/src/database/redis/pubsub.js new file mode 100644 index 0000000000..3daf0f40cc --- /dev/null +++ b/src/database/redis/pubsub.js @@ -0,0 +1,40 @@ +'use strict'; + +var nconf = require('nconf'); +var util = require('util'); +var winston = require('winston'); +var EventEmitter = require('events').EventEmitter; + +var channelName; + +var PubSub = function (redisModule) { + var self = this; + var subClient = redisModule.connect(); + this.pubClient = redisModule.connect(); + + channelName = 'db:' + nconf.get('redis:database') + 'pubsub_channel'; + subClient.subscribe(channelName); + + subClient.on('message', function (channel, message) { + if (channel !== channelName) { + return; + } + + try { + var msg = JSON.parse(message); + self.emit(msg.event, msg.data); + } catch (err) { + winston.error(err.stack); + } + }); +}; + +util.inherits(PubSub, EventEmitter); + +PubSub.prototype.publish = function (event, data) { + this.pubClient.publish(channelName, JSON.stringify({ event: event, data: data })); +}; + +module.exports = function (redisClient, redisModule) { + redisModule.pubsub = new PubSub(redisModule); +}; diff --git a/src/pubsub.js b/src/pubsub.js index b33a03b6bf..2364612b88 100644 --- a/src/pubsub.js +++ b/src/pubsub.js @@ -1,48 +1,70 @@ - 'use strict'; var nconf = require('nconf'); -var util = require('util'); -var winston = require('winston'); -var EventEmitter = require('events').EventEmitter; - -var channelName; - -var PubSub = function () { - var self = this; - if (nconf.get('redis')) { - var redis = require('./database/redis'); - var subClient = redis.connect(); - this.pubClient = redis.connect(); - - channelName = 'db:' + nconf.get('redis:database') + 'pubsub_channel'; - subClient.subscribe(channelName); - - subClient.on('message', function (channel, message) { - if (channel !== channelName) { - return; - } - - try { - var msg = JSON.parse(message); - self.emit(msg.event, msg.data); - } catch (err) { - winston.error(err.stack); - } - }); - } + +var real; +var fake = { + publishQueue: [], + publish: function (event, data) { + fake.publishQueue.push({ event: event, data: data }); + }, + listenQueue: {}, + on: function (event, callback) { + if (!Object.prototype.hasOwnProperty.call(fake.listenQueue, event)) { + fake.listenQueue[event] = []; + } + fake.listenQueue[event].push(callback); + }, + removeAllListeners: function (event) { + delete fake.listenQueue[event]; + }, }; -util.inherits(PubSub, EventEmitter); +function get() { + if (real) { + return real; + } + + var pubsub; -PubSub.prototype.publish = function (event, data) { - if (this.pubClient) { - this.pubClient.publish(channelName, JSON.stringify({ event: event, data: data })); + if (nconf.get('isCluster') === 'false') { + var EventEmitter = require('events'); + pubsub = new EventEmitter(); + pubsub.publish = pubsub.emit.bind(pubsub); + } else if (nconf.get('redis')) { + pubsub = require('./database/redis').pubsub; } else { - this.emit(event, data); + pubsub = require('./database').pubsub; } -}; -var pubsub = new PubSub(); + if (!pubsub) { + return fake; + } + + Object.keys(fake.listenQueue).forEach(function (event) { + fake.listenQueue[event].forEach(function (callback) { + pubsub.on(event, callback); + }); + }); + + fake.publishQueue.forEach(function (msg) { + pubsub.publish(msg.event, msg.data); + }); -module.exports = pubsub; + real = pubsub; + fake = null; + + return pubsub; +} + +module.exports = { + publish: function (event, data) { + get().publish(event, data); + }, + on: function (event, callback) { + get().on(event, callback); + }, + removeAllListeners: function (event) { + get().removeAllListeners(event); + }, +}; diff --git a/src/socket.io/index.js b/src/socket.io/index.js index 2e897d0c4c..ead298b492 100644 --- a/src/socket.io/index.js +++ b/src/socket.io/index.js @@ -26,7 +26,7 @@ Sockets.init = function (server) { path: nconf.get('relative_path') + '/socket.io', }); - addRedisAdapter(io); + io.adapter(nconf.get('redis') ? require('../database/redis').socketAdapter() : db.socketAdapter()); io.use(socketioWildcard); io.use(authorize); @@ -212,22 +212,6 @@ function authorize(socket, callback) { ], callback); } -function addRedisAdapter(io) { - if (nconf.get('redis')) { - var redisAdapter = require('socket.io-redis'); - var redis = require('../database/redis'); - var pub = redis.connect(); - var sub = redis.connect(); - io.adapter(redisAdapter({ - key: 'db:' + nconf.get('redis:database') + ':adapter_key', - pubClient: pub, - subClient: sub, - })); - } else if (nconf.get('isCluster') === 'true') { - winston.warn('[socket.io] Clustering detected, you are advised to configure Redis as a websocket store.'); - } -} - Sockets.in = function (room) { return io.in(room); };