Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions doc/api/pymongo/asynchronous/command_cursor.rst
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@
.. automodule:: pymongo.asynchronous.command_cursor
:synopsis: Tools for iterating over MongoDB command results
:members:
:inherited-members:
2 changes: 2 additions & 0 deletions doc/api/pymongo/asynchronous/cursor.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

.. autoclass:: pymongo.asynchronous.cursor.AsyncCursor(collection, filter=None, projection=None, skip=0, limit=0, no_cursor_timeout=False, cursor_type=CursorType.NON_TAILABLE, sort=None, allow_partial_results=False, oplog_replay=False, batch_size=0, collation=None, hint=None, max_scan=None, max_time_ms=None, max=None, min=None, return_key=False, show_record_id=False, snapshot=False, comment=None, session=None, allow_disk_use=None)
:members:
:inherited-members:


.. describe:: c[index]

Expand Down
1 change: 1 addition & 0 deletions doc/api/pymongo/command_cursor.rst
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@
.. automodule:: pymongo.command_cursor
:synopsis: Tools for iterating over MongoDB command results
:members:
:inherited-members:
1 change: 1 addition & 0 deletions doc/api/pymongo/cursor.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

.. autoclass:: pymongo.cursor.Cursor(collection, filter=None, projection=None, skip=0, limit=0, no_cursor_timeout=False, cursor_type=CursorType.NON_TAILABLE, sort=None, allow_partial_results=False, oplog_replay=False, batch_size=0, collation=None, hint=None, max_scan=None, max_time_ms=None, max=None, min=None, return_key=False, show_record_id=False, snapshot=False, comment=None, session=None, allow_disk_use=None)
:members:
:inherited-members:

.. describe:: c[index]

Expand Down
2 changes: 1 addition & 1 deletion pymongo/asynchronous/client_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@
from bson.int64 import Int64
from bson.timestamp import Timestamp
from pymongo import _csot
from pymongo.asynchronous.cursor import _ConnectionManager
from pymongo.asynchronous.cursor_base import _ConnectionManager
from pymongo.errors import (
ConfigurationError,
ConnectionFailure,
Expand Down
138 changes: 8 additions & 130 deletions pymongo/asynchronous/command_cursor.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
TYPE_CHECKING,
Any,
AsyncIterator,
Generic,
Mapping,
NoReturn,
Optional,
Expand All @@ -29,17 +28,10 @@
)

from bson import CodecOptions, _convert_raw_document_lists_to_streams
from pymongo import _csot
from pymongo.asynchronous.cursor import _ConnectionManager
from pymongo.asynchronous.cursor_base import _AsyncCursorBase, _ConnectionManager
from pymongo.cursor_shared import _CURSOR_CLOSED_ERRORS
from pymongo.errors import ConnectionFailure, InvalidOperation, OperationFailure
from pymongo.message import (
_CursorAddress,
_GetMore,
_OpMsg,
_OpReply,
_RawBatchGetMore,
)
from pymongo.message import _GetMore, _OpMsg, _OpReply, _RawBatchGetMore
from pymongo.response import PinnedResponse
from pymongo.typings import _Address, _DocumentOut, _DocumentType

Expand All @@ -51,7 +43,7 @@
_IS_SYNC = False


class AsyncCommandCursor(Generic[_DocumentType]):
class AsyncCommandCursor(_AsyncCursorBase[_DocumentType]):
"""An asynchronous cursor / iterator over command cursors."""

_getmore_class = _GetMore
Expand Down Expand Up @@ -98,8 +90,8 @@ def __init__(
f"max_await_time_ms must be an integer or None, not {type(max_await_time_ms)}"
)

def __del__(self) -> None:
self._die_no_lock()
def _get_namespace(self) -> str:
return self._ns

def batch_size(self, batch_size: int) -> AsyncCommandCursor[_DocumentType]:
"""Limits the number of documents returned in one batch. Each batch
Expand Down Expand Up @@ -161,94 +153,12 @@ def _unpack_response(
) -> Sequence[_DocumentOut]:
return response.unpack_response(cursor_id, codec_options, user_fields, legacy_response)

@property
def alive(self) -> bool:
"""Does this cursor have the potential to return more data?

