Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
530 changes: 530 additions & 0 deletions HOW_IT_WORKS.md

Large diffs are not rendered by default.

56 changes: 49 additions & 7 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,27 @@ pip install -e .

## Quick Start

**Table of Contents:**
- [Basic Usage](#basic-usage)
- [Using Connection String](#using-connection-string)
- [Context Manager Support](#context-manager-support)
- [Using DictCursor for Dictionary Results](#using-dictcursor-for-dictionary-results)
- [Cursor vs DictCursor](#cursor-vs-dictcursor)
- [Query with Parameters](#query-with-parameters)
- [Supported SQL Features](#supported-sql-features)
- [SELECT Statements](#select-statements)
- [WHERE Clauses](#where-clauses)
- [Nested Field Support](#nested-field-support)
- [Sorting and Limiting](#sorting-and-limiting)
- [INSERT Statements](#insert-statements)
- [UPDATE Statements](#update-statements)
- [DELETE Statements](#delete-statements)
- [Transaction Support](#transaction-support)
- [Apache Superset Integration](#apache-superset-integration)
- [Limitations & Roadmap](#limitations--roadmap)
- [Contributing](#contributing)
- [License](#license)

### Basic Usage

```python
Expand Down Expand Up @@ -378,6 +399,33 @@ cursor.execute("DELETE FROM Music WHERE available = false")
print(f"Deleted {cursor.rowcount} documents")
```

### Transaction Support

PyMongoSQL supports DB API 2.0 transactions for ACID-compliant database operations. Use the `begin()`, `commit()`, and `rollback()` methods to manage transactions:

```python
from pymongosql import connect

connection = connect(host="mongodb://localhost:27017/database")

try:
connection.begin() # Start transaction

cursor = connection.cursor()
cursor.execute('UPDATE accounts SET balance = balance - 100 WHERE id = ?', [1])
cursor.execute('UPDATE accounts SET balance = balance + 100 WHERE id = ?', [2])

connection.commit() # Commit all changes
print("Transaction committed successfully")
except Exception as e:
connection.rollback() # Rollback on error
print(f"Transaction failed: {e}")
finally:
connection.close()
```

**Note:** MongoDB requires a replica set or sharded cluster for transaction support. Standalone MongoDB servers do not support ACID transactions at the server level.

## Apache Superset Integration

PyMongoSQL can be used as a database driver in Apache Superset for querying and visualizing MongoDB data:
Expand All @@ -403,14 +451,8 @@ This allows seamless integration between MongoDB data and Superset's BI capabili

**Note**: PyMongoSQL currently supports DQL (Data Query Language) and DML (Data Manipulation Language) operations. The following SQL features are **not yet supported** but are planned for future releases:

- **DDL Operations** (Data Definition Language)
- `CREATE TABLE/COLLECTION`, `DROP TABLE/COLLECTION`
- `CREATE INDEX`, `DROP INDEX`
- `LIST TABLES/COLLECTIONS`
- `ALTER TABLE/COLLECTION`
- **Advanced DML Operations**
- `MERGE`, `UPSERT`
- Transactions and multi-document operations
- `REPLACE`, `MERGE`, `UPSERT`

These features are on our development roadmap and contributions are welcome!

Expand Down
2 changes: 1 addition & 1 deletion pymongosql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
if TYPE_CHECKING:
from .connection import Connection

__version__: str = "0.3.1"
__version__: str = "0.3.2"

# Globals https://www.python.org/dev/peps/pep-0249/#globals
apilevel: str = "2.0"
Expand Down
54 changes: 42 additions & 12 deletions pymongosql/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

from .common import BaseCursor
from .cursor import Cursor
from .error import DatabaseError, NotSupportedError, OperationalError
from .error import DatabaseError, OperationalError
from .helper import ConnectionHelper

_logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -212,8 +212,8 @@ def in_transaction(self) -> bool:
return self._in_transaction

@in_transaction.setter
def in_transaction(self, value: bool) -> bool:
self._in_transaction = False
def in_transaction(self, value: bool) -> None:
self._in_transaction = value

@property
def host(self) -> str:
Expand Down Expand Up @@ -407,24 +407,54 @@ def _with_transaction(self, callback, **kwargs):
return self._session.with_transaction(callback, **kwargs)

def begin(self) -> None:
"""Begin transaction (DB-API 2.0 standard method)"""
"""Begin transaction (DB-API 2.0 standard method)

Starts an explicit transaction. After calling begin(), operations
are executed within the transaction context until commit() or
rollback() is called. Requires MongoDB 4.0+ for multi-document
transactions on replica sets or sharded clusters.

Example:
conn.begin()
try:
cursor.execute("INSERT INTO users VALUES (...)")
cursor.execute("UPDATE accounts SET balance = balance - 100")
conn.commit()
except Exception:
conn.rollback()

Raises:
OperationalError: If unable to start transaction
"""
self._start_transaction()

def commit(self) -> None:
"""Commit transaction (DB-API 2.0 standard method)"""
"""Commit transaction (DB-API 2.0 standard method)

Commits the current transaction to the database. All operations
executed since begin() will be atomically persisted. If no
transaction is active, this is a no-op (DB-API 2.0 compliant).

Raises:
OperationalError: If commit fails
"""
if self._session and self._session.in_transaction:
self._commit_transaction()
else:
# Fallback for non-session based operations
self._in_transaction = False
self._autocommit = True
# If no transaction, this is a no-op (DB-API 2.0 compliant)

def rollback(self) -> None:
"""Rollback transaction (DB-API 2.0 standard method)"""
"""Rollback transaction (DB-API 2.0 standard method)

Rolls back (aborts) the current transaction, undoing all operations
executed since begin(). If no transaction is active, this is a no-op
(DB-API 2.0 compliant).

Raises:
OperationalError: If rollback fails
"""
if self._session and self._session.in_transaction:
self._abort_transaction()
else:
raise NotSupportedError("MongoDB doesn't support rollback without an active transaction")
# If no transaction, this is a no-op (DB-API 2.0 compliant)

def test_connection(self) -> bool:
"""Test if the connection is alive"""
Expand Down
73 changes: 59 additions & 14 deletions pymongosql/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,11 +101,23 @@ def _replace_placeholders(self, obj: Any, parameters: Sequence[Any]) -> Any:
def _execute_execution_plan(
self,
execution_plan: QueryExecutionPlan,
db: Any,
connection: Any = None,
parameters: Optional[Sequence[Any]] = None,
) -> Optional[Dict[str, Any]]:
"""Execute a QueryExecutionPlan against MongoDB using db.command"""
"""Execute a QueryExecutionPlan against MongoDB using db.command

Args:
execution_plan: QueryExecutionPlan to execute
connection: Connection object (for session and database access)
parameters: Parameters for placeholder replacement
"""
try:
# Get database from connection
if not connection:
raise OperationalError("No connection provided")

db = connection.database

# Get database
if not execution_plan.collection:
raise ProgrammingError("No collection specified in query")
Expand Down Expand Up @@ -144,8 +156,11 @@ def _execute_execution_plan(

_logger.debug(f"Executing MongoDB command: {find_command}")

# Execute find command directly
result = db.command(find_command)
# Execute find command with session if in transaction
if connection and connection.session and connection.session.in_transaction:
result = db.command(find_command, session=connection.session)
else:
result = db.command(find_command)

# Create command result
return result
Expand Down Expand Up @@ -182,7 +197,7 @@ def execute(
# Parse the query
self._execution_plan = self._parse_sql(processed_query)

return self._execute_execution_plan(self._execution_plan, connection.database, processed_params)
return self._execute_execution_plan(self._execution_plan, connection, processed_params)


class InsertExecution(ExecutionStrategy):
Expand Down Expand Up @@ -224,10 +239,16 @@ def _replace_placeholders(
def _execute_execution_plan(
self,
execution_plan: InsertExecutionPlan,
db: Any,
connection: Any = None,
parameters: Optional[Union[Sequence[Any], Dict[str, Any]]] = None,
) -> Optional[Dict[str, Any]]:
try:
# Get database from connection
if not connection:
raise OperationalError("No connection provided")

db = connection.database

if not execution_plan.collection:
raise ProgrammingError("No collection specified in insert")

Expand All @@ -238,7 +259,11 @@ def _execute_execution_plan(

_logger.debug(f"Executing MongoDB insert command: {command}")

return db.command(command)
# Execute with session if in transaction
if connection and connection.session and connection.session.in_transaction:
return db.command(command, session=connection.session)
else:
return db.command(command)
except PyMongoError as e:
_logger.error(f"MongoDB insert failed: {e}")
raise DatabaseError(f"Insert execution failed: {e}")
Expand All @@ -259,7 +284,7 @@ def execute(

self._execution_plan = self._parse_sql(context.query)

return self._execute_execution_plan(self._execution_plan, connection.database, parameters)
return self._execute_execution_plan(self._execution_plan, connection, parameters)


class DeleteExecution(ExecutionStrategy):
Expand Down Expand Up @@ -293,10 +318,16 @@ def _parse_sql(self, sql: str) -> Any:
def _execute_execution_plan(
self,
execution_plan: Any,
db: Any,
connection: Any = None,
parameters: Optional[Union[Sequence[Any], Dict[str, Any]]] = None,
) -> Optional[Dict[str, Any]]:
try:
# Get database from connection
if not connection:
raise OperationalError("No connection provided")

db = connection.database

if not execution_plan.collection:
raise ProgrammingError("No collection specified in delete")

Expand All @@ -312,7 +343,11 @@ def _execute_execution_plan(

_logger.debug(f"Executing MongoDB delete command: {command}")

return db.command(command)
# Execute with session if in transaction
if connection and connection.session and connection.session.in_transaction:
return db.command(command, session=connection.session)
else:
return db.command(command)
except PyMongoError as e:
_logger.error(f"MongoDB delete failed: {e}")
raise DatabaseError(f"Delete execution failed: {e}")
Expand All @@ -333,7 +368,7 @@ def execute(

self._execution_plan = self._parse_sql(context.query)

return self._execute_execution_plan(self._execution_plan, connection.database, parameters)
return self._execute_execution_plan(self._execution_plan, connection, parameters)


class UpdateExecution(ExecutionStrategy):
Expand Down Expand Up @@ -367,10 +402,16 @@ def _parse_sql(self, sql: str) -> Any:
def _execute_execution_plan(
self,
execution_plan: Any,
db: Any,
connection: Any = None,
parameters: Optional[Union[Sequence[Any], Dict[str, Any]]] = None,
) -> Optional[Dict[str, Any]]:
try:
# Get database from connection
if not connection:
raise OperationalError("No connection provided")

db = connection.database

if not execution_plan.collection:
raise ProgrammingError("No collection specified in update")

Expand Down Expand Up @@ -406,7 +447,11 @@ def _execute_execution_plan(

_logger.debug(f"Executing MongoDB update command: {command}")

return db.command(command)
# Execute with session if in transaction
if connection and connection.session and connection.session.in_transaction:
return db.command(command, session=connection.session)
else:
return db.command(command)
except PyMongoError as e:
_logger.error(f"MongoDB update failed: {e}")
raise DatabaseError(f"Update execution failed: {e}")
Expand All @@ -427,7 +472,7 @@ def execute(

self._execution_plan = self._parse_sql(context.query)

return self._execute_execution_plan(self._execution_plan, connection.database, parameters)
return self._execute_execution_plan(self._execution_plan, connection, parameters)


class ExecutionPlanFactory:
Expand Down
2 changes: 1 addition & 1 deletion pymongosql/superset_mongodb/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ def execute(
_logger.debug(f"Stage 1: Executing MongoDB subquery: {mongo_query}")

mongo_execution_plan = self._parse_sql(mongo_query)
mongo_result = self._execute_execution_plan(mongo_execution_plan, connection.database)
mongo_result = self._execute_execution_plan(mongo_execution_plan, connection)

# Extract result set from MongoDB
mongo_result_set = ResultSet(
Expand Down
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,9 @@ testpaths = ["tests"]
python_files = ["test_*.py"]
python_classes = ["Test*"]
python_functions = ["test_*"]
markers = [
"transactional: marks tests that require MongoDB transaction support (requires replica set or sharded cluster)",
]

[tool.coverage.run]
source = ["pymongosql"]
Expand Down
Loading