fix: #9231, fix redis pubsub connection

regression from fdfbc90255
v1.18.x
Barış Soner Uşaklı 4 years ago
parent 3696a19970
commit 5bc1f5b4e8

@ -103,10 +103,10 @@ redisModule.info = async function (cxn) {
return redisData; return redisData;
}; };
redisModule.socketAdapter = function () { redisModule.socketAdapter = async function () {
const redisAdapter = require('socket.io-redis'); const redisAdapter = require('socket.io-redis');
const pub = connection.connect(nconf.get('redis')); const pub = await connection.connect(nconf.get('redis'));
const sub = connection.connect(nconf.get('redis')); const sub = await connection.connect(nconf.get('redis'));
return redisAdapter({ return redisAdapter({
key: 'db:' + nconf.get('redis:database') + ':adapter_key', key: 'db:' + nconf.get('redis:database') + ':adapter_key',
pubClient: pub, pubClient: pub,

@ -1,31 +1,35 @@
'use strict'; 'use strict';
var nconf = require('nconf'); const nconf = require('nconf');
var util = require('util'); const util = require('util');
var winston = require('winston'); const winston = require('winston');
var EventEmitter = require('events').EventEmitter; const EventEmitter = require('events').EventEmitter;
const connection = require('./connection'); const connection = require('./connection');
var channelName; let channelName;
var PubSub = function () { const PubSub = function () {
var self = this; const self = this;
var subClient = connection.connect();
this.pubClient = connection.connect();
channelName = 'db:' + nconf.get('redis:database') + ':pubsub_channel'; channelName = 'db:' + nconf.get('redis:database') + ':pubsub_channel';
subClient.subscribe(channelName);
connection.connect().then(function (client) {
subClient.on('message', function (channel, message) { self.subClient = client;
if (channel !== channelName) { self.subClient.subscribe(channelName);
return; self.subClient.on('message', function (channel, message) {
} if (channel !== channelName) {
return;
try { }
var msg = JSON.parse(message);
self.emit(msg.event, msg.data); try {
} catch (err) { var msg = JSON.parse(message);
winston.error(err.stack); self.emit(msg.event, msg.data);
} } catch (err) {
winston.error(err.stack);
}
});
});
connection.connect().then(function (client) {
self.pubClient = client;
}); });
}; };

@ -16,7 +16,7 @@ const Namespaces = {};
const Sockets = module.exports; const Sockets = module.exports;
Sockets.init = function (server) { Sockets.init = async function (server) {
requireModules(); requireModules();
const SocketIO = require('socket.io').Server; const SocketIO = require('socket.io').Server;
@ -30,7 +30,8 @@ Sockets.init = function (server) {
// io.adapter(require('./single-host-cluster')); // io.adapter(require('./single-host-cluster'));
// } else if (nconf.get('redis')) { // } else if (nconf.get('redis')) {
if (nconf.get('redis')) { if (nconf.get('redis')) {
io.adapter(require('../database/redis').socketAdapter()); const adapter = await require('../database/redis').socketAdapter();
io.adapter(adapter);
} else { } else {
winston.warn('clustering detected, you should setup redis!'); winston.warn('clustering detected, you should setup redis!');
} }

@ -31,7 +31,8 @@ start.start = async function () {
await db.initSessionStore(); await db.initSessionStore();
const webserver = require('./webserver'); const webserver = require('./webserver');
require('./socket.io').init(webserver.server); const sockets = require('./socket.io');
await sockets.init(webserver.server);
if (nconf.get('runJobs')) { if (nconf.get('runJobs')) {
require('./notifications').startJobs(); require('./notifications').startJobs();

@ -152,7 +152,7 @@ before(async function () {
const webserver = require('../../src/webserver'); const webserver = require('../../src/webserver');
const sockets = require('../../src/socket.io'); const sockets = require('../../src/socket.io');
sockets.init(webserver.server); await sockets.init(webserver.server);
require('../../src/notifications').startJobs(); require('../../src/notifications').startJobs();
require('../../src/user').startJobs(); require('../../src/user').startJobs();

Loading…
Cancel
Save