Skip to content

Commit 7dfef3a

Browse files
committed
Added thread priorities
1 parent f436650 commit 7dfef3a

File tree

6 files changed

+254
-36
lines changed

6 files changed

+254
-36
lines changed

Include/pystate.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,8 @@ typedef struct _ts {
9595
PyObject *async_exc; /* Asynchronous exception to raise */
9696
long thread_id; /* Thread id where this tstate was created */
9797

98+
long thread_prio;
99+
98100
int trash_delete_nesting;
99101
PyObject *trash_delete_later;
100102

@@ -120,6 +122,7 @@ PyAPI_FUNC(PyThreadState *) PyThreadState_Get(void);
120122
PyAPI_FUNC(PyThreadState *) PyThreadState_Swap(PyThreadState *);
121123
PyAPI_FUNC(PyObject *) PyThreadState_GetDict(void);
122124
PyAPI_FUNC(int) PyThreadState_SetAsyncExc(long, PyObject *);
125+
PyAPI_FUNC(long) PyThreadState_SetOrGetPrio(long, int, long);
123126

124127

125128
/* Variable and macro for in-line access to current thread state */

Lib/threading.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
_start_new_thread = thread.start_new_thread
3636
_allocate_lock = thread.allocate_lock
3737
_get_ident = thread.get_ident
38+
_set_prio = thread.set_prio
3839
ThreadError = thread.error
3940
del thread
4041

@@ -645,7 +646,7 @@ class Thread(_Verbose):
645646
__exc_clear = _sys.exc_clear
646647

647648
def __init__(self, group=None, target=None, name=None,
648-
args=(), kwargs=None, verbose=None):
649+
args=(), kwargs=None, verbose=None, prio=None):
649650
"""This constructor should always be called with keyword arguments. Arguments are:
650651
651652
*group* should be None; reserved for future extension when a ThreadGroup
@@ -675,6 +676,7 @@ class is implemented.
675676
self.__name = str(name or _newname())
676677
self.__args = args
677678
self.__kwargs = kwargs
679+
self.__prio = prio
678680
self.__daemonic = self._set_daemon()
679681
self.__ident = None
680682
self.__started = Event()
@@ -797,6 +799,9 @@ def __bootstrap_inner(self):
797799
self._note("%s.__bootstrap(): registering profile hook", self)
798800
_sys.setprofile(_profile_hook)
799801

802+
if self.__prio is not None:
803+
_set_prio(_get_ident(), self.__prio)
804+
800805
try:
801806
self.run()
802807
except SystemExit:

Modules/threadmodule.c

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -799,6 +799,50 @@ yet finished.\n\
799799
This function is meant for internal and specialized purposes only.\n\
800800
In most applications `threading.enumerate()` should be used instead.");
801801

802+
static PyObject *
803+
thread_get_prio(PyObject *self, PyObject *args)
804+
{
805+
long id, prio;
806+
if (!PyArg_ParseTuple(args, "l", &id))
807+
return NULL;
808+
prio = PyThreadState_SetOrGetPrio(id, 1, 0);
809+
if (prio < 0) {
810+
PyErr_SetString(ThreadError, "thread id not found");
811+
return NULL;
812+
}
813+
return PyInt_FromLong(prio);
814+
}
815+
816+
PyDoc_STRVAR(get_prio_doc,
817+
"get_prio(thread_id) -> int\n\
818+
\n\
819+
Returns the current scheduling priority for the thread with id\n\
820+
thread_id.\n");
821+
822+
static PyObject *
823+
thread_set_prio(PyObject *self, PyObject *args)
824+
{
825+
long id, prio;
826+
if (!PyArg_ParseTuple(args, "ll", &id, &prio))
827+
return NULL;
828+
if (prio < 0 || prio > 2) {
829+
PyErr_SetString(ThreadError, "invalid thread priority");
830+
return NULL;
831+
}
832+
if (PyThreadState_SetOrGetPrio(id, 0, prio) < 0) {
833+
PyErr_SetString(ThreadError, "thread id not found");
834+
return NULL;
835+
}
836+
Py_INCREF(Py_None);
837+
return Py_None;
838+
}
839+
840+
PyDoc_STRVAR(set_prio_doc,
841+
"set_prio(thread_id, prio) -> None\n\
842+
\n\
843+
Set the scheduling priority for the thread with id thread_id.\n\
844+
Valid priorities are 0-2, where 0 is highest. Default is 1.\n");
845+
802846
static PyObject *
803847
thread_stack_size(PyObject *self, PyObject *args)
804848
{
@@ -874,6 +918,10 @@ static PyMethodDef thread_methods[] = {
874918
METH_NOARGS, get_ident_doc},
875919
{"_count", (PyCFunction)thread__count,
876920
METH_NOARGS, _count_doc},
921+
{"get_prio", (PyCFunction)thread_get_prio,
922+
METH_VARARGS, get_prio_doc},
923+
{"set_prio", (PyCFunction)thread_set_prio,
924+
METH_VARARGS, set_prio_doc},
877925
{"stack_size", (PyCFunction)thread_stack_size,
878926
METH_VARARGS,
879927
stack_size_doc},

Python/ceval.c

Lines changed: 117 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -245,18 +245,29 @@ struct special_linkstruct {
245245

246246
typedef void *PySpecial_lock_type;
247247

248+
#define PRIOS 3
249+
#define PRIO_OVERFLOW 10
250+
248251
typedef struct {
249252
PySpecial_lock_type the_lock;
250-
struct special_linkstruct *wait_queue, *wait_last, *free_queue;
253+
struct special_linkstruct *wait_queue[PRIOS], *wait_last[PRIOS], *free_queue, *current;
254+
int overflow[PRIOS];
255+
int max_prio;
251256
} PySpecialSemaphore;
252257

253258
void
254259
PySpecial_init(PySpecialSemaphore *s)
255260
{
261+
int i;
256262
s->the_lock = PyThread_mutex_alloc();
257-
s->wait_queue = NULL;
258-
s->wait_last = NULL;
263+
for (i=0; i<PRIOS; i++) {
264+
s->wait_queue[i] = NULL;
265+
s->wait_last[i] = NULL;
266+
s->overflow[i] = 0;
267+
}
259268
s->free_queue = NULL;
269+
s->current = NULL;
270+
s->max_prio = PRIOS;
260271
}
261272

262273
static PySpecialSemaphore *interpreter_lock = NULL; /* This is the GIL */
@@ -286,10 +297,54 @@ static struct special_linkstruct *allocate_special_linkstruct(void)
286297
return ls;
287298
}
288299

289-
static void PySpecial_Lock(PySpecialSemaphore *s)
300+
static struct special_linkstruct *special_pick_candidate(PySpecialSemaphore *s)
301+
{
302+
struct special_linkstruct *next = NULL;
303+
304+
if (s->max_prio == PRIOS)
305+
return NULL;
306+
307+
int m = PRIOS;
308+
int p = s->max_prio;
309+
for (; p<PRIOS; p++) {
310+
if (s->wait_queue[p]) {
311+
if (p < m)
312+
m = p;
313+
if (++s->overflow[p] < PRIO_OVERFLOW) {
314+
next = s->wait_queue[p];
315+
break;
316+
}
317+
while (s->overflow[p] >= PRIO_OVERFLOW)
318+
s->overflow[p] -= PRIO_OVERFLOW;
319+
}
320+
}
321+
/* might have skipped a candidate because of the overflow counters */
322+
if (m < PRIOS && !next) {
323+
next = s->wait_queue[m];
324+
p = m;
325+
}
326+
327+
if (next) {
328+
s->current = next;
329+
s->wait_queue[p] = next->queue_next;
330+
next->queue_next = NULL;
331+
PyThread_cond_signal(next->wait);
332+
}
333+
334+
/* update exact max_prio */
335+
for (; m<PRIOS; m++)
336+
if (s->wait_queue[m])
337+
break;
338+
s->max_prio = m;
339+
340+
return next;
341+
}
342+
343+
static void PySpecial_Lock(PySpecialSemaphore *s, int prio)
290344
{
291345
struct special_linkstruct *ls;
292346

347+
assert(prio >= 0 && prio <= PRIOS);
293348
PyThread_mutex_lock(s->the_lock);
294349

295350
if (!s->free_queue)
@@ -298,43 +353,62 @@ static void PySpecial_Lock(PySpecialSemaphore *s)
298353
ls = s->free_queue;
299354
s->free_queue = ls->free_next;
300355

301-
if (!s->wait_queue)
302-
{
303-
ls->in_use = 1;
304-
s->wait_queue = ls;
305-
s->wait_last = ls;
306-
PyThread_mutex_unlock(s->the_lock);
307-
return;
356+
if (!s->current) {
357+
/* lock is free, find someone to run */
358+
struct special_linkstruct *next = NULL;
359+
if (s->max_prio > prio && (s->max_prio == PRIOS || ++s->overflow[prio] < PRIO_OVERFLOW))
360+
;
361+
else {
362+
while (s->overflow[prio] >= PRIO_OVERFLOW)
363+
s->overflow[prio] -= PRIO_OVERFLOW;
364+
/* find and schedule another candidate */
365+
next = special_pick_candidate(s);
366+
}
367+
368+
if (!next) {
369+
/* noone else there - we can run */
370+
assert(!ls->in_use);
371+
ls->in_use = 1;
372+
s->current = ls;
373+
PyThread_mutex_unlock(s->the_lock);
374+
return;
375+
}
308376
}
309377

310-
assert(s->wait_queue != ls);
311-
assert(s->wait_last != ls);
312-
assert(s->wait_last->queue_next == NULL);
313-
assert(!ls->in_use);
314-
s->wait_last->queue_next = ls;
315-
s->wait_last = ls;
378+
/* we need to wait */
379+
assert(s->wait_queue[prio] != ls);
380+
if (s->wait_queue[prio]) {
381+
assert(s->wait_last[prio] != ls);
382+
assert(s->wait_last[prio]->queue_next == NULL);
383+
s->wait_last[prio]->queue_next = ls;
384+
s->wait_last[prio] = ls;
385+
} else {
386+
s->wait_queue[prio] = ls;
387+
s->wait_last[prio] = ls;
388+
}
316389
ls->in_use = 1;
390+
if (prio < s->max_prio)
391+
s->max_prio = prio;
317392

318-
while (s->wait_queue != ls)
393+
while (s->current != ls)
319394
PyThread_cond_wait(ls->wait, s->the_lock);
320395

321396
PyThread_mutex_unlock(s->the_lock);
322397
}
323398

324399
static void PySpecial_Unlock(PySpecialSemaphore *s)
325400
{
326-
struct special_linkstruct *ls;
401+
struct special_linkstruct *ls, *next;
327402

328403
PyThread_mutex_lock(s->the_lock);
329-
ls = s->wait_queue;
404+
ls = s->current;
330405
assert(ls->in_use);
331406

332-
s->wait_queue = ls->queue_next;
333-
if (s->wait_queue)
334-
{
335-
ls->queue_next = NULL;
336-
PyThread_cond_signal(s->wait_queue->wait);
337-
}
407+
next = special_pick_candidate(s);
408+
409+
if (!next)
410+
s->current = NULL;
411+
338412
ls->in_use = 0;
339413

340414
ls->free_next = s->free_queue;
@@ -349,14 +423,14 @@ PyEval_InitThreads(void)
349423
if (interpreter_lock)
350424
return;
351425
interpreter_lock = allocate_special();
352-
PySpecial_Lock(interpreter_lock);
426+
PySpecial_Lock(interpreter_lock, 0);
353427
main_thread = PyThread_get_thread_ident();
354428
}
355429

356430
void
357431
PyEval_AcquireLock(void)
358432
{
359-
PySpecial_Lock(interpreter_lock);
433+
PySpecial_Lock(interpreter_lock, 1);
360434
}
361435

362436
void
@@ -372,7 +446,7 @@ PyEval_AcquireThread(PyThreadState *tstate)
372446
Py_FatalError("PyEval_AcquireThread: NULL new thread state");
373447
/* Check someone has called PyEval_InitThreads() to create the lock */
374448
assert(interpreter_lock);
375-
PySpecial_Lock(interpreter_lock);
449+
PySpecial_Lock(interpreter_lock, tstate->thread_prio);
376450
if (PyThreadState_Swap(tstate) != NULL)
377451
Py_FatalError(
378452
"PyEval_AcquireThread: non-NULL old thread state");
@@ -407,7 +481,7 @@ PyEval_ReInitThreads(void)
407481
create a new lock and waste a little bit of memory */
408482
interpreter_lock = allocate_special();
409483
pending_lock = PyThread_allocate_lock();
410-
PySpecial_Lock(interpreter_lock);
484+
PySpecial_Lock(interpreter_lock, 0);
411485
main_thread = PyThread_get_thread_ident();
412486

413487
/* Update the threading module with the new state.
@@ -454,7 +528,7 @@ PyEval_RestoreThread(PyThreadState *tstate)
454528
#ifdef WITH_THREAD
455529
if (interpreter_lock) {
456530
int err = errno;
457-
PySpecial_Lock(interpreter_lock);
531+
PySpecial_Lock(interpreter_lock, tstate->thread_prio);
458532
errno = err;
459533
}
460534
#endif
@@ -760,6 +834,8 @@ static int _Py_TracingPossible = 0;
760834
per thread, now just a pair o' globals */
761835
int _Py_CheckInterval = 100;
762836
volatile int _Py_Ticker = 0; /* so that we hit a "tick" first thing */
837+
int _ticker_overflow = 0; /* for switching to higher-prio thread */
838+
int _ticker_overflow_lower = 0; /* for switching to lower-prio thread */
763839

764840
PyObject *
765841
PyEval_EvalCode(PyCodeObject *co, PyObject *globals, PyObject *locals)
@@ -1217,23 +1293,29 @@ PyEval_EvalFrameEx(PyFrameObject *f, int throwflag)
12171293
_Py_Ticker = 0;
12181294
}
12191295
#ifdef WITH_THREAD
1220-
if (interpreter_lock && interpreter_lock->wait_queue) {
1221-
/* Give another thread a chance */
1296+
if (interpreter_lock &&
1297+
(interpreter_lock->max_prio < tstate->thread_prio && ++_ticker_overflow < PRIO_OVERFLOW ||
1298+
interpreter_lock->max_prio != PRIOS && ++_ticker_overflow_lower >= PRIO_OVERFLOW)) {
1299+
/* Give another thread a chance */
12221300

12231301
if (PyThreadState_Swap(NULL) != tstate)
12241302
Py_FatalError("ceval: tstate mix-up");
12251303
PySpecial_Unlock(interpreter_lock);
12261304

12271305
/* Other threads may run now */
12281306

1229-
PySpecial_Lock(interpreter_lock);
1307+
PySpecial_Lock(interpreter_lock, tstate->thread_prio);
12301308

12311309
if (PyThreadState_Swap(tstate) != NULL)
12321310
Py_FatalError("ceval: orphan tstate");
12331311

12341312
}
12351313

1236-
if (interpreter_lock) {
1314+
if (interpreter_lock) {
1315+
while (_ticker_overflow >= PRIO_OVERFLOW)
1316+
_ticker_overflow -= PRIO_OVERFLOW;
1317+
while (_ticker_overflow_lower >= PRIO_OVERFLOW)
1318+
_ticker_overflow_lower -= PRIO_OVERFLOW;
12371319
/* Check for thread interrupts */
12381320

12391321
if (tstate->async_exc != NULL) {

0 commit comments

Comments
 (0)