diff --git a/packages/pg-query-stream/src/index.ts b/packages/pg-query-stream/src/index.ts index c942b0441..2a4509e09 100644 --- a/packages/pg-query-stream/src/index.ts +++ b/packages/pg-query-stream/src/index.ts @@ -13,6 +13,7 @@ class QueryStream extends Readable implements Submittable { cursor: any _result: any + callback: Function handleRowDescription: Function handleDataRow: Function handlePortalSuspended: Function @@ -26,6 +27,13 @@ class QueryStream extends Readable implements Submittable { super({ objectMode: true, autoDestroy: true, highWaterMark: batchSize || highWaterMark }) this.cursor = new Cursor(text, values, config) + this.cursor + .on('end', (result) => { + this.callback && this.callback(null, result) + }) + .on('error', (err) => { + this.callback && this.callback(err) + }) // delegate Submittable callbacks to cursor this.handleRowDescription = this.cursor.handleRowDescription.bind(this.cursor) diff --git a/packages/pg-query-stream/test/pool.ts b/packages/pg-query-stream/test/pool.ts new file mode 100644 index 000000000..06adf8e18 --- /dev/null +++ b/packages/pg-query-stream/test/pool.ts @@ -0,0 +1,17 @@ +import { Pool } from 'pg' +import QueryStream from '../src' + +describe('pool', function () { + it('works', async function () { + const pool = new Pool() + const query = new QueryStream('SELECT * FROM generate_series(0, 10) num', []) + const q = pool.query(query) + query.on('data', (row) => { + // just consume the whole stream + }) + await q + query.on('end', () => { + pool.end() + }) + }) +}) diff --git a/packages/pg/lib/client.js b/packages/pg/lib/client.js index 0168ce637..02dca96be 100644 --- a/packages/pg/lib/client.js +++ b/packages/pg/lib/client.js @@ -609,8 +609,12 @@ class Client extends EventEmitter { } else if (typeof config.submit === 'function') { readTimeout = config.query_timeout || this.connectionParameters.query_timeout result = query = config - if (typeof values === 'function') { - query.callback = query.callback || values + if (!query.callback) { + if (typeof values === 'function') { + query.callback = values + } else if (callback) { + query.callback = callback + } } } else { readTimeout = config.query_timeout || this.connectionParameters.query_timeout