Skip to content

Commit 1e0ebe8

Browse files
committed
JobQueue: minimize the amount of places changing self.__tick state
- start the jobqueue (by default) during __init__() instead of during put() - protect self._next_peek and self.__tick with a Lock - rename self._start() to self._main_loop() - stop() is now blocking until the event loop thread exits
1 parent 35872d7 commit 1e0ebe8

File tree

1 file changed

+75
-54
lines changed

1 file changed

+75
-54
lines changed

telegram/ext/jobqueue.py

Lines changed: 75 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import logging
2222
import time
2323
from threading import Thread, Lock, Event
24-
from queue import PriorityQueue
24+
from queue import PriorityQueue, Empty
2525

2626

2727
class JobQueue(object):
@@ -30,30 +30,38 @@ class JobQueue(object):
3030
Attributes:
3131
queue (PriorityQueue):
3232
bot (Bot):
33+
prevent_autostart (Optional[bool]): If ``True``, the job queue will not be started
34+
automatically. Defaults to ``False``
3335
3436
Args:
3537
bot (Bot): The bot instance that should be passed to the jobs
3638
3739
"""
3840

39-
def __init__(self, bot):
41+
def __init__(self, bot, prevent_autostart=False):
4042
self.queue = PriorityQueue()
4143
self.bot = bot
4244
self.logger = logging.getLogger(self.__class__.__name__)
43-
self.__lock = Lock()
45+
self.__start_lock = Lock()
46+
self.__next_peek_lock = Lock() # to protect self._next_peek & self.__tick
4447
self.__tick = Event()
48+
self.__thread = None
49+
""":type: Thread"""
4550
self._next_peek = None
51+
""":type: float"""
4652
self._running = False
4753

48-
def put(self, job, next_t=None, prevent_autostart=False):
54+
if not prevent_autostart:
55+
self.logger.debug('Auto-starting %s', self.__class__.__name__)
56+
self.start()
57+
58+
def put(self, job, next_t=None):
4959
"""Queue a new job. If the JobQueue is not running, it will be started.
5060
5161
Args:
5262
job (Job): The ``Job`` instance representing the new job
5363
next_t (Optional[float]): Time in seconds in which the job should be executed first.
5464
Defaults to ``job.interval``
55-
prevent_autostart (Optional[bool]): If ``True``, the job queue will not be started
56-
automatically if it is not running. Defaults to ``False``
5765
5866
"""
5967
job.job_queue = self
@@ -68,13 +76,18 @@ def put(self, job, next_t=None, prevent_autostart=False):
6876
self.queue.put((next_t, job))
6977

7078
# Wake up the loop if this job should be executed next
71-
if not self._next_peek or self._next_peek > next_t:
72-
self._next_peek = next_t
73-
self.__tick.set()
79+
self._set_next_peek(next_t)
7480

75-
if not self._running and not prevent_autostart:
76-
self.logger.debug('Auto-starting JobQueue')
77-
self.start()
81+
def _set_next_peek(self, t):
82+
"""
83+
Set next peek if not defined or `t` is before next peek.
84+
In case the next peek was set, also trigger the `self.__tick` event.
85+
86+
"""
87+
with self.__next_peek_lock:
88+
if not self._next_peek or self._next_peek > t:
89+
self._next_peek = t
90+
self.__tick.set()
7891

7992
def tick(self):
8093
"""
@@ -85,74 +98,80 @@ def tick(self):
8598

8699
self.logger.debug('Ticking jobs with t=%f', now)
87100

88-
while not self.queue.empty():
89-
t, job = self.queue.queue[0]
90-
self.logger.debug('Peeked at %s with t=%f', job.name, t)
91-
92-
if t <= now:
93-
self.queue.get()
94-
95-
if job._remove.is_set():
96-
self.logger.debug('Removing job %s', job.name)
97-
continue
98-
99-
elif job.enabled:
100-
self.logger.debug('Running job %s', job.name)
101+
while True:
102+
try:
103+
t, job = self.queue.get(False)
104+
except Empty:
105+
break
101106

102-
try:
103-
job.run(self.bot)
107+
self.logger.debug('Peeked at %s with t=%f', job.name, t)
104108

105-
except:
106-
self.logger.exception(
107-
'An uncaught error was raised while executing job %s', job.name)
109+
if t > now:
110+
# we can get here in two conditions:
111+
# 1. At the second or later pass of the while loop, after we've already processed
112+
# the job(s) we were supposed to at this time.
113+
# 2. At the first iteration of the loop only if `self.put()` had triggered
114+
# `self.__tick` because `self._next_peek` wasn't set
115+
self.logger.debug("Next task isn't due yet. Finished!")
116+
self.queue.put((t, job))
117+
self._set_next_peek(t)
118+
break
119+
120+
if job._remove.is_set():
121+
self.logger.debug('Removing job %s', job.name)
122+
continue
108123

109-
else:
110-
self.logger.debug('Skipping disabled job %s', job.name)
124+
if job.enabled:
125+
self.logger.debug('Running job %s', job.name)
111126

112-
if job.repeat:
113-
self.put(job)
127+
try:
128+
job.run(self.bot)
114129

115-
continue
130+
except:
131+
self.logger.exception(
132+
'An uncaught error was raised while executing job %s', job.name)
116133

117-
self.logger.debug('Next task isn\'t due yet. Finished!')
118-
self._next_peek = t
119-
break
120-
121-
else:
122-
self._next_peek = None
134+
else:
135+
self.logger.debug('Skipping disabled job %s', job.name)
123136

124-
self.__tick.clear()
137+
if job.repeat:
138+
self.put(job)
125139

126140
def start(self):
127141
"""
128142
Starts the job_queue thread.
129143
130144
"""
131-
self.__lock.acquire()
145+
self.__start_lock.acquire()
132146

133147
if not self._running:
134148
self._running = True
135-
self.__lock.release()
136-
job_queue_thread = Thread(target=self._start, name="job_queue")
137-
job_queue_thread.start()
149+
self.__start_lock.release()
150+
self.__thread = Thread(target=self._main_loop, name="job_queue")
151+
self.__thread.start()
138152
self.logger.debug('%s thread started', self.__class__.__name__)
139153

140154
else:
141-
self.__lock.release()
155+
self.__start_lock.release()
142156

143-
def _start(self):
157+
def _main_loop(self):
144158
"""
145159
Thread target of thread ``job_queue``. Runs in background and performs ticks on the job
146160
queue.
147161
148162
"""
149163
while self._running:
150-
self.__tick.wait(self._next_peek and self._next_peek - time.time())
151-
152-
# If we were woken up by set(), wait with the new timeout
153-
if self.__tick.is_set():
164+
# self._next_peek may be (re)scheduled during self.tick() or self.put()
165+
with self.__next_peek_lock:
166+
tmout = self._next_peek and self._next_peek - time.time()
167+
self._next_peek = None
154168
self.__tick.clear()
155-
continue
169+
170+
self.__tick.wait(tmout)
171+
172+
# If we were woken up by self.stop(), just bail out
173+
if not self._running:
174+
break
156175

157176
self.tick()
158177

@@ -162,10 +181,12 @@ def stop(self):
162181
"""
163182
Stops the thread
164183
"""
165-
with self.__lock:
184+
with self.__start_lock:
166185
self._running = False
167186

168187
self.__tick.set()
188+
if self.__thread is not None:
189+
self.__thread.join()
169190

170191
def jobs(self):
171192
"""Returns a tuple of all jobs that are currently in the ``JobQueue``"""

0 commit comments

Comments
 (0)