Skip to content
Prev Previous commit
Handle a small race with waiting->status when releasing.
  • Loading branch information
ericsnowcurrently committed Oct 17, 2023
commit ab148fae9e3a21b6aec00a2823d3464796bbaff9
39 changes: 32 additions & 7 deletions Modules/_xxinterpchannelsmodule.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,13 @@
#include "pycore_pybuffer.h" // _PyBuffer_ReleaseInInterpreterAndRawFree()
#include "pycore_interp.h" // _PyInterpreterState_LookUpID()

#ifdef MS_WINDOWS
#define WIN32_LEAN_AND_MEAN
#include <windows.h> // SwitchToThread()
#elif defined(HAVE_SCHED_H)
#include <sched.h> // sched_yield()
#endif


/*
This module has the following process-global state:
Expand Down Expand Up @@ -596,7 +603,8 @@ typedef struct wait_info {
enum {
WAITING_NO_STATUS = 0,
WAITING_ACQUIRED = 1,
WAITING_RELEASED = 2,
WAITING_RELEASING = 2,
WAITING_RELEASED = 3,
} status;
int received;
_channelitem_id_t itemid;
Expand All @@ -621,7 +629,8 @@ _waiting_init(_waiting_t *waiting)
static void
_waiting_clear(_waiting_t *waiting)
{
assert(waiting->status != WAITING_ACQUIRED);
assert(waiting->status != WAITING_ACQUIRED
&& waiting->status != WAITING_RELEASING);
if (waiting->mutex != NULL) {
PyThread_free_lock(waiting->mutex);
waiting->mutex = NULL;
Expand Down Expand Up @@ -649,12 +658,25 @@ _waiting_release(_waiting_t *waiting, int received)
assert(waiting->status == WAITING_ACQUIRED);
assert(!waiting->received);

waiting->status = WAITING_RELEASING;
PyThread_release_lock(waiting->mutex);
waiting->status = WAITING_RELEASED;
if (waiting->received != received) {
assert(received == 1);
waiting->received = received;
}
waiting->status = WAITING_RELEASED;
}

static void
_waiting_finish_releasing(_waiting_t *waiting)
{
while (waiting->status == WAITING_RELEASING) {
#ifdef MS_WINDOWS
SwitchToThread();
#elif defined(HAVE_SCHED_H)
sched_yield();
#endif
}
}

struct _channelitem;
Expand Down Expand Up @@ -1881,6 +1903,7 @@ _channel_send_wait(_channels *channels, int64_t cid, PyObject *obj)
/* Wait until the object is received. */
if (wait_for_lock(waiting.mutex) < 0) {
assert(PyErr_Occurred());
_waiting_finish_releasing(&waiting);
/* The send() call is failing now, so make sure the item
won't be received. */
_channel_clear_sent(channels, cid, &waiting);
Expand All @@ -1892,14 +1915,16 @@ _channel_send_wait(_channels *channels, int64_t cid, PyObject *obj)
// XXX Emit a warning if not a TimeoutError?
PyErr_Clear();
}
else if (!waiting.received) {
else {
_waiting_finish_releasing(&waiting);
assert(waiting.status == WAITING_RELEASED);
res = ERR_CHANNEL_CLOSED_WAITING;
goto finally;
if (!waiting.received) {
res = ERR_CHANNEL_CLOSED_WAITING;
goto finally;
}
}

/* success! */
assert(waiting.status == WAITING_RELEASED);
res = 0;

finally:
Expand Down