Even if :attr:`alive` is ``True``, :meth:`next` can raise
:exc:`StopIteration`. Best to use a for loop::

async for doc in collection.aggregate(pipeline):
print(doc)

.. note:: :attr:`alive` can be True while iterating a cursor from
a failed server. In this case :attr:`alive` will return False after
:meth:`next` fails to retrieve the next batch of results from the
server.
"""
return bool(len(self._data) or (not self._killed))

@property
def cursor_id(self) -> int:
"""Returns the id of the cursor."""
return self._id

@property
def address(self) -> Optional[_Address]:
"""The (host, port) of the server used, or None.

.. versionadded:: 3.0
"""
return self._address

@property
def session(self) -> Optional[AsyncClientSession]:
"""The cursor's :class:`~pymongo.asynchronous.client_session.AsyncClientSession`, or None.

.. versionadded:: 3.6
"""
if self._session and not self._session._implicit:
return self._session
return None

def _prepare_to_die(self) -> tuple[int, Optional[_CursorAddress]]:
already_killed = self._killed
self._killed = True
if self._id and not already_killed:
cursor_id = self._id
assert self._address is not None
address = _CursorAddress(self._address, self._ns)
else:
# Skip killCursors.
cursor_id = 0
address = None
return cursor_id, address

def _die_no_lock(self) -> None:
"""Closes this cursor without acquiring a lock."""
cursor_id, address = self._prepare_to_die()
self._collection.database.client._cleanup_cursor_no_lock(
cursor_id, address, self._sock_mgr, self._session
)
if self._session and self._session._implicit:
self._session._attached_to_cursor = False
self._session = None
self._sock_mgr = None

async def _die_lock(self) -> None:
"""Closes this cursor."""
cursor_id, address = self._prepare_to_die()
await self._collection.database.client._cleanup_cursor_lock(
cursor_id,
address,
self._sock_mgr,
self._session,
)
if self._session and self._session._implicit:
self._session._attached_to_cursor = False
self._session = None
self._sock_mgr = None

def _end_session(self) -> None:
if self._session and self._session._implicit:
self._session._attached_to_cursor = False
self._session._end_implicit_session()
self._session = None

async def close(self) -> None:
"""Explicitly close / kill this cursor."""
await self._die_lock()

async def _send_message(self, operation: _GetMore) -> None:
"""Send a getmore message and handle the response."""
client = self._collection.database.client
Expand Down Expand Up @@ -330,6 +240,9 @@ async def _refresh(self) -> int:
def __aiter__(self) -> AsyncIterator[_DocumentType]:
return self

async def __aenter__(self) -> AsyncCommandCursor[_DocumentType]:
return self

async def next(self) -> _DocumentType:
"""Advance the cursor."""
# Block until a document is returnable.
Expand Down Expand Up @@ -385,41 +298,6 @@ async def try_next(self) -> Optional[_DocumentType]:
"""
return await self._try_next(get_more_allowed=True)

async def __aenter__(self) -> AsyncCommandCursor[_DocumentType]:
return self

async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
await self.close()

@_csot.apply
async def to_list(self, length: Optional[int] = None) -> list[_DocumentType]:
"""Converts the contents of this cursor to a list more efficiently than ``[doc async for doc in cursor]``.

To use::

>>> await cursor.to_list()

Or, so read at most n items from the cursor::

>>> await cursor.to_list(n)

If the cursor is empty or has no more results, an empty list will be returned.

.. versionadded:: 4.9
"""
res: list[_DocumentType] = []
remaining = length
if isinstance(length, int) and length < 1:
raise ValueError("to_list() length must be greater than 0")
while self.alive:
if not await self._next_batch(res, remaining):
break
if length is not None:
remaining = length - len(res)
if remaining == 0:
break
return res


class AsyncRawBatchCommandCursor(AsyncCommandCursor[_DocumentType]):
_getmore_class = _RawBatchGetMore
Expand Down
Loading
Loading