first part of messaging refactor

v1.18.x
barisusakli 9 years ago
parent 8bda497b41
commit e2e547db45

@ -16,6 +16,13 @@ var db = require('./database'),
sockets = require('./socket.io'); sockets = require('./socket.io');
(function(Messaging) { (function(Messaging) {
require('./create')(Messaging);
require('./delete')(Messaging);
require('./edit')(Messaging);
require('./rooms')(Messaging);
require('./unread')(Messaging);
Messaging.notifyQueue = {}; // Only used to notify a user of a new chat message, see Messaging.notifyUser Messaging.notifyQueue = {}; // Only used to notify a user of a new chat message, see Messaging.notifyUser
var terms = { var terms = {
@ -29,140 +36,6 @@ var db = require('./database'),
return [fromuid, touid].sort(); return [fromuid, touid].sort();
} }
Messaging.addMessage = function(fromuid, touid, content, timestamp, callback) {
if (typeof timestamp === 'function') {
callback = timestamp;
timestamp = Date.now();
} else {
timestamp = timestamp || Date.now();
}
if (!content) {
return callback(new Error('[[error:invalid-chat-message]]'));
}
if (content.length > (meta.config.maximumChatMessageLength || 1000)) {
return callback(new Error('[[error:chat-message-too-long]]'));
}
var uids = sortUids(fromuid, touid);
db.incrObjectField('global', 'nextMid', function(err, mid) {
if (err) {
return callback(err);
}
var message = {
content: content,
timestamp: timestamp,
fromuid: fromuid,
touid: touid
};
async.waterfall([
function(next) {
plugins.fireHook('filter:messaging.save', message, next);
},
function(message, next) {
db.setObject('message:' + mid, message, next);
}
], function(err) {
if (err) {
return callback(err);
}
async.parallel([
async.apply(db.sortedSetAdd, 'messages:uid:' + uids[0] + ':to:' + uids[1], timestamp, mid),
async.apply(Messaging.updateChatTime, fromuid, touid),
async.apply(Messaging.updateChatTime, touid, fromuid),
async.apply(Messaging.markRead, fromuid, touid),
async.apply(Messaging.markUnread, touid, fromuid),
], function(err) {
if (err) {
return callback(err);
}
async.waterfall([
function(next) {
getMessages([mid], fromuid, touid, true, next);
},
function(messages, next) {
Messaging.isNewSet(fromuid, touid, mid, function(err, isNewSet) {
if (err) {
return next(err);
}
if (!messages || !messages[0]) {
return next(null, null);
}
messages[0].newSet = isNewSet;
messages[0].mid = mid;
next(null, messages[0]);
});
}
], callback);
});
});
});
};
Messaging.editMessage = function(mid, content, callback) {
async.series([
function(next) {
// Verify that the message actually changed
Messaging.getMessageField(mid, 'content', function(err, raw) {
if (raw === content) {
// No dice.
return callback();
}
next();
});
},
async.apply(Messaging.setMessageFields, mid, {
content: content,
edited: Date.now()
}),
function(next) {
Messaging.getMessageFields(mid, ['fromuid', 'touid'], function(err, data) {
getMessages([mid], data.fromuid, data.touid, true, function(err, messages) {
sockets.in('uid_' + data.fromuid).emit('event:chats.edit', {
messages: messages
});
sockets.in('uid_' + data.touid).emit('event:chats.edit', {
messages: messages
});
next();
});
});
}
], callback);
};
Messaging.deleteMessage = function(mid, callback) {
var uids = [];
async.series([
function(next) {
db.getObject('message:' + mid, function(err, messageObj) {
messageObj.fromuid = parseInt(messageObj.fromuid, 10);
messageObj.touid = parseInt(messageObj.touid, 10);
uids.push(messageObj.fromuid, messageObj.touid);
uids.sort(function(a, b) {
return a > b ? 1 : -1;
});
next();
});
},
function(next) {
next();
},
function(next) {
db.sortedSetRemove('messages:uid:' + uids[0] + ':to:' + uids[1], mid, next);
},
async.apply(db.delete, 'message:' + mid)
], callback);
};
Messaging.getMessageField = function(mid, field, callback) { Messaging.getMessageField = function(mid, field, callback) {
Messaging.getMessageFields(mid, [field], function(err, fields) { Messaging.getMessageFields(mid, [field], function(err, fields) {
callback(err, fields[field]); callback(err, fields[field]);
@ -208,7 +81,7 @@ var db = require('./database'),
mids.reverse(); mids.reverse();
getMessages(mids, fromuid, touid, isNew, callback); Messaging.getMessagesData(mids, fromuid, touid, isNew, callback);
}); });
if (markRead) { if (markRead) {
@ -222,7 +95,7 @@ var db = require('./database'),
} }
}; };
function getMessages(mids, fromuid, touid, isNew, callback) { Messaging.getMessagesData = function(mids, fromuid, touid, isNew, callback) {
user.getUsersFields([fromuid, touid], ['uid', 'username', 'userslug', 'picture', 'status'], function(err, userData) { user.getUsersFields([fromuid, touid], ['uid', 'username', 'userslug', 'picture', 'status'], function(err, userData) {
if(err) { if(err) {
return callback(err); return callback(err);
@ -279,7 +152,7 @@ var db = require('./database'),
next(undefined, messages); next(undefined, messages);
} else { } else {
// For single messages, we don't know the context, so look up the previous message and compare // For single messages, we don't know the context, so look up the previous message and compare
var uids = [fromuid, touid].sort(function(a, b) { return a > b ? 1 : -1 }); var uids = [fromuid, touid].sort(function(a, b) { return a > b ? 1 : -1; });
var key = 'messages:uid:' + uids[0] + ':to:' + uids[1]; var key = 'messages:uid:' + uids[0] + ':to:' + uids[1];
async.waterfall([ async.waterfall([
async.apply(db.sortedSetRank, key, messages[0].messageId), async.apply(db.sortedSetRank, key, messages[0].messageId),
@ -314,7 +187,7 @@ var db = require('./database'),
} }
], callback); ], callback);
}); });
} };
Messaging.parse = function (message, fromuid, myuid, toUserData, myUserData, isNew, callback) { Messaging.parse = function (message, fromuid, myuid, toUserData, myUserData, isNew, callback) {
plugins.fireHook('filter:parse.raw', message, function(err, parsed) { plugins.fireHook('filter:parse.raw', message, function(err, parsed) {
@ -369,10 +242,6 @@ var db = require('./database'),
], callback); ], callback);
}; };
Messaging.updateChatTime = function(uid, toUid, callback) {
callback = callback || function() {};
db.sortedSetAdd('uid:' + uid + ':chats', Date.now(), toUid, callback);
};
Messaging.getRecentChats = function(uid, start, stop, callback) { Messaging.getRecentChats = function(uid, start, stop, callback) {
db.getSortedSetRevRange('uid:' + uid + ':chats', start, stop, function(err, uids) { db.getSortedSetRevRange('uid:' + uid + ':chats', start, stop, function(err, uids) {
@ -424,36 +293,7 @@ var db = require('./database'),
}); });
}; };
Messaging.getUnreadCount = function(uid, callback) {
db.sortedSetCard('uid:' + uid + ':chats:unread', callback);
};
Messaging.pushUnreadCount = function(uid) {
Messaging.getUnreadCount(uid, function(err, unreadCount) {
if (err) {
return;
}
sockets.in('uid_' + uid).emit('event:unread.updateChatCount', unreadCount);
});
};
Messaging.markRead = function(uid, toUid, callback) {
db.sortedSetRemove('uid:' + uid + ':chats:unread', toUid, callback);
};
Messaging.markUnread = function(uid, toUid, callback) {
async.waterfall([
function (next) {
user.exists(toUid, next);
},
function (exists, next) {
if (!exists) {
return next(new Error('[[error:no-user]]'));
}
db.sortedSetAdd('uid:' + uid + ':chats:unread', Date.now(), toUid, next);
}
], callback);
};
Messaging.notifyUser = function(fromuid, touid, messageObj) { Messaging.notifyUser = function(fromuid, touid, messageObj) {
// Immediate notifications // Immediate notifications
@ -534,38 +374,6 @@ var db = require('./database'),
], callback); ], callback);
}; };
Messaging.canEdit = function(messageId, uid, callback) {
if (parseInt(meta.config.disableChat) === 1) {
return callback(null, false);
}
async.waterfall([
function (next) {
user.getUserFields(uid, ['banned', 'email:confirmed'], next);
},
function (userData, next) {
if (parseInt(userData.banned, 10) === 1) {
return callback(null, false);
}
if (parseInt(meta.config.requireEmailConfirmation, 10) === 1 && parseInt(userData['email:confirmed'], 10) !== 1) {
return callback(null, false);
}
Messaging.getMessageField(messageId, 'fromuid', next);
},
function(fromUid, next) {
if (parseInt(fromUid, 10) === parseInt(uid, 10)) {
return callback(null, true);
}
user.isAdministrator(uid, next);
},
function(isAdmin, next) {
next(null, isAdmin);
}
], callback);
};
function sendNotifications(fromuid, touid, messageObj, callback) { function sendNotifications(fromuid, touid, messageObj, callback) {
user.isOnline(touid, function(err, isOnline) { user.isOnline(touid, function(err, isOnline) {

@ -0,0 +1,134 @@
'use strict';
var async = require('async');
var meta = require('../meta');
var plugins = require('../plugins');
var db = require('../database');
module.exports = function(Messaging) {
Messaging.newMessage = function(fromuid, toUids, content, timestamp, callback) {
var roomId;
async.waterfall([
function (next) {
Messaging.checkContent(content, next);
},
function (next) {
db.incrObjectField('global', 'nextChatRoomId', next);
},
function (_roomId, next) {
roomId = _roomId;
db.sortedSetAdd('chat:room:' + roomId + ':uids', timestamp, fromuid, next);
},
function (next) {
Messaging.addUsersToRoom(fromuid, toUids, roomId, next);
},
function (next) {
Messaging.sendMessage(fromuid, roomId, content, timestamp, next);
}
], callback);
};
Messaging.sendMessage = function(fromuid, roomId, content, timestamp, callback) {
async.waterfall([
function (next) {
Messaging.checkContent(content, next);
},
function (next) {
Messaging.roomExists(roomId, next);
},
function (exists, next) {
if (!exists) {
return next(new Error('[[error:chat-room-does-not-exist]]'));
}
Messaging.addMessage(fromuid, roomId, content, timestamp, next);
}
], callback);
};
Messaging.checkContent = function(content, callback) {
if (!content) {
return callback(new Error('[[error:invalid-chat-message]]'));
}
if (content.length > (meta.config.maximumChatMessageLength || 1000)) {
return callback(new Error('[[error:chat-message-too-long]]'));
}
};
Messaging.addMessage = function(fromuid, roomId, content, timestamp, callback) {
var mid;
var message;
async.waterfall([
function (next) {
Messaging.checkContent(content, next);
},
function (next) {
db.incrObjectField('global', 'nextMid', next);
},
function (_mid, next) {
mid = _mid;
message = {
content: content,
timestamp: timestamp,
fromuid: fromuid,
roomId: roomId
};
plugins.fireHook('filter:messaging.save', message, next);
},
function (message, next) {
db.setObject('message:' + mid, message, next);
},
function (next) {
db.getSortedSetRange('chat:room:' + roomId + ':uids', 0, -1, next);
},
function (uids, next) {
async.parallel([
async.apply(Messaging.updateChatTime, roomId, uids, timestamp),
async.apply(Messaging.addMessageToUsers, roomId, uids, mid, timestamp),
async.apply(Messaging.markRead, fromuid, roomId),
async.apply(Messaging.markUnread, uids, roomId)
], next);
},
function (results, next) {
getMessages([mid], fromuid, touid, true, next);
},
function (messages, next) {
Messaging.isNewSet(fromuid, touid, mid, next);
},
function (isNewSet, next) {
if (!messages || !messages[0]) {
return next(null, null);
}
messages[0].newSet = isNewSet;
messages[0].mid = mid;
next(null, messages[0]);
}
], callback);
};
Messaging.updateChatTime = function(roomId, uids, timestamp, callback) {
if (!uids.length) {
return callback();
}
var keys = uids.map(function(uid) {
return 'uid:' + uid + ':chat:rooms';
});
db.sortedSetsAdd(keys, timestamp, roomId, next);
};
Messaging.addMessageToUsers = function(roomId, uids, mid, timestamp, callback) {
if (!uids.length) {
return callback();
}
var keys = uids.map(function(uid) {
return 'uid:' + uid + ':chat:room:' + roomId + ':mids';
});
db.sortedSetsAdd(keys, timestamp, mid, callback);
};
};

@ -0,0 +1,27 @@
'use strict';
var async = require('async');
var db = require('../database');
module.exports = function(Messaging) {
Messaging.deleteMessage = function(mid, roomId, callback) {
async.waterfall([
function (next) {
Messaging.getUidsInRoom(roomId, 0, -1, next);
},
function (uids, next) {
if (!uids.length) {
return next();
}
var keys = uids.map(function(uid) {
return 'uid:' + uid + ':chat:room:' + roomId + 'mids';
});
db.sortedSetsRemove(keys, roomId, next);
},
function(next) {
db.delete('message:' + mid, next);
}
], callback);
};
};

@ -0,0 +1,79 @@
'use strict';
var async = require('async');
var meta = require('../meta');
var user = require('../user');
var sockets = require('../socket.io');
module.exports = function(Messaging) {
Messaging.editMessage = function(mid, content, callback) {
async.series([
function(next) {
// Verify that the message actually changed
Messaging.getMessageField(mid, 'content', function(err, raw) {
if (raw === content) {
// No dice.
return callback();
}
next();
});
},
async.apply(Messaging.setMessageFields, mid, {
content: content,
edited: Date.now()
}),
function(next) {
Messaging.getMessageFields(mid, ['fromuid', 'touid'], function(err, data) {
Messaging.getMessagesData([mid], data.fromuid, data.touid, true, function(err, messages) {
sockets.in('uid_' + data.fromuid).emit('event:chats.edit', {
messages: messages
});
sockets.in('uid_' + data.touid).emit('event:chats.edit', {
messages: messages
});
next();
});
});
}
], callback);
};
Messaging.canEdit = function(messageId, uid, callback) {
if (parseInt(meta.config.disableChat) === 1) {
return callback(null, false);
}
async.waterfall([
function (next) {
user.getUserFields(uid, ['banned', 'email:confirmed'], next);
},
function (userData, next) {
if (parseInt(userData.banned, 10) === 1) {
return callback(null, false);
}
if (parseInt(meta.config.requireEmailConfirmation, 10) === 1 && parseInt(userData['email:confirmed'], 10) !== 1) {
return callback(null, false);
}
Messaging.getMessageField(messageId, 'fromuid', next);
},
function(fromUid, next) {
if (parseInt(fromUid, 10) === parseInt(uid, 10)) {
return callback(null, true);
}
user.isAdministrator(uid, next);
},
function(isAdmin, next) {
next(null, isAdmin);
}
], callback);
};
};

@ -0,0 +1,46 @@
'use strict';
var async = require('async');
var db = require('../database');
module.exports = function(Messaging) {
Messaging.roomExists = function(roomId, callback) {
db.exists('chat:room:' + roomId + ':uids', callback);
};
Messaging.isRoomOwner = function(uid, roomId, callback) {
db.getSortedSetRange('chat:room:' + roomId + ':uids', 0, 0, function(err, uids) {
if (err) {
return callback(err);
}
if (!Array.isArray(uids) || !uids.length) {
return callback(null, false);
}
callback(null, parseInt(uids[0], 10) === parseInt(uid, 10));
});
};
Messaging.addUsersToRoom = function(fromuid, toUids, roomId, callback) {
async.waterfall([
function (next) {
Messaging.isRoomOwner(fromuid, roomId, next);
},
function (isOwner, next) {
if (!isOwner) {
return next(new Error('[[error:cant-add-users-to-chat-room]]'));
}
var now = Date.now();
var timestamps = toUids.map(function() {
return now;
});
db.sortedSetAdd('chat:room:' + roomId + ':uids', timestamps, toUids, next);
}
], callback);
};
Messaging.getUidsInRoom = function(roomId, start, stop, callback) {
db.getSortedSetRange('chat:room:' + roomId + ':uids', start, stop, callback);
};
};

@ -0,0 +1,44 @@
'use strict';
var async = require('async');
var db = require('../database');
var sockets = require('../socket.io');
module.exports = function(Messaging) {
Messaging.getUnreadCount = function(uid, callback) {
db.sortedSetCard('uid:' + uid + ':chat:rooms:unread', callback);
};
Messaging.pushUnreadCount = function(uid) {
Messaging.getUnreadCount(uid, function(err, unreadCount) {
if (err) {
return;
}
sockets.in('uid_' + uid).emit('event:unread.updateChatCount', unreadCount);
});
};
Messaging.markRead = function(uid, roomId, callback) {
db.sortedSetRemove('uid:' + uid + ':chat:rooms:unread', roomId, callback);
};
Messaging.markUnread = function(uids, roomId, callback) {
async.waterfall([
function (next) {
Messaging.roomExists(roomId, next);
},
function (exists, next) {
if (!exists) {
return next(new Error('[[error:chat-room-does-not-exist]]'));
}
var keys = uids.map(function(uid) {
return 'uid:' + uid + ':chat:rooms:unread';
});
db.sortedSetAdd(keys, Date.now(), roomId, next);
}
], callback);
};
};

@ -61,7 +61,7 @@ SocketModules.chats.send = function(socket, data, callback) {
return callback(err || new Error('[[error:chat-restricted]]')); return callback(err || new Error('[[error:chat-restricted]]'));
} }
Messaging.addMessage(socket.uid, touid, data.message, function(err, message) { Messaging.addMessage(socket.uid, touid, data.message, now, function(err, message) {
if (err) { if (err) {
return callback(err); return callback(err);
} }
@ -88,13 +88,13 @@ SocketModules.chats.edit = function(socket, data, callback) {
}; };
SocketModules.chats.delete = function(socket, data, callback) { SocketModules.chats.delete = function(socket, data, callback) {
if (!data) { if (!data || !data.roomId || !data.messageId) {
return callback(new Error('[[error:invalid-data]]')); return callback(new Error('[[error:invalid-data]]'));
} }
Messaging.canEdit(data.messageId, socket.uid, function(err, allowed) { Messaging.canEdit(data.messageId, socket.uid, function(err, allowed) {
if (allowed) { if (allowed) {
Messaging.deleteMessage(data.messageId, callback); Messaging.deleteMessage(data.messageId, data.roomId, callback);
} }
}); });
} }

Loading…
Cancel
Save