32 changes: 28 additions & 4 deletions README.md
@@ -14,16 +14,18 @@ PyMongoSQL is a Python [DB API 2.0 (PEP 249)](https://www.python.org/dev/peps/pe

## Objectives

PyMongoSQL implements the DB API 2.0 interfaces to provide SQL-like access to MongoDB. The project aims to:
PyMongoSQL implements the DB API 2.0 interfaces to provide SQL-like access to MongoDB, built on PartiQL syntax for querying semi-structured data. The project aims to:

- Bridge the gap between SQL and NoSQL by providing SQL capabilities for MongoDB
- Support standard SQL DQL (Data Query Language) operations including SELECT statements with WHERE, ORDER BY, and LIMIT clauses
- Bridge the gap between SQL and NoSQL by providing SQL capabilities for MongoDB's nested document structures
- Support standard SQL DQL (Data Query Language) operations including SELECT statements with WHERE, ORDER BY, and LIMIT clauses on nested and hierarchical data
- Provide seamless integration with existing Python applications that expect DB API 2.0 compliance
- Enable easy migration from traditional SQL databases to MongoDB
- Enable easy migration from traditional SQL databases to MongoDB without rewriting queries for document traversal

## Features

- **DB API 2.0 Compliant**: Full compatibility with Python Database API 2.0 specification
- **PartiQL-based SQL Syntax**: Built on [PartiQL](https://partiql.org/tutorial.html) (SQL for semi-structured data), enabling seamless SQL querying of nested and hierarchical MongoDB documents
- **Nested Structure Support**: Query and filter deeply nested fields and arrays within MongoDB documents using standard SQL syntax
- **SQLAlchemy Integration**: Complete ORM and Core support with dedicated MongoDB dialect
- **SQL Query Support**: SELECT statements with WHERE conditions, field selection, and aliases
- **Connection String Support**: MongoDB URI format for easy configuration
@@ -140,6 +142,7 @@ while users:
### SELECT Statements
- Field selection: `SELECT name, age FROM users`
- Wildcards: `SELECT * FROM products`
- **Field aliases**: `SELECT name AS user_name, age AS user_age FROM users`
- **Nested fields**: `SELECT profile.name, profile.age FROM users`
- **Array access**: `SELECT items[0], items[1].name FROM orders`

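A minimal sketch of how these selections look through the DB API cursor, assuming the standard `pymongosql.connect()` entry point; the URI, database, and collection names are placeholders:

```python
import pymongosql

# Placeholder URI; adjust host, port, and database for your deployment.
conn = pymongosql.connect("mongodb://localhost:27017/testdb")
cursor = conn.cursor()

# Field aliases plus nested-field selection from the list above.
cursor.execute("SELECT profile.name AS user_name, profile.age AS user_age FROM users")
for row in cursor.fetchall():
    print(row)

conn.close()
```
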
@@ -176,6 +179,27 @@ while users:

These features are on our development roadmap and contributions are welcome!

## Apache Superset Integration

PyMongoSQL can be used as a database driver in Apache Superset for querying and visualizing MongoDB data:

1. **Install PyMongoSQL**: Install the package on the Superset application server:
```bash
pip install pymongosql
```
2. **Create Connection**: Connect to your MongoDB instance using the connection URI with superset mode:
```
mongodb://username:password@host:port/database?mode=superset
```
or for MongoDB Atlas:
```
mongodb+srv://username:password@host/database?mode=superset
```
3. **Use SQL Lab**: Write and execute SQL queries against MongoDB collections directly in Superset's SQL Lab
4. **Create Visualizations**: Build charts and dashboards from your MongoDB queries using Superset's visualization tools

This allows seamless integration between MongoDB data and Superset's BI capabilities without requiring data migration to traditional SQL databases.
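
Before pasting the URI into Superset, it can help to sanity-check it from Python. A short sketch, assuming `pymongosql.connect()` accepts the same superset-mode URI; credentials, host, and database below are placeholders:

```python
import pymongosql

# Placeholder credentials and host; mirrors the superset-mode URI format shown above.
conn = pymongosql.connect("mongodb://username:password@localhost:27017/analytics?mode=superset")
cursor = conn.cursor()
cursor.execute("SELECT name FROM users")
print(cursor.fetchone())
conn.close()
```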

## Contributing

Contributions are welcome! Please feel free to submit a Pull Request. For major changes, please open an issue first to discuss what you would like to change.
2 changes: 1 addition & 1 deletion pymongosql/__init__.py
@@ -6,7 +6,7 @@
if TYPE_CHECKING:
from .connection import Connection

__version__: str = "0.2.3"
__version__: str = "0.2.4"

# Globals https://www.python.org/dev/peps/pep-0249/#globals
apilevel: str = "2.0"
11 changes: 9 additions & 2 deletions pymongosql/connection.py
@@ -38,14 +38,18 @@ def __init__(

Supports connection string patterns:
- mongodb://host:port/database - Core driver (no subquery support)
- mongodb+superset://host:port/database - Superset driver with subquery support
- mongodb+srv://host:port/database - Cloud/SRV connection string
- mongodb://host:port/database?mode=superset - Superset driver with subquery support
- mongodb+srv://host:port/database?mode=superset - Cloud SRV with superset mode

Mode is specified via the ?mode= query parameter. If not specified, it defaults to "standard".

See PyMongo MongoClient documentation for full parameter details.
https://www.mongodb.com/docs/languages/python/pymongo-driver/current/connect/mongoclient/
"""
# Check if connection string specifies mode
connection_string = host if isinstance(host, str) else None
mode, host = ConnectionHelper.parse_connection_string(connection_string)
mode, db_from_uri, host = ConnectionHelper.parse_connection_string(connection_string)

self._mode = kwargs.pop("mode", None)
if not self._mode and mode:
@@ -56,7 +60,10 @@
self._port = port or 27017

# Handle database parameter separately (not a MongoClient parameter)
# Explicit 'database' parameter takes precedence over database in URI
self._database_name = kwargs.pop("database", None)
if not self._database_name and db_from_uri:
self._database_name = db_from_uri

# Store all PyMongo parameters to pass through directly
self._pymongo_params = kwargs.copy()
10 changes: 0 additions & 10 deletions pymongosql/executor.py
@@ -1,14 +1,4 @@
# -*- coding: utf-8 -*-
"""
Query execution strategies for handling both simple and subquery-based SQL operations.

This module provides different execution strategies:
- StandardExecution: Direct MongoDB query for simple SELECT statements

The intermediate database is configurable - any backend implementing QueryDatabase
interface can be used (SQLite3, PostgreSQL, MySQL, etc.).
"""

import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass
100 changes: 64 additions & 36 deletions pymongosql/helper.py
@@ -7,7 +7,7 @@

import logging
from typing import Optional, Tuple
from urllib.parse import urlparse
from urllib.parse import parse_qs, urlparse

_logger = logging.getLogger(__name__)

@@ -17,52 +17,80 @@ class ConnectionHelper:

Supports connection string patterns:
- mongodb://host:port/database - Core driver (no subquery support)
- mongodb+superset://host:port/database - Superset driver with subquery support
- mongodb+srv://host:port/database - Cloud/SRV connection string
- mongodb://host:port/database?mode=superset - Superset driver with subquery support
- mongodb+srv://host:port/database?mode=superset - Cloud SRV with superset mode

Mode is specified via the ?mode= query parameter (e.g. ?mode=superset) and defaults to "standard" when omitted.
"""

@staticmethod
def parse_connection_string(connection_string: str) -> Tuple[str, str, Optional[str], int, Optional[str]]:
def parse_connection_string(connection_string: Optional[str]) -> Tuple[Optional[str], Optional[str], Optional[str]]:
"""
Parse PyMongoSQL connection string and determine driver mode.
Parse MongoDB connection string and extract driver mode from query parameters.

Mode is extracted from the 'mode' query parameter and removed from the normalized
connection string. Database name is extracted from the path. If mode is not specified,
it defaults to "standard".

Supports all standard MongoDB connection string patterns:
mongodb://[username:password@]host1[:port1][,host2[:port2]...][/[defaultauthdb]?options]

Args:
connection_string: MongoDB connection string

Returns:
Tuple of (mode, database_name, normalized_connection_string)
- mode: "standard" (default) or other mode values specified via ?mode= parameter
- database_name: extracted database name from path, or None if not specified
- normalized_connection_string: connection string without the mode parameter
"""
try:
if not connection_string:
return "standard", None
return "standard", None, None

parsed = urlparse(connection_string)
scheme = parsed.scheme

if not parsed.scheme:
return "standard", connection_string

base_scheme = "mongodb"
mode = "standard"

# Determine mode from scheme
if "+" in scheme:
base_scheme = scheme.split("+")[0].lower()
mode = scheme.split("+")[-1].lower()

host = parsed.hostname or "localhost"
port = parsed.port or 27017
database = parsed.path.lstrip("/") if parsed.path else None

# Build normalized connection string with mongodb scheme (removing any +mode)
# Reconstruct netloc with credentials if present
netloc = host
if parsed.username:
creds = parsed.username
if parsed.password:
creds += f":{parsed.password}"
netloc = f"{creds}@{host}"
netloc += f":{port}"

query_part = f"?{parsed.query}" if parsed.query else ""
normalized_connection_string = f"{base_scheme}://{netloc}/{database or ''}{query_part}"

_logger.debug(f"Parsed connection string - Mode: {mode}, Host: {host}, Port: {port}, Database: {database}")

return mode, normalized_connection_string
return "standard", None, connection_string

# Extract mode from query parameters (defaults to "standard" if not specified)
query_params = parse_qs(parsed.query, keep_blank_values=True) if parsed.query else {}
mode = query_params.get("mode", ["standard"])[0]

# Extract database name from path
database_name = None
if parsed.path:
# Remove leading slash and trailing slashes
path_parts = parsed.path.strip("/").split("/")
if path_parts and path_parts[0]: # Get the first path segment as database name
database_name = path_parts[0]

# Remove mode from query parameters
query_params.pop("mode", None)

# Rebuild query string without mode parameter
query_string = (
"&".join(f"{k}={v}" if v else k for k, v_list in query_params.items() for v in v_list)
if query_params
else ""
)

# Reconstruct the connection string without mode parameter
if query_string:
if parsed.path:
normalized_connection_string = f"{parsed.scheme}://{parsed.netloc}{parsed.path}?{query_string}"
else:
normalized_connection_string = f"{parsed.scheme}://{parsed.netloc}?{query_string}"
else:
if parsed.path:
normalized_connection_string = f"{parsed.scheme}://{parsed.netloc}{parsed.path}"
else:
normalized_connection_string = f"{parsed.scheme}://{parsed.netloc}"

_logger.debug(f"Parsed connection string - Mode: {mode}, Database: {database_name}")

return mode, database_name, normalized_connection_string

except Exception as e:
_logger.error(f"Failed to parse connection string: {e}")
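
Expected behaviour of the new `parse_connection_string()` for two representative URIs, following the docstring and implementation above; results are shown as comments, and the hostnames and credentials are made up:

```python
from pymongosql.helper import ConnectionHelper

mode, db, uri = ConnectionHelper.parse_connection_string(
    "mongodb://user:pw@localhost:27017/sales?mode=superset&tls=true"
)
# mode == "superset"
# db   == "sales"
# uri  == "mongodb://user:pw@localhost:27017/sales?tls=true"  (mode stripped, other options kept)

mode, db, uri = ConnectionHelper.parse_connection_string("mongodb+srv://cluster0.example.net/reports")
# mode == "standard", db == "reports", uri unchanged (no mode parameter present)
```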
21 changes: 9 additions & 12 deletions pymongosql/result_set.py
@@ -42,7 +42,7 @@ def __init__(
self._is_closed = False
self._cache_exhausted = False
self._total_fetched = 0
self._description: Optional[List[Tuple[str, str, None, None, None, None, None]]] = None
self._description: Optional[List[Tuple[str, Any, None, None, None, None, None]]] = None
self._column_names: Optional[List[str]] = None # Track column order for sequences
self._errors: List[Dict[str, str]] = []

@@ -69,20 +69,22 @@ def _build_description(self) -> None:
if not self._execution_plan.projection_stage:
# No projection specified, build description from column names if available
if self._column_names:
self._description = [
(col_name, "VARCHAR", None, None, None, None, None) for col_name in self._column_names
]
self._description = [(col_name, str, None, None, None, None, None) for col_name in self._column_names]
else:
# Will be built dynamically when columns are established
self._description = None
return

# Build description from projection (now in MongoDB format {field: 1})
description = []
column_aliases = getattr(self._execution_plan, "column_aliases", {})

for field_name, include_flag in self._execution_plan.projection_stage.items():
# SQL cursor description format: (name, type_code, display_size, internal_size, precision, scale, null_ok)
if include_flag == 1: # Field is included in projection
description.append((field_name, "VARCHAR", None, None, None, None, None))
# Use alias if available, otherwise use field name
display_name = column_aliases.get(field_name, field_name)
description.append((display_name, str, None, None, None, None, None))

self._description = description

@@ -202,19 +204,15 @@ def rowcount(self) -> int:
@property
def description(
self,
) -> Optional[List[Tuple[str, str, None, None, None, None, None]]]:
) -> Optional[List[Tuple[str, Any, None, None, None, None, None]]]:
"""Return column description"""
if self._description is None:
# Try to build description from established column names
try:
if not self._cache_exhausted:
# Fetch one result to establish column names if needed
self._ensure_results_available(1)

if self._column_names:
# Build description from established column names
self._description = [
(col_name, "VARCHAR", None, None, None, None, None) for col_name in self._column_names
(col_name, str, None, None, None, None, None) for col_name in self._column_names
]
except Exception as e:
_logger.warning(f"Could not build dynamic description: {e}")
@@ -272,7 +270,6 @@ def fetchall(self) -> List[Sequence[Any]]:

# Now get everything from cache
all_results.extend(self._cached_results)
self._total_fetched += len(self._cached_results)
self._cached_results.clear()
self._cache_exhausted = True

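With this change, `cursor.description` reports alias names and Python type objects (`str`) as the type code instead of the string "VARCHAR". A sketch of reading it, assuming the standard `pymongosql.connect()` entry point; connection details are placeholders:

```python
import pymongosql

conn = pymongosql.connect("mongodb://localhost:27017/testdb")
cursor = conn.cursor()
cursor.execute("SELECT name AS user_name, age AS user_age FROM users")

# Each entry: (name, type_code, display_size, internal_size, precision, scale, null_ok)
for name, type_code, *_ in cursor.description:
    print(name, type_code)  # e.g. user_name <class 'str'>
```
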
8 changes: 5 additions & 3 deletions pymongosql/sql/ast.py
@@ -50,9 +50,11 @@ def parse_to_execution_plan(self) -> ExecutionPlan:
"""Convert the parse result to an ExecutionPlan using BuilderFactory"""
builder = BuilderFactory.create_query_builder().collection(self._parse_result.collection)

builder.filter(self._parse_result.filter_conditions).project(self._parse_result.projection).sort(
self._parse_result.sort_fields
).limit(self._parse_result.limit_value).skip(self._parse_result.offset_value)
builder.filter(self._parse_result.filter_conditions).project(self._parse_result.projection).column_aliases(
self._parse_result.column_aliases
).sort(self._parse_result.sort_fields).limit(self._parse_result.limit_value).skip(
self._parse_result.offset_value
)

return builder.build()

15 changes: 12 additions & 3 deletions pymongosql/sql/builder.py
@@ -1,7 +1,4 @@
# -*- coding: utf-8 -*-
"""
Query builder for constructing MongoDB queries in a fluent, readable way
"""
import logging
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Union
@@ -16,6 +13,7 @@ class ExecutionPlan:
collection: Optional[str] = None
filter_stage: Dict[str, Any] = field(default_factory=dict)
projection_stage: Dict[str, Any] = field(default_factory=dict)
column_aliases: Dict[str, str] = field(default_factory=dict) # Maps field_name -> alias
sort_stage: List[Dict[str, int]] = field(default_factory=list)
limit_stage: Optional[int] = None
skip_stage: Optional[int] = None
@@ -56,6 +54,7 @@ def copy(self) -> "ExecutionPlan":
collection=self.collection,
filter_stage=self.filter_stage.copy(),
projection_stage=self.projection_stage.copy(),
column_aliases=self.column_aliases.copy(),
sort_stage=self.sort_stage.copy(),
limit_stage=self.limit_stage,
skip_stage=self.skip_stage,
@@ -156,6 +155,16 @@ def skip(self, count: int) -> "MongoQueryBuilder":
_logger.debug(f"Set skip to: {count}")
return self

def column_aliases(self, aliases: Dict[str, str]) -> "MongoQueryBuilder":
"""Set column aliases mapping (field_name -> alias)"""
if not isinstance(aliases, dict):
self._add_error("Column aliases must be a dictionary")
return self

self._execution_plan.column_aliases = aliases
_logger.debug(f"Set column aliases to: {aliases}")
return self

def where(self, field: str, operator: str, value: Any) -> "MongoQueryBuilder":
"""Add a where condition in a readable format"""
condition = self._build_condition(field, operator, value)
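
A sketch of the fluent chain with the new `column_aliases()` stage, mirroring the call pattern used in `pymongosql/sql/ast.py` above; the `BuilderFactory` import path is an assumption (ast.py only shows the class name, not its module) and the field names are hypothetical:

```python
# Import path assumed; BuilderFactory may live elsewhere in the package.
from pymongosql.sql.builder import BuilderFactory

plan = (
    BuilderFactory.create_query_builder()
    .collection("users")
    .project({"name": 1, "age": 1})
    .column_aliases({"name": "user_name", "age": "user_age"})
    .limit(10)
    .build()
)
# plan.column_aliases == {"name": "user_name", "age": "user_age"}
```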
9 changes: 6 additions & 3 deletions pymongosql/sql/handler.py
@@ -1,7 +1,4 @@
# -*- coding: utf-8 -*-
"""
Expression handlers for converting SQL expressions to MongoDB query format
"""
import logging
import re
from abc import ABC, abstractmethod
@@ -42,6 +39,7 @@ class ParseResult:
# Visitor parsing state fields
collection: Optional[str] = None
projection: Dict[str, Any] = field(default_factory=dict)
column_aliases: Dict[str, str] = field(default_factory=dict) # Maps field_name -> alias
sort_fields: List[Dict[str, int]] = field(default_factory=list)
limit_value: Optional[int] = None
offset_value: Optional[int] = None
@@ -894,14 +892,19 @@ def can_handle(self, ctx: Any) -> bool:

def handle_visitor(self, ctx: PartiQLParser.SelectItemsContext, parse_result: "ParseResult") -> Any:
projection = {}
column_aliases = {}

if hasattr(ctx, "projectionItems") and ctx.projectionItems():
for item in ctx.projectionItems().projectionItem():
field_name, alias = self._extract_field_and_alias(item)
# Use MongoDB standard projection format: {field: 1} to include field
projection[field_name] = 1
# Store alias if present
if alias:
column_aliases[field_name] = alias

parse_result.projection = projection
parse_result.column_aliases = column_aliases
return projection

def _extract_field_and_alias(self, item) -> Tuple[str, Optional[str]]:
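
For a statement such as `SELECT name AS user_name, age FROM users`, the visitor above is expected to populate the parse result roughly as follows (illustrative values, not an executable parser test):

```python
# Expected ParseResult fields after visiting the projection items of
# "SELECT name AS user_name, age FROM users" (illustrative only).
expected_projection = {"name": 1, "age": 1}       # MongoDB-style inclusion projection
expected_column_aliases = {"name": "user_name"}   # only aliased fields get an entry
```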