From e55da7287b8fb028228a01a0419464fc51c75dca Mon Sep 17 00:00:00 2001 From: Alec Larson <1925840+aleclarson@users.noreply.github.com> Date: Wed, 7 Sep 2022 13:23:25 -0400 Subject: [PATCH 1/5] fix(pg-query-stream): invoke `this.callback` on cursor end/error Closes #2013 --- packages/pg-query-stream/src/index.ts | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/packages/pg-query-stream/src/index.ts b/packages/pg-query-stream/src/index.ts index c942b0441..f3251b919 100644 --- a/packages/pg-query-stream/src/index.ts +++ b/packages/pg-query-stream/src/index.ts @@ -13,6 +13,7 @@ class QueryStream extends Readable implements Submittable { cursor: any _result: any + callback: Function handleRowDescription: Function handleDataRow: Function handlePortalSuspended: Function @@ -26,6 +27,11 @@ class QueryStream extends Readable implements Submittable { super({ objectMode: true, autoDestroy: true, highWaterMark: batchSize || highWaterMark }) this.cursor = new Cursor(text, values, config) + this.cursor.on('end', (result) => { + this.callback && this.callback(null, result) + }).on('error', (err) => { + this.callback && this.callback(err) + }) // delegate Submittable callbacks to cursor this.handleRowDescription = this.cursor.handleRowDescription.bind(this.cursor) From 4ce357f1701f8106b0ef27049648b754312d968f Mon Sep 17 00:00:00 2001 From: Alec Larson <1925840+aleclarson@users.noreply.github.com> Date: Wed, 7 Sep 2022 13:26:36 -0400 Subject: [PATCH 2/5] fix(Client): respect `callback` argument for `Submittable` case --- packages/pg/lib/client.js | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/packages/pg/lib/client.js b/packages/pg/lib/client.js index c6aa3dabe..79a1d3d01 100644 --- a/packages/pg/lib/client.js +++ b/packages/pg/lib/client.js @@ -511,8 +511,12 @@ class Client extends EventEmitter { } else if (typeof config.submit === 'function') { readTimeout = config.query_timeout || this.connectionParameters.query_timeout result = query = config - if (typeof values === 'function') { - query.callback = query.callback || values + if (!query.callback) { + if (typeof values === 'function') { + query.callback = values + } else if (callback) { + query.callback = callback + } } } else { readTimeout = this.connectionParameters.query_timeout From a10a151ca6f654fe0e29a316fd857f90cfdffcaa Mon Sep 17 00:00:00 2001 From: Brian Carlson Date: Fri, 13 Feb 2026 14:45:30 -0600 Subject: [PATCH 3/5] Add tests --- packages/pg-query-stream/src/index.ts | 12 +++++++----- packages/pg-query-stream/test/pool.ts | 19 +++++++++++++++++++ 2 files changed, 26 insertions(+), 5 deletions(-) create mode 100644 packages/pg-query-stream/test/pool.ts diff --git a/packages/pg-query-stream/src/index.ts b/packages/pg-query-stream/src/index.ts index f3251b919..2a4509e09 100644 --- a/packages/pg-query-stream/src/index.ts +++ b/packages/pg-query-stream/src/index.ts @@ -27,11 +27,13 @@ class QueryStream extends Readable implements Submittable { super({ objectMode: true, autoDestroy: true, highWaterMark: batchSize || highWaterMark }) this.cursor = new Cursor(text, values, config) - this.cursor.on('end', (result) => { - this.callback && this.callback(null, result) - }).on('error', (err) => { - this.callback && this.callback(err) - }) + this.cursor + .on('end', (result) => { + this.callback && this.callback(null, result) + }) + .on('error', (err) => { + this.callback && this.callback(err) + }) // delegate Submittable callbacks to cursor this.handleRowDescription = this.cursor.handleRowDescription.bind(this.cursor) diff --git a/packages/pg-query-stream/test/pool.ts b/packages/pg-query-stream/test/pool.ts new file mode 100644 index 000000000..36634933e --- /dev/null +++ b/packages/pg-query-stream/test/pool.ts @@ -0,0 +1,19 @@ +import assert from 'assert' +import { Pool } from 'pg' +import QueryStream from '../src' + +describe('pool', function () { + it('works', async function () { + const pool = new Pool() + const query = new QueryStream('SELECT * FROM generate_series(0, 10) num', []) + const q = pool.query(query) + console.log(q) // Promise { , ... + query.on('data', (row) => { + // just consume the whole stream + }) + await q //! + query.on('end', () => { + pool.end() + }) + }) +}) From 75d1d7dee8b3e538a7384cb6b2b63406a2ab648e Mon Sep 17 00:00:00 2001 From: Brian Carlson Date: Fri, 13 Feb 2026 14:46:00 -0600 Subject: [PATCH 4/5] Remove log --- packages/pg-query-stream/test/pool.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/pg-query-stream/test/pool.ts b/packages/pg-query-stream/test/pool.ts index 36634933e..34fce7ac2 100644 --- a/packages/pg-query-stream/test/pool.ts +++ b/packages/pg-query-stream/test/pool.ts @@ -7,11 +7,10 @@ describe('pool', function () { const pool = new Pool() const query = new QueryStream('SELECT * FROM generate_series(0, 10) num', []) const q = pool.query(query) - console.log(q) // Promise { , ... query.on('data', (row) => { // just consume the whole stream }) - await q //! + await q query.on('end', () => { pool.end() }) From aed54ca44ca4c05f7bc0513fdcd2aaf0b28d9d14 Mon Sep 17 00:00:00 2001 From: Brian Carlson Date: Fri, 13 Feb 2026 14:47:58 -0600 Subject: [PATCH 5/5] Fix lint --- packages/pg-query-stream/test/pool.ts | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/packages/pg-query-stream/test/pool.ts b/packages/pg-query-stream/test/pool.ts index 34fce7ac2..06adf8e18 100644 --- a/packages/pg-query-stream/test/pool.ts +++ b/packages/pg-query-stream/test/pool.ts @@ -1,4 +1,3 @@ -import assert from 'assert' import { Pool } from 'pg' import QueryStream from '../src' @@ -10,7 +9,7 @@ describe('pool', function () { query.on('data', (row) => { // just consume the whole stream }) - await q + await q query.on('end', () => { pool.end() })