diff --git a/.github/workflows/label_prs.yaml b/.github/workflows/label_prs.yaml index 9797a956f..8f3fcec95 100644 --- a/.github/workflows/label_prs.yaml +++ b/.github/workflows/label_prs.yaml @@ -14,5 +14,5 @@ jobs: with: repo-token: ${{ secrets.GITHUB_TOKEN }} configuration-path: .github/pr_labeler.yaml - sync-labels: true + sync-labels: false # Don't remove manually added labels dot: true \ No newline at end of file diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index f2bd59004..218134d62 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,8 +1,7 @@ -# pip install datajoint[test] # pre-commit install # pre-commit run --all-files # pre-commit autoupdate -# SKIP=flake8 git commit -m "foo" +# SKIP=ruff git commit -m "foo" # See https://pre-commit.com for more information # See https://pre-commit.com/hooks.html for more hooks diff --git a/pyproject.toml b/pyproject.toml index ac3af18df..ef9e622a2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -41,20 +41,23 @@ description = "DataJoint for Python is a framework for scientific workflow manag readme = "README.md" license = {file = "LICENSE"} keywords = [ - "database", - "automated", - "automation", - "compute", - "data", - "pipeline", - "workflow", - "scientific", - "science", - "research", - "neuroscience", - "bioinformatics", - "bio-informatics", "datajoint", + "data-pipelines", + "workflow-management", + "data-engineering", + "scientific-computing", + "neuroscience", + "research-software", + "data-integrity", + "reproducibility", + "declarative", + "etl", + "object-storage", + "schema-management", + "data-lineage", + "relational-model", + "mysql", + "postgresql", ] # https://pypi.org/classifiers/ classifiers = [ @@ -166,6 +169,8 @@ check_untyped_defs = true [[tool.mypy.overrides]] module = [ "datajoint.content_registry", + "datajoint.errors", + "datajoint.hash", ] disallow_untyped_defs = true disallow_incomplete_defs = true @@ -185,10 +190,8 @@ module = [ "datajoint.declare", "datajoint.dependencies", "datajoint.diagram", - "datajoint.errors", "datajoint.expression", "datajoint.gc", - "datajoint.hash", "datajoint.heading", "datajoint.jobs", "datajoint.lineage", diff --git a/specs/autopopulate-1.0.md b/specs/autopopulate-1.0.md deleted file mode 100644 index 44d5a0b1e..000000000 --- a/specs/autopopulate-1.0.md +++ /dev/null @@ -1,622 +0,0 @@ -# AutoPopulate 1.0 Specification - -This document describes the legacy AutoPopulate system in DataJoint Python, documenting how automated computation pipelines work. This specification serves as a reference for the system being replaced by AutoPopulate 2.0. - -## Overview - -AutoPopulate is a mixin class that adds the `populate()` method to a Table class. Auto-populated tables inherit from both `Table` and `AutoPopulate`, define the `key_source` property, and implement the `make` callback method. 
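
For orientation, a minimal sketch of what such a table looks like in user code. This is illustrative only: the schema name, the upstream `Trace` table, and the `smooth()` helper are assumptions, not part of the codebase.

```python
import datajoint as dj

schema = dj.Schema("demo_pipeline")  # hypothetical schema name


@schema
class FilteredTrace(dj.Computed):  # Computed (and Imported) tables mix in AutoPopulate
    definition = """
    -> Trace                       # assumes an upstream Trace table exists in scope
    ---
    filtered : longblob            # result of the computation
    """

    # key_source defaults to the join of the primary parents (here, just Trace)

    def make(self, key):
        # fetch -> compute -> insert, all for one key
        raw = (Trace & key).fetch1("signal")
        self.insert1(dict(key, filtered=smooth(raw)))  # smooth() is a placeholder


# FilteredTrace().populate() then calls make() once for each missing key
```
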
- -**Source Files:** -- `src/datajoint/autopopulate.py` - Main AutoPopulate mixin -- `src/datajoint/jobs.py` - Job reservation table -- `src/datajoint/schemas.py` - Schema class with jobs property - -## Key Characteristics (1.0 vs 2.0) - -| Aspect | AutoPopulate 1.0 | AutoPopulate 2.0 | -|--------|------------------|------------------| -| **Jobs table scope** | Schema-level (`~jobs`) | Per-table (`~table__jobs`) | -| **Primary key** | `(table_name, key_hash)` | FK-derived attributes only | -| **Key storage** | MD5 hash + pickled blob | Native column values | -| **Status values** | `reserved`, `error`, `ignore` | `pending`, `reserved`, `success`, `error`, `ignore` | -| **Pending tracking** | None (computed on-the-fly) | Explicit `pending` status | -| **Priority** | None | Integer priority (lower = more urgent) | -| **Scheduling** | None | `scheduled_time` for delayed execution | -| **Duration tracking** | None | `duration` in seconds | -| **Code version** | None | `version` field | -| **`schema.jobs`** | Single `JobTable` | List of per-table `JobsTable` objects | -| **Job refresh** | None | `refresh()` syncs with `key_source` | - -## 1. Key Source Generation - -### Default Behavior - -The `key_source` property returns a `QueryExpression` yielding primary key values to be passed to `make()`. - -**Default implementation** (`autopopulate.py:59-83`): -1. Fetch all primary parent tables via `self.target.parents(primary=True, as_objects=True, foreign_key_info=True)` -2. Handle aliased attributes by projecting with renamed columns -3. Join all parent tables using the `*` operator (natural join) - -```python -@property -def key_source(self): - def _rename_attributes(table, props): - return ( - table.proj(**{attr: ref for attr, ref in props["attr_map"].items() if attr != ref}) - if props["aliased"] - else table.proj() - ) - - if self._key_source is None: - parents = self.target.parents(primary=True, as_objects=True, foreign_key_info=True) - if not parents: - raise DataJointError( - "A table must have dependencies from its primary key for auto-populate to work" - ) - self._key_source = _rename_attributes(*parents[0]) - for q in parents[1:]: - self._key_source *= _rename_attributes(*q) - return self._key_source -``` - -### Custom Key Source - -Subclasses may override `key_source` to change the scope or granularity of `make()` calls. - -### Jobs To Do Computation - -The `_jobs_to_do()` method (`autopopulate.py:171-197`): -1. Validates `key_source` is a `QueryExpression` -2. Verifies target table has all primary key attributes from `key_source` -3. Applies restrictions via `AndList` -4. Projects to primary key attributes only - -```python -def _jobs_to_do(self, restrictions): - todo = self.key_source - # ... validation ... - return (todo & AndList(restrictions)).proj() -``` - -The actual keys to populate are computed as: -```python -keys = (self._jobs_to_do(restrictions) - self.target).fetch("KEY", limit=limit) -``` - -This subtracts already-populated keys from the todo list. - -## 2. Job Table Creation and Management - -### Schema-Level Job Tables - -Each schema has its own job reservation table named `~jobs`. The job table is created lazily when first accessed. 
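
A brief usage sketch, assuming a schema named `my_pipeline`; the property shown next is what lazily declares the table on first access.

```python
schema = dj.Schema("my_pipeline")          # hypothetical schema

jobs = schema.jobs                         # first access declares `~jobs` if needed
print(jobs & 'status = "error"')           # inspect failed jobs across the schema
print(len(jobs & 'status = "reserved"'))   # count jobs currently held by workers
```
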
- -**Schema.jobs property** (`schemas.py:367-377`): -```python -@property -def jobs(self): - """ - schema.jobs provides a view of the job reservation table for the schema - """ - self._assert_exists() - if self._jobs is None: - self._jobs = JobTable(self.connection, self.database) - return self._jobs -``` - -### JobTable Initialization - -**JobTable.__init__** (`jobs.py:18-40`): -```python -def __init__(self, conn, database): - self.database = database - self._connection = conn - self._heading = Heading(table_info=dict( - conn=conn, database=database, table_name=self.table_name, context=None - )) - self._support = [self.full_table_name] - - self._definition = """ # job reservation table for `{database}` - table_name :varchar(255) # className of the table - key_hash :char(32) # key hash - --- - status :enum('reserved','error','ignore') - key=null : # structure containing the key - error_message="" :varchar({error_message_length}) - error_stack=null : # error stack if failed - user="" :varchar(255) - host="" :varchar(255) - pid=0 :int unsigned - connection_id = 0 : bigint unsigned - timestamp=CURRENT_TIMESTAMP :timestamp - """.format(database=database, error_message_length=ERROR_MESSAGE_LENGTH) - if not self.is_declared: - self.declare() - self._user = self.connection.get_user() -``` - -The `~jobs` table is automatically declared (created) if it doesn't exist when the `JobTable` is instantiated. - -### Schema Registration - -When a schema is activated, it registers itself with the connection (`schemas.py:136`): -```python -self.connection.register(self) -``` - -**Connection.register** (`connection.py:222-224`): -```python -def register(self, schema): - self.schemas[schema.database] = schema - self.dependencies.clear() -``` - -This allows `populate()` to access the jobs table via: -```python -jobs = self.connection.schemas[self.target.database].jobs -``` - -### Job Table Name - -The job table uses a special name prefixed with `~` (`jobs.py:47-48`): -```python -@property -def table_name(self): - return "~jobs" -``` - -Tables prefixed with `~` are system tables excluded from `schema.list_tables()`. - -## 3. Job Reservation System - -### Job Table Structure - -The `~jobs` table (`jobs.py:24-37`) stores job reservations: - -| Attribute | Type | Description | -|-----------|------|-------------| -| `table_name` | varchar(255) | Full table name (`database.table_name`) | -| `key_hash` | char(32) | MD5 hash of primary key dict | -| `status` | enum | `'reserved'`, `'error'`, or `'ignore'` | -| `key` | blob | Pickled key dict | -| `error_message` | varchar(2047) | Truncated error message | -| `error_stack` | blob | Full stack trace | -| `user` | varchar(255) | Database user | -| `host` | varchar(255) | System hostname | -| `pid` | int unsigned | Process ID | -| `connection_id` | bigint unsigned | MySQL connection ID | -| `timestamp` | timestamp | Automatic timestamp | - -### Reservation Flow - -**Reserve** (`jobs.py:58-81`): -```python -def reserve(self, table_name, key): - job = dict( - table_name=table_name, - key_hash=key_hash(key), - status="reserved", - host=platform.node(), - pid=os.getpid(), - connection_id=self.connection.connection_id, - key=key, - user=self._user, - ) - try: - self.insert1(job, ignore_extra_fields=True) - except DuplicateError: - return False - return True -``` - -Atomicity is guaranteed by MySQL's unique constraint on `(table_name, key_hash)`. 
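
In effect, when two workers race for the same key, only one `insert1` succeeds. A hedged sketch with a hypothetical table name, key, and `run_make()` placeholder:

```python
key = {"subject_id": 1, "session": 3}      # hypothetical primary key

# Both workers call reserve() with identical arguments; the unique
# (table_name, key_hash) index lets exactly one INSERT succeed.
got_it = jobs.reserve("__analysis", key)   # True for the winner, False for the loser
if got_it:
    run_make(key)                          # placeholder for the actual make() call
```
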
- -**Complete** (`jobs.py:113-121`): -```python -def complete(self, table_name, key): - job_key = dict(table_name=table_name, key_hash=key_hash(key)) - (self & job_key).delete_quick() -``` - -**Error** (`jobs.py:123-150`): -```python -def error(self, table_name, key, error_message, error_stack=None): - if len(error_message) > ERROR_MESSAGE_LENGTH: - error_message = error_message[:ERROR_MESSAGE_LENGTH - len(TRUNCATION_APPENDIX)] + TRUNCATION_APPENDIX - self.insert1( - dict( - table_name=table_name, - key_hash=key_hash(key), - status="error", - # ... metadata ... - error_message=error_message, - error_stack=error_stack, - ), - replace=True, - ) -``` - -**Ignore** (`jobs.py:83-111`): -```python -def ignore(self, table_name, key): - job = dict( - table_name=table_name, - key_hash=key_hash(key), - status="ignore", - # ... metadata ... - ) - try: - self.insert1(job, ignore_extra_fields=True) - except DuplicateError: - return False - return True -``` - -### Job Filtering in Populate - -Before populating, keys with existing job entries are excluded (`autopopulate.py:257-261`): -```python -if reserve_jobs: - exclude_key_hashes = ( - jobs & {"table_name": self.target.table_name} & 'status in ("error", "ignore", "reserved")' - ).fetch("key_hash") - keys = [key for key in keys if key_hash(key) not in exclude_key_hashes] -``` - -### Job Table Maintenance - -The `JobTable` class provides simplified `delete()` and `drop()` methods (`jobs.py:50-56`): -```python -def delete(self): - """bypass interactive prompts and dependencies""" - self.delete_quick() - -def drop(self): - """bypass interactive prompts and dependencies""" - self.drop_quick() -``` - -These bypass normal safety prompts since the jobs table is a system table. - -## 4. Make Method Invocation - -### Make Method Contract - -The `make(key)` method must perform three steps: -1. **Fetch**: Retrieve data from parent tables, restricted by key -2. **Compute**: Calculate secondary attributes from fetched data -3. **Insert**: Insert new tuple(s) into the target table - -### Two Implementation Patterns - -#### Pattern A: Regular Method - -All three steps execute within a single database transaction. - -**Execution flow** (`autopopulate.py:340-355`): -```python -if not is_generator: - self.connection.start_transaction() - # ... key existence check ... - make(dict(key), **(make_kwargs or {})) -``` - -#### Pattern B: Generator (Tripartite) Method - -Separates computation from transaction to allow long-running computation outside the transaction window. 
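
For concreteness, a minimal sketch of a table written against this pattern; it relies on the inherited default generator `make` (shown below) calling the three required methods, which are listed next. `Recording`, the schema decorator, and `run_sorter()` are assumptions for illustration.

```python
@schema
class SpikeSorting(dj.Computed):
    definition = """
    -> Recording
    ---
    n_units : int
    """

    def make_fetch(self, key):
        # all database reads; the returned tuple is hashed for later verification
        return ((Recording & key).fetch1("raw"),)

    def make_compute(self, key, raw):
        # long-running work executes outside any transaction
        return (run_sorter(raw),)            # run_sorter() is a placeholder

    def make_insert(self, key, units):
        # inserts happen inside the final, short transaction
        self.insert1(dict(key, n_units=len(units)))
```
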
- -**Required methods**: -- `make_fetch(key)` - All database queries -- `make_compute(key, *fetched_data)` - All computation -- `make_insert(key, *computed_result)` - All inserts - -**Default generator implementation** (`autopopulate.py:140-152`): -```python -def make(self, key): - fetched_data = self.make_fetch(key) - computed_result = yield fetched_data - - if computed_result is None: - computed_result = self.make_compute(key, *fetched_data) - yield computed_result - - self.make_insert(key, *computed_result) - yield -``` - -**Execution flow** (`autopopulate.py:356-370`): -```python -# Phase 1: Fetch and compute OUTSIDE transaction -gen = make(dict(key), **(make_kwargs or {})) -fetched_data = next(gen) -fetch_hash = deepdiff.DeepHash(fetched_data, ignore_iterable_order=False)[fetched_data] -computed_result = next(gen) - -# Phase 2: Verify and insert INSIDE transaction -self.connection.start_transaction() -gen = make(dict(key), **(make_kwargs or {})) # restart -fetched_data = next(gen) -if fetch_hash != deepdiff.DeepHash(fetched_data, ignore_iterable_order=False)[fetched_data]: - raise DataJointError("Referential integrity failed! The `make_fetch` data has changed") -gen.send(computed_result) # insert -``` - -The deep hash comparison ensures data integrity by detecting concurrent modifications. - -### Legacy Support - -The legacy `_make_tuples` method name is supported (`autopopulate.py:333`): -```python -make = self._make_tuples if hasattr(self, "_make_tuples") else self.make -``` - -### Insert Protection - -Direct inserts into auto-populated tables are blocked outside `make()` (`autopopulate.py:351, 402`): -```python -self.__class__._allow_insert = True -try: - # ... make() execution ... -finally: - self.__class__._allow_insert = False -``` - -The `Table.insert()` method checks this flag and raises `DataJointError` if insert is attempted outside the populate context (unless `allow_direct_insert=True`). - -## 5. Transaction Management - -### Transaction Lifecycle - -**Start** (`connection.py:322-327`): -```python -def start_transaction(self): - if self.in_transaction: - raise DataJointError("Nested transactions are not supported.") - self.query("START TRANSACTION WITH CONSISTENT SNAPSHOT") - self._in_transaction = True -``` - -Uses MySQL's `WITH CONSISTENT SNAPSHOT` for repeatable read isolation. - -**Commit** (`connection.py:337-343`): -```python -def commit_transaction(self): - self.query("COMMIT") - self._in_transaction = False -``` - -**Cancel/Rollback** (`connection.py:329-335`): -```python -def cancel_transaction(self): - self.query("ROLLBACK") - self._in_transaction = False -``` - -### Transaction Rules - -1. **No nested transactions** - `populate()` cannot be called during an existing transaction (`autopopulate.py:237-238`) -2. **Regular make**: Transaction spans entire `make()` execution -3. **Generator make**: Transaction spans only the final fetch verification and insert phase - -## 6. Error Management - -### Error Handling Flow - -(`autopopulate.py:372-402`): - -```python -try: - # ... make() execution ... 
-except (KeyboardInterrupt, SystemExit, Exception) as error: - try: - self.connection.cancel_transaction() - except LostConnectionError: - pass # Connection lost during rollback - - error_message = "{exception}{msg}".format( - exception=error.__class__.__name__, - msg=": " + str(error) if str(error) else "", - ) - - if jobs is not None: - jobs.error( - self.target.table_name, - self._job_key(key), - error_message=error_message, - error_stack=traceback.format_exc(), - ) - - if not suppress_errors or isinstance(error, SystemExit): - raise - else: - logger.error(error) - return key, error if return_exception_objects else error_message -else: - self.connection.commit_transaction() - if jobs is not None: - jobs.complete(self.target.table_name, self._job_key(key)) - return True -``` - -### Error Suppression - -When `suppress_errors=True`: -- Errors are logged to the jobs table -- Errors are collected and returned instead of raised -- `SystemExit` is never suppressed (for graceful SIGTERM handling) - -### SIGTERM Handling - -When `reserve_jobs=True`, a SIGTERM handler is installed (`autopopulate.py:245-251`): -```python -def handler(signum, frame): - logger.info("Populate terminated by SIGTERM") - raise SystemExit("SIGTERM received") - -old_handler = signal.signal(signal.SIGTERM, handler) -``` - -This allows graceful termination of long-running populate jobs. - -## 7. Populate Method Interface - -### Full Signature - -```python -def populate( - self, - *restrictions, - keys=None, - suppress_errors=False, - return_exception_objects=False, - reserve_jobs=False, - order="original", - limit=None, - max_calls=None, - display_progress=False, - processes=1, - make_kwargs=None, -): -``` - -### Parameters - -| Parameter | Type | Default | Description | -|-----------|------|---------|-------------| -| `*restrictions` | various | - | Restrictions AND-ed to filter `key_source` | -| `keys` | list[dict] | None | Explicit keys to populate (bypasses `key_source`) | -| `suppress_errors` | bool | False | Collect errors instead of raising | -| `return_exception_objects` | bool | False | Return exception objects vs. strings | -| `reserve_jobs` | bool | False | Enable job reservation for distributed processing | -| `order` | str | "original" | Key order: "original", "reverse", "random" | -| `limit` | int | None | Max keys to fetch from `key_source` | -| `max_calls` | int | None | Max `make()` calls to execute | -| `display_progress` | bool | False | Show progress bar | -| `processes` | int | 1 | Number of worker processes | -| `make_kwargs` | dict | None | Non-computation kwargs passed to `make()` | - -### Return Value - -```python -{ - "success_count": int, # Number of successful make() calls - "error_list": list, # List of (key, error) tuples if suppress_errors=True -} -``` - -## 8. 
Multiprocessing Support - -### Process Initialization - -(`autopopulate.py:27-36`): -```python -def _initialize_populate(table, jobs, populate_kwargs): - process = mp.current_process() - process.table = table - process.jobs = jobs - process.populate_kwargs = populate_kwargs - table.connection.connect() # reconnect -``` - -### Connection Handling - -Before forking (`autopopulate.py:296-297`): -```python -self.connection.close() # Disconnect parent -del self.connection._conn.ctx # SSLContext not pickleable -``` - -After workers complete (`autopopulate.py:311`): -```python -self.connection.connect() # Reconnect parent -``` - -### Worker Execution - -```python -def _call_populate1(key): - process = mp.current_process() - return process.table._populate1(key, process.jobs, **process.populate_kwargs) -``` - -Uses `Pool.imap()` with `chunksize=1` for ordered execution with progress tracking. - -## 9. Return Values from _populate1 - -| Value | Meaning | -|-------|---------| -| `True` | Successfully completed `make()` and inserted data | -| `False` | Key already exists in target OR job reservation failed | -| `(key, error)` | Error occurred (when `suppress_errors=True`) | - -## 10. Key Observations - -### Strengths - -1. **Atomic job reservation** via MySQL unique constraints -2. **Generator pattern** allows long computation outside transactions -3. **Deep hash verification** ensures data consistency -4. **Graceful shutdown** via SIGTERM handling -5. **Error persistence** in jobs table for debugging -6. **Per-schema job tables** allow independent job management - -### Limitations (Addressed in 2.0) - -The following limitations are documented in GitHub issue [#1258](https://github.com/datajoint/datajoint-python/issues/1258) and related issues. - -#### Job Table Design Issues - -1. **Limited status tracking**: Only `reserved`, `error`, and `ignore` statuses. No explicit tracking of pending jobs or successful completions. - -2. **Functions as error log**: Cannot track pending or completed jobs efficiently. Finding pending jobs requires computing `key_source - target - jobs` each time. - -3. **Poor dashboard visibility**: No way to monitor pipeline progress without querying multiple tables and computing set differences. See [#873](https://github.com/datajoint/datajoint-python/issues/873). - -4. **Key hashing obscures data**: Primary keys stored as 32-character MD5 hashes. Actual keys stored as pickled blobs requiring deserialization to inspect. - -5. **No referential integrity**: Jobs table is independent of computed tables. Orphaned jobs accumulate when upstream data is deleted. - -6. **Schema-level scope**: All computed tables share one jobs table. Filtering by `table_name` required for all operations. - -#### Key Source Issues - -1. **Frequent manual modifications**: Subset operations require modifying `key_source` in Python code. No database-level persistence. - -2. **Local visibility only**: Custom key sources not accessible database-wide. See discussion in [#1258](https://github.com/datajoint/datajoint-python/issues/1258). - -3. **Performance bottleneck**: Multiple workers querying `key_source` simultaneously strains database. See [#749](https://github.com/datajoint/datajoint-python/issues/749). - -4. **Codebase dependency**: Requires full pipeline codebase to determine pending work. Cannot query job status from SQL alone. - -#### Missing Features - -1. **No priority system**: Jobs processed in fetch order only (original, reverse, random). - -2. 
**No scheduling**: Cannot delay job execution to a future time. - -3. **No duration tracking**: No record of how long jobs take to complete. - -4. **No version tracking**: No record of which code version processed a job. - -5. **Simple retry logic**: Failed jobs stay in `error` status until manually cleared. - -6. **No stale job cleanup**: Jobs referencing deleted upstream data remain indefinitely. - -7. **No orphaned job handling**: Reserved jobs from crashed workers remain forever. See [#665](https://github.com/datajoint/datajoint-python/issues/665). - -#### Populate Parameter Confusion - -The `limit` vs `max_calls` parameters have confusing behavior. See [#1203](https://github.com/datajoint/datajoint-python/issues/1203): -- `limit`: Applied before excluding reserved/error jobs (can result in no work even when jobs available) -- `max_calls`: Applied after excluding reserved/error jobs (usually what users expect) - -## 11. Related GitHub Issues - -| Issue | Title | Status | -|-------|-------|--------| -| [#1258](https://github.com/datajoint/datajoint-python/issues/1258) | FEAT: Autopopulate 2.0 | Open | -| [#1203](https://github.com/datajoint/datajoint-python/issues/1203) | Unexpected behaviour of `limit` in populate() | Open | -| [#749](https://github.com/datajoint/datajoint-python/issues/749) | Strain on MySQL with expensive key-source | Closed | -| [#873](https://github.com/datajoint/datajoint-python/issues/873) | Provide way to list specific jobs | Closed | -| [#665](https://github.com/datajoint/datajoint-python/issues/665) | Cluster support - machine failures | Closed | diff --git a/specs/autopopulate-2.0-implementation.md b/specs/autopopulate-2.0-implementation.md deleted file mode 100644 index 15960a202..000000000 --- a/specs/autopopulate-2.0-implementation.md +++ /dev/null @@ -1,458 +0,0 @@ -# AutoPopulate 2.0 Implementation Plan - -This document outlines the implementation steps for AutoPopulate 2.0 based on the specification in `docs/src/compute/autopopulate2.0-spec.md`. 
- -## Overview - -The implementation involves changes to these files: -- `src/datajoint/jobs.py` - New `JobsTable` class (per-table jobs) -- `src/datajoint/autopopulate.py` - Updated `AutoPopulate` mixin -- `src/datajoint/user_tables.py` - FK-only PK constraint for Computed/Imported -- `src/datajoint/schemas.py` - Updated `schema.jobs` property -- `src/datajoint/settings.py` - New configuration options - -## Table Naming Convention - -Jobs tables use the `~~` prefix (double tilde): - -| Table Type | Example Class | MySQL Table Name | -|------------|---------------|------------------| -| Manual | `Subject` | `subject` | -| Lookup | `#Method` | `#method` | -| Imported | `_Recording` | `_recording` | -| Computed | `__Analysis` | `__analysis` | -| Hidden | `~jobs` | `~jobs` | -| **Jobs (new)** | N/A | `~~analysis` | - -The `~~` prefix: -- Distinguishes from single-tilde hidden tables (`~jobs`, `~lineage`) -- Shorter than suffix-based naming -- Excluded from `list_tables()` (tables starting with `~`) - -## Execution Modes - -AutoPopulate 2.0 supports two execution modes, both equally valid: - -### Direct Mode (`reserve_jobs=False`, default) - -Best for: -- Early development and debugging -- Single-worker execution -- Simple pipelines without distributed computing -- Interactive exploration - -Behavior: -- Computes `(key_source & restrictions) - self` directly -- No jobs table involvement -- No coordination overhead - -### Distributed Mode (`reserve_jobs=True`) - -Best for: -- Multi-worker parallel processing -- Production pipelines with monitoring -- Job prioritization and scheduling -- Error tracking and retry workflows - -Behavior: -- Uses per-table jobs table for coordination -- Supports priority, scheduling, status tracking -- Enables dashboard monitoring - -## Phase 1: JobsTable Class - -### 1.1 Create JobsTable Class - -**File**: `src/datajoint/jobs.py` - -```python -class JobsTable(Table): - """Hidden table managing job queue for an auto-populated table.""" - - _prefix = "~~" - - def __init__(self, target_table): - """ - Initialize jobs table for an auto-populated table. - - Args: - target_table: The Computed/Imported table instance - """ - self._target_class = target_table.__class__ - self._connection = target_table.connection - self.database = target_table.database - self._definition = self._generate_definition(target_table) - - @property - def table_name(self): - """Jobs table name: ~~base_name""" - target_name = self._target_class.table_name - base_name = target_name.lstrip('_') - return f"~~{base_name}" -``` - -### 1.2 Core Methods - -```python -def refresh( - self, - *restrictions, - delay: float = 0, - priority: int = None, - stale_timeout: float = None, - orphan_timeout: float = None -) -> dict: - """ - Refresh jobs queue: add new, remove stale, handle orphans. - - Args: - restrictions: Filter key_source when adding new jobs - delay: Seconds until new jobs become available - priority: Priority for new jobs (lower = more urgent) - stale_timeout: Remove jobs older than this if key not in key_source - orphan_timeout: Reset reserved jobs older than this to pending - - Returns: - {'added': int, 'removed': int, 'orphaned': int, 're_pended': int} - """ - -def reserve(self, key: dict) -> bool: - """ - Reserve a pending job for processing. - - Returns True if reservation successful, False if job not available. 
- """ - -def complete(self, key: dict, duration: float = None) -> None: - """Mark job as completed (success or delete based on config).""" - -def error(self, key: dict, error_message: str, error_stack: str = None) -> None: - """Mark job as failed with error details.""" - -def ignore(self, key: dict) -> None: - """Mark job to be skipped during populate.""" - -def progress(self) -> dict: - """Return job status breakdown.""" -``` - -### 1.3 Status Properties - -```python -@property -def pending(self) -> QueryExpression: - return self & 'status="pending"' - -@property -def reserved(self) -> QueryExpression: - return self & 'status="reserved"' - -@property -def errors(self) -> QueryExpression: - return self & 'status="error"' - -@property -def ignored(self) -> QueryExpression: - return self & 'status="ignore"' - -@property -def completed(self) -> QueryExpression: - return self & 'status="success"' -``` - -### 1.4 Definition Generation - -```python -def _generate_definition(self, target_table): - """Build jobs table definition from target's FK-derived primary key.""" - fk_attrs = self._get_fk_derived_pk_attrs(target_table) - pk_lines = "\n ".join(f"{name} : {dtype}" for name, dtype in fk_attrs) - - return f""" - # Job queue for {target_table.full_table_name} - {pk_lines} - --- - status : enum('pending', 'reserved', 'success', 'error', 'ignore') - priority : uint8 # Set by refresh(), default from config - created_time=CURRENT_TIMESTAMP : timestamp - scheduled_time=CURRENT_TIMESTAMP : timestamp - reserved_time=null : timestamp - completed_time=null : timestamp - duration=null : float64 - error_message="" : varchar(2047) - error_stack=null : - user="" : varchar(255) - host="" : varchar(255) - pid=0 : uint32 - connection_id=0 : uint64 - version="" : varchar(255) - """ -``` - -## Phase 2: FK-Only Primary Key Constraint - -### 2.1 Validation for New Tables - -**File**: `src/datajoint/user_tables.py` - -New auto-populated tables must have FK-only primary keys: - -```python -@classmethod -def _validate_pk_constraint(cls): - """Enforce FK-only PK for new auto-populated tables.""" - if cls.is_declared: - return # Skip validation for existing tables - - heading = cls.heading - non_fk_pk = [ - name for name in heading.primary_key - if not heading[name].is_foreign_key - ] - if non_fk_pk: - raise DataJointError( - f"Auto-populated table {cls.__name__} has non-FK primary key " - f"attributes: {non_fk_pk}. Move these to secondary attributes " - f"or reference a lookup table." 
- ) -``` - -### 2.2 Legacy Table Support - -Existing tables with non-FK PK attributes continue to work: -- Jobs table uses only FK-derived attributes -- Warning logged about degraded granularity -- One job may cover multiple target rows - -## Phase 3: AutoPopulate Mixin Updates - -### 3.1 Add `jobs` Property - -**File**: `src/datajoint/autopopulate.py` - -```python -class AutoPopulate: - _jobs_table = None - - @property - def jobs(self): - """Access the jobs table for this auto-populated table.""" - if self._jobs_table is None: - self._jobs_table = JobsTable(self) - if not self._jobs_table.is_declared: - self._jobs_table.declare() - return self._jobs_table -``` - -### 3.2 Update `populate()` Signature - -```python -def populate( - self, - *restrictions, - suppress_errors: bool = False, - return_exception_objects: bool = False, - reserve_jobs: bool = False, - max_calls: int = None, - display_progress: bool = False, - processes: int = 1, - make_kwargs: dict = None, - priority: int = None, - refresh: bool = None, -) -> dict: -``` - -### 3.3 Execution Path Selection - -```python -def populate(self, *restrictions, reserve_jobs=False, **kwargs): - if self.connection.in_transaction: - raise DataJointError("Populate cannot be called during a transaction.") - - if reserve_jobs: - return self._populate_distributed(*restrictions, **kwargs) - else: - return self._populate_direct(*restrictions, **kwargs) -``` - -### 3.4 Direct Mode Implementation - -```python -def _populate_direct(self, *restrictions, max_calls=None, suppress_errors=False, ...): - """ - Populate without jobs table coordination. - - Computes keys directly from key_source, suitable for single-worker - execution, development, and debugging. - """ - keys = (self.key_source & AndList(restrictions)) - self - keys = keys.fetch('KEY', limit=max_calls) - - success_count = 0 - error_list = [] - - for key in tqdm(keys, disable=not display_progress): - result = self._populate1(key, jobs=None, suppress_errors=suppress_errors, ...) - # ... handle result -``` - -### 3.5 Distributed Mode Implementation - -```python -def _populate_distributed(self, *restrictions, refresh=None, priority=None, max_calls=None, ...): - """ - Populate with jobs table coordination. - - Uses jobs table for multi-worker coordination, priority scheduling, - and status tracking. - """ - # Refresh if configured - if refresh is None: - refresh = config['jobs.auto_refresh'] - if refresh: - self.jobs.refresh(*restrictions, priority=priority) - - # Fetch pending jobs - pending = ( - self.jobs.pending & 'scheduled_time <= NOW()' - ).fetch('KEY', order_by='priority ASC, scheduled_time ASC', limit=max_calls) - - success_count = 0 - error_list = [] - - for key in tqdm(pending, disable=not display_progress): - if not self.jobs.reserve(key): - continue # Already reserved by another worker - - start_time = time.time() - try: - self._call_make(key, ...) - duration = time.time() - start_time - self.jobs.complete(key, duration=duration) - success_count += 1 - except Exception as e: - self.connection.cancel_transaction() - self.jobs.error(key, str(e), traceback.format_exc()) - if not suppress_errors: - raise - error_list.append((key, e)) - - return {'success_count': success_count, 'error_list': error_list} -``` - -## Phase 4: Schema Updates - -### 4.1 Update `schema.jobs` Property - -**File**: `src/datajoint/schemas.py` - -```python -@property -def jobs(self): - """ - Return list of JobsTable objects for all auto-populated tables. 
- - Returns: - List[JobsTable]: Jobs tables for Computed/Imported tables in schema - """ - from .jobs import JobsTable - - jobs_tables = [] - for table_name in self.list_tables(): - table_class = self(table_name) - if hasattr(table_class, 'jobs'): - jobs_tables.append(table_class.jobs) - return jobs_tables -``` - -### 4.2 Exclude `~~` from `list_tables()` - -Already handled - tables starting with `~` are excluded. - -## Phase 5: Configuration - -### 5.1 Add Config Options - -**File**: `src/datajoint/settings.py` - -```python -DEFAULTS = { - 'jobs.auto_refresh': True, - 'jobs.keep_completed': False, - 'jobs.stale_timeout': 3600, - 'jobs.default_priority': 5, - 'jobs.version': None, -} -``` - -### 5.2 Version Helper - -```python -def get_job_version() -> str: - """Get version string based on config.""" - version = config['jobs.version'] - if version == 'git': - try: - result = subprocess.run( - ['git', 'rev-parse', '--short', 'HEAD'], - capture_output=True, text=True, timeout=5 - ) - return result.stdout.strip() if result.returncode == 0 else '' - except Exception: - return '' - return version or '' -``` - -## Phase 6: Table Lifecycle - -### 6.1 Drop Jobs Table with Target - -When an auto-populated table is dropped, its jobs table is also dropped: - -```python -def drop(self): - if hasattr(self, '_jobs_table') and self._jobs_table is not None: - if self._jobs_table.is_declared: - self._jobs_table.drop_quick() - # ... existing drop logic -``` - -## Phase 7: Update Spec - -Update `docs/src/compute/autopopulate2.0-spec.md`: -- Change `~table__jobs` references to `~~table` -- Update table naming section - -## Implementation Order - -1. **Phase 5**: Configuration (foundation) -2. **Phase 1**: JobsTable class -3. **Phase 2**: FK-only PK constraint -4. **Phase 3**: AutoPopulate updates -5. **Phase 4**: Schema.jobs property -6. **Phase 6**: Table lifecycle -7. **Phase 7**: Spec update -8. **Testing**: Throughout - -## Testing Strategy - -### Unit Tests -- `test_jobs_table_naming` - `~~` prefix -- `test_jobs_definition_generation` - FK-derived PK -- `test_refresh_operations` - add/remove/orphan/repend -- `test_reserve_complete_error_flow` - job lifecycle -- `test_progress_counts` - status aggregation - -### Integration Tests -- `test_populate_direct_mode` - without jobs table -- `test_populate_distributed_mode` - with jobs table -- `test_multiprocess_populate` - concurrent workers -- `test_legacy_table_support` - non-FK PK tables -- `test_schema_jobs_property` - list of jobs tables - -## Migration Notes - -- Legacy `~jobs` table is NOT auto-deleted -- New `~~` tables created on first access to `.jobs` -- Both can coexist during transition -- Manual cleanup of legacy `~jobs` when ready diff --git a/specs/table-declaration.md b/specs/table-declaration.md deleted file mode 100644 index f5f878b78..000000000 --- a/specs/table-declaration.md +++ /dev/null @@ -1,587 +0,0 @@ -# DataJoint Table Declaration Specification - -Version: 1.0 -Status: Draft -Last Updated: 2025-01-04 - -## Overview - -This document specifies the table declaration mechanism in DataJoint Python. Table declarations define the schema structure using a domain-specific language (DSL) embedded in Python class definitions. - -## 1. 
Table Class Structure - -### 1.1 Basic Declaration Pattern - -```python -@schema -class TableName(dj.Manual): - definition = """ - # table comment - primary_attr : int32 - --- - secondary_attr : float64 - """ -``` - -### 1.2 Table Tiers - -| Tier | Base Class | Table Prefix | Purpose | -|------|------------|--------------|---------| -| Manual | `dj.Manual` | (none) | User-entered data | -| Lookup | `dj.Lookup` | `#` | Reference/enumeration data | -| Imported | `dj.Imported` | `_` | Data from external sources | -| Computed | `dj.Computed` | `__` | Derived from other tables | -| Part | `dj.Part` | `master__` | Detail records of master table | - -### 1.3 Class Naming Rules - -- **Format**: Strict CamelCase (e.g., `MyTable`, `ProcessedData`) -- **Pattern**: `^[A-Z][A-Za-z0-9]*$` -- **Conversion**: CamelCase to snake_case for SQL table name -- **Examples**: - - `SessionTrial` -> `session_trial` - - `ProcessedEMG` -> `processed_emg` - -### 1.4 Table Name Constraints - -- **Maximum length**: 64 characters (MySQL limit) -- **Final name**: prefix + snake_case(class_name) -- **Validation**: Checked at declaration time - ---- - -## 2. Definition String Grammar - -### 2.1 Overall Structure - -``` -[table_comment] -primary_key_section ---- -secondary_section -``` - -### 2.2 Table Comment (Optional) - -``` -# Free-form description of the table purpose -``` - -- Must be first non-empty line if present -- Starts with `#` -- Cannot start with `#:` -- Stored in MySQL table COMMENT - -### 2.3 Primary Key Separator - -``` ---- -``` - -or equivalently: - -``` -___ -``` - -- Three dashes or three underscores -- Separates primary key attributes (above) from secondary attributes (below) -- Required if table has secondary attributes - -### 2.4 Line Types - -Each non-empty, non-comment line is one of: - -1. **Attribute definition** -2. **Foreign key reference** -3. **Index declaration** - ---- - -## 3. Attribute Definition - -### 3.1 Syntax - -``` -attribute_name [= default_value] : type [# comment] -``` - -### 3.2 Components - -| Component | Required | Description | -|-----------|----------|-------------| -| `attribute_name` | Yes | Identifier for the column | -| `default_value` | No | Default value (before colon) | -| `type` | Yes | Data type specification | -| `comment` | No | Documentation (after `#`) | - -### 3.3 Attribute Name Rules - -- **Pattern**: `^[a-z][a-z0-9_]*$` -- **Start**: Lowercase letter -- **Contains**: Lowercase letters, digits, underscores -- **Convention**: snake_case - -### 3.4 Examples - -```python -definition = """ -# Experimental session with subject and timing info -session_id : int32 # auto-assigned ---- -subject_name : varchar(100) # subject identifier -trial_number = 1 : int32 # default to 1 -score = null : float32 # nullable -timestamp = CURRENT_TIMESTAMP : datetime # auto-timestamp -notes = '' : varchar(4000) # empty default -""" -``` - ---- - -## 4. 
Type System - -### 4.1 Core Types - -Scientist-friendly type names with guaranteed semantics: - -| Type | SQL Mapping | Size | Description | -|------|-------------|------|-------------| -| `int8` | `tinyint` | 1 byte | 8-bit signed integer | -| `uint8` | `tinyint unsigned` | 1 byte | 8-bit unsigned integer | -| `int16` | `smallint` | 2 bytes | 16-bit signed integer | -| `uint16` | `smallint unsigned` | 2 bytes | 16-bit unsigned integer | -| `int32` | `int` | 4 bytes | 32-bit signed integer | -| `uint32` | `int unsigned` | 4 bytes | 32-bit unsigned integer | -| `int64` | `bigint` | 8 bytes | 64-bit signed integer | -| `uint64` | `bigint unsigned` | 8 bytes | 64-bit unsigned integer | -| `float32` | `float` | 4 bytes | 32-bit IEEE 754 float | -| `float64` | `double` | 8 bytes | 64-bit IEEE 754 float | -| `bool` | `tinyint` | 1 byte | Boolean (0 or 1) | -| `uuid` | `binary(16)` | 16 bytes | UUID stored as binary | -| `bytes` | `longblob` | Variable | Binary data (up to 4GB) | - -### 4.2 String Types - -| Type | SQL Mapping | Description | -|------|-------------|-------------| -| `char(N)` | `char(N)` | Fixed-length string | -| `varchar(N)` | `varchar(N)` | Variable-length string (max N) | -| `text` | `text` | Unlimited text | -| `enum('a','b',...)` | `enum(...)` | Enumerated values | - -### 4.3 Temporal Types - -| Type | SQL Mapping | Description | -|------|-------------|-------------| -| `date` | `date` | Date (YYYY-MM-DD) | -| `datetime` | `datetime` | Date and time | -| `datetime(N)` | `datetime(N)` | With fractional seconds (0-6) | - -### 4.4 Other Types - -| Type | SQL Mapping | Description | -|------|-------------|-------------| -| `json` | `json` | JSON document | -| `decimal(P,S)` | `decimal(P,S)` | Fixed-point decimal | - -### 4.5 Native SQL Types (Passthrough) - -These SQL types are accepted but generate a warning recommending core types: - -- Integer variants: `tinyint`, `smallint`, `mediumint`, `bigint`, `integer`, `serial` -- Float variants: `float`, `double`, `real` (with size specifiers) -- Text variants: `tinytext`, `mediumtext`, `longtext` -- Blob variants: `tinyblob`, `smallblob`, `mediumblob`, `longblob` -- Temporal: `time`, `timestamp`, `year` -- Numeric: `numeric(P,S)` - -### 4.6 Codec Types - -Format: `` or `` - -| Codec | Internal dtype | External dtype | Purpose | -|-------|---------------|----------------|---------| -| `` | `bytes` | `` | Serialized Python objects | -| `` | N/A (external only) | `json` | Hash-addressed deduped storage | -| `` | `bytes` | `` | File attachments with filename | -| `` | N/A (external only) | `json` | Reference to managed file | -| `` | N/A (external only) | `json` | Object storage (Zarr, HDF5) | - -External storage syntax: -- `` - default store -- `` - named store - -### 4.7 Type Reconstruction - -Core types and codecs are stored in the SQL COMMENT field for reconstruction: - -```sql -COMMENT ':float32:user comment here' -COMMENT '::user comment' -``` - ---- - -## 5. 
Default Values - -### 5.1 Syntax - -``` -attribute_name = default_value : type -``` - -### 5.2 Literal Types - -| Value | Meaning | SQL | -|-------|---------|-----| -| `null` | Nullable attribute | `DEFAULT NULL` | -| `CURRENT_TIMESTAMP` | Server timestamp | `DEFAULT CURRENT_TIMESTAMP` | -| `"string"` or `'string'` | String literal | `DEFAULT "string"` | -| `123` | Numeric literal | `DEFAULT 123` | -| `true`/`false` | Boolean | `DEFAULT 1`/`DEFAULT 0` | - -### 5.3 Constant Literals - -These values are used without quotes in SQL: -- `NULL` -- `CURRENT_TIMESTAMP` - -### 5.4 Nullable Attributes - -``` -score = null : float32 -``` - -- The special default `null` (case-insensitive) makes the attribute nullable -- Nullable attributes can be omitted from INSERT -- Primary key attributes CANNOT be nullable - -### 5.5 Blob/JSON Default Restrictions - -Blob and JSON attributes can only have `null` as default: - -```python -# Valid -data = null : - -# Invalid - raises DataJointError -data = '' : -``` - ---- - -## 6. Foreign Key References - -### 6.1 Syntax - -``` --> [options] ReferencedTable -``` - -### 6.2 Options - -| Option | Effect | -|--------|--------| -| `nullable` | All inherited attributes become nullable | -| `unique` | Creates UNIQUE INDEX on FK attributes | - -Options are comma-separated in brackets: -``` --> [nullable, unique] ParentTable -``` - -### 6.3 Attribute Inheritance - -Foreign keys automatically inherit all primary key attributes from the referenced table: - -```python -# Parent -class Subject(dj.Manual): - definition = """ - subject_id : int32 - --- - name : varchar(100) - """ - -# Child - inherits subject_id -class Session(dj.Manual): - definition = """ - -> Subject - session_id : int32 - --- - session_date : date - """ -``` - -### 6.4 Position Rules - -| Position | Effect | -|----------|--------| -| Before `---` | FK attributes become part of primary key | -| After `---` | FK attributes are secondary (dependent) | - -### 6.5 Nullable Foreign Keys - -``` --> [nullable] OptionalParent -``` - -- Only allowed after `---` (secondary) -- Primary key FKs cannot be nullable -- Creates optional relationship - -### 6.6 Unique Foreign Keys - -``` --> [unique] ParentTable -``` - -- Creates UNIQUE INDEX on inherited attributes -- Enforces one-to-one relationship from child perspective - -### 6.7 Projections in Foreign Keys - -``` --> Parent.proj(alias='original_name') -``` - -- Reference same table multiple times with different attribute names -- Useful for self-referential or multi-reference patterns - -### 6.8 Referential Actions - -All foreign keys use: -- `ON UPDATE CASCADE` - Parent key changes propagate -- `ON DELETE RESTRICT` - Cannot delete parent with children - -### 6.9 Lineage Tracking - -Foreign key relationships are recorded in the `~lineage` table: - -```python -{ - 'child_attr': ('parent_schema.parent_table', 'parent_attr') -} -``` - -Used for semantic attribute matching in queries. - ---- - -## 7. Index Declarations - -### 7.1 Syntax - -``` -index(attr1, attr2, ...) -unique index(attr1, attr2, ...) 
-``` - -### 7.2 Examples - -```python -definition = """ -# User contact information -user_id : int32 ---- -first_name : varchar(50) -last_name : varchar(50) -email : varchar(100) -index(last_name, first_name) -unique index(email) -""" -``` - -### 7.3 Computed Expressions - -Indexes can include SQL expressions: - -``` -index(last_name, (YEAR(birth_date))) -``` - -### 7.4 Limitations - -- Cannot be altered after table creation (via `table.alter()`) -- Must reference existing attributes - ---- - -## 8. Part Tables - -### 8.1 Declaration - -```python -@schema -class Master(dj.Manual): - definition = """ - master_id : int32 - """ - - class Detail(dj.Part): - definition = """ - -> master - detail_id : int32 - --- - value : float32 - """ -``` - -### 8.2 Naming - -- SQL name: `master_table__part_name` -- Example: `experiment__trial` - -### 8.3 Master Reference - -Within Part definition, use: -- `-> master` (lowercase keyword) -- `-> MasterClassName` (class name) - -### 8.4 Constraints - -- Parts must reference their master -- Cannot delete Part records directly (use master) -- Cannot drop Part table directly (use master) -- Part inherits master's primary key - ---- - -## 9. Auto-Populated Tables - -### 9.1 Classes - -- `dj.Imported` - Data from external sources -- `dj.Computed` - Derived from other DataJoint tables - -### 9.2 Primary Key Constraint - -All primary key attributes must come from foreign key references. - -**Valid:** -```python -class Analysis(dj.Computed): - definition = """ - -> Session - -> Parameter - --- - result : float64 - """ -``` - -**Invalid** (by default): -```python -class Analysis(dj.Computed): - definition = """ - -> Session - analysis_id : int32 # ERROR: non-FK primary key - --- - result : float64 - """ -``` - -**Override:** -```python -dj.config['jobs.allow_new_pk_fields_in_computed_tables'] = True -``` - -### 9.3 Job Metadata - -When `config['jobs.add_job_metadata'] = True`, auto-populated tables receive: - -| Column | Type | Description | -|--------|------|-------------| -| `_job_start_time` | `datetime(3)` | Job start timestamp | -| `_job_duration` | `float64` | Duration in seconds | -| `_job_version` | `varchar(64)` | Code version | - ---- - -## 10. Validation - -### 10.1 Parse-Time Checks - -| Check | Error | -|-------|-------| -| Unknown type | `DataJointError: Unsupported attribute type` | -| Invalid attribute name | `DataJointError: Declaration error` | -| Comment starts with `:` | `DataJointError: comment must not start with colon` | -| Non-null blob default | `DataJointError: default value for blob can only be NULL` | - -### 10.2 Declaration-Time Checks - -| Check | Error | -|-------|-------| -| Table name > 64 chars | `DataJointError: Table name exceeds max length` | -| No primary key | `DataJointError: Table must have a primary key` | -| Nullable primary key attr | `DataJointError: Primary key attributes cannot be nullable` | -| Invalid CamelCase | `DataJointError: Invalid table name` | -| FK resolution failure | `DataJointError: Foreign key reference could not be resolved` | - -### 10.3 Insert-Time Validation - -The `table.validate()` method checks: -- Required fields present -- NULL constraints satisfied -- Primary key completeness -- Codec validation (if defined) -- UUID format -- JSON serializability - ---- - -## 11. 
SQL Generation - -### 11.1 CREATE TABLE Template - -```sql -CREATE TABLE `schema`.`table_name` ( - `attr1` TYPE1 NOT NULL COMMENT "...", - `attr2` TYPE2 DEFAULT NULL COMMENT "...", - PRIMARY KEY (`pk1`, `pk2`), - FOREIGN KEY (`fk_attr`) REFERENCES `parent` (`pk`) - ON UPDATE CASCADE ON DELETE RESTRICT, - INDEX (`idx_attr`), - UNIQUE INDEX (`uniq_attr`) -) ENGINE=InnoDB COMMENT="table comment" -``` - -### 11.2 Type Comment Encoding - -Core types and codecs are preserved in comments: - -```sql -`value` float NOT NULL COMMENT ":float32:measurement value" -`data` longblob DEFAULT NULL COMMENT "::serialized data" -`archive` json DEFAULT NULL COMMENT "::external storage" -``` - ---- - -## 12. Implementation Files - -| File | Purpose | -|------|---------| -| `declare.py` | Definition parsing, SQL generation | -| `heading.py` | Attribute metadata, type reconstruction | -| `table.py` | Base Table class, declaration interface | -| `user_tables.py` | Tier classes (Manual, Computed, etc.) | -| `schemas.py` | Schema binding, table decoration | -| `codecs.py` | Codec registry and resolution | -| `lineage.py` | Attribute lineage tracking | - ---- - -## 13. Future Considerations - -Potential improvements identified for the declaration system: - -1. **Better error messages** with suggestions and context -2. **Import-time validation** via `__init_subclass__` -3. **Parser alternatives** (regex-based for simpler grammar) -4. **SQL dialect abstraction** for multi-database support -5. **Extended constraints** (CHECK, custom validation) -6. **Migration support** for schema evolution -7. **Definition caching** for performance -8. **IDE tooling** support via structured intermediate representation diff --git a/src/datajoint/__init__.py b/src/datajoint/__init__.py index 7c72b71db..e4077816c 100644 --- a/src/datajoint/__init__.py +++ b/src/datajoint/__init__.py @@ -26,7 +26,7 @@ "Schema", "schema", "VirtualModule", - "create_virtual_module", + "virtual_schema", "list_schemas", "Table", "FreeTable", @@ -77,7 +77,7 @@ from .hash import key_hash from .logging import logger from .objectref import ObjectRef -from .schemas import Schema, VirtualModule, list_schemas +from .schemas import Schema, VirtualModule, list_schemas, virtual_schema from .settings import config from .table import FreeTable, Table, ValidationResult from .user_tables import Computed, Imported, Lookup, Manual, Part @@ -85,4 +85,3 @@ ERD = Di = Diagram # Aliases for Diagram schema = Schema # Aliases for Schema -create_virtual_module = VirtualModule # Aliases for VirtualModule diff --git a/src/datajoint/cli.py b/src/datajoint/cli.py index 6437ebbc5..c77cca686 100644 --- a/src/datajoint/cli.py +++ b/src/datajoint/cli.py @@ -1,3 +1,23 @@ +""" +DataJoint command-line interface. + +Provides a Python REPL with DataJoint pre-loaded and optional schema access. + +Usage:: + + # Start REPL with database credentials + dj --user root --password secret --host localhost:3306 + + # Load schemas as virtual modules + dj -s my_lab:lab -s my_analysis:analysis + + # In the REPL + >>> lab.Subject.to_dicts() + >>> dj.Diagram(lab.schema) +""" + +from __future__ import annotations + import argparse from code import interact from collections import ChainMap @@ -5,70 +25,99 @@ import datajoint as dj -def cli(args: list = None): +def cli(args: list[str] | None = None) -> None: """ - Console interface for DataJoint Python + DataJoint command-line interface. + + Starts an interactive Python REPL with DataJoint imported and configured. 
+ Optionally loads database schemas as virtual modules for quick exploration. + + Parameters + ---------- + args : list[str], optional + Command-line arguments. If None, reads from sys.argv. - :param args: List of arguments to be passed in, defaults to reading stdin - :type args: list, optional + Examples + -------- + From the command line:: + + $ dj --host localhost:3306 --user root --password secret + $ dj -s my_lab:lab -s my_analysis:analysis + + Programmatically:: + + >>> from datajoint.cli import cli + >>> cli(["--version"]) """ parser = argparse.ArgumentParser( - prog="datajoint", - description="DataJoint console interface.", - conflict_handler="resolve", + prog="dj", + description="DataJoint interactive console. Start a Python REPL with DataJoint pre-loaded.", + epilog="Example: dj -s my_lab:lab --host localhost:3306", + ) + parser.add_argument( + "-V", + "--version", + action="version", + version=f"{dj.__name__} {dj.__version__}", ) - parser.add_argument("-V", "--version", action="version", version=f"{dj.__name__} {dj.__version__}") parser.add_argument( "-u", "--user", type=str, - default=dj.config["database.user"], - required=False, - help="Datajoint username", + default=None, + help="Database username (default: from config)", ) parser.add_argument( "-p", "--password", type=str, - default=dj.config["database.password"], - required=False, - help="Datajoint password", + default=None, + help="Database password (default: from config)", ) parser.add_argument( - "-h", "--host", type=str, - default=dj.config["database.host"], - required=False, - help="Datajoint host", + default=None, + help="Database host as host:port (default: from config)", ) parser.add_argument( "-s", "--schemas", nargs="+", type=str, - required=False, - help="A list of virtual module mappings in `db:schema ...` format", + metavar="DB:ALIAS", + help="Load schemas as virtual modules. Format: schema_name:alias", ) + kwargs = vars(parser.parse_args(args)) - mods = {} + + # Apply credentials to config if kwargs["user"]: dj.config["database.user"] = kwargs["user"] if kwargs["password"]: dj.config["database.password"] = kwargs["password"] if kwargs["host"]: dj.config["database.host"] = kwargs["host"] + + # Load requested schemas + mods: dict[str, dj.VirtualModule] = {} if kwargs["schemas"]: for vm in kwargs["schemas"]: - d, m = vm.split(":") - mods[m] = dj.create_virtual_module(m, d) + if ":" not in vm: + parser.error(f"Invalid schema format '{vm}'. Use schema_name:alias") + schema_name, alias = vm.split(":", 1) + mods[alias] = dj.VirtualModule(alias, schema_name) - banner = "dj repl\n" + # Build banner + banner = f"DataJoint {dj.__version__} REPL\n" + banner += "Type 'dj.' 
and press Tab for available functions.\n" if mods: - modstr = "\n".join(" - {}".format(m) for m in mods) - banner += "\nschema modules:\n\n" + modstr + "\n" - interact(banner, local=dict(ChainMap(mods, locals(), globals()))) + banner += "\nLoaded schemas:\n" + for alias in mods: + banner += f" {alias} -> {mods[alias].schema.database}\n" + # Start interactive session + interact(banner, local=dict(ChainMap(mods, {"dj": dj}, globals()))) raise SystemExit diff --git a/src/datajoint/errors.py b/src/datajoint/errors.py index d2a789692..7e10f021d 100644 --- a/src/datajoint/errors.py +++ b/src/datajoint/errors.py @@ -11,13 +11,13 @@ class DataJointError(Exception): """Base class for errors specific to DataJoint internal operation.""" - def suggest(self, *args) -> "DataJointError": + def suggest(self, *args: object) -> "DataJointError": """ Regenerate the exception with additional arguments. Parameters ---------- - *args : any + *args : object Additional arguments to append to the exception. Returns diff --git a/src/datajoint/gc.py b/src/datajoint/gc.py index 33ede63d2..7570e6f24 100644 --- a/src/datajoint/gc.py +++ b/src/datajoint/gc.py @@ -196,7 +196,7 @@ def scan_references( for table_name in schema.list_tables(): try: # Get table class - table = schema.spawn_table(table_name) + table = schema.get_table(table_name) # Check each attribute for content storage for attr_name, attr in table.heading.attributes.items(): @@ -259,7 +259,7 @@ def scan_object_references( for table_name in schema.list_tables(): try: # Get table class - table = schema.spawn_table(table_name) + table = schema.get_table(table_name) # Check each attribute for object storage for attr_name, attr in table.heading.attributes.items(): diff --git a/src/datajoint/hash.py b/src/datajoint/hash.py index 3c67af4d1..2a58e9bf4 100644 --- a/src/datajoint/hash.py +++ b/src/datajoint/hash.py @@ -1,8 +1,11 @@ +from __future__ import annotations + import hashlib import uuid +from typing import Any -def key_hash(mapping): +def key_hash(mapping: dict[str, Any]) -> str: """ 32-byte hash of the mapping's key values sorted by the key name. This is often used to convert a long primary key value into a shorter hash. @@ -13,7 +16,7 @@ def key_hash(mapping): return hashed.hexdigest() -def uuid_from_buffer(buffer=b"", *, init_string=""): +def uuid_from_buffer(buffer: bytes = b"", *, init_string: str = "") -> uuid.UUID: """ Compute MD5 hash of buffer data, returned as UUID. diff --git a/src/datajoint/schemas.py b/src/datajoint/schemas.py index 8f7acd19b..399ab1b9f 100644 --- a/src/datajoint/schemas.py +++ b/src/datajoint/schemas.py @@ -628,6 +628,141 @@ def list_tables(self) -> list[str]: if d == self.database ] + def _find_table_name(self, name: str) -> str | None: + """ + Find the actual SQL table name for a given base name. + + Handles tier prefixes: Manual (none), Lookup (#), Imported (_), Computed (__). + + Parameters + ---------- + name : str + Base table name without tier prefix. + + Returns + ------- + str or None + The actual SQL table name, or None if not found. + """ + tables = self.list_tables() + # Check exact match first + if name in tables: + return name + # Check with tier prefixes + for prefix in ("", "#", "_", "__"): + candidate = f"{prefix}{name}" + if candidate in tables: + return candidate + return None + + def get_table(self, name: str) -> FreeTable: + """ + Get a table instance by name. + + Returns a FreeTable instance for the given table name. This is useful + for accessing tables when you don't have the Python class available. 
+
+        Parameters
+        ----------
+        name : str
+            Table name (e.g., 'experiment', 'session__trial' for parts).
+            Can be snake_case (SQL name) or CamelCase (class name).
+            Tier prefixes are optional and will be auto-detected.
+
+        Returns
+        -------
+        FreeTable
+            A FreeTable instance for the table.
+
+        Raises
+        ------
+        DataJointError
+            If the table does not exist.
+
+        Examples
+        --------
+        >>> schema = dj.Schema('my_schema')
+        >>> experiment = schema.get_table('experiment')
+        >>> experiment.fetch()
+        """
+        self._assert_exists()
+        # Convert CamelCase to snake_case if needed
+        if name[0].isupper():
+            name = re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()
+        # Resolve the SQL table name, auto-detecting tier prefixes
+        table_name = self._find_table_name(name)
+        if table_name is None:
+            raise DataJointError(f"Table '{name}' does not exist in schema `{self.database}`")
+        return FreeTable(self.connection, f"`{self.database}`.`{table_name}`")
+
+    def __getitem__(self, name: str) -> FreeTable:
+        """
+        Get a table instance by name using bracket notation.
+
+        Parameters
+        ----------
+        name : str
+            Table name (snake_case or CamelCase).
+
+        Returns
+        -------
+        FreeTable
+            A FreeTable instance for the table.
+
+        Examples
+        --------
+        >>> schema = dj.Schema('my_schema')
+        >>> schema['Experiment'].fetch()
+        >>> schema['session'].fetch()
+        """
+        return self.get_table(name)
+
+    def __iter__(self):
+        """
+        Iterate over all tables in the schema.
+
+        Yields FreeTable instances for each table in topological order.
+
+        Yields
+        ------
+        FreeTable
+            Table instances in dependency order.
+
+        Examples
+        --------
+        >>> for table in schema:
+        ...     print(table.full_table_name, len(table))
+        """
+        self._assert_exists()
+        for table_name in self.list_tables():
+            yield self.get_table(table_name)
+
+    def __contains__(self, name: str) -> bool:
+        """
+        Check if a table exists in the schema.
+
+        Parameters
+        ----------
+        name : str
+            Table name (snake_case or CamelCase).
+            Tier prefixes are optional and will be auto-detected.
+
+        Returns
+        -------
+        bool
+            True if the table exists.
+
+        Examples
+        --------
+        >>> 'Experiment' in schema
+        True
+        """
+        if name[0].isupper():
+            name = re.sub(r"(?<!^)(?=[A-Z])", "_", name).lower()
+        return self._find_table_name(name) is not None
@@ ... @@ def list_schemas(self) -> list[str]:
             'SELECT schema_name FROM information_schema.schemata WHERE schema_name <> "information_schema"'
         )
     ]
+
+
+def virtual_schema(
+    schema_name: str,
+    *,
+    connection: Connection | None = None,
+    create_schema: bool = False,
+    create_tables: bool = False,
+    add_objects: dict[str, Any] | None = None,
+) -> VirtualModule:
+    """
+    Create a virtual module for an existing database schema.
+
+    This is the recommended way to access database schemas when you don't have
+    the Python source code that defined them. Returns a module-like object with
+    table classes as attributes.
+
+    Parameters
+    ----------
+    schema_name : str
+        Database schema name.
+    connection : Connection, optional
+        Database connection. Defaults to ``dj.conn()``.
+    create_schema : bool, optional
+        If True, create the schema if it doesn't exist. Default False.
+    create_tables : bool, optional
+        If True, allow declaring new tables. Default False.
+    add_objects : dict, optional
+        Additional objects to add to the module namespace.
+
+    Returns
+    -------
+    VirtualModule
+        A module-like object with table classes as attributes.
+
+    Examples
+    --------
+    >>> lab = dj.virtual_schema('my_lab')
+    >>> lab.Subject.fetch()
+    >>> lab.Session & 'subject_id="M001"'
+
+    See Also
+    --------
+    Schema : For defining new schemas with Python classes.
+    VirtualModule : The underlying class (prefer the ``virtual_schema`` function).
+ """ + return VirtualModule( + schema_name, + schema_name, + connection=connection, + create_schema=create_schema, + create_tables=create_tables, + add_objects=add_objects, + ) diff --git a/tests/integration/test_cli.py b/tests/integration/test_cli.py index 0ab83ccce..35230ea4e 100644 --- a/tests/integration/test_cli.py +++ b/tests/integration/test_cli.py @@ -50,7 +50,7 @@ def test_cli_config(): def test_cli_args(): process = subprocess.Popen( - ["dj", "-utest_user", "-ptest_pass", "-htest_host"], + ["dj", "-u", "test_user", "-p", "test_pass", "--host", "test_host"], stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, @@ -63,9 +63,9 @@ def test_cli_args(): process.stdin.flush() stdout, stderr = process.communicate() - assert "test_user" == stdout[5:14] - assert "test_pass" == stdout[21:30] - assert "test_host" == stdout[37:46] + assert "test_user" in stdout + assert "test_pass" in stdout + assert "test_host" in stdout def test_cli_schemas(prefix, connection_root, db_creds_root): @@ -83,11 +83,14 @@ class IJ(dj.Lookup): process = subprocess.Popen( [ "dj", - f"-u{db_creds_root['user']}", - f"-p{db_creds_root['password']}", - f"-h{db_creds_root['host']}", + "-u", + db_creds_root["user"], + "-p", + db_creds_root["password"], + "--host", + db_creds_root["host"], "-s", - "djtest_cli:test_schema", + f"{prefix}_cli:test_schema", ], stdin=subprocess.PIPE, stdout=subprocess.PIPE, @@ -116,6 +119,6 @@ class IJ(dj.Lookup): cleaned = stdout.strip(" >\t\n\r") for key in ( "test_schema", - "Schema `djtest_cli`", + f"Schema `{prefix}_cli`", ): - assert key in cleaned, f"Key {key} not found in config from stdout: {cleaned}" + assert key in cleaned, f"Key {key} not found in stdout: {cleaned}" diff --git a/tests/integration/test_relational_operand.py b/tests/integration/test_relational_operand.py index 32fcc50d2..eea53288e 100644 --- a/tests/integration/test_relational_operand.py +++ b/tests/integration/test_relational_operand.py @@ -570,42 +570,32 @@ def test_restrictions_by_top(self, schema_simp_pop): ] def test_top_restriction_with_keywords(self, schema_simp_pop): - # dj.Top only guarantees which elements are selected, not their order - select = SelectPK() & dj.Top(limit=9, order_by=["select desc"]) - key = KeyPK() & dj.Top(limit=9, order_by="key desc") - # Convert to sets of tuples for order-independent comparison - select_result = {tuple(sorted(d.items())) for d in select.to_dicts()} - select_expected = { - tuple(sorted(d.items())) - for d in [ - {"id": 2, "select": 8}, - {"id": 2, "select": 6}, - {"id": 1, "select": 4}, - {"id": 2, "select": 4}, - {"id": 1, "select": 3}, - {"id": 1, "select": 2}, - {"id": 2, "select": 2}, - {"id": 1, "select": 1}, - {"id": 0, "select": 0}, - ] - } - assert select_result == select_expected - key_result = {tuple(sorted(d.items())) for d in key.to_dicts()} - key_expected = { - tuple(sorted(d.items())) - for d in [ - {"id": 2, "key": 6}, - {"id": 2, "key": 5}, - {"id": 1, "key": 5}, - {"id": 0, "key": 4}, - {"id": 1, "key": 4}, - {"id": 2, "key": 4}, - {"id": 0, "key": 3}, - {"id": 1, "key": 3}, - {"id": 2, "key": 3}, - ] - } - assert key_result == key_expected + # dj.Top preserves the ORDER BY clause in results + # Use secondary sort by 'id' to ensure deterministic ordering when there are ties + select = SelectPK() & dj.Top(limit=9, order_by=["select desc", "id"]) + key = KeyPK() & dj.Top(limit=9, order_by=["key desc", "id"]) + assert select.to_dicts() == [ + {"id": 2, "select": 8}, + {"id": 2, "select": 6}, + {"id": 1, "select": 4}, + {"id": 2, 
"select": 4}, + {"id": 1, "select": 3}, + {"id": 1, "select": 2}, + {"id": 2, "select": 2}, + {"id": 1, "select": 1}, + {"id": 0, "select": 0}, + ] + assert key.to_dicts() == [ + {"id": 2, "key": 6}, + {"id": 1, "key": 5}, + {"id": 2, "key": 5}, + {"id": 0, "key": 4}, + {"id": 1, "key": 4}, + {"id": 2, "key": 4}, + {"id": 0, "key": 3}, + {"id": 1, "key": 3}, + {"id": 2, "key": 3}, + ] def test_top_errors(self, schema_simp_pop): with pytest.raises(DataJointError) as err1: diff --git a/tests/integration/test_virtual_module.py b/tests/integration/test_virtual_module.py index bd8a0c754..a8e953273 100644 --- a/tests/integration/test_virtual_module.py +++ b/tests/integration/test_virtual_module.py @@ -1,7 +1,107 @@ +"""Tests for virtual schema infrastructure.""" + +import pytest + import datajoint as dj +from datajoint.table import FreeTable from datajoint.user_tables import UserTable -def test_virtual_module(schema_any, connection_test): - module = dj.VirtualModule("module", schema_any.database, connection=connection_test) - assert issubclass(module.Experiment, UserTable) +class TestVirtualModule: + """Tests for VirtualModule class.""" + + def test_virtual_module_creates_table_classes(self, schema_any, connection_test): + """VirtualModule creates table classes from database schema.""" + module = dj.VirtualModule("module", schema_any.database, connection=connection_test) + assert issubclass(module.Experiment, UserTable) + + def test_virtual_module_has_schema_attribute(self, schema_any, connection_test): + """VirtualModule has schema attribute.""" + module = dj.VirtualModule("module", schema_any.database, connection=connection_test) + assert hasattr(module, "schema") + assert module.schema.database == schema_any.database + + +class TestVirtualSchema: + """Tests for dj.virtual_schema() function.""" + + def test_virtual_schema_creates_module(self, schema_any, connection_test): + """virtual_schema creates a VirtualModule.""" + lab = dj.virtual_schema(schema_any.database, connection=connection_test) + assert isinstance(lab, dj.VirtualModule) + + def test_virtual_schema_has_table_classes(self, schema_any, connection_test): + """virtual_schema module has table classes as attributes.""" + lab = dj.virtual_schema(schema_any.database, connection=connection_test) + assert issubclass(lab.Experiment, UserTable) + + def test_virtual_schema_tables_are_queryable(self, schema_any, connection_test): + """Tables from virtual_schema can be queried.""" + lab = dj.virtual_schema(schema_any.database, connection=connection_test) + # Should not raise + lab.Experiment().to_dicts() + + +class TestSchemaGetTable: + """Tests for Schema.get_table() method.""" + + def test_get_table_by_snake_case(self, schema_any): + """get_table works with snake_case table names.""" + table = schema_any.get_table("experiment") + assert isinstance(table, FreeTable) + assert "experiment" in table.full_table_name + + def test_get_table_by_camel_case(self, schema_any): + """get_table works with CamelCase table names.""" + table = schema_any.get_table("Experiment") + assert isinstance(table, FreeTable) + assert "experiment" in table.full_table_name + + def test_get_table_nonexistent_raises(self, schema_any): + """get_table raises DataJointError for nonexistent tables.""" + with pytest.raises(dj.DataJointError, match="does not exist"): + schema_any.get_table("NonexistentTable") + + +class TestSchemaGetItem: + """Tests for Schema.__getitem__() method.""" + + def test_getitem_by_name(self, schema_any): + """Schema['TableName'] returns table 
instance."""
+        table = schema_any["Experiment"]
+        assert isinstance(table, FreeTable)
+
+    def test_getitem_is_queryable(self, schema_any):
+        """Table from __getitem__ can be queried."""
+        table = schema_any["Experiment"]
+        # Should not raise
+        table.to_dicts()
+
+
+class TestSchemaIteration:
+    """Tests for Schema.__iter__() method."""
+
+    def test_iter_yields_tables(self, schema_any):
+        """Iterating over schema yields FreeTable instances."""
+        tables = list(schema_any)
+        assert len(tables) > 0
+        assert all(isinstance(t, FreeTable) for t in tables)
+
+    def test_iter_in_dependency_order(self, schema_any):
+        """Iteration order respects dependencies."""
+        table_names = [t.table_name for t in schema_any]
+        # Iteration delegates to list_tables(), which is topologically sorted;
+        # here we only verify that each table is yielded exactly once.
+        assert len(table_names) == len(set(table_names))  # no duplicates
+
+
+class TestSchemaContains:
+    """Tests for Schema.__contains__() method."""
+
+    def test_contains_existing_table(self, schema_any):
+        """'TableName' in schema returns True for existing tables."""
+        assert "Experiment" in schema_any
+        assert "experiment" in schema_any
+
+    def test_contains_nonexistent_table(self, schema_any):
+        """'TableName' in schema returns False for nonexistent tables."""
+        assert "NonexistentTable" not in schema_any
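
Taken together, the schema-access additions above (`virtual_schema`, `get_table`, `__getitem__`, `__iter__`, `__contains__`) are meant to be used roughly as follows. This is a minimal sketch, not part of the patch: the schema name `my_lab` and the table `Session` are hypothetical, and it assumes database credentials are already set in `dj.config`.

```python
import datajoint as dj

# Open an existing schema without its defining Python module.
# "my_lab" is a hypothetical schema name; credentials come from dj.config.
lab = dj.virtual_schema("my_lab")        # VirtualModule with table classes
schema = lab.schema                      # the underlying dj.Schema object

# Name-based access returns FreeTable instances; CamelCase and snake_case both work.
if "Session" in schema:                  # __contains__
    session = schema["Session"]          # __getitem__, same as schema.get_table("Session")
    print(session.to_dicts())

# Iterate every table in dependency order, e.g. to report row counts.
for table in schema:                     # __iter__
    print(table.full_table_name, len(table))
```

The same tables remain reachable as attribute classes on the virtual module (e.g. `lab.Session`), so the attribute-based and name-based access styles can be mixed.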