Skip to content
Open
101 changes: 39 additions & 62 deletions cassandra/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -4241,7 +4241,7 @@
self._scheduled_tasks.discard(task)
fn, args, kwargs = task
kwargs = dict(kwargs)
future = self._executor.submit(fn, *args, **kwargs)

Check failure on line 4244 in cassandra/cluster.py

View workflow job for this annotation

GitHub Actions / test libev (3.11)

cannot schedule new futures after shutdown
future.add_done_callback(self._log_if_failed)
else:
self._queue.put_nowait((run_at, i, task))
Expand Down Expand Up @@ -4283,89 +4283,66 @@
:meth:`.add_callbacks()`.
"""

query = None
"""
The :class:`~.Statement` instance that is being executed through this
:class:`.ResponseFuture`.
"""

is_schema_agreed = True
"""
For DDL requests, this may be set ``False`` if the schema agreement poll after the response fails.

Always ``True`` for non-DDL requests.
"""

request_encoded_size = None
"""
Size of the request message sent
"""

coordinator_host = None
"""
The host from which we received a response
"""

attempted_hosts = None
"""
A list of hosts tried, including all speculative executions, retries, and pages
"""

session = None
row_factory = None
message = None
default_timeout = None

_retry_policy = None
_profile_manager = None

_req_id = None
_final_result = _NOT_SET
_col_names = None
_col_types = None
_final_exception = None
_query_traces = None
_callbacks = None
_errbacks = None
_current_host = None
_connection = None
_query_retries = 0
_start_time = None
_metrics = None
_paging_state = None
_custom_payload = None
_warnings = None
_timer = None
_protocol_handler = ProtocolHandler
_spec_execution_plan = NoSpeculativeExecutionPlan()
_continuous_paging_session = None
_host = None

_warned_timeout = False
__slots__ = (
# Public attributes
'query', 'is_schema_agreed', 'request_encoded_size', 'coordinator_host',
'attempted_hosts', 'session', 'row_factory', 'message', 'timeout',
'prepared_statement', 'query_plan',
# Private attributes
'_retry_policy', '_req_id', '_final_result', '_col_names', '_col_types',
'_final_exception', '_query_traces', '_callbacks', '_errbacks',
'_current_host', '_connection', '_query_retries', '_start_time',
'_metrics', '_paging_state', '_custom_payload', '_warnings', '_timer',
'_protocol_handler', '_spec_execution_plan', '_continuous_paging_session',
'_host', '_load_balancer', '_callback_lock', '_event', '_errors',
'_continuous_paging_state'
)

def __init__(self, session, message, query, timeout, metrics=None, prepared_statement=None,
retry_policy=RetryPolicy(), row_factory=None, load_balancer=None, start_time=None,
speculative_execution_plan=None, continuous_paging_state=None, host=None):
# Initialize attributes with default values
self.query = query
self.is_schema_agreed = True
self.request_encoded_size = None
self.coordinator_host = None
self.attempted_hosts = []
self.session = session
# TODO: normalize handling of retry policy and row factory
self.row_factory = row_factory or session.row_factory
self._load_balancer = load_balancer or session.cluster._default_load_balancing_policy
self.message = message
self.query = query
self.timeout = timeout
self._retry_policy = retry_policy
self._metrics = metrics
self.prepared_statement = prepared_statement
self._callback_lock = Lock()
self._start_time = start_time or time.time()
self._host = host
self._spec_execution_plan = speculative_execution_plan or self._spec_execution_plan
self._spec_execution_plan = speculative_execution_plan or NoSpeculativeExecutionPlan()
self._protocol_handler = ProtocolHandler

# Initialize other private attributes
self._req_id = None
self._final_result = _NOT_SET
self._col_names = None
self._col_types = None
self._final_exception = None
self._query_traces = None
self._current_host = None
self._connection = None
self._query_retries = 0
self._paging_state = None
self._custom_payload = None
self._warnings = None
self._timer = None
self._continuous_paging_session = None

self._make_query_plan()
self._event = Event()
self._errors = {}
self._callbacks = []
self._errbacks = []
self.attempted_hosts = []
self._start_timer()
self._continuous_paging_state = continuous_paging_state

Expand Down
2 changes: 2 additions & 0 deletions cassandra/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,8 @@ def __repr__(self):


class _Frame(object):
__slots__ = ('version', 'flags', 'stream', 'opcode', 'body_offset', 'end_pos')

def __init__(self, version, flags, stream, opcode, body_offset, end_pos):
self.version = version
self.flags = flags
Expand Down
93 changes: 37 additions & 56 deletions cassandra/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -1647,38 +1647,26 @@ def escape_name(name):
class ColumnMetadata(object):
"""
A representation of a single column in a table.
"""

table = None
""" The :class:`.TableMetadata` this column belongs to. """

name = None
""" The string name of this column. """

cql_type = None
"""
The CQL type for the column.
"""

is_static = False
"""
If this column is static (available in Cassandra 2.1+), this will
be :const:`True`, otherwise :const:`False`.
"""

is_reversed = False
"""
If this column is reversed (DESC) as in clustering order
Attributes:
table: The :class:`.TableMetadata` this column belongs to.
name: The string name of this column.
cql_type: The CQL type for the column.
is_static: If this column is static (available in Cassandra 2.1+), this
will be :const:`True`, otherwise :const:`False`.
is_reversed: If this column is reversed (DESC) as in clustering order.
_cass_type: Internal cache for the cassandra type.
"""

_cass_type = None
__slots__ = ('table', 'name', 'cql_type', 'is_static', 'is_reversed', '_cass_type')

def __init__(self, table_metadata, column_name, cql_type, is_static=False, is_reversed=False):
self.table = table_metadata
self.name = column_name
self.cql_type = cql_type
self.is_static = is_static
self.is_reversed = is_reversed
self._cass_type = None

def __str__(self):
return "%s %s" % (self.name, self.cql_type)
Expand All @@ -1687,21 +1675,16 @@ def __str__(self):
class IndexMetadata(object):
"""
A representation of a secondary index on a column.
"""
keyspace_name = None
""" A string name of the keyspace. """

table_name = None
""" A string name of the table this index is on. """

name = None
""" A string name for the index. """

kind = None
""" A string representing the kind of index (COMPOSITE, CUSTOM,...). """
Attributes:
keyspace_name: A string name of the keyspace.
table_name: A string name of the table this index is on.
name: A string name for the index.
kind: A string representing the kind of index (COMPOSITE, CUSTOM, ...).
index_options: A dict of index options.
"""

