diff --git a/packages/pg-native/index.js b/packages/pg-native/index.js index 8c83406bb..0c9a257be 100644 --- a/packages/pg-native/index.js +++ b/packages/pg-native/index.js @@ -1,256 +1,320 @@ const Libpq = require('libpq') const EventEmitter = require('events').EventEmitter -const util = require('util') const assert = require('assert') const types = require('pg-types') const buildResult = require('./lib/build-result') const CopyStream = require('./lib/copy-stream') -const Client = (module.exports = function (config) { - if (!(this instanceof Client)) { - return new Client(config) - } +class Client extends EventEmitter { + constructor(config) { + super() - config = config || {} + config = config || {} - EventEmitter.call(this) - this.pq = new Libpq() - this._reading = false - this._read = this._read.bind(this) + this.pq = new Libpq() + this._reading = false + this._read = this._read.bind(this) - // allow custom type conversion to be passed in - this._types = config.types || types + // allow custom type conversion to be passed in + this._types = config.types || types - // allow config to specify returning results - // as an array of values instead of a hash - this.arrayMode = config.arrayMode || false - this._resultCount = 0 - this._rows = undefined - this._results = undefined + // allow config to specify returning results + // as an array of values instead of a hash + this.arrayMode = config.arrayMode || false + this._resultCount = 0 + this._rows = undefined + this._results = undefined - // lazy start the reader if notifications are listened for - // this way if you only run sync queries you wont block - // the event loop artificially - this.on('newListener', (event) => { - if (event !== 'notification') return - this._startReading() - }) + // lazy start the reader if notifications are listened for + // this way if you only run sync queries you wont block + // the event loop artificially + this.on('newListener', (event) => { + if (event !== 'notification') return + this._startReading() + }) - this.on('result', this._onResult.bind(this)) - this.on('readyForQuery', this._onReadyForQuery.bind(this)) -}) + this.on('result', this._onResult.bind(this)) + this.on('readyForQuery', this._onReadyForQuery.bind(this)) + } -util.inherits(Client, EventEmitter) + connect(params, cb) { + this.pq.connect(params, cb) + } -Client.prototype.connect = function (params, cb) { - this.pq.connect(params, cb) -} + connectSync(params) { + this.pq.connectSync(params) + } -Client.prototype.connectSync = function (params) { - this.pq.connectSync(params) -} + query(text, values, cb) { + let queryFn -Client.prototype.query = function (text, values, cb) { - let queryFn + if (typeof values === 'function') { + cb = values + } + + if (Array.isArray(values)) { + queryFn = () => { + return this.pq.sendQueryParams(text, values) + } + } else { + queryFn = () => { + return this.pq.sendQuery(text) + } + } - if (typeof values === 'function') { - cb = values + this._dispatchQuery(this.pq, queryFn, (err) => { + if (err) return cb(err) + this._awaitResult(cb) + }) } - if (Array.isArray(values)) { - queryFn = () => { - return this.pq.sendQueryParams(text, values) - } - } else { - queryFn = () => { - return this.pq.sendQuery(text) + prepare(statementName, text, nParams, cb) { + const self = this + const fn = function () { + return self.pq.sendPrepare(statementName, text, nParams) } + + self._dispatchQuery(self.pq, fn, function (err) { + if (err) return cb(err) + self._awaitResult(cb) + }) } - this._dispatchQuery(this.pq, queryFn, (err) => { - if (err) return cb(err) - this._awaitResult(cb) - }) -} + execute(statementName, parameters, cb) { + const self = this + + const fn = function () { + return self.pq.sendQueryPrepared(statementName, parameters) + } -Client.prototype.prepare = function (statementName, text, nParams, cb) { - const self = this - const fn = function () { - return self.pq.sendPrepare(statementName, text, nParams) + self._dispatchQuery(self.pq, fn, function (err, rows) { + if (err) return cb(err) + self._awaitResult(cb) + }) } - self._dispatchQuery(self.pq, fn, function (err) { - if (err) return cb(err) - self._awaitResult(cb) - }) -} + getCopyStream() { + this.pq.setNonBlocking(true) + this._stopReading() + return new CopyStream(this.pq) + } -Client.prototype.execute = function (statementName, parameters, cb) { - const self = this + // cancel a currently executing query + cancel(cb) { + assert(cb, 'Callback is required') + // result is either true or a string containing an error + const result = this.pq.cancel() + return setImmediate(function () { + cb(result === true ? undefined : new Error(result)) + }) + } + + querySync(text, values) { + if (values) { + this.pq.execParams(text, values) + } else { + this.pq.exec(text) + } - const fn = function () { - return self.pq.sendQueryPrepared(statementName, parameters) + throwIfError(this.pq) + const result = buildResult(this.pq, this._types, this.arrayMode) + return result.rows } - self._dispatchQuery(self.pq, fn, function (err, rows) { - if (err) return cb(err) - self._awaitResult(cb) - }) -} + prepareSync(statementName, text, nParams) { + this.pq.prepare(statementName, text, nParams) + throwIfError(this.pq) + } -Client.prototype.getCopyStream = function () { - this.pq.setNonBlocking(true) - this._stopReading() - return new CopyStream(this.pq) -} + executeSync(statementName, parameters) { + this.pq.execPrepared(statementName, parameters) + throwIfError(this.pq) + return buildResult(this.pq, this._types, this.arrayMode).rows + } -// cancel a currently executing query -Client.prototype.cancel = function (cb) { - assert(cb, 'Callback is required') - // result is either true or a string containing an error - const result = this.pq.cancel() - return setImmediate(function () { - cb(result === true ? undefined : new Error(result)) - }) -} + escapeLiteral(value) { + return this.pq.escapeLiteral(value) + } -Client.prototype.querySync = function (text, values) { - if (values) { - this.pq.execParams(text, values) - } else { - this.pq.exec(text) + escapeIdentifier(value) { + return this.pq.escapeIdentifier(value) } - throwIfError(this.pq) - const result = buildResult(this.pq, this._types, this.arrayMode) - return result.rows -} + end(cb) { + this._stopReading() + this.pq.finish() + if (cb) setImmediate(cb) + } -Client.prototype.prepareSync = function (statementName, text, nParams) { - this.pq.prepare(statementName, text, nParams) - throwIfError(this.pq) -} + _readError(message) { + const err = new Error(message || this.pq.errorMessage()) + this.emit('error', err) + } -Client.prototype.executeSync = function (statementName, parameters) { - this.pq.execPrepared(statementName, parameters) - throwIfError(this.pq) - return buildResult(this.pq, this._types, this.arrayMode).rows -} + _stopReading() { + if (!this._reading) return + this._reading = false + this.pq.stopReader() + this.pq.removeListener('readable', this._read) + } -Client.prototype.escapeLiteral = function (value) { - return this.pq.escapeLiteral(value) -} + _consumeQueryResults(pq) { + return buildResult(pq, this._types, this.arrayMode) + } -Client.prototype.escapeIdentifier = function (value) { - return this.pq.escapeIdentifier(value) -} + _emitResult(pq) { + const status = pq.resultStatus() + switch (status) { + case 'PGRES_FATAL_ERROR': + this._queryError = new Error(this.pq.resultErrorMessage()) + break + + case 'PGRES_TUPLES_OK': + case 'PGRES_COMMAND_OK': + case 'PGRES_EMPTY_QUERY': + { + const result = this._consumeQueryResults(this.pq) + this.emit('result', result) + } + break + + case 'PGRES_COPY_OUT': + case 'PGRES_COPY_BOTH': { + break + } -// export the version number so we can check it in node-postgres -module.exports.version = require('./package.json').version + default: + this._readError('unrecognized command status: ' + status) + break + } + return status + } -Client.prototype.end = function (cb) { - this._stopReading() - this.pq.finish() - if (cb) setImmediate(cb) -} + // called when libpq is readable + _read() { + const pq = this.pq + // read waiting data from the socket + // e.g. clear the pending 'select' + if (!pq.consumeInput()) { + // if consumeInput returns false + // than a read error has been encountered + return this._readError() + } -Client.prototype._readError = function (message) { - const err = new Error(message || this.pq.errorMessage()) - this.emit('error', err) -} + // check if there is still outstanding data + // if so, wait for it all to come in + if (pq.isBusy()) { + return + } -Client.prototype._stopReading = function () { - if (!this._reading) return - this._reading = false - this.pq.stopReader() - this.pq.removeListener('readable', this._read) -} + // load our result object -Client.prototype._consumeQueryResults = function (pq) { - return buildResult(pq, this._types, this.arrayMode) -} + while (pq.getResult()) { + const resultStatus = this._emitResult(this.pq) -Client.prototype._emitResult = function (pq) { - const status = pq.resultStatus() - switch (status) { - case 'PGRES_FATAL_ERROR': - this._queryError = new Error(this.pq.resultErrorMessage()) - break - - case 'PGRES_TUPLES_OK': - case 'PGRES_COMMAND_OK': - case 'PGRES_EMPTY_QUERY': - { - const result = this._consumeQueryResults(this.pq) - this.emit('result', result) + // if the command initiated copy mode we need to break out of the read loop + // so a substream can begin to read copy data + if (resultStatus === 'PGRES_COPY_BOTH' || resultStatus === 'PGRES_COPY_OUT') { + break } - break - case 'PGRES_COPY_OUT': - case 'PGRES_COPY_BOTH': { - break + // if reading multiple results, sometimes the following results might cause + // a blocking read. in this scenario yield back off the reader until libpq is readable + if (pq.isBusy()) { + return + } } - default: - this._readError('unrecognized command status: ' + status) - break - } - return status -} + this.emit('readyForQuery') -// called when libpq is readable -Client.prototype._read = function () { - const pq = this.pq - // read waiting data from the socket - // e.g. clear the pending 'select' - if (!pq.consumeInput()) { - // if consumeInput returns false - // than a read error has been encountered - return this._readError() + let notice = this.pq.notifies() + while (notice) { + this.emit('notification', notice) + notice = this.pq.notifies() + } } - // check if there is still outstanding data - // if so, wait for it all to come in - if (pq.isBusy()) { - return + // ensures the client is reading and + // everything is set up for async io + _startReading() { + if (this._reading) return + this._reading = true + this.pq.on('readable', this._read) + this.pq.startReader() } - // load our result object + _awaitResult(cb) { + this._queryCallback = cb + return this._startReading() + } - while (pq.getResult()) { - const resultStatus = this._emitResult(this.pq) + // wait for the writable socket to drain + _waitForDrain(pq, cb) { + const res = pq.flush() + // res of 0 is success + if (res === 0) return cb() + + // res of -1 is failure + if (res === -1) return cb(pq.errorMessage()) + + // otherwise outgoing message didn't flush to socket + // wait for it to flush and try again + const self = this + // you cannot read & write on a socket at the same time + return pq.writable(function () { + self._waitForDrain(pq, cb) + }) + } - // if the command initiated copy mode we need to break out of the read loop - // so a substream can begin to read copy data - if (resultStatus === 'PGRES_COPY_BOTH' || resultStatus === 'PGRES_COPY_OUT') { - break - } + // send an async query to libpq and wait for it to + // finish writing query text to the socket + _dispatchQuery(pq, fn, cb) { + this._stopReading() + const success = pq.setNonBlocking(true) + if (!success) return cb(new Error('Unable to set non-blocking to true')) + const sent = fn() + if (!sent) return cb(new Error(pq.errorMessage() || 'Something went wrong dispatching the query')) + this._waitForDrain(pq, cb) + } - // if reading multiple results, sometimes the following results might cause - // a blocking read. in this scenario yield back off the reader until libpq is readable - if (pq.isBusy()) { - return + _onResult(result) { + if (this._resultCount === 0) { + this._results = result + this._rows = result.rows + } else if (this._resultCount === 1) { + this._results = [this._results, result] + this._rows = [this._rows, result.rows] + } else { + this._results.push(result) + this._rows.push(result.rows) } + this._resultCount++ } - this.emit('readyForQuery') + _onReadyForQuery() { + // remove instance callback + const cb = this._queryCallback + this._queryCallback = undefined - let notice = this.pq.notifies() - while (notice) { - this.emit('notification', notice) - notice = this.pq.notifies() - } -} + // remove instance query error + const err = this._queryError + this._queryError = undefined -// ensures the client is reading and -// everything is set up for async io -Client.prototype._startReading = function () { - if (this._reading) return - this._reading = true - this.pq.on('readable', this._read) - this.pq.startReader() -} + // remove instance rows + const rows = this._rows + this._rows = undefined + + // remove instance results + const results = this._results + this._results = undefined + this._resultCount = 0 + + if (cb) { + cb(err, rows || [], results) + } + } +} const throwIfError = function (pq) { const err = pq.resultErrorMessage() || pq.errorMessage() if (err) { @@ -258,74 +322,11 @@ const throwIfError = function (pq) { } } -Client.prototype._awaitResult = function (cb) { - this._queryCallback = cb - return this._startReading() -} - -// wait for the writable socket to drain -Client.prototype._waitForDrain = function (pq, cb) { - const res = pq.flush() - // res of 0 is success - if (res === 0) return cb() - - // res of -1 is failure - if (res === -1) return cb(pq.errorMessage()) - - // otherwise outgoing message didn't flush to socket - // wait for it to flush and try again - const self = this - // you cannot read & write on a socket at the same time - return pq.writable(function () { - self._waitForDrain(pq, cb) - }) -} - -// send an async query to libpq and wait for it to -// finish writing query text to the socket -Client.prototype._dispatchQuery = function (pq, fn, cb) { - this._stopReading() - const success = pq.setNonBlocking(true) - if (!success) return cb(new Error('Unable to set non-blocking to true')) - const sent = fn() - if (!sent) return cb(new Error(pq.errorMessage() || 'Something went wrong dispatching the query')) - this._waitForDrain(pq, cb) -} - -Client.prototype._onResult = function (result) { - if (this._resultCount === 0) { - this._results = result - this._rows = result.rows - } else if (this._resultCount === 1) { - this._results = [this._results, result] - this._rows = [this._rows, result.rows] - } else { - this._results.push(result) - this._rows.push(result.rows) - } - this._resultCount++ +function createClient(config) { + return new Client(config) } -Client.prototype._onReadyForQuery = function () { - // remove instance callback - const cb = this._queryCallback - this._queryCallback = undefined - - // remove instance query error - const err = this._queryError - this._queryError = undefined - - // remove instance rows - const rows = this._rows - this._rows = undefined - - // remove instance results - const results = this._results - this._results = undefined - - this._resultCount = 0 - - if (cb) { - cb(err, rows || [], results) - } -} +module.exports = createClient +module.exports.Client = Client +// export the version number so we can check it in node-postgres +module.exports.version = require('./package.json').version diff --git a/packages/pg-native/lib/copy-stream.js b/packages/pg-native/lib/copy-stream.js index 94ae4f7e5..0a4afc68c 100644 --- a/packages/pg-native/lib/copy-stream.js +++ b/packages/pg-native/lib/copy-stream.js @@ -1,115 +1,112 @@ -const Duplex = require('stream').Duplex -const Writable = require('stream').Writable -const util = require('util') +const { Duplex, Writable } = require('stream') -const CopyStream = (module.exports = function (pq, options) { - Duplex.call(this, options) - this.pq = pq - this._reading = false -}) +module.exports = class CopyStream extends Duplex { + constructor(pq, options) { + super(options) + this.pq = pq + this._reading = false + } -util.inherits(CopyStream, Duplex) + // writer methods + _write(chunk, encoding, cb) { + const result = this.pq.putCopyData(chunk) -// writer methods -CopyStream.prototype._write = function (chunk, encoding, cb) { - const result = this.pq.putCopyData(chunk) + // sent successfully + if (result === 1) return cb() - // sent successfully - if (result === 1) return cb() + // error + if (result === -1) return cb(new Error(this.pq.errorMessage())) - // error - if (result === -1) return cb(new Error(this.pq.errorMessage())) + // command would block. wait for writable and call again. + const self = this + this.pq.writable(function () { + self._write(chunk, encoding, cb) + }) + } - // command would block. wait for writable and call again. - const self = this - this.pq.writable(function () { - self._write(chunk, encoding, cb) - }) -} + end() { + const args = Array.prototype.slice.call(arguments, 0) + const self = this + + const callback = args.pop() -CopyStream.prototype.end = function () { - const args = Array.prototype.slice.call(arguments, 0) - const self = this + if (args.length) { + this.write(args[0]) + } + const result = this.pq.putCopyEnd() + + // sent successfully + if (result === 1) { + // consume our results and then call 'end' on the + // "parent" writable class so we can emit 'finish' and + // all that jazz + return consumeResults(this.pq, function (err, res) { + Writable.prototype.end.call(self) + + // handle possible passing of callback to end method + if (callback) { + callback(err) + } + }) + } - const callback = args.pop() + // error + if (result === -1) { + const err = new Error(this.pq.errorMessage()) + return this.emit('error', err) + } - if (args.length) { - this.write(args[0]) - } - const result = this.pq.putCopyEnd() - - // sent successfully - if (result === 1) { - // consume our results and then call 'end' on the - // "parent" writable class so we can emit 'finish' and - // all that jazz - return consumeResults(this.pq, function (err, res) { - Writable.prototype.end.call(self) - - // handle possible passing of callback to end method - if (callback) { - callback(err) - } + // command would block. wait for writable and call end again + // don't pass any buffers to end on the second call because + // we already sent them to possible this.write the first time + // we called end + return this.pq.writable(function () { + return self.end.apply(self, callback) }) } - // error - if (result === -1) { - const err = new Error(this.pq.errorMessage()) - return this.emit('error', err) + // reader methods + _consumeBuffer(cb) { + const result = this.pq.getCopyData(true) + if (result instanceof Buffer) { + return setImmediate(function () { + cb(null, result) + }) + } + if (result === -1) { + // end of stream + return cb(null, null) + } + if (result === 0) { + const self = this + this.pq.once('readable', function () { + self.pq.stopReader() + self.pq.consumeInput() + self._consumeBuffer(cb) + }) + return this.pq.startReader() + } + cb(new Error('Unrecognized read status: ' + result)) } - // command would block. wait for writable and call end again - // don't pass any buffers to end on the second call because - // we already sent them to possible this.write the first time - // we called end - return this.pq.writable(function () { - return self.end.apply(self, callback) - }) -} - -// reader methods -CopyStream.prototype._consumeBuffer = function (cb) { - const result = this.pq.getCopyData(true) - if (result instanceof Buffer) { - return setImmediate(function () { - cb(null, result) - }) - } - if (result === -1) { - // end of stream - return cb(null, null) - } - if (result === 0) { + _read(size) { + if (this._reading) return + this._reading = true + // console.log('read begin'); const self = this - this.pq.once('readable', function () { - self.pq.stopReader() - self.pq.consumeInput() - self._consumeBuffer(cb) + this._consumeBuffer(function (err, buffer) { + self._reading = false + if (err) { + return self.emit('error', err) + } + if (buffer === false) { + // nothing to read for now, return + return + } + self.push(buffer) }) - return this.pq.startReader() } - cb(new Error('Unrecognized read status: ' + result)) } - -CopyStream.prototype._read = function (size) { - if (this._reading) return - this._reading = true - // console.log('read begin'); - const self = this - this._consumeBuffer(function (err, buffer) { - self._reading = false - if (err) { - return self.emit('error', err) - } - if (buffer === false) { - // nothing to read for now, return - return - } - self.push(buffer) - }) -} - const consumeResults = function (pq, cb) { const cleanup = function () { pq.removeListener('readable', onReadable)