From adb8f80b8653071214f2233d48e4edc83b065e60 Mon Sep 17 00:00:00 2001 From: Julian Lam Date: Thu, 9 Jan 2014 20:13:17 -0500 Subject: [PATCH] interim commit - removed calls to websockets.js, beginning porting to namespaced files --- app.js | 11 +- package.json | 3 +- src/favourites.js | 4 +- src/notifications.js | 6 +- src/postTools.js | 83 +++++----- src/routes/authentication.js | 4 +- src/routes/user.js | 5 +- src/socket.io/index.js | 297 +++++++++++++++++++++++++++++++++++ src/threadTools.js | 2 +- src/topics.js | 7 +- src/user.js | 5 +- src/websockets.js | 248 ++--------------------------- 12 files changed, 376 insertions(+), 299 deletions(-) create mode 100644 src/socket.io/index.js diff --git a/app.js b/app.js index 760a5d4b74..d2312fa494 100644 --- a/app.js +++ b/app.js @@ -94,9 +94,11 @@ var templates = require('./public/src/templates'), translator = require('./public/src/translator'), - webserver = require('./src/webserver'), - SocketIO = require('socket.io').listen(global.server, { log: false, transports: ['websocket', 'xhr-polling', 'jsonp-polling', 'flashsocket'], 'browser client minification': true}), - websockets = require('./src/websockets'), + webserver = require('./src/webserver'); + console.log('here now'); + // SocketIO = require('socket.io').listen(global.server, { log: false, transports: ['websocket', 'xhr-polling', 'jsonp-polling', 'flashsocket'], 'browser client minification': true}), + // websockets = require('./src/websockets'), + var sockets = require('./src/socket.io'), plugins = require('./src/plugins'), notifications = require('./src/notifications'), upgrade = require('./src/upgrade'); @@ -105,7 +107,8 @@ upgrade.check(function(schema_ok) { if (schema_ok || nconf.get('check-schema') === false) { - websockets.init(SocketIO); + // websockets.init(SocketIO); + sockets.init(); plugins.init(); global.templates = {}; diff --git a/package.json b/package.json index e268311415..06021b9d5f 100644 --- a/package.json +++ b/package.json @@ -45,7 +45,8 @@ "cron": "~1.0.1", "semver": "~2.2.1", "string": "~1.7.0", - "xregexp": "~2.0.0" + "xregexp": "~2.0.0", + "socket.io-wildcard": "~0.1.1" }, "optionalDependencies": { "redis": "0.8.3", diff --git a/src/favourites.js b/src/favourites.js index acf6e35d60..cf19918d94 100644 --- a/src/favourites.js +++ b/src/favourites.js @@ -1,13 +1,13 @@ var db = require('./database'), posts = require('./posts'), user = require('./user'), - websockets = require('./websockets') translator = require('./../public/src/translator'); (function (Favourites) { "use strict"; Favourites.favourite = function (pid, room_id, uid, socket) { + var websockets = require('./socket.io'); if (uid === 0) { @@ -56,6 +56,8 @@ var db = require('./database'), }; Favourites.unfavourite = function (pid, room_id, uid, socket) { + var websockets = require('./socket.io'); + if (uid === 0) { return; } diff --git a/src/notifications.js b/src/notifications.js index fcc1d46c97..bbbe3710a9 100644 --- a/src/notifications.js +++ b/src/notifications.js @@ -3,10 +3,7 @@ var async = require('async'), cron = require('cron').CronJob, db = require('./database'), - utils = require('../public/src/utils'), - websockets = require('./websockets'); - - + utils = require('../public/src/utils'); (function(Notifications) { "use strict"; @@ -71,6 +68,7 @@ var async = require('async'), } Notifications.push = function(nid, uids, callback) { + var websockets = require('./socket.io'); if (!Array.isArray(uids)) { uids = [uids]; } diff --git a/src/postTools.js b/src/postTools.js index 0f8dd73f89..9d94eecbe6 100644 --- a/src/postTools.js +++ b/src/postTools.js @@ -8,7 +8,6 @@ var winston = require('winston'), topics = require('./topics'), threadTools = require('./threadTools'), user = require('./user'), - websockets = require('./websockets'), utils = require('../public/src/utils'), plugins = require('./plugins'), events = require('./events'), @@ -70,55 +69,55 @@ var winston = require('winston'), PostTools.edit = function(uid, pid, title, content) { + var websockets = require('./socket.io'), + success = function() { + posts.setPostFields(pid, { + edited: Date.now(), + editor: uid, + content: content + }); - var success = function() { - posts.setPostFields(pid, { - edited: Date.now(), - editor: uid, - content: content - }); - - events.logPostEdit(uid, pid); - - db.searchRemove('post', pid, function() { - db.searchIndex('post', content, pid); - }); - - async.parallel([ - function(next) { - posts.getPostField(pid, 'tid', function(err, tid) { - PostTools.isMain(pid, tid, function(err, isMainPost) { - if (isMainPost) { - title = title.trim(); - var slug = tid + '/' + utils.slugify(title); + events.logPostEdit(uid, pid); - topics.setTopicField(tid, 'title', title); - topics.setTopicField(tid, 'slug', slug); + db.searchRemove('post', pid, function() { + db.searchIndex('post', content, pid); + }); - db.searchRemove('topic', tid, function() { - db.searchIndex('topic', title, tid); + async.parallel([ + function(next) { + posts.getPostField(pid, 'tid', function(err, tid) { + PostTools.isMain(pid, tid, function(err, isMainPost) { + if (isMainPost) { + title = title.trim(); + var slug = tid + '/' + utils.slugify(title); + + topics.setTopicField(tid, 'title', title); + topics.setTopicField(tid, 'slug', slug); + + db.searchRemove('topic', tid, function() { + db.searchIndex('topic', title, tid); + }); + } + + next(null, { + tid: tid, + isMainPost: isMainPost }); - } - - next(null, { - tid: tid, - isMainPost: isMainPost }); }); + }, + function(next) { + PostTools.parse(content, next); + } + ], function(err, results) { + websockets.in('topic_' + results[0].tid).emit('event:post_edited', { + pid: pid, + title: validator.sanitize(title).escape(), + isMainPost: results[0].isMainPost, + content: results[1] }); - }, - function(next) { - PostTools.parse(content, next); - } - ], function(err, results) { - websockets.in('topic_' + results[0].tid).emit('event:post_edited', { - pid: pid, - title: validator.sanitize(title).escape(), - isMainPost: results[0].isMainPost, - content: results[1] }); - }); - }; + }; PostTools.privileges(pid, uid, function(privileges) { if (privileges.editable) { diff --git a/src/routes/authentication.js b/src/routes/authentication.js index 22039d0b1d..ea6228717a 100644 --- a/src/routes/authentication.js +++ b/src/routes/authentication.js @@ -131,7 +131,7 @@ if (req.user && req.user.uid > 0) { winston.info('[Auth] Session ' + req.sessionID + ' logout (uid: ' + req.user.uid + ')'); - var ws = require('./../websockets'); + var ws = require('../socket.io'); ws.logoutUser(req.user.uid); req.logout(); @@ -203,7 +203,7 @@ uid: uid }, function() { - require('./../websockets').emitUserCount(); + require('../socket.io').emitUserCount(); if(req.body.referrer) res.redirect(req.body.referrer); diff --git a/src/routes/user.js b/src/routes/user.js index d279de0870..35ecc00857 100644 --- a/src/routes/user.js +++ b/src/routes/user.js @@ -9,8 +9,7 @@ var fs = require('fs'), postTools = require('../postTools'), utils = require('./../../public/src/utils'), meta = require('./../meta'), - db = require('./../database'), - websockets = require('./../websockets'); + db = require('./../database'); (function (User) { User.createRoutes = function (app) { @@ -485,6 +484,8 @@ var fs = require('fs'), } function getOnlineUsers(req, res) { + var websockets = require('../socket.io'); + user.getUsers('users:online', 0, 49, function (err, data) { var onlineUsers = []; diff --git a/src/socket.io/index.js b/src/socket.io/index.js new file mode 100644 index 0000000000..93d6245532 --- /dev/null +++ b/src/socket.io/index.js @@ -0,0 +1,297 @@ +var SocketIO = require('socket.io'), + socketioWildcard = require('socket.io-wildcard'), + util = require('util'), + async = require('async'), + fs = require('fs'), + nconf = require('nconf'), + express = require('express'), + socketCookieParser = express.cookieParser(nconf.get('secret')), + winston = require('winston'), + + db = require('../database'), + user = require('../user'), + topics = require('../topics'), + logger = require('../logger'), + + Sockets = {}, + Namespaces = {}; + +/* === */ + +var users = {}, + userSockets = {}, + rooms = {}, + io; + +Sockets.init = function() { + io = socketioWildcard(SocketIO).listen(global.server, { + log: false, + transports: ['websocket', 'xhr-polling', 'jsonp-polling', 'flashsocket'], + 'browser client minification': true + }); + + io.sockets.on('connection', function(socket) { + var hs = socket.handshake, + sessionID, uid, lastPostTime = 0; + + // Validate the session, if present + socketCookieParser(hs, {}, function(err) { + sessionID = socket.handshake.signedCookies["express.sid"]; + db.sessionStore.get(sessionID, function(err, sessionData) { + if (!err && sessionData && sessionData.passport && sessionData.passport.user) { + uid = users[sessionID] = sessionData.passport.user; + } else { + uid = users[sessionID] = 0; + } + + userSockets[uid] = userSockets[uid] || []; + userSockets[uid].push(socket); + + /* Need to save some state for the logger & maybe some other modules later on */ + socket.state = { + user : { + uid : uid + } + }; + + /* If meta.config.loggerIOStatus > 0, logger.io_one will hook into this socket */ + logger.io_one(socket,uid); + + if (uid) { + + db.sortedSetAdd('users:online', Date.now(), uid, function(err, data) { + socket.join('uid_' + uid); + + user.getUserField(uid, 'username', function(err, username) { + socket.emit('event:connect', { + status: 1, + username: username, + uid: uid + }); + }); + }); + } + + io.sockets.in('global').emit('api:user.isOnline', isUserOnline(uid)); + }); + }); + + socket.on('disconnect', function() { + + var index = (userSockets[uid] || []).indexOf(socket); + if (index !== -1) { + userSockets[uid].splice(index, 1); + } + + if (userSockets[uid] && userSockets[uid].length === 0) { + delete users[sessionID]; + delete userSockets[uid]; + if (uid) { + db.sortedSetRemove('users:online', uid, function(err, data) { + }); + } + } + + io.sockets.in('global').emit('api:user.isOnline', isUserOnline(uid)); + + emitOnlineUserCount(); + + for (var roomName in rooms) { + + socket.leave(roomName); + + if (rooms[roomName][socket.id]) { + delete rooms[roomName][socket.id]; + } + + updateRoomBrowsingText(roomName); + } + }); + + socket.on('reconnected', function() { + if (uid) { + topics.pushUnreadCount(uid); + user.pushNotifCount(uid); + } + + if (process.env.NODE_ENV === 'development') { + if (uid) { + winston.info('[socket] uid ' + uid + ' (' + sessionID + ') has successfully reconnected.'); + } else { + winston.info('[socket] An anonymous user (' + sessionID + ') has successfully reconnected.'); + } + } + }); + + socket.on('*', function(payload) { + // Ignore all non-api messages + if (payload.name.substr(0, 4) !== 'api:') { + return; + } else { + // Deconstruct the message + var parts = payload.name.split('.'), + namespace = parts[0], + command = parts[1], + subcommand = parts[2], // MUST ADD RECURSION (:P) + executeHandler = function(args) { + if (!subcommand) { + Namespaces[namespace][command](args); + } else { + Namespaces[namespace][command][subcommand](args); + } + }; + + if (Namespaces[namespace]) { + executeHandler(payload.args); + } else { + fs.exists(path.join(__dirname, namespace + '.js'), function(exists) { + if (exists) { + Namespaces[namespace] = require('./' + namespace); + executeHandler(payload.args); + } else { + winston.warn('[socket.io] Unrecognized message: ' + payload.name); + } + }) + } + } + console.log('message!', arguments); + }); + }); +} + +Sockets.logoutUser = function(uid) { + if(userSockets[uid] && userSockets[uid].length) { + for(var i=0; i< userSockets[uid].length; ++i) { + userSockets[uid][i].emit('event:disconnect'); + userSockets[uid][i].disconnect(); + + if(!userSockets[uid]) { + return; + } + } + } +} + +Sockets.emitUserCount = function() { + db.getObjectField('global', 'userCount', function(err, count) { + io.sockets.emit('user.count', { + count: count + }); + }); +}; + +Sockets.in = function(room) { + return io.sockets.in(room); +}; + +Sockets.getConnectedClients = function() { + return userSockets; +} + +Sockets.getOnlineAnonCount = function () { + return userSockets[0] ? userSockets[0].length : 0; +}; + +/* Helpers */ + +function isUserOnline(uid) { + return !!userSockets[uid] && userSockets[uid].length > 0; +} +Sockets.isUserOnline = isUserOnline; + +function updateRoomBrowsingText(roomName) { + + function getUidsInRoom(room) { + var uids = []; + for (var socketId in room) { + if (uids.indexOf(room[socketId]) === -1) + uids.push(room[socketId]); + } + return uids; + } + + function getAnonymousCount(roomName) { + var clients = io.sockets.clients(roomName); + var anonCount = 0; + + for (var i = 0; i < clients.length; ++i) { + var hs = clients[i].handshake; + if (hs && clients[i].state && clients[i].state.user.uid === 0) { + ++anonCount; + } + } + return anonCount; + } + + var uids = getUidsInRoom(rooms[roomName]), + anonymousCount = getAnonymousCount(roomName); + + if (uids.length === 0) { + io.sockets.in(roomName).emit('api:get_users_in_room', { users: [], anonymousCount: anonymousCount }); + } else { + user.getMultipleUserFields(uids, ['uid', 'username', 'userslug', 'picture'], function(err, users) { + if(!err) + io.sockets.in(roomName).emit('api:get_users_in_room', { users: users, anonymousCount: anonymousCount }); + }); + } +} + +function emitTopicPostStats() { + db.getObjectFields('global', ['topicCount', 'postCount'], function(err, data) { + if (err) { + return winston.err(err); + } + + var stats = { + topics: data.topicCount ? data.topicCount : 0, + posts: data.postCount ? data.postCount : 0 + }; + + io.sockets.emit('post.stats', stats); + }); +} + +function emitOnlineUserCount() { + var anon = userSockets[0] ? userSockets[0].length : 0; + var registered = Object.keys(userSockets).length; + if (anon) + registered = registered - 1; + + var returnObj = { + users: registered + anon, + anon: anon + }; + io.sockets.emit('api:user.active.get', returnObj) +} + +function emitAlert(socket, title, message) { + socket.emit('event:alert', { + type: 'danger', + timeout: 2000, + title: title, + message: message, + alert_id: 'post_error' + }); +} + +function emitContentTooShortAlert(socket) { + socket.emit('event:alert', { + type: 'danger', + timeout: 2000, + title: 'Content too short', + message: "Please enter a longer post. At least " + meta.config.minimumPostLength + " characters.", + alert_id: 'post_error' + }); +} + +function emitTooManyPostsAlert(socket) { + socket.emit('event:alert', { + title: 'Too many posts!', + message: 'You can only post every ' + meta.config.postDelay + ' seconds.', + type: 'danger', + timeout: 2000 + }); +} + +/* Exporting */ +module.exports = Sockets; \ No newline at end of file diff --git a/src/threadTools.js b/src/threadTools.js index 6d4388fcb0..a1afd0f915 100644 --- a/src/threadTools.js +++ b/src/threadTools.js @@ -10,7 +10,7 @@ var winston = require('winston'), notifications = require('./notifications'), posts = require('./posts'), meta = require('./meta'), - websockets = require('./websockets'), + websockets = require('./socket.io'), events = require('./events'); diff --git a/src/topics.js b/src/topics.js index 85747dd4af..37a997d6ad 100644 --- a/src/topics.js +++ b/src/topics.js @@ -17,10 +17,7 @@ var async = require('async'), notifications = require('./notifications'), feed = require('./feed'), favourites = require('./favourites'), - meta = require('./meta') - - websockets = require('./websockets'); - + meta = require('./meta'); (function(Topics) { @@ -589,6 +586,8 @@ var async = require('async'), }; Topics.pushUnreadCount = function(uids, callback) { + var websockets = require('./socket.io'); + if (!uids) { clients = websockets.getConnectedClients(); uids = Object.keys(clients); diff --git a/src/user.js b/src/user.js index 8e604a2440..e712c35e30 100644 --- a/src/user.js +++ b/src/user.js @@ -15,8 +15,7 @@ var bcrypt = require('bcrypt'), notifications = require('./notifications'), topics = require('./topics'), events = require('./events'), - Emailer = require('./emailer'), - websockets = require('./websockets'); + Emailer = require('./emailer'); (function(User) { 'use strict'; @@ -888,6 +887,8 @@ var bcrypt = require('bcrypt'), }; User.pushNotifCount = function(uid) { + var websockets = require('./socket.io'); + User.notifications.getUnreadCount(uid, function(err, count) { if (!err) { websockets.in('uid_' + uid).emit('event:notifications.updateCount', count); diff --git a/src/websockets.js b/src/websockets.js index a2a320f969..24a9411093 100644 --- a/src/websockets.js +++ b/src/websockets.js @@ -1,23 +1,21 @@ +console.log('HEY NIB, I STILL GOT CALLED'); 'use strict'; var cookie = require('cookie'), - express = require('express'), - util = require('util'), - async = require('async'), - fs = require('fs'), - nconf = require('nconf'), + + gravatar = require('gravatar'), - winston = require('winston'), + S = require('string'), - db = require('./database'), - user = require('./user'), + + groups = require('./groups'), posts = require('./posts'), favourites = require('./favourites'), utils = require('../public/src/utils'), - topics = require('./topics'), + categories = require('./categories'), CategoryTools = require('./categoryTools'), notifications = require('./notifications'), @@ -25,8 +23,8 @@ var cookie = require('cookie'), postTools = require('./postTools'), Messaging = require('./messaging'), meta = require('./meta'), - logger = require('./logger'), - socketCookieParser = express.cookieParser(nconf.get('secret')), + + admin = { 'categories': require('./admin/categories'), 'user': require('./admin/user') @@ -35,163 +33,12 @@ var cookie = require('cookie'), (function(websockets) { -var users = {}, - userSockets = {}, - rooms = {}; - -websockets.logoutUser = function(uid) { - if(userSockets[uid] && userSockets[uid].length) { - for(var i=0; i< userSockets[uid].length; ++i) { - userSockets[uid][i].emit('event:disconnect'); - userSockets[uid][i].disconnect(); - - if(!userSockets[uid]) { - return; - } - } - } -} - -function isUserOnline(uid) { - return !!userSockets[uid] && userSockets[uid].length > 0; -} -websockets.isUserOnline = isUserOnline; - websockets.init = function(io) { - io.sockets.on('connection', function(socket) { - var hs = socket.handshake, - sessionID, uid, lastPostTime = 0; - - // Validate the session, if present - socketCookieParser(hs, {}, function(err) { - sessionID = socket.handshake.signedCookies["express.sid"]; - db.sessionStore.get(sessionID, function(err, sessionData) { - if (!err && sessionData && sessionData.passport && sessionData.passport.user) { - uid = users[sessionID] = sessionData.passport.user; - } else { - uid = users[sessionID] = 0; - } - - userSockets[uid] = userSockets[uid] || []; - userSockets[uid].push(socket); - - /* Need to save some state for the logger & maybe some other modules later on */ - socket.state = { - user : { - uid : uid - } - }; - - /* If meta.config.loggerIOStatus > 0, logger.io_one will hook into this socket */ - logger.io_one(socket,uid); - - if (uid) { - - db.sortedSetAdd('users:online', Date.now(), uid, function(err, data) { - socket.join('uid_' + uid); - - user.getUserField(uid, 'username', function(err, username) { - socket.emit('event:connect', { - status: 1, - username: username, - uid: uid - }); - }); - }); - } - - io.sockets.in('global').emit('api:user.isOnline', isUserOnline(uid)); - }); - }); - - socket.on('disconnect', function() { - - var index = (userSockets[uid] || []).indexOf(socket); - if (index !== -1) { - userSockets[uid].splice(index, 1); - } - - if (userSockets[uid] && userSockets[uid].length === 0) { - delete users[sessionID]; - delete userSockets[uid]; - if (uid) { - db.sortedSetRemove('users:online', uid, function(err, data) { - }); - } - } - - io.sockets.in('global').emit('api:user.isOnline', isUserOnline(uid)); - - emitOnlineUserCount(); - - for (var roomName in rooms) { - - socket.leave(roomName); - - if (rooms[roomName][socket.id]) { - delete rooms[roomName][socket.id]; - } - - updateRoomBrowsingText(roomName); - } - }); - - socket.on('reconnected', function() { - if (uid) { - topics.pushUnreadCount(uid); - user.pushNotifCount(uid); - } - - if (process.env.NODE_ENV === 'development') { - if (uid) { - winston.info('[socket] uid ' + uid + ' (' + sessionID + ') has successfully reconnected.'); - } else { - winston.info('[socket] An anonymous user (' + sessionID + ') has successfully reconnected.'); - } - } - }); socket.on('api:get_all_rooms', function(data) { socket.emit('api:get_all_rooms', io.sockets.manager.rooms); }); - function updateRoomBrowsingText(roomName) { - - function getUidsInRoom(room) { - var uids = []; - for (var socketId in room) { - if (uids.indexOf(room[socketId]) === -1) - uids.push(room[socketId]); - } - return uids; - } - - function getAnonymousCount(roomName) { - var clients = io.sockets.clients(roomName); - var anonCount = 0; - - for (var i = 0; i < clients.length; ++i) { - var hs = clients[i].handshake; - if (hs && clients[i].state && clients[i].state.user.uid === 0) { - ++anonCount; - } - } - return anonCount; - } - - var uids = getUidsInRoom(rooms[roomName]), - anonymousCount = getAnonymousCount(roomName); - - if (uids.length === 0) { - io.sockets.in(roomName).emit('api:get_users_in_room', { users: [], anonymousCount: anonymousCount }); - } else { - user.getMultipleUserFields(uids, ['uid', 'username', 'userslug', 'picture'], function(err, users) { - if(!err) - io.sockets.in(roomName).emit('api:get_users_in_room', { users: users, anonymousCount: anonymousCount }); - }); - } - } - socket.on('event:enter_room', function(data) { if (data.leave !== null) { @@ -362,34 +209,7 @@ websockets.init = function(io) { } }); - function emitAlert(socket, title, message) { - socket.emit('event:alert', { - type: 'danger', - timeout: 2000, - title: title, - message: message, - alert_id: 'post_error' - }); - } - - function emitContentTooShortAlert(socket) { - socket.emit('event:alert', { - type: 'danger', - timeout: 2000, - title: 'Content too short', - message: "Please enter a longer post. At least " + meta.config.minimumPostLength + " characters.", - alert_id: 'post_error' - }); - } - - function emitTooManyPostsAlert(socket) { - socket.emit('event:alert', { - title: 'Too many posts!', - message: 'You can only post every ' + meta.config.postDelay + ' seconds.', - type: 'danger', - timeout: 2000 - }); - } + socket.on('api:topics.post', function(data) { if (uid < 1 && parseInt(meta.config.allowGuestPosting, 10) === 0) { @@ -530,22 +350,9 @@ websockets.init = function(io) { callback(websockets.getOnlineAnonCount()); }); - websockets.getOnlineAnonCount = function () { - return userSockets[0] ? userSockets[0].length : 0; - }; - function emitOnlineUserCount() { - var anon = userSockets[0] ? userSockets[0].length : 0; - var registered = Object.keys(userSockets).length; - if (anon) - registered = registered - 1; - var returnObj = { - users: registered + anon, - anon: anon - }; - io.sockets.emit('api:user.active.get', returnObj) - } + socket.on('api:user.active.get', function() { emitOnlineUserCount(); @@ -1233,40 +1040,9 @@ websockets.init = function(io) { }); socket.on('api:admin.theme.set', meta.themes.set); - }); - - - function emitTopicPostStats() { - db.getObjectFields('global', ['topicCount', 'postCount'], function(err, data) { - if (err) { - return winston.err(err); - } - - var stats = { - topics: data.topicCount ? data.topicCount : 0, - posts: data.postCount ? data.postCount : 0 - }; - - io.sockets.emit('post.stats', stats); - }); - } - - websockets.emitUserCount = function() { - db.getObjectField('global', 'userCount', function(err, count) { - io.sockets.emit('user.count', { - count: count - }); - }); - }; - - websockets.in = function(room) { - return io.sockets.in(room); - }; } - websockets.getConnectedClients = function() { - return userSockets; - } + })(module.exports);