Commit a03cc79

Add usage instructions README + job state constants + change insert many return (#19)
Flesh out the README to add detailed usage instructions. The README also appears on PyPI, so it's nice to have extra information there, and it'll be closely mirrored on River's docs site.

Also, add job state constants like `JOB_STATE_AVAILABLE`, similar to what's available in Ruby, in case anyone needs them (they're useful in `UniqueOpts`).

Lastly, change the return value of `insert_many`/`insert_many_tx` to the number of rows inserted instead of a list of inserted jobs, to match what's returned in Go and Ruby. The rationale: anyone using the bulk insert functions is probably concerned about performance, so it's not worth the cost of returning the whole set of jobs, which is likely used infrequently anyway. These functions aren't implemented yet, so the change isn't a problem.
1 parent db0ab1f commit a03cc79
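The `insert_many` return-value change can be sketched with small stand-ins (the functions below are hypothetical stubs, not River's implementation; only the return shapes match the description above):

```python
# Hypothetical stubs illustrating the return-value change to insert_many:
# the old version returned the inserted job rows, the new one returns a count.

def insert_many_old(jobs: list) -> list:
    # before: a list of inserted job rows (represented here as dicts)
    return [{"kind": "sort", "args": args} for args in jobs]

def insert_many_new(jobs: list) -> int:
    # after: just the number of rows inserted, matching Go and Ruby
    return len(jobs)

jobs = [["whale"], ["tiger"], ["bear"]]
assert len(insert_many_old(jobs)) == 3
assert insert_many_new(jobs) == 3
```

Returning only a count avoids materializing and transferring every inserted row on a path that exists specifically for bulk performance.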

File tree

6 files changed: +209 −28 lines changed


CHANGELOG.md

Lines changed: 1 addition & 1 deletion

@@ -17,4 +17,4 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0)

### Added

- - Initial release, supporting insertion through [SQLAlchemy](https://www.sqlalchemy.org/) and its underlying Postgres drivers like [psycopg2](https://pypi.org/project/psycopg2/) or [asyncpg](https://github.com/MagicStack/asyncpg) (for async).
+ - Initial release, supporting insertion through [SQLAlchemy](https://www.sqlalchemy.org/) and its underlying Postgres drivers like [`psycopg2`](https://pypi.org/project/psycopg2/) or [`asyncpg`](https://github.com/MagicStack/asyncpg) (for async).

README.md

Lines changed: 170 additions & 0 deletions
@@ -2,6 +2,176 @@

An insert-only Python client for [River](https://github.com/riverqueue/river) packaged in the [`riverqueue` package on PyPI](https://pypi.org/project/riverqueue/). Allows jobs to be inserted in Python and run by a Go worker, but doesn't support working jobs in Python.

## Basic usage

Your project should bundle the [`riverqueue` package](https://pypi.org/project/riverqueue/) in its dependencies. How to go about this depends on your toolchain, but for example in [Rye](https://github.com/astral-sh/rye), it'd look like:
```shell
rye add riverqueue
```

Initialize a client with:

```python
import riverqueue
import sqlalchemy

from riverqueue.driver import riversqlalchemy

engine = sqlalchemy.create_engine("postgresql://...")
client = riverqueue.Client(riversqlalchemy.Driver(engine))
```
Define a job and insert it:

```python
import json
from dataclasses import dataclass

@dataclass
class SortArgs:
    strings: list[str]

    kind: str = "sort"

    def to_json(self) -> str:
        return json.dumps({"strings": self.strings})

insert_res = client.insert(
    SortArgs(strings=["whale", "tiger", "bear"]),
)
insert_res.job  # inserted job row
```
Job args should comply with the following [protocol](https://peps.python.org/pep-0544/):

```python
class Args(Protocol):
    kind: str

    def to_json(self) -> str:
        pass
```

* `kind` is a unique string that identifies the job in the database, and which a Go worker will recognize.
* `to_json()` defines how the job will serialize to JSON, which of course will have to be parseable as an object in Go.

Job args may also respond to `insert_opts()` with an instance of `InsertOpts` to define insertion options that'll be used for all jobs of that kind.

We recommend using [`dataclasses`](https://docs.python.org/3/library/dataclasses.html) for job args since they should ideally be minimal sets of primitive properties with little other embellishment, and `dataclasses` provide a succinct way of accomplishing this.
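As a concrete sketch, a job args dataclass satisfying this protocol could look like the following (`EmailArgs` and its fields are hypothetical, not part of the package):

```python
import json
from dataclasses import dataclass

@dataclass
class EmailArgs:
    # hypothetical job args; `kind` must match what the Go worker registers
    to: str
    subject: str

    kind: str = "email"

    def to_json(self) -> str:
        # serialized payload must be parseable as an object on the Go side
        return json.dumps({"to": self.to, "subject": self.subject})

args = EmailArgs(to="user@example.com", subject="hello")
args.kind       # "email"
args.to_json()  # '{"to": "user@example.com", "subject": "hello"}'
```

An optional `insert_opts()` method returning an `InsertOpts` could be added to the class to apply default insertion options for every job of this kind.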
## Insertion options

Inserts take an `insert_opts` parameter to customize features of the inserted job:

```python
insert_res = client.insert(
    SortArgs(strings=["whale", "tiger", "bear"]),
    insert_opts=riverqueue.InsertOpts(
        max_attempts=17,
        priority=3,
        queue="my_queue",
        tags=["custom"],
    ),
)
```
## Inserting unique jobs

[Unique jobs](https://riverqueue.com/docs/unique-jobs) are supported through `InsertOpts.unique_opts()`, and can be made unique by args, period, queue, and state. If a job matching the unique properties is found on insert, the insert is skipped and the existing job is returned.

```python
insert_res = client.insert(
    SortArgs(strings=["whale", "tiger", "bear"]),
    insert_opts=riverqueue.InsertOpts(
        unique_opts=riverqueue.UniqueOpts(
            by_args=True,
            by_period=15 * 60,
            by_queue=True,
            by_state=[riverqueue.JOB_STATE_AVAILABLE],
        )
    ),
)

# contains either a newly inserted job, or an existing one if insertion was skipped
insert_res.job

# true if insertion was skipped
insert_res.unique_skipped_as_duplicated
```
### Custom advisory lock prefix

Unique job insertion takes a Postgres advisory lock to make sure that its uniqueness check still works even if two conflicting insert operations are occurring in parallel. Postgres advisory locks share a global 64-bit namespace, which is a large enough space that it's unlikely for two advisory locks to ever conflict, but to _guarantee_ that River's advisory locks never interfere with an application's, River can be configured with a 32-bit advisory lock prefix which it will use for all its locks:

```python
client = riverqueue.Client(riversqlalchemy.Driver(engine), advisory_lock_prefix=123456)
```

Doing so has the downside of leaving only 32 bits for River's locks (64 bits total minus the 32-bit prefix), making them somewhat more likely to conflict with each other.
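The namespace split can be illustrated with a bit of arithmetic (this sketch shows only the prefixing idea; River's actual key derivation, which hashes unique job properties, isn't reproduced here):

```python
# Illustrative only: compose a 64-bit advisory lock key from a 32-bit
# prefix and the low 32 bits of a hash of the lock's subject.

def advisory_lock_key(prefix: int, subject_hash: int) -> int:
    assert 0 <= prefix < 2**32 and 0 <= subject_hash < 2**64
    # place the prefix in the high 32 bits and keep the low 32 bits of the hash
    return (prefix << 32) | (subject_hash & 0xFFFFFFFF)

key = advisory_lock_key(123456, 0xDEADBEEF_CAFEBABE)
assert key >> 32 == 123456            # prefix occupies the high 32 bits
assert key & 0xFFFFFFFF == 0xCAFEBABE # hash keeps only the low 32 bits
```

With the prefix pinned, any two applications configured with different prefixes can never collide, at the cost of squeezing River's own keys into the remaining 32 bits.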
## Inserting jobs in bulk

Use `insert_many()` to bulk insert jobs as a single operation for improved efficiency:

```python
num_inserted = client.insert_many([
    SimpleArgs(job_num=1),
    SimpleArgs(job_num=2),
])
```

Or with `InsertManyParams`, which may include insertion options:

```python
num_inserted = client.insert_many([
    InsertManyParams(args=SimpleArgs(job_num=1), insert_opts=riverqueue.InsertOpts(max_attempts=5)),
    InsertManyParams(args=SimpleArgs(job_num=2), insert_opts=riverqueue.InsertOpts(queue="high_priority")),
])
```
## Inserting in a transaction

To insert jobs in a transaction, open one in your driver, and pass it as the first argument to `insert_tx()` or `insert_many_tx()`:

```python
with engine.begin() as session:
    insert_res = client.insert_tx(
        session,
        SortArgs(strings=["whale", "tiger", "bear"]),
    )
```
## Asynchronous I/O (`asyncio`)

The package supports Python's [`asyncio` (asynchronous I/O)](https://docs.python.org/3/library/asyncio.html) through an alternate `AsyncClient` and `riversqlalchemy.AsyncDriver`. You'll need to make sure to use SQLAlchemy's alternative async engine and an asynchronous Postgres driver like [`asyncpg`](https://github.com/MagicStack/asyncpg), but otherwise usage looks very similar to non-async usage:

```python
engine = sqlalchemy.ext.asyncio.create_async_engine("postgresql+asyncpg://...")
client = riverqueue.AsyncClient(riversqlalchemy.AsyncDriver(engine))

insert_res = await client.insert(
    SortArgs(strings=["whale", "tiger", "bear"]),
)
```

With a transaction:

```python
async with engine.begin() as session:
    insert_res = await client.insert_tx(
        session,
        SortArgs(strings=["whale", "tiger", "bear"]),
    )
```
## MyPy and type checking

The package exports a `py.typed` file to indicate that it's typed, so you should be able to use [MyPy](https://mypy-lang.org/) to include it in static analysis.

## Drivers

### SQLAlchemy

Our read is that [SQLAlchemy](https://www.sqlalchemy.org/) is the dominant ORM in the Python ecosystem, so it's the only driver available for River. Under the hood of SQLAlchemy, projects will also need a Postgres driver like [`psycopg2`](https://pypi.org/project/psycopg2/) or [`asyncpg`](https://github.com/MagicStack/asyncpg) (for async).

River's driver system should enable integration with other ORMs, so let us know if there's a good reason you need one, and we'll consider it.

## Development

See [development](./docs/development.md).

src/riverqueue/__init__.py

Lines changed: 8 additions & 0 deletions
@@ -1,8 +1,16 @@
  # Reexport for more ergonomic use in calling code.
  from .client import (
+     JOB_STATE_AVAILABLE as JOB_STATE_AVAILABLE,
+     JOB_STATE_CANCELLED as JOB_STATE_CANCELLED,
+     JOB_STATE_COMPLETED as JOB_STATE_COMPLETED,
+     JOB_STATE_DISCARDED as JOB_STATE_DISCARDED,
+     JOB_STATE_RETRYABLE as JOB_STATE_RETRYABLE,
+     JOB_STATE_RUNNING as JOB_STATE_RUNNING,
+     JOB_STATE_SCHEDULED as JOB_STATE_SCHEDULED,
      AsyncClient as AsyncClient,
      Args as Args,
      Client as Client,
+     InsertManyParams as InsertManyParams,
      InsertOpts as InsertOpts,
      UniqueOpts as UniqueOpts,
  )

src/riverqueue/client.py

Lines changed: 26 additions & 22 deletions
@@ -7,10 +7,24 @@
  from .model import InsertResult
  from .fnv import fnv1_hash

+ JOB_STATE_AVAILABLE = "available"
+ JOB_STATE_CANCELLED = "cancelled"
+ JOB_STATE_COMPLETED = "completed"
+ JOB_STATE_DISCARDED = "discarded"
+ JOB_STATE_RETRYABLE = "retryable"
+ JOB_STATE_RUNNING = "running"
+ JOB_STATE_SCHEDULED = "scheduled"
+
  MAX_ATTEMPTS_DEFAULT = 25
  PRIORITY_DEFAULT = 1
  QUEUE_DEFAULT = "default"
- UNIQUE_STATES_DEFAULT = ["available", "completed", "running", "retryable", "scheduled"]
+ UNIQUE_STATES_DEFAULT = [
+     JOB_STATE_AVAILABLE,
+     JOB_STATE_COMPLETED,
+     JOB_STATE_RUNNING,
+     JOB_STATE_RETRYABLE,
+     JOB_STATE_SCHEDULED,
+ ]


  class Args(Protocol):

@@ -81,19 +95,13 @@ async def insert():
      return await self.__check_unique_job(exec, insert_params, unique_opts, insert)

-     async def insert_many(self, args: List[Args]) -> List[InsertResult]:
+     async def insert_many(self, args: List[Args | InsertManyParams]) -> int:
          async with self.driver.executor() as exec:
-             return [
-                 InsertResult(x)
-                 for x in await exec.job_insert_many(_make_insert_params_many(args))
-             ]
+             return await exec.job_insert_many(_make_insert_params_many(args))

-     async def insert_many_tx(self, tx, args: List[Args]) -> List[InsertResult]:
+     async def insert_many_tx(self, tx, args: List[Args | InsertManyParams]) -> int:
          exec = self.driver.unwrap_executor(tx)
-         return [
-             InsertResult(x)
-             for x in await exec.job_insert_many(_make_insert_params_many(args))
-         ]
+         return await exec.job_insert_many(_make_insert_params_many(args))

      async def __check_unique_job(
          self,

@@ -154,19 +162,13 @@ def insert():
      return self.__check_unique_job(exec, insert_params, unique_opts, insert)

-     def insert_many(self, args: List[Args]) -> List[InsertResult]:
+     def insert_many(self, args: List[Args | InsertManyParams]) -> int:
          with self.driver.executor() as exec:
-             return [
-                 InsertResult(x)
-                 for x in exec.job_insert_many(_make_insert_params_many(args))
-             ]
+             return exec.job_insert_many(_make_insert_params_many(args))

-     def insert_many_tx(self, tx, args: List[Args]) -> List[InsertResult]:
+     def insert_many_tx(self, tx, args: List[Args | InsertManyParams]) -> int:
          exec = self.driver.unwrap_executor(tx)
-         return [
-             InsertResult(x)
-             for x in exec.job_insert_many(_make_insert_params_many(args))
-         ]
+         return exec.job_insert_many(_make_insert_params_many(args))

      def __check_unique_job(
          self,

@@ -298,7 +300,9 @@ def _make_insert_params(
      return insert_params, unique_opts


- def _make_insert_params_many(args: List[Args]) -> List[JobInsertParams]:
+ def _make_insert_params_many(
+     args: List[Args | InsertManyParams],
+ ) -> List[JobInsertParams]:
      return [
          _make_insert_params(
              arg.args, arg.insert_opts or InsertOpts(), is_insert_many=True

src/riverqueue/driver/driver_protocol.py

Lines changed: 2 additions & 2 deletions
@@ -45,7 +45,7 @@ async def advisory_lock(self, lock: int) -> None:
      async def job_insert(self, insert_params: JobInsertParams) -> Job:
          pass

-     async def job_insert_many(self, all_params) -> List[Job]:
+     async def job_insert_many(self, all_params) -> int:
          pass

      async def job_get_by_kind_and_unique_properties(

@@ -103,7 +103,7 @@ def advisory_lock(self, lock: int) -> None:
      def job_insert(self, insert_params: JobInsertParams) -> Job:
          pass

-     def job_insert_many(self, all_params) -> List[Job]:
+     def job_insert_many(self, all_params) -> int:
          pass

      def job_get_by_kind_and_unique_properties(

src/riverqueue/driver/riversqlalchemy/sql_alchemy_driver.py

Lines changed: 2 additions & 3 deletions
@@ -11,7 +11,6 @@
      AsyncIterator,
      Iterator,
      Optional,
-     List,
      cast,
  )

@@ -37,7 +36,7 @@ async def job_insert(self, insert_params: JobInsertParams) -> Job:
          ),
      )

-     async def job_insert_many(self, all_params) -> List[Job]:
+     async def job_insert_many(self, all_params) -> int:
          raise NotImplementedError("sqlc doesn't implement copy in python yet")

      async def job_get_by_kind_and_unique_properties(

@@ -95,7 +94,7 @@ def job_insert(self, insert_params: JobInsertParams) -> Job:
          ),
      )

-     def job_insert_many(self, all_params) -> List[Job]:
+     def job_insert_many(self, all_params) -> int:
          raise NotImplementedError("sqlc doesn't implement copy in python yet")

      def job_get_by_kind_and_unique_properties(

0 commit comments
