@ -1,16 +1,30 @@
var RDB = require ( './redis.js' ) ,
async = require ( 'async' ) ,
utils = require ( '../public/src/utils.js' ) ,
winston = require ( 'winston' ) ,
cron = require ( 'cron' ) . CronJob ,
notifications = {
init : function ( ) {
if ( process . env . NODE _ENV === 'development' ) {
winston . info ( '[notifications.init] Registering jobs.' ) ;
}
new cron ( '0 0 * * *' , notifications . prune , null , true ) ;
} ,
get : function ( nid , uid , callback ) {
RDB . multi ( )
. hmget ( 'notifications:' + nid , 'text' , 'score' , 'path' , 'datetime' , 'uniqueId' )
. zrank ( 'uid:' + uid + ':notifications:read' , nid )
. exists ( 'notifications:' + nid )
. exec ( function ( err , results ) {
var notification = results [ 0 ]
var notification = results [ 0 ] ,
readIdx = results [ 1 ] ;
if ( ! results [ 2 ] ) {
return callback ( null ) ;
}
callback ( {
nid : nid ,
text : notification [ 0 ] ,
@ -30,16 +44,32 @@ var RDB = require('./redis.js'),
* the new one put in its place .
* /
RDB . incr ( 'notifications:next_nid' , function ( err , nid ) {
RDB . sadd ( 'notifications' , nid ) ;
RDB . hmset ( 'notifications:' + nid , {
text : text || '' ,
path : path || null ,
datetime : Date . now ( ) ,
uniqueId : uniqueId || utils . generateUUID ( )
} , function ( err , status ) {
if ( ! err ) callback ( nid ) ;
if ( ! err ) {
callback ( nid ) ;
}
} ) ;
} ) ;
} ,
destroy : function ( nid ) {
var multi = RDB . multi ( ) ;
multi . del ( 'notifications:' + nid ) ;
multi . srem ( 'notifications' , nid ) ;
multi . exec ( function ( err ) {
if ( err ) {
winston . error ( 'Problem deleting expired notifications. Stack follows.' ) ;
winston . error ( err . stack ) ;
}
} ) ;
} ,
push : function ( nid , uids , callback ) {
if ( ! Array . isArray ( uids ) ) uids = [ uids ] ;
@ -48,12 +78,14 @@ var RDB = require('./redis.js'),
notifications . get ( nid , null , function ( notif _data ) {
for ( x = 0 ; x < numUids ; x ++ ) {
if ( parseInt ( uids [ x ] ) > 0 ) {
if ( parseInt ( uids [ x ] , 10 ) > 0 ) {
( function ( uid ) {
notifications . remove _by _uniqueId ( notif _data . uniqueId , uid , function ( ) {
RDB . zadd ( 'uid:' + uid + ':notifications:unread' , notif _data . datetime , nid ) ;
global . io . sockets . in ( 'uid_' + uid ) . emit ( 'event:new_notification' ) ;
if ( callback ) callback ( true ) ;
if ( callback ) {
callback ( true ) ;
}
} ) ;
} ) ( uids [ x ] ) ;
}
@ -67,13 +99,18 @@ var RDB = require('./redis.js'),
if ( nids && nids . length > 0 ) {
async . each ( nids , function ( nid , next ) {
notifications . get ( nid , uid , function ( nid _info ) {
if ( nid _info . uniqueId === uniqueId ) RDB . zrem ( 'uid:' + uid + ':notifications:unread' , nid ) ;
if ( nid _info . uniqueId === uniqueId ) {
RDB . zrem ( 'uid:' + uid + ':notifications:unread' , nid ) ;
}
next ( ) ;
} ) ;
} , function ( err ) {
next ( ) ;
} ) ;
} else next ( ) ;
} else {
next ( ) ;
}
} ) ;
} ,
function ( next ) {
@ -81,17 +118,24 @@ var RDB = require('./redis.js'),
if ( nids && nids . length > 0 ) {
async . each ( nids , function ( nid , next ) {
notifications . get ( nid , uid , function ( nid _info ) {
if ( nid _info . uniqueId === uniqueId ) RDB . zrem ( 'uid:' + uid + ':notifications:read' , nid ) ;
if ( nid _info . uniqueId === uniqueId ) {
RDB . zrem ( 'uid:' + uid + ':notifications:read' , nid ) ;
}
next ( ) ;
} ) ;
} , function ( err ) {
next ( ) ;
} ) ;
} else next ( ) ;
} else {
next ( ) ;
}
} ) ;
}
] , function ( err ) {
if ( ! err ) callback ( true ) ;
if ( ! err ) {
callback ( true ) ;
}
} ) ;
} ,
mark _read : function ( nid , uid , callback ) {
@ -99,38 +143,126 @@ var RDB = require('./redis.js'),
notifications . get ( nid , uid , function ( notif _data ) {
RDB . zrem ( 'uid:' + uid + ':notifications:unread' , nid ) ;
RDB . zadd ( 'uid:' + uid + ':notifications:read' , notif _data . datetime , nid ) ;
if ( callback ) callback ( ) ;
if ( callback ) {
callback ( ) ;
}
} ) ;
}
} ,
mark _read _multiple : function ( nids , uid , callback ) {
if ( ! Array . isArray ( nids ) && parseInt ( nids , 10 ) > 0 ) nids = [ nids ] ;
if ( ! Array . isArray ( nids ) && parseInt ( nids , 10 ) > 0 ) {
nids = [ nids ] ;
}
async . each ( nids , function ( nid , next ) {
notifications . mark _read ( nid , uid , function ( err ) {
if ( ! err ) next ( null ) ;
if ( ! err ) {
next ( null ) ;
}
} ) ;
} , function ( err ) {
if ( callback ) callback ( err ) ;
if ( callback ) {
callback ( err ) ;
}
} ) ;
} ,
mark _all _read : function ( uid , callback ) {
RDB . zrange ( 'uid:' + uid + ':notifications:unread' , 0 , 10 , function ( err , nids ) {
if ( err ) return callback ( err ) ;
if ( err ) {
return callback ( err ) ;
}
if ( nids . length > 0 ) {
notifications . mark _read _multiple ( nids , uid , function ( err ) {
callback ( err ) ;
} ) ;
} else callback ( ) ;
} else {
callback ( ) ;
}
} ) ;
} ,
prune : function ( cutoff ) {
if ( process . env . NODE _ENV === 'development' ) {
winston . info ( '[notifications.prune] Removing expired notifications from the database.' ) ;
}
var today = new Date ( ) ,
numPruned = 0 ;
if ( ! cutoff ) {
cutoff = new Date ( today . getFullYear ( ) , today . getMonth ( ) , today . getDate ( ) - 7 ) ;
}
var cutoffTime = cutoff . getTime ( ) ;
async . parallel ( {
"inboxes" : function ( next ) {
RDB . keys ( 'uid:*:notifications:unread' , next ) ;
} ,
"nids" : function ( next ) {
RDB . smembers ( 'notifications' , function ( err , nids ) {
async . filter ( nids , function ( nid , next ) {
RDB . hget ( 'notifications:' + nid , 'datetime' , function ( err , datetime ) {
if ( parseInt ( datetime , 10 ) < cutoffTime ) {
next ( true ) ;
} else {
next ( false ) ;
}
} ) ;
} , function ( expiredNids ) {
next ( null , expiredNids ) ;
} ) ;
} ) ;
}
} , function ( err , results ) {
if ( ! err ) {
var numInboxes = results . inboxes . length ,
x ;
async . eachSeries ( results . nids , function ( nid , next ) {
var multi = RDB . multi ( ) ;
for ( x = 0 ; x < numInboxes ; x ++ ) {
multi . zscore ( results . inboxes [ x ] , nid ) ;
}
multi . exec ( function ( err , results ) {
// If the notification is not present in any inbox, delete it altogether
var expired = results . every ( function ( present ) {
if ( present === null ) {
return true ;
}
} ) ;
if ( expired ) {
notifications . destroy ( nid ) ;
numPruned ++ ;
}
next ( ) ;
} ) ;
} , function ( err ) {
if ( process . env . NODE _ENV === 'development' ) {
winston . info ( '[notifications.prune] Notification pruning completed. ' + numPruned + ' expired notification' + ( numPruned !== 1 ? 's' : '' ) + ' removed.' ) ;
}
} ) ;
} else {
if ( process . env . NODE _ENV === 'development' ) {
winston . error ( '[notifications.prune] Ran into trouble pruning expired notifications. Stack trace to follow.' ) ;
winston . error ( err . stack ) ;
}
}
} ) ;
}
} ;
module . exports = {
init : notifications . init ,
get : notifications . get ,
create : notifications . create ,
push : notifications . push ,
mark _read : notifications . mark _read _multiple ,
mark _all _read : notifications . mark _all _read
}
mark _all _read : notifications . mark _all _read ,
prune : notifications . prune
} ;