users:online refactor

v1.18.x
barisusakli 10 years ago
parent 1ce3c9ca2d
commit 5ac5d20e4b

@ -15,7 +15,8 @@ usersController.getOnlineUsers = function(req, res, next) {
user.getUsersFromSet('users:online', 0, 49, next);
},
count: function(next) {
next(null, websockets.getConnectedClients().length);
var now = Date.now();
db.sortedSetCount('users:online', now - 300000, now, next);
},
isAdministrator: function(next) {
user.isAdministrator(uid, next);

@ -24,76 +24,6 @@ var SocketIO = require('socket.io'),
var io;
var onlineUsers = [];
// process.on('message', onMessage);
// function onMessage(msg) {
// if (typeof msg !== 'object') {
// return;
// }
// if (msg.action === 'user:connect') {
// if (msg.uid && onlineUsers.indexOf(msg.uid) === -1) {
// onlineUsers.push(msg.uid);
// }
// } else if(msg.action === 'user:disconnect') {
// if (msg.uid && msg.socketCount <= 1) {
// var index = onlineUsers.indexOf(msg.uid);
// if (index !== -1) {
// onlineUsers.splice(index, 1);
// }
// }
// }
// }
function onUserConnect(uid, socketid) {
var database = Sockets.redisDB ? Sockets.redisDB : db;
database.sortedSetIncrBy('onlineUsers', 1, uid, function(err, score) {
if (err) {
return winston.error('[socket.io] Could not add socket id ' + socketid + ' (uid ' + uid + ') to the online users list.');
}
winston.verbose('[socket.io] Socket id ' + socketid + ' (uid ' + uid + ') connect');
});
// var msg = {action: 'user:connect', uid: uid, socketid: socketid};
// if (process.send) {
// process.send(msg);
// } else {
// onMessage(msg);
// }
}
function onUserDisconnect(uid, socketid, socketCount) {
// baris, I no longer use socketCount here, since the zset score is the # of connections, just FYI.
var database = Sockets.redisDB ? Sockets.redisDB : db;
database.sortedSetIncrBy('onlineUsers', -1, uid, function(err, score) {
if (err) {
return winston.error('[socket.io] Could not remove socket id ' + socketid + ' (uid ' + uid + ') from the online users list.');
}
if (parseInt(score, 10) === 0) {
database.sortedSetRemove('onlineUsers', uid, function(err) {
if (err) {
winston.error('[socket.io] Could not remove uid ' + uid + ' from the online users list')
} else {
winston.verbose('[socket.io] Removed uid ' + uid + ' from the online users list, user is now considered offline');
}
});
}
winston.verbose('[socket.io] Socket id ' + socketid + ' (uid ' + uid + ') disconnect');
});
// var msg = {action: 'user:disconnect', uid: uid, socketid: socketid, socketCount: socketCount};
// if (process.send) {
// process.send(msg);
// } else {
// onMessage(msg);
// }
}
Sockets.init = function(server) {
// Default socket.io config
var config = {
@ -106,14 +36,11 @@ Sockets.init = function(server) {
// If a redis server is configured, use it as a socket.io store, otherwise, fall back to in-memory store
if (nconf.get('redis')) {
var RedisStore = require('socket.io/lib/stores/redis');
Sockets.redisDB = require('../database/redis');
Sockets.redisDB.init();
var pub = Sockets.redisDB.connect(),
sub = Sockets.redisDB.connect(),
client = Sockets.redisDB.connect();
var RedisStore = require('socket.io/lib/stores/redis'),
database = require('../database/redis'),
pub = database.connect(),
sub = database.connect(),
client = database.connect();
// "redis" property needs to be passed in as referenced here: https://github.com/Automattic/socket.io/issues/808
// Probably fixed in socket.IO 1.0
@ -144,9 +71,6 @@ Sockets.init = function(server) {
});
});
// Clearing the online users sorted set
(Sockets.redisDB ? Sockets.redisDB : db)['delete']('onlineUsers');
io.sockets.on('connection', function(socket) {
var hs = socket.handshake,
sessionID, uid;
@ -170,7 +94,6 @@ Sockets.init = function(server) {
}
socket.uid = parseInt(uid, 10);
onUserConnect(uid, socket.id);
/* If meta.config.loggerIOStatus > 0, logger.io_one will hook into this socket */
logger.io_one(socket, uid);
@ -220,8 +143,6 @@ Sockets.init = function(server) {
socket.broadcast.emit('event:user_status_change', {uid: uid, status: 'offline'});
}
onUserDisconnect(uid, socket.id, socketCount);
for(var roomName in io.sockets.manager.roomClients[socket.id]) {
if (roomName.indexOf('topic') !== -1) {
io.sockets.in(roomName.slice(1)).emit('event:user_leave', socket.uid);
@ -314,10 +235,6 @@ Sockets.getSocketCount = function() {
return Array.isArray(clients) ? clients.length : 0;
};
Sockets.getConnectedClients = function() {
return onlineUsers;
};
Sockets.getUserSocketCount = function(uid) {
var roomClients = io.sockets.manager.rooms['/uid_' + uid];
if(!Array.isArray(roomClients)) {
@ -326,10 +243,6 @@ Sockets.getUserSocketCount = function(uid) {
return roomClients.length;
};
Sockets.getOnlineUserCount = function () {
return onlineUsers.length;
};
Sockets.getOnlineAnonCount = function () {
var guestRoom = io.sockets.manager.rooms['/online_guests'];
if (!Array.isArray(guestRoom)) {

@ -86,61 +86,67 @@ SocketMeta.rooms.enter = function(socket, data, callback) {
};
SocketMeta.rooms.getAll = function(socket, data, callback) {
var rooms = websockets.server.sockets.manager.rooms,
socketData = {
onlineGuestCount: websockets.getOnlineAnonCount(),
onlineRegisteredCount: websockets.getOnlineUserCount(),
socketCount: websockets.getSocketCount(),
users: {
home: rooms['/home'] ? rooms['/home'].length : 0,
topics: 0,
category: 0
},
topics: {}
};
var scores = {},
topTenTopics = [],
tid;
for (var room in rooms) {
if (rooms.hasOwnProperty(room)) {
if (tid = room.match(/^\/topic_(\d+)/)) {
var length = rooms[room].length;
socketData.users.topics += length;
if (scores[length]) {
scores[length].push(tid[1]);
} else {
scores[length] = [tid[1]];
db.sortedSetCount('users:online', now - 300000, now, function(err, onlineRegisteredCount) {
if (err) {
return callback(err);
}
var rooms = websockets.server.sockets.manager.rooms,
socketData = {
onlineGuestCount: websockets.getOnlineAnonCount(),
onlineRegisteredCount: onlineRegisteredCount,
socketCount: websockets.getSocketCount(),
users: {
home: rooms['/home'] ? rooms['/home'].length : 0,
topics: 0,
category: 0
},
topics: {}
};
var scores = {},
topTenTopics = [],
tid;
for (var room in rooms) {
if (rooms.hasOwnProperty(room)) {
if (tid = room.match(/^\/topic_(\d+)/)) {
var length = rooms[room].length;
socketData.users.topics += length;
if (scores[length]) {
scores[length].push(tid[1]);
} else {
scores[length] = [tid[1]];
}
} else if (room.match(/^\/category/)) {
socketData.users.category += rooms[room].length;
}
} else if (room.match(/^\/category/)) {
socketData.users.category += rooms[room].length;
}
}
}
var scoreKeys = Object.keys(scores),
mostActive = scoreKeys.sort();
var scoreKeys = Object.keys(scores),
mostActive = scoreKeys.sort();
while(topTenTopics.length < 10 && mostActive.length > 0) {
topTenTopics = topTenTopics.concat(scores[mostActive.pop()]);
}
while(topTenTopics.length < 10 && mostActive.length > 0) {
topTenTopics = topTenTopics.concat(scores[mostActive.pop()]);
}
topTenTopics = topTenTopics.slice(0, 10);
topTenTopics = topTenTopics.slice(0, 10);
topics.getTopicsFields(topTenTopics, ['title'], function(err, titles) {
if (err) {
return callback(err);
}
topTenTopics.forEach(function(tid, id) {
socketData.topics[tid] = {
value: rooms['/topic_' + tid].length,
title: validator.escape(titles[id].title)
};
topics.getTopicsFields(topTenTopics, ['title'], function(err, titles) {
if (err) {
return callback(err);
}
topTenTopics.forEach(function(tid, id) {
socketData.topics[tid] = {
value: rooms['/topic_' + tid].length,
title: validator.escape(titles[id].title)
};
});
callback(null, socketData);
});
callback(null, socketData);
});
};

@ -46,21 +46,24 @@ SocketPosts.reply = function(socket, data, callback) {
socket.emit('event:new_post', result);
var uids = websockets.getConnectedClients();
privileges.categories.filterUids('read', postData.topic.cid, uids, function(err, uids) {
user.getUidsFromSet('users:online', 0, -1, function(err, uids) {
if (err) {
return;
}
privileges.categories.filterUids('read', postData.topic.cid, uids, function(err, uids) {
if (err) {
return;
}
plugins.fireHook('filter:sockets.sendNewPostToUids', {uidsTo: uids, uidFrom: data.uid, type: "newPost"}, function(err, data) {
uids = data.uidsTo;
plugins.fireHook('filter:sockets.sendNewPostToUids', {uidsTo: uids, uidFrom: data.uid, type: "newPost"}, function(err, data) {
uids = data.uidsTo;
for(var i=0; i<uids.length; ++i) {
if (parseInt(uids[i], 10) !== socket.uid) {
websockets.in('uid_' + uids[i]).emit('event:new_post', result);
for(var i=0; i<uids.length; ++i) {
if (parseInt(uids[i], 10) !== socket.uid) {
websockets.in('uid_' + uids[i]).emit('event:new_post', result);
}
}
}
});
});
});
});

@ -41,22 +41,26 @@ SocketTopics.post = function(socket, data, callback) {
socket.emit('event:new_post', {posts: [result.postData]});
socket.emit('event:new_topic', result.topicData);
var uids = websockets.getConnectedClients();
privileges.categories.filterUids('read', result.topicData.cid, uids, function(err, uids) {
user.getUidsFromSet('users:online', 0, -1, function(err, uids) {
if (err) {
return;
}
plugins.fireHook('filter:sockets.sendNewPostToUids', {uidsTo: uids, uidFrom: data.uid, type: "newTopic"}, function(err, data) {
uids = data.uidsTo;
privileges.categories.filterUids('read', result.topicData.cid, uids, function(err, uids) {
if (err) {
return;
}
plugins.fireHook('filter:sockets.sendNewPostToUids', {uidsTo: uids, uidFrom: data.uid, type: "newTopic"}, function(err, data) {
uids = data.uidsTo;
for(var i=0; i<uids.length; ++i) {
if (parseInt(uids[i], 10) !== socket.uid) {
websockets.in('uid_' + uids[i]).emit('event:new_post', {posts: [result.postData]});
websockets.in('uid_' + uids[i]).emit('event:new_topic', result.topicData);
for(var i=0; i<uids.length; ++i) {
if (parseInt(uids[i], 10) !== socket.uid) {
websockets.in('uid_' + uids[i]).emit('event:new_post', {posts: [result.postData]});
websockets.in('uid_' + uids[i]).emit('event:new_topic', result.topicData);
}
}
}
});
});
});
});

@ -137,11 +137,14 @@ var async = require('async'),
User.updateLastOnlineTime = function(uid, callback) {
callback = callback || function() {};
User.getUserFields(uid, ['status', 'lastonline'], function(err, userData) {
if(err || userData.status === 'offline' || Date.now() - parseInt(userData.lastonline, 10) < 300000) {
var now = Date.now();
if(err || userData.status === 'offline' || now - parseInt(userData.lastonline, 10) < 300000) {
return callback(err);
}
User.setUserField(uid, 'lastonline', Date.now(), callback);
db.sortedSetAdd('users:online', now, uid);
User.setUserField(uid, 'lastonline', now, callback);
});
};
@ -184,15 +187,20 @@ var async = require('async'),
});
};
User.getUidsFromSet = function(set, start, stop, callback) {
if (set === 'users:online') {
var count = parseInt(stop, 10) === -1 ? stop : stop - start + 1;
var now = Date.now();
db.getSortedSetRevRangeByScore(set, start, count, now, now - 300000, next);
} else {
db.getSortedSetRevRange(set, start, stop, next);
}
};
User.getUsersFromSet = function(set, start, stop, callback) {
async.waterfall([
function(next) {
if (set === 'users:online') {
var uids = require('./socket.io').getConnectedClients();
next(null, uids.slice(start, stop + 1));
} else {
db.getSortedSetRevRange(set, start, stop, next);
}
User.getUidsFromSet(set, start, stop, next);
},
function(uids, next) {
User.getUsers(uids, next);

Loading…
Cancel
Save