From 33228bb7fed6c449efbebde26deffed43cb12a14 Mon Sep 17 00:00:00 2001 From: Ben Lubar Date: Wed, 8 Aug 2018 14:13:48 -0500 Subject: [PATCH] PostgreSQL database driver (#5861) * [test/database/list] Fix test list 4 being used in two different tests * [database/postgres] PostgreSQL database driver * [database/postgres] Make transactions work based on continuation scope. * [database/postgres] Implement nested transactions * eslint --fix * Add database changes from earlier this week to the PostgreSQL driver. * Fix typo * Fix postgres.incrObjectFieldBy returning undefined instead of null when given NaN * [database/postgres] Fix sortedSetsCard returning an array of strings. * Update socket.io postgres adapter * Fix PostgreSQL erroring when multiple updates are made to the same sorted set entry in a single operation. Add a test case to catch this error. * Fix lint errors. * Only prune sessions on one instance in a cluster to avoid deadlocks. They're caught and handled by the database server, but they spam the logs. * Fix arguments.slice. --- .eslintrc | 4 + .travis.yml | 4 + install/databases.js | 17 +- install/package.json | 7 +- install/web.js | 2 +- .../en-GB/admin/advanced/database.json | 8 +- src/controllers/admin/database.js | 8 + src/database/mongo.js | 1 + src/database/mongo/transaction.js | 8 + src/database/postgres.js | 452 +++++++++++ src/database/postgres/hash.js | 391 +++++++++ src/database/postgres/helpers.js | 139 ++++ src/database/postgres/list.js | 234 ++++++ src/database/postgres/main.js | 239 ++++++ src/database/postgres/pubsub.js | 51 ++ src/database/postgres/sets.js | 342 ++++++++ src/database/postgres/sorted.js | 744 ++++++++++++++++++ src/database/postgres/sorted/add.js | 108 +++ src/database/postgres/sorted/intersect.js | 105 +++ src/database/postgres/sorted/remove.js | 83 ++ src/database/postgres/sorted/union.js | 97 +++ src/database/postgres/transaction.js | 50 ++ src/database/redis.js | 1 + src/database/redis/transaction.js | 8 + src/install.js | 3 +- src/pubsub.js | 2 + src/upgrades/1.4.6/delete_sessions.js | 4 +- src/views/admin/advanced/database.tpl | 27 + src/views/install/index.tpl | 1 + test/controllers-admin.js | 2 + test/database.js | 5 + test/database/list.js | 6 +- test/database/sorted.js | 15 + test/mocks/databasemock.js | 8 + 34 files changed, 3166 insertions(+), 10 deletions(-) create mode 100644 src/database/mongo/transaction.js create mode 100644 src/database/postgres.js create mode 100644 src/database/postgres/hash.js create mode 100644 src/database/postgres/helpers.js create mode 100644 src/database/postgres/list.js create mode 100644 src/database/postgres/main.js create mode 100644 src/database/postgres/pubsub.js create mode 100644 src/database/postgres/sets.js create mode 100644 src/database/postgres/sorted.js create mode 100644 src/database/postgres/sorted/add.js create mode 100644 src/database/postgres/sorted/intersect.js create mode 100644 src/database/postgres/sorted/remove.js create mode 100644 src/database/postgres/sorted/union.js create mode 100644 src/database/postgres/transaction.js create mode 100644 src/database/redis/transaction.js diff --git a/.eslintrc b/.eslintrc index ef9f48dedc..9414543335 100644 --- a/.eslintrc +++ b/.eslintrc @@ -36,6 +36,10 @@ "no-restricted-globals": "off", "function-paren-newline": "off", "import/no-unresolved": "error", + "quotes": ["error", "single", { + "avoidEscape": true, + "allowTemplateLiterals": true + }], // ES6 "prefer-rest-params": "off", diff --git a/.travis.yml b/.travis.yml index b41e65cca7..0831d9d8d7 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,12 +1,14 @@ services: - mongodb - redis-server + - postgresql before_install: - cp install/package.json package.json before_script: - sleep 15 # wait for mongodb to be ready - sh -c "if [ '$DB' = 'mongodb' ]; then node app --setup=\"{\\\"url\\\":\\\"http://127.0.0.1:4567\\\",\\\"secret\\\":\\\"abcdef\\\",\\\"database\\\":\\\"mongo\\\",\\\"mongo:host\\\":\\\"127.0.0.1\\\",\\\"mongo:port\\\":27017,\\\"mongo:username\\\":\\\"\\\",\\\"mongo:password\\\":\\\"\\\",\\\"mongo:database\\\":0,\\\"admin:username\\\":\\\"admin\\\",\\\"admin:email\\\":\\\"test@example.org\\\",\\\"admin:password\\\":\\\"abcdef\\\",\\\"admin:password:confirm\\\":\\\"abcdef\\\"}\" --ci=\"{\\\"host\\\":\\\"127.0.0.1\\\",\\\"port\\\":27017,\\\"database\\\":0}\"; fi" - sh -c "if [ '$DB' = 'redis' ]; then node app --setup=\"{\\\"url\\\":\\\"http://127.0.0.1:4567\\\",\\\"secret\\\":\\\"abcdef\\\",\\\"database\\\":\\\"redis\\\",\\\"redis:host\\\":\\\"127.0.0.1\\\",\\\"redis:port\\\":6379,\\\"redis:password\\\":\\\"\\\",\\\"redis:database\\\":0,\\\"admin:username\\\":\\\"admin\\\",\\\"admin:email\\\":\\\"test@example.org\\\",\\\"admin:password\\\":\\\"abcdef\\\",\\\"admin:password:confirm\\\":\\\"abcdef\\\"}\" --ci=\"{\\\"host\\\":\\\"127.0.0.1\\\",\\\"port\\\":6379,\\\"database\\\":0}\"; fi" + - sh -c "if [ '$DB' = 'postgres' ]; then psql -c 'create database nodebb;' -U postgres; psql -c 'create database travis_ci_test;' -U postgres; node app --setup=\"{\\\"url\\\":\\\"http://127.0.0.1:4567\\\",\\\"secret\\\":\\\"abcdef\\\",\\\"database\\\":\\\"postgres\\\",\\\"postgres:host\\\":\\\"127.0.0.1\\\",\\\"postgres:port\\\":5432,\\\"postgres:password\\\":\\\"\\\",\\\"postgres:database\\\":\\\"nodebb\\\",\\\"admin:username\\\":\\\"admin\\\",\\\"admin:email\\\":\\\"test@example.org\\\",\\\"admin:password\\\":\\\"abcdef\\\",\\\"admin:password:confirm\\\":\\\"abcdef\\\"}\" --ci=\"{\\\"host\\\":\\\"127.0.0.1\\\",\\\"port\\\":5432,\\\"username\\\":\\\"postgres\\\",\\\"database\\\":\\\"travis_ci_test\\\"}\"; fi" after_success: - "npm run coveralls" language: node_js @@ -15,6 +17,7 @@ dist: trusty env: - CXX=g++-4.8 DB=mongodb - CXX=g++-4.8 DB=redis + - CXX=g++-4.8 DB=postgres addons: apt: sources: @@ -23,6 +26,7 @@ addons: packages: - g++-4.8 - mongodb-org-server + postgresql: "9.5" node_js: - "10" - "8" diff --git a/install/databases.js b/install/databases.js index 430a40e04d..45de648cb0 100644 --- a/install/databases.js +++ b/install/databases.js @@ -7,6 +7,7 @@ var winston = require('winston'); var questions = { redis: require('../src/database/redis').questions, mongo: require('../src/database/mongo').questions, + postgres: require('../src/database/postgres').questions, }; module.exports = function (config, callback) { @@ -38,6 +39,12 @@ function getDatabaseConfig(config, callback) { } else { prompt.get(questions.mongo, callback); } + } else if (config.database === 'postgres') { + if (config['postgres:host'] && config['postgres:port']) { + callback(null, config); + } else { + prompt.get(questions.postgres, callback); + } } else { return callback(new Error('unknown database : ' + config.database)); } @@ -69,11 +76,19 @@ function saveDatabaseConfig(config, databaseConfig, callback) { database: databaseConfig['mongo:database'], uri: databaseConfig['mongo:uri'], }; + } else if (config.database === 'postgres') { + config.postgres = { + host: databaseConfig['postgres:host'], + port: databaseConfig['postgres:port'], + username: databaseConfig['postgres:username'], + password: databaseConfig['postgres:password'], + database: databaseConfig['postgres:database'], + }; } else { return callback(new Error('unknown database : ' + config.database)); } - var allQuestions = questions.redis.concat(questions.mongo); + var allQuestions = questions.redis.concat(questions.mongo).concat(questions.postgres); for (var x = 0; x < allQuestions.length; x += 1) { delete config[allQuestions[x].name]; } diff --git a/install/package.json b/install/package.json index 341ef67c90..f9b0ec9e95 100644 --- a/install/package.json +++ b/install/package.json @@ -29,13 +29,15 @@ "cli-graph": "^3.2.2", "clipboard": "^2.0.1", "colors": "^1.1.2", - "compression": "^1.7.1", "commander": "^2.12.2", + "compression": "^1.7.1", "connect-ensure-login": "^0.1.1", "connect-flash": "^0.1.1", "connect-mongo": "2.0.1", "connect-multiparty": "^2.1.0", + "connect-pg-simple": "^4.2.1", "connect-redis": "3.3.3", + "continuation-local-storage": "^3.2.1", "cookie-parser": "^1.4.3", "cron": "^1.3.0", "cropperjs": "^1.2.2", @@ -82,6 +84,8 @@ "nodemailer": "^4.6.5", "passport": "^0.4.0", "passport-local": "1.0.0", + "pg": "^7.4.0", + "pg-cursor": "^1.3.0", "postcss": "7.0.2", "postcss-clean": "1.1.0", "promise-polyfill": "^8.0.0", @@ -97,6 +101,7 @@ "socket.io": "2.1.1", "socket.io-adapter-cluster": "^1.0.1", "socket.io-adapter-mongo": "^2.0.1", + "socket.io-adapter-postgres": "^1.0.1", "socket.io-client": "2.1.1", "socket.io-redis": "5.2.0", "socketio-wildcard": "2.0.0", diff --git a/install/web.js b/install/web.js index 2f0ef39a42..8ebf0fb641 100644 --- a/install/web.js +++ b/install/web.js @@ -90,7 +90,7 @@ function ping(req, res) { } function welcome(req, res) { - var dbs = ['redis', 'mongo']; + var dbs = ['redis', 'mongo', 'postgres']; var databases = dbs.map(function (databaseName) { var questions = require('../src/database/' + databaseName).questions.filter(function (question) { return question && !question.hideOnWebInstall; diff --git a/public/language/en-GB/admin/advanced/database.json b/public/language/en-GB/admin/advanced/database.json index b88ca6fc82..b5351b91e4 100644 --- a/public/language/en-GB/admin/advanced/database.json +++ b/public/language/en-GB/admin/advanced/database.json @@ -32,5 +32,9 @@ "redis.iops": "Instantaneous Ops. Per Second", "redis.keyspace-hits": "Keyspace Hits", "redis.keyspace-misses": "Keyspace Misses", - "redis.raw-info": "Redis Raw Info" -} \ No newline at end of file + "redis.raw-info": "Redis Raw Info", + + "postgres": "Postgres", + "postgres.version": "PostgreSQL Version", + "postgres.raw-info": "Postgres Raw Info" +} diff --git a/src/controllers/admin/database.js b/src/controllers/admin/database.js index efec771ee6..147747822c 100644 --- a/src/controllers/admin/database.js +++ b/src/controllers/admin/database.js @@ -25,6 +25,14 @@ databaseController.get = function (req, res, next) { next(); } }, + postgres: function (next) { + if (nconf.get('postgres')) { + var pdb = require('../../database/postgres'); + pdb.info(pdb.pool, next); + } else { + next(); + } + }, }, next); }, function (results) { diff --git a/src/database/mongo.js b/src/database/mongo.js index 5b264253f5..8c9cd8e0fa 100644 --- a/src/database/mongo.js +++ b/src/database/mongo.js @@ -126,6 +126,7 @@ mongoModule.init = function (callback) { require('./mongo/sets')(db, mongoModule); require('./mongo/sorted')(db, mongoModule); require('./mongo/list')(db, mongoModule); + require('./mongo/transaction')(db, mongoModule); callback(); }); }; diff --git a/src/database/mongo/transaction.js b/src/database/mongo/transaction.js new file mode 100644 index 0000000000..75ea5fbaa2 --- /dev/null +++ b/src/database/mongo/transaction.js @@ -0,0 +1,8 @@ +'use strict'; + +module.exports = function (db, module) { + // TODO + module.transaction = function (perform, callback) { + perform(db, callback); + }; +}; diff --git a/src/database/postgres.js b/src/database/postgres.js new file mode 100644 index 0000000000..8cd4f97d42 --- /dev/null +++ b/src/database/postgres.js @@ -0,0 +1,452 @@ +'use strict'; + +var winston = require('winston'); +var async = require('async'); +var nconf = require('nconf'); +var session = require('express-session'); +var _ = require('lodash'); +var semver = require('semver'); +var dbNamespace = require('continuation-local-storage').createNamespace('postgres'); +var db; + +var postgresModule = module.exports; + +postgresModule.questions = [ + { + name: 'postgres:host', + description: 'Host IP or address of your PostgreSQL instance', + default: nconf.get('postgres:host') || '127.0.0.1', + }, + { + name: 'postgres:port', + description: 'Host port of your PostgreSQL instance', + default: nconf.get('postgres:port') || 5432, + }, + { + name: 'postgres:username', + description: 'PostgreSQL username', + default: nconf.get('postgres:username') || '', + }, + { + name: 'postgres:password', + description: 'Password of your PostgreSQL database', + hidden: true, + default: nconf.get('postgres:password') || '', + before: function (value) { value = value || nconf.get('postgres:password') || ''; return value; }, + }, + { + name: 'postgres:database', + description: 'PostgreSQL database name', + default: nconf.get('postgres:database') || 'nodebb', + }, +]; + +postgresModule.helpers = postgresModule.helpers || {}; +postgresModule.helpers.postgres = require('./postgres/helpers'); + +postgresModule.getConnectionOptions = function () { + // Sensible defaults for PostgreSQL, if not set + if (!nconf.get('postgres:host')) { + nconf.set('postgres:host', '127.0.0.1'); + } + if (!nconf.get('postgres:port')) { + nconf.set('postgres:port', 5432); + } + if (!nconf.get('postgres:database')) { + nconf.set('postgres:database', 'nodebb'); + } + + var connOptions = { + host: nconf.get('postgres:host'), + port: nconf.get('postgres:port'), + user: nconf.get('postgres:username'), + password: nconf.get('postgres:password'), + database: nconf.get('postgres:database'), + }; + + return _.merge(connOptions, nconf.get('postgres:options') || {}); +}; + +postgresModule.init = function (callback) { + callback = callback || function () { }; + + var Pool = require('pg').Pool; + + var connOptions = postgresModule.getConnectionOptions(); + + db = new Pool(connOptions); + + db.on('connect', function (client) { + var realQuery = client.query; + client.query = function () { + var args = Array.prototype.slice.call(arguments, 0); + if (dbNamespace.active && typeof args[args.length - 1] === 'function') { + args[args.length - 1] = dbNamespace.bind(args[args.length - 1]); + } + return realQuery.apply(client, args); + }; + }); + + db.connect(function (err, client, release) { + if (err) { + winston.error('NodeBB could not connect to your PostgreSQL database. PostgreSQL returned the following error: ' + err.message); + return callback(err); + } + + postgresModule.pool = db; + Object.defineProperty(postgresModule, 'client', { + get: function () { + return (dbNamespace.active && dbNamespace.get('db')) || db; + }, + configurable: true, + }); + + var wrappedDB = { + connect: function () { + return postgresModule.pool.connect.apply(postgresModule.pool, arguments); + }, + query: function () { + return postgresModule.client.query.apply(postgresModule.client, arguments); + }, + }; + + checkUpgrade(client, function (err) { + release(); + if (err) { + return callback(err); + } + + require('./postgres/main')(wrappedDB, postgresModule); + require('./postgres/hash')(wrappedDB, postgresModule); + require('./postgres/sets')(wrappedDB, postgresModule); + require('./postgres/sorted')(wrappedDB, postgresModule); + require('./postgres/list')(wrappedDB, postgresModule); + require('./postgres/transaction')(db, dbNamespace, postgresModule); + + callback(); + }); + }); +}; + +function checkUpgrade(client, callback) { + client.query(` +SELECT EXISTS(SELECT * + FROM "information_schema"."columns" + WHERE "table_schema" = 'public' + AND "table_name" = 'objects' + AND "column_name" = 'data') a, + EXISTS(SELECT * + FROM "information_schema"."columns" + WHERE "table_schema" = 'public' + AND "table_name" = 'legacy_hash' + AND "column_name" = '_key') b`, function (err, res) { + if (err) { + return callback(err); + } + + if (res.rows[0].b) { + return callback(null); + } + + var query = client.query.bind(client); + + async.series([ + async.apply(query, `BEGIN`), + async.apply(query, ` +CREATE TYPE LEGACY_OBJECT_TYPE AS ENUM ( + 'hash', 'zset', 'set', 'list', 'string' +)`), + async.apply(query, ` +CREATE TABLE "legacy_object" ( + "_key" TEXT NOT NULL + PRIMARY KEY, + "type" LEGACY_OBJECT_TYPE NOT NULL, + "expireAt" TIMESTAMPTZ DEFAULT NULL, + UNIQUE ( "_key", "type" ) +)`), + async.apply(query, ` +CREATE TABLE "legacy_hash" ( + "_key" TEXT NOT NULL + PRIMARY KEY, + "data" JSONB NOT NULL, + "type" LEGACY_OBJECT_TYPE NOT NULL + DEFAULT 'hash'::LEGACY_OBJECT_TYPE + CHECK ( "type" = 'hash' ), + CONSTRAINT "fk__legacy_hash__key" + FOREIGN KEY ("_key", "type") + REFERENCES "legacy_object"("_key", "type") + ON UPDATE CASCADE + ON DELETE CASCADE +)`), + async.apply(query, ` +CREATE TABLE "legacy_zset" ( + "_key" TEXT NOT NULL, + "value" TEXT NOT NULL, + "score" NUMERIC NOT NULL, + "type" LEGACY_OBJECT_TYPE NOT NULL + DEFAULT 'zset'::LEGACY_OBJECT_TYPE + CHECK ( "type" = 'zset' ), + PRIMARY KEY ("_key", "value"), + CONSTRAINT "fk__legacy_zset__key" + FOREIGN KEY ("_key", "type") + REFERENCES "legacy_object"("_key", "type") + ON UPDATE CASCADE + ON DELETE CASCADE +)`), + async.apply(query, ` +CREATE TABLE "legacy_set" ( + "_key" TEXT NOT NULL, + "member" TEXT NOT NULL, + "type" LEGACY_OBJECT_TYPE NOT NULL + DEFAULT 'set'::LEGACY_OBJECT_TYPE + CHECK ( "type" = 'set' ), + PRIMARY KEY ("_key", "member"), + CONSTRAINT "fk__legacy_set__key" + FOREIGN KEY ("_key", "type") + REFERENCES "legacy_object"("_key", "type") + ON UPDATE CASCADE + ON DELETE CASCADE +)`), + async.apply(query, ` +CREATE TABLE "legacy_list" ( + "_key" TEXT NOT NULL + PRIMARY KEY, + "array" TEXT[] NOT NULL, + "type" LEGACY_OBJECT_TYPE NOT NULL + DEFAULT 'list'::LEGACY_OBJECT_TYPE + CHECK ( "type" = 'list' ), + CONSTRAINT "fk__legacy_list__key" + FOREIGN KEY ("_key", "type") + REFERENCES "legacy_object"("_key", "type") + ON UPDATE CASCADE + ON DELETE CASCADE +)`), + async.apply(query, ` +CREATE TABLE "legacy_string" ( + "_key" TEXT NOT NULL + PRIMARY KEY, + "data" TEXT NOT NULL, + "type" LEGACY_OBJECT_TYPE NOT NULL + DEFAULT 'string'::LEGACY_OBJECT_TYPE + CHECK ( "type" = 'string' ), + CONSTRAINT "fk__legacy_string__key" + FOREIGN KEY ("_key", "type") + REFERENCES "legacy_object"("_key", "type") + ON UPDATE CASCADE + ON DELETE CASCADE +)`), + function (next) { + if (!res.rows[0].a) { + return next(); + } + async.series([ + async.apply(query, ` +INSERT INTO "legacy_object" ("_key", "type", "expireAt") +SELECT DISTINCT "data"->>'_key', + CASE WHEN (SELECT COUNT(*) + FROM jsonb_object_keys("data" - 'expireAt')) = 2 + THEN CASE WHEN ("data" ? 'value') + OR ("data" ? 'data') + THEN 'string' + WHEN "data" ? 'array' + THEN 'list' + WHEN "data" ? 'members' + THEN 'set' + ELSE 'hash' + END + WHEN (SELECT COUNT(*) + FROM jsonb_object_keys("data" - 'expireAt')) = 3 + THEN CASE WHEN ("data" ? 'value') + AND ("data" ? 'score') + THEN 'zset' + ELSE 'hash' + END + ELSE 'hash' + END::LEGACY_OBJECT_TYPE, + CASE WHEN ("data" ? 'expireAt') + THEN to_timestamp(("data"->>'expireAt')::double precision / 1000) + ELSE NULL + END + FROM "objects"`), + async.apply(query, ` +INSERT INTO "legacy_hash" ("_key", "data") +SELECT "data"->>'_key', + "data" - '_key' - 'expireAt' + FROM "objects" + WHERE CASE WHEN (SELECT COUNT(*) + FROM jsonb_object_keys("data" - 'expireAt')) = 2 + THEN NOT (("data" ? 'value') + OR ("data" ? 'data') + OR ("data" ? 'members') + OR ("data" ? 'array')) + WHEN (SELECT COUNT(*) + FROM jsonb_object_keys("data" - 'expireAt')) = 3 + THEN NOT (("data" ? 'value') + AND ("data" ? 'score')) + ELSE TRUE + END`), + async.apply(query, ` +INSERT INTO "legacy_zset" ("_key", "value", "score") +SELECT "data"->>'_key', + "data"->>'value', + ("data"->>'score')::NUMERIC + FROM "objects" + WHERE (SELECT COUNT(*) + FROM jsonb_object_keys("data" - 'expireAt')) = 3 + AND ("data" ? 'value') + AND ("data" ? 'score')`), + async.apply(query, ` +INSERT INTO "legacy_set" ("_key", "member") +SELECT "data"->>'_key', + jsonb_array_elements_text("data"->'members') + FROM "objects" + WHERE (SELECT COUNT(*) + FROM jsonb_object_keys("data" - 'expireAt')) = 2 + AND ("data" ? 'members')`), + async.apply(query, ` +INSERT INTO "legacy_list" ("_key", "array") +SELECT "data"->>'_key', + ARRAY(SELECT t + FROM jsonb_array_elements_text("data"->'list') WITH ORDINALITY l(t, i) + ORDER BY i ASC) + FROM "objects" + WHERE (SELECT COUNT(*) + FROM jsonb_object_keys("data" - 'expireAt')) = 2 + AND ("data" ? 'array')`), + async.apply(query, ` +INSERT INTO "legacy_string" ("_key", "data") +SELECT "data"->>'_key', + CASE WHEN "data" ? 'value' + THEN "data"->>'value' + ELSE "data"->>'data' + END + FROM "objects" + WHERE (SELECT COUNT(*) + FROM jsonb_object_keys("data" - 'expireAt')) = 2 + AND (("data" ? 'value') + OR ("data" ? 'data'))`), + async.apply(query, `DROP TABLE "objects" CASCADE`), + async.apply(query, `DROP FUNCTION "fun__objects__expireAt"() CASCADE`), + ], next); + }, + async.apply(query, ` +CREATE VIEW "legacy_object_live" AS +SELECT "_key", "type" + FROM "legacy_object" + WHERE "expireAt" IS NULL + OR "expireAt" > CURRENT_TIMESTAMP`), + ], function (err) { + query(err ? `ROLLBACK` : `COMMIT`, function (err1) { + callback(err1 || err); + }); + }); + }); +} + +postgresModule.initSessionStore = function (callback) { + var meta = require('../meta'); + var sessionStore; + + var ttl = meta.getSessionTTLSeconds(); + + if (nconf.get('redis')) { + sessionStore = require('connect-redis')(session); + var rdb = require('./redis'); + rdb.client = rdb.connect(); + + postgresModule.sessionStore = new sessionStore({ + client: rdb.client, + ttl: ttl, + }); + + return callback(); + } + + db.query(` +CREATE TABLE IF NOT EXISTS "session" ( + "sid" VARCHAR NOT NULL + COLLATE "default", + "sess" JSON NOT NULL, + "expire" TIMESTAMP(6) NOT NULL, + CONSTRAINT "session_pkey" + PRIMARY KEY ("sid") + NOT DEFERRABLE + INITIALLY IMMEDIATE +) WITH (OIDS=FALSE)`, function (err) { + if (err) { + return callback(err); + } + + sessionStore = require('connect-pg-simple')(session); + postgresModule.sessionStore = new sessionStore({ + pool: db, + ttl: ttl, + pruneSessionInterval: nconf.get('isPrimary') === 'true' ? 60 : false, + }); + + callback(); + }); +}; + +postgresModule.createIndices = function (callback) { + if (!postgresModule.pool) { + winston.warn('[database/createIndices] database not initialized'); + return callback(); + } + + var query = postgresModule.pool.query.bind(postgresModule.pool); + + winston.info('[database] Checking database indices.'); + async.series([ + async.apply(query, `CREATE INDEX IF NOT EXISTS "idx__legacy_zset__key__score" ON "legacy_zset"("_key" ASC, "score" DESC)`), + async.apply(query, `CREATE INDEX IF NOT EXISTS "idx__legacy_object__expireAt" ON "legacy_object"("expireAt" ASC)`), + ], function (err) { + if (err) { + winston.error('Error creating index ' + err.message); + return callback(err); + } + winston.info('[database] Checking database indices done!'); + callback(); + }); +}; + +postgresModule.checkCompatibility = function (callback) { + var postgresPkg = require('pg/package.json'); + postgresModule.checkCompatibilityVersion(postgresPkg.version, callback); +}; + +postgresModule.checkCompatibilityVersion = function (version, callback) { + if (semver.lt(version, '7.0.0')) { + return callback(new Error('The `pg` package is out-of-date, please run `./nodebb setup` again.')); + } + + callback(); +}; + +postgresModule.info = function (db, callback) { + if (!db) { + return callback(); + } + + db.query(` +SELECT true "postgres", + current_setting('server_version') "version", + EXTRACT(EPOCH FROM NOW() - pg_postmaster_start_time()) * 1000 "uptime"`, function (err, res) { + if (err) { + return callback(err); + } + callback(null, res.rows[0]); + }); +}; + +postgresModule.close = function (callback) { + callback = callback || function () {}; + db.end(callback); +}; + +postgresModule.socketAdapter = function () { + var postgresAdapter = require('socket.io-adapter-postgres'); + return postgresAdapter(postgresModule.getConnectionOptions(), { + pubClient: postgresModule.pool, + }); +}; diff --git a/src/database/postgres/hash.js b/src/database/postgres/hash.js new file mode 100644 index 0000000000..aeb794185b --- /dev/null +++ b/src/database/postgres/hash.js @@ -0,0 +1,391 @@ +'use strict'; + +var async = require('async'); + +module.exports = function (db, module) { + var helpers = module.helpers.postgres; + + module.setObject = function (key, data, callback) { + callback = callback || helpers.noop; + + if (!key || !data) { + return callback(); + } + + if (data.hasOwnProperty('')) { + delete data['']; + } + + module.transaction(function (tx, done) { + var query = tx.client.query.bind(tx.client); + + async.series([ + async.apply(helpers.ensureLegacyObjectType, tx.client, key, 'hash'), + async.apply(query, { + name: 'setObject', + text: ` +INSERT INTO "legacy_hash" ("_key", "data") +VALUES ($1::TEXT, $2::TEXT::JSONB) + ON CONFLICT ("_key") + DO UPDATE SET "data" = "legacy_hash"."data" || $2::TEXT::JSONB`, + values: [key, JSON.stringify(data)], + }), + ], function (err) { + done(err); + }); + }, callback); + }; + + module.setObjectField = function (key, field, value, callback) { + callback = callback || helpers.noop; + + if (!field) { + return callback(); + } + + module.transaction(function (tx, done) { + var query = tx.client.query.bind(tx.client); + + async.series([ + async.apply(helpers.ensureLegacyObjectType, tx.client, key, 'hash'), + async.apply(query, { + name: 'setObjectField', + text: ` +INSERT INTO "legacy_hash" ("_key", "data") +VALUES ($1::TEXT, jsonb_build_object($2::TEXT, $3::TEXT::JSONB)) + ON CONFLICT ("_key") + DO UPDATE SET "data" = jsonb_set("legacy_hash"."data", ARRAY[$2::TEXT], $3::TEXT::JSONB)`, + values: [key, field, JSON.stringify(value)], + }), + ], function (err) { + done(err); + }); + }, callback); + }; + + module.getObject = function (key, callback) { + if (!key) { + return callback(); + } + + db.query({ + name: 'getObject', + text: ` +SELECT h."data" + FROM "legacy_object_live" o + INNER JOIN "legacy_hash" h + ON o."_key" = h."_key" + AND o."type" = h."type" + WHERE o."_key" = $1::TEXT + LIMIT 1`, + values: [key], + }, function (err, res) { + if (err) { + return callback(err); + } + + if (res.rows.length) { + return callback(null, res.rows[0].data); + } + + callback(null, null); + }); + }; + + module.getObjects = function (keys, callback) { + if (!Array.isArray(keys) || !keys.length) { + return callback(null, []); + } + + db.query({ + name: 'getObjects', + text: ` +SELECT h."data" + FROM UNNEST($1::TEXT[]) WITH ORDINALITY k("_key", i) + LEFT OUTER JOIN "legacy_object_live" o + ON o."_key" = k."_key" + LEFT OUTER JOIN "legacy_hash" h + ON o."_key" = h."_key" + AND o."type" = h."type" + ORDER BY k.i ASC`, + values: [keys], + }, function (err, res) { + if (err) { + return callback(err); + } + + callback(null, res.rows.map(function (row) { + return row.data; + })); + }); + }; + + module.getObjectField = function (key, field, callback) { + if (!key) { + return callback(); + } + + db.query({ + name: 'getObjectField', + text: ` +SELECT h."data"->>$2::TEXT f + FROM "legacy_object_live" o + INNER JOIN "legacy_hash" h + ON o."_key" = h."_key" + AND o."type" = h."type" + WHERE o."_key" = $1::TEXT + LIMIT 1`, + values: [key, field], + }, function (err, res) { + if (err) { + return callback(err); + } + + if (res.rows.length) { + return callback(null, res.rows[0].f); + } + + callback(null, null); + }); + }; + + module.getObjectFields = function (key, fields, callback) { + if (!key) { + return callback(); + } + + db.query({ + name: 'getObjectFields', + text: ` +SELECT (SELECT jsonb_object_agg(f, d."value") + FROM UNNEST($2::TEXT[]) f + LEFT OUTER JOIN jsonb_each(h."data") d + ON d."key" = f) d + FROM "legacy_object_live" o + INNER JOIN "legacy_hash" h + ON o."_key" = h."_key" + AND o."type" = h."type" + WHERE o."_key" = $1::TEXT`, + values: [key, fields], + }, function (err, res) { + if (err) { + return callback(err); + } + + if (res.rows.length) { + return callback(null, res.rows[0].d); + } + + var obj = {}; + fields.forEach(function (f) { + obj[f] = null; + }); + + callback(null, obj); + }); + }; + + module.getObjectsFields = function (keys, fields, callback) { + if (!Array.isArray(keys) || !keys.length) { + return callback(null, []); + } + + db.query({ + name: 'getObjectsFields', + text: ` +SELECT (SELECT jsonb_object_agg(f, d."value") + FROM UNNEST($2::TEXT[]) f + LEFT OUTER JOIN jsonb_each(h."data") d + ON d."key" = f) d + FROM UNNEST($1::text[]) WITH ORDINALITY k("_key", i) + LEFT OUTER JOIN "legacy_object_live" o + ON o."_key" = k."_key" + LEFT OUTER JOIN "legacy_hash" h + ON o."_key" = h."_key" + AND o."type" = h."type" + ORDER BY k.i ASC`, + values: [keys, fields], + }, function (err, res) { + if (err) { + return callback(err); + } + + callback(null, res.rows.map(function (row) { + return row.d; + })); + }); + }; + + module.getObjectKeys = function (key, callback) { + if (!key) { + return callback(); + } + + db.query({ + name: 'getObjectKeys', + text: ` +SELECT ARRAY(SELECT jsonb_object_keys(h."data")) k + FROM "legacy_object_live" o + INNER JOIN "legacy_hash" h + ON o."_key" = h."_key" + AND o."type" = h."type" + WHERE o."_key" = $1::TEXT + LIMIT 1`, + values: [key], + }, function (err, res) { + if (err) { + return callback(err); + } + + if (res.rows.length) { + return callback(null, res.rows[0].k); + } + + callback(null, []); + }); + }; + + module.getObjectValues = function (key, callback) { + module.getObject(key, function (err, data) { + if (err) { + return callback(err); + } + + var values = []; + + if (data) { + for (var key in data) { + if (data.hasOwnProperty(key)) { + values.push(data[key]); + } + } + } + + callback(null, values); + }); + }; + + module.isObjectField = function (key, field, callback) { + if (!key) { + return callback(); + } + + db.query({ + name: 'isObjectField', + text: ` +SELECT (h."data" ? $2::TEXT AND h."data"->>$2::TEXT IS NOT NULL) b + FROM "legacy_object_live" o + INNER JOIN "legacy_hash" h + ON o."_key" = h."_key" + AND o."type" = h."type" + WHERE o."_key" = $1::TEXT + LIMIT 1`, + values: [key, field], + }, function (err, res) { + if (err) { + return callback(err); + } + + if (res.rows.length) { + return callback(null, res.rows[0].b); + } + + callback(null, false); + }); + }; + + module.isObjectFields = function (key, fields, callback) { + if (!key) { + return callback(); + } + + module.getObjectFields(key, fields, function (err, data) { + if (err) { + return callback(err); + } + + if (!data) { + return callback(null, fields.map(function () { + return false; + })); + } + + callback(null, fields.map(function (field) { + return data.hasOwnProperty(field) && data[field] !== null; + })); + }); + }; + + module.deleteObjectField = function (key, field, callback) { + module.deleteObjectFields(key, [field], callback); + }; + + module.deleteObjectFields = function (key, fields, callback) { + callback = callback || helpers.noop; + if (!key || !Array.isArray(fields) || !fields.length) { + return callback(); + } + + db.query({ + name: 'deleteObjectFields', + text: ` +UPDATE "legacy_hash" + SET "data" = COALESCE((SELECT jsonb_object_agg("key", "value") + FROM jsonb_each("data") + WHERE "key" <> ALL ($2::TEXT[])), '{}') + WHERE "_key" = $1::TEXT`, + values: [key, fields], + }, function (err) { + callback(err); + }); + }; + + module.incrObjectField = function (key, field, callback) { + module.incrObjectFieldBy(key, field, 1, callback); + }; + + module.decrObjectField = function (key, field, callback) { + module.incrObjectFieldBy(key, field, -1, callback); + }; + + module.incrObjectFieldBy = function (key, field, value, callback) { + callback = callback || helpers.noop; + value = parseInt(value, 10); + + if (!key || isNaN(value)) { + return callback(null, null); + } + + module.transaction(function (tx, done) { + var query = tx.client.query.bind(tx.client); + + async.waterfall([ + async.apply(Array.isArray(key) ? helpers.ensureLegacyObjectsType : helpers.ensureLegacyObjectType, tx.client, key, 'hash'), + async.apply(query, Array.isArray(key) ? { + name: 'incrObjectFieldByMulti', + text: ` +INSERT INTO "legacy_hash" ("_key", "data") +SELECT UNNEST($1::TEXT[]), jsonb_build_object($2::TEXT, $3::NUMERIC) + ON CONFLICT ("_key") + DO UPDATE SET "data" = jsonb_set("legacy_hash"."data", ARRAY[$2::TEXT], to_jsonb(COALESCE(("legacy_hash"."data"->>$2::TEXT)::NUMERIC, 0) + $3::NUMERIC)) +RETURNING ("data"->>$2::TEXT)::NUMERIC v`, + values: [key, field, value], + } : { + name: 'incrObjectFieldBy', + text: ` +INSERT INTO "legacy_hash" ("_key", "data") +VALUES ($1::TEXT, jsonb_build_object($2::TEXT, $3::NUMERIC)) + ON CONFLICT ("_key") + DO UPDATE SET "data" = jsonb_set("legacy_hash"."data", ARRAY[$2::TEXT], to_jsonb(COALESCE(("legacy_hash"."data"->>$2::TEXT)::NUMERIC, 0) + $3::NUMERIC)) +RETURNING ("data"->>$2::TEXT)::NUMERIC v`, + values: [key, field, value], + }), + function (res, next) { + next(null, Array.isArray(key) ? res.rows.map(function (r) { + return parseFloat(r.v); + }) : parseFloat(res.rows[0].v)); + }, + ], done); + }, callback); + }; +}; diff --git a/src/database/postgres/helpers.js b/src/database/postgres/helpers.js new file mode 100644 index 0000000000..1b9a4d2cf6 --- /dev/null +++ b/src/database/postgres/helpers.js @@ -0,0 +1,139 @@ +'use strict'; + +var helpers = {}; + +helpers.valueToString = function (value) { + if (value === null || value === undefined) { + return value; + } + + return value.toString(); +}; + +helpers.removeDuplicateValues = function (values) { + var others = Array.prototype.slice.call(arguments, 1); + for (var i = 0; i < values.length; i++) { + if (values.lastIndexOf(values[i]) !== i) { + values.splice(i, 1); + for (var j = 0; j < others.length; j++) { + others[j].splice(i, 1); + } + i -= 1; + } + } +}; + +helpers.ensureLegacyObjectType = function (db, key, type, callback) { + db.query({ + name: 'ensureLegacyObjectTypeBefore', + text: ` +DELETE FROM "legacy_object" + WHERE "expireAt" IS NOT NULL + AND "expireAt" <= CURRENT_TIMESTAMP`, + }, function (err) { + if (err) { + return callback(err); + } + + db.query({ + name: 'ensureLegacyObjectType1', + text: ` +INSERT INTO "legacy_object" ("_key", "type") +VALUES ($1::TEXT, $2::TEXT::LEGACY_OBJECT_TYPE) + ON CONFLICT + DO NOTHING`, + values: [key, type], + }, function (err) { + if (err) { + return callback(err); + } + + db.query({ + name: 'ensureLegacyObjectType2', + text: ` +SELECT "type" + FROM "legacy_object_live" + WHERE "_key" = $1::TEXT`, + values: [key], + }, function (err, res) { + if (err) { + return callback(err); + } + + if (res.rows[0].type !== type) { + return callback(new Error('database: cannot insert ' + JSON.stringify(key) + ' as ' + type + ' because it already exists as ' + res.rows[0].type)); + } + + callback(null); + }); + }); + }); +}; + +helpers.ensureLegacyObjectsType = function (db, keys, type, callback) { + db.query({ + name: 'ensureLegacyObjectTypeBefore', + text: ` +DELETE FROM "legacy_object" + WHERE "expireAt" IS NOT NULL + AND "expireAt" <= CURRENT_TIMESTAMP`, + }, function (err) { + if (err) { + return callback(err); + } + + db.query({ + name: 'ensureLegacyObjectsType1', + text: ` +INSERT INTO "legacy_object" ("_key", "type") +SELECT k, $2::TEXT::LEGACY_OBJECT_TYPE + FROM UNNEST($1::TEXT[]) k + ON CONFLICT + DO NOTHING`, + values: [keys, type], + }, function (err) { + if (err) { + return callback(err); + } + + db.query({ + name: 'ensureLegacyObjectsType2', + text: ` +SELECT "_key", "type" + FROM "legacy_object_live" + WHERE "_key" = ANY($1::TEXT[])`, + values: [keys], + }, function (err, res) { + if (err) { + return callback(err); + } + + var invalid = res.rows.filter(function (r) { + return r.type !== type; + }); + + if (invalid.length) { + return callback(new Error('database: cannot insert multiple objects as ' + type + ' because they already exist: ' + invalid.map(function (r) { + return JSON.stringify(r._key) + ' is ' + r.type; + }).join(', '))); + } + + var missing = keys.filter(function (k) { + return !res.rows.some(function (r) { + return r._key === k; + }); + }); + + if (missing.length) { + return callback(new Error('database: failed to insert keys for objects: ' + JSON.stringify(missing))); + } + + callback(null); + }); + }); + }); +}; + +helpers.noop = function () {}; + +module.exports = helpers; diff --git a/src/database/postgres/list.js b/src/database/postgres/list.js new file mode 100644 index 0000000000..9d11e16dff --- /dev/null +++ b/src/database/postgres/list.js @@ -0,0 +1,234 @@ +'use strict'; + +var async = require('async'); + +module.exports = function (db, module) { + var helpers = module.helpers.postgres; + + module.listPrepend = function (key, value, callback) { + callback = callback || helpers.noop; + + if (!key) { + return callback(); + } + + module.transaction(function (tx, done) { + var query = tx.client.query.bind(tx.client); + + async.series([ + async.apply(helpers.ensureLegacyObjectType, tx.client, key, 'list'), + async.apply(query, { + name: 'listPrepend', + text: ` +INSERT INTO "legacy_list" ("_key", "array") +VALUES ($1::TEXT, ARRAY[$2::TEXT]) + ON CONFLICT ("_key") + DO UPDATE SET "array" = ARRAY[$2::TEXT] || "legacy_list"."array"`, + values: [key, value], + }), + ], function (err) { + done(err); + }); + }, callback); + }; + + module.listAppend = function (key, value, callback) { + callback = callback || helpers.noop; + + if (!key) { + return callback(); + } + + module.transaction(function (tx, done) { + var query = tx.client.query.bind(tx.client); + + async.series([ + async.apply(helpers.ensureLegacyObjectType, tx.client, key, 'list'), + async.apply(query, { + name: 'listAppend', + text: ` +INSERT INTO "legacy_list" ("_key", "array") +VALUES ($1::TEXT, ARRAY[$2::TEXT]) + ON CONFLICT ("_key") + DO UPDATE SET "array" = "legacy_list"."array" || ARRAY[$2::TEXT]`, + values: [key, value], + }), + ], function (err) { + done(err); + }); + }, callback || helpers.noop); + }; + + module.listRemoveLast = function (key, callback) { + callback = callback || helpers.noop; + + if (!key) { + return callback(); + } + + db.query({ + name: 'listRemoveLast', + text: ` +WITH A AS ( + SELECT l.* + FROM "legacy_object_live" o + INNER JOIN "legacy_list" l + ON o."_key" = l."_key" + AND o."type" = l."type" + WHERE o."_key" = $1::TEXT + FOR UPDATE) +UPDATE "legacy_list" l + SET "array" = A."array"[1 : array_length(A."array", 1) - 1] + FROM A + WHERE A."_key" = l."_key" +RETURNING A."array"[array_length(A."array", 1)] v`, + values: [key], + }, function (err, res) { + if (err) { + return callback(err); + } + + if (res.rows.length) { + return callback(null, res.rows[0].v); + } + + callback(null, null); + }); + }; + + module.listRemoveAll = function (key, value, callback) { + callback = callback || helpers.noop; + + if (!key) { + return callback(); + } + + db.query({ + name: 'listRemoveAll', + text: ` +UPDATE "legacy_list" l + SET "array" = array_remove(l."array", $2::TEXT) + FROM "legacy_object_live" o + WHERE o."_key" = l."_key" + AND o."type" = l."type" + AND o."_key" = $1::TEXT`, + values: [key, value], + }, function (err) { + callback(err); + }); + }; + + module.listTrim = function (key, start, stop, callback) { + callback = callback || helpers.noop; + + if (!key) { + return callback(); + } + + stop += 1; + + db.query(stop > 0 ? { + name: 'listTrim', + text: ` +UPDATE "legacy_list" l + SET "array" = ARRAY(SELECT m.m + FROM UNNEST(l."array") WITH ORDINALITY m(m, i) + ORDER BY m.i ASC + LIMIT ($3::INTEGER - $2::INTEGER) + OFFSET $2::INTEGER) + FROM "legacy_object_live" o + WHERE o."_key" = l."_key" + AND o."type" = l."type" + AND o."_key" = $1::TEXT`, + values: [key, start, stop], + } : { + name: 'listTrimBack', + text: ` +UPDATE "legacy_list" l + SET "array" = ARRAY(SELECT m.m + FROM UNNEST(l."array") WITH ORDINALITY m(m, i) + ORDER BY m.i ASC + LIMIT ($3::INTEGER - $2::INTEGER + array_length(l."array", 1)) + OFFSET $2::INTEGER) + FROM "legacy_object_live" o + WHERE o."_key" = l."_key" + AND o."type" = l."type" + AND o."_key" = $1::TEXT`, + values: [key, start, stop], + }, function (err) { + callback(err); + }); + }; + + module.getListRange = function (key, start, stop, callback) { + if (!key) { + return callback(); + } + + stop += 1; + + db.query(stop > 0 ? { + name: 'getListRange', + text: ` +SELECT ARRAY(SELECT m.m + FROM UNNEST(l."array") WITH ORDINALITY m(m, i) + ORDER BY m.i ASC + LIMIT ($3::INTEGER - $2::INTEGER) + OFFSET $2::INTEGER) l + FROM "legacy_object_live" o + INNER JOIN "legacy_list" l + ON o."_key" = l."_key" + AND o."type" = l."type" + WHERE o."_key" = $1::TEXT`, + values: [key, start, stop], + } : { + name: 'getListRangeBack', + text: ` +SELECT ARRAY(SELECT m.m + FROM UNNEST(l."array") WITH ORDINALITY m(m, i) + ORDER BY m.i ASC + LIMIT ($3::INTEGER - $2::INTEGER + array_length(l."array", 1)) + OFFSET $2::INTEGER) l + FROM "legacy_object_live" o + INNER JOIN "legacy_list" l + ON o."_key" = l."_key" + AND o."type" = l."type" + WHERE o."_key" = $1::TEXT`, + values: [key, start, stop], + }, function (err, res) { + if (err) { + return callback(err); + } + + if (res.rows.length) { + return callback(null, res.rows[0].l); + } + + callback(null, []); + }); + }; + + module.listLength = function (key, callback) { + db.query({ + name: 'listLength', + text: ` +SELECT array_length(l."array", 1) l + FROM "legacy_object_live" o + INNER JOIN "legacy_list" l + ON o."_key" = l."_key" + AND o."type" = l."type" + WHERE o."_key" = $1::TEXT`, + values: [key], + }, function (err, res) { + if (err) { + return callback(err); + } + + if (res.rows.length) { + return callback(null, res.rows[0].l); + } + + callback(null, 0); + }); + }; +}; diff --git a/src/database/postgres/main.js b/src/database/postgres/main.js new file mode 100644 index 0000000000..2dcc6ecd5e --- /dev/null +++ b/src/database/postgres/main.js @@ -0,0 +1,239 @@ +'use strict'; + +var async = require('async'); + +module.exports = function (db, module) { + var helpers = module.helpers.postgres; + + var query = db.query.bind(db); + + module.flushdb = function (callback) { + callback = callback || helpers.noop; + + async.series([ + async.apply(query, `DROP SCHEMA "public" CASCADE`), + async.apply(query, `CREATE SCHEMA "public"`), + ], function (err) { + callback(err); + }); + }; + + module.emptydb = function (callback) { + callback = callback || helpers.noop; + query(`DELETE FROM "legacy_object"`, function (err) { + callback(err); + }); + }; + + module.exists = function (key, callback) { + if (!key) { + return callback(); + } + + query({ + name: 'exists', + text: ` +SELECT EXISTS(SELECT * + FROM "legacy_object_live" + WHERE "_key" = $1::TEXT + LIMIT 1) e`, + values: [key], + }, function (err, res) { + if (err) { + return callback(err); + } + + callback(null, res.rows[0].e); + }); + }; + + module.delete = function (key, callback) { + callback = callback || helpers.noop; + if (!key) { + return callback(); + } + + query({ + name: 'delete', + text: ` +DELETE FROM "legacy_object" + WHERE "_key" = $1::TEXT`, + values: [key], + }, function (err) { + callback(err); + }); + }; + + module.deleteAll = function (keys, callback) { + callback = callback || helpers.noop; + + if (!Array.isArray(keys) || !keys.length) { + return callback(); + } + + query({ + name: 'deleteAll', + text: ` +DELETE FROM "legacy_object" + WHERE "_key" = ANY($1::TEXT[])`, + values: [keys], + }, function (err) { + callback(err); + }); + }; + + module.get = function (key, callback) { + if (!key) { + return callback(); + } + + query({ + name: 'get', + text: ` +SELECT s."data" t + FROM "legacy_object_live" o + INNER JOIN "legacy_string" s + ON o."_key" = s."_key" + AND o."type" = s."type" + WHERE o."_key" = $1::TEXT + LIMIT 1`, + values: [key], + }, function (err, res) { + if (err) { + return callback(err); + } + + if (res.rows.length) { + return callback(null, res.rows[0].t); + } + + callback(null, null); + }); + }; + + module.set = function (key, value, callback) { + callback = callback || helpers.noop; + + if (!key) { + return callback(); + } + + module.transaction(function (tx, done) { + async.series([ + async.apply(helpers.ensureLegacyObjectType, tx.client, key, 'string'), + async.apply(tx.client.query.bind(tx.client), { + name: 'set', + text: ` +INSERT INTO "legacy_string" ("_key", "data") +VALUES ($1::TEXT, $2::TEXT) + ON CONFLICT ("_key") + DO UPDATE SET "data" = $2::TEXT`, + values: [key, value], + }), + ], function (err) { + done(err); + }); + }, callback); + }; + + module.increment = function (key, callback) { + callback = callback || helpers.noop; + + if (!key) { + return callback(); + } + + module.transaction(function (tx, done) { + async.waterfall([ + async.apply(helpers.ensureLegacyObjectType, tx.client, key, 'string'), + async.apply(tx.client.query.bind(tx.client), { + name: 'increment', + text: ` +INSERT INTO "legacy_string" ("_key", "data") +VALUES ($1::TEXT, '1') + ON CONFLICT ("_key") + DO UPDATE SET "data" = ("legacy_string"."data"::NUMERIC + 1)::TEXT +RETURNING "data" d`, + values: [key], + }), + ], function (err, res) { + if (err) { + return done(err); + } + + done(null, parseFloat(res.rows[0].d)); + }); + }, callback); + }; + + module.rename = function (oldKey, newKey, callback) { + module.transaction(function (tx, done) { + async.series([ + async.apply(tx.delete, newKey), + async.apply(tx.client.query.bind(tx.client), { + name: 'rename', + text: ` +UPDATE "legacy_object" + SET "_key" = $2::TEXT + WHERE "_key" = $1::TEXT`, + values: [oldKey, newKey], + }), + ], function (err) { + done(err); + }); + }, callback || helpers.noop); + }; + + module.type = function (key, callback) { + query({ + name: 'type', + text: ` +SELECT "type"::TEXT t + FROM "legacy_object_live" + WHERE "_key" = $1::TEXT + LIMIT 1`, + values: [key], + }, function (err, res) { + if (err) { + return callback(err); + } + + if (res.rows.length) { + return callback(null, res.rows[0].t); + } + + callback(null, null); + }); + }; + + function doExpire(key, date, callback) { + query({ + name: 'expire', + text: ` +UPDATE "legacy_object" + SET "expireAt" = $2::TIMESTAMPTZ + WHERE "_key" = $1::TEXT`, + values: [key, date], + }, function (err) { + if (callback) { + callback(err); + } + }); + } + + module.expire = function (key, seconds, callback) { + doExpire(key, new Date(((Date.now() / 1000) + seconds) * 1000), callback); + }; + + module.expireAt = function (key, timestamp, callback) { + doExpire(key, new Date(timestamp * 1000), callback); + }; + + module.pexpire = function (key, ms, callback) { + doExpire(key, new Date(Date.now() + parseInt(ms, 10)), callback); + }; + + module.pexpireAt = function (key, timestamp, callback) { + doExpire(key, new Date(timestamp), callback); + }; +}; diff --git a/src/database/postgres/pubsub.js b/src/database/postgres/pubsub.js new file mode 100644 index 0000000000..969ef62cb8 --- /dev/null +++ b/src/database/postgres/pubsub.js @@ -0,0 +1,51 @@ +'use strict'; + +var util = require('util'); +var winston = require('winston'); +var EventEmitter = require('events').EventEmitter; +var pg = require('pg'); +var db = require('../postgres'); + +var PubSub = function () { + var self = this; + + var subClient = new pg.Client(db.getConnectionOptions()); + + subClient.connect(function (err) { + if (err) { + winston.error(err); + return; + } + + subClient.query('LISTEN pubsub', function (err) { + if (err) { + winston.error(err); + } + }); + + subClient.on('notification', function (message) { + if (message.channel !== 'pubsub') { + return; + } + + try { + var msg = JSON.parse(message.payload); + self.emit(msg.event, msg.data); + } catch (err) { + winston.error(err.stack); + } + }); + }); +}; + +util.inherits(PubSub, EventEmitter); + +PubSub.prototype.publish = function (event, data) { + db.pool.query({ + name: 'pubSubPublish', + text: `SELECT pg_notify('pubsub', $1::TEXT)`, + values: [JSON.stringify({ event: event, data: data })], + }); +}; + +module.exports = new PubSub(); diff --git a/src/database/postgres/sets.js b/src/database/postgres/sets.js new file mode 100644 index 0000000000..615cb6ad42 --- /dev/null +++ b/src/database/postgres/sets.js @@ -0,0 +1,342 @@ +'use strict'; + +var async = require('async'); + +module.exports = function (db, module) { + var helpers = module.helpers.postgres; + + module.setAdd = function (key, value, callback) { + callback = callback || helpers.noop; + + if (!Array.isArray(value)) { + value = [value]; + } + + module.transaction(function (tx, done) { + var query = tx.client.query.bind(tx.client); + + async.series([ + async.apply(helpers.ensureLegacyObjectType, tx.client, key, 'set'), + async.apply(query, { + name: 'setAdd', + text: ` +INSERT INTO "legacy_set" ("_key", "member") +SELECT $1::TEXT, m + FROM UNNEST($2::TEXT[]) m + ON CONFLICT ("_key", "member") + DO NOTHING`, + values: [key, value], + }), + ], function (err) { + done(err); + }); + }, callback); + }; + + module.setsAdd = function (keys, value, callback) { + callback = callback || helpers.noop; + + if (!Array.isArray(keys) || !keys.length) { + return callback(); + } + + if (!Array.isArray(value)) { + value = [value]; + } + + keys = keys.filter(function (k, i, a) { + return a.indexOf(k) === i; + }); + + module.transaction(function (tx, done) { + var query = tx.client.query.bind(tx.client); + + async.series([ + async.apply(helpers.ensureLegacyObjectsType, tx.client, keys, 'set'), + async.apply(query, { + name: 'setsAdd', + text: ` +INSERT INTO "legacy_set" ("_key", "member") +SELECT k, m + FROM UNNEST($1::TEXT[]) k + CROSS JOIN UNNEST($2::TEXT[]) m + ON CONFLICT ("_key", "member") + DO NOTHING`, + values: [keys, value], + }), + ], function (err) { + done(err); + }); + }, callback); + }; + + module.setRemove = function (key, value, callback) { + callback = callback || helpers.noop; + + if (!Array.isArray(key)) { + key = [key]; + } + + if (!Array.isArray(value)) { + value = [value]; + } + + db.query({ + name: 'setRemove', + text: ` +DELETE FROM "legacy_set" + WHERE "_key" = ANY($1::TEXT[]) + AND "member" = ANY($2::TEXT[])`, + values: [key, value], + }, function (err) { + callback(err); + }); + }; + + module.setsRemove = function (keys, value, callback) { + callback = callback || helpers.noop; + + if (!Array.isArray(keys) || !keys.length) { + return callback(); + } + + db.query({ + name: 'setsRemove', + text: ` +DELETE FROM "legacy_set" + WHERE "_key" = ANY($1::TEXT[]) + AND "member" = $2::TEXT`, + values: [keys, value], + }, function (err) { + callback(err); + }); + }; + + module.isSetMember = function (key, value, callback) { + if (!key) { + return callback(null, false); + } + + db.query({ + name: 'isSetMember', + text: ` +SELECT 1 + FROM "legacy_object_live" o + INNER JOIN "legacy_set" s + ON o."_key" = s."_key" + AND o."type" = s."type" + WHERE o."_key" = $1::TEXT + AND s."member" = $2::TEXT`, + values: [key, value], + }, function (err, res) { + if (err) { + return callback(err); + } + + callback(null, !!res.rows.length); + }); + }; + + module.isSetMembers = function (key, values, callback) { + if (!key || !Array.isArray(values) || !values.length) { + return callback(null, []); + } + + values = values.map(helpers.valueToString); + + db.query({ + name: 'isSetMembers', + text: ` +SELECT s."member" m + FROM "legacy_object_live" o + INNER JOIN "legacy_set" s + ON o."_key" = s."_key" + AND o."type" = s."type" + WHERE o."_key" = $1::TEXT + AND s."member" = ANY($2::TEXT[])`, + values: [key, values], + }, function (err, res) { + if (err) { + return callback(err); + } + + callback(null, values.map(function (v) { + return res.rows.some(function (r) { + return r.m === v; + }); + })); + }); + }; + + module.isMemberOfSets = function (sets, value, callback) { + if (!Array.isArray(sets) || !sets.length) { + return callback(null, []); + } + + value = helpers.valueToString(value); + + db.query({ + name: 'isMemberOfSets', + text: ` +SELECT o."_key" k + FROM "legacy_object_live" o + INNER JOIN "legacy_set" s + ON o."_key" = s."_key" + AND o."type" = s."type" + WHERE o."_key" = ANY($1::TEXT[]) + AND s."member" = $2::TEXT`, + values: [sets, value], + }, function (err, res) { + if (err) { + return callback(err); + } + + callback(null, sets.map(function (s) { + return res.rows.some(function (r) { + return r.k === s; + }); + })); + }); + }; + + module.getSetMembers = function (key, callback) { + if (!key) { + return callback(null, []); + } + + db.query({ + name: 'getSetMembers', + text: ` +SELECT s."member" m + FROM "legacy_object_live" o + INNER JOIN "legacy_set" s + ON o."_key" = s."_key" + AND o."type" = s."type" + WHERE o."_key" = $1::TEXT`, + values: [key], + }, function (err, res) { + if (err) { + return callback(err); + } + + callback(null, res.rows.map(function (r) { + return r.m; + })); + }); + }; + + module.getSetsMembers = function (keys, callback) { + if (!Array.isArray(keys) || !keys.length) { + return callback(null, []); + } + + db.query({ + name: 'getSetsMembers', + text: ` +SELECT o."_key" k, + array_agg(s."member") m + FROM "legacy_object_live" o + INNER JOIN "legacy_set" s + ON o."_key" = s."_key" + AND o."type" = s."type" + WHERE o."_key" = ANY($1::TEXT[]) + GROUP BY o."_key"`, + values: [keys], + }, function (err, res) { + if (err) { + return callback(err); + } + + callback(null, keys.map(function (k) { + return (res.rows.find(function (r) { + return r.k === k; + }) || { m: [] }).m; + })); + }); + }; + + module.setCount = function (key, callback) { + if (!key) { + return callback(null, 0); + } + + db.query({ + name: 'setCount', + text: ` +SELECT COUNT(*) c + FROM "legacy_object_live" o + INNER JOIN "legacy_set" s + ON o."_key" = s."_key" + AND o."type" = s."type" + WHERE o."_key" = $1::TEXT`, + values: [key], + }, function (err, res) { + if (err) { + return callback(err); + } + + callback(null, parseInt(res.rows[0].c, 10)); + }); + }; + + module.setsCount = function (keys, callback) { + db.query({ + name: 'setsCount', + text: ` +SELECT o."_key" k, + COUNT(*) c + FROM "legacy_object_live" o + INNER JOIN "legacy_set" s + ON o."_key" = s."_key" + AND o."type" = s."type" + WHERE o."_key" = ANY($1::TEXT[]) + GROUP BY o."_key"`, + values: [keys], + }, function (err, res) { + if (err) { + return callback(err); + } + + callback(null, keys.map(function (k) { + return (res.rows.find(function (r) { + return r.k === k; + }) || { c: 0 }).c; + })); + }); + }; + + module.setRemoveRandom = function (key, callback) { + callback = callback || helpers.noop; + + db.query({ + name: 'setRemoveRandom', + text: ` +WITH A AS ( + SELECT s."member" + FROM "legacy_object_live" o + INNER JOIN "legacy_set" s + ON o."_key" = s."_key" + AND o."type" = s."type" + WHERE o."_key" = $1::TEXT + ORDER BY RANDOM() + LIMIT 1 + FOR UPDATE) +DELETE FROM "legacy_set" s + USING A + WHERE s."_key" = $1::TEXT + AND s."member" = A."member" +RETURNING A."member" m`, + values: [key], + }, function (err, res) { + if (err) { + return callback(err); + } + + if (res.rows.length) { + return callback(null, res.rows[0].m); + } + + callback(null, null); + }); + }; +}; diff --git a/src/database/postgres/sorted.js b/src/database/postgres/sorted.js new file mode 100644 index 0000000000..7f136ac653 --- /dev/null +++ b/src/database/postgres/sorted.js @@ -0,0 +1,744 @@ +'use strict'; + +var async = require('async'); + +module.exports = function (db, module) { + var helpers = module.helpers.postgres; + + var query = db.query.bind(db); + + require('./sorted/add')(db, module); + require('./sorted/remove')(db, module); + require('./sorted/union')(db, module); + require('./sorted/intersect')(db, module); + + module.getSortedSetRange = function (key, start, stop, callback) { + getSortedSetRange(key, start, stop, 1, false, callback); + }; + + module.getSortedSetRevRange = function (key, start, stop, callback) { + getSortedSetRange(key, start, stop, -1, false, callback); + }; + + module.getSortedSetRangeWithScores = function (key, start, stop, callback) { + getSortedSetRange(key, start, stop, 1, true, callback); + }; + + module.getSortedSetRevRangeWithScores = function (key, start, stop, callback) { + getSortedSetRange(key, start, stop, -1, true, callback); + }; + + function getSortedSetRange(key, start, stop, sort, withScores, callback) { + if (!key) { + return callback(); + } + + if (!Array.isArray(key)) { + key = [key]; + } + + if (start < 0 && start > stop) { + return callback(null, []); + } + + var reverse = false; + if (start === 0 && stop < -1) { + reverse = true; + sort *= -1; + start = Math.abs(stop + 1); + stop = -1; + } else if (start < 0 && stop > start) { + var tmp1 = Math.abs(stop + 1); + stop = Math.abs(start + 1); + start = tmp1; + } + + var limit = stop - start + 1; + if (limit <= 0) { + limit = null; + } + + query({ + name: 'getSortedSetRangeWithScores' + (sort > 0 ? 'Asc' : 'Desc'), + text: ` +SELECT z."value", + z."score" + FROM "legacy_object_live" o + INNER JOIN "legacy_zset" z + ON o."_key" = z."_key" + AND o."type" = z."type" + WHERE o."_key" = ANY($1::TEXT[]) + ORDER BY z."score" ` + (sort > 0 ? 'ASC' : 'DESC') + ` + LIMIT $3::INTEGER +OFFSET $2::INTEGER`, + values: [key, start, limit], + }, function (err, res) { + if (err) { + return callback(err); + } + + if (reverse) { + res.rows.reverse(); + } + + if (withScores) { + res.rows = res.rows.map(function (r) { + return { + value: r.value, + score: parseFloat(r.score), + }; + }); + } else { + res.rows = res.rows.map(function (r) { + return r.value; + }); + } + + callback(null, res.rows); + }); + } + + module.getSortedSetRangeByScore = function (key, start, count, min, max, callback) { + getSortedSetRangeByScore(key, start, count, min, max, 1, false, callback); + }; + + module.getSortedSetRevRangeByScore = function (key, start, count, max, min, callback) { + getSortedSetRangeByScore(key, start, count, min, max, -1, false, callback); + }; + + module.getSortedSetRangeByScoreWithScores = function (key, start, count, min, max, callback) { + getSortedSetRangeByScore(key, start, count, min, max, 1, true, callback); + }; + + module.getSortedSetRevRangeByScoreWithScores = function (key, start, count, max, min, callback) { + getSortedSetRangeByScore(key, start, count, min, max, -1, true, callback); + }; + + function getSortedSetRangeByScore(key, start, count, min, max, sort, withScores, callback) { + if (!key) { + return callback(); + } + + if (!Array.isArray(key)) { + key = [key]; + } + + if (parseInt(count, 10) === -1) { + count = null; + } + + if (min === '-inf') { + min = null; + } + if (max === '+inf') { + max = null; + } + + query({ + name: 'getSortedSetRangeByScoreWithScores' + (sort > 0 ? 'Asc' : 'Desc'), + text: ` +SELECT z."value", + z."score" + FROM "legacy_object_live" o + INNER JOIN "legacy_zset" z + ON o."_key" = z."_key" + AND o."type" = z."type" + WHERE o."_key" = ANY($1::TEXT[]) + AND (z."score" >= $4::NUMERIC OR $4::NUMERIC IS NULL) + AND (z."score" <= $5::NUMERIC OR $5::NUMERIC IS NULL) + ORDER BY z."score" ` + (sort > 0 ? 'ASC' : 'DESC') + ` + LIMIT $3::INTEGER +OFFSET $2::INTEGER`, + values: [key, start, count, min, max], + }, function (err, res) { + if (err) { + return callback(err); + } + + if (withScores) { + res.rows = res.rows.map(function (r) { + return { + value: r.value, + score: parseFloat(r.score), + }; + }); + } else { + res.rows = res.rows.map(function (r) { + return r.value; + }); + } + + return callback(null, res.rows); + }); + } + + module.sortedSetCount = function (key, min, max, callback) { + if (!key) { + return callback(); + } + + if (min === '-inf') { + min = null; + } + if (max === '+inf') { + max = null; + } + + query({ + name: 'sortedSetCount', + text: ` +SELECT COUNT(*) c + FROM "legacy_object_live" o + INNER JOIN "legacy_zset" z + ON o."_key" = z."_key" + AND o."type" = z."type" + WHERE o."_key" = $1::TEXT + AND (z."score" >= $2::NUMERIC OR $2::NUMERIC IS NULL) + AND (z."score" <= $3::NUMERIC OR $3::NUMERIC IS NULL)`, + values: [key, min, max], + }, function (err, res) { + if (err) { + return callback(err); + } + + callback(null, parseInt(res.rows[0].c, 10)); + }); + }; + + module.sortedSetCard = function (key, callback) { + if (!key) { + return callback(null, 0); + } + + query({ + name: 'sortedSetCard', + text: ` +SELECT COUNT(*) c + FROM "legacy_object_live" o + INNER JOIN "legacy_zset" z + ON o."_key" = z."_key" + AND o."type" = z."type" + WHERE o."_key" = $1::TEXT`, + values: [key], + }, function (err, res) { + if (err) { + return callback(err); + } + + callback(null, parseInt(res.rows[0].c, 10)); + }); + }; + + module.sortedSetsCard = function (keys, callback) { + if (!Array.isArray(keys) || !keys.length) { + return callback(); + } + + query({ + name: 'sortedSetsCard', + text: ` +SELECT o."_key" k, + COUNT(*) c + FROM "legacy_object_live" o + INNER JOIN "legacy_zset" z + ON o."_key" = z."_key" + AND o."type" = z."type" + WHERE o."_key" = ANY($1::TEXT[]) + GROUP BY o."_key"`, + values: [keys], + }, function (err, res) { + if (err) { + return callback(err); + } + + callback(null, keys.map(function (k) { + return parseInt((res.rows.find(function (r) { + return r.k === k; + }) || { c: 0 }).c, 10); + })); + }); + }; + + module.sortedSetRank = function (key, value, callback) { + getSortedSetRank('ASC', [key], [value], function (err, result) { + callback(err, result ? result[0] : null); + }); + }; + + module.sortedSetRevRank = function (key, value, callback) { + getSortedSetRank('DESC', [key], [value], function (err, result) { + callback(err, result ? result[0] : null); + }); + }; + + function getSortedSetRank(sort, keys, values, callback) { + values = values.map(helpers.valueToString); + query({ + name: 'getSortedSetRank' + sort, + text: ` +SELECT (SELECT r + FROM (SELECT z."value" v, + RANK() OVER (PARTITION BY o."_key" + ORDER BY z."score" ` + sort + `, + z."value" ` + sort + `) - 1 r + FROM "legacy_object_live" o + INNER JOIN "legacy_zset" z + ON o."_key" = z."_key" + AND o."type" = z."type" + WHERE o."_key" = kvi.k) r + WHERE v = kvi.v) r + FROM UNNEST($1::TEXT[], $2::TEXT[]) WITH ORDINALITY kvi(k, v, i) + ORDER BY kvi.i ASC`, + values: [keys, values], + }, function (err, res) { + if (err) { + return callback(err); + } + + callback(null, res.rows.map(function (r) { return r.r === null ? null : parseFloat(r.r); })); + }); + } + + module.sortedSetsRanks = function (keys, values, callback) { + if (!Array.isArray(keys) || !keys.length) { + return callback(null, []); + } + + getSortedSetRank('ASC', keys, values, callback); + }; + + module.sortedSetRanks = function (key, values, callback) { + if (!Array.isArray(values) || !values.length) { + return callback(null, []); + } + + getSortedSetRank('ASC', new Array(values.length).fill(key), values, callback); + }; + + module.sortedSetScore = function (key, value, callback) { + if (!key) { + return callback(null, null); + } + + value = helpers.valueToString(value); + + query({ + name: 'sortedSetScore', + text: ` +SELECT z."score" s + FROM "legacy_object_live" o + INNER JOIN "legacy_zset" z + ON o."_key" = z."_key" + AND o."type" = z."type" + WHERE o."_key" = $1::TEXT + AND z."value" = $2::TEXT`, + values: [key, value], + }, function (err, res) { + if (err) { + return callback(err); + } + + if (res.rows.length) { + return callback(null, parseFloat(res.rows[0].s)); + } + + callback(null, null); + }); + }; + + module.sortedSetsScore = function (keys, value, callback) { + if (!Array.isArray(keys) || !keys.length) { + return callback(); + } + + value = helpers.valueToString(value); + + query({ + name: 'sortedSetsScore', + text: ` +SELECT o."_key" k, + z."score" s + FROM "legacy_object_live" o + INNER JOIN "legacy_zset" z + ON o."_key" = z."_key" + AND o."type" = z."type" + WHERE o."_key" = ANY($1::TEXT[]) + AND z."value" = $2::TEXT`, + values: [keys, value], + }, function (err, res) { + if (err) { + return callback(err); + } + + callback(null, keys.map(function (k) { + var s = res.rows.find(function (r) { + return r.k === k; + }); + + return s ? parseFloat(s.s) : null; + })); + }); + }; + + module.sortedSetScores = function (key, values, callback) { + if (!key) { + return callback(null, null); + } + + values = values.map(helpers.valueToString); + + query({ + name: 'sortedSetScores', + text: ` +SELECT z."value" v, + z."score" s + FROM "legacy_object_live" o + INNER JOIN "legacy_zset" z + ON o."_key" = z."_key" + AND o."type" = z."type" + WHERE o."_key" = $1::TEXT + AND z."value" = ANY($2::TEXT[])`, + values: [key, values], + }, function (err, res) { + if (err) { + return callback(err); + } + + callback(null, values.map(function (v) { + var s = res.rows.find(function (r) { + return r.v === v; + }); + + return s ? parseFloat(s.s) : null; + })); + }); + }; + + module.isSortedSetMember = function (key, value, callback) { + if (!key) { + return callback(); + } + + value = helpers.valueToString(value); + + query({ + name: 'isSortedSetMember', + text: ` +SELECT 1 + FROM "legacy_object_live" o + INNER JOIN "legacy_zset" z + ON o."_key" = z."_key" + AND o."type" = z."type" + WHERE o."_key" = $1::TEXT + AND z."value" = $2::TEXT`, + values: [key, value], + }, function (err, res) { + if (err) { + return callback(err); + } + + callback(null, !!res.rows.length); + }); + }; + + module.isSortedSetMembers = function (key, values, callback) { + if (!key) { + return callback(); + } + + values = values.map(helpers.valueToString); + + query({ + name: 'isSortedSetMembers', + text: ` +SELECT z."value" v + FROM "legacy_object_live" o + INNER JOIN "legacy_zset" z + ON o."_key" = z."_key" + AND o."type" = z."type" + WHERE o."_key" = $1::TEXT + AND z."value" = ANY($2::TEXT[])`, + values: [key, values], + }, function (err, res) { + if (err) { + return callback(err); + } + + callback(null, values.map(function (v) { + return res.rows.some(function (r) { + return r.v === v; + }); + })); + }); + }; + + module.isMemberOfSortedSets = function (keys, value, callback) { + if (!Array.isArray(keys)) { + return callback(); + } + + value = helpers.valueToString(value); + + query({ + name: 'isMemberOfSortedSets', + text: ` +SELECT o."_key" k + FROM "legacy_object_live" o + INNER JOIN "legacy_zset" z + ON o."_key" = z."_key" + AND o."type" = z."type" + WHERE o."_key" = ANY($1::TEXT[]) + AND z."value" = $2::TEXT`, + values: [keys, value], + }, function (err, res) { + if (err) { + return callback(err); + } + + callback(null, keys.map(function (k) { + return res.rows.some(function (r) { + return r.k === k; + }); + })); + }); + }; + + module.getSortedSetsMembers = function (keys, callback) { + if (!Array.isArray(keys) || !keys.length) { + return callback(null, []); + } + + query({ + name: 'getSortedSetsMembers', + text: ` +SELECT o."_key" k, + array_agg(z."value" ORDER BY z."score" ASC) m + FROM "legacy_object_live" o + INNER JOIN "legacy_zset" z + ON o."_key" = z."_key" + AND o."type" = z."type" + WHERE o."_key" = ANY($1::TEXT[]) + GROUP BY o."_key"`, + values: [keys], + }, function (err, res) { + if (err) { + return callback(err); + } + + callback(null, keys.map(function (k) { + return (res.rows.find(function (r) { + return r.k === k; + }) || { m: [] }).m; + })); + }); + }; + + module.sortedSetIncrBy = function (key, increment, value, callback) { + callback = callback || helpers.noop; + + if (!key) { + return callback(); + } + + value = helpers.valueToString(value); + increment = parseFloat(increment); + + module.transaction(function (tx, done) { + async.waterfall([ + async.apply(helpers.ensureLegacyObjectType, tx.client, key, 'zset'), + async.apply(tx.client.query.bind(tx.client), { + name: 'sortedSetIncrBy', + text: ` +INSERT INTO "legacy_zset" ("_key", "value", "score") +VALUES ($1::TEXT, $2::TEXT, $3::NUMERIC) + ON CONFLICT ("_key", "value") + DO UPDATE SET "score" = "legacy_zset"."score" + $3::NUMERIC +RETURNING "score" s`, + values: [key, value, increment], + }), + function (res, next) { + next(null, parseFloat(res.rows[0].s)); + }, + ], done); + }, callback); + }; + + module.getSortedSetRangeByLex = function (key, min, max, start, count, callback) { + sortedSetLex(key, min, max, 1, start, count, callback); + }; + + module.getSortedSetRevRangeByLex = function (key, max, min, start, count, callback) { + sortedSetLex(key, min, max, -1, start, count, callback); + }; + + module.sortedSetLexCount = function (key, min, max, callback) { + var q = buildLexQuery(key, min, max); + + query({ + name: 'sortedSetLexCount' + q.suffix, + text: ` +SELECT COUNT(*) c + FROM "legacy_object_live" o + INNER JOIN "legacy_zset" z + ON o."_key" = z."_key" + AND o."type" = z."type" + WHERE ` + q.where, + values: q.values, + }, function (err, res) { + if (err) { + return callback(err); + } + + callback(null, parseInt(res.rows[0].c, 10)); + }); + }; + + function sortedSetLex(key, min, max, sort, start, count, callback) { + if (!callback) { + callback = start; + start = 0; + count = 0; + } + + var q = buildLexQuery(key, min, max); + q.values.push(start); + q.values.push(count <= 0 ? null : count); + query({ + name: 'sortedSetLex' + (sort > 0 ? 'Asc' : 'Desc') + q.suffix, + text: ` +SELECT z."value" v + FROM "legacy_object_live" o + INNER JOIN "legacy_zset" z + ON o."_key" = z."_key" + AND o."type" = z."type" + WHERE ` + q.where + ` + ORDER BY z."value" ` + (sort > 0 ? 'ASC' : 'DESC') + ` + LIMIT $` + q.values.length + `::INTEGER +OFFSET $` + (q.values.length - 1) + `::INTEGER`, + values: q.values, + }, function (err, res) { + if (err) { + return callback(err); + } + + callback(null, res.rows.map(function (r) { + return r.v; + })); + }); + } + + module.sortedSetRemoveRangeByLex = function (key, min, max, callback) { + callback = callback || helpers.noop; + + var q = buildLexQuery(key, min, max); + query({ + name: 'sortedSetRemoveRangeByLex' + q.suffix, + text: ` +DELETE FROM "legacy_zset" z + USING "legacy_object_live" o + WHERE o."_key" = z."_key" + AND o."type" = z."type" + AND ` + q.where, + values: q.values, + }, function (err) { + callback(err); + }); + }; + + function buildLexQuery(key, min, max) { + var q = { + suffix: '', + where: `o."_key" = $1::TEXT`, + values: [key], + }; + + if (min !== '-') { + if (min.match(/^\(/)) { + q.values.push(min.substr(1)); + q.suffix += 'GT'; + q.where += ` AND z."value" > $` + q.values.length + `::TEXT`; + } else if (min.match(/^\[/)) { + q.values.push(min.substr(1)); + q.suffix += 'GE'; + q.where += ` AND z."value" >= $` + q.values.length + `::TEXT`; + } else { + q.values.push(min); + q.suffix += 'GE'; + q.where += ` AND z."value" >= $` + q.values.length + `::TEXT`; + } + } + + if (max !== '+') { + if (max.match(/^\(/)) { + q.values.push(max.substr(1)); + q.suffix += 'LT'; + q.where += ` AND z."value" < $` + q.values.length + `::TEXT`; + } else if (max.match(/^\[/)) { + q.values.push(max.substr(1)); + q.suffix += 'LE'; + q.where += ` AND z."value" <= $` + q.values.length + `::TEXT`; + } else { + q.values.push(max); + q.suffix += 'LE'; + q.where += ` AND z."value" <= $` + q.values.length + `::TEXT`; + } + } + + return q; + } + + module.processSortedSet = function (setKey, process, options, callback) { + var Cursor = require('pg-cursor'); + + db.connect(function (err, client, done) { + if (err) { + return callback(err); + } + + var batchSize = (options || {}).batch || 100; + var query = client.query(new Cursor(` +SELECT z."value" v + FROM "legacy_object_live" o + INNER JOIN "legacy_zset" z + ON o."_key" = z."_key" + AND o."type" = z."type" + WHERE o."_key" = $1::TEXT + ORDER BY z."score" ASC, z."value" ASC`, [setKey])); + + async.doUntil(function (next) { + query.read(batchSize, function (err, rows) { + if (err) { + return next(err); + } + + if (!rows.length) { + return next(null, true); + } + + rows = rows.map(function (row) { + return row.v; + }); + + process(rows, function (err) { + if (err) { + return query.close(function () { + next(err); + }); + } + + if (options.interval) { + setTimeout(next, options.interval); + } else { + next(); + } + }); + }); + }, function (stop) { + return stop; + }, function (err) { + done(); + callback(err); + }); + }); + }; +}; diff --git a/src/database/postgres/sorted/add.js b/src/database/postgres/sorted/add.js new file mode 100644 index 0000000000..a187091746 --- /dev/null +++ b/src/database/postgres/sorted/add.js @@ -0,0 +1,108 @@ +'use strict'; + +var async = require('async'); + +module.exports = function (db, module) { + var helpers = module.helpers.postgres; + + module.sortedSetAdd = function (key, score, value, callback) { + callback = callback || helpers.noop; + + if (!key) { + return callback(); + } + + if (Array.isArray(score) && Array.isArray(value)) { + return sortedSetAddBulk(key, score, value, callback); + } + + value = helpers.valueToString(value); + score = parseFloat(score); + + module.transaction(function (tx, done) { + var query = tx.client.query.bind(tx.client); + + async.series([ + async.apply(helpers.ensureLegacyObjectType, tx.client, key, 'zset'), + async.apply(query, { + name: 'sortedSetAdd', + text: ` +INSERT INTO "legacy_zset" ("_key", "value", "score") +VALUES ($1::TEXT, $2::TEXT, $3::NUMERIC) + ON CONFLICT ("_key", "value") + DO UPDATE SET "score" = $3::NUMERIC`, + values: [key, value, score], + }), + ], function (err) { + done(err); + }); + }, callback); + }; + + function sortedSetAddBulk(key, scores, values, callback) { + if (!scores.length || !values.length) { + return callback(); + } + if (scores.length !== values.length) { + return callback(new Error('[[error:invalid-data]]')); + } + + values = values.map(helpers.valueToString); + scores = scores.map(function (score) { + return parseFloat(score); + }); + + helpers.removeDuplicateValues(values, scores); + + module.transaction(function (tx, done) { + var query = tx.client.query.bind(tx.client); + + async.series([ + async.apply(helpers.ensureLegacyObjectType, tx.client, key, 'zset'), + async.apply(query, { + name: 'sortedSetAddBulk', + text: ` +INSERT INTO "legacy_zset" ("_key", "value", "score") +SELECT $1::TEXT, v, s + FROM UNNEST($2::TEXT[], $3::NUMERIC[]) vs(v, s) + ON CONFLICT ("_key", "value") + DO UPDATE SET "score" = EXCLUDED."score"`, + values: [key, values, scores], + }), + ], function (err) { + done(err); + }); + }, callback); + } + + module.sortedSetsAdd = function (keys, score, value, callback) { + callback = callback || helpers.noop; + + if (!Array.isArray(keys) || !keys.length) { + return callback(); + } + + value = helpers.valueToString(value); + score = parseFloat(score); + + module.transaction(function (tx, done) { + var query = tx.client.query.bind(tx.client); + + async.series([ + async.apply(helpers.ensureLegacyObjectsType, tx.client, keys, 'zset'), + async.apply(query, { + name: 'sortedSetsAdd', + text: ` +INSERT INTO "legacy_zset" ("_key", "value", "score") +SELECT k, $2::TEXT, $3::NUMERIC + FROM UNNEST($1::TEXT[]) k + ON CONFLICT ("_key", "value") + DO UPDATE SET "score" = $3::NUMERIC`, + values: [keys, value, score], + }), + ], function (err) { + done(err); + }); + }, callback); + }; +}; diff --git a/src/database/postgres/sorted/intersect.js b/src/database/postgres/sorted/intersect.js new file mode 100644 index 0000000000..47fe5b9b16 --- /dev/null +++ b/src/database/postgres/sorted/intersect.js @@ -0,0 +1,105 @@ +'use strict'; + +module.exports = function (db, module) { + module.sortedSetIntersectCard = function (keys, callback) { + if (!Array.isArray(keys) || !keys.length) { + return callback(null, 0); + } + + db.query({ + name: 'sortedSetIntersectCard', + text: ` +WITH A AS (SELECT z."value" v, + COUNT(*) c + FROM "legacy_object_live" o + INNER JOIN "legacy_zset" z + ON o."_key" = z."_key" + AND o."type" = z."type" + WHERE o."_key" = ANY($1::TEXT[]) + GROUP BY z."value") +SELECT COUNT(*) c + FROM A + WHERE A.c = array_length($1::TEXT[], 1)`, + values: [keys], + }, function (err, res) { + if (err) { + return callback(err); + } + + callback(null, parseInt(res.rows[0].c, 10)); + }); + }; + + + module.getSortedSetIntersect = function (params, callback) { + params.sort = 1; + getSortedSetIntersect(params, callback); + }; + + module.getSortedSetRevIntersect = function (params, callback) { + params.sort = -1; + getSortedSetIntersect(params, callback); + }; + + function getSortedSetIntersect(params, callback) { + var sets = params.sets; + var start = params.hasOwnProperty('start') ? params.start : 0; + var stop = params.hasOwnProperty('stop') ? params.stop : -1; + var weights = params.weights || []; + var aggregate = params.aggregate || 'SUM'; + + if (sets.length < weights.length) { + weights = weights.slice(0, sets.length); + } + while (sets.length > weights.length) { + weights.push(1); + } + + var limit = stop - start + 1; + if (limit <= 0) { + limit = null; + } + + db.query({ + name: 'getSortedSetIntersect' + aggregate + (params.sort > 0 ? 'Asc' : 'Desc') + 'WithScores', + text: ` +WITH A AS (SELECT z."value", + ` + aggregate + `(z."score" * k."weight") "score", + COUNT(*) c + FROM UNNEST($1::TEXT[], $2::NUMERIC[]) k("_key", "weight") + INNER JOIN "legacy_object_live" o + ON o."_key" = k."_key" + INNER JOIN "legacy_zset" z + ON o."_key" = z."_key" + AND o."type" = z."type" + GROUP BY z."value") +SELECT A."value", + A."score" + FROM A + WHERE c = array_length($1::TEXT[], 1) + ORDER BY A."score" ` + (params.sort > 0 ? 'ASC' : 'DESC') + ` + LIMIT $4::INTEGER +OFFSET $3::INTEGER`, + values: [sets, weights, start, limit], + }, function (err, res) { + if (err) { + return callback(err); + } + + if (params.withScores) { + res.rows = res.rows.map(function (r) { + return { + value: r.value, + score: parseFloat(r.score), + }; + }); + } else { + res.rows = res.rows.map(function (r) { + return r.value; + }); + } + + callback(null, res.rows); + }); + } +}; diff --git a/src/database/postgres/sorted/remove.js b/src/database/postgres/sorted/remove.js new file mode 100644 index 0000000000..6118b22981 --- /dev/null +++ b/src/database/postgres/sorted/remove.js @@ -0,0 +1,83 @@ +'use strict'; + +module.exports = function (db, module) { + var helpers = module.helpers.postgres; + + module.sortedSetRemove = function (key, value, callback) { + function done(err) { + if (callback) { + callback(err); + } + } + + if (!key) { + return done(); + } + + if (!Array.isArray(key)) { + key = [key]; + } + + if (!Array.isArray(value)) { + value = [value]; + } + value = value.map(helpers.valueToString); + + db.query({ + name: 'sortedSetRemove', + text: ` +DELETE FROM "legacy_zset" + WHERE "_key" = ANY($1::TEXT[]) + AND "value" = ANY($2::TEXT[])`, + values: [key, value], + }, done); + }; + + module.sortedSetsRemove = function (keys, value, callback) { + callback = callback || helpers.noop; + + if (!Array.isArray(keys) || !keys.length) { + return callback(); + } + + value = helpers.valueToString(value); + + db.query({ + name: 'sortedSetsRemove', + text: ` +DELETE FROM "legacy_zset" + WHERE "_key" = ANY($1::TEXT[]) + AND "value" = $2::TEXT`, + values: [keys, value], + }, function (err) { + callback(err); + }); + }; + + module.sortedSetsRemoveRangeByScore = function (keys, min, max, callback) { + callback = callback || helpers.noop; + + if (!Array.isArray(keys) || !keys.length) { + return callback(); + } + + if (min === '-inf') { + min = null; + } + if (max === '+inf') { + max = null; + } + + db.query({ + name: 'sortedSetsRemoveRangeByScore', + text: ` +DELETE FROM "legacy_zset" + WHERE "_key" = ANY($1::TEXT[]) + AND ("score" >= $2::NUMERIC OR $2::NUMERIC IS NULL) + AND ("score" <= $3::NUMERIC OR $3::NUMERIC IS NULL)`, + values: [keys, min, max], + }, function (err) { + callback(err); + }); + }; +}; diff --git a/src/database/postgres/sorted/union.js b/src/database/postgres/sorted/union.js new file mode 100644 index 0000000000..2f991dc761 --- /dev/null +++ b/src/database/postgres/sorted/union.js @@ -0,0 +1,97 @@ +'use strict'; + +module.exports = function (db, module) { + module.sortedSetUnionCard = function (keys, callback) { + if (!Array.isArray(keys) || !keys.length) { + return callback(null, 0); + } + + db.query({ + name: 'sortedSetUnionCard', + text: ` +SELECT COUNT(DISTINCT z."value") c + FROM "legacy_object_live" o + INNER JOIN "legacy_zset" z + ON o."_key" = z."_key" + AND o."type" = z."type" + WHERE o."_key" = ANY($1::TEXT[])`, + values: [keys], + }, function (err, res) { + if (err) { + return callback(err); + } + + callback(null, parseInt(res.rows[0].c, 10)); + }); + }; + + module.getSortedSetUnion = function (params, callback) { + params.sort = 1; + getSortedSetUnion(params, callback); + }; + + module.getSortedSetRevUnion = function (params, callback) { + params.sort = -1; + getSortedSetUnion(params, callback); + }; + + function getSortedSetUnion(params, callback) { + var sets = params.sets; + var start = params.hasOwnProperty('start') ? params.start : 0; + var stop = params.hasOwnProperty('stop') ? params.stop : -1; + var weights = params.weights || []; + var aggregate = params.aggregate || 'SUM'; + + if (sets.length < weights.length) { + weights = weights.slice(0, sets.length); + } + while (sets.length > weights.length) { + weights.push(1); + } + + var limit = stop - start + 1; + if (limit <= 0) { + limit = null; + } + + db.query({ + name: 'getSortedSetUnion' + aggregate + (params.sort > 0 ? 'Asc' : 'Desc') + 'WithScores', + text: ` +WITH A AS (SELECT z."value", + ` + aggregate + `(z."score" * k."weight") "score" + FROM UNNEST($1::TEXT[], $2::NUMERIC[]) k("_key", "weight") + INNER JOIN "legacy_object_live" o + ON o."_key" = k."_key" + INNER JOIN "legacy_zset" z + ON o."_key" = z."_key" + AND o."type" = z."type" + GROUP BY z."value") +SELECT A."value", + A."score" + FROM A + ORDER BY A."score" ` + (params.sort > 0 ? 'ASC' : 'DESC') + ` + LIMIT $4::INTEGER +OFFSET $3::INTEGER`, + values: [sets, weights, start, limit], + }, function (err, res) { + if (err) { + return callback(err); + } + + if (params.withScores) { + res.rows = res.rows.map(function (r) { + return { + value: r.value, + score: parseFloat(r.score), + }; + }); + } else { + res.rows = res.rows.map(function (r) { + return r.value; + }); + } + + callback(null, res.rows); + }); + } +}; diff --git a/src/database/postgres/transaction.js b/src/database/postgres/transaction.js new file mode 100644 index 0000000000..ed13d7a537 --- /dev/null +++ b/src/database/postgres/transaction.js @@ -0,0 +1,50 @@ +'use strict'; + +module.exports = function (db, dbNamespace, module) { + module.transaction = function (perform, callback) { + if (dbNamespace.active && dbNamespace.get('db')) { + var client = dbNamespace.get('db'); + return client.query(`SAVEPOINT nodebb_subtx`, function (err) { + if (err) { + return callback(err); + } + + perform(module, function (err) { + var args = Array.prototype.slice.call(arguments, 1); + + client.query(err ? `ROLLBACK TO SAVEPOINT nodebb_subtx` : `RELEASE SAVEPOINT nodebb_subtx`, function (err1) { + callback.apply(this, [err || err1].concat(args)); + }); + }); + }); + } + + db.connect(function (err, client, done) { + if (err) { + return callback(err); + } + + dbNamespace.run(function () { + dbNamespace.set('db', client); + + client.query(`BEGIN`, function (err) { + if (err) { + done(); + dbNamespace.set('db', null); + return callback(err); + } + + perform(module, function (err) { + var args = Array.prototype.slice.call(arguments, 1); + + client.query(err ? `ROLLBACK` : `COMMIT`, function (err1) { + done(); + dbNamespace.set('db', null); + callback.apply(this, [err || err1].concat(args)); + }); + }); + }); + }); + }); + }; +}; diff --git a/src/database/redis.js b/src/database/redis.js index 4bbd6ed0da..8186a950e2 100644 --- a/src/database/redis.js +++ b/src/database/redis.js @@ -50,6 +50,7 @@ redisModule.init = function (callback) { require('./redis/sets')(redisClient, redisModule); require('./redis/sorted')(redisClient, redisModule); require('./redis/list')(redisClient, redisModule); + require('./redis/transaction')(redisClient, redisModule); callback(); }); diff --git a/src/database/redis/transaction.js b/src/database/redis/transaction.js new file mode 100644 index 0000000000..75ea5fbaa2 --- /dev/null +++ b/src/database/redis/transaction.js @@ -0,0 +1,8 @@ +'use strict'; + +module.exports = function (db, module) { + // TODO + module.transaction = function (perform, callback) { + perform(db, callback); + }; +}; diff --git a/src/install.js b/src/install.js index 25cb213488..cd628b13d9 100644 --- a/src/install.js +++ b/src/install.js @@ -126,7 +126,8 @@ function setupConfig(next) { var config = {}; var redisQuestions = require('./database/redis').questions; var mongoQuestions = require('./database/mongo').questions; - var allQuestions = questions.main.concat(questions.optional).concat(redisQuestions).concat(mongoQuestions); + var postgresQuestions = require('./database/postgres').questions; + var allQuestions = questions.main.concat(questions.optional).concat(redisQuestions).concat(mongoQuestions).concat(postgresQuestions); allQuestions.forEach(function (question) { config[question.name] = install.values[question.name] || question.default || undefined; diff --git a/src/pubsub.js b/src/pubsub.js index 82c4a1b57b..e67fc87fc9 100644 --- a/src/pubsub.js +++ b/src/pubsub.js @@ -33,6 +33,8 @@ function get() { pubsub = require('./database/redis/pubsub'); } else if (nconf.get('mongo')) { pubsub = require('./database/mongo/pubsub'); + } else if (nconf.get('postgres')) { + pubsub = require('./database/postgres/pubsub'); } real = pubsub; diff --git a/src/upgrades/1.4.6/delete_sessions.js b/src/upgrades/1.4.6/delete_sessions.js index c899126bde..28db5948b5 100644 --- a/src/upgrades/1.4.6/delete_sessions.js +++ b/src/upgrades/1.4.6/delete_sessions.js @@ -45,10 +45,12 @@ module.exports = { ], function (err) { next(err); }); - } else { + } else if (db.client && db.client.collection) { db.client.collection('sessions').deleteMany({}, {}, function (err) { next(err); }); + } else { + next(); } }, ], callback); diff --git a/src/views/admin/advanced/database.tpl b/src/views/admin/advanced/database.tpl index 9519ce9141..53533a9249 100644 --- a/src/views/admin/advanced/database.tpl +++ b/src/views/admin/advanced/database.tpl @@ -57,6 +57,19 @@ + +
+
[[admin/advanced/database:postgres]]
+
+
+ [[admin/advanced/database:postgres.version]] {postgres.version}
+
+ [[admin/advanced/database:uptime-seconds]] {postgres.uptime}
+
+
+
+ +
@@ -84,5 +97,19 @@
+ + +
+
+

[[admin/advanced/database:postgres.raw-info]]

+
+ +
+
+
{postgres.raw}
+
+
+
+ diff --git a/src/views/install/index.tpl b/src/views/install/index.tpl index f24231b952..4f2f87cf46 100644 --- a/src/views/install/index.tpl +++ b/src/views/install/index.tpl @@ -102,6 +102,7 @@
There was an error connecting to your database. Please try again.
diff --git a/test/controllers-admin.js b/test/controllers-admin.js index 24da882cbe..f35dd437fa 100644 --- a/test/controllers-admin.js +++ b/test/controllers-admin.js @@ -185,6 +185,8 @@ describe('Admin Controllers', function () { assert(body.redis); } else if (nconf.get('mongo')) { assert(body.mongo); + } else if (nconf.get('postgres')) { + assert(body.postgres); } done(); }); diff --git a/test/database.js b/test/database.js index 327d8095cf..b88e425fd6 100644 --- a/test/database.js +++ b/test/database.js @@ -48,6 +48,11 @@ describe('Test database', function () { assert.equal(err.message, 'The `mongodb` package is out-of-date, please run `./nodebb setup` again.'); done(); }); + } else if (dbName === 'postgres') { + db.checkCompatibilityVersion('6.3.0', function (err) { + assert.equal(err.message, 'The `pg` package is out-of-date, please run `./nodebb setup` again.'); + done(); + }); } }); }); diff --git a/test/database/list.js b/test/database/list.js index 23768eb85d..76e7029f5c 100644 --- a/test/database/list.js +++ b/test/database/list.js @@ -111,16 +111,16 @@ describe('List methods', function () { before(function (done) { async.series([ function (next) { - db.listAppend('testList4', 12, next); + db.listAppend('testList7', 12, next); }, function (next) { - db.listPrepend('testList4', 9, next); + db.listPrepend('testList7', 9, next); }, ], done); }); it('should remove the last element of list and return it', function (done) { - db.listRemoveLast('testList4', function (err, lastElement) { + db.listRemoveLast('testList7', function (err, lastElement) { assert.equal(err, null); assert.equal(arguments.length, 2); assert.equal(lastElement, '12'); diff --git a/test/database/sorted.js b/test/database/sorted.js index 2b7babf951..6d31d80935 100644 --- a/test/database/sorted.js +++ b/test/database/sorted.js @@ -42,6 +42,21 @@ describe('Sorted Set methods', function () { done(); }); }); + + it('should gracefully handle adding the same element twice', function (done) { + db.sortedSetAdd('sorted2', [1, 2], ['value1', 'value1'], function (err) { + assert.equal(err, null); + assert.equal(arguments.length, 1); + + db.sortedSetScore('sorted2', 'value1', function (err, score) { + assert.equal(err, null); + assert.equal(score, 2); + assert.equal(arguments.length, 2); + + done(); + }); + }); + }); }); describe('sortedSetsAdd()', function () { diff --git a/test/mocks/databasemock.js b/test/mocks/databasemock.js index 0aaab1647e..7fee6efede 100644 --- a/test/mocks/databasemock.js +++ b/test/mocks/databasemock.js @@ -59,6 +59,14 @@ if (!testDbConfig) { ' "password": "",\n' + ' "database": "nodebb_test"\n' + '}\n' + + ' or (postgres):\n' + + '"test_database": {\n' + + ' "host": "127.0.0.1",\n' + + ' "port": "5432",\n' + + ' "username": "postgres",\n' + + ' "password": "",\n' + + ' "database": "nodebb_test"\n' + + '}\n' + '===========================================================' ); winston.error(errorText);