Skip to content

gh-101659: Use the Raw Allocator in the _xxinterpchannels Module #103287

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 93 additions & 22 deletions Modules/_xxinterpchannelsmodule.c
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,77 @@
#include "pycore_interpreteridobject.h"


/*
This module has the following process-global state:

_globals (static struct globals):
module_count (int)
channels (struct _channels):
numopen (int64_t)
next_id; (int64_t)
mutex (PyThread_type_lock)
head (linked list of struct _channelref *):
id (int64_t)
objcount (Py_ssize_t)
next (struct _channelref *):
...
chan (struct _channel *):
open (int)
mutex (PyThread_type_lock)
closing (struct _channel_closing *):
ref (struct _channelref *):
...
ends (struct _channelends *):
numsendopen (int64_t)
numrecvopen (int64_t)
send (struct _channelend *):
interp (int64_t)
open (int)
next (struct _channelend *)
recv (struct _channelend *):
...
queue (struct _channelqueue *):
count (int64_t)
first (struct _channelitem *):
next (struct _channelitem *):
...
data (_PyCrossInterpreterData *):
data (void *)
obj (PyObject *)
interp (int64_t)
new_object (xid_newobjectfunc)
free (xid_freefunc)
last (struct _channelitem *):
...

The above state includes the following allocations by the module:

* 1 top-level mutex (to protect the rest of the state)
* for each channel:
* 1 struct _channelref
* 1 struct _channel
* 0-1 struct _channel_closing
* 1 struct _channelends
* 2 struct _channelend
* 1 struct _channelqueue
* for each item in each channel:
* 1 struct _channelitem
* 1 _PyCrossInterpreterData

The only objects in that global state are the references held by each
channel's queue, which are safely managed via the _PyCrossInterpreterData_*()
API.. The module does not create any objects that are shared globally.
*/

#define MODULE_NAME "_xxinterpchannels"


#define GLOBAL_MALLOC(TYPE) \
PyMem_RawMalloc(sizeof(TYPE))
#define GLOBAL_FREE(VAR) \
PyMem_RawFree(VAR)


