diff --git a/HOW_IT_WORKS.md b/HOW_IT_WORKS.md new file mode 100644 index 0000000..61d7336 --- /dev/null +++ b/HOW_IT_WORKS.md @@ -0,0 +1,530 @@ +# PyMongoSQL Architecture & Class Diagram + +## Overview + +PyMongoSQL is a Python library that provides a SQL-to-MongoDB translation layer with DB API 2.0 compliance and SQLAlchemy integration. The architecture consists of three main layers: + +1. **Database API Layer** - DB API 2.0 compliant interface (Connection, Cursor, ResultSet) +2. **SQL Processing Layer** - ANTLR4-based PartiQL parser with MongoDB execution builders +3. **Integration Layer** - SQLAlchemy dialect and Superset support + +--- + +## Class Hierarchy Diagram + +```mermaid +classDiagram + %% ==================== DATABASE API LAYER ==================== + + class Connection { + -host: str + -port: int + -mongo_client: MongoClient + -mode: str + +cursor() Cursor + +close() + +commit() + +rollback() + +get_database() Database + } + + class BaseCursor { + <> + -connection: Connection + -mode: str + +execute(operation, parameters) ResultSet + +fetchone() Sequence + +fetchall() List[Sequence] + +description: List + } + + class Cursor { + -result_set: ResultSet + -current_execution_plan: ExecutionPlan + -is_closed: bool + +execute(operation, parameters)* ResultSet + +executemany(operation, seq_of_params) + +fetchone() Sequence + +fetchall() List[Sequence] + +fetchmany(size) List[Sequence] + +close() + +__iter__() + +__next__() Sequence + } + + class DictCursor { + -result_set_class: DictResultSet + +fetchone() Dict + +fetchall() List[Dict] + } + + class CursorIterator { + -arraysize: int + +__iter__() + +__next__() + } + + class ResultSet { + -command_result: Dict + -execution_plan: QueryExecutionPlan + -raw_results: List + -cached_results: List + -column_names: List[str] + -description: List + +description + +fetch(rows) Sequence + +fetchone() Sequence + +fetchall() List[Sequence] + +fetchmany(size) List[Sequence] + +close() + +__iter__() + +__next__() + } + + 
class DictResultSet { + -column_names: List[str] + +fetch(rows) List[Dict] + +fetchone() Dict + +fetchall() List[Dict] + } + + %% ==================== SQL PARSING LAYER ==================== + + class SQLParser { + -original_sql: str + -preprocessed_sql: str + -ast: Any + -visitor: MongoSQLParserVisitor + +parse() ExecutionPlan + +get_prepared_statement() str + } + + class ExecutionPlanFactory { + +create_from_parsed() ExecutionPlan + } + + class ExecutionPlan { + <> + -collection: str + +to_dict() Dict + +validate() bool + +copy() ExecutionPlan + } + + class QueryExecutionPlan { + -filter_stage: Dict + -projection_stage: Dict + -sort_stage: List + -limit_stage: int + -skip_stage: int + +to_dict() Dict + +validate() bool + } + + class InsertExecutionPlan { + -insert_documents: List[Dict] + -parameter_style: str + -parameter_count: int + +to_dict() Dict + +validate() bool + } + + class UpdateExecutionPlan { + -filter_stage: Dict + -update_stage: Dict + +to_dict() Dict + +validate() bool + } + + class DeleteExecutionPlan { + -filter_stage: Dict + +to_dict() Dict + +validate() bool + } + + %% ==================== SQL HANDLER LAYER ==================== + + class MongoSQLParserVisitor { + <> + +visitQuery() QueryExecutionPlan + +visitInsert() InsertExecutionPlan + +visitUpdate() UpdateExecutionPlan + +visitDelete() DeleteExecutionPlan + +visitWhere() Dict + +visitExpression() Any + } + + class QueryHandler { + <> + +create_execution_plan() QueryExecutionPlan + +build_filter() Dict + +build_projection() Dict + } + + class InsertHandler { + <> + +create_execution_plan() InsertExecutionPlan + +build_documents() List[Dict] + +detect_parameter_style() str + } + + class UpdateHandler { + <> + +create_execution_plan() UpdateExecutionPlan + +build_update_doc() Dict + } + + class DeleteHandler { + <> + +create_execution_plan() DeleteExecutionPlan + +build_filter() Dict + } + + class ContextUtilsMixin { + <> + +get_context_text() str + +get_context_type_name() str + 
+has_children() bool + +normalize_field_path() str + } + + %% ==================== EXECUTION LAYER ==================== + + class ExecutionContext { + -query: str + -execution_mode: str + -parameters: Union[Sequence, Dict] + } + + class ExecutionStrategy { + <> + +execute_plan: ExecutionPlan + +execute() Dict + } + + class QueryExecutionStrategy { + +execute() Dict + } + + class InsertExecutionStrategy { + +execute() Dict + } + + class UpdateExecutionStrategy { + +execute() Dict + } + + class DeleteExecutionStrategy { + +execute() Dict + } + + class ExecutionPlanFactory { + +create_from_operation() ExecutionPlan + +create_query_plan() QueryExecutionPlan + +create_insert_plan() InsertExecutionPlan + } + + %% ==================== SQLALCHEMY INTEGRATION ==================== + + class PyMongoSQLDialect { + -execution_ctx_cls: ExecutionContext + +initialize() Connection + +create_connect_args() Dict + +compiler_class: TypeCompiler + +type_compiler_class: TypeCompiler + } + + class PyMongoSQLIdentifierPreparer { + <> + -reserved_words: Set[str] + +quote_identifier() str + } + + class PyMongoSQLTypeCompiler { + <> + +process_result_value() Any + } + + class PyMongoSQLStatementCompiler { + <> + +visit_select() str + +visit_insert() str + +visit_update() str + +visit_delete() str + } + + %% ==================== ERROR HANDLING ==================== + + class Error { + <> + } + + class DatabaseError { + <> + } + + class OperationalError { + <> + } + + class ProgrammingError { + <> + } + + class SqlSyntaxError { + <> + } + + class NotSupportedError { + <> + } + + %% ==================== HELPER CLASSES ==================== + + class ConnectionHelper { + +parse_connection_string() Dict + +extract_mode() str + +get_database_config() Dict + } + + class SQLHelper { + +prepare_statement() str + +detect_parameter_style() str + +bind_parameters() str + } + + %% ==================== RELATIONSHIPS ==================== + + %% Database API Layer + Connection --> BaseCursor: creates + 
Cursor --|> BaseCursor + DictCursor --|> BaseCursor + CursorIterator <|-- Cursor + CursorIterator <|-- ResultSet + Cursor --> ResultSet: manages + Cursor --> ExecutionContext: creates + Cursor --> ExecutionPlanFactory: uses + ResultSet --> QueryExecutionPlan: uses + + %% SQL Parsing Layer + SQLParser --> ExecutionPlanFactory: uses + SQLParser --> MongoSQLParserVisitor: uses + ExecutionPlan <|-- QueryExecutionPlan + ExecutionPlan <|-- InsertExecutionPlan + ExecutionPlan <|-- UpdateExecutionPlan + ExecutionPlan <|-- DeleteExecutionPlan + + %% SQL Handler Layer + MongoSQLParserVisitor --> QueryHandler: delegates to + MongoSQLParserVisitor --> InsertHandler: delegates to + MongoSQLParserVisitor --> UpdateHandler: delegates to + MongoSQLParserVisitor --> DeleteHandler: delegates to + ContextUtilsMixin <|-- MongoSQLParserVisitor + ContextUtilsMixin <|-- QueryHandler + + %% Execution Layer + ExecutionStrategy <|-- QueryExecutionStrategy + ExecutionStrategy <|-- InsertExecutionStrategy + ExecutionStrategy <|-- UpdateExecutionStrategy + ExecutionStrategy <|-- DeleteExecutionStrategy + ExecutionContext --> ExecutionStrategy: drives + ExecutionPlanFactory --> ExecutionPlan: produces + + %% SQLAlchemy Integration + PyMongoSQLDialect --> Connection: creates + PyMongoSQLDialect --> PyMongoSQLIdentifierPreparer: uses + PyMongoSQLDialect --> PyMongoSQLTypeCompiler: uses + PyMongoSQLDialect --> PyMongoSQLStatementCompiler: uses + PyMongoSQLStatementCompiler --> SQLParser: uses + + %% Error Handling + Error <|-- DatabaseError + Error <|-- OperationalError + Error <|-- ProgrammingError + Error <|-- SqlSyntaxError + Error <|-- NotSupportedError + + %% Helpers + Connection --> ConnectionHelper: uses + Cursor --> SQLHelper: uses +``` + +--- + +## Layer Details + +### 1. Database API Layer (DB API 2.0 Compliant) + +**Purpose**: Provides a standard Python database interface similar to PyMySQL, psycopg2, etc. 
+ +| Class | Responsibility | Key Methods | +|-------|----------------|------------| +| `Connection` | Manages MongoDB connection; creates cursors | `cursor()`, `close()`, `commit()`, `rollback()` | +| `BaseCursor` | Abstract base for cursor implementations | `execute()`, `fetchone()`, `fetchall()` | +| `Cursor` | Standard cursor with sequence results | `fetchone()`, `fetchall()`, `executemany()` | +| `DictCursor` | Cursor returning dictionaries instead of tuples | `fetchone()` → `Dict` | +| `ResultSet` | Manages query result caching and fetching | `fetch()`, `fetchone()`, `fetchall()` | +| `DictResultSet` | ResultSet variant for dictionary-based results | Inherits from `ResultSet` | + +### 2. SQL Processing Layer (ANTLR4 PartiQL Parser) + +**Purpose**: Parses SQL/PartiQL statements and builds execution plans for MongoDB. + +| Class | Responsibility | Key Attributes | +|-------|----------------|-----------------| +| `SQLParser` | Main parser entry point; orchestrates ANTLR parsing | `_ast`, `_visitor`, `_preprocessed_sql` | +| `ExecutionPlanFactory` | Factory pattern for creating execution plans | Creates typed plans from parsed AST | +| `ExecutionPlan` | Abstract base for all execution plans | `collection`, `validate()`, `to_dict()` | +| `QueryExecutionPlan` | Execution plan for SELECT queries | `filter_stage`, `projection_stage`, `sort_stage` | +| `InsertExecutionPlan` | Execution plan for INSERT operations | `insert_documents`, `parameter_style` | +| `UpdateExecutionPlan` | Execution plan for UPDATE operations | `filter_stage`, `update_stage` | +| `DeleteExecutionPlan` | Execution plan for DELETE operations | `filter_stage` | + +### 3. SQL Handler Layer (Visitor Pattern) + +**Purpose**: Translates parsed AST nodes into MongoDB operations using visitor pattern. 
+ +| Class | Responsibility | Key Methods | +|-------|----------------|------------| +| `MongoSQLParserVisitor` | Main visitor; delegates to specific handlers | `visitQuery()`, `visitInsert()`, `visitUpdate()`, `visitDelete()` | +| `QueryHandler` | Handles SELECT statement parsing | `build_filter()`, `build_projection()` | +| `InsertHandler` | Handles INSERT statement parsing | `build_documents()`, `detect_parameter_style()` | +| `UpdateHandler` | Handles UPDATE statement parsing | `build_update_doc()` | +| `DeleteHandler` | Handles DELETE statement parsing | `build_filter()` | +| `ContextUtilsMixin` | Shared utility methods for AST traversal | `get_context_text()`, `normalize_field_path()` | + +### 4. Execution Layer + +**Purpose**: Executes parsed plans against MongoDB. + +| Class | Responsibility | Key Methods | +|-------|----------------|------------| +| `ExecutionContext` | Holds query execution metadata | `query`, `execution_mode`, `parameters` | +| `ExecutionStrategy` | Abstract strategy for query execution | `execute()` | +| `QueryExecutionStrategy` | Executes SELECT queries | Returns result set | +| `InsertExecutionStrategy` | Executes INSERT operations | Returns insert result | +| `UpdateExecutionStrategy` | Executes UPDATE operations | Returns update result | +| `DeleteExecutionStrategy` | Executes DELETE operations | Returns delete result | + +### 5. SQLAlchemy Integration Layer + +**Purpose**: Provides SQLAlchemy dialect for ORM and SQLAlchemy Core support. + +| Class | Responsibility | Key Role | +|-------|----------------|----------| +| `PyMongoSQLDialect` | Main dialect implementation | Inherits from `sqlalchemy.engine.default.DefaultDialect` | +| `PyMongoSQLIdentifierPreparer` | Handles MongoDB identifier quoting | Reserved word checking | +| `PyMongoSQLTypeCompiler` | Type mapping SQL↔MongoDB | Converts between SQL and BSON types | +| `PyMongoSQLStatementCompiler` | Compiles SQLAlchemy AST to SQL | `visit_select()`, `visit_insert()`, etc. 
| + +### 6. Error Handling + +**Purpose**: Provides DB API 2.0 compliant exception hierarchy. + +``` +Error (base) +├── DatabaseError +├── OperationalError +├── ProgrammingError +├── SqlSyntaxError +└── NotSupportedError +``` + +--- + +## Data Flow Diagrams + +### Query Execution Flow + +``` +User Code + ↓ +Cursor.execute(sql_string, parameters) + ↓ +ExecutionContext (holds query + params) + ↓ +SQLParser.parse() + ├─→ ANTLR lexer/parser + ├─→ MongoSQLParserVisitor + └─→ Handlers (Query/Insert/Update/Delete) + ↓ +ExecutionPlan (QueryExecutionPlan, InsertExecutionPlan, etc.) + ↓ +ExecutionStrategy (QueryExecutionStrategy, InsertExecutionStrategy, etc.) + ↓ +MongoDB Collection Operations + ├─→ collection.find() [SELECT] + ├─→ collection.insert_many() [INSERT] + ├─→ collection.update_many() [UPDATE] + └─→ collection.delete_many() [DELETE] + ↓ +MongoDB Command Result + ↓ +ResultSet (caches and iterates results) + ↓ +Cursor.fetchone()/fetchall()/fetchmany() + ↓ +User receives Sequence or Dict +``` + +### SQLAlchemy Integration Flow + +``` +SQLAlchemy Core/ORM Code + ↓ +PyMongoSQLDialect + ├─→ PyMongoSQLStatementCompiler.visit_select() + ├─→ PyMongoSQLStatementCompiler.visit_insert() + ├─→ PyMongoSQLStatementCompiler.visit_update() + └─→ PyMongoSQLStatementCompiler.visit_delete() + ↓ +Generated SQL String + ↓ +Cursor.execute(sql_string) + ↓ +SQLParser + Handlers + ExecutionStrategies + ↓ +MongoDB Results + ↓ +ResultSet + ↓ +SQLAlchemy ORM/Core Result Proxy + ↓ +User receives ORM objects or rows +``` + +--- + +## Superset Integration + +PyMongoSQL provides specialized support for Apache Superset with a **two-stage execution strategy**: + +### Key Points + +- **Connection Mode**: Use `mode='superset'` for complex SQL support + ```python + conn = Connection(uri="mongodb://localhost:27017/mydb?mode=superset") + ``` + +- **Two-Stage Execution**: + 1. **Stage 1**: Execute subquery on MongoDB (translate SQL to aggregation pipeline) + 2. 
**Stage 2**: Cache results in temporary SQLite3 table, execute complex SQL (GROUP BY, HAVING, window functions) + +- **Query Detection** (`SubqueryDetector`): + - Detects wrapped subqueries: `SELECT ... FROM (SELECT ... FROM collection) AS alias` + - Falls back to direct MongoDB execution for simple queries + +- **Core Classes**: + | Class | Purpose | + |-------|---------| + | `SupersetExecution` | Two-stage execution strategy | + | `SubqueryDetector` | Analyzes query structure | + | `QueryDBSQLite` | In-memory SQLite3 backend for intermediate results | + +- **Benefits**: Native MongoDB support in Superset dashboards with full SQL capability +- **Limitations**: Single collection queries, session-scoped caching, performance depends on result set size + +--- + +Generated: 2024 +Architecture Version: 2.0 (Post-INSERT VALUES implementation with Superset integration) diff --git a/README.md b/README.md index ddfacb1..fce6a16 100644 --- a/README.md +++ b/README.md @@ -68,6 +68,27 @@ pip install -e . 
## Quick Start +**Table of Contents:** +- [Basic Usage](#basic-usage) +- [Using Connection String](#using-connection-string) +- [Context Manager Support](#context-manager-support) +- [Using DictCursor for Dictionary Results](#using-dictcursor-for-dictionary-results) +- [Cursor vs DictCursor](#cursor-vs-dictcursor) +- [Query with Parameters](#query-with-parameters) +- [Supported SQL Features](#supported-sql-features) + - [SELECT Statements](#select-statements) + - [WHERE Clauses](#where-clauses) + - [Nested Field Support](#nested-field-support) + - [Sorting and Limiting](#sorting-and-limiting) + - [INSERT Statements](#insert-statements) + - [UPDATE Statements](#update-statements) + - [DELETE Statements](#delete-statements) + - [Transaction Support](#transaction-support) +- [Apache Superset Integration](#apache-superset-integration) +- [Limitations & Roadmap](#limitations--roadmap) +- [Contributing](#contributing) +- [License](#license) + ### Basic Usage ```python @@ -378,6 +399,33 @@ cursor.execute("DELETE FROM Music WHERE available = false") print(f"Deleted {cursor.rowcount} documents") ``` +### Transaction Support + +PyMongoSQL supports multi-document ACID transactions through its DB API 2.0 connection interface. Call `begin()` to start a transaction (a PyMongoSQL extension; DB API 2.0 itself defines only `commit()` and `rollback()`), then call `commit()` or `rollback()` to end it: + +```python +from pymongosql import connect + +connection = connect(host="mongodb://localhost:27017/database") + +try: + connection.begin() # Start transaction + + cursor = connection.cursor() + cursor.execute('UPDATE accounts SET balance = balance - 100 WHERE id = ?', [1]) + cursor.execute('UPDATE accounts SET balance = balance + 100 WHERE id = ?', [2]) + + connection.commit() # Commit all changes + print("Transaction committed successfully") +except Exception as e: + connection.rollback() # Rollback on error + print(f"Transaction failed: {e}") +finally: + connection.close() +``` + +**Note:** MongoDB requires a replica set or sharded cluster for transaction support. 
Standalone MongoDB servers do not support ACID transactions at the server level. + ## Apache Superset Integration PyMongoSQL can be used as a database driver in Apache Superset for querying and visualizing MongoDB data: @@ -403,14 +451,8 @@ This allows seamless integration between MongoDB data and Superset's BI capabili **Note**: PyMongoSQL currently supports DQL (Data Query Language) and DML (Data Manipulation Language) operations. The following SQL features are **not yet supported** but are planned for future releases: -- **DDL Operations** (Data Definition Language) - - `CREATE TABLE/COLLECTION`, `DROP TABLE/COLLECTION` - - `CREATE INDEX`, `DROP INDEX` - - `LIST TABLES/COLLECTIONS` - - `ALTER TABLE/COLLECTION` - **Advanced DML Operations** - - `MERGE`, `UPSERT` - - Transactions and multi-document operations + - `REPLACE`, `MERGE`, `UPSERT` These features are on our development roadmap and contributions are welcome! diff --git a/pymongosql/__init__.py b/pymongosql/__init__.py index ae4b077..b9de191 100644 --- a/pymongosql/__init__.py +++ b/pymongosql/__init__.py @@ -6,7 +6,7 @@ if TYPE_CHECKING: from .connection import Connection -__version__: str = "0.3.1" +__version__: str = "0.3.2" # Globals https://www.python.org/dev/peps/pep-0249/#globals apilevel: str = "2.0" diff --git a/pymongosql/connection.py b/pymongosql/connection.py index a83a832..b577c59 100644 --- a/pymongosql/connection.py +++ b/pymongosql/connection.py @@ -11,7 +11,7 @@ from .common import BaseCursor from .cursor import Cursor -from .error import DatabaseError, NotSupportedError, OperationalError +from .error import DatabaseError, OperationalError from .helper import ConnectionHelper _logger = logging.getLogger(__name__) @@ -212,8 +212,8 @@ def in_transaction(self) -> bool: return self._in_transaction @in_transaction.setter - def in_transaction(self, value: bool) -> bool: - self._in_transaction = False + def in_transaction(self, value: bool) -> None: + self._in_transaction = value @property def 
host(self) -> str: @@ -407,24 +407,54 @@ def _with_transaction(self, callback, **kwargs): return self._session.with_transaction(callback, **kwargs) def begin(self) -> None: - """Begin transaction (DB-API 2.0 standard method)""" + """Begin transaction (extension; begin() is not defined by DB-API 2.0) + + Starts an explicit transaction. After calling begin(), operations + are executed within the transaction context until commit() or + rollback() is called. Requires MongoDB 4.0+ for multi-document + transactions on replica sets, or MongoDB 4.2+ on sharded clusters. + + Example: + conn.begin() + try: + cursor.execute("INSERT INTO users VALUES (...)") + cursor.execute("UPDATE accounts SET balance = balance - 100") + conn.commit() + except Exception: + conn.rollback() + + Raises: + OperationalError: If unable to start transaction + """ self._start_transaction() def commit(self) -> None: - """Commit transaction (DB-API 2.0 standard method)""" + """Commit transaction (DB-API 2.0 standard method) + + Commits the current transaction to the database. All operations + executed since begin() will be atomically persisted. If no + transaction is active, this is a no-op (DB-API 2.0 compliant). + + Raises: + OperationalError: If commit fails + """ if self._session and self._session.in_transaction: self._commit_transaction() - else: - # Fallback for non-session based operations - self._in_transaction = False - self._autocommit = True + # If no transaction, this is a no-op (DB-API 2.0 compliant) def rollback(self) -> None: - """Rollback transaction (DB-API 2.0 standard method)""" + """Rollback transaction (DB-API 2.0 standard method) + + Rolls back (aborts) the current transaction, undoing all operations + executed since begin(). If no transaction is active, this is a no-op + (DB-API 2.0 compliant). 
+ + Raises: + OperationalError: If rollback fails + """ if self._session and self._session.in_transaction: self._abort_transaction() - else: - raise NotSupportedError("MongoDB doesn't support rollback without an active transaction") + # If no transaction, this is a no-op (DB-API 2.0 compliant) def test_connection(self) -> bool: """Test if the connection is alive""" diff --git a/pymongosql/executor.py b/pymongosql/executor.py index b9aae8a..7acabba 100644 --- a/pymongosql/executor.py +++ b/pymongosql/executor.py @@ -101,11 +101,23 @@ def _replace_placeholders(self, obj: Any, parameters: Sequence[Any]) -> Any: def _execute_execution_plan( self, execution_plan: QueryExecutionPlan, - db: Any, + connection: Any = None, parameters: Optional[Sequence[Any]] = None, ) -> Optional[Dict[str, Any]]: - """Execute a QueryExecutionPlan against MongoDB using db.command""" + """Execute a QueryExecutionPlan against MongoDB using db.command + + Args: + execution_plan: QueryExecutionPlan to execute + connection: Connection object (for session and database access) + parameters: Parameters for placeholder replacement + """ try: + # Get database from connection + if not connection: + raise OperationalError("No connection provided") + + db = connection.database + # Get database if not execution_plan.collection: raise ProgrammingError("No collection specified in query") @@ -144,8 +156,11 @@ def _execute_execution_plan( _logger.debug(f"Executing MongoDB command: {find_command}") - # Execute find command directly - result = db.command(find_command) + # Execute find command with session if in transaction + if connection and connection.session and connection.session.in_transaction: + result = db.command(find_command, session=connection.session) + else: + result = db.command(find_command) # Create command result return result @@ -182,7 +197,7 @@ def execute( # Parse the query self._execution_plan = self._parse_sql(processed_query) - return self._execute_execution_plan(self._execution_plan, 
connection.database, processed_params) + return self._execute_execution_plan(self._execution_plan, connection, processed_params) class InsertExecution(ExecutionStrategy): @@ -224,10 +239,16 @@ def _replace_placeholders( def _execute_execution_plan( self, execution_plan: InsertExecutionPlan, - db: Any, + connection: Any = None, parameters: Optional[Union[Sequence[Any], Dict[str, Any]]] = None, ) -> Optional[Dict[str, Any]]: try: + # Get database from connection + if not connection: + raise OperationalError("No connection provided") + + db = connection.database + if not execution_plan.collection: raise ProgrammingError("No collection specified in insert") @@ -238,7 +259,11 @@ def _execute_execution_plan( _logger.debug(f"Executing MongoDB insert command: {command}") - return db.command(command) + # Execute with session if in transaction + if connection and connection.session and connection.session.in_transaction: + return db.command(command, session=connection.session) + else: + return db.command(command) except PyMongoError as e: _logger.error(f"MongoDB insert failed: {e}") raise DatabaseError(f"Insert execution failed: {e}") @@ -259,7 +284,7 @@ def execute( self._execution_plan = self._parse_sql(context.query) - return self._execute_execution_plan(self._execution_plan, connection.database, parameters) + return self._execute_execution_plan(self._execution_plan, connection, parameters) class DeleteExecution(ExecutionStrategy): @@ -293,10 +318,16 @@ def _parse_sql(self, sql: str) -> Any: def _execute_execution_plan( self, execution_plan: Any, - db: Any, + connection: Any = None, parameters: Optional[Union[Sequence[Any], Dict[str, Any]]] = None, ) -> Optional[Dict[str, Any]]: try: + # Get database from connection + if not connection: + raise OperationalError("No connection provided") + + db = connection.database + if not execution_plan.collection: raise ProgrammingError("No collection specified in delete") @@ -312,7 +343,11 @@ def _execute_execution_plan( 
_logger.debug(f"Executing MongoDB delete command: {command}") - return db.command(command) + # Execute with session if in transaction + if connection and connection.session and connection.session.in_transaction: + return db.command(command, session=connection.session) + else: + return db.command(command) except PyMongoError as e: _logger.error(f"MongoDB delete failed: {e}") raise DatabaseError(f"Delete execution failed: {e}") @@ -333,7 +368,7 @@ def execute( self._execution_plan = self._parse_sql(context.query) - return self._execute_execution_plan(self._execution_plan, connection.database, parameters) + return self._execute_execution_plan(self._execution_plan, connection, parameters) class UpdateExecution(ExecutionStrategy): @@ -367,10 +402,16 @@ def _parse_sql(self, sql: str) -> Any: def _execute_execution_plan( self, execution_plan: Any, - db: Any, + connection: Any = None, parameters: Optional[Union[Sequence[Any], Dict[str, Any]]] = None, ) -> Optional[Dict[str, Any]]: try: + # Get database from connection + if not connection: + raise OperationalError("No connection provided") + + db = connection.database + if not execution_plan.collection: raise ProgrammingError("No collection specified in update") @@ -406,7 +447,11 @@ def _execute_execution_plan( _logger.debug(f"Executing MongoDB update command: {command}") - return db.command(command) + # Execute with session if in transaction + if connection and connection.session and connection.session.in_transaction: + return db.command(command, session=connection.session) + else: + return db.command(command) except PyMongoError as e: _logger.error(f"MongoDB update failed: {e}") raise DatabaseError(f"Update execution failed: {e}") @@ -427,7 +472,7 @@ def execute( self._execution_plan = self._parse_sql(context.query) - return self._execute_execution_plan(self._execution_plan, connection.database, parameters) + return self._execute_execution_plan(self._execution_plan, connection, parameters) class ExecutionPlanFactory: diff 
--git a/pymongosql/superset_mongodb/executor.py b/pymongosql/superset_mongodb/executor.py index 9fb6c9b..028fa10 100644 --- a/pymongosql/superset_mongodb/executor.py +++ b/pymongosql/superset_mongodb/executor.py @@ -63,7 +63,7 @@ def execute( _logger.debug(f"Stage 1: Executing MongoDB subquery: {mongo_query}") mongo_execution_plan = self._parse_sql(mongo_query) - mongo_result = self._execute_execution_plan(mongo_execution_plan, connection.database) + mongo_result = self._execute_execution_plan(mongo_execution_plan, connection) # Extract result set from MongoDB mongo_result_set = ResultSet( diff --git a/pyproject.toml b/pyproject.toml index e42724a..3a341ab 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -89,6 +89,9 @@ testpaths = ["tests"] python_files = ["test_*.py"] python_classes = ["Test*"] python_functions = ["test_*"] +markers = [ + "transactional: marks tests that require MongoDB transaction support (requires replica set or sharded cluster)", +] [tool.coverage.run] source = ["pymongosql"] diff --git a/tests/test_connection_transaction.py b/tests/test_connection_transaction.py new file mode 100644 index 0000000..6dbe37d --- /dev/null +++ b/tests/test_connection_transaction.py @@ -0,0 +1,539 @@ +# -*- coding: utf-8 -*- +import pytest +from pymongo.errors import InvalidOperation, OperationFailure + +from pymongosql.error import DatabaseError + +# Mark tests that require server-side transaction support +transactional = pytest.mark.transactional + + +class TestConnectionTransaction: + """Test suite for transaction support in Connection class""" + + def test_autocommit_enabled_by_default(self, conn): + """Test that autocommit is enabled by default""" + assert conn.autocommit is True + assert conn.in_transaction is False + conn.close() + + def test_in_transaction_initial_state(self, conn): + """Test in_transaction property is False initially""" + assert conn.in_transaction is False + conn.close() + + def test_begin_starts_transaction(self, conn): + """Test that 
begin() starts a transaction""" + conn.begin() + + assert conn.in_transaction is True + assert conn.autocommit is False + assert conn.session is not None + + conn.rollback() # Clean up + conn.close() + + def test_begin_creates_session(self, conn): + """Test that begin() creates a session""" + assert conn.session is None + + conn.begin() + + assert conn.session is not None + assert conn.session.in_transaction + + conn.rollback() + conn.close() + + def test_commit_on_empty_transaction(self, conn): + """Test commit on an empty transaction (no operations)""" + conn.begin() + + # Commit without any operations should succeed + conn.commit() + + assert conn.in_transaction is False + assert conn.autocommit is True + + conn.close() + + def test_rollback_on_empty_transaction(self, conn): + """Test rollback on an empty transaction (no operations)""" + conn.begin() + + # Rollback without any operations should succeed + conn.rollback() + + assert conn.in_transaction is False + assert conn.autocommit is True + + conn.close() + + def test_commit_without_transaction_is_noop(self, conn): + """Test that commit() without active transaction is a no-op (DB-API 2.0 compliant)""" + assert conn.in_transaction is False + + # Calling commit without begin() should not raise + conn.commit() + + assert conn.in_transaction is False + assert conn.autocommit is True + + conn.close() + + def test_rollback_without_transaction_is_noop(self, conn): + """Test that rollback() without active transaction is a no-op (DB-API 2.0 compliant)""" + assert conn.in_transaction is False + + # Calling rollback without begin() should not raise + conn.rollback() + + assert conn.in_transaction is False + assert conn.autocommit is True + + conn.close() + + @transactional + def test_transaction_with_insert_operation(self, conn): + """Test transaction with INSERT operation + + NOTE: Requires MongoDB replica set or sharded cluster. + Will be skipped on standalone servers. 
+ """ + try: + # Clean up any existing test data + cursor = conn.cursor() + cursor.execute("DELETE FROM test_transaction WHERE name = ?", ["transaction_test"]) + conn.commit() if not conn.autocommit else None + + conn.begin() + + try: + cursor = conn.cursor() + cursor.execute("INSERT INTO test_transaction {'name': '?', 'value': '?'}", ["transaction_test", 100]) + + assert conn.in_transaction is True + + conn.commit() + + assert conn.in_transaction is False + + finally: + # Clean up + cursor = conn.cursor() + cursor.execute("DELETE FROM test_transaction WHERE name = ?", ["transaction_test"]) + conn.close() + except (InvalidOperation, OperationFailure, DatabaseError) as e: + if "Transaction numbers are only allowed on a replica set member or mongos" in str(e): + pytest.skip("MongoDB server does not support transactions (requires replica set or sharded cluster)") + raise + + @transactional + def test_transaction_with_multiple_operations(self, conn): + """Test transaction with multiple INSERT operations + + NOTE: Requires MongoDB replica set or sharded cluster. + Will be skipped on standalone servers. 
+ """ + try: + # Clean up + cursor = conn.cursor() + cursor.execute("DELETE FROM test_transaction WHERE name IN (?, ?)", ["txn_test_1", "txn_test_2"]) + + conn.begin() + + try: + cursor = conn.cursor() + + # First insert + cursor.execute("INSERT INTO test_transaction {'name': '?', 'value': '?'}", ["txn_test_1", 101]) + + # Second insert + cursor.execute("INSERT INTO test_transaction {'name': '?', 'value': '?'}", ["txn_test_2", 102]) + + assert conn.in_transaction is True + + # Commit both operations atomically + conn.commit() + + assert conn.in_transaction is False + + finally: + # Clean up + cursor = conn.cursor() + cursor.execute("DELETE FROM test_transaction WHERE name IN (?, ?)", ["txn_test_1", "txn_test_2"]) + conn.close() + except (InvalidOperation, OperationFailure, DatabaseError) as e: + if "Transaction numbers are only allowed on a replica set member or mongos" in str(e): + pytest.skip("MongoDB server does not support transactions (requires replica set or sharded cluster)") + raise + + @transactional + def test_transaction_rollback_undoes_changes(self, conn): + """Test that rollback() undoes uncommitted changes + + NOTE: Requires MongoDB replica set or sharded cluster. + Will be skipped on standalone servers. 
+ """ + try: + # Clean up first + cursor = conn.cursor() + cursor.execute("DELETE FROM test_transaction WHERE name = ?", ["rollback_test"]) + + conn.begin() + + try: + cursor = conn.cursor() + + # Insert during transaction + cursor.execute("INSERT INTO test_transaction {'name': '?', 'value': '?'}", ["rollback_test", 200]) + + # Verify we're in transaction + assert conn.in_transaction is True + + # Rollback the transaction + conn.rollback() + + # Verify transaction is ended + assert conn.in_transaction is False + + # Verify the insert was rolled back (should not exist) + cursor = conn.cursor() + cursor.execute("SELECT * FROM test_transaction WHERE name = ?", ["rollback_test"]) + result = cursor.fetchall() + assert len(result) == 0, "Insert should have been rolled back" + + finally: + conn.close() + except (InvalidOperation, OperationFailure, DatabaseError) as e: + if "Transaction numbers are only allowed on a replica set member or mongos" in str(e): + pytest.skip("MongoDB server does not support transactions (requires replica set or sharded cluster)") + raise + + @transactional + def test_transaction_with_update_operation(self, conn): + """Test transaction with UPDATE operation + + NOTE: Requires MongoDB replica set or sharded cluster. + Will be skipped on standalone servers. + """ + try: + # Setup: Insert initial data + cursor = conn.cursor() + cursor.execute("DELETE FROM test_transaction WHERE name = ?", ["update_test"]) + cursor.execute("INSERT INTO test_transaction {'name': '?', 'value': '?'}", ["update_test", 50]) + + conn.begin() + + try: + cursor = conn.cursor() + + # Update within transaction + cursor.execute("UPDATE test_transaction SET value = ? 
WHERE name = ?", [150, "update_test"]) + + assert conn.in_transaction is True + + conn.commit() + + assert conn.in_transaction is False + + finally: + # Clean up + cursor = conn.cursor() + cursor.execute("DELETE FROM test_transaction WHERE name = ?", ["update_test"]) + conn.close() + except (InvalidOperation, OperationFailure, DatabaseError) as e: + if "Transaction numbers are only allowed on a replica set member or mongos" in str(e): + pytest.skip("MongoDB server does not support transactions (requires replica set or sharded cluster)") + raise + + @transactional + def test_transaction_with_delete_operation(self, conn): + """Test transaction with DELETE operation + + NOTE: Requires MongoDB replica set or sharded cluster. + Will be skipped on standalone servers. + """ + try: + # Setup: Insert initial data + cursor = conn.cursor() + cursor.execute("DELETE FROM test_transaction WHERE name = ?", ["delete_test"]) + cursor.execute("INSERT INTO test_transaction {'name': '?', 'value': '?'}", ["delete_test", 300]) + + conn.begin() + + try: + cursor = conn.cursor() + + # Delete within transaction + cursor.execute("DELETE FROM test_transaction WHERE name = ?", ["delete_test"]) + + assert conn.in_transaction is True + + conn.commit() + + assert conn.in_transaction is False + + finally: + conn.close() + except (InvalidOperation, OperationFailure, DatabaseError) as e: + if "Transaction numbers are only allowed on a replica set member or mongos" in str(e): + pytest.skip("MongoDB server does not support transactions (requires replica set or sharded cluster)") + raise + + @transactional + def test_nested_begin_allowed(self, conn): + """Test that multiple begin() calls raises error (PyMongo doesn't allow nested transactions) + + NOTE: Requires MongoDB replica set or sharded cluster. + Will be skipped on standalone servers. 
+ """ + try: + conn.begin() + session1 = conn.session + + # Begin again - should raise InvalidOperation + with pytest.raises(InvalidOperation): + conn.begin() + + # Original session should still be in transaction + assert conn.in_transaction is True + assert session1 is not None + + conn.rollback() + conn.close() + except (InvalidOperation, OperationFailure, DatabaseError) as e: + if "Transaction numbers are only allowed on a replica set member or mongos" in str(e): + pytest.skip("MongoDB server does not support transactions (requires replica set or sharded cluster)") + raise + + def test_transaction_state_property_setter(self, conn): + """Test in_transaction property setter""" + assert conn.in_transaction is False + + # Manually set in_transaction (internal use) + conn.in_transaction = True + assert conn.in_transaction is True + + conn.in_transaction = False + assert conn.in_transaction is False + + conn.close() + + def test_autocommit_disabled_after_begin(self, conn): + """Test that autocommit is disabled after begin()""" + assert conn.autocommit is True + + conn.begin() + + assert conn.autocommit is False + + conn.rollback() + assert conn.autocommit is True + + conn.close() + + @transactional + def test_transaction_with_query_select(self, conn): + """Test transaction with SELECT operation (read-only) + + NOTE: Requires MongoDB replica set or sharded cluster. + Will be skipped on standalone servers. 
+ """ + try: + conn.begin() + + try: + cursor = conn.cursor() + + # Select should work within transaction + cursor.execute("SELECT * FROM test_transaction LIMIT ?", [1]) + _ = cursor.fetchone() + + assert conn.in_transaction is True + + conn.commit() + + finally: + conn.close() + except (InvalidOperation, OperationFailure, DatabaseError) as e: + if "Transaction numbers are only allowed on a replica set member or mongos" in str(e): + pytest.skip("MongoDB server does not support transactions (requires replica set or sharded cluster)") + raise + + @transactional + def test_context_manager_transaction_success(self, conn): + """Test transaction context manager on successful completion + + NOTE: Requires MongoDB replica set or sharded cluster. + Will be skipped on standalone servers. + """ + try: + # Clean up + cursor = conn.cursor() + cursor.execute("DELETE FROM test_transaction WHERE name = ?", ["ctx_test"]) + + try: + # Use session context manager + with conn.session_context(): + cursor = conn.cursor() + cursor.execute("INSERT INTO test_transaction {'name': '?', 'value': '?'}", ["ctx_test", 400]) + + finally: + # Clean up + cursor = conn.cursor() + cursor.execute("DELETE FROM test_transaction WHERE name = ?", ["ctx_test"]) + conn.close() + except (InvalidOperation, OperationFailure, DatabaseError) as e: + if "Transaction numbers are only allowed on a replica set member or mongos" in str(e): + pytest.skip("MongoDB server does not support transactions (requires replica set or sharded cluster)") + raise + + @transactional + def test_transaction_with_multiple_cursors(self, conn): + """Test transaction consistency with multiple cursor objects + + NOTE: Requires MongoDB replica set or sharded cluster. + Will be skipped on standalone servers. 
+ """ + try: + # Clean up + cursor1 = conn.cursor() + cursor1.execute("DELETE FROM test_transaction WHERE name IN (?, ?)", ["cursor_test_1", "cursor_test_2"]) + + conn.begin() + + try: + cursor1 = conn.cursor() + cursor2 = conn.cursor() + + # Both cursors should see the same transaction + cursor1.execute("INSERT INTO test_transaction {'name': '?', 'value': '?'}", ["cursor_test_1", 501]) + + cursor2.execute("INSERT INTO test_transaction {'name': '?', 'value': '?'}", ["cursor_test_2", 502]) + + # Both operations should be committed together + conn.commit() + + finally: + # Clean up + cursor = conn.cursor() + cursor.execute("DELETE FROM test_transaction WHERE name IN (?, ?)", ["cursor_test_1", "cursor_test_2"]) + conn.close() + except (InvalidOperation, OperationFailure, DatabaseError) as e: + if "Transaction numbers are only allowed on a replica set member or mongos" in str(e): + pytest.skip("MongoDB server does not support transactions (requires replica set or sharded cluster)") + raise + + def test_transaction_state_after_rollback(self, conn): + """Test transaction state is properly reset after rollback""" + conn.begin() + assert conn.in_transaction is True + assert conn.autocommit is False + + conn.rollback() + + assert conn.in_transaction is False + assert conn.autocommit is True + + conn.close() + + def test_transaction_state_after_commit(self, conn): + """Test transaction state is properly reset after commit""" + conn.begin() + assert conn.in_transaction is True + assert conn.autocommit is False + + conn.commit() + + assert conn.in_transaction is False + assert conn.autocommit is True + + conn.close() + + def test_begin_after_commit(self, conn): + """Test that begin() works after commit()""" + # First transaction + conn.begin() + conn.commit() + assert conn.in_transaction is False + + # Second transaction + conn.begin() + assert conn.in_transaction is True + + conn.rollback() + conn.close() + + def test_begin_after_rollback(self, conn): + """Test that begin() 
works after rollback()""" + # First transaction + conn.begin() + conn.rollback() + assert conn.in_transaction is False + + # Second transaction + conn.begin() + assert conn.in_transaction is True + + conn.rollback() + conn.close() + + def test_session_created_by_begin(self, conn): + """Test that session is created/available after begin()""" + assert conn.session is None + + conn.begin() + + assert conn.session is not None + assert conn.session.in_transaction + + conn.rollback() + conn.close() + + @transactional + def test_multiple_transactions_sequential(self, conn): + """Test multiple sequential transactions + + NOTE: Requires MongoDB replica set or sharded cluster. + Will be skipped on standalone servers. + """ + try: + # Clean up + cursor = conn.cursor() + cursor.execute( + "DELETE FROM test_transaction WHERE name IN (?, ?, ?)", ["seq_test_1", "seq_test_2", "seq_test_3"] + ) + + try: + # First transaction + conn.begin() + cursor = conn.cursor() + cursor.execute("INSERT INTO test_transaction {'name': '?', 'value': '?'}", ["seq_test_1", 601]) + conn.commit() + + # Second transaction + conn.begin() + cursor = conn.cursor() + cursor.execute("INSERT INTO test_transaction {'name': '?', 'value': '?'}", ["seq_test_2", 602]) + conn.commit() + + # Third transaction + conn.begin() + cursor = conn.cursor() + cursor.execute("INSERT INTO test_transaction {'name': '?', 'value': '?'}", ["seq_test_3", 603]) + conn.commit() + + assert conn.in_transaction is False + + finally: + # Clean up + cursor = conn.cursor() + cursor.execute( + "DELETE FROM test_transaction WHERE name IN (?, ?, ?)", ["seq_test_1", "seq_test_2", "seq_test_3"] + ) + conn.close() + except (InvalidOperation, OperationFailure, DatabaseError) as e: + if "Transaction numbers are only allowed on a replica set member or mongos" in str(e): + pytest.skip("MongoDB server does not support transactions (requires replica set or sharded cluster)") + raise