removed node cluster spawn procs on ports

v1.18.x
barisusakli 10 years ago
parent 3ad3e781f3
commit 64e13df14c

@ -30,7 +30,6 @@ var fs = require('fs'),
semver = require('semver'),
winston = require('winston'),
path = require('path'),
cluster = require('cluster'),
pkg = require('./package.json'),
utils = require('./public/src/utils.js');
@ -60,7 +59,7 @@ if(os.platform() === 'linux') {
});
}
if (!cluster.isWorker) {
if (!process.send) {
// If run using `node app`, log GNU copyright info along with server info
winston.info('NodeBB v' + pkg.version + ' Copyright (C) 2013-2014 NodeBB Inc.');
winston.info('This program comes with ABSOLUTELY NO WARRANTY.');
@ -98,6 +97,11 @@ function loadConfig() {
views_dir: path.join(__dirname, 'public/templates')
});
if (!nconf.get('isCluster')) {
nconf.set('isPrimary', 'true');
nconf.set('isCluster', 'false');
}
// Ensure themes_path is a full filepath
nconf.set('themes_path', path.resolve(__dirname, nconf.get('themes_path')));
nconf.set('core_templates_path', path.join(__dirname, 'src/views'));
@ -119,13 +123,11 @@ function start() {
nconf.set('port', urlObject.port || nconf.get('port') || nconf.get('PORT') || 4567);
nconf.set('upload_url', relativePath + '/uploads/');
if (!cluster.isWorker || process.env.cluster_setup === 'true') {
if (nconf.get('isPrimary') === 'true') {
winston.info('Time: %s', (new Date()).toString());
winston.info('Initializing NodeBB v%s', pkg.version);
winston.verbose('* using configuration stored in: %s', configFile);
}
if (cluster.isWorker && process.env.cluster_setup === 'true') {
var host = nconf.get(nconf.get('database') + ':host'),
storeLocation = host ? 'at ' + host + (host.indexOf('/') === -1 ? ':' + nconf.get(nconf.get('database') + ':port') : '') : '';
@ -157,7 +159,7 @@ function start() {
webserver.init();
sockets.init(webserver.server);
if (cluster.isWorker && process.env.handle_jobs === 'true') {
if (nconf.get('isPrimary')) {
require('./src/notifications').init();
require('./src/user').startJobs();
}
@ -191,13 +193,13 @@ function start() {
meta.js.cache = message.cache;
meta.js.map = message.map;
meta.js.hash = message.hash;
winston.verbose('[cluster] Client-side javascript and mapping propagated to worker %s', cluster.worker.id);
winston.verbose('[cluster] Client-side javascript and mapping propagated to worker %s', process.pid);
break;
case 'css-propagate':
meta.css.cache = message.cache;
meta.css.acpCache = message.acpCache;
meta.css.hash = message.hash;
winston.verbose('[cluster] Stylesheets propagated to worker %s', cluster.worker.id);
winston.verbose('[cluster] Stylesheets propagated to worker %s', process.pid);
break;
}
});
@ -212,11 +214,7 @@ function start() {
} else {
winston.warn('Your NodeBB schema is out-of-date. Please run the following command to bring your dataset up to spec:');
winston.warn(' ./nodebb upgrade');
if (cluster.isWorker) {
cluster.worker.kill();
} else {
process.exit();
}
process.exit();
}
});
});

