Skip to content

Commit d7a4f23

Browse files
authored
Async client for use with Python's asynchronous I/O + examples (#14)
* Async client for use with Python's asynchronous I/O + examples Here we add support for Python's built-in asyncio [1], a feature that's commonly requested for library APIs because it enables very performant code built on asynchronous I/O. The change creates a second client class called `AsyncClient` along with an `riversqlalchemy.AsyncDriver` to go with the original driver. This design is based on SQLAlchemy's and the code that the Python sqlc plugin generates. The APIs for the pair of clients and drivers are identical, but one set has `async` annotations on everything and takes appropriate inputs (like an async SQLAlchemy engine instead of synchronous one). The final usage looks pretty close to non-async code (which I think is a pretty reasonable outcome overall): engine = sqlalchemy.ext.asyncio.create_async_engine(dev_database_url(is_async=True)) client = riverqueue.AsyncClient(riversqlalchemy.AsyncDriver(engine)) insert_res = await client.insert( SortArgs(strings=["whale", "tiger", "bear"]), insert_opts=riverqueue.InsertOpts( unique_opts=riverqueue.UniqueOpts(by_period=900) ), ) print(insert_res) I also added a new `examples/` package that shows code for how to do various things with River Python, and makes sure they're run in CI and get static analysis from MyPy. [1] https://docs.python.org/3/library/asyncio.html * Drop Docker-based tests in favor of normal Postgres Drop Docker-based tests (or at least within the Python program) in favor of using normal Postgres like the other River projects are doing. I'm having a lot of trouble getting Docker working in conjunction with async. It's probably possible to get it working, but the testcontainers docs are subpar, and I'm having to iterate failing tests in GitHub Actions which is awful.
1 parent 51c2851 commit d7a4f23

21 files changed

+801
-249
lines changed

.github/workflows/ci.yaml

Lines changed: 93 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,56 @@
1-
name: build
1+
name: CI
22

33
on:
44
push:
55
branches:
66
- master
77
pull_request:
88

9+
env:
10+
# Database to connect to that can create other databases with `CREATE DATABASE`.
11+
ADMIN_DATABASE_URL: postgres://postgres:postgres@localhost:5432
12+
13+
# A suitable name/URL for non-test database.
14+
DATABASE_NAME: river_dev
15+
DATABASE_URL: postgres://postgres:postgres@127.0.0.1:5432/river_dev
16+
17+
# Test database.
18+
TEST_DATABASE_NAME: river_test
19+
TEST_DATABASE_URL: postgres://postgres:postgres@127.0.0.1:5432/river_test
20+
921
jobs:
10-
build:
22+
build_and_test:
1123
runs-on: ubuntu-latest
1224

25+
services:
26+
postgres:
27+
image: postgres:latest
28+
env:
29+
POSTGRES_PASSWORD: postgres
30+
options: >-
31+
--health-cmd pg_isready
32+
--health-interval 2s
33+
--health-timeout 5s
34+
--health-retries 5
35+
ports:
36+
- 5432:5432
37+
1338
steps:
1439
- name: Checkout
1540
uses: actions/checkout@v4
1641

17-
- name: Install the latest version of rye
42+
- name: Install Rye
1843
uses: eifinger/setup-rye@v3
1944

20-
- name: Rye setup
45+
# Needed for River's CLI. There is a version of Go on Actions' base image,
46+
# but it's old and can't read modern `go.mod` annotations correctly.
47+
- name: Install Go
48+
uses: actions/setup-go@v5
49+
with:
50+
go-version: "stable"
51+
check-latest: true
52+
53+
- name: Rye sync
2154
run: rye sync
2255

2356
- name: Format check
@@ -32,5 +65,60 @@ jobs:
3265
- name: Build
3366
run: rye build
3467