static PyInterpreterState *
_get_current_interp(void)
{
Expand Down Expand Up @@ -301,7 +369,7 @@ typedef struct _channelitem {
static _channelitem *
_channelitem_new(void)
{
_channelitem *item = PyMem_NEW(_channelitem, 1);
_channelitem *item = GLOBAL_MALLOC(_channelitem);
if (item == NULL) {
PyErr_NoMemory();
return NULL;
Expand All @@ -316,7 +384,8 @@ _channelitem_clear(_channelitem *item)
{
if (item->data != NULL) {
(void)_release_xid_data(item->data, 1);
PyMem_Free(item->data);
// It was allocated in _channel_send().
GLOBAL_FREE(item->data);
item->data = NULL;
}
item->next = NULL;
Expand All @@ -326,7 +395,7 @@ static void
_channelitem_free(_channelitem *item)
{
_channelitem_clear(item);
PyMem_Free(item);
GLOBAL_FREE(item);
}

static void
Expand Down Expand Up @@ -357,7 +426,7 @@ typedef struct _channelqueue {
static _channelqueue *
_channelqueue_new(void)
{
_channelqueue *queue = PyMem_NEW(_channelqueue, 1);
_channelqueue *queue = GLOBAL_MALLOC(_channelqueue);
if (queue == NULL) {
PyErr_NoMemory();
return NULL;
Expand All @@ -381,7 +450,7 @@ static void
_channelqueue_free(_channelqueue *queue)
{
_channelqueue_clear(queue);
PyMem_Free(queue);
GLOBAL_FREE(queue);
}

static int
Expand Down Expand Up @@ -433,7 +502,7 @@ typedef struct _channelend {
static _channelend *
_channelend_new(int64_t interp)
{
_channelend *end = PyMem_NEW(_channelend, 1);
_channelend *end = GLOBAL_MALLOC(_channelend);
if (end == NULL) {
PyErr_NoMemory();
return NULL;
Expand All @@ -447,7 +516,7 @@ _channelend_new(int64_t interp)
static void
_channelend_free(_channelend *end)
{
PyMem_Free(end);
GLOBAL_FREE(end);
}

static void
Expand Down Expand Up @@ -492,7 +561,7 @@ typedef struct _channelassociations {
static _channelends *
_channelends_new(void)
{
_channelends *ends = PyMem_NEW(_channelends, 1);
_channelends *ends = GLOBAL_MALLOC(_channelends);
if (ends== NULL) {
return NULL;
}
Expand All @@ -519,7 +588,7 @@ static void
_channelends_free(_channelends *ends)
{
_channelends_clear(ends);
PyMem_Free(ends);
GLOBAL_FREE(ends);
}

static _channelend *
Expand Down Expand Up @@ -660,20 +729,20 @@ typedef struct _channel {
static _PyChannelState *
_channel_new(PyThread_type_lock mutex)
{
_PyChannelState *chan = PyMem_NEW(_PyChannelState, 1);
_PyChannelState *chan = GLOBAL_MALLOC(_PyChannelState);
if (chan == NULL) {
return NULL;
}
chan->mutex = mutex;
chan->queue = _channelqueue_new();
if (chan->queue == NULL) {
PyMem_Free(chan);
GLOBAL_FREE(chan);
return NULL;
}
chan->ends = _channelends_new();
if (chan->ends == NULL) {
_channelqueue_free(chan->queue);
PyMem_Free(chan);
GLOBAL_FREE(chan);
return NULL;
}
chan->open = 1;
Expand All @@ -691,7 +760,7 @@ _channel_free(_PyChannelState *chan)
PyThread_release_lock(chan->mutex);

PyThread_free_lock(chan->mutex);
PyMem_Free(chan);
GLOBAL_FREE(chan);
}

static int
Expand Down Expand Up @@ -814,7 +883,7 @@ typedef struct _channelref {
static _channelref *
_channelref_new(int64_t id, _PyChannelState *chan)
{
_channelref *ref = PyMem_NEW(_channelref, 1);
_channelref *ref = GLOBAL_MALLOC(_channelref);
if (ref == NULL) {
return NULL;
}
Expand All @@ -841,7 +910,7 @@ _channelref_free(_channelref *ref)
_channel_clear_closing(ref->chan);
}
//_channelref_clear(ref);
PyMem_Free(ref);
GLOBAL_FREE(ref);
}

static _channelref *
Expand Down Expand Up @@ -1163,7 +1232,7 @@ _channel_set_closing(struct _channelref *ref, PyThread_type_lock mutex) {
res = ERR_CHANNEL_CLOSED;
goto done;
}
chan->closing = PyMem_NEW(struct _channel_closing, 1);
chan->closing = GLOBAL_MALLOC(struct _channel_closing);
if (chan->closing == NULL) {
goto done;
}
Expand All @@ -1179,7 +1248,7 @@ static void
_channel_clear_closing(struct _channel *chan) {
PyThread_acquire_lock(chan->mutex, WAIT_LOCK);
if (chan->closing != NULL) {
PyMem_Free(chan->closing);
GLOBAL_FREE(chan->closing);
chan->closing = NULL;
}
PyThread_release_lock(chan->mutex);
Expand Down Expand Up @@ -1257,14 +1326,14 @@ _channel_send(_channels *channels, int64_t id, PyObject *obj)
}

// Convert the object to cross-interpreter data.
_PyCrossInterpreterData *data = PyMem_NEW(_PyCrossInterpreterData, 1);
_PyCrossInterpreterData *data = GLOBAL_MALLOC(_PyCrossInterpreterData);
if (data == NULL) {
PyThread_release_lock(mutex);
return -1;
}
if (_PyObject_GetCrossInterpreterData(obj, data) != 0) {
PyThread_release_lock(mutex);
PyMem_Free(data);
GLOBAL_FREE(data);
return -1;
}

Expand All @@ -1274,7 +1343,7 @@ _channel_send(_channels *channels, int64_t id, PyObject *obj)
if (res != 0) {
// We may chain an exception here:
(void)_release_xid_data(data, 0);
PyMem_Free(data);
GLOBAL_FREE(data);
return res;
}

Expand Down Expand Up @@ -1323,11 +1392,13 @@ _channel_recv(_channels *channels, int64_t id, PyObject **res)
if (obj == NULL) {
assert(PyErr_Occurred());
(void)_release_xid_data(data, 1);
PyMem_Free(data);
// It was allocated in _channel_send().
GLOBAL_FREE(data);
return -1;
}
int release_res = _release_xid_data(data, 0);
PyMem_Free(data);
// It was allocated in _channel_send().
GLOBAL_FREE(data);
if (release_res < 0) {
// The source interpreter has been destroyed already.
assert(PyErr_Occurred());
Expand Down