From 828a227ded9f06853bfa94599b545a15d1d3db53 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 20 Feb 2018 17:29:51 +0000 Subject: [PATCH 01/18] Convert channel IDs into channels when shared. --- Modules/_xxsubinterpretersmodule.c | 34 ++++++++++++++++++++++++++++-- 1 file changed, 32 insertions(+), 2 deletions(-) diff --git a/Modules/_xxsubinterpretersmodule.c b/Modules/_xxsubinterpretersmodule.c index f5e2ea3c79d683..e42a4b139699d9 100644 --- a/Modules/_xxsubinterpretersmodule.c +++ b/Modules/_xxsubinterpretersmodule.c @@ -1525,8 +1525,38 @@ static PyObject * _channelid_from_xid(_PyCrossInterpreterData *data) { struct _channelid_xid *xid = (struct _channelid_xid *)data->data; - return (PyObject *)newchannelid(&ChannelIDtype, xid->id, xid->end, - _global_channels(), 0); + PyObject *cid = (PyObject *)newchannelid(&ChannelIDtype, xid->id, xid->end, + _global_channels(), 0); + if (xid->end == 0) { + return cid; + } + + /* Try returning a high-level channel end but fall back to the ID. */ + PyObject *highlevel = PyImport_ImportModule("interpreters"); + if (highlevel == NULL) { + PyErr_Clear(); + highlevel = PyImport_ImportModule("test.support.interpreters"); + if (highlevel == NULL) { + goto error; + } + } + const char *clsname = (xid->end == CHANNEL_RECV) ? "RecvChannel" : + "SendChannel"; + PyObject *cls = PyObject_GetAttrString(highlevel, clsname); + Py_DECREF(highlevel); + if (cls == NULL) { + goto error; + } + PyObject *chan = PyObject_CallFunctionObjArgs(cls, cid, NULL); + if (chan == NULL) { + goto error; + } + Py_DECREF(cid); + return chan; + +error: + PyErr_Clear(); + return cid; } static int From bac0f1271015cf0484a9591bf89d0fa90dcbce20 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 20 Feb 2018 17:30:46 +0000 Subject: [PATCH 02/18] Only convert channel IDs when requested. --- Lib/test/test__xxsubinterpreters.py | 38 +++++++++++++++++++++-------- Modules/_xxsubinterpretersmodule.c | 28 ++++++++++++++------- 2 files changed, 47 insertions(+), 19 deletions(-) diff --git a/Lib/test/test__xxsubinterpreters.py b/Lib/test/test__xxsubinterpreters.py index 4ef77716c662dd..cccbd6bb1998d5 100644 --- a/Lib/test/test__xxsubinterpreters.py +++ b/Lib/test/test__xxsubinterpreters.py @@ -17,18 +17,18 @@ def _captured_script(script): indented = script.replace('\n', '\n ') wrapped = dedent(f""" import contextlib - with open({w}, 'w') as chan: - with contextlib.redirect_stdout(chan): + with open({w}, 'w') as spipe: + with contextlib.redirect_stdout(spipe): {indented} """) return wrapped, open(r) def _run_output(interp, request, shared=None): - script, chan = _captured_script(request) - with chan: + script, rpipe = _captured_script(request) + with rpipe: interpreters.run_string(interp, script, shared) - return chan.read() + return rpipe.read() @contextlib.contextmanager @@ -37,8 +37,8 @@ def _running(interp): def run(): interpreters.run_string(interp, dedent(f""" # wait for "signal" - with open({r}) as chan: - chan.read() + with open({r}) as rpipe: + rpipe.read() """)) t = threading.Thread(target=run) @@ -46,8 +46,8 @@ def run(): yield - with open(w, 'w') as chan: - chan.write('done') + with open(w, 'w') as spipe: + spipe.write('done') t.join() @@ -1209,7 +1209,7 @@ def test_recv_empty(self): with self.assertRaises(interpreters.ChannelEmptyError): interpreters.channel_recv(cid) - def test_run_string_arg(self): + def test_run_string_arg_unresolved(self): cid = interpreters.channel_create() interp = interpreters.create() @@ -1224,6 +1224,24 @@ def test_run_string_arg(self): self.assertEqual(obj, b'spam') self.assertEqual(out.strip(), 'send') + def test_run_string_arg_resolved(self): + cid = interpreters.channel_create() + cid = interpreters._channel_id(cid, _resolve=True) + interp = interpreters.create() + + out = _run_output(interp, dedent(""" + import _xxsubinterpreters as _interpreters + print(chan.end) + _interpreters.channel_send(chan, b'spam') + #print(chan.id.end) + #_interpreters.channel_send(chan.id, b'spam') + """), + dict(chan=cid.send)) + obj = interpreters.channel_recv(cid) + + self.assertEqual(obj, b'spam') + self.assertEqual(out.strip(), 'send') + if __name__ == '__main__': unittest.main() diff --git a/Modules/_xxsubinterpretersmodule.c b/Modules/_xxsubinterpretersmodule.c index e42a4b139699d9..3adebf27135dc5 100644 --- a/Modules/_xxsubinterpretersmodule.c +++ b/Modules/_xxsubinterpretersmodule.c @@ -1304,12 +1304,13 @@ typedef struct channelid { PyObject_HEAD int64_t id; int end; + int resolve; _channels *channels; } channelid; static channelid * newchannelid(PyTypeObject *cls, int64_t cid, int end, _channels *channels, - int force) + int force, int resolve) { channelid *self = PyObject_New(channelid, cls); if (self == NULL) { @@ -1317,6 +1318,7 @@ newchannelid(PyTypeObject *cls, int64_t cid, int end, _channels *channels, } self->id = cid; self->end = end; + self->resolve = resolve; self->channels = channels; if (_channels_add_id_object(channels, cid) != 0) { @@ -1337,14 +1339,15 @@ static _channels * _global_channels(void); static PyObject * channelid_new(PyTypeObject *cls, PyObject *args, PyObject *kwds) { - static char *kwlist[] = {"id", "send", "recv", "force", NULL}; + static char *kwlist[] = {"id", "send", "recv", "force", "_resolve", NULL}; PyObject *id; int send = -1; int recv = -1; int force = 0; + int resolve = 0; if (!PyArg_ParseTupleAndKeywords(args, kwds, - "O|$ppp:ChannelID.__init__", kwlist, - &id, &send, &recv, &force)) + "O|$pppp:ChannelID.__new__", kwlist, + &id, &send, &recv, &force, &resolve)) return NULL; // Coerce and check the ID. @@ -1376,7 +1379,8 @@ channelid_new(PyTypeObject *cls, PyObject *args, PyObject *kwds) end = CHANNEL_RECV; } - return (PyObject *)newchannelid(cls, cid, end, _global_channels(), force); + return (PyObject *)newchannelid(cls, cid, end, _global_channels(), + force, resolve); } static void @@ -1519,17 +1523,22 @@ channelid_richcompare(PyObject *self, PyObject *other, int op) struct _channelid_xid { int64_t id; int end; + int resolve; }; static PyObject * _channelid_from_xid(_PyCrossInterpreterData *data) { struct _channelid_xid *xid = (struct _channelid_xid *)data->data; + // Note that we do not preserve the "resolve" flag. PyObject *cid = (PyObject *)newchannelid(&ChannelIDtype, xid->id, xid->end, - _global_channels(), 0); + _global_channels(), 0, 0); if (xid->end == 0) { return cid; } + if (!xid->resolve) { + return cid; + } /* Try returning a high-level channel end but fall back to the ID. */ PyObject *highlevel = PyImport_ImportModule("interpreters"); @@ -1568,6 +1577,7 @@ _channelid_shared(PyObject *obj, _PyCrossInterpreterData *data) } xid->id = ((channelid *)obj)->id; xid->end = ((channelid *)obj)->end; + xid->resolve = ((channelid *)obj)->resolve; data->data = xid; data->obj = obj; @@ -1583,7 +1593,7 @@ channelid_end(PyObject *self, void *end) channelid *cid = (channelid *)self; if (end != NULL) { return (PyObject *)newchannelid(Py_TYPE(self), cid->id, *(int *)end, - cid->channels, force); + cid->channels, force, cid->resolve); } if (cid->end == CHANNEL_SEND) { @@ -2378,7 +2388,7 @@ channel_create(PyObject *self, PyObject *Py_UNUSED(ignored)) return NULL; } PyObject *id = (PyObject *)newchannelid(&ChannelIDtype, cid, 0, - &_globals.channels, 0); + &_globals.channels, 0, 0); if (id == NULL) { if (_channel_destroy(&_globals.channels, cid) != 0) { // XXX issue a warning? @@ -2436,7 +2446,7 @@ channel_list_all(PyObject *self, PyObject *Py_UNUSED(ignored)) int64_t *cur = cids; for (int64_t i=0; i < count; cur++, i++) { PyObject *id = (PyObject *)newchannelid(&ChannelIDtype, *cur, 0, - &_globals.channels, 0); + &_globals.channels, 0, 0); if (id == NULL) { Py_DECREF(ids); ids = NULL; From 5242a349edbf5d82a9987f4719c8edea337a273d Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Tue, 13 Mar 2018 20:37:46 -0600 Subject: [PATCH 03/18] channel_drop_interpreter -> channel_release. --- Lib/test/test__xxsubinterpreters.py | 48 +++++++++++++++-------------- Modules/_xxsubinterpretersmodule.c | 12 ++++---- 2 files changed, 31 insertions(+), 29 deletions(-) diff --git a/Lib/test/test__xxsubinterpreters.py b/Lib/test/test__xxsubinterpreters.py index cccbd6bb1998d5..3a115dded66fd0 100644 --- a/Lib/test/test__xxsubinterpreters.py +++ b/Lib/test/test__xxsubinterpreters.py @@ -904,18 +904,20 @@ def test_ids_global(self): #################### - def test_drop_single_user(self): + # XXX Add more tests for channel_release(). + + def test_release_single_user(self): cid = interpreters.channel_create() interpreters.channel_send(cid, b'spam') interpreters.channel_recv(cid) - interpreters.channel_drop_interpreter(cid, send=True, recv=True) + interpreters.channel_release(cid, send=True, recv=True) with self.assertRaises(interpreters.ChannelClosedError): interpreters.channel_send(cid, b'eggs') with self.assertRaises(interpreters.ChannelClosedError): interpreters.channel_recv(cid) - def test_drop_multiple_users(self): + def test_release_multiple_users(self): cid = interpreters.channel_create() id1 = interpreters.create() id2 = interpreters.create() @@ -926,98 +928,98 @@ def test_drop_multiple_users(self): out = _run_output(id2, dedent(f""" import _xxsubinterpreters as _interpreters obj = _interpreters.channel_recv({int(cid)}) - _interpreters.channel_drop_interpreter({int(cid)}) + _interpreters.channel_release({int(cid)}) print(repr(obj)) """)) interpreters.run_string(id1, dedent(f""" - _interpreters.channel_drop_interpreter({int(cid)}) + _interpreters.channel_release({int(cid)}) """)) self.assertEqual(out.strip(), "b'spam'") - def test_drop_no_kwargs(self): + def test_release_no_kwargs(self): cid = interpreters.channel_create() interpreters.channel_send(cid, b'spam') interpreters.channel_recv(cid) - interpreters.channel_drop_interpreter(cid) + interpreters.channel_release(cid) with self.assertRaises(interpreters.ChannelClosedError): interpreters.channel_send(cid, b'eggs') with self.assertRaises(interpreters.ChannelClosedError): interpreters.channel_recv(cid) - def test_drop_multiple_times(self): + def test_release_multiple_times(self): cid = interpreters.channel_create() interpreters.channel_send(cid, b'spam') interpreters.channel_recv(cid) - interpreters.channel_drop_interpreter(cid, send=True, recv=True) + interpreters.channel_release(cid, send=True, recv=True) with self.assertRaises(interpreters.ChannelClosedError): - interpreters.channel_drop_interpreter(cid, send=True, recv=True) + interpreters.channel_release(cid, send=True, recv=True) - def test_drop_with_unused_items(self): + def test_release_with_unused_items(self): cid = interpreters.channel_create() interpreters.channel_send(cid, b'spam') interpreters.channel_send(cid, b'ham') - interpreters.channel_drop_interpreter(cid, send=True, recv=True) + interpreters.channel_release(cid, send=True, recv=True) with self.assertRaises(interpreters.ChannelClosedError): interpreters.channel_recv(cid) - def test_drop_never_used(self): + def test_release_never_used(self): cid = interpreters.channel_create() - interpreters.channel_drop_interpreter(cid) + interpreters.channel_release(cid) with self.assertRaises(interpreters.ChannelClosedError): interpreters.channel_send(cid, b'spam') with self.assertRaises(interpreters.ChannelClosedError): interpreters.channel_recv(cid) - def test_drop_by_unassociated_interp(self): + def test_release_by_unassociated_interp(self): cid = interpreters.channel_create() interpreters.channel_send(cid, b'spam') interp = interpreters.create() interpreters.run_string(interp, dedent(f""" import _xxsubinterpreters as _interpreters - _interpreters.channel_drop_interpreter({int(cid)}) + _interpreters.channel_release({int(cid)}) """)) obj = interpreters.channel_recv(cid) - interpreters.channel_drop_interpreter(cid) + interpreters.channel_release(cid) with self.assertRaises(interpreters.ChannelClosedError): interpreters.channel_send(cid, b'eggs') self.assertEqual(obj, b'spam') - def test_drop_close_if_unassociated(self): + def test_release_close_if_unassociated(self): cid = interpreters.channel_create() interp = interpreters.create() interpreters.run_string(interp, dedent(f""" import _xxsubinterpreters as _interpreters obj = _interpreters.channel_send({int(cid)}, b'spam') - _interpreters.channel_drop_interpreter({int(cid)}) + _interpreters.channel_release({int(cid)}) """)) with self.assertRaises(interpreters.ChannelClosedError): interpreters.channel_recv(cid) - def test_drop_partially(self): + def test_release_partially(self): # XXX Is partial close too weird/confusing? cid = interpreters.channel_create() interpreters.channel_send(cid, None) interpreters.channel_recv(cid) interpreters.channel_send(cid, b'spam') - interpreters.channel_drop_interpreter(cid, send=True) + interpreters.channel_release(cid, send=True) obj = interpreters.channel_recv(cid) self.assertEqual(obj, b'spam') - def test_drop_used_multiple_times_by_single_user(self): + def test_release_used_multiple_times_by_single_user(self): cid = interpreters.channel_create() interpreters.channel_send(cid, b'spam') interpreters.channel_send(cid, b'spam') interpreters.channel_send(cid, b'spam') interpreters.channel_recv(cid) - interpreters.channel_drop_interpreter(cid, send=True, recv=True) + interpreters.channel_release(cid, send=True, recv=True) with self.assertRaises(interpreters.ChannelClosedError): interpreters.channel_send(cid, b'eggs') diff --git a/Modules/_xxsubinterpretersmodule.c b/Modules/_xxsubinterpretersmodule.c index 3adebf27135dc5..88c2b655079aad 100644 --- a/Modules/_xxsubinterpretersmodule.c +++ b/Modules/_xxsubinterpretersmodule.c @@ -2530,7 +2530,7 @@ Close the channel for all interpreters. Once the channel's ID has\n\ no more ref counts the channel will be destroyed."); static PyObject * -channel_drop_interpreter(PyObject *self, PyObject *args, PyObject *kwds) +channel_release(PyObject *self, PyObject *args, PyObject *kwds) { // Note that only the current interpreter is affected. static char *kwlist[] = {"id", "send", "recv", NULL}; @@ -2538,7 +2538,7 @@ channel_drop_interpreter(PyObject *self, PyObject *args, PyObject *kwds) int send = -1; int recv = -1; if (!PyArg_ParseTupleAndKeywords(args, kwds, - "O|$pp:channel_drop_interpreter", kwlist, + "O|$pp:channel_release", kwlist, &id, &send, &recv)) return NULL; @@ -2564,8 +2564,8 @@ channel_drop_interpreter(PyObject *self, PyObject *args, PyObject *kwds) Py_RETURN_NONE; } -PyDoc_STRVAR(channel_drop_interpreter_doc, -"channel_drop_interpreter(ID, *, send=None, recv=None)\n\ +PyDoc_STRVAR(channel_release_doc, +"channel_release(ID, *, send=None, recv=None)\n\ \n\ Close the channel for the current interpreter. 'send' and 'recv'\n\ (bool) may be used to indicate the ends to close. By default both\n\ @@ -2608,8 +2608,8 @@ static PyMethodDef module_functions[] = { METH_VARARGS, channel_recv_doc}, {"channel_close", channel_close, METH_O, channel_close_doc}, - {"channel_drop_interpreter", (PyCFunction)channel_drop_interpreter, - METH_VARARGS | METH_KEYWORDS, channel_drop_interpreter_doc}, + {"channel_release", (PyCFunction)channel_release, + METH_VARARGS | METH_KEYWORDS, channel_release_doc}, {"_channel_id", (PyCFunction)channel__channel_id, METH_VARARGS | METH_KEYWORDS, NULL}, From 7b2407dce0bc0b9891c353b017b35bb4de5cb314 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Fri, 23 Mar 2018 14:02:17 -0700 Subject: [PATCH 04/18] Organize tests. --- Lib/test/test__xxsubinterpreters.py | 419 +++++++++++++++++----------- 1 file changed, 255 insertions(+), 164 deletions(-) diff --git a/Lib/test/test__xxsubinterpreters.py b/Lib/test/test__xxsubinterpreters.py index 3a115dded66fd0..cf5ce1bd74e11b 100644 --- a/Lib/test/test__xxsubinterpreters.py +++ b/Lib/test/test__xxsubinterpreters.py @@ -12,6 +12,9 @@ interpreters = support.import_module('_xxsubinterpreters') +################################## +# helpers + def _captured_script(script): r, w = os.pipe() indented = script.replace('\n', '\n ') @@ -51,6 +54,27 @@ def run(): t.join() +class TestBase(unittest.TestCase): + + def tearDown(self): + for id in interpreters.list_all(): + if id == 0: # main + continue + try: + interpreters.destroy(id) + except RuntimeError: + pass # already destroyed + + for cid in interpreters.channel_list_all(): + try: + interpreters.channel_destroy(cid) + except interpreters.ChannelNotFoundError: + pass # already destroyed + + +################################## +# misc. tests + class IsShareableTests(unittest.TestCase): def test_default_shareables(self): @@ -100,23 +124,8 @@ class SubBytes(bytes): interpreters.is_shareable(obj)) -class TestBase(unittest.TestCase): - - def tearDown(self): - for id in interpreters.list_all(): - if id == 0: # main - continue - try: - interpreters.destroy(id) - except RuntimeError: - pass # already destroyed - - for cid in interpreters.channel_list_all(): - try: - interpreters.channel_destroy(cid) - except interpreters.ChannelNotFoundError: - pass # already destroyed - +################################## +# interpreter tests class ListAllTests(TestBase): @@ -783,6 +792,9 @@ def f(): self.assertEqual(retcode, 0) +################################## +# channel tests + class ChannelIDTests(TestBase): def test_default_kwargs(self): @@ -904,9 +916,177 @@ def test_ids_global(self): #################### - # XXX Add more tests for channel_release(). + def test_send_recv_main(self): + cid = interpreters.channel_create() + orig = b'spam' + interpreters.channel_send(cid, orig) + obj = interpreters.channel_recv(cid) + + self.assertEqual(obj, orig) + self.assertIsNot(obj, orig) - def test_release_single_user(self): + def test_send_recv_same_interpreter(self): + id1 = interpreters.create() + out = _run_output(id1, dedent(""" + import _xxsubinterpreters as _interpreters + cid = _interpreters.channel_create() + orig = b'spam' + _interpreters.channel_send(cid, orig) + obj = _interpreters.channel_recv(cid) + assert obj is not orig + assert obj == orig + """)) + + def test_send_recv_different_interpreters(self): + cid = interpreters.channel_create() + id1 = interpreters.create() + out = _run_output(id1, dedent(f""" + import _xxsubinterpreters as _interpreters + _interpreters.channel_send({int(cid)}, b'spam') + """)) + obj = interpreters.channel_recv(cid) + + self.assertEqual(obj, b'spam') + + def test_send_recv_different_threads(self): + cid = interpreters.channel_create() + + def f(): + while True: + try: + obj = interpreters.channel_recv(cid) + break + except interpreters.ChannelEmptyError: + time.sleep(0.1) + interpreters.channel_send(cid, obj) + t = threading.Thread(target=f) + t.start() + + interpreters.channel_send(cid, b'spam') + t.join() + obj = interpreters.channel_recv(cid) + + self.assertEqual(obj, b'spam') + + def test_send_recv_different_interpreters_and_threads(self): + cid = interpreters.channel_create() + id1 = interpreters.create() + out = None + + def f(): + nonlocal out + out = _run_output(id1, dedent(f""" + import time + import _xxsubinterpreters as _interpreters + while True: + try: + obj = _interpreters.channel_recv({int(cid)}) + break + except _interpreters.ChannelEmptyError: + time.sleep(0.1) + assert(obj == b'spam') + _interpreters.channel_send({int(cid)}, b'eggs') + """)) + t = threading.Thread(target=f) + t.start() + + interpreters.channel_send(cid, b'spam') + t.join() + obj = interpreters.channel_recv(cid) + + self.assertEqual(obj, b'eggs') + + def test_send_not_found(self): + with self.assertRaises(interpreters.ChannelNotFoundError): + interpreters.channel_send(10, b'spam') + + def test_recv_not_found(self): + with self.assertRaises(interpreters.ChannelNotFoundError): + interpreters.channel_recv(10) + + def test_recv_empty(self): + cid = interpreters.channel_create() + with self.assertRaises(interpreters.ChannelEmptyError): + interpreters.channel_recv(cid) + + def test_run_string_arg_unresolved(self): + cid = interpreters.channel_create() + interp = interpreters.create() + + out = _run_output(interp, dedent(""" + import _xxsubinterpreters as _interpreters + print(cid.end) + _interpreters.channel_send(cid, b'spam') + """), + dict(cid=cid.send)) + obj = interpreters.channel_recv(cid) + + self.assertEqual(obj, b'spam') + self.assertEqual(out.strip(), 'send') + + def test_run_string_arg_resolved(self): + cid = interpreters.channel_create() + cid = interpreters._channel_id(cid, _resolve=True) + interp = interpreters.create() + + out = _run_output(interp, dedent(""" + import _xxsubinterpreters as _interpreters + print(chan.end) + _interpreters.channel_send(chan, b'spam') + #print(chan.id.end) + #_interpreters.channel_send(chan.id, b'spam') + """), + dict(chan=cid.send)) + obj = interpreters.channel_recv(cid) + + self.assertEqual(obj, b'spam') + self.assertEqual(out.strip(), 'send') + + +class ChannelReleaseTests(TestBase): + + # XXX Add more test coverage a la the tests for close(). + + """ + - main / interp / other + - run in: current thread / new thread / other thread / different threads + - end / opposite + - force / no force + - used / not used (associated / not associated) + - empty / emptied / never emptied / partly emptied + - closed / not closed + - released / not released + - creator (interp) / other + - associated interpreter not running + - associated interpreter destroyed + """ + + """ + use + pre-release + release + after + check + """ + + """ + release in: main, interp1 + creator: same, other (incl. interp2) + + use: None,send,recv,send/recv in None,same,other(incl. interp2),same+other(incl. interp2),all + pre-release: None,send,recv,both in None,same,other(incl. interp2),same+other(incl. interp2),all + pre-release forced: None,send,recv,both in None,same,other(incl. interp2),same+other(incl. interp2),all + + release: same + release forced: same + + use after: None,send,recv,send/recv in None,same,other(incl. interp2),same+other(incl. interp2),all + release after: None,send,recv,send/recv in None,same,other(incl. interp2),same+other(incl. interp2),all + check released: send/recv for same/other(incl. interp2) + check closed: send/recv for same/other(incl. interp2) + """ + + def test_single_user(self): cid = interpreters.channel_create() interpreters.channel_send(cid, b'spam') interpreters.channel_recv(cid) @@ -917,7 +1097,7 @@ def test_release_single_user(self): with self.assertRaises(interpreters.ChannelClosedError): interpreters.channel_recv(cid) - def test_release_multiple_users(self): + def test_multiple_users(self): cid = interpreters.channel_create() id1 = interpreters.create() id2 = interpreters.create() @@ -937,7 +1117,7 @@ def test_release_multiple_users(self): self.assertEqual(out.strip(), "b'spam'") - def test_release_no_kwargs(self): + def test_no_kwargs(self): cid = interpreters.channel_create() interpreters.channel_send(cid, b'spam') interpreters.channel_recv(cid) @@ -948,7 +1128,7 @@ def test_release_no_kwargs(self): with self.assertRaises(interpreters.ChannelClosedError): interpreters.channel_recv(cid) - def test_release_multiple_times(self): + def test_multiple_times(self): cid = interpreters.channel_create() interpreters.channel_send(cid, b'spam') interpreters.channel_recv(cid) @@ -957,7 +1137,7 @@ def test_release_multiple_times(self): with self.assertRaises(interpreters.ChannelClosedError): interpreters.channel_release(cid, send=True, recv=True) - def test_release_with_unused_items(self): + def test_with_unused_items(self): cid = interpreters.channel_create() interpreters.channel_send(cid, b'spam') interpreters.channel_send(cid, b'ham') @@ -966,7 +1146,7 @@ def test_release_with_unused_items(self): with self.assertRaises(interpreters.ChannelClosedError): interpreters.channel_recv(cid) - def test_release_never_used(self): + def test_never_used(self): cid = interpreters.channel_create() interpreters.channel_release(cid) @@ -975,7 +1155,7 @@ def test_release_never_used(self): with self.assertRaises(interpreters.ChannelClosedError): interpreters.channel_recv(cid) - def test_release_by_unassociated_interp(self): + def test_by_unassociated_interp(self): cid = interpreters.channel_create() interpreters.channel_send(cid, b'spam') interp = interpreters.create() @@ -990,7 +1170,7 @@ def test_release_by_unassociated_interp(self): interpreters.channel_send(cid, b'eggs') self.assertEqual(obj, b'spam') - def test_release_close_if_unassociated(self): + def test_close_if_unassociated(self): cid = interpreters.channel_create() interp = interpreters.create() interpreters.run_string(interp, dedent(f""" @@ -1002,7 +1182,7 @@ def test_release_close_if_unassociated(self): with self.assertRaises(interpreters.ChannelClosedError): interpreters.channel_recv(cid) - def test_release_partially(self): + def test_partially(self): # XXX Is partial close too weird/confusing? cid = interpreters.channel_create() interpreters.channel_send(cid, None) @@ -1013,7 +1193,7 @@ def test_release_partially(self): self.assertEqual(obj, b'spam') - def test_release_used_multiple_times_by_single_user(self): + def test_used_multiple_times_by_single_user(self): cid = interpreters.channel_create() interpreters.channel_send(cid, b'spam') interpreters.channel_send(cid, b'spam') @@ -1026,9 +1206,48 @@ def test_release_used_multiple_times_by_single_user(self): with self.assertRaises(interpreters.ChannelClosedError): interpreters.channel_recv(cid) - #################### - def test_close_single_user(self): +class ChannelCloseTests(TestBase): + + """ + - main / interp / other + - run in: current thread / new thread / other thread / different threads + - end / opposite + - force / no force + - used / not used (associated / not associated) + - empty / emptied / never emptied / partly emptied + - closed / not closed + - released / not released + - creator (interp) / other + - associated interpreter not running + - associated interpreter destroyed + """ + + """ + use + pre-close + close + after + check + """ + + """ + close in: main, interp1 + creator: same, other (incl. interp2) + + use: None,send,recv,send/recv in None,same,other(incl. interp2),same+other(incl. interp2),all + pre-close: None,send,recv in None,same,other(incl. interp2),same+other(incl. interp2),all + pre-close forced: None,send,recv in None,same,other(incl. interp2),same+other(incl. interp2),all + + close: same + close forced: same + + use after: None,send,recv,send/recv in None,same,other(incl. interp2),same+other(incl. interp2),all + close after: None,send,recv,send/recv in None,same,other(incl. interp2),same+other(incl. interp2),all + check closed: send/recv for same/other(incl. interp2) + """ + + def test_single_user(self): cid = interpreters.channel_create() interpreters.channel_send(cid, b'spam') interpreters.channel_recv(cid) @@ -1039,7 +1258,7 @@ def test_close_single_user(self): with self.assertRaises(interpreters.ChannelClosedError): interpreters.channel_recv(cid) - def test_close_multiple_users(self): + def test_multiple_users(self): cid = interpreters.channel_create() id1 = interpreters.create() id2 = interpreters.create() @@ -1063,7 +1282,7 @@ def test_close_multiple_users(self): """)) self.assertIn('ChannelClosedError', str(cm.exception)) - def test_close_multiple_times(self): + def test_multiple_times(self): cid = interpreters.channel_create() interpreters.channel_send(cid, b'spam') interpreters.channel_recv(cid) @@ -1072,7 +1291,7 @@ def test_close_multiple_times(self): with self.assertRaises(interpreters.ChannelClosedError): interpreters.channel_close(cid) - def test_close_with_unused_items(self): + def test_with_unused_items(self): cid = interpreters.channel_create() interpreters.channel_send(cid, b'spam') interpreters.channel_send(cid, b'ham') @@ -1081,7 +1300,7 @@ def test_close_with_unused_items(self): with self.assertRaises(interpreters.ChannelClosedError): interpreters.channel_recv(cid) - def test_close_never_used(self): + def test_never_used(self): cid = interpreters.channel_create() interpreters.channel_close(cid) @@ -1090,7 +1309,7 @@ def test_close_never_used(self): with self.assertRaises(interpreters.ChannelClosedError): interpreters.channel_recv(cid) - def test_close_by_unassociated_interp(self): + def test_by_unassociated_interp(self): cid = interpreters.channel_create() interpreters.channel_send(cid, b'spam') interp = interpreters.create() @@ -1103,7 +1322,7 @@ def test_close_by_unassociated_interp(self): with self.assertRaises(interpreters.ChannelClosedError): interpreters.channel_close(cid) - def test_close_used_multiple_times_by_single_user(self): + def test_used_multiple_times_by_single_user(self): cid = interpreters.channel_create() interpreters.channel_send(cid, b'spam') interpreters.channel_send(cid, b'spam') @@ -1116,134 +1335,6 @@ def test_close_used_multiple_times_by_single_user(self): with self.assertRaises(interpreters.ChannelClosedError): interpreters.channel_recv(cid) - #################### - - def test_send_recv_main(self): - cid = interpreters.channel_create() - orig = b'spam' - interpreters.channel_send(cid, orig) - obj = interpreters.channel_recv(cid) - - self.assertEqual(obj, orig) - self.assertIsNot(obj, orig) - - def test_send_recv_same_interpreter(self): - id1 = interpreters.create() - out = _run_output(id1, dedent(""" - import _xxsubinterpreters as _interpreters - cid = _interpreters.channel_create() - orig = b'spam' - _interpreters.channel_send(cid, orig) - obj = _interpreters.channel_recv(cid) - assert obj is not orig - assert obj == orig - """)) - - def test_send_recv_different_interpreters(self): - cid = interpreters.channel_create() - id1 = interpreters.create() - out = _run_output(id1, dedent(f""" - import _xxsubinterpreters as _interpreters - _interpreters.channel_send({int(cid)}, b'spam') - """)) - obj = interpreters.channel_recv(cid) - - self.assertEqual(obj, b'spam') - - def test_send_recv_different_threads(self): - cid = interpreters.channel_create() - - def f(): - while True: - try: - obj = interpreters.channel_recv(cid) - break - except interpreters.ChannelEmptyError: - time.sleep(0.1) - interpreters.channel_send(cid, obj) - t = threading.Thread(target=f) - t.start() - - interpreters.channel_send(cid, b'spam') - t.join() - obj = interpreters.channel_recv(cid) - - self.assertEqual(obj, b'spam') - - def test_send_recv_different_interpreters_and_threads(self): - cid = interpreters.channel_create() - id1 = interpreters.create() - out = None - - def f(): - nonlocal out - out = _run_output(id1, dedent(f""" - import time - import _xxsubinterpreters as _interpreters - while True: - try: - obj = _interpreters.channel_recv({int(cid)}) - break - except _interpreters.ChannelEmptyError: - time.sleep(0.1) - assert(obj == b'spam') - _interpreters.channel_send({int(cid)}, b'eggs') - """)) - t = threading.Thread(target=f) - t.start() - - interpreters.channel_send(cid, b'spam') - t.join() - obj = interpreters.channel_recv(cid) - - self.assertEqual(obj, b'eggs') - - def test_send_not_found(self): - with self.assertRaises(interpreters.ChannelNotFoundError): - interpreters.channel_send(10, b'spam') - - def test_recv_not_found(self): - with self.assertRaises(interpreters.ChannelNotFoundError): - interpreters.channel_recv(10) - - def test_recv_empty(self): - cid = interpreters.channel_create() - with self.assertRaises(interpreters.ChannelEmptyError): - interpreters.channel_recv(cid) - - def test_run_string_arg_unresolved(self): - cid = interpreters.channel_create() - interp = interpreters.create() - - out = _run_output(interp, dedent(""" - import _xxsubinterpreters as _interpreters - print(cid.end) - _interpreters.channel_send(cid, b'spam') - """), - dict(cid=cid.send)) - obj = interpreters.channel_recv(cid) - - self.assertEqual(obj, b'spam') - self.assertEqual(out.strip(), 'send') - - def test_run_string_arg_resolved(self): - cid = interpreters.channel_create() - cid = interpreters._channel_id(cid, _resolve=True) - interp = interpreters.create() - - out = _run_output(interp, dedent(""" - import _xxsubinterpreters as _interpreters - print(chan.end) - _interpreters.channel_send(chan, b'spam') - #print(chan.id.end) - #_interpreters.channel_send(chan.id, b'spam') - """), - dict(chan=cid.send)) - obj = interpreters.channel_recv(cid) - - self.assertEqual(obj, b'spam') - self.assertEqual(out.strip(), 'send') - if __name__ == '__main__': unittest.main() From 6a5d1fcd3adb62643ec69f20d0ffd4a9573eb2b0 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Thu, 22 Mar 2018 19:45:09 -0700 Subject: [PATCH 05/18] Fix function signatures. --- Modules/_xxsubinterpretersmodule.c | 157 +++++++++++++++++++---------- 1 file changed, 102 insertions(+), 55 deletions(-) diff --git a/Modules/_xxsubinterpretersmodule.c b/Modules/_xxsubinterpretersmodule.c index 88c2b655079aad..be75f4901d863c 100644 --- a/Modules/_xxsubinterpretersmodule.c +++ b/Modules/_xxsubinterpretersmodule.c @@ -2155,10 +2155,13 @@ Create a new interpreter and return a unique generated ID."); static PyObject * -interp_destroy(PyObject *self, PyObject *args) +interp_destroy(PyObject *self, PyObject *args, PyObject *kwds) { + static char *kwlist[] = {"id", NULL}; PyObject *id; - if (!PyArg_UnpackTuple(args, "destroy", 1, 1, &id)) { + // XXX Use "L" for id? + if (!PyArg_ParseTupleAndKeywords(args, kwds, + "O:destroy", kwlist, &id)) { return NULL; } if (!PyLong_Check(id)) { @@ -2202,7 +2205,7 @@ interp_destroy(PyObject *self, PyObject *args) } PyDoc_STRVAR(destroy_doc, -"destroy(ID)\n\ +"destroy(id)\n\ \n\ Destroy the identified interpreter.\n\ \n\ @@ -2278,22 +2281,20 @@ Return the ID of main interpreter."); static PyObject * -interp_run_string(PyObject *self, PyObject *args) +interp_run_string(PyObject *self, PyObject *args, PyObject *kwds) { + static char *kwlist[] = {"id", "script", "shared", NULL}; PyObject *id, *code; PyObject *shared = NULL; - if (!PyArg_UnpackTuple(args, "run_string", 2, 3, &id, &code, &shared)) { + if (!PyArg_ParseTupleAndKeywords(args, kwds, + "OU|O:run_string", kwlist, + &id, &code, &shared)) { return NULL; } if (!PyLong_Check(id)) { PyErr_SetString(PyExc_TypeError, "first arg (ID) must be an int"); return NULL; } - if (!PyUnicode_Check(code)) { - PyErr_SetString(PyExc_TypeError, - "second arg (code) must be a string"); - return NULL; - } // Look up the interpreter. PyInterpreterState *interp = _look_up(id); @@ -2321,7 +2322,7 @@ interp_run_string(PyObject *self, PyObject *args) } PyDoc_STRVAR(run_string_doc, -"run_string(ID, sourcetext)\n\ +"run_string(id, script, shared)\n\ \n\ Execute the provided string in the identified interpreter.\n\ \n\ @@ -2329,12 +2330,15 @@ See PyRun_SimpleStrings."); static PyObject * -object_is_shareable(PyObject *self, PyObject *args) +object_is_shareable(PyObject *self, PyObject *args, PyObject *kwds) { + static char *kwlist[] = {"obj", NULL}; PyObject *obj; - if (!PyArg_UnpackTuple(args, "is_shareable", 1, 1, &obj)) { + if (!PyArg_ParseTupleAndKeywords(args, kwds, + "O:is_shareable", kwlist, &obj)) { return NULL; } + if (_PyObject_CheckCrossInterpreterData(obj) == 0) { Py_RETURN_TRUE; } @@ -2350,10 +2354,12 @@ False otherwise."); static PyObject * -interp_is_running(PyObject *self, PyObject *args) +interp_is_running(PyObject *self, PyObject *args, PyObject *kwds) { + static char *kwlist[] = {"id", NULL}; PyObject *id; - if (!PyArg_UnpackTuple(args, "is_running", 1, 1, &id)) { + if (!PyArg_ParseTupleAndKeywords(args, kwds, + "O:is_running", kwlist, &id)) { return NULL; } if (!PyLong_Check(id)) { @@ -2400,15 +2406,17 @@ channel_create(PyObject *self, PyObject *Py_UNUSED(ignored)) } PyDoc_STRVAR(channel_create_doc, -"channel_create() -> ID\n\ +"channel_create() -> cid\n\ \n\ Create a new cross-interpreter channel and return a unique generated ID."); static PyObject * -channel_destroy(PyObject *self, PyObject *args) +channel_destroy(PyObject *self, PyObject *args, PyObject *kwds) { + static char *kwlist[] = {"cid", NULL}; PyObject *id; - if (!PyArg_UnpackTuple(args, "channel_destroy", 1, 1, &id)) { + if (!PyArg_ParseTupleAndKeywords(args, kwds, + "O:channel_destroy", kwlist, &id)) { return NULL; } int64_t cid = _coerce_id(id); @@ -2423,7 +2431,7 @@ channel_destroy(PyObject *self, PyObject *args) } PyDoc_STRVAR(channel_destroy_doc, -"channel_destroy(ID)\n\ +"channel_destroy(cid)\n\ \n\ Close and finalize the channel. Afterward attempts to use the channel\n\ will behave as though it never existed."); @@ -2461,16 +2469,18 @@ channel_list_all(PyObject *self, PyObject *Py_UNUSED(ignored)) } PyDoc_STRVAR(channel_list_all_doc, -"channel_list_all() -> [ID]\n\ +"channel_list_all() -> [cid]\n\ \n\ Return the list of all IDs for active channels."); static PyObject * -channel_send(PyObject *self, PyObject *args) +channel_send(PyObject *self, PyObject *args, PyObject *kwds) { + static char *kwlist[] = {"cid", "obj", NULL}; PyObject *id; PyObject *obj; - if (!PyArg_UnpackTuple(args, "channel_send", 2, 2, &id, &obj)) { + if (!PyArg_ParseTupleAndKeywords(args, kwds, + "OO:channel_send", kwlist, &id, &obj)) { return NULL; } int64_t cid = _coerce_id(id); @@ -2485,15 +2495,17 @@ channel_send(PyObject *self, PyObject *args) } PyDoc_STRVAR(channel_send_doc, -"channel_send(ID, obj)\n\ +"channel_send(cid, obj)\n\ \n\ Add the object's data to the channel's queue."); static PyObject * -channel_recv(PyObject *self, PyObject *args) +channel_recv(PyObject *self, PyObject *args, PyObject *kwds) { + static char *kwlist[] = {"cid", NULL}; PyObject *id; - if (!PyArg_UnpackTuple(args, "channel_recv", 1, 1, &id)) { + if (!PyArg_ParseTupleAndKeywords(args, kwds, + "O:channel_recv", kwlist, &id)) { return NULL; } int64_t cid = _coerce_id(id); @@ -2505,17 +2517,34 @@ channel_recv(PyObject *self, PyObject *args) } PyDoc_STRVAR(channel_recv_doc, -"channel_recv(ID) -> obj\n\ +"channel_recv(cid) -> obj\n\ \n\ Return a new object from the data at the from of the channel's queue."); static PyObject * -channel_close(PyObject *self, PyObject *id) +channel_close(PyObject *self, PyObject *args, PyObject *kwds) { + static char *kwlist[] = {"cid", "send", "recv", "force", NULL}; + PyObject *id; + int send = 0; + int recv = 0; + int force = 0; + if (!PyArg_ParseTupleAndKeywords(args, kwds, + "O|$ppp:channel_release", kwlist, + &id, &send, &recv, &force)) { + return NULL; + } int64_t cid = _coerce_id(id); if (cid < 0) { return NULL; } + if (send == 0 && recv == 0) { + send = 1; + recv = 1; + } + + // XXX Handle the ends. + // XXX Handle force is True. if (_channel_close(&_globals.channels, cid) != 0) { return NULL; @@ -2524,40 +2553,58 @@ channel_close(PyObject *self, PyObject *id) } PyDoc_STRVAR(channel_close_doc, -"channel_close(ID)\n\ +"channel_close(cid, *, send=None, recv=None, force=False)\n\ +\n\ +Close the channel for all interpreters.\n\ +\n\ +If the channel is empty then the keyword args are ignored and both\n\ +ends are immediately closed. Otherwise, if 'force' is True then\n\ +all queued items are released and both ends are immediately\n\ +closed.\n\ +\n\ +If the channel is not empty *and* 'force' is False then following\n\ +happens:\n\ +\n\ + * recv is True (regardless of send):\n\ + - raise ChannelNotEmptyError\n\ + * recv is None and send is None:\n\ + - raise ChannelNotEmptyError\n\ + * send is True and recv is not True:\n\ + - fully close the 'send' end\n\ + - close the 'recv' end to interpreters not already receiving\n\ + - fully close it once empty\n\ \n\ -Close the channel for all interpreters. Once the channel's ID has\n\ -no more ref counts the channel will be destroyed."); +Closing an already closed channel results in a ChannelClosedError.\n\ +\n\ +Once the channel's ID has no more ref counts in any interpreter\n\ +the channel will be destroyed."); static PyObject * channel_release(PyObject *self, PyObject *args, PyObject *kwds) { // Note that only the current interpreter is affected. - static char *kwlist[] = {"id", "send", "recv", NULL}; + static char *kwlist[] = {"cid", "send", "recv", "force", NULL}; PyObject *id; - int send = -1; - int recv = -1; + int send = 0; + int recv = 0; + int force = 0; if (!PyArg_ParseTupleAndKeywords(args, kwds, - "O|$pp:channel_release", kwlist, - &id, &send, &recv)) + "O|$ppp:channel_release", kwlist, + &id, &send, &recv, &force)) { return NULL; - + } int64_t cid = _coerce_id(id); if (cid < 0) { return NULL; } - if (send < 0 && recv < 0) { + if (send == 0 && recv == 0) { send = 1; recv = 1; } - else { - if (send < 0) { - send = 0; - } - if (recv < 0) { - recv = 0; - } - } + + // XXX Handle force is True. + // XXX Fix implicit release. + if (_channel_drop(&_globals.channels, cid, send, recv) != 0) { return NULL; } @@ -2565,7 +2612,7 @@ channel_release(PyObject *self, PyObject *args, PyObject *kwds) } PyDoc_STRVAR(channel_release_doc, -"channel_release(ID, *, send=None, recv=None)\n\ +"channel_release(cid, *, send=None, recv=None, force=True)\n\ \n\ Close the channel for the current interpreter. 'send' and 'recv'\n\ (bool) may be used to indicate the ends to close. By default both\n\ @@ -2581,7 +2628,7 @@ static PyMethodDef module_functions[] = { {"create", (PyCFunction)interp_create, METH_VARARGS, create_doc}, {"destroy", (PyCFunction)interp_destroy, - METH_VARARGS, destroy_doc}, + METH_VARARGS | METH_KEYWORDS, destroy_doc}, {"list_all", interp_list_all, METH_NOARGS, list_all_doc}, {"get_current", interp_get_current, @@ -2589,25 +2636,25 @@ static PyMethodDef module_functions[] = { {"get_main", interp_get_main, METH_NOARGS, get_main_doc}, {"is_running", (PyCFunction)interp_is_running, - METH_VARARGS, is_running_doc}, + METH_VARARGS | METH_KEYWORDS, is_running_doc}, {"run_string", (PyCFunction)interp_run_string, - METH_VARARGS, run_string_doc}, + METH_VARARGS | METH_KEYWORDS, run_string_doc}, {"is_shareable", (PyCFunction)object_is_shareable, - METH_VARARGS, is_shareable_doc}, + METH_VARARGS | METH_KEYWORDS, is_shareable_doc}, {"channel_create", channel_create, METH_NOARGS, channel_create_doc}, {"channel_destroy", (PyCFunction)channel_destroy, - METH_VARARGS, channel_destroy_doc}, + METH_VARARGS | METH_KEYWORDS, channel_destroy_doc}, {"channel_list_all", channel_list_all, METH_NOARGS, channel_list_all_doc}, {"channel_send", (PyCFunction)channel_send, - METH_VARARGS, channel_send_doc}, + METH_VARARGS | METH_KEYWORDS, channel_send_doc}, {"channel_recv", (PyCFunction)channel_recv, - METH_VARARGS, channel_recv_doc}, - {"channel_close", channel_close, - METH_O, channel_close_doc}, + METH_VARARGS | METH_KEYWORDS, channel_recv_doc}, + {"channel_close", (PyCFunction)channel_close, + METH_VARARGS | METH_KEYWORDS, channel_close_doc}, {"channel_release", (PyCFunction)channel_release, METH_VARARGS | METH_KEYWORDS, channel_release_doc}, {"_channel_id", (PyCFunction)channel__channel_id, From c34ff9070d68deacfead36e233d357283fbb7865 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Fri, 6 Apr 2018 12:50:25 -0600 Subject: [PATCH 06/18] Add InterpreterID/ChannelID.__str__(). --- Lib/test/test__xxsubinterpreters.py | 56 ++++++++++++++++------------- Modules/_xxsubinterpretersmodule.c | 18 ++++++++-- 2 files changed, 48 insertions(+), 26 deletions(-) diff --git a/Lib/test/test__xxsubinterpreters.py b/Lib/test/test__xxsubinterpreters.py index cf5ce1bd74e11b..7206cac84d784c 100644 --- a/Lib/test/test__xxsubinterpreters.py +++ b/Lib/test/test__xxsubinterpreters.py @@ -162,7 +162,7 @@ def test_subinterpreter(self): interp = interpreters.create() out = _run_output(interp, dedent(""" import _xxsubinterpreters as _interpreters - print(int(_interpreters.get_current())) + print(_interpreters.get_current()) """)) cur = int(out.strip()) _, expected = interpreters.list_all() @@ -182,7 +182,7 @@ def test_from_subinterpreter(self): interp = interpreters.create() out = _run_output(interp, dedent(""" import _xxsubinterpreters as _interpreters - print(int(_interpreters.get_main())) + print(_interpreters.get_main()) """)) main = int(out.strip()) self.assertEqual(main, expected) @@ -206,7 +206,7 @@ def test_from_subinterpreter(self): interp = interpreters.create() out = _run_output(interp, dedent(f""" import _xxsubinterpreters as _interpreters - if _interpreters.is_running({int(interp)}): + if _interpreters.is_running({interp}): print(True) else: print(False) @@ -266,6 +266,10 @@ def test_does_not_exist(self): with self.assertRaises(RuntimeError): interpreters.InterpreterID(int(id) + 1) # unforced + def test_str(self): + id = interpreters.InterpreterID(10, force=True) + self.assertEqual(str(id), '10') + def test_repr(self): id = interpreters.InterpreterID(10, force=True) self.assertEqual(repr(id), 'InterpreterID(10)') @@ -323,7 +327,7 @@ def test_in_subinterpreter(self): out = _run_output(id1, dedent(""" import _xxsubinterpreters as _interpreters id = _interpreters.create() - print(int(id)) + print(id) """)) id2 = int(out.strip()) @@ -338,7 +342,7 @@ def f(): out = _run_output(id1, dedent(""" import _xxsubinterpreters as _interpreters id = _interpreters.create() - print(int(id)) + print(id) """)) id2 = int(out.strip()) @@ -432,7 +436,7 @@ def test_from_current(self): script = dedent(f""" import _xxsubinterpreters as _interpreters try: - _interpreters.destroy({int(id)}) + _interpreters.destroy({id}) except RuntimeError: pass """) @@ -446,7 +450,7 @@ def test_from_sibling(self): id2 = interpreters.create() script = dedent(f""" import _xxsubinterpreters as _interpreters - _interpreters.destroy({int(id2)}) + _interpreters.destroy({id2}) """) interpreters.run_string(id1, script) @@ -854,6 +858,10 @@ def test_does_not_exist(self): with self.assertRaises(interpreters.ChannelNotFoundError): interpreters._channel_id(int(cid) + 1) # unforced + def test_str(self): + cid = interpreters._channel_id(10, force=True) + self.assertEqual(str(cid), '10') + def test_repr(self): cid = interpreters._channel_id(10, force=True) self.assertEqual(repr(cid), 'ChannelID(10)') @@ -900,7 +908,7 @@ def test_ids_global(self): out = _run_output(id1, dedent(""" import _xxsubinterpreters as _interpreters cid = _interpreters.channel_create() - print(int(cid)) + print(cid) """)) cid1 = int(out.strip()) @@ -908,7 +916,7 @@ def test_ids_global(self): out = _run_output(id2, dedent(""" import _xxsubinterpreters as _interpreters cid = _interpreters.channel_create() - print(int(cid)) + print(cid) """)) cid2 = int(out.strip()) @@ -942,7 +950,7 @@ def test_send_recv_different_interpreters(self): id1 = interpreters.create() out = _run_output(id1, dedent(f""" import _xxsubinterpreters as _interpreters - _interpreters.channel_send({int(cid)}, b'spam') + _interpreters.channel_send({cid}, b'spam') """)) obj = interpreters.channel_recv(cid) @@ -980,12 +988,12 @@ def f(): import _xxsubinterpreters as _interpreters while True: try: - obj = _interpreters.channel_recv({int(cid)}) + obj = _interpreters.channel_recv({cid}) break except _interpreters.ChannelEmptyError: time.sleep(0.1) assert(obj == b'spam') - _interpreters.channel_send({int(cid)}, b'eggs') + _interpreters.channel_send({cid}, b'eggs') """)) t = threading.Thread(target=f) t.start() @@ -1103,16 +1111,16 @@ def test_multiple_users(self): id2 = interpreters.create() interpreters.run_string(id1, dedent(f""" import _xxsubinterpreters as _interpreters - _interpreters.channel_send({int(cid)}, b'spam') + _interpreters.channel_send({cid}, b'spam') """)) out = _run_output(id2, dedent(f""" import _xxsubinterpreters as _interpreters - obj = _interpreters.channel_recv({int(cid)}) - _interpreters.channel_release({int(cid)}) + obj = _interpreters.channel_recv({cid}) + _interpreters.channel_release({cid}) print(repr(obj)) """)) interpreters.run_string(id1, dedent(f""" - _interpreters.channel_release({int(cid)}) + _interpreters.channel_release({cid}) """)) self.assertEqual(out.strip(), "b'spam'") @@ -1161,7 +1169,7 @@ def test_by_unassociated_interp(self): interp = interpreters.create() interpreters.run_string(interp, dedent(f""" import _xxsubinterpreters as _interpreters - _interpreters.channel_release({int(cid)}) + _interpreters.channel_release({cid}) """)) obj = interpreters.channel_recv(cid) interpreters.channel_release(cid) @@ -1175,8 +1183,8 @@ def test_close_if_unassociated(self): interp = interpreters.create() interpreters.run_string(interp, dedent(f""" import _xxsubinterpreters as _interpreters - obj = _interpreters.channel_send({int(cid)}, b'spam') - _interpreters.channel_release({int(cid)}) + obj = _interpreters.channel_send({cid}, b'spam') + _interpreters.channel_release({cid}) """)) with self.assertRaises(interpreters.ChannelClosedError): @@ -1264,21 +1272,21 @@ def test_multiple_users(self): id2 = interpreters.create() interpreters.run_string(id1, dedent(f""" import _xxsubinterpreters as _interpreters - _interpreters.channel_send({int(cid)}, b'spam') + _interpreters.channel_send({cid}, b'spam') """)) interpreters.run_string(id2, dedent(f""" import _xxsubinterpreters as _interpreters - _interpreters.channel_recv({int(cid)}) + _interpreters.channel_recv({cid}) """)) interpreters.channel_close(cid) with self.assertRaises(interpreters.RunFailedError) as cm: interpreters.run_string(id1, dedent(f""" - _interpreters.channel_send({int(cid)}, b'spam') + _interpreters.channel_send({cid}, b'spam') """)) self.assertIn('ChannelClosedError', str(cm.exception)) with self.assertRaises(interpreters.RunFailedError) as cm: interpreters.run_string(id2, dedent(f""" - _interpreters.channel_send({int(cid)}, b'spam') + _interpreters.channel_send({cid}, b'spam') """)) self.assertIn('ChannelClosedError', str(cm.exception)) @@ -1315,7 +1323,7 @@ def test_by_unassociated_interp(self): interp = interpreters.create() interpreters.run_string(interp, dedent(f""" import _xxsubinterpreters as _interpreters - _interpreters.channel_close({int(cid)}) + _interpreters.channel_close({cid}) """)) with self.assertRaises(interpreters.ChannelClosedError): interpreters.channel_recv(cid) diff --git a/Modules/_xxsubinterpretersmodule.c b/Modules/_xxsubinterpretersmodule.c index be75f4901d863c..b52328db1ac3c9 100644 --- a/Modules/_xxsubinterpretersmodule.c +++ b/Modules/_xxsubinterpretersmodule.c @@ -1413,6 +1413,13 @@ channelid_repr(PyObject *self) return PyUnicode_FromFormat(fmt, name, cid->id); } +static PyObject * +channelid_str(PyObject *self) +{ + channelid *cid = (channelid *)self; + return PyUnicode_FromFormat("%d", cid->id); +} + PyObject * channelid_int(PyObject *self) { @@ -1637,7 +1644,7 @@ static PyTypeObject ChannelIDtype = { 0, /* tp_as_mapping */ channelid_hash, /* tp_hash */ 0, /* tp_call */ - 0, /* tp_str */ + (reprfunc)channelid_str, /* tp_str */ 0, /* tp_getattro */ 0, /* tp_setattro */ 0, /* tp_as_buffer */ @@ -1918,6 +1925,13 @@ interpid_repr(PyObject *self) return PyUnicode_FromFormat("%s(%d)", name, id->id); } +static PyObject * +interpid_str(PyObject *self) +{ + interpid *id = (interpid *)self; + return PyUnicode_FromFormat("%d", id->id); +} + PyObject * interpid_int(PyObject *self) { @@ -2039,7 +2053,7 @@ static PyTypeObject InterpreterIDtype = { 0, /* tp_as_mapping */ interpid_hash, /* tp_hash */ 0, /* tp_call */ - 0, /* tp_str */ + (reprfunc)interpid_str, /* tp_str */ 0, /* tp_getattro */ 0, /* tp_setattro */ 0, /* tp_as_buffer */ From cbbc5627dcef9bc51694a0b92bee0193a92b7276 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Fri, 6 Apr 2018 13:28:28 -0600 Subject: [PATCH 07/18] Return an InterpreterID from get_main(). --- Lib/test/test__xxsubinterpreters.py | 16 ++++++++++++++-- Modules/_xxsubinterpretersmodule.c | 3 ++- 2 files changed, 16 insertions(+), 3 deletions(-) diff --git a/Lib/test/test__xxsubinterpreters.py b/Lib/test/test__xxsubinterpreters.py index 7206cac84d784c..784f372914a6b2 100644 --- a/Lib/test/test__xxsubinterpreters.py +++ b/Lib/test/test__xxsubinterpreters.py @@ -156,13 +156,16 @@ def test_main(self): main = interpreters.get_main() cur = interpreters.get_current() self.assertEqual(cur, main) + self.assertIsInstance(cur, interpreters.InterpreterID) def test_subinterpreter(self): main = interpreters.get_main() interp = interpreters.create() out = _run_output(interp, dedent(""" import _xxsubinterpreters as _interpreters - print(_interpreters.get_current()) + cur = _interpreters.get_current() + print(cur) + assert isinstance(cur, _interpreters.InterpreterID) """)) cur = int(out.strip()) _, expected = interpreters.list_all() @@ -176,13 +179,16 @@ def test_from_main(self): [expected] = interpreters.list_all() main = interpreters.get_main() self.assertEqual(main, expected) + self.assertIsInstance(main, interpreters.InterpreterID) def test_from_subinterpreter(self): [expected] = interpreters.list_all() interp = interpreters.create() out = _run_output(interp, dedent(""" import _xxsubinterpreters as _interpreters - print(_interpreters.get_main()) + main = _interpreters.get_main() + print(main) + assert isinstance(main, _interpreters.InterpreterID) """)) main = int(out.strip()) self.assertEqual(main, expected) @@ -293,6 +299,7 @@ class CreateTests(TestBase): def test_in_main(self): id = interpreters.create() + self.assertIsInstance(id, interpreters.InterpreterID) self.assertIn(id, interpreters.list_all()) @@ -328,6 +335,7 @@ def test_in_subinterpreter(self): import _xxsubinterpreters as _interpreters id = _interpreters.create() print(id) + assert isinstance(id, _interpreters.InterpreterID) """)) id2 = int(out.strip()) @@ -892,6 +900,10 @@ def test_equality(self): class ChannelTests(TestBase): + def test_create_cid(self): + cid = interpreters.channel_create() + self.assertIsInstance(cid, interpreters.ChannelID) + def test_sequential_ids(self): before = interpreters.channel_list_all() id1 = interpreters.channel_create() diff --git a/Modules/_xxsubinterpretersmodule.c b/Modules/_xxsubinterpretersmodule.c index b52328db1ac3c9..29b76f84ff96bc 100644 --- a/Modules/_xxsubinterpretersmodule.c +++ b/Modules/_xxsubinterpretersmodule.c @@ -2285,7 +2285,8 @@ static PyObject * interp_get_main(PyObject *self, PyObject *Py_UNUSED(ignored)) { // Currently, 0 is always the main interpreter. - return PyLong_FromLongLong(0); + PY_INT64_T id = 0; + return (PyObject *)newinterpid(&InterpreterIDtype, id, 0); } PyDoc_STRVAR(get_main_doc, From 49e80652bf1cf604b8b589823de91109b61f28a5 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Fri, 6 Apr 2018 15:38:00 -0600 Subject: [PATCH 08/18] Add support for int. --- Lib/test/test__xxsubinterpreters.py | 44 ++++++++++++++++++++++++++++- Python/pystate.c | 28 ++++++++++++++++++ 2 files changed, 71 insertions(+), 1 deletion(-) diff --git a/Lib/test/test__xxsubinterpreters.py b/Lib/test/test__xxsubinterpreters.py index 784f372914a6b2..f0be6edccc11c3 100644 --- a/Lib/test/test__xxsubinterpreters.py +++ b/Lib/test/test__xxsubinterpreters.py @@ -83,6 +83,8 @@ def test_default_shareables(self): None, # builtin objects b'spam', + 10, + -10, ] for obj in shareables: with self.subTest(obj): @@ -110,7 +112,6 @@ class SubBytes(bytes): object, object(), Exception(), - 42, 100.0, 'spam', # user-defined types and objects @@ -124,6 +125,47 @@ class SubBytes(bytes): interpreters.is_shareable(obj)) +class ShareableTypeTests(unittest.TestCase): + + def setUp(self): + super().setUp() + self.cid = interpreters.channel_create() + + def tearDown(self): + interpreters.channel_destroy(self.cid) + super().tearDown() + + def _assert_values(self, values): + for obj in values: + with self.subTest(obj): + interpreters.channel_send(self.cid, obj) + got = interpreters.channel_recv(self.cid) + + self.assertEqual(got, obj) + self.assertIs(type(got), type(obj)) + # XXX Check the following in the channel tests? + #self.assertIsNot(got, obj) + + def test_singletons(self): + for obj in [None]: + with self.subTest(obj): + interpreters.channel_send(self.cid, obj) + got = interpreters.channel_recv(self.cid) + + # XXX What about between interpreters? + self.assertIs(got, obj) + + def test_types(self): + self._assert_values([ + b'spam', + 9999, + self.cid, + ]) + + def test_int(self): + self._assert_values(range(-1, 258)) + + ################################## # interpreter tests diff --git a/Python/pystate.c b/Python/pystate.c index 140d2fba8efd3d..8c4d0cf75fc671 100644 --- a/Python/pystate.c +++ b/Python/pystate.c @@ -1348,6 +1348,29 @@ _bytes_shared(PyObject *obj, _PyCrossInterpreterData *data) return 0; } +static PyObject * +_new_long_object(_PyCrossInterpreterData *data) +{ + return PyLong_FromLongLong((int64_t)(data->data)); +} + +static int +_long_shared(PyObject *obj, _PyCrossInterpreterData *data) +{ + int64_t value = PyLong_AsLongLong(obj); + if (value == -1 && PyErr_Occurred()) { + if (PyErr_ExceptionMatches(PyExc_OverflowError)) { + PyErr_SetString(PyExc_OverflowError, "try sending as bytes"); + } + return -1; + } + data->data = (void *)value; + data->obj = NULL; + data->new_object = _new_long_object; + data->free = NULL; + return 0; +} + static PyObject * _new_none_object(_PyCrossInterpreterData *data) { @@ -1374,6 +1397,11 @@ _register_builtins_for_crossinterpreter_data(void) Py_FatalError("could not register None for cross-interpreter sharing"); } + // int + if (_register_xidata(&PyLong_Type, _long_shared) != 0) { + Py_FatalError("could not register int for cross-interpreter sharing"); + } + // bytes if (_register_xidata(&PyBytes_Type, _bytes_shared) != 0) { Py_FatalError("could not register bytes for cross-interpreter sharing"); From cb41fa51ba99e67d6a60107966bc3517affc3414 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Fri, 6 Apr 2018 16:08:55 -0600 Subject: [PATCH 09/18] Correctly handle \x00 when sharing bytes. --- Lib/test/test__xxsubinterpreters.py | 4 ++++ Python/pystate.c | 16 +++++++++++++--- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/Lib/test/test__xxsubinterpreters.py b/Lib/test/test__xxsubinterpreters.py index f0be6edccc11c3..77c1f3b2ef15a1 100644 --- a/Lib/test/test__xxsubinterpreters.py +++ b/Lib/test/test__xxsubinterpreters.py @@ -162,6 +162,10 @@ def test_types(self): self.cid, ]) + def test_bytes(self): + self._assert_values(i.to_bytes(2, 'little', signed=True) + for i in range(-1, 258)) + def test_int(self): self._assert_values(range(-1, 258)) diff --git a/Python/pystate.c b/Python/pystate.c index 8c4d0cf75fc671..151cbd61f2dbd2 100644 --- a/Python/pystate.c +++ b/Python/pystate.c @@ -1332,19 +1332,29 @@ _PyCrossInterpreterData_Lookup(PyObject *obj) /* cross-interpreter data for builtin types */ +struct _shared_bytes_data { + char *bytes; + Py_ssize_t len; +}; + static PyObject * _new_bytes_object(_PyCrossInterpreterData *data) { - return PyBytes_FromString((char *)(data->data)); + struct _shared_bytes_data *shared = (struct _shared_bytes_data *)(data->data); + return PyBytes_FromStringAndSize(shared->bytes, shared->len); } static int _bytes_shared(PyObject *obj, _PyCrossInterpreterData *data) { - data->data = (void *)(PyBytes_AS_STRING(obj)); + struct _shared_bytes_data *shared = PyMem_NEW(struct _shared_bytes_data, 1); + if (PyBytes_AsStringAndSize(obj, &shared->bytes, &shared->len) < 0) { + return -1; + } + data->data = (void *)shared; data->obj = obj; // Will be "released" (decref'ed) when data released. data->new_object = _new_bytes_object; - data->free = NULL; // Do not free the data (it belongs to the object). + data->free = PyMem_Free; return 0; } From 13e6b7afc8bd90613d6d5cfaa0ea9d9d92a93d93 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Fri, 6 Apr 2018 16:39:32 -0600 Subject: [PATCH 10/18] Test channel_close() across a broad spectrum of cases. --- Lib/test/test__xxsubinterpreters.py | 587 +++++++++++++++++++++++++++- 1 file changed, 574 insertions(+), 13 deletions(-) diff --git a/Lib/test/test__xxsubinterpreters.py b/Lib/test/test__xxsubinterpreters.py index 77c1f3b2ef15a1..3def4440f477e2 100644 --- a/Lib/test/test__xxsubinterpreters.py +++ b/Lib/test/test__xxsubinterpreters.py @@ -1,6 +1,9 @@ +from collections import namedtuple import contextlib +import itertools import os import pickle +import sys from textwrap import dedent, indent import threading import time @@ -15,6 +18,12 @@ ################################## # helpers +def powerset(*sets): + return itertools.chain.from_iterable( + combinations(sets, r) + for r in range(len(sets)+1)) + + def _captured_script(script): r, w = os.pipe() indented = script.replace('\n', '\n ') @@ -54,22 +63,233 @@ def run(): t.join() +#@contextmanager +#def run_threaded(id, source, **shared): +# def run(): +# run_interp(id, source, **shared) +# t = threading.Thread(target=run) +# t.start() +# yield +# t.join() + + +def run_interp(id, source, **shared): + _run_interp(id, source, shared) + + +def _run_interp(id, source, shared, _mainns={}): + source = dedent(source) + main = interpreters.get_main() + if main == id: + if interpreters.get_current() != main: + raise RuntimeError + # XXX Run a func? + exec(source, _mainns) + else: + interpreters.run_string(id, source, shared) + + +def run_interp_threaded(id, source, **shared): + def run(): + _run(id, source, shared) + t = threading.Thread(target=run) + t.start() + t.join() + + +class Interpreter(namedtuple('Interpreter', 'name id')): + + @classmethod + def from_raw(cls, raw): + if isinstance(raw, cls): + return raw + elif isinstance(raw, str): + return cls(raw) + else: + raise NotImplementedError + + def __new__(cls, name=None, id=None): + main = interpreters.get_main() + if id == main: + if not name: + name = 'main' + elif name != 'main': + raise ValueError( + 'name mismatch (expected "main", got "{}")'.format(name)) + id = main + elif id is not None: + if not name: + name = 'interp' + elif name == 'main': + raise ValueError('name mismatch (unexpected "main")') + if not isinstance(id, interpreters.InterpreterID): + id = interpreters.InterpreterID(id) + elif not name or name == 'main': + name = 'main' + id = main + else: + id = interpreters.create() + self = super().__new__(cls, name, id) + return self + + +# XXX expect_channel_closed() is unnecessary once we improve exc propagation. + +@contextlib.contextmanager +def expect_channel_closed(): + try: + yield + except interpreters.ChannelClosedError: + pass + else: + assert False, 'channel not closed' + + +class ChannelAction(namedtuple('ChannelAction', 'action end interp')): + + def __new__(cls, action, end=None, interp=None): + if not end: + end = 'both' + if not interp: + interp = 'main' + self = super().__new__(cls, action, end, interp) + return self + + def __init__(self, *args, **kwargs): + if self.action == 'use': + if self.end not in ('same', 'opposite', 'send', 'recv'): + raise ValueError(self.end) + elif self.action in ('close', 'force-close'): + if self.end not in ('both', 'same', 'opposite', 'send', 'recv'): + raise ValueError(self.end) + else: + raise ValueError(self.action) + if self.interp not in ('main', 'same', 'other'): + raise ValueError(self.interp) + + def resolve_end(self, end): + if self.end == 'same': + return end + elif self.end == 'opposite': + return 'recv' if end == 'send' else 'send' + else: + return self.end + + def resolve_interp(self, interp, other): + if self.interp == 'same': + return interp + elif self.interp == 'other': + if other is None: + raise RuntimeError + return other + elif self.interp == 'main': + if interp.name == 'main': + return interp + elif other and other.name == 'main': + return other + else: + raise RuntimeError + # Per __init__(), there aren't any others. + + +class ChannelState(namedtuple('ChannelState', 'pending closed')): + + def __new__(cls, pending=0, *, closed=False): + self = super().__new__(cls, pending, closed) + return self + + def incr(self): + return type(self)(self.pending + 1, closed=self.closed) + + def decr(self): + return type(self)(self.pending - 1, closed=self.closed) + + def close(self, *, force=True): + if self.closed: + if not force or self.pending == 0: + return self + return type(self)(0 if force else self.pending, closed=True) + + +def run_action(cid, action, end, state, *, hideclosed=True): + if state.closed: + if action == 'use' and end == 'recv' and state.pending: + expectfail = False + else: + expectfail = True + else: + expectfail = False + + try: + result = _run_action(cid, action, end, state) + except interpreters.ChannelClosedError: + if not hideclosed and not expectfail: + raise + result = state.close() + else: + if expectfail: + raise ... # XXX + return result + + +def _run_action(cid, action, end, state): + if action == 'use': + if end == 'send': + interpreters.channel_send(cid, b'spam') + return state.incr() + elif end == 'recv': + if not state.pending: + try: + interpreters.channel_recv(cid) + except interpreters.ChannelEmptyError: + return state + else: + raise Exception('expected ChannelEmptyError') + else: + interpreters.channel_recv(cid) + return state.decr() + else: + raise ValueError(end) + elif action == 'close': + if end == 'boyh': + interpreters.channel_close(cid) + else: + interpreters.channel_close(cid) + #interpreters.channel_close(cid, end) + return state.close() + elif action == 'force-close': + if end == 'both': + interpreters.channel_close(cid, force=True) + else: + interpreters.channel_close(cid, end, force=True) + return state.close(force=True) + else: + raise ValueError(action) + + +def clean_up_interpreters(): + for id in interpreters.list_all(): + if id == 0: # main + continue + try: + interpreters.destroy(id) + except RuntimeError: + pass # already destroyed + + +def clean_up_channels(): + for cid in interpreters.channel_list_all(): + try: + interpreters.channel_destroy(cid) + except interpreters.ChannelNotFoundError: + pass # already destroyed + + class TestBase(unittest.TestCase): def tearDown(self): - for id in interpreters.list_all(): - if id == 0: # main - continue - try: - interpreters.destroy(id) - except RuntimeError: - pass # already destroyed - - for cid in interpreters.channel_list_all(): - try: - interpreters.channel_destroy(cid) - except interpreters.ChannelNotFoundError: - pass # already destroyed + clean_up_interpreters() + clean_up_channels() ################################## @@ -1273,6 +1493,111 @@ def test_used_multiple_times_by_single_user(self): interpreters.channel_recv(cid) +class ChannelCloseFixture(namedtuple('ChannelCloseFixture', + 'end interp other extra creator')): + + def __new__(cls, end, interp, other, extra, creator): + assert end in ('send', 'recv') + interp = Interpreter.from_raw(interp) + other = Interpreter.from_raw(other) + extra = Interpreter.from_raw(extra) + if not creator: + creator = 'same' + self = super().__new__(cls, end, interp, other, extra, creator) + self._state = ChannelState() + self._prepped = set() + self._interps = { + interp.name: interp, + other.name: other, + extra.name: extra, + } + return self + + @property + def state(self): + return self._state + + @property + def cid(self): + try: + return self._cid + except AttributeError: + creator = self._get_interpreter(self.creator) + self._cid = self._new_channel(creator) + return self._cid + + def get_interpreter(self, interp): + interp = self._get_interpreter(interp) + self._prep_interpreter(interp) + return interp + + def expect_closed_error(self, end=None): + if end is None: + end = self.end + if end == 'recv' and self.state.closed == 'send': + return False + return bool(self.state.closed) + + def prep_interpreter(self, interp): + self._prep_interpreter(interp) + + def record_action(self, action, result): + self._state = result + + def clean_up(self): + clean_up_interpreters() + clean_up_channels() + + # internal methods + + def _new_channel(self, creator): + if creator.name == 'main': + return interpreters.channel_create() + else: + ch = interpreters.channel_create() + run_interp(creator.id, f""" + import _xxsubinterpreters + cid = _xxsubinterpreters.channel_create() + # We purposefully send back an int to avoid tying the + # channel to the other interpreter. + _xxsubinterpreters.channel_send({ch}, int(cid)) + del _xxsubinterpreters + """) + self._cid = interpreters.channel_recv(ch) + return self._cid + + def _get_interpreter(self, interp): + if interp in ('same', 'interp'): + return self.interp + elif interp == 'other': + return self.other + elif interp == 'extra': + return self.extra + else: + name = interp + try: + interp = self._interps[name] + except KeyError: + interp = self._interps[name] = Interpreter(name) + return interp + + def _prep_interpreter(self, interp): + if interp.id in self._prepped: + return + self._prepped.add(interp.id) + if interp.name == 'main': + return + run_interp(interp.id, f""" + import _xxsubinterpreters as interpreters + import test.test__xxsubinterpreters as helpers + ChannelState = helpers.ChannelState + try: + cid + except NameError: + cid = interpreters._channel_id({self.cid}) + """) + + class ChannelCloseTests(TestBase): """ @@ -1287,6 +1612,8 @@ class ChannelCloseTests(TestBase): - creator (interp) / other - associated interpreter not running - associated interpreter destroyed + + - close after unbound """ """ @@ -1313,6 +1640,240 @@ class ChannelCloseTests(TestBase): check closed: send/recv for same/other(incl. interp2) """ + def iter_action_sets(self): + # - used / not used (associated / not associated) + # - empty / emptied / never emptied / partly emptied + # - closed / not closed + # - released / not released + + yield [] + yield [ + ChannelAction('use', 'recv', 'same'), + ] + yield [ + ChannelAction('use', 'send', 'same'), + ] + yield [ + ChannelAction('use', 'recv', 'same'), + ChannelAction('use', 'send', 'same'), + ] + +# interp: None, end, opposite, both +# other: None, end, opposite, both +# +# ends = ['recv', 'send'] +# for interpreters in powerset(interpreters): +# +# actions = [] +# for interp in interpreters: +# actions.append( +# ChannelAction('use', end, interp)) +# yield actions +# +# +# if other is None: +# for interp_ends in powerset(ends): +# actions = [] +# for end_ in interp_ends: +# action = ChannelAction('use', end_, 'same') +# actions.append(action) +# +# yield actions +# +# interpreters = ['same'] +# if other is not None: +# interpreters.append('other') +# interpreter_sets = powerset(interpreters) +# +# for interpreters in powerset(interpreters): +# for other_ends in powerset(ends if other else []): +# for interp_ends in powerset(ends): +# actions = [] +# for interp_ in interpreters: +# if interp_ +# +# +# for interp_ends in end_sets: +# actions = [] +# for end_ in interp_ends: +# action = ChannelAction('use', end_, 'same') +# actions.append(action) +# if other is None: +# yield actions +# +# for interp_ends in end_sets: +# +# for other_ends in end_sets: +# +# for interp_ in ('same', 'other'): +# if interp_ == 'other' and other is None: +# continue +# for end_ in ('same', 'opposite'): +# yield ChannelAction(action, end_, interp_) +# +# ChannelAction('close', end, interp) +# +# # use +# # pre-close +# ... +# return () # XXX +# +# def _iter_use_action_sets(self, interp): +# for ends in powerset(['recv', 'send']): +# actions = [] +# for end in ends: +# actions.append( +# ChannelAction('use', end, interp)) +# yield actions +# +# def _iter_close_actions(self, interpreters): +# for end in ['recv', 'send']: +# for op in ['close', 'force-close']: +# for interp in interpreters: +# yield ChannelAction(op, end, interp) +# yield None + + def run_actions(self, fix, actions): + for action in actions: + self.run_action(fix, action) + + def run_action(self, fix, action, *, hideclosed=True): + end = action.resolve_end(fix.end) + interp = action.resolve_interp(fix.interp, fix.other) + fix.prep_interpreter(interp) + if interp.name == 'main': + result = run_action( + fix.cid, + action.action, + end, + fix.state, + hideclosed=hideclosed, + ) + fix.record_action(action, result) + else: + _cid = interpreters.channel_create() + run_interp(interp.id, f""" + result = helpers.run_action( + {fix.cid}, + {repr(action.action)}, + {repr(end)}, + {repr(fix.state)}, + hideclosed={hideclosed}, + ) + interpreters.channel_send({_cid}, result.pending.to_bytes(1, 'little')) + interpreters.channel_send({_cid}, b'X' if result.closed else b'') + """) + result = ChannelState( + pending=int.from_bytes(interpreters.channel_recv(_cid), 'little'), + closed=bool(interpreters.channel_recv(_cid)), + ) + fix.record_action(action, result) + + def iter_fixtures(self): + # XXX threads? + interpreters = [ + ('main', 'interp', 'extra'), + ('interp', 'main', 'extra'), + ('interp1', 'interp2', 'extra'), + ('interp1', 'interp2', 'main'), + ] + for interp, other, extra in interpreters: + for creator in ('same', 'other', 'creator'): + for end in ('send', 'recv'): + yield ChannelCloseFixture(end, interp, other, extra, creator) + + def _close(self, fix, *, force): + op = 'force-close' if force else 'close' + close = ChannelAction(op, fix.end, 'same') + if not fix.expect_closed_error(): + self.run_action(fix, close, hideclosed=False) + else: + with self.assertRaises(interpreters.ChannelClosedError): + self.run_action(fix, close, hideclosed=False) + + def _assert_closed_in_interp(self, fix, interp=None): + if interp is None or interp.name == 'main': + with self.assertRaises(interpreters.ChannelClosedError): + interpreters.channel_recv(fix.cid) + with self.assertRaises(interpreters.ChannelClosedError): + interpreters.channel_send(fix.cid, b'spam') + with self.assertRaises(interpreters.ChannelClosedError): + interpreters.channel_close(fix.cid) + with self.assertRaises(interpreters.ChannelClosedError): + interpreters.channel_close(fix.cid, force=True) + else: + run_interp(interp.id, f""" + with helpers.expect_channel_closed(): + interpreters.channel_recv(cid) + """) + run_interp(interp.id, f""" + with helpers.expect_channel_closed(): + interpreters.channel_send(cid, b'spam') + """) + run_interp(interp.id, f""" + with helpers.expect_channel_closed(): + interpreters.channel_close(cid) + """) + run_interp(interp.id, f""" + with helpers.expect_channel_closed(): + interpreters.channel_close(cid, force=True) + """) + + def _assert_closed(self, fix): + self.assertTrue(fix.state.closed) + + for _ in range(fix.state.pending): + interpreters.channel_recv(fix.cid) + self._assert_closed_in_interp(fix) + + for interp in ('same', 'other'): + interp = fix.get_interpreter(interp) + if interp.name == 'main': + continue + self._assert_closed_in_interp(fix, interp) + + interp = fix.get_interpreter('fresh') + self._assert_closed_in_interp(fix, interp) + + def test_exhaustive(self): + verbose = False + actions = [ + ChannelAction('use', 'same', 'same'), + ] + i = 0 + for actions in self.iter_action_sets(): + print() + for fix in self.iter_fixtures(): + i += 1 + if verbose: + print(i, fix) + else: + if (i - 1) % 6 == 0: + print(' ', end='') + print('.', end=''); sys.stdout.flush() + with self.subTest('{} {}'.format(i, fix)): + fix.prep_interpreter(fix.interp) + self.run_actions(fix, actions) + + self._close(fix, force=False) + + self._assert_closed(fix) + # XXX Things slow down if we have too many interpreters. + fix.clean_up() + print() + +# def test_exhaustive_force(self): +# actions = [] +# for fix in self.iter_fixtures(): +# with self.subTest(fix): +# self.run_actions(fix, actions) +# +# self._close(fix, force=True) +# +# self._assert_closed(fix) + + # focused tests + def test_single_user(self): cid = interpreters.channel_create() interpreters.channel_send(cid, b'spam') From 2ee8cb08fb1c23919d56a246d8f126d2ba31136e Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Fri, 6 Apr 2018 18:24:46 -0600 Subject: [PATCH 11/18] Fix a typo. --- Modules/_xxsubinterpretersmodule.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Modules/_xxsubinterpretersmodule.c b/Modules/_xxsubinterpretersmodule.c index 29b76f84ff96bc..a456ec5551e90b 100644 --- a/Modules/_xxsubinterpretersmodule.c +++ b/Modules/_xxsubinterpretersmodule.c @@ -2545,7 +2545,7 @@ channel_close(PyObject *self, PyObject *args, PyObject *kwds) int recv = 0; int force = 0; if (!PyArg_ParseTupleAndKeywords(args, kwds, - "O|$ppp:channel_release", kwlist, + "O|$ppp:channel_close", kwlist, &id, &send, &recv, &force)) { return NULL; } From ecb58347b57cb974b8c66979bb70761f1ddd7a49 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Fri, 6 Apr 2018 18:27:42 -0600 Subject: [PATCH 12/18] Add an exhaustive set of permutated tests for close_channel(). --- Lib/test/test__xxsubinterpreters.py | 470 ++++++++++++++++------------ 1 file changed, 263 insertions(+), 207 deletions(-) diff --git a/Lib/test/test__xxsubinterpreters.py b/Lib/test/test__xxsubinterpreters.py index 3def4440f477e2..3c9cb6cdca5c68 100644 --- a/Lib/test/test__xxsubinterpreters.py +++ b/Lib/test/test__xxsubinterpreters.py @@ -164,7 +164,7 @@ def __init__(self, *args, **kwargs): raise ValueError(self.end) else: raise ValueError(self.action) - if self.interp not in ('main', 'same', 'other'): + if self.interp not in ('main', 'same', 'other', 'extra'): raise ValueError(self.interp) def resolve_end(self, end): @@ -175,13 +175,17 @@ def resolve_end(self, end): else: return self.end - def resolve_interp(self, interp, other): + def resolve_interp(self, interp, other, extra): if self.interp == 'same': return interp elif self.interp == 'other': if other is None: raise RuntimeError return other + elif self.interp == 'extra': + if extra is None: + raise RuntimeError + return extra elif self.interp == 'main': if interp.name == 'main': return interp @@ -1457,6 +1461,7 @@ def test_by_unassociated_interp(self): self.assertEqual(obj, b'spam') def test_close_if_unassociated(self): + # XXX Something's not right with this test... cid = interpreters.channel_create() interp = interpreters.create() interpreters.run_string(interp, dedent(f""" @@ -1492,25 +1497,123 @@ def test_used_multiple_times_by_single_user(self): with self.assertRaises(interpreters.ChannelClosedError): interpreters.channel_recv(cid) + # close + + def test_close_single_user(self): + cid = interpreters.channel_create() + interpreters.channel_send(cid, b'spam') + interpreters.channel_recv(cid) + interpreters.channel_close(cid) + + with self.assertRaises(interpreters.ChannelClosedError): + interpreters.channel_send(cid, b'eggs') + with self.assertRaises(interpreters.ChannelClosedError): + interpreters.channel_recv(cid) + + def test_close_multiple_users(self): + cid = interpreters.channel_create() + id1 = interpreters.create() + id2 = interpreters.create() + interpreters.run_string(id1, dedent(f""" + import _xxsubinterpreters as _interpreters + _interpreters.channel_send({cid}, b'spam') + """)) + interpreters.run_string(id2, dedent(f""" + import _xxsubinterpreters as _interpreters + _interpreters.channel_recv({cid}) + """)) + interpreters.channel_close(cid) + with self.assertRaises(interpreters.RunFailedError) as cm: + interpreters.run_string(id1, dedent(f""" + _interpreters.channel_send({cid}, b'spam') + """)) + self.assertIn('ChannelClosedError', str(cm.exception)) + with self.assertRaises(interpreters.RunFailedError) as cm: + interpreters.run_string(id2, dedent(f""" + _interpreters.channel_send({cid}, b'spam') + """)) + self.assertIn('ChannelClosedError', str(cm.exception)) + + def test_close_multiple_times(self): + cid = interpreters.channel_create() + interpreters.channel_send(cid, b'spam') + interpreters.channel_recv(cid) + interpreters.channel_close(cid) + + with self.assertRaises(interpreters.ChannelClosedError): + interpreters.channel_close(cid) + + def test_close_with_unused_items(self): + cid = interpreters.channel_create() + interpreters.channel_send(cid, b'spam') + interpreters.channel_send(cid, b'ham') + interpreters.channel_close(cid) + + with self.assertRaises(interpreters.ChannelClosedError): + interpreters.channel_recv(cid) + + def test_close_never_used(self): + cid = interpreters.channel_create() + interpreters.channel_close(cid) + + with self.assertRaises(interpreters.ChannelClosedError): + interpreters.channel_send(cid, b'spam') + with self.assertRaises(interpreters.ChannelClosedError): + interpreters.channel_recv(cid) + + def test_close_by_unassociated_interp(self): + cid = interpreters.channel_create() + interpreters.channel_send(cid, b'spam') + interp = interpreters.create() + interpreters.run_string(interp, dedent(f""" + import _xxsubinterpreters as _interpreters + _interpreters.channel_close({cid}) + """)) + with self.assertRaises(interpreters.ChannelClosedError): + interpreters.channel_recv(cid) + with self.assertRaises(interpreters.ChannelClosedError): + interpreters.channel_close(cid) + + def test_close_used_multiple_times_by_single_user(self): + cid = interpreters.channel_create() + interpreters.channel_send(cid, b'spam') + interpreters.channel_send(cid, b'spam') + interpreters.channel_send(cid, b'spam') + interpreters.channel_recv(cid) + interpreters.channel_close(cid) + + with self.assertRaises(interpreters.ChannelClosedError): + interpreters.channel_send(cid, b'eggs') + with self.assertRaises(interpreters.ChannelClosedError): + interpreters.channel_recv(cid) + class ChannelCloseFixture(namedtuple('ChannelCloseFixture', 'end interp other extra creator')): + # Set this to True to avoid creating interpreters, e.g. when + # scanning through test permutations without running them. + QUICK = False + def __new__(cls, end, interp, other, extra, creator): assert end in ('send', 'recv') - interp = Interpreter.from_raw(interp) - other = Interpreter.from_raw(other) - extra = Interpreter.from_raw(extra) + if cls.QUICK: + known = {} + else: + interp = Interpreter.from_raw(interp) + other = Interpreter.from_raw(other) + extra = Interpreter.from_raw(extra) + known = { + interp.name: interp, + other.name: other, + extra.name: extra, + } if not creator: creator = 'same' self = super().__new__(cls, end, interp, other, extra, creator) - self._state = ChannelState() self._prepped = set() - self._interps = { - interp.name: interp, - other.name: other, - extra.name: extra, - } + self._state = ChannelState() + self._known = known return self @property @@ -1576,9 +1679,9 @@ def _get_interpreter(self, interp): else: name = interp try: - interp = self._interps[name] + interp = self._known[name] except KeyError: - interp = self._interps[name] = Interpreter(name) + interp = self._known[name] = Interpreter(name) return interp def _prep_interpreter(self, interp): @@ -1598,7 +1701,7 @@ def _prep_interpreter(self, interp): """) -class ChannelCloseTests(TestBase): +class ExhaustiveChannelTests(TestBase): """ - main / interp / other @@ -1626,17 +1729,17 @@ class ChannelCloseTests(TestBase): """ close in: main, interp1 - creator: same, other (incl. interp2) + creator: same, other, extra - use: None,send,recv,send/recv in None,same,other(incl. interp2),same+other(incl. interp2),all - pre-close: None,send,recv in None,same,other(incl. interp2),same+other(incl. interp2),all - pre-close forced: None,send,recv in None,same,other(incl. interp2),same+other(incl. interp2),all + use: None,send,recv,send/recv in None,same,other,same+other,all + pre-close: None,send,recv in None,same,other,same+other,all + pre-close forced: None,send,recv in None,same,other,same+other,all close: same close forced: same - use after: None,send,recv,send/recv in None,same,other(incl. interp2),same+other(incl. interp2),all - close after: None,send,recv,send/recv in None,same,other(incl. interp2),same+other(incl. interp2),all + use after: None,send,recv,send/recv in None,same,other,extra,same+other,all + close after: None,send,recv,send/recv in None,same,other,extra,same+other,all check closed: send/recv for same/other(incl. interp2) """ @@ -1646,92 +1749,121 @@ def iter_action_sets(self): # - closed / not closed # - released / not released + # never used yield [] + + # only pre-closed (and possible used after) + for closeactions in self._iter_close_action_sets('same', 'other'): + yield closeactions + for postactions in self._iter_post_close_action_sets(): + yield closeactions + postactions + for closeactions in self._iter_close_action_sets('other', 'extra'): + yield closeactions + for postactions in self._iter_post_close_action_sets(): + yield closeactions + postactions + + # used + for useactions in self._iter_use_action_sets('same', 'other'): + yield useactions + for closeactions in self._iter_close_action_sets('same', 'other'): + actions = useactions + closeactions + yield actions + for postactions in self._iter_post_close_action_sets(): + yield actions + postactions + for closeactions in self._iter_close_action_sets('other', 'extra'): + actions = useactions + closeactions + yield actions + for postactions in self._iter_post_close_action_sets(): + yield actions + postactions + for useactions in self._iter_use_action_sets('other', 'extra'): + yield useactions + for closeactions in self._iter_close_action_sets('same', 'other'): + actions = useactions + closeactions + yield actions + for postactions in self._iter_post_close_action_sets(): + yield actions + postactions + for closeactions in self._iter_close_action_sets('other', 'extra'): + actions = useactions + closeactions + yield actions + for postactions in self._iter_post_close_action_sets(): + yield actions + postactions + + def _iter_use_action_sets(self, interp1, interp2): + interps = (interp1, interp2) + + # only recv end used yield [ - ChannelAction('use', 'recv', 'same'), + ChannelAction('use', 'recv', interp1), ] yield [ - ChannelAction('use', 'send', 'same'), + ChannelAction('use', 'recv', interp2), ] yield [ - ChannelAction('use', 'recv', 'same'), - ChannelAction('use', 'send', 'same'), + ChannelAction('use', 'recv', interp1), + ChannelAction('use', 'recv', interp2), ] -# interp: None, end, opposite, both -# other: None, end, opposite, both -# -# ends = ['recv', 'send'] -# for interpreters in powerset(interpreters): -# -# actions = [] -# for interp in interpreters: -# actions.append( -# ChannelAction('use', end, interp)) -# yield actions -# -# -# if other is None: -# for interp_ends in powerset(ends): -# actions = [] -# for end_ in interp_ends: -# action = ChannelAction('use', end_, 'same') -# actions.append(action) -# -# yield actions -# -# interpreters = ['same'] -# if other is not None: -# interpreters.append('other') -# interpreter_sets = powerset(interpreters) -# -# for interpreters in powerset(interpreters): -# for other_ends in powerset(ends if other else []): -# for interp_ends in powerset(ends): -# actions = [] -# for interp_ in interpreters: -# if interp_ -# -# -# for interp_ends in end_sets: -# actions = [] -# for end_ in interp_ends: -# action = ChannelAction('use', end_, 'same') -# actions.append(action) -# if other is None: -# yield actions -# -# for interp_ends in end_sets: -# -# for other_ends in end_sets: -# -# for interp_ in ('same', 'other'): -# if interp_ == 'other' and other is None: -# continue -# for end_ in ('same', 'opposite'): -# yield ChannelAction(action, end_, interp_) -# -# ChannelAction('close', end, interp) -# -# # use -# # pre-close -# ... -# return () # XXX -# -# def _iter_use_action_sets(self, interp): -# for ends in powerset(['recv', 'send']): -# actions = [] -# for end in ends: -# actions.append( -# ChannelAction('use', end, interp)) -# yield actions -# -# def _iter_close_actions(self, interpreters): -# for end in ['recv', 'send']: -# for op in ['close', 'force-close']: -# for interp in interpreters: -# yield ChannelAction(op, end, interp) -# yield None + # never emptied + yield [ + ChannelAction('use', 'send', interp1), + ] + yield [ + ChannelAction('use', 'send', interp2), + ] + yield [ + ChannelAction('use', 'send', interp1), + ChannelAction('use', 'send', interp2), + ] + + # partially emptied + for interp1 in interps: + for interp2 in interps: + for interp3 in interps: + yield [ + ChannelAction('use', 'send', interp1), + ChannelAction('use', 'send', interp2), + ChannelAction('use', 'recv', interp3), + ] + + # fully emptied + for interp1 in interps: + for interp2 in interps: + for interp3 in interps: + for interp4 in interps: + yield [ + ChannelAction('use', 'send', interp1), + ChannelAction('use', 'send', interp2), + ChannelAction('use', 'recv', interp3), + ChannelAction('use', 'recv', interp4), + ] + + def _iter_close_action_sets(self, interp1, interp2): + ends = ('recv', 'send') + interps = (interp1, interp2) + for force in (True, False): + op = 'force-close' if force else 'close' + for interp in interps: + for end in ends: + yield [ + ChannelAction(op, end, interp), + ] + for recvop in ('close', 'force-close'): + for sendop in ('close', 'force-close'): + for recv in interps: + for send in interps: + yield [ + ChannelAction(recvop, 'recv', recv), + ChannelAction(sendop, 'send', send), + ] + + def _iter_post_close_action_sets(self): + for interp in ('same', 'extra', 'other'): + yield [ + ChannelAction('use', 'recv', interp), + ] + yield [ + ChannelAction('use', 'send', interp), + ] def run_actions(self, fix, actions): for action in actions: @@ -1739,7 +1871,7 @@ def run_actions(self, fix, actions): def run_action(self, fix, action, *, hideclosed=True): end = action.resolve_end(fix.end) - interp = action.resolve_interp(fix.interp, fix.other) + interp = action.resolve_interp(fix.interp, fix.other, fix.extra) fix.prep_interpreter(interp) if interp.name == 'main': result = run_action( @@ -1835,132 +1967,56 @@ def _assert_closed(self, fix): interp = fix.get_interpreter('fresh') self._assert_closed_in_interp(fix, interp) - def test_exhaustive(self): - verbose = False - actions = [ - ChannelAction('use', 'same', 'same'), - ] + def _iter_close_tests(self, verbose=False): i = 0 for actions in self.iter_action_sets(): print() for fix in self.iter_fixtures(): i += 1 + if i > 1000: + return if verbose: - print(i, fix) + if (i - 1) % 6 == 0: + print() + print(i, fix, '({} actions)'.format(len(actions))) else: if (i - 1) % 6 == 0: print(' ', end='') print('.', end=''); sys.stdout.flush() - with self.subTest('{} {}'.format(i, fix)): - fix.prep_interpreter(fix.interp) - self.run_actions(fix, actions) - - self._close(fix, force=False) - - self._assert_closed(fix) - # XXX Things slow down if we have too many interpreters. - fix.clean_up() + yield i, fix, actions + if verbose: + print('---') print() -# def test_exhaustive_force(self): -# actions = [] -# for fix in self.iter_fixtures(): -# with self.subTest(fix): -# self.run_actions(fix, actions) -# -# self._close(fix, force=True) -# -# self._assert_closed(fix) - - # focused tests - - def test_single_user(self): - cid = interpreters.channel_create() - interpreters.channel_send(cid, b'spam') - interpreters.channel_recv(cid) - interpreters.channel_close(cid) - - with self.assertRaises(interpreters.ChannelClosedError): - interpreters.channel_send(cid, b'eggs') - with self.assertRaises(interpreters.ChannelClosedError): - interpreters.channel_recv(cid) - - def test_multiple_users(self): - cid = interpreters.channel_create() - id1 = interpreters.create() - id2 = interpreters.create() - interpreters.run_string(id1, dedent(f""" - import _xxsubinterpreters as _interpreters - _interpreters.channel_send({cid}, b'spam') - """)) - interpreters.run_string(id2, dedent(f""" - import _xxsubinterpreters as _interpreters - _interpreters.channel_recv({cid}) - """)) - interpreters.channel_close(cid) - with self.assertRaises(interpreters.RunFailedError) as cm: - interpreters.run_string(id1, dedent(f""" - _interpreters.channel_send({cid}, b'spam') - """)) - self.assertIn('ChannelClosedError', str(cm.exception)) - with self.assertRaises(interpreters.RunFailedError) as cm: - interpreters.run_string(id2, dedent(f""" - _interpreters.channel_send({cid}, b'spam') - """)) - self.assertIn('ChannelClosedError', str(cm.exception)) - - def test_multiple_times(self): - cid = interpreters.channel_create() - interpreters.channel_send(cid, b'spam') - interpreters.channel_recv(cid) - interpreters.channel_close(cid) + # This is useful for scanning through the possible tests. + def _skim_close_tests(self): + ChannelCloseFixture.QUICK = True + for i, fix, actions in self._iter_close_tests(): + pass - with self.assertRaises(interpreters.ChannelClosedError): - interpreters.channel_close(cid) + def test_close(self): + for i, fix, actions in self._iter_close_tests(): + with self.subTest('{} {} {}'.format(i, fix, actions)): + fix.prep_interpreter(fix.interp) + self.run_actions(fix, actions) - def test_with_unused_items(self): - cid = interpreters.channel_create() - interpreters.channel_send(cid, b'spam') - interpreters.channel_send(cid, b'ham') - interpreters.channel_close(cid) + self._close(fix, force=False) - with self.assertRaises(interpreters.ChannelClosedError): - interpreters.channel_recv(cid) - - def test_never_used(self): - cid = interpreters.channel_create() - interpreters.channel_close(cid) - - with self.assertRaises(interpreters.ChannelClosedError): - interpreters.channel_send(cid, b'spam') - with self.assertRaises(interpreters.ChannelClosedError): - interpreters.channel_recv(cid) + self._assert_closed(fix) + # XXX Things slow down if we have too many interpreters. + fix.clean_up() - def test_by_unassociated_interp(self): - cid = interpreters.channel_create() - interpreters.channel_send(cid, b'spam') - interp = interpreters.create() - interpreters.run_string(interp, dedent(f""" - import _xxsubinterpreters as _interpreters - _interpreters.channel_close({cid}) - """)) - with self.assertRaises(interpreters.ChannelClosedError): - interpreters.channel_recv(cid) - with self.assertRaises(interpreters.ChannelClosedError): - interpreters.channel_close(cid) + def test_force_close(self): + for i, fix, actions in self._iter_close_tests(): + with self.subTest('{} {} {}'.format(i, fix, actions)): + fix.prep_interpreter(fix.interp) + self.run_actions(fix, actions) - def test_used_multiple_times_by_single_user(self): - cid = interpreters.channel_create() - interpreters.channel_send(cid, b'spam') - interpreters.channel_send(cid, b'spam') - interpreters.channel_send(cid, b'spam') - interpreters.channel_recv(cid) - interpreters.channel_close(cid) + self._close(fix, force=True) - with self.assertRaises(interpreters.ChannelClosedError): - interpreters.channel_send(cid, b'eggs') - with self.assertRaises(interpreters.ChannelClosedError): - interpreters.channel_recv(cid) + self._assert_closed(fix) + # XXX Things slow down if we have too many interpreters. + fix.clean_up() if __name__ == '__main__': From 262e53ef9c9ae9c72076ef2f73209f1e327329fb Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Fri, 6 Apr 2018 18:43:55 -0600 Subject: [PATCH 13/18] Fix run_action() for close operations. --- Lib/test/test__xxsubinterpreters.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/Lib/test/test__xxsubinterpreters.py b/Lib/test/test__xxsubinterpreters.py index 3c9cb6cdca5c68..51a946abda094b 100644 --- a/Lib/test/test__xxsubinterpreters.py +++ b/Lib/test/test__xxsubinterpreters.py @@ -255,17 +255,18 @@ def _run_action(cid, action, end, state): else: raise ValueError(end) elif action == 'close': - if end == 'boyh': - interpreters.channel_close(cid) - else: - interpreters.channel_close(cid) - #interpreters.channel_close(cid, end) + kwargs = {} + if end in ('recv', 'send'): + kwargs[end] = True + interpreters.channel_close(cid, **kwargs) return state.close() elif action == 'force-close': - if end == 'both': - interpreters.channel_close(cid, force=True) - else: - interpreters.channel_close(cid, end, force=True) + kwargs = { + 'force': True, + } + if end in ('recv', 'send'): + kwargs[end] = True + interpreters.channel_close(cid, **kwargs) return state.close(force=True) else: raise ValueError(action) From 7028808eb5715f43d3af8d75f029127888a99426 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Fri, 13 Apr 2018 10:50:20 -0600 Subject: [PATCH 14/18] Skip the exhaustive tests. --- Lib/test/test__xxsubinterpreters.py | 1 + 1 file changed, 1 insertion(+) diff --git a/Lib/test/test__xxsubinterpreters.py b/Lib/test/test__xxsubinterpreters.py index 51a946abda094b..39555554351e05 100644 --- a/Lib/test/test__xxsubinterpreters.py +++ b/Lib/test/test__xxsubinterpreters.py @@ -1702,6 +1702,7 @@ def _prep_interpreter(self, interp): """) +@unittest.skip('these tests take several hours to run') class ExhaustiveChannelTests(TestBase): """ From 5479620f2f11379118650b195d60fe06afb06023 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Fri, 13 Apr 2018 10:49:45 -0600 Subject: [PATCH 15/18] Add a note about lookup of cross-interpreter types. --- Python/pystate.c | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/Python/pystate.c b/Python/pystate.c index 151cbd61f2dbd2..a2bd1e512890b7 100644 --- a/Python/pystate.c +++ b/Python/pystate.c @@ -1308,6 +1308,10 @@ _PyCrossInterpreterData_Register_Class(PyTypeObject *cls, return res; } +/* Cross-interpreter objects are looked up by exact match on the class. + We can reassess this policy when we move from a global registry to a + tp_* slot. */ + crossinterpdatafunc _PyCrossInterpreterData_Lookup(PyObject *obj) { From 13c51d497d8a73d9c78d3095f6688f6d2e0897a0 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Fri, 13 Apr 2018 11:16:23 -0600 Subject: [PATCH 16/18] Add support for str. --- Lib/test/test__xxsubinterpreters.py | 4 ++-- Python/pystate.c | 32 +++++++++++++++++++++++++++++ 2 files changed, 34 insertions(+), 2 deletions(-) diff --git a/Lib/test/test__xxsubinterpreters.py b/Lib/test/test__xxsubinterpreters.py index 39555554351e05..a8b254abbf3357 100644 --- a/Lib/test/test__xxsubinterpreters.py +++ b/Lib/test/test__xxsubinterpreters.py @@ -308,6 +308,7 @@ def test_default_shareables(self): None, # builtin objects b'spam', + 'spam', 10, -10, ] @@ -338,14 +339,13 @@ class SubBytes(bytes): object(), Exception(), 100.0, - 'spam', # user-defined types and objects Cheese, Cheese('Wensleydale'), SubBytes(b'spam'), ] for obj in not_shareables: - with self.subTest(obj): + with self.subTest(repr(obj)): self.assertFalse( interpreters.is_shareable(obj)) diff --git a/Python/pystate.c b/Python/pystate.c index a2bd1e512890b7..d276bfc7e8afc6 100644 --- a/Python/pystate.c +++ b/Python/pystate.c @@ -1362,6 +1362,33 @@ _bytes_shared(PyObject *obj, _PyCrossInterpreterData *data) return 0; } +struct _shared_str_data { + int kind; + const void *buffer; + Py_ssize_t len; +}; + +static PyObject * +_new_str_object(_PyCrossInterpreterData *data) +{ + struct _shared_str_data *shared = (struct _shared_str_data *)(data->data); + return PyUnicode_FromKindAndData(shared->kind, shared->buffer, shared->len); +} + +static int +_str_shared(PyObject *obj, _PyCrossInterpreterData *data) +{ + struct _shared_str_data *shared = PyMem_NEW(struct _shared_str_data, 1); + shared->kind = PyUnicode_KIND(obj); + shared->buffer = PyUnicode_DATA(obj); + shared->len = PyUnicode_GET_LENGTH(obj) - 1; + data->data = (void *)shared; + data->obj = obj; // Will be "released" (decref'ed) when data released. + data->new_object = _new_str_object; + data->free = PyMem_Free; + return 0; +} + static PyObject * _new_long_object(_PyCrossInterpreterData *data) { @@ -1420,6 +1447,11 @@ _register_builtins_for_crossinterpreter_data(void) if (_register_xidata(&PyBytes_Type, _bytes_shared) != 0) { Py_FatalError("could not register bytes for cross-interpreter sharing"); } + + // str + if (_register_xidata(&PyUnicode_Type, _str_shared) != 0) { + Py_FatalError("could not register str for cross-interpreter sharing"); + } } From 4cf75a202347c8579b758cfda0e6543463b5a793 Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Mon, 23 Apr 2018 17:25:15 -0600 Subject: [PATCH 17/18] Move the normal channel_close() tests. --- Lib/test/test__xxsubinterpreters.py | 180 ++++++++++++++-------------- 1 file changed, 90 insertions(+), 90 deletions(-) diff --git a/Lib/test/test__xxsubinterpreters.py b/Lib/test/test__xxsubinterpreters.py index a8b254abbf3357..118f2e4895fe12 100644 --- a/Lib/test/test__xxsubinterpreters.py +++ b/Lib/test/test__xxsubinterpreters.py @@ -1333,6 +1333,96 @@ def test_run_string_arg_resolved(self): self.assertEqual(obj, b'spam') self.assertEqual(out.strip(), 'send') + # close + + def test_close_single_user(self): + cid = interpreters.channel_create() + interpreters.channel_send(cid, b'spam') + interpreters.channel_recv(cid) + interpreters.channel_close(cid) + + with self.assertRaises(interpreters.ChannelClosedError): + interpreters.channel_send(cid, b'eggs') + with self.assertRaises(interpreters.ChannelClosedError): + interpreters.channel_recv(cid) + + def test_close_multiple_users(self): + cid = interpreters.channel_create() + id1 = interpreters.create() + id2 = interpreters.create() + interpreters.run_string(id1, dedent(f""" + import _xxsubinterpreters as _interpreters + _interpreters.channel_send({cid}, b'spam') + """)) + interpreters.run_string(id2, dedent(f""" + import _xxsubinterpreters as _interpreters + _interpreters.channel_recv({cid}) + """)) + interpreters.channel_close(cid) + with self.assertRaises(interpreters.RunFailedError) as cm: + interpreters.run_string(id1, dedent(f""" + _interpreters.channel_send({cid}, b'spam') + """)) + self.assertIn('ChannelClosedError', str(cm.exception)) + with self.assertRaises(interpreters.RunFailedError) as cm: + interpreters.run_string(id2, dedent(f""" + _interpreters.channel_send({cid}, b'spam') + """)) + self.assertIn('ChannelClosedError', str(cm.exception)) + + def test_close_multiple_times(self): + cid = interpreters.channel_create() + interpreters.channel_send(cid, b'spam') + interpreters.channel_recv(cid) + interpreters.channel_close(cid) + + with self.assertRaises(interpreters.ChannelClosedError): + interpreters.channel_close(cid) + + def test_close_with_unused_items(self): + cid = interpreters.channel_create() + interpreters.channel_send(cid, b'spam') + interpreters.channel_send(cid, b'ham') + interpreters.channel_close(cid) + + with self.assertRaises(interpreters.ChannelClosedError): + interpreters.channel_recv(cid) + + def test_close_never_used(self): + cid = interpreters.channel_create() + interpreters.channel_close(cid) + + with self.assertRaises(interpreters.ChannelClosedError): + interpreters.channel_send(cid, b'spam') + with self.assertRaises(interpreters.ChannelClosedError): + interpreters.channel_recv(cid) + + def test_close_by_unassociated_interp(self): + cid = interpreters.channel_create() + interpreters.channel_send(cid, b'spam') + interp = interpreters.create() + interpreters.run_string(interp, dedent(f""" + import _xxsubinterpreters as _interpreters + _interpreters.channel_close({cid}) + """)) + with self.assertRaises(interpreters.ChannelClosedError): + interpreters.channel_recv(cid) + with self.assertRaises(interpreters.ChannelClosedError): + interpreters.channel_close(cid) + + def test_close_used_multiple_times_by_single_user(self): + cid = interpreters.channel_create() + interpreters.channel_send(cid, b'spam') + interpreters.channel_send(cid, b'spam') + interpreters.channel_send(cid, b'spam') + interpreters.channel_recv(cid) + interpreters.channel_close(cid) + + with self.assertRaises(interpreters.ChannelClosedError): + interpreters.channel_send(cid, b'eggs') + with self.assertRaises(interpreters.ChannelClosedError): + interpreters.channel_recv(cid) + class ChannelReleaseTests(TestBase): @@ -1498,96 +1588,6 @@ def test_used_multiple_times_by_single_user(self): with self.assertRaises(interpreters.ChannelClosedError): interpreters.channel_recv(cid) - # close - - def test_close_single_user(self): - cid = interpreters.channel_create() - interpreters.channel_send(cid, b'spam') - interpreters.channel_recv(cid) - interpreters.channel_close(cid) - - with self.assertRaises(interpreters.ChannelClosedError): - interpreters.channel_send(cid, b'eggs') - with self.assertRaises(interpreters.ChannelClosedError): - interpreters.channel_recv(cid) - - def test_close_multiple_users(self): - cid = interpreters.channel_create() - id1 = interpreters.create() - id2 = interpreters.create() - interpreters.run_string(id1, dedent(f""" - import _xxsubinterpreters as _interpreters - _interpreters.channel_send({cid}, b'spam') - """)) - interpreters.run_string(id2, dedent(f""" - import _xxsubinterpreters as _interpreters - _interpreters.channel_recv({cid}) - """)) - interpreters.channel_close(cid) - with self.assertRaises(interpreters.RunFailedError) as cm: - interpreters.run_string(id1, dedent(f""" - _interpreters.channel_send({cid}, b'spam') - """)) - self.assertIn('ChannelClosedError', str(cm.exception)) - with self.assertRaises(interpreters.RunFailedError) as cm: - interpreters.run_string(id2, dedent(f""" - _interpreters.channel_send({cid}, b'spam') - """)) - self.assertIn('ChannelClosedError', str(cm.exception)) - - def test_close_multiple_times(self): - cid = interpreters.channel_create() - interpreters.channel_send(cid, b'spam') - interpreters.channel_recv(cid) - interpreters.channel_close(cid) - - with self.assertRaises(interpreters.ChannelClosedError): - interpreters.channel_close(cid) - - def test_close_with_unused_items(self): - cid = interpreters.channel_create() - interpreters.channel_send(cid, b'spam') - interpreters.channel_send(cid, b'ham') - interpreters.channel_close(cid) - - with self.assertRaises(interpreters.ChannelClosedError): - interpreters.channel_recv(cid) - - def test_close_never_used(self): - cid = interpreters.channel_create() - interpreters.channel_close(cid) - - with self.assertRaises(interpreters.ChannelClosedError): - interpreters.channel_send(cid, b'spam') - with self.assertRaises(interpreters.ChannelClosedError): - interpreters.channel_recv(cid) - - def test_close_by_unassociated_interp(self): - cid = interpreters.channel_create() - interpreters.channel_send(cid, b'spam') - interp = interpreters.create() - interpreters.run_string(interp, dedent(f""" - import _xxsubinterpreters as _interpreters - _interpreters.channel_close({cid}) - """)) - with self.assertRaises(interpreters.ChannelClosedError): - interpreters.channel_recv(cid) - with self.assertRaises(interpreters.ChannelClosedError): - interpreters.channel_close(cid) - - def test_close_used_multiple_times_by_single_user(self): - cid = interpreters.channel_create() - interpreters.channel_send(cid, b'spam') - interpreters.channel_send(cid, b'spam') - interpreters.channel_send(cid, b'spam') - interpreters.channel_recv(cid) - interpreters.channel_close(cid) - - with self.assertRaises(interpreters.ChannelClosedError): - interpreters.channel_send(cid, b'eggs') - with self.assertRaises(interpreters.ChannelClosedError): - interpreters.channel_recv(cid) - class ChannelCloseFixture(namedtuple('ChannelCloseFixture', 'end interp other extra creator')): From bffbbfce0031b524a5ded6fa956db381dcb6c37e Mon Sep 17 00:00:00 2001 From: Eric Snow Date: Fri, 27 Apr 2018 12:10:49 -0600 Subject: [PATCH 18/18] Do not mask errors in _channel_recv(). --- Modules/_xxsubinterpretersmodule.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/Modules/_xxsubinterpretersmodule.c b/Modules/_xxsubinterpretersmodule.c index a456ec5551e90b..5184f6593db15e 100644 --- a/Modules/_xxsubinterpretersmodule.c +++ b/Modules/_xxsubinterpretersmodule.c @@ -1250,7 +1250,9 @@ _channel_recv(_channels *channels, int64_t id) _PyCrossInterpreterData *data = _channel_next(chan, interp->id); PyThread_release_lock(mutex); if (data == NULL) { - PyErr_Format(ChannelEmptyError, "channel %d is empty", id); + if (!PyErr_Occurred()) { + PyErr_Format(ChannelEmptyError, "channel %d is empty", id); + } return NULL; }