feat: socket.io 3 changes (#8845)

* feat: socket.io 3 changes

* feat: replace socketio-wildcard with socket.onAny

up socket.io-redis to 6.x

* feat: remove mongodb/psql socket.io adapters

* feat: show data on fail

* fix: tests

* fix: typo

* fix: logger test fix

* fix: logger.io_close

* chore: up deps

* chore: update readme to reflect redis requirement

* fix: increase timeout show data if test fails
v1.18.x
Barış Soner Uşaklı 4 years ago committed by GitHub
parent d7f5efd960
commit 1c45fa1ba5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -51,6 +51,7 @@ NodeBB requires the following software to be installed:
* A version of Node.js at least 12 or greater ([installation/upgrade instructions](https://github.com/nodesource/distributions))
* MongoDB, version 2.6 or greater **or** Redis, version 2.8.9 or greater
* If you are using [clustering](https://docs.nodebb.org/configuring/scaling/) you need Redis installed and configured.
* nginx, version 1.3.13 or greater (**only if** intending to use nginx to proxy requests to a NodeBB)
## Installation

@ -90,8 +90,6 @@
"morgan": "^1.10.0",
"mousetrap": "^1.6.5",
"@nodebb/bootswatch": "3.4.2",
"@nodebb/mubsub": "1.7.1",
"@nodebb/socket.io-adapter-mongo": "3.1.1",
"nconf": "^0.11.0",
"nodebb-plugin-composer-default": "6.4.10",
"nodebb-plugin-dbsearch": "4.1.2",
@ -130,12 +128,10 @@
"sharp": "0.26.3",
"sitemap": "^6.1.0",
"slideout": "1.0.1",
"socket.io": "2.3.0",
"socket.io": "3.0.3",
"socket.io-adapter-cluster": "^1.0.1",
"socket.io-adapter-postgres": "^1.2.1",
"socket.io-client": "2.3.1",
"socket.io-redis": "5.4.0",
"socketio-wildcard": "2.0.0",
"socket.io-client": "3.0.3",
"socket.io-redis": "6.0.1",
"sortablejs": "1.10.2",
"spdx-license-list": "^6.1.0",
"spider-detector": "2.0.0",

@ -181,11 +181,6 @@ mongoModule.close = function (callback) {
client.close(err => callback(err));
};
mongoModule.socketAdapter = function () {
const mongoAdapter = require('@nodebb/socket.io-adapter-mongo');
return mongoAdapter(connection.getConnectionString(), connection.getConnectionOptions());
};
require('./mongo/main')(mongoModule);
require('./mongo/hash')(mongoModule);
require('./mongo/sets')(mongoModule);

@ -1,10 +0,0 @@
'use strict';
const mubsub = require('@nodebb/mubsub');
const connection = require('./connection');
const client = mubsub(connection.getConnectionString(), connection.getConnectionOptions());
client.on('error', err => console.error(err));
const channel = client.channel('pubsub');
channel.on('error', err => console.error(err));
module.exports = channel;

@ -406,13 +406,6 @@ postgresModule.close = function (callback) {
postgresModule.pool.end(callback);
};
postgresModule.socketAdapter = function () {
var postgresAdapter = require('socket.io-adapter-postgres');
return postgresAdapter(connection.getConnectionOptions(), {
pubClient: postgresModule.pool,
});
};
require('./postgres/main')(postgresModule);
require('./postgres/hash')(postgresModule);
require('./postgres/sets')(postgresModule);

@ -1,52 +0,0 @@
'use strict';
const util = require('util');
const winston = require('winston');
const EventEmitter = require('events').EventEmitter;
const pg = require('pg');
const connection = require('./connection');
const PubSub = function () {
const self = this;
const subClient = new pg.Client(connection.getConnectionOptions());
subClient.connect(function (err) {
if (err) {
winston.error(err.stack);
return;
}
subClient.query('LISTEN pubsub', function (err) {
if (err) {
winston.error(err.stack);
}
});
subClient.on('notification', function (message) {
if (message.channel !== 'pubsub') {
return;
}
try {
var msg = JSON.parse(message.payload);
self.emit(msg.event, msg.data);
} catch (err) {
winston.error(err.stack);
}
});
});
};
util.inherits(PubSub, EventEmitter);
PubSub.prototype.publish = function (event, data) {
const db = require('../postgres');
db.pool.query({
name: 'pubSubPublish',
text: `SELECT pg_notify('pubsub', $1::TEXT)`,
values: [JSON.stringify({ event: event, data: data })],
});
};
module.exports = new PubSub();

@ -158,11 +158,9 @@ Logger.io_close = function (socket) {
return;
}
var clients = socket.io.sockets.sockets;
const clientsMap = socket.io.sockets.sockets;
for (var sid in clients) {
if (clients.hasOwnProperty(sid)) {
var client = clients[sid];
for (const [, client] of clientsMap) {
if (client.oEmit && client.oEmit !== client.emit) {
client.emit = client.oEmit;
}
@ -171,7 +169,6 @@ Logger.io_close = function (socket) {
client.onevent = client.$onevent;
}
}
}
};
Logger.io = function (socket) {
@ -183,11 +180,9 @@ Logger.io = function (socket) {
return;
}
var clients = socket.io.sockets.sockets;
for (var sid in clients) {
if (clients.hasOwnProperty(sid)) {
Logger.io_one(clients[sid], clients[sid].uid);
}
const clientsMap = socket.io.sockets.sockets;
for (const [, socketObj] of clientsMap) {
Logger.io_one(socketObj, socketObj.uid);
}
};

@ -47,10 +47,8 @@ function get() {
pubsub = singleHost;
} else if (nconf.get('redis')) {
pubsub = require('./database/redis/pubsub');
} else if (nconf.get('mongo')) {
pubsub = require('./database/mongo/pubsub');
} else if (nconf.get('postgres')) {
pubsub = require('./database/postgres/pubsub');
} else {
throw new Error('[[error:redis-required-for-pubsub]]');
}
real = pubsub;

@ -17,7 +17,10 @@ SocketRooms.totals = totals;
pubsub.on('sync:stats:start', function () {
const stats = SocketRooms.getLocalStats();
pubsub.publish('sync:stats:end', { stats: stats, id: os.hostname() + ':' + nconf.get('port') });
pubsub.publish('sync:stats:end', {
stats: stats,
id: os.hostname() + ':' + nconf.get('port'),
});
});
pubsub.on('sync:stats:end', function (data) {
@ -25,9 +28,8 @@ pubsub.on('sync:stats:end', function (data) {
});
pubsub.on('sync:stats:guests', function (eventId) {
var io = require('../index').server;
var roomClients = io.sockets.adapter.rooms;
var guestCount = roomClients.online_guests ? roomClients.online_guests.length : 0;
const Sockets = require('../index');
const guestCount = Sockets.getCountInRoom('online_guests');
pubsub.publish(eventId, guestCount);
});
@ -101,8 +103,8 @@ SocketRooms.getOnlineUserCount = function (io) {
var count = 0;
if (io) {
for (var key in io.sockets.adapter.rooms) {
if (io.sockets.adapter.rooms.hasOwnProperty(key) && key.startsWith('uid_')) {
for (const [key] of io.sockets.adapter.rooms) {
if (key.startsWith('uid_')) {
count += 1;
}
}
@ -112,7 +114,8 @@ SocketRooms.getOnlineUserCount = function (io) {
};
SocketRooms.getLocalStats = function () {
var io = require('../index').server;
var Sockets = require('../index');
var io = Sockets.server;
var socketData = {
onlineGuestCount: 0,
@ -129,26 +132,23 @@ SocketRooms.getLocalStats = function () {
};
if (io) {
var roomClients = io.sockets.adapter.rooms;
socketData.onlineGuestCount = roomClients.online_guests ? roomClients.online_guests.length : 0;
socketData.onlineGuestCount = Sockets.getCountInRoom('online_guests');
socketData.onlineRegisteredCount = SocketRooms.getOnlineUserCount(io);
socketData.socketCount = Object.keys(io.sockets.sockets).length;
socketData.users.categories = roomClients.categories ? roomClients.categories.length : 0;
socketData.users.recent = roomClients.recent_topics ? roomClients.recent_topics.length : 0;
socketData.users.unread = roomClients.unread_topics ? roomClients.unread_topics.length : 0;
socketData.users.categories = Sockets.getCountInRoom('categories');
socketData.users.recent = Sockets.getCountInRoom('recent_topics');
socketData.users.unread = Sockets.getCountInRoom('unread_topics');
var topTenTopics = [];
var tid;
for (var room in roomClients) {
if (roomClients.hasOwnProperty(room)) {
for (const [room, clients] of io.sockets.adapter.rooms) {
tid = room.match(/^topic_(\d+)/);
if (tid) {
socketData.users.topics += roomClients[room].length;
topTenTopics.push({ tid: tid[1], count: roomClients[room].length });
socketData.users.topics += clients.size;
topTenTopics.push({ tid: tid[1], count: clients.size });
} else if (room.match(/^category/)) {
socketData.users.category += roomClients[room].length;
}
socketData.users.category += clients.size;
}
}

@ -19,27 +19,31 @@ const Sockets = module.exports;
Sockets.init = function (server) {
requireModules();
const SocketIO = require('socket.io');
const socketioWildcard = require('socketio-wildcard')();
const SocketIO = require('socket.io').Server;
const io = new SocketIO({
path: nconf.get('relative_path') + '/socket.io',
});
if (nconf.get('isCluster')) {
if (nconf.get('singleHostCluster')) {
io.adapter(require('./single-host-cluster'));
} else if (nconf.get('redis')) {
// socket.io-adapter-cluster needs update
// if (nconf.get('singleHostCluster')) {
// io.adapter(require('./single-host-cluster'));
// } else if (nconf.get('redis')) {
if (nconf.get('redis')) {
io.adapter(require('../database/redis').socketAdapter());
} else {
io.adapter(db.socketAdapter());
winston.warn('clustering detected, you should setup redis!');
}
}
io.use(socketioWildcard);
io.use(authorize);
io.on('connection', onConnection);
const opts = {
transports: nconf.get('socket.io:transports') || ['polling', 'websocket'],
cookie: false,
};
/*
* Restrict socket.io listener to cookie domain. If none is set, infer based on url.
* Production only so you don't get accidentally locked out.
@ -47,15 +51,15 @@ Sockets.init = function (server) {
*/
if (process.env.NODE_ENV !== 'development') {
const origins = nconf.get('socket.io:origins');
io.origins(origins);
opts.cors = {
origin: origins,
methods: ['GET', 'POST'],
allowedHeaders: ['content-type'],
};
winston.info('[socket.io] Restricting access to origin: ' + origins);
}
io.listen(server, {
transports: nconf.get('socket.io:transports'),
cookie: false,
});
io.listen(server, opts);
Sockets.server = io;
};
@ -65,8 +69,8 @@ function onConnection(socket) {
logger.io_one(socket, socket.uid);
onConnect(socket);
socket.on('*', function (payload) {
socket.onAny((event, ...args) => {
const payload = { data: [event].concat(args) };
onMessage(socket, payload);
});
@ -89,8 +93,8 @@ function onConnect(socket) {
}
socket.join('sess_' + socket.request.signedCookies[nconf.get('sessionKey')]);
Sockets.server.sockets.sockets[socket.id].emit('checkSession', socket.uid);
Sockets.server.sockets.sockets[socket.id].emit('setHostname', os.hostname());
socket.emit('checkSession', socket.uid);
socket.emit('setHostname', os.hostname());
plugins.hooks.fire('action:sockets.connect', { socket: socket });
}
@ -225,12 +229,15 @@ Sockets.in = function (room) {
};
Sockets.getUserSocketCount = function (uid) {
return Sockets.getCountInRoom('uid_' + uid);
};
Sockets.getCountInRoom = function (room) {
if (!Sockets.server) {
return 0;
}
const room = Sockets.server.sockets.adapter.rooms['uid_' + uid];
return room ? room.length : 0;
const roomMap = Sockets.server.sockets.adapter.rooms.get(room);
return roomMap ? roomMap.size : 0;
};
Sockets.warnDeprecated = (socket, replacement) => {

@ -48,7 +48,8 @@ nconf.set('socket.io:origins', origins);
if (nconf.get('isCluster') === undefined) {
nconf.set('isPrimary', true);
nconf.set('isCluster', true);
nconf.set('isCluster', false);
nconf.set('singleHostCluster', false);
}
const dbType = nconf.get('database');

@ -450,10 +450,10 @@ describe('Notifications', function () {
user.notifications.getAll(uid, '', function (err, data) {
meta.config.welcomeNotification = '';
assert.ifError(err);
assert(data.includes('welcome_' + uid));
assert(data.includes('welcome_' + uid), data);
done();
});
}, 1100);
}, 2000);
});
});
});

@ -21,7 +21,6 @@ describe('pubsub', function () {
it('should use same event emitter', function (done) {
pubsub.on('dummyEvent', function (message) {
assert.equal(message.foo, 2);
nconf.set('isCluster', true);
pubsub.removeAllListeners('dummyEvent');
pubsub.reset();
done();

@ -394,7 +394,7 @@ describe('socket.io', function () {
setTimeout(function () {
socketAdmin.rooms.getAll({ uid: 10 }, {}, function (err, data) {
assert.ifError(err);
assert.equal(data.users.category, 1);
assert.equal(data.users.category, 1, JSON.stringify(data, null, 4));
done();
});
}, 1000);

Loading…
Cancel
Save