refactor: add connectionTimeout to psql

only sleep if there is more than one iteration in processSortedSet
isekai-main
Barış Soner Uşaklı 2 years ago
parent 2ecf15c4d9
commit 90f29571f1

@ -44,7 +44,7 @@ connection.getConnectionString = function (mongo) {
connection.getConnectionOptions = function (mongo) { connection.getConnectionOptions = function (mongo) {
mongo = mongo || nconf.get('mongo'); mongo = mongo || nconf.get('mongo');
const connOptions = { const connOptions = {
maxPoolSize: 10, maxPoolSize: 20,
minPoolSize: 3, minPoolSize: 3,
connectTimeoutMS: 90000, connectTimeoutMS: 90000,
}; };

@ -574,7 +574,7 @@ module.exports = function (module) {
if (processFn && processFn.constructor && processFn.constructor.name !== 'AsyncFunction') { if (processFn && processFn.constructor && processFn.constructor.name !== 'AsyncFunction') {
processFn = util.promisify(processFn); processFn = util.promisify(processFn);
} }
let iteration = 1;
while (!done) { while (!done) {
/* eslint-disable no-await-in-loop */ /* eslint-disable no-await-in-loop */
const item = await cursor.next(); const item = await cursor.next();
@ -585,12 +585,12 @@ module.exports = function (module) {
} }
if (ids.length >= options.batch || (done && ids.length !== 0)) { if (ids.length >= options.batch || (done && ids.length !== 0)) {
await processFn(ids); if (iteration > 1 && options.interval) {
ids.length = 0;
if (options.interval) {
await sleep(options.interval); await sleep(options.interval);
} }
await processFn(ids);
iteration += 1;
ids.length = 0;
} }
} }
}; };

@ -28,6 +28,8 @@ connection.getConnectionOptions = function (postgres) {
password: postgres.password, password: postgres.password,
database: postgres.database, database: postgres.database,
ssl: String(postgres.ssl) === 'true', ssl: String(postgres.ssl) === 'true',
max: 20,
connectionTimeoutMillis: 90000,
}; };
return _.merge(connOptions, postgres.options || {}); return _.merge(connOptions, postgres.options || {});

@ -677,7 +677,7 @@ SELECT z."value", z."score"
if (process && process.constructor && process.constructor.name !== 'AsyncFunction') { if (process && process.constructor && process.constructor.name !== 'AsyncFunction') {
process = util.promisify(process); process = util.promisify(process);
} }
let iteration = 1;
while (true) { while (true) {
/* eslint-disable no-await-in-loop */ /* eslint-disable no-await-in-loop */
let rows = await cursor.readAsync(batchSize); let rows = await cursor.readAsync(batchSize);
@ -692,14 +692,15 @@ SELECT z."value", z."score"
rows = rows.map(r => r.value); rows = rows.map(r => r.value);
} }
try { try {
if (iteration > 1 && options.interval) {
await sleep(options.interval);
}
await process(rows); await process(rows);
iteration += 1;
} catch (err) { } catch (err) {
await client.release(); await client.release();
throw err; throw err;
} }
if (options.interval) {
await sleep(options.interval);
}
} }
}; };
}; };

Loading…
Cancel
Save