@ -10,209 +10,224 @@ var winston = require('winston');
var db = require ( '../database' ) ;
var logger = require ( '../logger' ) ;
var ratelimit = require ( '../middleware/ratelimit' ) ;
var cls = require ( '../middleware/cls' ) ;
var Sockets = { } ;
var Namespaces = { } ;
( function ( Sockets ) {
var Namespaces = { } ;
var io ;
var io ;
Sockets . init = function ( server ) {
requireModules ( ) ;
Sockets . init = function ( server ) {
requireModules ( ) ;
io = new SocketIO ( {
path : nconf . get ( 'relative_path' ) + '/socket.io'
} ) ;
io = new SocketIO ( {
path : nconf . get ( 'relative_path' ) + '/socket.io'
} ) ;
addRedisAdapter ( io ) ;
addRedisAdapter ( io ) ;
io . use ( socketioWildcard ) ;
io . use ( authorize ) ;
io . use ( socketioWildcard ) ;
io . use ( authorize ) ;
io . on ( 'connection' , onConnection ) ;
io . on ( 'disconnect' , onDisconnect ) ;
io . on ( 'connection' , onConnection ) ;
io . listen ( server , {
transports : nconf . get ( 'socket.io:transports' )
} ) ;
io . listen ( server , {
transports : nconf . get ( 'socket.io:transports' )
} ) ;
Sockets . server = io ;
} ;
function onConnection ( socket ) {
socket . ip = socket . request . headers [ 'x-forwarded-for' ] || socket . request . connection . remoteAddress ;
Sockets . server = io ;
} ;
logger . io _one ( socket , socket . uid ) ;
function onConnection ( socket ) {
socket . ip = socket . request . headers [ 'x-forwarded-for' ] || socket . request . connection . remoteAddress ;
onConnect ( socket ) ;
logger . io _one ( socket , socket . uid ) ;
socket . on ( '*' , function ( payload ) {
onMessage ( socket , payload ) ;
} ) ;
}
cls . socket ( socket , null , 'connection' , function ( ) {
onConnect ( socket ) ;
} ) ;
function onConnect ( socket ) {
if ( socket . uid ) {
socket . join ( 'uid_' + socket . uid ) ;
socket . join ( 'online_users' ) ;
} else {
socket . join ( 'online_guests' ) ;
socket . on ( '*' , function ( payload ) {
cls . socket ( socket , payload , null , function ( ) {
onMessage ( socket , payload ) ;
} ) ;
} ) ;
}
}
function onMessage ( socket , payload ) {
if ( ! payload . data . length ) {
return winston . warn ( '[socket.io] Empty payload' ) ;
function onConnect ( socket ) {
if ( socket . uid ) {
socket . join ( 'uid_' + socket . uid ) ;
socket . join ( 'online_users' ) ;
} else {
socket . join ( 'online_guests' ) ;
}
}
var eventName = payload . data [ 0 ] ;
var params = payload . data [ 1 ] ;
var callback = typeof payload . data [ payload . data . length - 1 ] === 'function' ? payload . data [ payload . data . length - 1 ] : function ( ) { } ;
if ( ! eventName ) {
return winston . warn ( '[socket.io] Empty method name' ) ;
function onDisconnect ( socket ) {
cls . socket ( socket , null , 'disconnect' , function ( ) {
} ) ;
}
var parts = eventName . toString ( ) . split ( '.' ) ;
var namespace = parts [ 0 ] ;
var methodToCall = parts . reduce ( function ( prev , cur ) {
if ( prev !== null && prev [ cur ] ) {
return prev [ cur ] ;
} else {
return null ;
}
} , Namespaces ) ;
if ( ! methodToCall ) {
if ( process . env . NODE _ENV === 'development' ) {
winston . warn ( '[socket.io] Unrecognized message: ' + eventName ) ;
function onMessage ( socket , payload ) {
if ( ! payload . data . length ) {
return winston . warn ( '[socket.io] Empty payload' ) ;
}
return ;
}
socket . previousEvents = socket . previousEvents || [ ] ;
socket . previousEvents . push ( eventName ) ;
if ( socket . previousEvents . length > 20 ) {
socket . previousEvents . shift ( ) ;
}
var eventName = payload . data [ 0 ] ;
var params = payload . data [ 1 ] ;
var callback = typeof payload . data [ payload . data . length - 1 ] === 'function' ? payload . data [ payload . data . length - 1 ] : function ( ) {
} ;
if ( ! eventName . startsWith ( 'admin.' ) && ratelimit . isFlooding ( socket ) ) {
winston . warn ( '[socket.io] Too many emits! Disconnecting uid : ' + socket . uid + '. Events : ' + socket . previousEvents ) ;
return socket . disconnect ( ) ;
}
if ( ! eventName ) {
return winston . warn ( '[socket.io] Empty method name' ) ;
}
async . waterfall ( [
function ( next ) {
validateSession ( socket , next ) ;
} ,
function ( next ) {
if ( Namespaces [ namespace ] . before ) {
Namespaces [ namespace ] . before ( socket , eventName , params , next ) ;
var parts = eventName . toString ( ) . split ( '.' ) ;
var namespace = parts [ 0 ] ;
var methodToCall = parts . reduce ( function ( prev , cur ) {
if ( prev !== null && prev [ cur ] ) {
return prev [ cur ] ;
} else {
next ( ) ;
return null ;
}
} , Namespaces ) ;
if ( ! methodToCall ) {
if ( process . env . NODE _ENV === 'development' ) {
winston . warn ( '[socket.io] Unrecognized message: ' + eventName ) ;
}
} ,
function ( next ) {
methodToCall ( socket , params , next ) ;
return ;
}
] , function ( err , result ) {
callback ( err ? { message : err . message } : null , result ) ;
} ) ;
}
function requireModules ( ) {
var modules = [ 'admin' , 'categories' , 'groups' , 'meta' , 'modules' ,
'notifications' , 'plugins' , 'posts' , 'topics' , 'user' , 'blacklist'
] ;
modules . forEach ( function ( module ) {
Namespaces [ module ] = require ( './' + module ) ;
} ) ;
}
function validateSession ( socket , callback ) {
var req = socket . request ;
if ( ! req . signedCookies || ! req . signedCookies [ 'express.sid' ] ) {
return callback ( new Error ( '[[error:invalid-session]]' ) ) ;
}
db . sessionStore . get ( req . signedCookies [ 'express.sid' ] , function ( err , sessionData ) {
if ( err || ! sessionData ) {
return callback ( err || new Error ( '[[error:invalid-session]]' ) ) ;
socket . previousEvents = socket . previousEvents || [ ] ;
socket . previousEvents . push ( eventName ) ;
if ( socket . previousEvents . length > 20 ) {
socket . previousEvents . shift ( ) ;
}
if ( ! eventName . startsWith ( 'admin.' ) && ratelimit . isFlooding ( socket ) ) {
winston . warn ( '[socket.io] Too many emits! Disconnecting uid : ' + socket . uid + '. Events : ' + socket . previousEvents ) ;
return socket . disconnect ( ) ;
}
callback ( ) ;
} ) ;
}
async . waterfall ( [
function ( next ) {
validateSession ( socket , next ) ;
} ,
function ( next ) {
if ( Namespaces [ namespace ] . before ) {
Namespaces [ namespace ] . before ( socket , eventName , params , next ) ;
} else {
next ( ) ;
}
} ,
function ( next ) {
methodToCall ( socket , params , next ) ;
}
] , function ( err , result ) {
callback ( err ? { message : err . message } : null , result ) ;
} ) ;
}
function authorize ( socket , callback ) {
var request = socket . request ;
function requireModules ( ) {
var modules = [ 'admin' , 'categories' , 'groups' , 'meta' , 'modules' ,
'notifications' , 'plugins' , 'posts' , 'topics' , 'user' , 'blacklist'
] ;
if ( ! request ) {
return callback ( new Error ( '[[error:not-authorized]]' ) ) ;
modules . forEach ( function ( module ) {
Namespaces [ module ] = require ( './' + module ) ;
} ) ;
}
async . waterfall ( [
function ( next ) {
cookieParser ( request , { } , next ) ;
} ,
function ( next ) {
db . sessionStore . get ( request . signedCookies [ 'express.sid' ] , function ( err , sessionData ) {
if ( err ) {
return next ( err ) ;
}
if ( sessionData && sessionData . passport && sessionData . passport . user ) {
request . session = sessionData ;
socket . uid = parseInt ( sessionData . passport . user , 10 ) ;
} else {
socket . uid = 0 ;
}
next ( ) ;
} ) ;
function validateSession ( socket , callback ) {
var req = socket . request ;
if ( ! req . signedCookies || ! req . signedCookies [ 'express.sid' ] ) {
return callback ( new Error ( '[[error:invalid-session]]' ) ) ;
}
] , callback ) ;
}
function addRedisAdapter ( io ) {
if ( nconf . get ( 'redis' ) ) {
var redisAdapter = require ( 'socket.io-redis' ) ;
var redis = require ( '../database/redis' ) ;
var pub = redis . connect ( { return _buffers : true } ) ;
var sub = redis . connect ( { return _buffers : true } ) ;
io . adapter ( redisAdapter ( { pubClient : pub , subClient : sub } ) ) ;
} else if ( nconf . get ( 'isCluster' ) === 'true' ) {
winston . warn ( '[socket.io] Clustering detected, you are advised to configure Redis as a websocket store.' ) ;
db . sessionStore . get ( req . signedCookies [ 'express.sid' ] , function ( err , sessionData ) {
if ( err || ! sessionData ) {
return callback ( err || new Error ( '[[error:invalid-session]]' ) ) ;
}
callback ( ) ;
} ) ;
}
}
Sockets . in = function ( room ) {
return io . in ( room ) ;
} ;
function authorize ( socket , callback ) {
var request = socket . request ;
Sockets . getUserSocketCount = function ( uid ) {
if ( ! io ) {
return 0 ;
if ( ! request ) {
return callback ( new Error ( '[[error:not-authorized]]' ) ) ;
}
async . waterfall ( [
function ( next ) {
cookieParser ( request , { } , next ) ;
} ,
function ( next ) {
db . sessionStore . get ( request . signedCookies [ 'express.sid' ] , function ( err , sessionData ) {
if ( err ) {
return next ( err ) ;
}
if ( sessionData && sessionData . passport && sessionData . passport . user ) {
request . session = sessionData ;
socket . uid = parseInt ( sessionData . passport . user , 10 ) ;
} else {
socket . uid = 0 ;
}
next ( ) ;
} ) ;
}
] , callback ) ;
}
var room = io . sockets . adapter . rooms [ 'uid_' + uid ] ;
return room ? room . length : 0 ;
} ;
function addRedisAdapter ( io ) {
if ( nconf . get ( 'redis' ) ) {
var redisAdapter = require ( 'socket.io-redis' ) ;
var redis = require ( '../database/redis' ) ;
var pub = redis . connect ( { return _buffers : true } ) ;
var sub = redis . connect ( { return _buffers : true } ) ;
io . adapter ( redisAdapter ( { pubClient : pub , subClient : sub } ) ) ;
} else if ( nconf . get ( 'isCluster' ) === 'true' ) {
winston . warn ( '[socket.io] Clustering detected, you are advised to configure Redis as a websocket store.' ) ;
}
}
Sockets . in = function ( room ) {
return io . in ( room ) ;
} ;
Sockets . reqFromSocket = function ( socket ) {
var headers = socket . request . headers ;
var host = headers . host ;
var referer = headers . referer || '' ;
Sockets . getUserSocketCount = function ( uid ) {
if ( ! io ) {
return 0 ;
}
return {
ip : headers [ 'x-forwarded-for' ] || socket . ip ,
host : host ,
protocol : socket . request . connection . encrypted ? 'https' : 'http' ,
secure : ! ! socket . request . connection . encrypted ,
url : referer ,
path : referer . substr ( referer . indexOf ( host ) + host . length ) ,
headers : headers
var room = io . sockets . adapter . rooms [ 'uid_' + uid ] ;
return room ? room . length : 0 ;
} ;
} ;
module . exports = Sockets ;
Sockets . reqFromSocket = function ( socket , payload , event ) {
var headers = socket . request . headers ;
var host = headers . host ;
var referer = headers . referer || '' ;
var data = ( ( payload || { } ) . data || [ ] ) ;
return {
uid : socket . uid ,
params : data [ 1 ] ,
method : event || data [ 0 ] ,
body : payload ,
ip : headers [ 'x-forwarded-for' ] || socket . ip ,
host : host ,
protocol : socket . request . connection . encrypted ? 'https' : 'http' ,
secure : ! ! socket . request . connection . encrypted ,
url : referer ,
path : referer . substr ( referer . indexOf ( host ) + host . length ) ,
headers : headers
} ;
} ;
} ) ( exports ) ;