68+
- name: Install River CLI
69+
run: go install github.com/riverqueue/river/cmd/river@latest
70+
71+
- name: Create database
72+
run: psql --echo-errors --quiet -c '\timing off' -c "CREATE DATABASE ${TEST_DATABASE_NAME};" ${ADMIN_DATABASE_URL}
73+
74+
- name: river migrate-up
75+
run: river migrate-up --database-url "$TEST_DATABASE_URL"
76+
3577
- name: Test
36-
run: RIVER_USE_DOCKER=true rye test
78+
run: rye test
79+
80+
examples_run:
81+
runs-on: ubuntu-latest
82+
83+
services:
84+
postgres:
85+
image: postgres:latest
86+
env:
87+
POSTGRES_PASSWORD: postgres
88+
options: >-
89+
--health-cmd pg_isready
90+
--health-interval 2s
91+
--health-timeout 5s
92+
--health-retries 5
93+
ports:
94+
- 5432:5432
95+
96+
steps:
97+
- name: Checkout
98+
uses: actions/checkout@v4
99+
100+
- name: Install Rye
101+
uses: eifinger/setup-rye@v3
102+
103+
# Needed for River's CLI. There is a version of Go on Actions' base image,
104+
# but it's old and can't read modern `go.mod` annotations correctly.
105+
- name: Install Go
106+
uses: actions/setup-go@v5
107+
with:
108+
go-version: "stable"
109+
check-latest: true
110+
111+
- name: Rye sync
112+
run: rye sync
113+
114+
- name: Install River CLI
115+
run: go install github.com/riverqueue/river/cmd/river@latest
116+
117+
- name: Create database
118+
run: psql --echo-errors --quiet -c '\timing off' -c "CREATE DATABASE ${DATABASE_NAME};" ${ADMIN_DATABASE_URL}
119+
120+
- name: river migrate-up
121+
run: river migrate-up --database-url "$DATABASE_URL"
122+
123+
- name: Run examples
124+
run: rye run python3 -m examples.all

Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,4 @@ test: ## Run test suite with Rye/pytest
2020

2121
.PHONY: type-check
2222
type-check: ## Run type check with MyPy
23-
rye run mypy -p riverqueue -p tests
23+
rye run mypy -p riverqueue -p examples -p tests

docs/development.md

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,6 @@ Run all tests:
3636
$ rye test
3737
```
3838

39-
_Or_, using a Docker test Postgres container instead of a test database:
40-
41-
```shell
42-
RIVER_USE_DOCKER=true rye test
43-
```
44-
4539
Run a specific test (or without `-k` option for all tests in a single file):
4640

4741
```shell

examples/__init__.py

Whitespace-only changes.

examples/all.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
#
2+
# Run with:
3+
#
4+
# rye run python3 -m examples.all
5+
#
6+
# It'd be nice if this could pick up every example automatically, but after
7+
# spending an hour just trying to find a way of making a relative import work,
8+
# I'm not going anywhere near that problem right now.
9+
#
10+
11+
import asyncio
12+
13+
from examples import async_client_insert_example
14+
from examples import async_client_insert_tx_example
15+
from examples import client_insert_example
16+
from examples import client_insert_tx_example
17+
18+
if __name__ == "__main__":
19+
asyncio.set_event_loop(asyncio.new_event_loop())
20+
21+
asyncio.run(async_client_insert_example.example())
22+
asyncio.run(async_client_insert_tx_example.example())
23+
24+
client_insert_example.example()
25+
client_insert_tx_example.example()
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
#
2+
# Run with:
3+
#
4+
# rye run python3 -m examples.async_client_insert_example
5+
#
6+
7+
import asyncio
8+
from dataclasses import dataclass
9+
import json
10+
import riverqueue
11+
import sqlalchemy
12+
13+
from examples.helpers import dev_database_url
14+
from riverqueue.driver import riversqlalchemy
15+
16+
17+
@dataclass
18+
class SortArgs:
19+
strings: list[str]
20+
21+
kind: str = "sort"
22+
23+
def to_json(self) -> str:
24+
return json.dumps({"strings": self.strings})
25+
26+
27+
async def example():
28+
engine = sqlalchemy.ext.asyncio.create_async_engine(dev_database_url(is_async=True))
29+
client = riverqueue.AsyncClient(riversqlalchemy.AsyncDriver(engine))
30+
31+
insert_res = await client.insert(
32+
SortArgs(strings=["whale", "tiger", "bear"]),
33+
insert_opts=riverqueue.InsertOpts(
34+
unique_opts=riverqueue.UniqueOpts(by_period=900)
35+
),
36+
)
37+
print(insert_res)
38+
39+
40+
if __name__ == "__main__":
41+
asyncio.set_event_loop(asyncio.new_event_loop())
42+
asyncio.run(example())
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
#
2+
# Run with:
3+
#
4+
# rye run python3 -m examples.async_client_insert_tx_example
5+
#
6+
7+
import asyncio
8+
from dataclasses import dataclass
9+
import json
10+
import riverqueue
11+
import sqlalchemy
12+
13+
from examples.helpers import dev_database_url
14+
from riverqueue.driver import riversqlalchemy
15+
16+
17+
@dataclass
18+
class SortArgs:
19+
strings: list[str]
20+
21+
kind: str = "sort"
22+
23+
def to_json(self) -> str:
24+
return json.dumps({"strings": self.strings})
25+
26+
27+
async def example():
28+
engine = sqlalchemy.ext.asyncio.create_async_engine(dev_database_url(is_async=True))
29+
client = riverqueue.AsyncClient(riversqlalchemy.AsyncDriver(engine))
30+
31+
async with engine.begin() as session:
32+
insert_res = await client.insert_tx(
33+
session,
34+
SortArgs(strings=["whale", "tiger", "bear"]),
35+
insert_opts=riverqueue.InsertOpts(
36+
unique_opts=riverqueue.UniqueOpts(by_period=900)
37+
),
38+
)
39+
print(insert_res)
40+
41+
42+
if __name__ == "__main__":
43+
asyncio.set_event_loop(asyncio.new_event_loop())
44+
asyncio.run(example())

