Skip to content

Commit d138b27

Browse files
Non blocking queue implementation in progress
1 parent 821c72e commit d138b27

File tree

1 file changed

+9
-0
lines changed

1 file changed

+9
-0
lines changed

Modules/_interpqueuesmodule.c

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -578,6 +578,8 @@ _queue_kill_and_wait(_queue *queue)
578578
PyThread_acquire_lock(queue->mutex, WAIT_LOCK);
579579
assert(queue->alive);
580580
queue->alive = 0;
581+
queue->space_available = (PyEvent){1};
582+
queue->has_item = (PyEvent){1};
581583
PyThread_release_lock(queue->mutex);
582584

583585
// Wait for all waiters to fail.
@@ -1147,7 +1149,9 @@ queue_put(_queues *queues, int64_t qid, PyObject *obj, unboundop_t unboundop,
11471149

11481150
// Wait for the queue to have space
11491151
if (block == 1) {
1152+
_queue_mark_waiter(queue, queues->mutex);
11501153
PyEvent_Wait(&queue->space_available);
1154+
_queue_unmark_waiter(queue, queues->mutex);
11511155
}
11521156

11531157
// Convert the object to cross-interpreter data.
@@ -1198,7 +1202,12 @@ queue_get(_queues *queues, int64_t qid,
11981202

11991203
// Wait for the queue to have some value
12001204
if (block == 1) {
1205+
_queue_mark_waiter(queue, queues->mutex);
12011206
PyEvent_Wait(&queue->has_item);
1207+
_queue_unmark_waiter(queue, queues->mutex);
1208+
if (!queue->alive) {
1209+
return ERR_QUEUE_NOT_FOUND;
1210+
}
12021211
}
12031212

12041213
// Pop off the next item from the queue.

0 commit comments

Comments
 (0)