feat: move export functions into child processes
parent
54310d69e4
commit
8383992dcc
@ -0,0 +1,54 @@
|
|||||||
|
'use strict';
|
||||||
|
|
||||||
|
const nconf = require('nconf');
|
||||||
|
nconf.argv().env({
|
||||||
|
separator: '__',
|
||||||
|
});
|
||||||
|
|
||||||
|
const fs = require('fs');
|
||||||
|
const path = require('path');
|
||||||
|
const json2csvAsync = require('json2csv').parseAsync;
|
||||||
|
|
||||||
|
process.env.NODE_ENV = process.env.NODE_ENV || 'production';
|
||||||
|
|
||||||
|
// Alternate configuration file support
|
||||||
|
const configFile = path.resolve(__dirname, '../../../', nconf.any(['config', 'CONFIG']) || 'config.json');
|
||||||
|
const prestart = require('../../prestart');
|
||||||
|
prestart.loadConfig(configFile);
|
||||||
|
prestart.setupWinston();
|
||||||
|
|
||||||
|
const db = require('../../database');
|
||||||
|
const batch = require('../../batch');
|
||||||
|
|
||||||
|
process.on('message', async function (msg) {
|
||||||
|
if (msg && msg.uid) {
|
||||||
|
await db.init();
|
||||||
|
|
||||||
|
const targetUid = msg.uid;
|
||||||
|
const filePath = path.join(__dirname, '../../../build/export', targetUid + '_posts.csv');
|
||||||
|
|
||||||
|
const posts = require('../../posts');
|
||||||
|
|
||||||
|
let payload = [];
|
||||||
|
await batch.processSortedSet('uid:' + targetUid + ':posts', async function (pids) {
|
||||||
|
let postData = await posts.getPostsData(pids);
|
||||||
|
// Remove empty post references and convert newlines in content
|
||||||
|
postData = postData.filter(Boolean).map(function (post) {
|
||||||
|
post.content = '"' + String(post.content || '').replace(/\n/g, '\\n').replace(/"/g, '\\"') + '"';
|
||||||
|
return post;
|
||||||
|
});
|
||||||
|
payload = payload.concat(postData);
|
||||||
|
}, {
|
||||||
|
batch: 500,
|
||||||
|
interval: 1000,
|
||||||
|
});
|
||||||
|
|
||||||
|
const fields = payload.length ? Object.keys(payload[0]) : [];
|
||||||
|
const opts = { fields };
|
||||||
|
const csv = await json2csvAsync(payload, opts);
|
||||||
|
await fs.promises.writeFile(filePath, csv);
|
||||||
|
|
||||||
|
await db.close();
|
||||||
|
process.exit(0);
|
||||||
|
}
|
||||||
|
});
|
@ -0,0 +1,108 @@
|
|||||||
|
'use strict';
|
||||||
|
|
||||||
|
const nconf = require('nconf');
|
||||||
|
nconf.argv().env({
|
||||||
|
separator: '__',
|
||||||
|
});
|
||||||
|
|
||||||
|
const fs = require('fs');
|
||||||
|
const path = require('path');
|
||||||
|
const _ = require('lodash');
|
||||||
|
|
||||||
|
process.env.NODE_ENV = process.env.NODE_ENV || 'production';
|
||||||
|
|
||||||
|
// Alternate configuration file support
|
||||||
|
const configFile = path.resolve(__dirname, '../../../', nconf.any(['config', 'CONFIG']) || 'config.json');
|
||||||
|
const prestart = require('../../prestart');
|
||||||
|
prestart.loadConfig(configFile);
|
||||||
|
prestart.setupWinston();
|
||||||
|
|
||||||
|
const db = require('../../database');
|
||||||
|
const batch = require('../../batch');
|
||||||
|
|
||||||
|
process.on('message', async function (msg) {
|
||||||
|
if (msg && msg.uid) {
|
||||||
|
await db.init();
|
||||||
|
await db.initSessionStore();
|
||||||
|
|
||||||
|
const targetUid = msg.uid;
|
||||||
|
|
||||||
|
const profileFile = targetUid + '_profile.json';
|
||||||
|
const profilePath = path.join(__dirname, '../../../build/export', profileFile);
|
||||||
|
|
||||||
|
const user = require('../index');
|
||||||
|
const [userData, userSettings, ips, sessions, usernames, emails, bookmarks, watchedTopics, upvoted, downvoted, following] = await Promise.all([
|
||||||
|
db.getObject('user:' + targetUid),
|
||||||
|
db.getObject('user:' + targetUid + ':settings'),
|
||||||
|
user.getIPs(targetUid, 9),
|
||||||
|
user.auth.getSessions(targetUid),
|
||||||
|
user.getHistory('user:' + targetUid + ':usernames'),
|
||||||
|
user.getHistory('user:' + targetUid + ':emails'),
|
||||||
|
getSetData('uid:' + targetUid + ':bookmarks', 'post:', targetUid),
|
||||||
|
getSetData('uid:' + targetUid + ':followed_tids', 'topic:', targetUid),
|
||||||
|
getSetData('uid:' + targetUid + ':upvote', 'post:', targetUid),
|
||||||
|
getSetData('uid:' + targetUid + ':downvote', 'post:', targetUid),
|
||||||
|
getSetData('following:' + targetUid, 'user:', targetUid),
|
||||||
|
]);
|
||||||
|
delete userData.password;
|
||||||
|
|
||||||
|
let chatData = [];
|
||||||
|
await batch.processSortedSet('uid:' + targetUid + ':chat:rooms', async (roomIds) => {
|
||||||
|
var result = await Promise.all(roomIds.map(roomId => getRoomMessages(targetUid, roomId)));
|
||||||
|
chatData = chatData.concat(_.flatten(result));
|
||||||
|
}, { batch: 100, interval: 1000 });
|
||||||
|
|
||||||
|
await fs.promises.writeFile(profilePath, JSON.stringify({
|
||||||
|
user: userData,
|
||||||
|
settings: userSettings,
|
||||||
|
ips: ips,
|
||||||
|
sessions: sessions,
|
||||||
|
usernames: usernames,
|
||||||
|
emails: emails,
|
||||||
|
messages: chatData,
|
||||||
|
bookmarks: bookmarks,
|
||||||
|
watchedTopics: watchedTopics,
|
||||||
|
upvoted: upvoted,
|
||||||
|
downvoted: downvoted,
|
||||||
|
following: following,
|
||||||
|
}, null, 4));
|
||||||
|
|
||||||
|
await db.close();
|
||||||
|
process.exit(0);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
async function getRoomMessages(uid, roomId) {
|
||||||
|
const batch = require('../../batch');
|
||||||
|
let data = [];
|
||||||
|
await batch.processSortedSet('uid:' + uid + ':chat:room:' + roomId + ':mids', async (mids) => {
|
||||||
|
const messageData = await db.getObjects(mids.map(mid => 'message:' + mid));
|
||||||
|
data = data.concat(messageData.filter(m => m && m.fromuid === uid && !m.system)
|
||||||
|
.map(m => ({ content: m.content, timestamp: m.timestamp }))
|
||||||
|
);
|
||||||
|
}, { batch: 500, interval: 1000 });
|
||||||
|
return data;
|
||||||
|
}
|
||||||
|
|
||||||
|
async function getSetData(set, keyPrefix, uid) {
|
||||||
|
const privileges = require('../../privileges');
|
||||||
|
const batch = require('../../batch');
|
||||||
|
let data = [];
|
||||||
|
await batch.processSortedSet(set, async (ids) => {
|
||||||
|
if (keyPrefix === 'post:') {
|
||||||
|
ids = await privileges.posts.filter('topics:read', ids, uid);
|
||||||
|
} else if (keyPrefix === 'topic:') {
|
||||||
|
ids = await privileges.topics.filterTids('topics:read', ids, uid);
|
||||||
|
}
|
||||||
|
let objData = await db.getObjects(ids.map(id => keyPrefix + id));
|
||||||
|
if (keyPrefix === 'post:') {
|
||||||
|
objData = objData.map(o => _.pick(o, ['pid', 'content', 'timestamp']));
|
||||||
|
} else if (keyPrefix === 'topic:') {
|
||||||
|
objData = objData.map(o => _.pick(o, ['tid', 'title', 'timestamp']));
|
||||||
|
} else if (keyPrefix === 'user:') {
|
||||||
|
objData = objData.map(o => _.pick(o, ['uid', 'username']));
|
||||||
|
}
|
||||||
|
data = data.concat(objData);
|
||||||
|
}, { batch: 500, interval: 1000 });
|
||||||
|
return data;
|
||||||
|
}
|
@ -0,0 +1,76 @@
|
|||||||
|
'use strict';
|
||||||
|
|
||||||
|
const nconf = require('nconf');
|
||||||
|
nconf.argv().env({
|
||||||
|
separator: '__',
|
||||||
|
});
|
||||||
|
|
||||||
|
const fs = require('fs');
|
||||||
|
const path = require('path');
|
||||||
|
const archiver = require('archiver');
|
||||||
|
const winston = require('winston');
|
||||||
|
|
||||||
|
process.env.NODE_ENV = process.env.NODE_ENV || 'production';
|
||||||
|
|
||||||
|
// Alternate configuration file support
|
||||||
|
const configFile = path.resolve(__dirname, '../../../', nconf.any(['config', 'CONFIG']) || 'config.json');
|
||||||
|
const prestart = require('../../prestart');
|
||||||
|
prestart.loadConfig(configFile);
|
||||||
|
prestart.setupWinston();
|
||||||
|
|
||||||
|
const db = require('../../database');
|
||||||
|
|
||||||
|
process.on('message', async function (msg) {
|
||||||
|
if (msg && msg.uid) {
|
||||||
|
await db.init();
|
||||||
|
|
||||||
|
const targetUid = msg.uid;
|
||||||
|
|
||||||
|
const archivePath = path.join(__dirname, '../../../build/export', targetUid + '_uploads.zip');
|
||||||
|
const rootDirectory = path.join(__dirname, '../../../public/uploads/');
|
||||||
|
|
||||||
|
const user = require('../index');
|
||||||
|
|
||||||
|
const archive = archiver('zip', {
|
||||||
|
zlib: { level: 9 }, // Sets the compression level.
|
||||||
|
});
|
||||||
|
|
||||||
|
archive.on('warning', function (err) {
|
||||||
|
switch (err.code) {
|
||||||
|
case 'ENOENT':
|
||||||
|
winston.warn('[user/export/uploads] File not found: ' + err.path);
|
||||||
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
winston.warn('[user/export/uploads] Unexpected warning: ' + err.message);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
archive.on('error', function (err) {
|
||||||
|
const trimPath = function (path) {
|
||||||
|
return path.replace(rootDirectory, '');
|
||||||
|
};
|
||||||
|
switch (err.code) {
|
||||||
|
case 'EACCES':
|
||||||
|
winston.error('[user/export/uploads] File inaccessible: ' + trimPath(err.path));
|
||||||
|
break;
|
||||||
|
|
||||||
|
default:
|
||||||
|
winston.error('[user/export/uploads] Unable to construct archive: ' + err.message);
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
const output = fs.createWriteStream(archivePath);
|
||||||
|
output.on('close', async function () {
|
||||||
|
await db.close();
|
||||||
|
process.exit(0);
|
||||||
|
});
|
||||||
|
|
||||||
|
archive.pipe(output);
|
||||||
|
winston.verbose('[user/export/uploads] Collating uploads for uid ' + targetUid);
|
||||||
|
await user.collateUploads(targetUid, archive);
|
||||||
|
archive.finalize();
|
||||||
|
}
|
||||||
|
});
|
Loading…
Reference in New Issue