From c908a5c41f1aace3cb8936cd61d1e3f89c35c69b Mon Sep 17 00:00:00 2001
From: chalmer lowe
Date: Tue, 6 Jan 2026 09:04:15 -0500
Subject: [PATCH 1/6] fix(bigquery): add timeout parameter to to_dataframe and
 to_arrow methods

---
 google/cloud/bigquery/_pandas_helpers.py | 126 ++++++++++++++---------
 google/cloud/bigquery/job/query.py       |  11 ++
 google/cloud/bigquery/table.py           |  26 ++++-
 3 files changed, 111 insertions(+), 52 deletions(-)

diff --git a/google/cloud/bigquery/_pandas_helpers.py b/google/cloud/bigquery/_pandas_helpers.py
index 2dab03a06..9e36729fd 100644
--- a/google/cloud/bigquery/_pandas_helpers.py
+++ b/google/cloud/bigquery/_pandas_helpers.py
@@ -26,6 +26,7 @@ import logging
 import queue
 import threading
+import time
 import warnings
 from typing import Any, Union, Optional, Callable, Generator, List
 
@@ -869,6 +870,7 @@ def _download_table_bqstorage(
     max_queue_size: Any = _MAX_QUEUE_SIZE_DEFAULT,
     max_stream_count: Optional[int] = None,
     download_state: Optional[_DownloadState] = None,
+    timeout: Optional[float] = None,
 ) -> Generator[Any, None, None]:
     """Downloads a BigQuery table using the BigQuery Storage API.
 
@@ -899,6 +901,9 @@ def _download_table_bqstorage(
         download_state (Optional[_DownloadState]):
             A threadsafe state object which can be used to observe the
             behavior of the worker threads created by this method.
+        timeout (Optional[float]):
+            The number of seconds to wait for the download to complete.
+            If None, wait indefinitely.
 
     Yields:
         pandas.DataFrame: Pandas DataFrames, one for each chunk of data
@@ -906,6 +911,8 @@ def _download_table_bqstorage(
     Raises:
         ValueError:
             If attempting to read from a specific partition or snapshot.
+        concurrent.futures.TimeoutError:
+            If the download does not complete within the specified timeout.
 
     Note:
         This method requires the `google-cloud-bigquery-storage` library
@@ -973,60 +980,73 @@ def _download_table_bqstorage(
 
     worker_queue: queue.Queue[int] = queue.Queue(maxsize=max_queue_size)
 
-    with concurrent.futures.ThreadPoolExecutor(max_workers=total_streams) as pool:
-        try:
-            # Manually submit jobs and wait for download to complete rather
-            # than using pool.map because pool.map continues running in the
-            # background even if there is an exception on the main thread.
-            # See: https://github.com/googleapis/google-cloud-python/pull/7698
-            not_done = [
-                pool.submit(
-                    _download_table_bqstorage_stream,
-                    download_state,
-                    bqstorage_client,
-                    session,
-                    stream,
-                    worker_queue,
-                    page_to_item,
-                )
-                for stream in session.streams
-            ]
-
-            while not_done:
-                # Don't block on the worker threads. For performance reasons,
-                # we want to block on the queue's get method, instead. This
-                # prevents the queue from filling up, because the main thread
-                # has smaller gaps in time between calls to the queue's get
-                # method. For a detailed explanation, see:
-                # https://friendliness.dev/2019/06/18/python-nowait/
-                done, not_done = _nowait(not_done)
-                for future in done:
-                    # Call result() on any finished threads to raise any
-                    # exceptions encountered.
-                    future.result()
-
-                try:
-                    frame = worker_queue.get(timeout=_PROGRESS_INTERVAL)
-                    yield frame
-                except queue.Empty:  # pragma: NO COVER
-                    continue
+    # Manually manage the pool to control shutdown behavior on timeout.
+    pool = concurrent.futures.ThreadPoolExecutor(max_workers=total_streams)
+    wait_on_shutdown = True
+    start_time = time.time()
+
+    try:
+        # Manually submit jobs and wait for download to complete rather
+        # than using pool.map because pool.map continues running in the
+        # background even if there is an exception on the main thread.
+        # See: https://github.com/googleapis/google-cloud-python/pull/7698
+        not_done = [
+            pool.submit(
+                _download_table_bqstorage_stream,
+                download_state,
+                bqstorage_client,
+                session,
+                stream,
+                worker_queue,
+                page_to_item,
+            )
+            for stream in session.streams
+        ]
+
+        while not_done:
+            # Check for timeout
+            if timeout is not None:
+                elapsed = time.time() - start_time
+                if elapsed > timeout:
+                    wait_on_shutdown = False
+                    raise concurrent.futures.TimeoutError(
+                        f"Download timed out after {timeout} seconds."
+                    )
+
+            # Don't block on the worker threads. For performance reasons,
+            # we want to block on the queue's get method, instead. This
+            # prevents the queue from filling up, because the main thread
+            # has smaller gaps in time between calls to the queue's get
+            # method. For a detailed explanation, see:
+            # https://friendliness.dev/2019/06/18/python-nowait/
+            done, not_done = _nowait(not_done)
+            for future in done:
+                # Call result() on any finished threads to raise any
+                # exceptions encountered.
+                future.result()
+
+            try:
+                frame = worker_queue.get(timeout=_PROGRESS_INTERVAL)
+                yield frame
+            except queue.Empty:  # pragma: NO COVER
+                continue
 
-            # Return any remaining values after the workers finished.
-            while True:  # pragma: NO COVER
-                try:
-                    frame = worker_queue.get_nowait()
-                    yield frame
-                except queue.Empty:  # pragma: NO COVER
-                    break
-        finally:
-            # No need for a lock because reading/replacing a variable is
-            # defined to be an atomic operation in the Python language
-            # definition (enforced by the global interpreter lock).
-            download_state.done = True
+        # Return any remaining values after the workers finished.
+        while True:  # pragma: NO COVER
+            try:
+                frame = worker_queue.get_nowait()
+                yield frame
+            except queue.Empty:  # pragma: NO COVER
+                break
+    finally:
+        # No need for a lock because reading/replacing a variable is
+        # defined to be an atomic operation in the Python language
+        # definition (enforced by the global interpreter lock).
+        download_state.done = True
 
-            # Shutdown all background threads, now that they should know to
-            # exit early.
-            pool.shutdown(wait=True)
+        # Shutdown all background threads, now that they should know to
+        # exit early.
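+        # Note: wait_on_shutdown is False only when the timeout check above
+        # raised; in that case we do not block on the worker threads, which
+        # exit on their own once they observe download_state.done.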
+    pool.shutdown(wait=wait_on_shutdown)
 
 
 def download_arrow_bqstorage(
@@ -1037,6 +1057,7 @@ def download_arrow_bqstorage(
     selected_fields=None,
     max_queue_size=_MAX_QUEUE_SIZE_DEFAULT,
     max_stream_count=None,
+    timeout=None,
 ):
     return _download_table_bqstorage(
         project_id,
@@ -1047,6 +1068,7 @@ def download_arrow_bqstorage(
         page_to_item=_bqstorage_page_to_arrow,
         max_queue_size=max_queue_size,
         max_stream_count=max_stream_count,
+        timeout=timeout,
     )
 
 
@@ -1060,6 +1082,7 @@ def download_dataframe_bqstorage(
    selected_fields=None,
     max_queue_size=_MAX_QUEUE_SIZE_DEFAULT,
     max_stream_count=None,
+    timeout=None,
 ):
     page_to_item = functools.partial(_bqstorage_page_to_dataframe, column_names, dtypes)
     return _download_table_bqstorage(
@@ -1071,6 +1094,7 @@ def download_dataframe_bqstorage(
         page_to_item=page_to_item,
         max_queue_size=max_queue_size,
         max_stream_count=max_stream_count,
+        timeout=timeout,
     )
 
 
diff --git a/google/cloud/bigquery/job/query.py b/google/cloud/bigquery/job/query.py
index 38b8a7148..495a80d59 100644
--- a/google/cloud/bigquery/job/query.py
+++ b/google/cloud/bigquery/job/query.py
@@ -1857,6 +1857,7 @@ def to_arrow(
         bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None,
         create_bqstorage_client: bool = True,
         max_results: Optional[int] = None,
+        timeout: Optional[float] = None,
     ) -> "pyarrow.Table":
         """[Beta] Create a class:`pyarrow.Table` by loading all pages of a
         table or query.
@@ -1904,6 +1905,10 @@ def to_arrow(
 
                 .. versionadded:: 2.21.0
 
+            timeout (Optional[float]):
+                The number of seconds to wait for the underlying download to complete.
+                If ``None``, wait indefinitely.
+
         Returns:
             pyarrow.Table
                 A :class:`pyarrow.Table` populated with row data and column
@@ -1921,6 +1926,7 @@ def to_arrow(
             progress_bar_type=progress_bar_type,
             bqstorage_client=bqstorage_client,
             create_bqstorage_client=create_bqstorage_client,
+            timeout=timeout,
         )
 
     # If changing the signature of this method, make sure to apply the same
@@ -2191,6 +2197,7 @@ def to_geodataframe(
         int_dtype: Union[Any, None] = DefaultPandasDTypes.INT_DTYPE,
         float_dtype: Union[Any, None] = None,
         string_dtype: Union[Any, None] = None,
+        timeout: Optional[float] = None,
     ) -> "geopandas.GeoDataFrame":
         """Return a GeoPandas GeoDataFrame from a QueryJob
 
@@ -2269,6 +2276,9 @@ def to_geodataframe(
                 then the data type will be ``numpy.dtype("object")``.
                 BigQuery String type can be found at:
                 https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#string_type
+            timeout (Optional[float]):
+                The number of seconds to wait for the underlying download to complete.
+                If ``None``, wait indefinitely.
 
         Returns:
             geopandas.GeoDataFrame:
@@ -2296,6 +2306,7 @@ def to_geodataframe(
             int_dtype=int_dtype,
             float_dtype=float_dtype,
             string_dtype=string_dtype,
+            timeout=timeout,
         )
 
     def __iter__(self):
diff --git a/google/cloud/bigquery/table.py b/google/cloud/bigquery/table.py
index 5efcb1958..144f93b4f 100644
--- a/google/cloud/bigquery/table.py
+++ b/google/cloud/bigquery/table.py
@@ -2087,6 +2087,7 @@ def to_arrow_iterable(
         bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None,
         max_queue_size: int = _pandas_helpers._MAX_QUEUE_SIZE_DEFAULT,  # type: ignore
         max_stream_count: Optional[int] = None,
+        timeout: Optional[float] = None,
     ) -> Iterator["pyarrow.RecordBatch"]:
         """[Beta] Create an iterable of class:`pyarrow.RecordBatch`, to process the table as a stream.
 
@@ -2127,6 +2128,10 @@ def to_arrow_iterable(
                 setting this parameter value to a value > 0 can help reduce
                 system resource consumption.
 
+            timeout (Optional[float]):
+                The number of seconds to wait for the underlying download to complete.
+                If ``None``, wait indefinitely.
+
         Returns:
             pyarrow.RecordBatch:
                 A generator of :class:`~pyarrow.RecordBatch`.
@@ -2144,6 +2149,7 @@ def to_arrow_iterable(
                 selected_fields=self._selected_fields,
                 max_queue_size=max_queue_size,
                 max_stream_count=max_stream_count,
+                timeout=timeout,
             )
         tabledata_list_download = functools.partial(
             _pandas_helpers.download_arrow_row_iterator, iter(self.pages), self.schema
@@ -2161,6 +2167,7 @@ def to_arrow(
         progress_bar_type: Optional[str] = None,
         bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None,
         create_bqstorage_client: bool = True,
+        timeout: Optional[float] = None,
     ) -> "pyarrow.Table":
         """[Beta] Create a class:`pyarrow.Table` by loading all pages of a
         table or query.
@@ -2202,6 +2209,9 @@ def to_arrow(
                 This argument does nothing if ``bqstorage_client`` is supplied.
 
                 .. versionadded:: 1.24.0
+            timeout (Optional[float]):
+                The number of seconds to wait for the underlying download to complete.
+                If ``None``, wait indefinitely.
 
         Returns:
             pyarrow.Table
@@ -2236,7 +2246,7 @@ def to_arrow(
 
         record_batches = []
         for record_batch in self.to_arrow_iterable(
-            bqstorage_client=bqstorage_client
+            bqstorage_client=bqstorage_client, timeout=timeout
         ):
             record_batches.append(record_batch)
 
@@ -2271,6 +2281,7 @@ def to_dataframe_iterable(
         dtypes: Optional[Dict[str, Any]] = None,
         max_queue_size: int = _pandas_helpers._MAX_QUEUE_SIZE_DEFAULT,  # type: ignore
         max_stream_count: Optional[int] = None,
+        timeout: Optional[float] = None,
     ) -> "pandas.DataFrame":
         """Create an iterable of pandas DataFrames, to process the table as a stream.
 
@@ -2317,6 +2328,10 @@ def to_dataframe_iterable(
                 setting this parameter value to a value > 0 can help reduce
                 system resource consumption.
 
+            timeout (Optional[float]):
+                The number of seconds to wait for the underlying download to complete.
+                If ``None``, wait indefinitely.
+
         Returns:
             pandas.DataFrame:
                 A generator of :class:`~pandas.DataFrame`.
@@ -2344,6 +2359,7 @@ def to_dataframe_iterable(
                 selected_fields=self._selected_fields,
                 max_queue_size=max_queue_size,
                 max_stream_count=max_stream_count,
+                timeout=timeout,
             )
         tabledata_list_download = functools.partial(
             _pandas_helpers.download_dataframe_row_iterator,
@@ -2381,6 +2397,7 @@ def to_dataframe(
         range_timestamp_dtype: Union[
             Any, None
         ] = DefaultPandasDTypes.RANGE_TIMESTAMP_DTYPE,
+        timeout: Optional[float] = None,
     ) -> "pandas.DataFrame":
         """Create a pandas DataFrame by loading all pages of a query.
 
@@ -2577,6 +2594,10 @@ def to_dataframe(
 
                 .. versionadded:: 3.21.0
 
+            timeout (Optional[float]):
+                The number of seconds to wait for the underlying download to complete.
+                If ``None``, wait indefinitely.
+
         Returns:
             pandas.DataFrame:
                 A :class:`~pandas.DataFrame` populated with row data and column
@@ -2690,6 +2711,7 @@ def to_dataframe(
             progress_bar_type=progress_bar_type,
             bqstorage_client=bqstorage_client,
             create_bqstorage_client=create_bqstorage_client,
+            timeout=timeout,
         )
 
         # Default date dtype is `db_dtypes.DateDtype()` that could cause out of bounds error,
@@ -2768,6 +2790,7 @@ def to_geodataframe(
         int_dtype: Union[Any, None] = DefaultPandasDTypes.INT_DTYPE,
         float_dtype: Union[Any, None] = None,
         string_dtype: Union[Any, None] = None,
+        timeout: Optional[float] = None,
     ) -> "geopandas.GeoDataFrame":
         """Create a GeoPandas GeoDataFrame by loading all pages of a query.
@@ -2902,6 +2925,7 @@ def to_geodataframe(
             int_dtype=int_dtype,
             float_dtype=float_dtype,
             string_dtype=string_dtype,
+            timeout=timeout,
         )
 
         return geopandas.GeoDataFrame(

From f57810c6915f7ce4ee27d99b02f4c03a5b23ebcf Mon Sep 17 00:00:00 2001
From: chalmer lowe
Date: Tue, 6 Jan 2026 09:25:46 -0500
Subject: [PATCH 2/6] fix(bigquery): address review feedback - update
 _EmptyRowIterator signatures and QueryJob.to_dataframe

---
 google/cloud/bigquery/job/query.py |  6 ++++++
 google/cloud/bigquery/table.py     | 14 ++++++++++++++
 2 files changed, 20 insertions(+)

diff --git a/google/cloud/bigquery/job/query.py b/google/cloud/bigquery/job/query.py
index 495a80d59..e82deb1ef 100644
--- a/google/cloud/bigquery/job/query.py
+++ b/google/cloud/bigquery/job/query.py
@@ -1955,6 +1955,7 @@ def to_dataframe(
         range_timestamp_dtype: Union[
             Any, None
         ] = DefaultPandasDTypes.RANGE_TIMESTAMP_DTYPE,
+        timeout: Optional[float] = None,
     ) -> "pandas.DataFrame":
         """Return a pandas DataFrame from a QueryJob
 
@@ -2147,6 +2148,10 @@ def to_dataframe(
 
                 .. versionadded:: 3.21.0
 
+            timeout (Optional[float]):
+                The number of seconds to wait for the underlying download to complete.
+                If ``None``, wait indefinitely.
+
         Returns:
             pandas.DataFrame:
                 A :class:`~pandas.DataFrame` populated with row data
@@ -2180,6 +2185,7 @@ def to_dataframe(
             range_date_dtype=range_date_dtype,
             range_datetime_dtype=range_datetime_dtype,
             range_timestamp_dtype=range_timestamp_dtype,
+            timeout=timeout,
         )
 
     # If changing the signature of this method, make sure to apply the same
diff --git a/google/cloud/bigquery/table.py b/google/cloud/bigquery/table.py
index 144f93b4f..f2fcd31ca 100644
--- a/google/cloud/bigquery/table.py
+++ b/google/cloud/bigquery/table.py
@@ -2961,6 +2961,7 @@ def to_arrow(
         progress_bar_type=None,
         bqstorage_client=None,
         create_bqstorage_client=True,
+        timeout: Optional[float] = None,
     ) -> "pyarrow.Table":
         """[Beta] Create an empty class:`pyarrow.Table`.
 
@@ -2968,6 +2969,7 @@ def to_arrow(
             progress_bar_type (str): Ignored. Added for compatibility with RowIterator.
             bqstorage_client (Any): Ignored. Added for compatibility with RowIterator.
             create_bqstorage_client (bool): Ignored. Added for compatibility with RowIterator.
+            timeout (Optional[float]): Ignored. Added for compatibility with RowIterator.
 
         Returns:
             pyarrow.Table: An empty :class:`pyarrow.Table`.
@@ -2994,6 +2996,7 @@ def to_dataframe(
         range_date_dtype=None,
         range_datetime_dtype=None,
         range_timestamp_dtype=None,
+        timeout: Optional[float] = None,
     ) -> "pandas.DataFrame":
         """Create an empty dataframe.
 
@@ -3014,6 +3017,7 @@ def to_dataframe(
             range_date_dtype (Any): Ignored. Added for compatibility with RowIterator.
             range_datetime_dtype (Any): Ignored. Added for compatibility with RowIterator.
             range_timestamp_dtype (Any): Ignored. Added for compatibility with RowIterator.
+            timeout (Optional[float]): Ignored. Added for compatibility with RowIterator.
 
         Returns:
             pandas.DataFrame: An empty :class:`~pandas.DataFrame`.
@@ -3032,6 +3036,7 @@ def to_geodataframe(
         int_dtype: Union[Any, None] = DefaultPandasDTypes.INT_DTYPE,
         float_dtype: Union[Any, None] = None,
         string_dtype: Union[Any, None] = None,
+        timeout: Optional[float] = None,
     ) -> "pandas.DataFrame":
         """Create an empty dataframe.
 
@@ -3045,6 +3050,7 @@ def to_geodataframe(
             int_dtype (Any): Ignored. Added for compatibility with RowIterator.
             float_dtype (Any): Ignored. Added for compatibility with RowIterator.
             string_dtype (Any): Ignored. Added for compatibility with RowIterator.
+            timeout (Optional[float]): Ignored. Added for compatibility with RowIterator.
 
         Returns:
             pandas.DataFrame: An empty :class:`~pandas.DataFrame`.
@@ -3062,6 +3068,7 @@ def to_dataframe_iterable(
         dtypes: Optional[Dict[str, Any]] = None,
         max_queue_size: Optional[int] = None,
         max_stream_count: Optional[int] = None,
+        timeout: Optional[float] = None,
     ) -> Iterator["pandas.DataFrame"]:
         """Create an iterable of pandas DataFrames, to process the table as a stream.
 
@@ -3080,6 +3087,9 @@ def to_dataframe_iterable(
             max_stream_count:
                 Ignored. Added for compatibility with RowIterator.
 
+            timeout (Optional[float]):
+                Ignored. Added for compatibility with RowIterator.
+
         Returns:
             An iterator yielding a single empty :class:`~pandas.DataFrame`.
 
@@ -3095,6 +3105,7 @@ def to_arrow_iterable(
         bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None,
         max_queue_size: Optional[int] = None,
         max_stream_count: Optional[int] = None,
+        timeout: Optional[float] = None,
     ) -> Iterator["pyarrow.RecordBatch"]:
         """Create an iterable of pandas DataFrames, to process the table as a stream.
 
@@ -3110,6 +3121,9 @@ def to_arrow_iterable(
             max_stream_count:
                 Ignored. Added for compatibility with RowIterator.
 
+            timeout (Optional[float]):
+                Ignored. Added for compatibility with RowIterator.
+
         Returns:
             An iterator yielding a single empty :class:`~pyarrow.RecordBatch`.
         """

From 031adf1be4ff0fcf09fdf71218edcd246e60fb45 Mon Sep 17 00:00:00 2001
From: chalmer lowe
Date: Tue, 6 Jan 2026 09:46:09 -0500
Subject: [PATCH 3/6] fix(bigquery): update tests to expect timeout parameter

---
 tests/unit/job/test_query_pandas.py | 1 +
 tests/unit/test_table.py            | 1 +
 tests/unit/test_table_pandas.py     | 2 ++
 3 files changed, 4 insertions(+)

diff --git a/tests/unit/job/test_query_pandas.py b/tests/unit/job/test_query_pandas.py
index a6c59b158..7c5e6e1d3 100644
--- a/tests/unit/job/test_query_pandas.py
+++ b/tests/unit/job/test_query_pandas.py
@@ -1023,5 +1023,6 @@ def test_query_job_to_geodataframe_delegation(wait_for_query):
         int_dtype=DefaultPandasDTypes.INT_DTYPE,
         float_dtype=None,
         string_dtype=None,
+        timeout=None,
     )
     assert df is row_iterator.to_geodataframe.return_value
diff --git a/tests/unit/test_table.py b/tests/unit/test_table.py
index af31d116b..d23826c50 100644
--- a/tests/unit/test_table.py
+++ b/tests/unit/test_table.py
@@ -5665,6 +5665,7 @@ def test_rowiterator_to_geodataframe_delegation(self, to_dataframe):
             int_dtype=DefaultPandasDTypes.INT_DTYPE,
             float_dtype=None,
             string_dtype=None,
+            timeout=None,
         )
         self.assertIsInstance(df, geopandas.GeoDataFrame)
diff --git a/tests/unit/test_table_pandas.py b/tests/unit/test_table_pandas.py
index a4fa3fa39..64d8b1451 100644
--- a/tests/unit/test_table_pandas.py
+++ b/tests/unit/test_table_pandas.py
@@ -301,6 +301,7 @@ def test_rowiterator_to_geodataframe_with_default_dtypes(
         int_dtype=bigquery.enums.DefaultPandasDTypes.INT_DTYPE,
         float_dtype=None,
         string_dtype=None,
+        timeout=None,
     )
     mock_geopandas.GeoDataFrame.assert_called_once_with(
         mock_df, crs="EPSG:4326", geometry="geo_col"
@@ -358,6 +359,7 @@ def test_rowiterator_to_geodataframe_with_custom_dtypes(
         int_dtype=custom_int_dtype,
         float_dtype=custom_float_dtype,
         string_dtype=custom_string_dtype,
+        timeout=None,
     )
     mock_geopandas.GeoDataFrame.assert_called_once_with(
         mock_df, crs="EPSG:4326", geometry="geo_col"

From 39febf96f05eb175e43d8662adf7ce9b1160378d Mon Sep 17 00:00:00 2001
From: chalmer lowe
Date: Tue, 6 Jan 2026 10:44:05 -0500
Subject: [PATCH 4/6] test(bigquery): add unit tests for timeout logic in
 _pandas_helpers.py

---
 tests/unit/job/test_query_pandas.py | 32 ++++++++++++++
 tests/unit/test__pandas_helpers.py  | 68 +++++++++++++++++++++++++++++
 tests/unit/test_table.py            | 12 +++++
 3 files changed, 112 insertions(+)

diff --git a/tests/unit/job/test_query_pandas.py b/tests/unit/job/test_query_pandas.py
index 7c5e6e1d3..4390309f1 100644
--- a/tests/unit/job/test_query_pandas.py
+++ b/tests/unit/job/test_query_pandas.py
@@ -1026,3 +1026,35 @@ def test_query_job_to_geodataframe_delegation(wait_for_query):
         timeout=None,
     )
     assert df is row_iterator.to_geodataframe.return_value
+
+
+@pytest.mark.skipif(pandas is None, reason="Requires `pandas`")
+@mock.patch("google.cloud.bigquery.job.query.wait_for_query")
+def test_query_job_to_dataframe_delegation(wait_for_query):
+    job = _make_job()
+    bqstorage_client = object()
+    timeout = 123.45
+
+    job.to_dataframe(bqstorage_client=bqstorage_client, timeout=timeout)
+
+    wait_for_query.assert_called_once_with(job, None, max_results=None)
+    row_iterator = wait_for_query.return_value
+    row_iterator.to_dataframe.assert_called_once()
+    call_args = row_iterator.to_dataframe.call_args
+    assert call_args.kwargs["timeout"] == timeout
+
+
+@pytest.mark.skipif(pyarrow is None, reason="Requires `pyarrow`")
+@mock.patch("google.cloud.bigquery.job.query.wait_for_query")
+def test_query_job_to_arrow_delegation(wait_for_query):
+    job = _make_job()
+    bqstorage_client = object()
+    timeout = 123.45
+
+    job.to_arrow(bqstorage_client=bqstorage_client, timeout=timeout)
+
+    wait_for_query.assert_called_once_with(job, None, max_results=None)
+    row_iterator = wait_for_query.return_value
+    row_iterator.to_arrow.assert_called_once()
+    call_args = row_iterator.to_arrow.call_args
+    assert call_args.kwargs["timeout"] == timeout
diff --git a/tests/unit/test__pandas_helpers.py b/tests/unit/test__pandas_helpers.py
index bc94f5f54..07660f8e1 100644
--- a/tests/unit/test__pandas_helpers.py
+++ b/tests/unit/test__pandas_helpers.py
@@ -13,12 +13,14 @@
 # limitations under the License.
 
 import collections
+import concurrent.futures
 import datetime
 import decimal
 import functools
 import gc
 import operator
 import queue
+import time
 from typing import Union
 from unittest import mock
 import warnings
@@ -2177,3 +2179,69 @@ def test_determine_requested_streams_invalid_max_stream_count():
     """Tests that a ValueError is raised if max_stream_count is negative."""
     with pytest.raises(ValueError):
         determine_requested_streams(preserve_order=False, max_stream_count=-1)
+
+
+def test__download_table_bqstorage_w_timeout_error(module_under_test):
+    from google.cloud.bigquery import dataset
+    from google.cloud.bigquery import table
+
+    bqstorage_client = mock.create_autospec(
+        bigquery_storage.BigQueryReadClient, instance=True
+    )
+    # Give it one stream
+    fake_session = mock.Mock(streams=["stream/s0"])
+    bqstorage_client.create_read_session.return_value = fake_session
+
+    table_ref = table.TableReference(
+        dataset.DatasetReference("project-x", "dataset-y"),
+        "table-z",
+    )
+
+    def slow_download_stream(
+        download_state, bqstorage_client, session, stream, worker_queue, page_to_item
+    ):
+        # Block until the main thread sets done=True (which it will on timeout)
+        while not download_state.done:
+            time.sleep(0.01)
+
+    with mock.patch.object(
+        module_under_test, "_download_table_bqstorage_stream", new=slow_download_stream
+    ):
+        # Use a very small timeout
+        result_gen = module_under_test._download_table_bqstorage(
+            "some-project", table_ref, bqstorage_client, timeout=0.01
+        )
+        with pytest.raises(concurrent.futures.TimeoutError, match="timed out"):
+            list(result_gen)
+
+
+def test__download_table_bqstorage_w_timeout_success(module_under_test):
+    from google.cloud.bigquery import dataset
+    from google.cloud.bigquery import table
+
+    bqstorage_client = mock.create_autospec(
+        bigquery_storage.BigQueryReadClient, instance=True
+    )
+    fake_session = mock.Mock(streams=["stream/s0"])
+    bqstorage_client.create_read_session.return_value = fake_session
+
+    table_ref = table.TableReference(
+        dataset.DatasetReference("project-x", "dataset-y"),
+        "table-z",
+    )
+
+    def fast_download_stream(
+        download_state, bqstorage_client, session, stream, worker_queue, page_to_item
+    ):
+        worker_queue.put("result_page")
+
+    with mock.patch.object(
+        module_under_test, "_download_table_bqstorage_stream", new=fast_download_stream
+    ):
+        # Use a generous timeout
+        result_gen = module_under_test._download_table_bqstorage(
+            "some-project", table_ref, bqstorage_client, timeout=10.0
+        )
+        results = list(result_gen)
+
+    assert results == ["result_page"]
diff --git a/tests/unit/test_table.py b/tests/unit/test_table.py
index d23826c50..f0330bbab 100644
--- a/tests/unit/test_table.py
+++ b/tests/unit/test_table.py
@@ -2495,6 +2495,18 @@ def test_to_geodataframe(self):
         else:
             assert not hasattr(df, "crs")
 
+    def test_methods_w_timeout(self):
+        # Ensure that timeout parameter is accepted by all methods and ignored.
+        row_iterator = self._make_one()
+        timeout = 42.0
+
+        # Just calling them to ensure no TypeError is raised
+        row_iterator.to_arrow(timeout=timeout)
+        row_iterator.to_arrow_iterable(timeout=timeout)
+        row_iterator.to_dataframe(timeout=timeout)
+        row_iterator.to_dataframe_iterable(timeout=timeout)
+        row_iterator.to_geodataframe(timeout=timeout)
+
 
 class TestRowIterator(unittest.TestCase):
     PYARROW_MINIMUM_VERSION = str(_versions_helpers._MIN_PYARROW_VERSION)

From a8e2d3f77e1ef76e3c4cc626c3a018122f952eb8 Mon Sep 17 00:00:00 2001
From: chalmer lowe
Date: Wed, 7 Jan 2026 07:00:03 -0500
Subject: [PATCH 5/6] fix: resolve EmptyRowIterator test failures and lint
 error

- Instantiate new _EmptyRowIterator for each test in test_methods_w_timeout
  to prevent ValueError
- Remove total_rows property from _EmptyRowIterator to fix AssertionError
- Remove redundant __iter__ method in _EmptyRowIterator to fix lint error
---
 google/cloud/bigquery/_pandas_helpers.py |  2 +-
 google/cloud/bigquery/table.py           |  4 +---
 tests/unit/test__pandas_helpers.py       | 23 +++++++++++++++--------
 tests/unit/test_table.py                 | 15 ++++++++------- 
 4 files changed, 25 insertions(+), 19 deletions(-)

diff --git a/google/cloud/bigquery/_pandas_helpers.py b/google/cloud/bigquery/_pandas_helpers.py
index 9e36729fd..5460f7ca7 100644
--- a/google/cloud/bigquery/_pandas_helpers.py
+++ b/google/cloud/bigquery/_pandas_helpers.py
@@ -981,7 +981,7 @@ def _download_table_bqstorage(
     worker_queue: queue.Queue[int] = queue.Queue(maxsize=max_queue_size)
 
     # Manually manage the pool to control shutdown behavior on timeout.
-    pool = concurrent.futures.ThreadPoolExecutor(max_workers=total_streams)
+    pool = concurrent.futures.ThreadPoolExecutor(max_workers=max(1, total_streams))
     wait_on_shutdown = True
     start_time = time.time()
 
diff --git a/google/cloud/bigquery/table.py b/google/cloud/bigquery/table.py
index f2fcd31ca..195461006 100644
--- a/google/cloud/bigquery/table.py
+++ b/google/cloud/bigquery/table.py
@@ -2941,9 +2941,6 @@ class _EmptyRowIterator(RowIterator):
     statements.
""" - pages = () - total_rows = 0 - def __init__( self, client=None, api_request=None, path=None, schema=(), *args, **kwargs ): @@ -2955,6 +2952,7 @@ def __init__( *args, **kwargs, ) + self._total_rows = 0 def to_arrow( self, diff --git a/tests/unit/test__pandas_helpers.py b/tests/unit/test__pandas_helpers.py index 07660f8e1..a1cbb726b 100644 --- a/tests/unit/test__pandas_helpers.py +++ b/tests/unit/test__pandas_helpers.py @@ -2181,16 +2181,19 @@ def test_determine_requested_streams_invalid_max_stream_count(): determine_requested_streams(preserve_order=False, max_stream_count=-1) +@pytest.mark.skipif( + bigquery_storage is None, reason="Requires google-cloud-bigquery-storage" +) def test__download_table_bqstorage_w_timeout_error(module_under_test): from google.cloud.bigquery import dataset from google.cloud.bigquery import table + from unittest import mock - bqstorage_client = mock.create_autospec( + mock_bqstorage_client = mock.create_autospec( bigquery_storage.BigQueryReadClient, instance=True ) - # Give it one stream - fake_session = mock.Mock(streams=["stream/s0"]) - bqstorage_client.create_read_session.return_value = fake_session + fake_session = mock.Mock(streams=[mock.Mock()]) + mock_bqstorage_client.create_read_session.return_value = fake_session table_ref = table.TableReference( dataset.DatasetReference("project-x", "dataset-y"), @@ -2209,21 +2212,25 @@ def slow_download_stream( ): # Use a very small timeout result_gen = module_under_test._download_table_bqstorage( - "some-project", table_ref, bqstorage_client, timeout=0.01 + "some-project", table_ref, mock_bqstorage_client, timeout=0.01 ) with pytest.raises(concurrent.futures.TimeoutError, match="timed out"): list(result_gen) +@pytest.mark.skipif( + bigquery_storage is None, reason="Requires google-cloud-bigquery-storage" +) def test__download_table_bqstorage_w_timeout_success(module_under_test): from google.cloud.bigquery import dataset from google.cloud.bigquery import table + from unittest import mock - bqstorage_client = mock.create_autospec( + mock_bqstorage_client = mock.create_autospec( bigquery_storage.BigQueryReadClient, instance=True ) fake_session = mock.Mock(streams=["stream/s0"]) - bqstorage_client.create_read_session.return_value = fake_session + mock_bqstorage_client.create_read_session.return_value = fake_session table_ref = table.TableReference( dataset.DatasetReference("project-x", "dataset-y"), @@ -2240,7 +2247,7 @@ def fast_download_stream( ): # Use a generous timeout result_gen = module_under_test._download_table_bqstorage( - "some-project", table_ref, bqstorage_client, timeout=10.0 + "some-project", table_ref, mock_bqstorage_client, timeout=10.0 ) results = list(result_gen) diff --git a/tests/unit/test_table.py b/tests/unit/test_table.py index f0330bbab..7acd0fcd3 100644 --- a/tests/unit/test_table.py +++ b/tests/unit/test_table.py @@ -2496,16 +2496,17 @@ def test_to_geodataframe(self): assert not hasattr(df, "crs") def test_methods_w_timeout(self): + pytest.importorskip("pyarrow") + pytest.importorskip("geopandas") # Ensure that timeout parameter is accepted by all methods and ignored. 
- row_iterator = self._make_one() timeout = 42.0 - # Just calling them to ensure no TypeError is raised - row_iterator.to_arrow(timeout=timeout) - row_iterator.to_arrow_iterable(timeout=timeout) - row_iterator.to_dataframe(timeout=timeout) - row_iterator.to_dataframe_iterable(timeout=timeout) - row_iterator.to_geodataframe(timeout=timeout) + # Call each type to ensure no TypeError is raised + self._make_one().to_arrow(timeout=timeout) + self._make_one().to_arrow_iterable(timeout=timeout) + self._make_one().to_dataframe(timeout=timeout) + self._make_one().to_dataframe_iterable(timeout=timeout) + self._make_one().to_geodataframe(timeout=timeout) class TestRowIterator(unittest.TestCase): From 43e6a403bcef6f4078442ad5f545350a1d7aeba2 Mon Sep 17 00:00:00 2001 From: Chalmer Lowe Date: Wed, 7 Jan 2026 07:32:11 -0500 Subject: [PATCH 6/6] Update tests/unit/test_table.py --- tests/unit/test_table.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/unit/test_table.py b/tests/unit/test_table.py index 7acd0fcd3..97a1b4916 100644 --- a/tests/unit/test_table.py +++ b/tests/unit/test_table.py @@ -2498,7 +2498,8 @@ def test_to_geodataframe(self): def test_methods_w_timeout(self): pytest.importorskip("pyarrow") pytest.importorskip("geopandas") - # Ensure that timeout parameter is accepted by all methods and ignored. + # Ensure that the timeout parameter is accepted by all methods without raising a TypeError, + # even though the _EmptyRowIterator implementations do not use the timeout value. timeout = 42.0 # Call each type to ensure no TypeError is raised