@ -5,7 +5,8 @@ var nconf = require('nconf'),
fs = require('fs'),
url = require('url'),
path = require('path'),
cluster = require('cluster'),
fork = require('child_process').fork,
async = require('async'),
logrotate = require('logrotate-stream'),
@ -15,9 +16,8 @@ var nconf = require('nconf'),
output = logrotate({ file: __dirname + '/logs/output.log', size: '1m', keep: 3, compress: true }),
silent = process.env.NODE_ENV !== 'development',
numProcs,
handles = {},
handleIndex = 0,
server,
workers = [],
Loader = {
timesStarted: 0,
@ -32,11 +32,6 @@ var nconf = require('nconf'),
};
Loader.init = function(callback) {
cluster.setupMaster({
exec: "app.js",
silent: silent
});
Loader.primaryWorker = 1;
if (silent) {
console.log = function(value) {
@ -60,84 +55,9 @@ Loader.displayStartupMessages = function(callback) {
callback();
};
Loader.addClusterEvents = function(callback) {
cluster.on('fork', function(worker) {
worker.on('message', function(message) {
if (message && typeof message === 'object' && message.action) {
var otherWorkers;
switch (message.action) {
case 'ready':
if (Loader.js.cache) {
worker.send({
action: 'js-propagate',
cache: Loader.js.cache,
map: Loader.js.map,
hash: Loader.js.hash
});
}
if (Loader.css.cache) {
worker.send({
action: 'css-propagate',
cache: Loader.css.cache,
acpCache: Loader.css.acpCache,
hash: Loader.css.hash
});
}
break;
case 'restart':
console.log('[cluster] Restarting...');
Loader.restart(function(err) {
console.log('[cluster] Restarting...');
});
break;
case 'reload':
console.log('[cluster] Reloading...');
Loader.reload();
break;
case 'js-propagate':
Loader.js.cache = message.cache;
Loader.js.map = message.map;
Loader.js.hash = message.hash;
Loader.notifyWorkers({
action: 'js-propagate',
cache: message.cache,
map: message.map,
hash: message.hash
}, worker.id);
break;
case 'css-propagate':
Loader.css.cache = message.cache;
Loader.css.acpCache = message.acpCache;
Loader.css.hash = message.hash;
Loader.notifyWorkers({
action: 'css-propagate',
cache: message.cache,
acpCache: message.acpCache,
hash: message.hash
}, worker.id);
break;
case 'listening':
if (message.primary) {
Loader.primaryWorker = parseInt(worker.id, 10);
}
break;
case 'config:update':
Loader.notifyWorkers(message);
break;
}
}
});
});
Loader.addWorkerEvents = function(worker) {
cluster.on('listening', function(worker) {
console.log('[cluster] Child Process (' + worker.process.pid + ') listening for connections.');
});
cluster.on('exit', function(worker, code, signal) {
worker.on('exit', function(code, signal) {
if (code !== 0) {
if (Loader.timesStarted < numProcs*3) {
Loader.timesStarted++;
@ -153,118 +73,134 @@ Loader.addClusterEvents = function(callback) {
}
}
console.log('[cluster] Child Process (' + worker.process.pid + ') has exited (code: ' + code + ', signal: ' + signal +')');
console.log('[cluster] Child Process (' + worker.pid + ') has exited (code: ' + code + ', signal: ' + signal +')');
if (!worker.suicide) {
console.log('[cluster] Spinning up another process...');
console.log('[cluster] Spinning up another process...');
var wasPrimary = parseInt(worker.id, 10) === Loader.primaryWorker;
forkWorker(wasPrimary);
forkWorker(worker.index, worker.isPrimary);
}
});
cluster.on('disconnect', function(worker) {
console.log('[cluster] Child Process (' + worker.process.pid + ') has disconnected');
});
worker.on('message', function(message) {
if (message && typeof message === 'object' && message.action) {
var otherWorkers;
callback();
switch (message.action) {
case 'ready':
if (Loader.js.cache) {
worker.send({
action: 'js-propagate',
cache: Loader.js.cache,
map: Loader.js.map,
hash: Loader.js.hash
});
}
if (Loader.css.cache) {
worker.send({
action: 'css-propagate',
cache: Loader.css.cache,
acpCache: Loader.css.acpCache,
hash: Loader.css.hash
});
}
break;
case 'restart':
console.log('[cluster] Restarting...');
Loader.restart(function(err) {
console.log('[cluster] Restarting...');
});
break;
case 'reload':
console.log('[cluster] Reloading...');
Loader.reload();
break;
case 'js-propagate':
Loader.js.cache = message.cache;
Loader.js.map = message.map;
Loader.js.hash = message.hash;
Loader.notifyWorkers({
action: 'js-propagate',
cache: message.cache,
map: message.map,
hash: message.hash
}, worker.pid);
break;
case 'css-propagate':
Loader.css.cache = message.cache;
Loader.css.acpCache = message.acpCache;
Loader.css.hash = message.hash;
Loader.notifyWorkers({
action: 'css-propagate',
cache: message.cache,
acpCache: message.acpCache,
hash: message.hash
}, worker.pid);
break;
case 'listening':
if (message.primary) {
Loader.primaryWorker = parseInt(worker.pid, 10);
}
break;
case 'config:update':
Loader.notifyWorkers(message);
break;
}
}
});
};
Loader.start = function(callback) {
console.log('Clustering enabled: Spinning up ' + numProcs + ' process(es).\n');
for(var x=0; x<numProcs; ++x) {
forkWorker(x === 0);
for (var x=0; x<numProcs; ++x) {
forkWorker(x, x === 0);
}
var urlObject = url.parse(nconf.get('url'));
var port = urlObject.port || nconf.get('port') || nconf.get('PORT') || 4567;
nconf.set('port', port);
server = net.createServer(function(connection) {
// remove this once node 0.12.x ships, see https://github.com/elad/node-cluster-socket.io/issues/4
connection._handle.readStop();
var workers = clusterWorkers();
var worker = workers[workerIndex(connection.remoteAddress, numProcs)];
if (worker) {
handles[handleIndex] = connection._handle;
worker.send({action: 'sticky-session:connection', handleIndex: handleIndex}, connection);
handleIndex ++;
} else {
console.log('Cant find worker! Worker count : ' + workers.length);
}
}).listen(port);
if (callback) {
callback();
}
};
function forkWorker(isPrimary) {
var worker = cluster.fork({
cluster_setup: isPrimary,
handle_jobs: isPrimary
}),
output = logrotate({ file: __dirname + '/logs/output.log', size: '1m', keep: 3, compress: true });
if (silent) {
worker.process.stdout.pipe(output);
worker.process.stderr.pipe(output);
}
function forkWorker(index, isPrimary) {
var urlObject = url.parse(nconf.get('url'));
var port = urlObject.port || nconf.get('port') || nconf.get('PORT') || 4567;
worker.on('message', function(message) {
if (!message || message.action !== 'sticky-session:accept') {
return;
var worker = fork('app.js', [], {
silent: silent,
env: {
isPrimary: isPrimary,
isCluster: true,
port: parseInt(port, 10) + index
}
var _handle = handles[message.handleIndex];
});
if (_handle) {
_handle.close();
worker.index = index;
worker.isPrimary = isPrimary;
delete handles[message.handleIndex];
}
});
}
workers[index] = worker;
function workerIndex(ip, numProcs) {
var s = '';
for (var i = 0, _len = ip.length; i < _len; i++) {
if (parseInt(ip[i], 10)) {
s += ip[i];
}
}
return Number(s) % numProcs || 0;
}
Loader.addWorkerEvents(worker);
function clusterWorkers() {
var workers = [];
var output = logrotate({ file: __dirname + '/logs/output.log', size: '1m', keep: 3, compress: true });
for(var i in cluster.workers) {
workers.push(cluster.workers[i]);
if (silent) {
worker.stdout.pipe(output);
worker.stderr.pipe(output);
}
return workers;
}
Loader.restart = function(callback) {
console.log('[cluster] closing server');
killWorkers();
closeHandles();
server.close(function() {
console.log('[cluster] server closed');
Loader.start();
});
Loader.start();
};
Loader.reload = function() {
Object.keys(cluster.workers).forEach(function(worker_id) {
cluster.workers[worker_id].send({
workers.forEach(function(worker) {
worker.send({
action: 'reload'
});
});
@ -275,31 +211,20 @@ Loader.stop = function() {
// Clean up the pidfile
fs.unlinkSync(__dirname + '/pidfile');
server.close();
};
function killWorkers() {
Object.keys(cluster.workers).forEach(function(id) {
cluster.workers[id].kill();
workers.forEach(function(worker) {
worker.suicide = true;
worker.kill();
});
}
function closeHandles() {
for(var h in handles) {
var handle = handles[h];
if (handle) {
handle.close();
delete handles[h];
}
}
}
Loader.notifyWorkers = function (msg, worker_id) {
worker_id = parseInt(worker_id, 10);
Object.keys(cluster.workers).forEach(function(id) {
if (parseInt(id, 10) !== worker_id) {
cluster.workers[id].send(msg);
Loader.notifyWorkers = function (msg, worker_pid) {
worker_pid = parseInt(worker_pid, 10);
workers.forEach(function(worker) {
if (parseInt(worker.pid, 10) !== worker_pid) {
worker.send(msg);
}
});
};
@ -330,7 +255,6 @@ if (nconf.get('daemon') !== false) {
async.series([
Loader.init,
Loader.displayStartupMessages,
Loader.addClusterEvents,
Loader.start
], function(err) {
if (err) {

@ -7,7 +7,6 @@ var winston = require('winston'),
less = require('less'),
crypto = require('crypto'),
async = require('async'),
cluster = require('cluster'),
plugins = require('../plugins'),
emitter = require('../emitter'),
@ -22,7 +21,7 @@ module.exports = function(Meta) {
Meta.css.defaultBranding = {};
Meta.css.minify = function(callback) {
if (!cluster.isWorker || process.env.cluster_setup === 'true') {
if (nconf.get('isPrimary') === 'true') {
winston.verbose('[meta/css] Minifying LESS/CSS');
db.getObjectFields('config', ['theme:type', 'theme:id'], function(err, themeData) {
var themeId = (themeData['theme:id'] || 'nodebb-theme-vanilla'),
@ -64,7 +63,7 @@ module.exports = function(Meta) {
}
], function(err, minified) {
// Propagate to other workers
if (cluster.isWorker) {
if (process.send) {
process.send({
action: 'css-propagate',
cache: minified[0],
@ -81,7 +80,7 @@ module.exports = function(Meta) {
});
});
} else {
winston.verbose('[meta/css] Cluster worker ' + cluster.worker.id + ' skipping LESS/CSS compilation');
winston.verbose('[meta/css] Cluster worker ' + process.pid + ' skipping LESS/CSS compilation');
if (typeof callback === 'function') {
callback();
}
@ -106,7 +105,7 @@ module.exports = function(Meta) {
acpCachePath = path.join(__dirname, '../../public/admin.css');
fs.exists(cachePath, function(exists) {
if (exists) {
if (!cluster.isWorker || process.env.cluster_setup === 'true') {
if (nconf.get('isPrimary') === 'true') {
winston.verbose('[meta/css] (Experimental) Reading stylesheets from file');
async.map([cachePath, acpCachePath], fs.readFile, function(err, files) {
Meta.css.cache = files[0];
@ -125,7 +124,7 @@ module.exports = function(Meta) {
});
};
function minify(source, paths, destination, callback) {
function minify(source, paths, destination, callback) {
less.render(source, {
paths: paths,
compress: true
@ -149,7 +148,7 @@ module.exports = function(Meta) {
}
// Save the compiled CSS in public/ so things like nginx can serve it
if (!cluster.isWorker || process.env.cluster_setup === 'true') {
if (nconf.get('isPrimary') === 'true') {
Meta.css.commitToFile(destination);
}

@ -7,7 +7,6 @@ var winston = require('winston'),
_ = require('underscore'),
os = require('os'),
nconf = require('nconf'),
cluster = require('cluster'),
fs = require('fs'),
plugins = require('../plugins'),
@ -127,7 +126,7 @@ module.exports = function(Meta) {
};
Meta.js.minify = function(minify, callback) {
if (!cluster.isWorker || process.env.cluster_setup === 'true') {
if (nconf.get('isPrimary') === 'true') {
var minifier = Meta.js.minifierProc = fork('minifier.js'),
onComplete = function(err) {
if (err) {
@ -138,7 +137,7 @@ module.exports = function(Meta) {
winston.verbose('[meta/js] Minification complete');
minifier.kill();
if (cluster.isWorker) {
if (process.send) {
process.send({
action: 'js-propagate',
cache: Meta.js.cache,
@ -214,7 +213,7 @@ module.exports = function(Meta) {
mapPath = path.join(__dirname, '../../public/nodebb.min.js.map');
fs.exists(scriptPath, function(exists) {
if (exists) {
if (!cluster.isWorker || process.env.cluster_setup === 'true') {
if (nconf.get('isPrimary') === 'true') {
winston.verbose('[meta/js] (Experimental) Reading client-side scripts from file');
async.map([scriptPath, mapPath], fs.readFile, function(err, files) {
Meta.js.cache = files[0];

@ -7,7 +7,6 @@ var path = require('path'),
rimraf = require('rimraf'),
mkdirp = require('mkdirp'),
async = require('async'),
cluster = require('cluster'),
plugins = require('../plugins'),
db = require('../database');
@ -17,7 +16,7 @@ module.exports = function(Meta) {
Meta.sounds = {};
Meta.sounds.init = function(callback) {
if (cluster.isWorker && process.env.cluster_setup === 'true') {
if (nconf.get('isPrimary') === 'true') {
var soundsPath = path.join(__dirname, '../../public/sounds');
plugins.fireHook('filter:sounds.get', [], function(err, filePaths) {

@ -7,7 +7,6 @@ var mkdirp = require('mkdirp'),
path = require('path'),
fs = require('fs'),
nconf = require('nconf'),
cluster = require('cluster'),
emitter = require('../emitter'),
plugins = require('../plugins'),
@ -16,7 +15,7 @@ var mkdirp = require('mkdirp'),
Templates = {};
Templates.compile = function(callback) {
if (cluster.isWorker && process.env.cluster_setup !== 'true') {
if (nconf.get('isPrimary') === 'false') {
emitter.emit('templates:compiled');
if (callback) {
callback();
@ -31,8 +30,8 @@ Templates.compile = function(callback) {
themeConfig = require(nconf.get('theme_config'));
if (themeConfig.baseTheme) {
var pathToBaseTheme = path.join(nconf.get('themes_path'), themeConfig.baseTheme),
baseTemplatesPath = require(path.join(pathToBaseTheme, 'theme.json')).templates;
var pathToBaseTheme = path.join(nconf.get('themes_path'), themeConfig.baseTheme);
baseTemplatesPath = require(path.join(pathToBaseTheme, 'theme.json')).templates;
if (!baseTemplatesPath){
baseTemplatesPath = path.join(pathToBaseTheme, 'templates');

@ -2,7 +2,6 @@
var async = require('async'),
winston = require('winston'),
cluster = require('cluster'),
fs = require('fs'),
path = require('path'),
@ -51,7 +50,7 @@ SocketAdmin.before = function(socket, method, next) {
SocketAdmin.reload = function(socket, data, callback) {
events.logWithUser(socket.uid, ' is reloading NodeBB');
if (cluster.isWorker) {
if (process.send) {
process.send({
action: 'reload'
});

@ -9,7 +9,6 @@ var path = require('path'),
server,
winston = require('winston'),
async = require('async'),
cluster = require('cluster'),
emailer = require('./emailer'),
meta = require('./meta'),
@ -82,11 +81,7 @@ if(nconf.get('ssl')) {
console.log(err.stack);
if (err.code === 'EADDRINUSE') {
winston.error('NodeBB address in use, exiting...');
if (cluster.isWorker) {
cluster.worker.kill();
} else {
process.exit(0);
}
process.exit(0);
} else {
throw err;
}
@ -103,9 +98,7 @@ if(nconf.get('ssl')) {
logger.init(app);
var bind_address = ((nconf.get('bind_address') === "0.0.0.0" || !nconf.get('bind_address')) ? '0.0.0.0' : nconf.get('bind_address')) + ':' + port;
if (cluster.isWorker) {
port = 0;
}
server.listen(port, nconf.get('bind_address'), function(err) {
if (err) {
winston.info('NodeBB was unable to listen on: ' + bind_address);
@ -117,7 +110,7 @@ if(nconf.get('ssl')) {
process.send({
action: 'listening',
bind_address: bind_address,
primary: process.env.handle_jobs === 'true'
primary: nconf.get('isPrimary') === 'true'
});
}
@ -125,13 +118,4 @@ if(nconf.get('ssl')) {
});
};
process.on('message', function(message, connection) {
if (!message || message.action !== 'sticky-session:connection') {
return;
}
process.send({action: 'sticky-session:accept', handleIndex: message.handleIndex});
server.emit('connection', connection);
});
}(WebServer));

Loading…
Cancel
Save