Commit 343e823

Fix a problem where tests were leaving leftover jobs in the database (#28)
Fix a problem where tests were leaving leftover jobs in the database. This turned out to be because SQLAlchemy stupidly doesn't start a transaction when `begin()` is called, which had the effect of the unique job check starting a top-level transaction instead of a savepoint, thereby committing insert results outside of the test transactions. This took an insanely long time to track down, and I ended up adding a number of extra measures to help find the problem, which I'm also keeping for the time being:

* `RIVER_DEBUG` activates debug logging for SQLAlchemy, showing queries and connection pool activity.
* An autorunning fixture checks for extra jobs after each test case to make sure nothing gets left over.
* `SimpleArgs` now internalizes the name of the test that created it, so upon seeing an extraneous row, it's easy to know exactly where it came from. It becomes a fixture to keep this terse.
1 parent 2cc9c41 commit 343e823
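The root cause deserves a note: SQLAlchemy's `Connection.begin()` is lazy, so no `BEGIN` actually reaches Postgres until the first statement executes (SQLAlchemy logs it as `BEGIN (implicit)` at that point). Below is a minimal sketch of the failure mode and one possible mitigation; it's illustrative only, not the code from this commit:

```python
import sqlalchemy

engine = sqlalchemy.create_engine("postgresql://localhost/river_test")

with engine.connect() as conn:
    tx = conn.begin()  # lazy: no BEGIN has been sent to Postgres yet

    # Force the outer transaction to really begin by executing a statement.
    # Without this, a "nested" transaction opened by library code (e.g. the
    # unique job check) can end up as a top-level transaction whose commits
    # survive the test's rollback.
    conn.execute(sqlalchemy.text("SELECT 1"))

    with conn.begin_nested():  # now a true SAVEPOINT inside the test tx
        conn.execute(sqlalchemy.text("SELECT 1"))  # application work here

    tx.rollback()  # discards everything, leaving no leftover rows
```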

File tree

7 files changed (+263, -132 lines)

README.md

Lines changed: 1 addition & 1 deletion

@@ -97,7 +97,7 @@ insert_res.unique_skipped_as_duplicated
 
 ### Custom advisory lock prefix
 
-Unique job insertion takes a Postgres advisory lock to make sure that it's uniqueness check still works even if two conflicting insert operations are occurring in parallel. Postgres advisory locks share a global 64-bit namespace, which is a large enough space that it's unlikely for two advisory locks to ever conflict, but to _guarantee_ that River's advisory locks never interfere with an application's, River can be configured with a 32-bit advisory lock prefix which it will use for all its locks:
+Unique job insertion takes a Postgres advisory lock to make sure that its uniqueness check still works even if two conflicting insert operations are occurring in parallel. Postgres advisory locks share a global 64-bit namespace, which is a large enough space that it's unlikely for two advisory locks to ever conflict, but to _guarantee_ that River's advisory locks never interfere with an application's, River can be configured with a 32-bit advisory lock prefix which it will use for all its locks:
 
 ```python
 client = riverqueue.Client(riversqlalchemy.Driver(engine), advisory_lock_prefix=123456)
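As an aside on the mechanism described above: one common way to guarantee non-interference is to pack the configured 32-bit prefix into the upper half of the 64-bit lock key, with a hash of the job's unique properties in the lower half. A sketch of that idea (illustrative; not necessarily River's exact key derivation):

```python
import hashlib

ADVISORY_LOCK_PREFIX = 123456  # must fit in 32 bits


def advisory_lock_key(unique_key: str) -> int:
    # Lower 32 bits: a stable hash of the job's unique properties.
    key_hash = int.from_bytes(hashlib.sha256(unique_key.encode()).digest()[:4], "big")

    # Upper 32 bits: the application-configured prefix, so River's locks can
    # never collide with locks taken under a different (or no) prefix.
    return (ADVISORY_LOCK_PREFIX << 32) | key_hash


# The resulting key feeds Postgres directly, e.g.:
#   SELECT pg_advisory_xact_lock(:key)
```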

src/riverqueue/driver/riversqlalchemy/dbsqlc/river_job.py

Lines changed: 51 additions & 1 deletion

@@ -4,14 +4,20 @@
 # source: river_job.sql
 import dataclasses
 import datetime
-from typing import Any, List, Optional
+from typing import Any, AsyncIterator, Iterator, List, Optional
 
 import sqlalchemy
 import sqlalchemy.ext.asyncio
 
 from . import models
 
 
+JOB_GET_ALL = """-- name: job_get_all \\:many
+SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags
+FROM river_job
+"""
+
+
 JOB_GET_BY_KIND_AND_UNIQUE_PROPERTIES = """-- name: job_get_by_kind_and_unique_properties \\:one
 SELECT id, args, attempt, attempted_at, attempted_by, created_at, errors, finalized_at, kind, max_attempts, metadata, priority, queue, state, scheduled_at, tags
 FROM river_job
@@ -125,6 +131,28 @@ class Querier:
     def __init__(self, conn: sqlalchemy.engine.Connection):
         self._conn = conn
 
+    def job_get_all(self) -> Iterator[models.RiverJob]:
+        result = self._conn.execute(sqlalchemy.text(JOB_GET_ALL))
+        for row in result:
+            yield models.RiverJob(
+                id=row[0],
+                args=row[1],
+                attempt=row[2],
+                attempted_at=row[3],
+                attempted_by=row[4],
+                created_at=row[5],
+                errors=row[6],
+                finalized_at=row[7],
+                kind=row[8],
+                max_attempts=row[9],
+                metadata=row[10],
+                priority=row[11],
+                queue=row[12],
+                state=row[13],
+                scheduled_at=row[14],
+                tags=row[15],
+            )
+
     def job_get_by_kind_and_unique_properties(self, arg: JobGetByKindAndUniquePropertiesParams) -> Optional[models.RiverJob]:
         row = self._conn.execute(sqlalchemy.text(JOB_GET_BY_KIND_AND_UNIQUE_PROPERTIES), {
             "p1": arg.kind,
@@ -213,6 +241,28 @@ class AsyncQuerier:
     def __init__(self, conn: sqlalchemy.ext.asyncio.AsyncConnection):
         self._conn = conn
 
+    async def job_get_all(self) -> AsyncIterator[models.RiverJob]:
+        result = await self._conn.stream(sqlalchemy.text(JOB_GET_ALL))
+        async for row in result:
+            yield models.RiverJob(
+                id=row[0],
+                args=row[1],
+                attempt=row[2],
+                attempted_at=row[3],
+                attempted_by=row[4],
+                created_at=row[5],
+                errors=row[6],
+                finalized_at=row[7],
+                kind=row[8],
+                max_attempts=row[9],
+                metadata=row[10],
+                priority=row[11],
+                queue=row[12],
+                state=row[13],
+                scheduled_at=row[14],
+                tags=row[15],
+            )
+
     async def job_get_by_kind_and_unique_properties(self, arg: JobGetByKindAndUniquePropertiesParams) -> Optional[models.RiverJob]:
         row = (await self._conn.execute(sqlalchemy.text(JOB_GET_BY_KIND_AND_UNIQUE_PROPERTIES), {
             "p1": arg.kind,

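For reference, a quick sketch of how the newly generated querier might be called (illustrative usage only, assuming an `engine` pointed at a River database):

```python
import sqlalchemy

from riverqueue.driver.riversqlalchemy.dbsqlc import river_job

engine = sqlalchemy.create_engine("postgresql://localhost/river_test")

with engine.begin() as conn:
    # job_get_all() is a generator, so materialize it with list() while the
    # connection is still open.
    for job in list(river_job.Querier(conn).job_get_all()):
        print(job.id, job.kind, job.state)
```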
src/riverqueue/driver/riversqlalchemy/dbsqlc/river_job.sql

Lines changed: 4 additions & 0 deletions

@@ -35,6 +35,10 @@ CREATE TABLE river_job(
     CONSTRAINT kind_length CHECK (char_length(kind) > 0 AND char_length(kind) < 128)
 );
 
+-- name: JobGetAll :many
+SELECT *
+FROM river_job;
+
 -- name: JobGetByKindAndUniqueProperties :one
 SELECT *
 FROM river_job

tests/client_test.py

Lines changed: 22 additions & 22 deletions

@@ -8,8 +8,6 @@
 from riverqueue.driver import DriverProtocol, ExecutorProtocol
 import sqlalchemy
 
-from tests.simple_args import SimpleArgs
-
 
 @pytest.fixture
 def mock_driver() -> DriverProtocol:
@@ -38,17 +36,17 @@ def client(mock_driver) -> Client:
     return Client(mock_driver)
 
 
-def test_insert_with_only_args(client, mock_exec):
+def test_insert_with_only_args(client, mock_exec, simple_args):
     mock_exec.job_get_by_kind_and_unique_properties.return_value = None
     mock_exec.job_insert.return_value = "job_row"
 
-    insert_res = client.insert(SimpleArgs())
+    insert_res = client.insert(simple_args)
 
     mock_exec.job_insert.assert_called_once()
     assert insert_res.job == "job_row"
 
 
-def test_insert_tx(mock_driver, client):
+def test_insert_tx(mock_driver, client, simple_args):
     mock_exec = MagicMock(spec=ExecutorProtocol)
     mock_exec.job_get_by_kind_and_unique_properties.return_value = None
     mock_exec.job_insert.return_value = "job_row"
@@ -61,17 +59,17 @@ def mock_unwrap_executor(tx: sqlalchemy.Transaction):
 
     mock_driver.unwrap_executor.side_effect = mock_unwrap_executor
 
-    insert_res = client.insert_tx(mock_tx, SimpleArgs())
+    insert_res = client.insert_tx(mock_tx, simple_args)
 
     mock_exec.job_insert.assert_called_once()
     assert insert_res.job == "job_row"
 
 
-def test_insert_with_insert_opts_from_args(client, mock_exec):
+def test_insert_with_insert_opts_from_args(client, mock_exec, simple_args):
     mock_exec.job_insert.return_value = "job_row"
 
     insert_res = client.insert(
-        SimpleArgs(),
+        simple_args,
         insert_opts=InsertOpts(
             max_attempts=23, priority=2, queue="job_custom_queue", tags=["job_custom"]
         ),
@@ -121,7 +119,7 @@ def to_json() -> str:
     assert insert_args.tags == ["job_custom"]
 
 
-def test_insert_with_insert_opts_precedence(client, mock_exec):
+def test_insert_with_insert_opts_precedence(client, mock_exec, simple_args):
     @dataclass
     class MyArgs:
         kind = "my_args"
@@ -142,7 +140,7 @@ def to_json() -> str:
     mock_exec.job_insert.return_value = "job_row"
 
     insert_res = client.insert(
-        SimpleArgs(),
+        simple_args,
         insert_opts=InsertOpts(
             max_attempts=17, priority=3, queue="my_queue", tags=["custom"]
         ),
@@ -158,13 +156,13 @@ def to_json() -> str:
     assert insert_args.tags == ["custom"]
 
 
-def test_insert_with_unique_opts_by_args(client, mock_exec):
+def test_insert_with_unique_opts_by_args(client, mock_exec, simple_args):
     insert_opts = InsertOpts(unique_opts=UniqueOpts(by_args=True))
 
     mock_exec.job_get_by_kind_and_unique_properties.return_value = None
     mock_exec.job_insert.return_value = "job_row"
 
-    insert_res = client.insert(SimpleArgs(), insert_opts=insert_opts)
+    insert_res = client.insert(simple_args, insert_opts=insert_opts)
 
     mock_exec.job_insert.assert_called_once()
     assert insert_res.job == "job_row"
@@ -175,15 +173,17 @@ def test_insert_with_unique_opts_by_args(client, mock_exec):
 
 
 @patch("datetime.datetime")
-def test_insert_with_unique_opts_by_period(mock_datetime, client, mock_exec):
+def test_insert_with_unique_opts_by_period(
+    mock_datetime, client, mock_exec, simple_args
+):
     mock_datetime.now.return_value = datetime(2024, 6, 1, 12, 0, 0, tzinfo=timezone.utc)
 
     insert_opts = InsertOpts(unique_opts=UniqueOpts(by_period=900))
 
     mock_exec.job_get_by_kind_and_unique_properties.return_value = None
     mock_exec.job_insert.return_value = "job_row"
 
-    insert_res = client.insert(SimpleArgs(), insert_opts=insert_opts)
+    insert_res = client.insert(simple_args, insert_opts=insert_opts)
 
     mock_exec.job_insert.assert_called_once()
     assert insert_res.job == "job_row"
@@ -193,13 +193,13 @@ def test_insert_with_unique_opts_by_period(mock_datetime, client, mock_exec):
     assert call_args.kind == "simple"
 
 
-def test_insert_with_unique_opts_by_queue(client, mock_exec):
+def test_insert_with_unique_opts_by_queue(client, mock_exec, simple_args):
     insert_opts = InsertOpts(unique_opts=UniqueOpts(by_queue=True))
 
     mock_exec.job_get_by_kind_and_unique_properties.return_value = None
     mock_exec.job_insert.return_value = "job_row"
 
-    insert_res = client.insert(SimpleArgs(), insert_opts=insert_opts)
+    insert_res = client.insert(simple_args, insert_opts=insert_opts)
 
     mock_exec.job_insert.assert_called_once()
     assert insert_res.job == "job_row"
@@ -209,13 +209,13 @@ def test_insert_with_unique_opts_by_queue(client, mock_exec):
     assert call_args.kind == "simple"
 
 
-def test_insert_with_unique_opts_by_state(client, mock_exec):
+def test_insert_with_unique_opts_by_state(client, mock_exec, simple_args):
     insert_opts = InsertOpts(unique_opts=UniqueOpts(by_state=["available", "running"]))
 
     mock_exec.job_get_by_kind_and_unique_properties.return_value = None
     mock_exec.job_insert.return_value = "job_row"
 
-    insert_res = client.insert(SimpleArgs(), insert_opts=insert_opts)
+    insert_res = client.insert(simple_args, insert_opts=insert_opts)
 
     mock_exec.job_insert.assert_called_once()
     assert insert_res.job == "job_row"
@@ -259,20 +259,20 @@ def to_json() -> None:
     assert "args should return non-nil from `to_json`" == str(ex.value)
 
 
-def test_tag_validation(client):
+def test_tag_validation(client, simple_args):
     client.insert(
-        SimpleArgs(), insert_opts=InsertOpts(tags=["foo", "bar", "baz", "foo-bar-baz"])
+        simple_args, insert_opts=InsertOpts(tags=["foo", "bar", "baz", "foo-bar-baz"])
     )
 
     with pytest.raises(AssertionError) as ex:
-        client.insert(SimpleArgs(), insert_opts=InsertOpts(tags=["commas,bad"]))
+        client.insert(simple_args, insert_opts=InsertOpts(tags=["commas,bad"]))
     assert (
         r"tags should be less than 255 characters in length and match regex \A[\w][\w\-]+[\w]\Z"
         == str(ex.value)
    )
 
     with pytest.raises(AssertionError) as ex:
-        client.insert(SimpleArgs(), insert_opts=InsertOpts(tags=["a" * 256]))
+        client.insert(simple_args, insert_opts=InsertOpts(tags=["a" * 256]))
     assert (
         r"tags should be less than 255 characters in length and match regex \A[\w][\w\-]+[\w]\Z"
         == str(ex.value)
    )

tests/conftest.py

Lines changed: 66 additions & 1 deletion

@@ -1,13 +1,30 @@
+from dataclasses import dataclass
+import json
 import os
+from typing import Iterator
 
 import pytest
 import sqlalchemy
 import sqlalchemy.ext.asyncio
 
+from riverqueue.driver.riversqlalchemy.dbsqlc import river_job
+
+
+def engine_opts() -> dict:
+    """
+    Use to pass verbose logging options to an SQLAlchemy engine when
+    `RIVER_DEBUG=true` is set in the environment.
+    """
+
+    if os.getenv("RIVER_DEBUG") == "true":
+        return dict(echo=True, echo_pool="debug")
+
+    return dict()
+
 
 @pytest.fixture(scope="session")
 def engine() -> sqlalchemy.Engine:
-    return sqlalchemy.create_engine(test_database_url())
+    return sqlalchemy.create_engine(test_database_url(), **engine_opts())
 
 
 # @pytest_asyncio.fixture(scope="session")
@@ -22,10 +39,17 @@ def engine_async() -> sqlalchemy.ext.asyncio.AsyncEngine:
         # This statement disables pooling which isn't ideal, but I've spent
         # too many hours trying to figure this out so I'm calling it.
         poolclass=sqlalchemy.pool.NullPool,
+        **engine_opts(),
     )
 
 
 def test_database_url(is_async: bool = False) -> str:
+    """
+    Produces a test URL based on `TEST_DATABASE_URL` or River's default
+    convention, and modifies it so that its protocol includes an appropriate
+    driver to make SQLAlchemy happy.
+    """
+
     database_url = os.getenv("TEST_DATABASE_URL", "postgres://localhost/river_test")
 
     # sqlalchemy removed support for postgres:// for reasons beyond comprehension
@@ -36,3 +60,44 @@ def test_database_url(is_async: bool = False) -> str:
         database_url = database_url.replace("postgresql://", "postgresql+asyncpg://")
 
     return database_url
+
+
+@dataclass
+class SimpleArgs:
+    test_name: str
+
+    kind: str = "simple"
+
+    def to_json(self) -> str:
+        return json.dumps({"test_name": self.test_name})
+
+
+@pytest.fixture
+def simple_args(request: pytest.FixtureRequest):
+    """
+    Returns an instance of SimpleArgs encapsulating the running test's name.
+    This can be useful in cases where a test is accidentally leaving leftovers
+    in the database.
+    """
+
+    return SimpleArgs(test_name=request.node.name)
+
+
+@pytest.fixture(autouse=True)
+def check_leftover_jobs(engine) -> Iterator[None]:
+    """
+    Autorunning fixture that checks for leftover jobs after each test case. I
+    previously had a huge amount of trouble tracking down tests that were
+    inserting rows despite being in a test transaction and ended up adding
+    this check, along with naming inserted jobs after their test case. If it
+    turns out these measures haven't been needed in a long time, we can
+    probably remove them.
+    """
+
+    yield
+
+    with engine.begin() as conn_tx:
+        jobs = river_job.Querier(conn_tx).job_get_all()
+        assert (
+            list(jobs) == []
+        ), "test case should not have persisted any jobs after run"

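Taken together, here's roughly how the new debugging measures play out when a test does leak a row (the assertion output below is hypothetical, and the cleanup helper is not part of this commit):

```python
# Run with verbose SQLAlchemy logging:
#
#     RIVER_DEBUG=true pytest
#
# A leaked job then fails the autouse fixture right after the offending test,
# and because simple_args embeds the test's name in the job's args, the row
# itself says where it came from, e.g.:
#
#     AssertionError: test case should not have persisted any jobs after run
#     assert [RiverJob(args=b'{"test_name": "test_insert_tx"}', ...)] == []

import sqlalchemy


def delete_leftover_jobs(engine: sqlalchemy.Engine) -> None:
    # Hypothetical one-off helper to clear rows left behind by earlier broken
    # runs so that check_leftover_jobs starts from a clean slate.
    with engine.begin() as conn:
        conn.execute(sqlalchemy.text("DELETE FROM river_job"))
```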