Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

gh-113884: Refactor queue.SimpleQueue to use a ring buffer to store items #114259

Merged
209 changes: 169 additions & 40 deletions Modules/_queuemodule.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include "pycore_moduleobject.h" // _PyModule_GetState()
#include "pycore_time.h" // _PyTime_t

#include <stdbool.h>
#include <stddef.h> // offsetof()

typedef struct {
Expand All @@ -25,12 +26,167 @@ static struct PyModuleDef queuemodule;
#define simplequeue_get_state_by_type(type) \
(simplequeue_get_state(PyType_GetModuleByDef(type, &queuemodule)))

// Number of item slots a newly initialized ring buffer starts with; also the
// smallest capacity that a resize will ever produce.
static const Py_ssize_t INITIAL_RING_BUF_CAPACITY = 8;

// Circular buffer of object pointers. Items are taken from get_idx and added
// at put_idx; both indices wrap around at items_cap.
typedef struct {
// Index in items where the next item will be placed
Py_ssize_t put_idx;

// Index in items of the next item to get
Py_ssize_t get_idx;

// Backing array of items_cap slots
PyObject **items;

// Total number of items that may be stored
Py_ssize_t items_cap;

// Number of items stored
Py_ssize_t num_items;
} RingBuf;

// Set up buf as an empty ring buffer with INITIAL_RING_BUF_CAPACITY
// zero-filled slots.
//
// Returns 0 on success. If the items array cannot be allocated, calls
// PyErr_NoMemory() and returns -1 (buf->items is NULL in that case).
static int
RingBuf_Init(RingBuf *buf)
{
    buf->num_items = 0;
    buf->get_idx = 0;
    buf->put_idx = 0;
    buf->items_cap = INITIAL_RING_BUF_CAPACITY;

    PyObject **slots = PyMem_Calloc(buf->items_cap, sizeof(*slots));
    buf->items = slots;
    if (slots != NULL) {
        return 0;
    }

    PyErr_NoMemory();
    return -1;
}

// Return the item stored idx positions past the head of the buffer, so
// idx 0 is the item at get_idx. idx must be in [0, num_items).
//
// The pointer is returned as stored; no reference count is changed here.
static PyObject *
RingBuf_At(RingBuf *buf, Py_ssize_t idx)
{
    assert(idx >= 0 && idx < buf->num_items);

    // Translate the logical position into a physical slot, wrapping around
    // the end of the items array.
    Py_ssize_t slot = (buf->get_idx + idx) % buf->items_cap;
    return buf->items[slot];
}

// Release every item still stored in buf and free its items array.
//
// All fields of buf are reset before the first item is released, so buf
// already looks empty while the Py_DECREF calls below run.
static void
RingBuf_Fini(RingBuf *buf)
{
    // Snapshot what teardown needs, then put the buffer in its cleared state.
    PyObject **old_items = buf->items;
    Py_ssize_t remaining = buf->num_items;
    Py_ssize_t old_cap = buf->items_cap;
    Py_ssize_t pos = buf->get_idx;

    buf->items = NULL;
    buf->items_cap = 0;
    buf->num_items = 0;
    buf->get_idx = 0;
    buf->put_idx = 0;

    // Walk the occupied slots in queue order, wrapping at the old capacity.
    while (remaining > 0) {
        Py_DECREF(old_items[pos]);
        pos = (pos + 1) % old_cap;
        remaining--;
    }

    PyMem_Free(old_items);
}

// Move the contents of buf into a newly allocated items array and lay the
// items out contiguously from index 0. The new capacity is the requested one,
// but never less than INITIAL_RING_BUF_CAPACITY.
//
// Returns 0 on success, including when the capacity would not change and
// nothing is done. Returns -1 if the new array cannot be allocated; buf is
// left unmodified in that case and no exception is set here.
static int
resize_ringbuf(RingBuf *buf, Py_ssize_t capacity)
{
    Py_ssize_t target_cap = capacity;
    if (INITIAL_RING_BUF_CAPACITY > target_cap) {
        target_cap = INITIAL_RING_BUF_CAPACITY;
    }
    if (buf->items_cap == target_cap) {
        return 0;
    }
    assert(buf->num_items <= target_cap);

    PyObject **fresh = PyMem_Calloc(target_cap, sizeof(PyObject *));
    if (fresh == NULL) {
        return -1;
    }

    Py_ssize_t count = buf->num_items;

    // First run: the items from get_idx up to the end of the old array.
    // These are the oldest items, i.e. the front of the queue.
    Py_ssize_t first_run = buf->items_cap - buf->get_idx;
    if (first_run > count) {
        first_run = count;
    }
    if (first_run > 0) {
        memcpy(fresh, buf->items + buf->get_idx,
               first_run * sizeof(PyObject *));
    }

    // Second run: whatever wrapped around to the start of the old array.
    // These are the newest items, i.e. the back of the queue.
    Py_ssize_t wrapped = count - first_run;
    if (wrapped > 0) {
        memcpy(fresh + first_run, buf->items,
               wrapped * sizeof(PyObject *));
    }

    PyMem_Free(buf->items);
    buf->items = fresh;
    buf->items_cap = target_cap;
    buf->get_idx = 0;
    buf->put_idx = count;

    return 0;
}

// Remove the item at the head of the buffer and return it. The reference the
// buffer held is handed over to the caller. The buffer must not be empty.
static PyObject *
RingBuf_Get(RingBuf *buf)
{
    assert(buf->num_items > 0);

    // When the items array is less than 25% occupied, halve it. Halving
    // (rather than shrinking to fit) leaves room to grow again without an
    // immediate resize.
    //
    // It's safe to ignore an allocation failure here: shrinking is only an
    // optimization and is not required for correctness.
    Py_ssize_t quarter_cap = buf->items_cap / 4;
    if (buf->num_items < quarter_cap) {
        (void)resize_ringbuf(buf, buf->items_cap / 2);
    }

    Py_ssize_t head = buf->get_idx;
    PyObject *item = buf->items[head];
    buf->items[head] = NULL;
    buf->num_items--;
    buf->get_idx = (head + 1) % buf->items_cap;
    return item;
}

// Store a new strong reference to item at the tail of the buffer. If the
// buffer is full, it is first resized with twice the current capacity
// requested.
//
// Returns 0 on success. If the buffer fails to grow, calls PyErr_NoMemory()
// and returns -1 without storing item.
static int
RingBuf_Put(RingBuf *buf, PyObject *item)
{
    assert(buf->num_items <= buf->items_cap);

    // Only attempt to grow when every slot is occupied.
    if (buf->num_items == buf->items_cap
        && resize_ringbuf(buf, buf->items_cap * 2) < 0)
    {
        PyErr_NoMemory();
        return -1;
    }

    Py_ssize_t tail = buf->put_idx;
    buf->items[tail] = Py_NewRef(item);
    buf->put_idx = (tail + 1) % buf->items_cap;
    buf->num_items++;
    return 0;
}

// Report how many items buf currently holds.
static Py_ssize_t
RingBuf_Len(RingBuf *buf)
{
    Py_ssize_t count = buf->num_items;
    return count;
}

// Tell whether buf holds no items.
static bool
RingBuf_IsEmpty(RingBuf *buf)
{
    if (buf->num_items != 0) {
        return false;
    }
    return true;
}

// Instance layout for _queue.SimpleQueue objects.
//
// NOTE(review): this is a diff view whose +/- markers were lost. Given the
// change replaces the item list with a ring buffer, `lst` and `lst_pos` look
// like the removed fields and `buf` like the added one -- confirm against the
// merged file before relying on this layout.
typedef struct {
PyObject_HEAD
PyThread_type_lock lock;
// NOTE(review): appears to record whether `lock` is currently held -- confirm
int locked;
// List holding the queued items (old storage)
PyObject *lst;
// Index in `lst` of the next item to pop (old storage)
Py_ssize_t lst_pos;
// Ring buffer holding the queued items (new storage)
RingBuf buf;
PyObject *weakreflist;
} simplequeueobject;

Expand All @@ -43,7 +199,7 @@ class _queue.SimpleQueue "simplequeueobject *" "simplequeue_get_state_by_type(ty
// Drop the queue's references to its stored items. Always returns 0.
//
// NOTE(review): diff view with the +/- markers lost -- Py_CLEAR(self->lst)
// looks like the removed line and RingBuf_Fini(&self->buf) its replacement;
// confirm against the merged file.
static int
simplequeue_clear(simplequeueobject *self)
{
Py_CLEAR(self->lst);
RingBuf_Fini(&self->buf);
return 0;
}

Expand All @@ -69,7 +225,10 @@ simplequeue_dealloc(simplequeueobject *self)
// Traversal callback: hands each object the queue references (its stored
// items, then the object's type) to `visit` through Py_VISIT, and returns 0
// after the last one.
//
// NOTE(review): diff view with the +/- markers lost -- Py_VISIT(self->lst)
// looks like the removed line and the loop over the ring buffer its
// replacement; confirm against the merged file.
static int
simplequeue_traverse(simplequeueobject *self, visitproc visit, void *arg)
{
Py_VISIT(self->lst);
RingBuf *buf = &self->buf;
// num_items is read once up front; items are visited in queue order.
for (Py_ssize_t i = 0, num_items = buf->num_items; i < num_items; i++) {
Py_VISIT(RingBuf_At(buf, i));
}
Py_VISIT(Py_TYPE(self));
return 0;
}
Expand All @@ -90,15 +249,13 @@ simplequeue_new_impl(PyTypeObject *type)
self = (simplequeueobject *) type->tp_alloc(type, 0);
if (self != NULL) {
self->weakreflist = NULL;
self->lst = PyList_New(0);
self->lock = PyThread_allocate_lock();
self->lst_pos = 0;
if (self->lock == NULL) {
Py_DECREF(self);
PyErr_SetString(PyExc_MemoryError, "can't allocate lock");
return NULL;
}
if (self->lst == NULL) {
if (RingBuf_Init(&self->buf) < 0) {
Py_DECREF(self);
return NULL;
}
Expand Down Expand Up @@ -126,7 +283,7 @@ _queue_SimpleQueue_put_impl(simplequeueobject *self, PyObject *item,
/*[clinic end generated code: output=4333136e88f90d8b input=6e601fa707a782d5]*/
{
/* BEGIN GIL-protected critical section */
if (PyList_Append(self->lst, item) < 0)
if (RingBuf_Put(&self->buf, item) < 0)
return NULL;
if (self->locked) {
/* A get() may be waiting, wake it up */
Expand Down Expand Up @@ -155,33 +312,6 @@ _queue_SimpleQueue_put_nowait_impl(simplequeueobject *self, PyObject *item)
return _queue_SimpleQueue_put_impl(self, item, 0, Py_None);
}

// List-based pop: take the item at self->lst_pos out of self->lst and return
// it, or return NULL if compacting the list fails (the pop is undone first).
//
// NOTE(review): the hunk header (-155,33 +312,6) suggests this whole function
// is removed by the change in favour of RingBuf_Get -- confirm.
static PyObject *
simplequeue_pop_item(simplequeueobject *self)
{
Py_ssize_t count, n;
PyObject *item;

n = PyList_GET_SIZE(self->lst);
assert(self->lst_pos < n);

item = PyList_GET_ITEM(self->lst, self->lst_pos);
// Overwrite the popped slot with None. PyList_SET_ITEM does not release the
// previous value, so the list's reference to `item` passes to the caller.
Py_INCREF(Py_None);
PyList_SET_ITEM(self->lst, self->lst_pos, Py_None);
self->lst_pos += 1;
// Number of items still waiting after this pop
count = n - self->lst_pos;
if (self->lst_pos > count) {
/* The list is more than 50% empty, reclaim space at the beginning */
if (PyList_SetSlice(self->lst, 0, self->lst_pos, NULL)) {
/* Undo pop */
self->lst_pos -= 1;
PyList_SET_ITEM(self->lst, self->lst_pos, item);
return NULL;
}
self->lst_pos = 0;
}
return item;
}

/*[clinic input]
_queue.SimpleQueue.get

Expand Down Expand Up @@ -249,7 +379,7 @@ _queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls,
* So we simply try to acquire the lock in a loop, until the condition
* (queue non-empty) becomes true.
*/
while (self->lst_pos == PyList_GET_SIZE(self->lst)) {
while (RingBuf_IsEmpty(&self->buf)) {
/* First a simple non-blocking try without releasing the GIL */
r = PyThread_acquire_lock_timed(self->lock, 0, 0);
if (r == PY_LOCK_FAILURE && microseconds != 0) {
Expand Down Expand Up @@ -279,8 +409,7 @@ _queue_SimpleQueue_get_impl(simplequeueobject *self, PyTypeObject *cls,
}

/* BEGIN GIL-protected critical section */
assert(self->lst_pos < PyList_GET_SIZE(self->lst));
item = simplequeue_pop_item(self);
item = RingBuf_Get(&self->buf);
if (self->locked) {
PyThread_release_lock(self->lock);
self->locked = 0;
Expand Down Expand Up @@ -320,7 +449,7 @@ static int
_queue_SimpleQueue_empty_impl(simplequeueobject *self)
/*[clinic end generated code: output=1a02a1b87c0ef838 input=1a98431c45fd66f9]*/
{
return self->lst_pos == PyList_GET_SIZE(self->lst);
return RingBuf_IsEmpty(&self->buf);
}

/*[clinic input]
Expand All @@ -333,7 +462,7 @@ static Py_ssize_t
_queue_SimpleQueue_qsize_impl(simplequeueobject *self)
/*[clinic end generated code: output=f9dcd9d0a90e121e input=7a74852b407868a1]*/
{
return PyList_GET_SIZE(self->lst) - self->lst_pos;
return RingBuf_Len(&self->buf);
}

static int
Expand Down
Loading