index_options = {}
""" A dict of index options. """
__slots__ = ('keyspace_name', 'table_name', 'name', 'kind', 'index_options')

def __init__(self, keyspace_name, table_name, index_name, kind, index_options):
self.keyspace_name = keyspace_name
Expand Down Expand Up @@ -1746,30 +1729,18 @@ def export_as_string(self):
class TokenMap(object):
"""
Information about the layout of the ring.
"""

token_class = None
"""
A subclass of :class:`.Token`, depending on what partitioner the cluster uses.
"""

token_to_host_owner = None
"""
A map of :class:`.Token` objects to the :class:`.Host` that owns that token.
"""

tokens_to_hosts_by_ks = None
"""
A map of keyspace names to a nested map of :class:`.Token` objects to
sets of :class:`.Host` objects.
Attributes:
token_class: A subclass of :class:`.Token`, depending on what partitioner the cluster uses.
token_to_host_owner: A map of :class:`.Token` objects to the :class:`.Host` that owns that token.
tokens_to_hosts_by_ks: A map of keyspace names to a nested map of :class:`.Token` objects to sets of :class:`.Host` objects.
ring: An ordered list of :class:`.Token` instances in the ring.
_metadata: Metadata reference for internal use.
_rebuild_lock: Lock for thread-safe operations.
"""

ring = None
"""
An ordered list of :class:`.Token` instances in the ring.
"""

_metadata = None
__slots__ = ('token_class', 'token_to_host_owner', 'tokens_to_hosts_by_ks',
'ring', '_metadata', '_rebuild_lock')

def __init__(self, token_class, token_to_host_owner, all_tokens, metadata):
self.token_class = token_class
Expand Down Expand Up @@ -1832,6 +1803,8 @@ class Token(object):
Abstract class representing a token.
"""

__slots__ = ('value',)

def __init__(self, token):
self.value = token

Expand Down Expand Up @@ -1871,6 +1844,8 @@ class NoMurmur3(Exception):

class HashToken(Token):

__slots__ = ()

@classmethod
def from_string(cls, token_string):
""" `token_string` should be the string representation from the server. """
Expand All @@ -1883,6 +1858,8 @@ class Murmur3Token(HashToken):
A token for ``Murmur3Partitioner``.
"""

__slots__ = ()

@classmethod
def hash_fn(cls, key):
if murmur3 is not None:
Expand All @@ -1901,6 +1878,8 @@ class MD5Token(HashToken):
A token for ``RandomPartitioner``.
"""

__slots__ = ()

@classmethod
def hash_fn(cls, key):
if isinstance(key, str):
Expand All @@ -1913,6 +1892,8 @@ class BytesToken(Token):
A token for ``ByteOrderedPartitioner``.
"""

__slots__ = ()

@classmethod
def from_string(cls, token_string):
""" `token_string` should be the string representation from the server. """
Expand Down
Loading
Loading