examples/client_insert_example.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
#
2+
# Run with:
3+
#
4+
# rye run python3 -m examples.client_insert_example
5+
#
6+
7+
from dataclasses import dataclass
8+
import json
9+
import riverqueue
10+
import sqlalchemy
11+
12+
from examples.helpers import dev_database_url
13+
from riverqueue.driver import riversqlalchemy
14+
15+
16+
@dataclass
17+
class SortArgs:
18+
strings: list[str]
19+
20+
kind: str = "sort"
21+
22+
def to_json(self) -> str:
23+
return json.dumps({"strings": self.strings})
24+
25+
26+
def example():
27+
engine = sqlalchemy.create_engine(dev_database_url())
28+
client = riverqueue.Client(riversqlalchemy.Driver(engine))
29+
30+
insert_res = client.insert(
31+
SortArgs(strings=["whale", "tiger", "bear"]),
32+
insert_opts=riverqueue.InsertOpts(
33+
unique_opts=riverqueue.UniqueOpts(by_period=900)
34+
),
35+
)
36+
print(insert_res)
37+
38+
39+
if __name__ == "__main__":
40+
example()
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
#
2+
# Run with:
3+
#
4+
# rye run python3 -m examples.client_insert_tx_example
5+
#
6+
7+
from dataclasses import dataclass
8+
import json
9+
import riverqueue
10+
import sqlalchemy
11+
12+
from examples.helpers import dev_database_url
13+
from riverqueue.driver import riversqlalchemy
14+
15+
16+
@dataclass
17+
class SortArgs:
18+
strings: list[str]
19+
20+
kind: str = "sort"
21+
22+
def to_json(self) -> str:
23+
return json.dumps({"strings": self.strings})
24+
25+
26+
def example():
27+
engine = sqlalchemy.create_engine(dev_database_url())
28+
client = riverqueue.Client(riversqlalchemy.Driver(engine))
29+
30+
with engine.begin() as session:
31+
insert_res = client.insert_tx(
32+
session,
33+
SortArgs(strings=["whale", "tiger", "bear"]),
34+
insert_opts=riverqueue.InsertOpts(
35+
unique_opts=riverqueue.UniqueOpts(by_period=900)
36+
),
37+
)
38+
print(insert_res)
39+
40+
41+
if __name__ == "__main__":
42+
example()

examples/helpers.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
import os
2+
3+
4+
def dev_database_url(is_async: bool = False) -> str:
5+
database_url = os.getenv("DATABASE_URL", "postgres://localhost/river_dev")
6+
7+
# sqlalchemy removed support for postgres:// for reasons beyond comprehension
8+
database_url = database_url.replace("postgres://", "postgresql://")
9+
10+
# mix in an async driver for async
11+
if is_async:
12+
database_url = database_url.replace("postgresql://", "postgresql+asyncpg://")
13+
14+
return database_url

0 commit comments

Comments
 (0)