Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pymongosql/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
if TYPE_CHECKING:
from .connection import Connection

__version__: str = "0.2.2"
__version__: str = "0.2.3"

# Globals https://www.python.org/dev/peps/pep-0249/#globals
apilevel: str = "2.0"
Expand Down
8 changes: 6 additions & 2 deletions pymongosql/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,18 @@ def __init__(
"""
# Check if connection string specifies mode
connection_string = host if isinstance(host, str) else None
self._mode, host = ConnectionHelper.parse_connection_string(connection_string)
mode, host = ConnectionHelper.parse_connection_string(connection_string)

self._mode = kwargs.pop("mode", None)
if not self._mode and mode:
self._mode = mode

# Extract commonly used parameters for backward compatibility
self._host = host or "localhost"
self._port = port or 27017

# Handle database parameter separately (not a MongoClient parameter)
self._database_name = kwargs.pop("database", None) # Remove from kwargs
self._database_name = kwargs.pop("database", None)

# Store all PyMongo parameters to pass through directly
self._pymongo_params = kwargs.copy()
Expand Down
21 changes: 15 additions & 6 deletions pymongosql/result_set.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,10 +65,16 @@ def _process_and_cache_batch(self, batch: List[Dict[str, Any]]) -> None:
self._total_fetched += len(batch)

def _build_description(self) -> None:
"""Build column description from execution plan projection"""
"""Build column description from execution plan projection or established column names"""
if not self._execution_plan.projection_stage:
# No projection specified, description will be built dynamically
self._description = None
# No projection specified, build description from column names if available
if self._column_names:
self._description = [
(col_name, "VARCHAR", None, None, None, None, None) for col_name in self._column_names
]
else:
# Will be built dynamically when columns are established
self._description = None
return

# Build description from projection (now in MongoDB format {field: 1})
Expand Down Expand Up @@ -198,10 +204,13 @@ def description(
self,
) -> Optional[List[Tuple[str, str, None, None, None, None, None]]]:
"""Return column description"""
if self._description is None and not self._cache_exhausted:
# Try to fetch one result to build description dynamically
if self._description is None:
# Try to build description from established column names
try:
self._ensure_results_available(1)
if not self._cache_exhausted:
# Fetch one result to establish column names if needed
self._ensure_results_available(1)

if self._column_names:
# Build description from established column names
self._description = [
Expand Down
61 changes: 50 additions & 11 deletions pymongosql/sqlalchemy_mongodb/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,22 +29,31 @@
__supports_sqlalchemy_2x__ = False


def create_engine_url(host: str = "localhost", port: int = 27017, database: str = "test", **kwargs) -> str:
def create_engine_url(
host: str = "localhost", port: int = 27017, database: str = "test", mode: str = "standard", **kwargs
) -> str:
"""Create a SQLAlchemy engine URL for PyMongoSQL.

Args:
host: MongoDB host
port: MongoDB port
database: Database name
mode: Connection mode - "standard" (default) or "superset" (with subquery support)
**kwargs: Additional connection parameters

Returns:
SQLAlchemy URL string (uses mongodb:// format)
SQLAlchemy URL string

Example:
>>> # Standard mode
>>> url = create_engine_url("localhost", 27017, "mydb")
>>> engine = sqlalchemy.create_engine(url)
>>> # Superset mode with subquery support
>>> url = create_engine_url("localhost", 27017, "mydb", mode="superset")
>>> engine = sqlalchemy.create_engine(url)
"""
scheme = "mongodb+superset" if mode == "superset" else "mongodb"

params = []
for key, value in kwargs.items():
params.append(f"{key}={value}")
Expand All @@ -53,7 +62,7 @@ def create_engine_url(host: str = "localhost", port: int = 27017, database: str
if param_str:
param_str = "?" + param_str

return f"mongodb://{host}:{port}/{database}{param_str}"
return f"{scheme}://{host}:{port}/{database}{param_str}"


def create_mongodb_url(mongodb_uri: str) -> str:
Expand All @@ -77,11 +86,11 @@ def create_mongodb_url(mongodb_uri: str) -> str:
def create_engine_from_mongodb_uri(mongodb_uri: str, **engine_kwargs):
"""Create a SQLAlchemy engine from any MongoDB connection string.

This function handles both mongodb:// and mongodb+srv:// URIs properly.
Use this instead of create_engine() directly for mongodb+srv URIs.
This function handles mongodb://, mongodb+srv://, and mongodb+superset:// URIs properly.
Use this instead of create_engine() directly for special URI schemes.

Args:
mongodb_uri: Standard MongoDB connection string
mongodb_uri: MongoDB connection string (supports standard, SRV, and superset modes)
**engine_kwargs: Additional arguments passed to create_engine

Returns:
Expand All @@ -92,6 +101,8 @@ def create_engine_from_mongodb_uri(mongodb_uri: str, **engine_kwargs):
>>> engine = create_engine_from_mongodb_uri("mongodb+srv://user:pass@cluster.net/db")
>>> # For standard MongoDB
>>> engine = create_engine_from_mongodb_uri("mongodb://localhost:27017/mydb")
>>> # For superset mode (with subquery support)
>>> engine = create_engine_from_mongodb_uri("mongodb+superset://localhost:27017/mydb")
"""
try:
from sqlalchemy import create_engine
Expand All @@ -109,6 +120,22 @@ def custom_create_connect_args(url):
opts = {"host": mongodb_uri}
return [], opts

engine.dialect.create_connect_args = custom_create_connect_args
return engine
elif mongodb_uri.startswith("mongodb+superset://"):
# For MongoDB+Superset, convert to standard mongodb:// for SQLAlchemy compatibility
# but preserve the superset mode by passing it through connection options
converted_uri = mongodb_uri.replace("mongodb+superset://", "mongodb://")

# Create engine with converted URI
engine = create_engine(converted_uri, **engine_kwargs)

def custom_create_connect_args(url):
# Use original superset URI for actual MongoDB connection
# This preserves the superset mode for subquery support
opts = {"host": mongodb_uri}
return [], opts

engine.dialect.create_connect_args = custom_create_connect_args
return engine
else:
Expand All @@ -123,18 +150,18 @@ def register_dialect():
"""Register the PyMongoSQL dialect with SQLAlchemy.

This function handles registration for both SQLAlchemy 1.x and 2.x.
Registers support for standard MongoDB connection strings only.
Registers support for standard, SRV, and superset MongoDB connection strings.
"""
try:
from sqlalchemy.dialects import registry

# Register for standard MongoDB URLs
registry.register("mongodb", "pymongosql.sqlalchemy_mongodb.sqlalchemy_dialect", "PyMongoSQLDialect")

# Try to register both SRV forms so SQLAlchemy can resolve SRV-style URLs
# (either 'mongodb+srv' or the dotted 'mongodb.srv' plugin name).
# Some SQLAlchemy versions accept '+' in scheme names; others import
# the dotted plugin name. Attempt both registrations in one block.
# Try to register SRV and Superset forms so SQLAlchemy can resolve these URL patterns
# (either with '+' or dotted notation for compatibility with different SQLAlchemy versions).
# Some SQLAlchemy versions accept '+' in scheme names; others import the dotted plugin name.
# Attempt all registrations but don't fail if some are not supported.
try:
registry.register("mongodb+srv", "pymongosql.sqlalchemy_mongodb.sqlalchemy_dialect", "PyMongoSQLDialect")
registry.register("mongodb.srv", "pymongosql.sqlalchemy_mongodb.sqlalchemy_dialect", "PyMongoSQLDialect")
Expand All @@ -143,6 +170,18 @@ def register_dialect():
# create_engine_from_mongodb_uri by converting 'mongodb+srv' to 'mongodb'.
pass

try:
registry.register(
"mongodb+superset", "pymongosql.sqlalchemy_mongodb.sqlalchemy_dialect", "PyMongoSQLDialect"
)
registry.register(
"mongodb.superset", "pymongosql.sqlalchemy_mongodb.sqlalchemy_dialect", "PyMongoSQLDialect"
)
except Exception:
# If registration fails we fall back to handling Superset URIs in
# create_engine_from_mongodb_uri by converting 'mongodb+superset' to 'mongodb'.
pass

return True
except ImportError:
# Fallback for versions without registry
Expand Down
50 changes: 44 additions & 6 deletions pymongosql/superset_mongodb/detector.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,20 +89,58 @@ def extract_outer_query(cls, query: str) -> Optional[Tuple[str, str]]:
"""
Extract outer query with subquery placeholder.

Preserves the complete outer query structure while replacing the subquery
with a reference to the temporary table.

Returns:
Tuple of (outer_query, subquery_alias) or None
Tuple of (outer_query, subquery_alias) or None if not a wrapped subquery
"""
info = cls.detect(query)
if not info.is_wrapped:
return None

# Replace subquery with temporary table reference
outer = cls.WRAPPED_SUBQUERY_PATTERN.sub(
f"SELECT * FROM {info.subquery_alias}",
query,
# Pattern to capture: SELECT <columns> FROM ( <subquery> ) AS <alias> <rest>
# Matches both SELECT col1, col2 and SELECT col1 AS alias1, col2 AS alias2 formats
pattern = re.compile(
r"(SELECT\s+.+?)\s+FROM\s*\(\s*(?:select|SELECT)\s+.+?\s*\)\s+(?:AS\s+)?(\w+)(.*)",
re.IGNORECASE | re.DOTALL,
)

return outer, info.subquery_alias
match = pattern.search(query)
if match:
select_clause = match.group(1).strip()
table_alias = match.group(2)
rest_of_query = match.group(3).strip()

if rest_of_query:
outer = f"{select_clause} FROM {table_alias} {rest_of_query}"
else:
outer = f"{select_clause} FROM {table_alias}"

return outer, table_alias

# If pattern doesn't match exactly, fall back to preserving SELECT clause
# Extract from SELECT to FROM keyword
select_match = re.search(r"(SELECT\s+.+?)\s+FROM", query, re.IGNORECASE | re.DOTALL)
if not select_match:
return None

select_clause = select_match.group(1).strip()

# Extract table alias and rest of query after the closing paren
rest_match = re.search(r"\)\s+(?:AS\s+)?(\w+)(.*)", query, re.IGNORECASE | re.DOTALL)
if rest_match:
table_alias = rest_match.group(1)
rest_of_query = rest_match.group(2).strip()

if rest_of_query:
outer = f"{select_clause} FROM {table_alias} {rest_of_query}"
else:
outer = f"{select_clause} FROM {table_alias}"

return outer, table_alias

return None

@classmethod
def is_simple_select(cls, query: str) -> bool:
Expand Down
33 changes: 28 additions & 5 deletions pymongosql/superset_mongodb/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,34 +105,57 @@ def execute(
try:
# Create temporary table with MongoDB results
querydb_query, table_name = SubqueryDetector.extract_outer_query(context.query)
if querydb_query is None or table_name is None:
# Fallback to original query if extraction fails
querydb_query = context.query
table_name = "virtual_table"

query_db.insert_records(table_name, mongo_dicts)

# Execute outer query against intermediate DB
_logger.debug(f"Stage 2: Executing {db_name} query: {querydb_query}")
_logger.debug(f"Stage 2: Executing QueryDBSQLite query: {querydb_query}")

querydb_rows = query_db.execute_query(querydb_query)
_logger.debug(f"Stage 2 complete: Got {len(querydb_rows)} rows from {db_name}")

# Create a ResultSet-like object from intermediate DB results
result_set = self._create_result_set_from_db(querydb_rows, querydb_query)

self._execution_plan = ExecutionPlan(collection="query_db_result", projection_stage={})
# Build projection_stage from query database result columns
projection_stage = {}
if querydb_rows and isinstance(querydb_rows[0], dict):
# Extract column names from first result row
for col_name in querydb_rows[0].keys():
projection_stage[col_name] = 1 # 1 means included in projection
else:
# If no rows, get column names from the SQLite query directly
try:
cursor = query_db.execute_query_cursor(querydb_query)
if cursor.description:
# Extract column names from cursor description
for col_desc in cursor.description:
col_name = col_desc[0]
projection_stage[col_name] = 1
except Exception as e:
_logger.warning(f"Could not extract column names from empty result: {e}")

self._execution_plan = ExecutionPlan(collection="query_db_result", projection_stage=projection_stage)

return result_set

finally:
query_db.close()

def _create_result_set_from_db(self, rows: List[Dict[str, Any]], query: str) -> ResultSet:
def _create_result_set_from_db(self, rows: List[Dict[str, Any]], query: str) -> Dict[str, Any]:
"""
Create a ResultSet from query database results.
Create a command result from query database results.

Args:
rows: List of dictionaries from query database
query: Original SQL query

Returns:
ResultSet with query database results
Dictionary with command result format
"""
# Create a mock command result structure compatible with ResultSet
command_result = {
Expand Down
Loading