From e952a873a824222e6afc52cee77340877656949c Mon Sep 17 00:00:00 2001 From: Julian Lam Date: Fri, 28 Nov 2014 11:49:15 -0500 Subject: [PATCH 1/4] moving onlineUsers from an in-memory variable to a sorted set --- loader.js | 4 +- src/socket.io/index.js | 90 +++++++++++++++++++++++++++--------------- 2 files changed, 61 insertions(+), 33 deletions(-) diff --git a/loader.js b/loader.js index 0a9d6e0a05..c27691036a 100644 --- a/loader.js +++ b/loader.js @@ -133,8 +133,8 @@ Loader.addClusterEvents = function(callback) { Loader.primaryWorker = parseInt(worker.id, 10); } break; - case 'user:connect': - case 'user:disconnect': + // case 'user:connect': + // case 'user:disconnect': case 'config:update': Loader.notifyWorkers(message); break; diff --git a/src/socket.io/index.js b/src/socket.io/index.js index ced9d8b3b9..010a2743e0 100644 --- a/src/socket.io/index.js +++ b/src/socket.io/index.js @@ -26,43 +26,68 @@ var io; var onlineUsers = []; -process.on('message', onMessage); +// process.on('message', onMessage); + +// function onMessage(msg) { +// if (typeof msg !== 'object') { +// return; +// } + +// if (msg.action === 'user:connect') { +// if (msg.uid && onlineUsers.indexOf(msg.uid) === -1) { +// onlineUsers.push(msg.uid); +// } +// } else if(msg.action === 'user:disconnect') { +// if (msg.uid && msg.socketCount <= 1) { +// var index = onlineUsers.indexOf(msg.uid); +// if (index !== -1) { +// onlineUsers.splice(index, 1); +// } +// } +// } +// } -function onMessage(msg) { - if (typeof msg !== 'object') { - return; - } - - if (msg.action === 'user:connect') { - if (msg.uid && onlineUsers.indexOf(msg.uid) === -1) { - onlineUsers.push(msg.uid); - } - } else if(msg.action === 'user:disconnect') { - if (msg.uid && msg.socketCount <= 1) { - var index = onlineUsers.indexOf(msg.uid); - if (index !== -1) { - onlineUsers.splice(index, 1); - } +function onUserConnect(uid, socketid) { + db.sortedSetIncrBy('onlineUsers', 1, uid, function(err, score) { + if (err) { + return winston.error('[socket.io] Could not add socket id ' + socketid + ' (uid ' + uid + ') to the online users list.'); } - } -} -function onUserConnect(uid, socketid) { - var msg = {action: 'user:connect', uid: uid, socketid: socketid}; - if (process.send) { - process.send(msg); - } else { - onMessage(msg); - } + winston.verbose('[socket.io] Socket id ' + socketid + ' (uid ' + uid + ') connect'); + }); + // var msg = {action: 'user:connect', uid: uid, socketid: socketid}; + // if (process.send) { + // process.send(msg); + // } else { + // onMessage(msg); + // } } function onUserDisconnect(uid, socketid, socketCount) { - var msg = {action: 'user:disconnect', uid: uid, socketid: socketid, socketCount: socketCount}; - if (process.send) { - process.send(msg); - } else { - onMessage(msg); - } + // baris, I no longer use socketCount here, since the zset score is the # of connections, just FYI. + db.sortedSetIncrBy('onlineUsers', -1, uid, function(err, score) { + if (err) { + return winston.error('[socket.io] Could not remove socket id ' + socketid + ' (uid ' + uid + ') from the online users list.'); + } + + if (parseInt(score, 10) === 0) { + db.sortedSetRemove('onlineUsers', uid, function(err) { + if (err) { + winston.error('[socket.io] Could not remove uid ' + uid + ' from the online users list') + } else { + winston.verbose('[socket.io] Removed uid ' + uid + ' from the online users list, user is now considered offline'); + } + }); + } + + winston.verbose('[socket.io] Socket id ' + socketid + ' (uid ' + uid + ') disconnect'); + }); + // var msg = {action: 'user:disconnect', uid: uid, socketid: socketid, socketCount: socketCount}; + // if (process.send) { + // process.send(msg); + // } else { + // onMessage(msg); + // } } Sockets.init = function(server) { @@ -112,6 +137,9 @@ Sockets.init = function(server) { }); }); + // Clearing the online users sorted set + db.delete('onlineUsers'); + io.sockets.on('connection', function(socket) { var hs = socket.handshake, sessionID, uid; From 7ad242b981fa9f3be03fe371c953304904111b99 Mon Sep 17 00:00:00 2001 From: Julian Lam Date: Fri, 28 Nov 2014 12:34:19 -0500 Subject: [PATCH 2/4] using redis for user online list management if configured --- src/socket.io/index.js | 25 ++++++++++++++++--------- 1 file changed, 16 insertions(+), 9 deletions(-) diff --git a/src/socket.io/index.js b/src/socket.io/index.js index 010a2743e0..4e2f956ce3 100644 --- a/src/socket.io/index.js +++ b/src/socket.io/index.js @@ -48,7 +48,9 @@ var onlineUsers = []; // } function onUserConnect(uid, socketid) { - db.sortedSetIncrBy('onlineUsers', 1, uid, function(err, score) { + var database = Sockets.redisDB ? Sockets.redisDB : db; + + database.sortedSetIncrBy('onlineUsers', 1, uid, function(err, score) { if (err) { return winston.error('[socket.io] Could not add socket id ' + socketid + ' (uid ' + uid + ') to the online users list.'); } @@ -65,13 +67,15 @@ function onUserConnect(uid, socketid) { function onUserDisconnect(uid, socketid, socketCount) { // baris, I no longer use socketCount here, since the zset score is the # of connections, just FYI. - db.sortedSetIncrBy('onlineUsers', -1, uid, function(err, score) { + var database = Sockets.redisDB ? Sockets.redisDB : db; + + database.sortedSetIncrBy('onlineUsers', -1, uid, function(err, score) { if (err) { return winston.error('[socket.io] Could not remove socket id ' + socketid + ' (uid ' + uid + ') from the online users list.'); } if (parseInt(score, 10) === 0) { - db.sortedSetRemove('onlineUsers', uid, function(err) { + database.sortedSetRemove('onlineUsers', uid, function(err) { if (err) { winston.error('[socket.io] Could not remove uid ' + uid + ' from the online users list') } else { @@ -102,11 +106,14 @@ Sockets.init = function(server) { // If a redis server is configured, use it as a socket.io store, otherwise, fall back to in-memory store if (nconf.get('redis')) { - var RedisStore = require('socket.io/lib/stores/redis'), - database = require('../database/redis'), - pub = database.connect(), - sub = database.connect(), - client = database.connect(); + var RedisStore = require('socket.io/lib/stores/redis'); + + Sockets.redisDB = require('../database/redis'); + Sockets.redisDB.init(); + + var pub = Sockets.redisDB.connect(), + sub = Sockets.redisDB.connect(), + client = Sockets.redisDB.connect(); // "redis" property needs to be passed in as referenced here: https://github.com/Automattic/socket.io/issues/808 // Probably fixed in socket.IO 1.0 @@ -138,7 +145,7 @@ Sockets.init = function(server) { }); // Clearing the online users sorted set - db.delete('onlineUsers'); + (Sockets.redisDB ? Sockets.redisDB : db)['delete']('onlineUsers'); io.sockets.on('connection', function(socket) { var hs = socket.handshake, From 1ce3c9ca2da0a0ef0c25d48117dfff982bda5adb Mon Sep 17 00:00:00 2001 From: Julian Lam Date: Fri, 28 Nov 2014 13:44:46 -0500 Subject: [PATCH 3/4] no longer logging warnings for socket.io --- src/socket.io/index.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/socket.io/index.js b/src/socket.io/index.js index 4e2f956ce3..25172bbbf8 100644 --- a/src/socket.io/index.js +++ b/src/socket.io/index.js @@ -98,7 +98,7 @@ Sockets.init = function(server) { // Default socket.io config var config = { log: true, - 'log level': process.env.NODE_ENV === 'development' ? 2 : 1, + 'log level': process.env.NODE_ENV === 'development' ? 2 : 0, transports: ['websocket', 'xhr-polling', 'jsonp-polling', 'flashsocket'], 'browser client minification': true, resource: nconf.get('relative_path') + '/socket.io' From 5ac5d20e4be9fdbff53b9be6ad91e5269e174fda Mon Sep 17 00:00:00 2001 From: barisusakli Date: Fri, 28 Nov 2014 14:25:11 -0500 Subject: [PATCH 4/4] users:online refactor --- src/controllers/users.js | 3 +- src/socket.io/index.js | 97 ++----------------------------------- src/socket.io/meta.js | 100 +++++++++++++++++++++------------------ src/socket.io/posts.js | 21 ++++---- src/socket.io/topics.js | 24 ++++++---- src/user.js | 24 ++++++---- 6 files changed, 102 insertions(+), 167 deletions(-) diff --git a/src/controllers/users.js b/src/controllers/users.js index 84fde728fd..626d69da4a 100644 --- a/src/controllers/users.js +++ b/src/controllers/users.js @@ -15,7 +15,8 @@ usersController.getOnlineUsers = function(req, res, next) { user.getUsersFromSet('users:online', 0, 49, next); }, count: function(next) { - next(null, websockets.getConnectedClients().length); + var now = Date.now(); + db.sortedSetCount('users:online', now - 300000, now, next); }, isAdministrator: function(next) { user.isAdministrator(uid, next); diff --git a/src/socket.io/index.js b/src/socket.io/index.js index 25172bbbf8..b7e18eb596 100644 --- a/src/socket.io/index.js +++ b/src/socket.io/index.js @@ -24,76 +24,6 @@ var SocketIO = require('socket.io'), var io; -var onlineUsers = []; - -// process.on('message', onMessage); - -// function onMessage(msg) { -// if (typeof msg !== 'object') { -// return; -// } - -// if (msg.action === 'user:connect') { -// if (msg.uid && onlineUsers.indexOf(msg.uid) === -1) { -// onlineUsers.push(msg.uid); -// } -// } else if(msg.action === 'user:disconnect') { -// if (msg.uid && msg.socketCount <= 1) { -// var index = onlineUsers.indexOf(msg.uid); -// if (index !== -1) { -// onlineUsers.splice(index, 1); -// } -// } -// } -// } - -function onUserConnect(uid, socketid) { - var database = Sockets.redisDB ? Sockets.redisDB : db; - - database.sortedSetIncrBy('onlineUsers', 1, uid, function(err, score) { - if (err) { - return winston.error('[socket.io] Could not add socket id ' + socketid + ' (uid ' + uid + ') to the online users list.'); - } - - winston.verbose('[socket.io] Socket id ' + socketid + ' (uid ' + uid + ') connect'); - }); - // var msg = {action: 'user:connect', uid: uid, socketid: socketid}; - // if (process.send) { - // process.send(msg); - // } else { - // onMessage(msg); - // } -} - -function onUserDisconnect(uid, socketid, socketCount) { - // baris, I no longer use socketCount here, since the zset score is the # of connections, just FYI. - var database = Sockets.redisDB ? Sockets.redisDB : db; - - database.sortedSetIncrBy('onlineUsers', -1, uid, function(err, score) { - if (err) { - return winston.error('[socket.io] Could not remove socket id ' + socketid + ' (uid ' + uid + ') from the online users list.'); - } - - if (parseInt(score, 10) === 0) { - database.sortedSetRemove('onlineUsers', uid, function(err) { - if (err) { - winston.error('[socket.io] Could not remove uid ' + uid + ' from the online users list') - } else { - winston.verbose('[socket.io] Removed uid ' + uid + ' from the online users list, user is now considered offline'); - } - }); - } - - winston.verbose('[socket.io] Socket id ' + socketid + ' (uid ' + uid + ') disconnect'); - }); - // var msg = {action: 'user:disconnect', uid: uid, socketid: socketid, socketCount: socketCount}; - // if (process.send) { - // process.send(msg); - // } else { - // onMessage(msg); - // } -} - Sockets.init = function(server) { // Default socket.io config var config = { @@ -106,14 +36,11 @@ Sockets.init = function(server) { // If a redis server is configured, use it as a socket.io store, otherwise, fall back to in-memory store if (nconf.get('redis')) { - var RedisStore = require('socket.io/lib/stores/redis'); - - Sockets.redisDB = require('../database/redis'); - Sockets.redisDB.init(); - - var pub = Sockets.redisDB.connect(), - sub = Sockets.redisDB.connect(), - client = Sockets.redisDB.connect(); + var RedisStore = require('socket.io/lib/stores/redis'), + database = require('../database/redis'), + pub = database.connect(), + sub = database.connect(), + client = database.connect(); // "redis" property needs to be passed in as referenced here: https://github.com/Automattic/socket.io/issues/808 // Probably fixed in socket.IO 1.0 @@ -144,9 +71,6 @@ Sockets.init = function(server) { }); }); - // Clearing the online users sorted set - (Sockets.redisDB ? Sockets.redisDB : db)['delete']('onlineUsers'); - io.sockets.on('connection', function(socket) { var hs = socket.handshake, sessionID, uid; @@ -170,7 +94,6 @@ Sockets.init = function(server) { } socket.uid = parseInt(uid, 10); - onUserConnect(uid, socket.id); /* If meta.config.loggerIOStatus > 0, logger.io_one will hook into this socket */ logger.io_one(socket, uid); @@ -220,8 +143,6 @@ Sockets.init = function(server) { socket.broadcast.emit('event:user_status_change', {uid: uid, status: 'offline'}); } - onUserDisconnect(uid, socket.id, socketCount); - for(var roomName in io.sockets.manager.roomClients[socket.id]) { if (roomName.indexOf('topic') !== -1) { io.sockets.in(roomName.slice(1)).emit('event:user_leave', socket.uid); @@ -314,10 +235,6 @@ Sockets.getSocketCount = function() { return Array.isArray(clients) ? clients.length : 0; }; -Sockets.getConnectedClients = function() { - return onlineUsers; -}; - Sockets.getUserSocketCount = function(uid) { var roomClients = io.sockets.manager.rooms['/uid_' + uid]; if(!Array.isArray(roomClients)) { @@ -326,10 +243,6 @@ Sockets.getUserSocketCount = function(uid) { return roomClients.length; }; -Sockets.getOnlineUserCount = function () { - return onlineUsers.length; -}; - Sockets.getOnlineAnonCount = function () { var guestRoom = io.sockets.manager.rooms['/online_guests']; if (!Array.isArray(guestRoom)) { diff --git a/src/socket.io/meta.js b/src/socket.io/meta.js index 7c1f42a698..65a4038d2e 100644 --- a/src/socket.io/meta.js +++ b/src/socket.io/meta.js @@ -86,61 +86,67 @@ SocketMeta.rooms.enter = function(socket, data, callback) { }; SocketMeta.rooms.getAll = function(socket, data, callback) { - var rooms = websockets.server.sockets.manager.rooms, - socketData = { - onlineGuestCount: websockets.getOnlineAnonCount(), - onlineRegisteredCount: websockets.getOnlineUserCount(), - socketCount: websockets.getSocketCount(), - users: { - home: rooms['/home'] ? rooms['/home'].length : 0, - topics: 0, - category: 0 - }, - topics: {} - }; - - var scores = {}, - topTenTopics = [], - tid; - - for (var room in rooms) { - if (rooms.hasOwnProperty(room)) { - if (tid = room.match(/^\/topic_(\d+)/)) { - var length = rooms[room].length; - socketData.users.topics += length; - - if (scores[length]) { - scores[length].push(tid[1]); - } else { - scores[length] = [tid[1]]; + db.sortedSetCount('users:online', now - 300000, now, function(err, onlineRegisteredCount) { + if (err) { + return callback(err); + } + + var rooms = websockets.server.sockets.manager.rooms, + socketData = { + onlineGuestCount: websockets.getOnlineAnonCount(), + onlineRegisteredCount: onlineRegisteredCount, + socketCount: websockets.getSocketCount(), + users: { + home: rooms['/home'] ? rooms['/home'].length : 0, + topics: 0, + category: 0 + }, + topics: {} + }; + + var scores = {}, + topTenTopics = [], + tid; + + for (var room in rooms) { + if (rooms.hasOwnProperty(room)) { + if (tid = room.match(/^\/topic_(\d+)/)) { + var length = rooms[room].length; + socketData.users.topics += length; + + if (scores[length]) { + scores[length].push(tid[1]); + } else { + scores[length] = [tid[1]]; + } + } else if (room.match(/^\/category/)) { + socketData.users.category += rooms[room].length; } - } else if (room.match(/^\/category/)) { - socketData.users.category += rooms[room].length; } } - } - var scoreKeys = Object.keys(scores), - mostActive = scoreKeys.sort(); + var scoreKeys = Object.keys(scores), + mostActive = scoreKeys.sort(); - while(topTenTopics.length < 10 && mostActive.length > 0) { - topTenTopics = topTenTopics.concat(scores[mostActive.pop()]); - } + while(topTenTopics.length < 10 && mostActive.length > 0) { + topTenTopics = topTenTopics.concat(scores[mostActive.pop()]); + } - topTenTopics = topTenTopics.slice(0, 10); + topTenTopics = topTenTopics.slice(0, 10); - topics.getTopicsFields(topTenTopics, ['title'], function(err, titles) { - if (err) { - return callback(err); - } - topTenTopics.forEach(function(tid, id) { - socketData.topics[tid] = { - value: rooms['/topic_' + tid].length, - title: validator.escape(titles[id].title) - }; + topics.getTopicsFields(topTenTopics, ['title'], function(err, titles) { + if (err) { + return callback(err); + } + topTenTopics.forEach(function(tid, id) { + socketData.topics[tid] = { + value: rooms['/topic_' + tid].length, + title: validator.escape(titles[id].title) + }; + }); + + callback(null, socketData); }); - - callback(null, socketData); }); }; diff --git a/src/socket.io/posts.js b/src/socket.io/posts.js index 01b4e1f78f..f5052460cc 100644 --- a/src/socket.io/posts.js +++ b/src/socket.io/posts.js @@ -46,21 +46,24 @@ SocketPosts.reply = function(socket, data, callback) { socket.emit('event:new_post', result); - var uids = websockets.getConnectedClients(); - - privileges.categories.filterUids('read', postData.topic.cid, uids, function(err, uids) { + user.getUidsFromSet('users:online', 0, -1, function(err, uids) { if (err) { return; } + privileges.categories.filterUids('read', postData.topic.cid, uids, function(err, uids) { + if (err) { + return; + } - plugins.fireHook('filter:sockets.sendNewPostToUids', {uidsTo: uids, uidFrom: data.uid, type: "newPost"}, function(err, data) { - uids = data.uidsTo; + plugins.fireHook('filter:sockets.sendNewPostToUids', {uidsTo: uids, uidFrom: data.uid, type: "newPost"}, function(err, data) { + uids = data.uidsTo; - for(var i=0; i