diff --git a/examples/timerbot.py b/examples/timerbot.py index 4c9faccf7d1..8473a543a32 100644 --- a/examples/timerbot.py +++ b/examples/timerbot.py @@ -17,15 +17,15 @@ bot. """ -from telegram.ext import Updater, CommandHandler +from telegram.ext import Updater, CommandHandler, Job import logging # Enable logging logging.basicConfig(format='%(asctime)s - %(name)s - %(levelname)s - %(message)s', - level=logging.INFO) + level=logging.DEBUG) logger = logging.getLogger(__name__) -job_queue = None +timers = dict() # Define a few command handlers. These usually take the two arguments bot and @@ -34,8 +34,8 @@ def start(bot, update): bot.sendMessage(update.message.chat_id, text='Hi! Use /set to ' 'set a timer') -def set(bot, update, args): - """ Adds a job to the queue """ +def set(bot, update, args, job_queue): + """Adds a job to the queue""" chat_id = update.message.chat_id try: # args[0] should contain the time for the timer in seconds @@ -43,29 +43,40 @@ def set(bot, update, args): if due < 0: bot.sendMessage(chat_id, text='Sorry we can not go back to future!') - def alarm(bot): - """ Inner function to send the alarm message """ + def alarm(bot, job): + """Inner function to send the alarm message""" bot.sendMessage(chat_id, text='Beep!') # Add job to queue - job_queue.put(alarm, due, repeat=False) + job = Job(alarm, due, repeat=False) + timers[chat_id] = job + job_queue.put(job) + bot.sendMessage(chat_id, text='Timer successfully set!') - except IndexError: - bot.sendMessage(chat_id, text='Usage: /set ') - except ValueError: + except (IndexError, ValueError): bot.sendMessage(chat_id, text='Usage: /set ') +def unset(bot, update): + """Removes the job if the user changed their mind""" + chat_id = update.message.chat_id + + if chat_id not in timers: + bot.sendMessage(chat_id, text='You have no active timer') + return + + job = timers[chat_id] + job.schedule_removal() + bot.sendMessage(chat_id, text='Timer successfully unset!') + + def error(bot, update, error): logger.warn('Update "%s" caused error "%s"' % (update, error)) def main(): - global job_queue - updater = Updater("TOKEN") - job_queue = updater.job_queue # Get the dispatcher to register handlers dp = updater.dispatcher @@ -73,7 +84,8 @@ def main(): # on different commands - answer in Telegram dp.add_handler(CommandHandler("start", start)) dp.add_handler(CommandHandler("help", start)) - dp.add_handler(CommandHandler("set", set, pass_args=True)) + dp.add_handler(CommandHandler("set", set, pass_args=True, pass_job_queue=True)) + dp.add_handler(CommandHandler("unset", unset)) # log all errors dp.add_error_handler(error) diff --git a/telegram/ext/__init__.py b/telegram/ext/__init__.py index 59a8b800d63..8d89fd4c87c 100644 --- a/telegram/ext/__init__.py +++ b/telegram/ext/__init__.py @@ -19,7 +19,7 @@ """Extensions over the Telegram Bot API to facilitate bot making""" from .dispatcher import Dispatcher -from .jobqueue import JobQueue +from .jobqueue import JobQueue, Job from .updater import Updater from .callbackqueryhandler import CallbackQueryHandler from .choseninlineresulthandler import ChosenInlineResultHandler @@ -32,7 +32,7 @@ from .stringregexhandler import StringRegexHandler from .typehandler import TypeHandler -__all__ = ('Dispatcher', 'JobQueue', 'Updater', 'CallbackQueryHandler', +__all__ = ('Dispatcher', 'JobQueue', 'Job', 'Updater', 'CallbackQueryHandler', 'ChosenInlineResultHandler', 'CommandHandler', 'Handler', 'InlineQueryHandler', 'MessageHandler', 'Filters', 'RegexHandler', 'StringCommandHandler', 'StringRegexHandler', 'TypeHandler') diff --git a/telegram/ext/callbackqueryhandler.py b/telegram/ext/callbackqueryhandler.py index 669943c96c3..ac2b563fae5 100644 --- a/telegram/ext/callbackqueryhandler.py +++ b/telegram/ext/callbackqueryhandler.py @@ -31,13 +31,20 @@ class CallbackQueryHandler(Handler): callback (function): A function that takes ``bot, update`` as positional arguments. It will be called when the ``check_update`` has determined that an update should be processed by this handler. - pass_update_queue (optional[bool]): If the handler should be passed the - update queue as a keyword argument called ``update_queue``. It can - be used to insert updates. Default is ``False`` + pass_update_queue (optional[bool]): If set to ``True``, a keyword argument called + ``update_queue`` will be passed to the callback function. It will be the ``Queue`` + instance used by the ``Updater`` and ``Dispatcher`` that contains new updates which can + be used to insert updates. Default is ``False``. + pass_job_queue (optional[bool]): If set to ``True``, a keyword argument called + ``job_queue`` will be passed to the callback function. It will be a ``JobQueue`` + instance created by the ``Updater`` which can be used to schedule new jobs. + Default is ``False``. """ - def __init__(self, callback, pass_update_queue=False): - super(CallbackQueryHandler, self).__init__(callback, pass_update_queue) + def __init__(self, callback, pass_update_queue=False, pass_job_queue=False): + super(CallbackQueryHandler, self).__init__(callback, + pass_update_queue=pass_update_queue, + pass_job_queue=pass_job_queue) def check_update(self, update): return isinstance(update, Update) and update.callback_query diff --git a/telegram/ext/choseninlineresulthandler.py b/telegram/ext/choseninlineresulthandler.py index 09e8bb3a081..167dc954105 100644 --- a/telegram/ext/choseninlineresulthandler.py +++ b/telegram/ext/choseninlineresulthandler.py @@ -32,13 +32,20 @@ class ChosenInlineResultHandler(Handler): callback (function): A function that takes ``bot, update`` as positional arguments. It will be called when the ``check_update`` has determined that an update should be processed by this handler. - pass_update_queue (optional[bool]): If the handler should be passed the - update queue as a keyword argument called ``update_queue``. It can - be used to insert updates. Default is ``False`` + pass_update_queue (optional[bool]): If set to ``True``, a keyword argument called + ``update_queue`` will be passed to the callback function. It will be the ``Queue`` + instance used by the ``Updater`` and ``Dispatcher`` that contains new updates which can + be used to insert updates. Default is ``False``. + pass_job_queue (optional[bool]): If set to ``True``, a keyword argument called + ``job_queue`` will be passed to the callback function. It will be a ``JobQueue`` + instance created by the ``Updater`` which can be used to schedule new jobs. + Default is ``False``. """ - def __init__(self, callback, pass_update_queue=False): - super(ChosenInlineResultHandler, self).__init__(callback, pass_update_queue) + def __init__(self, callback, pass_update_queue=False, pass_job_queue=False): + super(ChosenInlineResultHandler, self).__init__(callback, + pass_update_queue=pass_update_queue, + pass_job_queue=pass_job_queue) def check_update(self, update): return isinstance(update, Update) and update.chosen_inline_result diff --git a/telegram/ext/commandhandler.py b/telegram/ext/commandhandler.py index 4c2a98c56b3..36c6edf8f56 100644 --- a/telegram/ext/commandhandler.py +++ b/telegram/ext/commandhandler.py @@ -40,9 +40,14 @@ class CommandHandler(Handler): arguments passed to the command as a keyword argument called ` ``args``. It will contain a list of strings, which is the text following the command split on spaces. Default is ``False`` - pass_update_queue (optional[bool]): If the handler should be passed the - update queue as a keyword argument called ``update_queue``. It can - be used to insert updates. Default is ``False`` + pass_update_queue (optional[bool]): If set to ``True``, a keyword argument called + ``update_queue`` will be passed to the callback function. It will be the ``Queue`` + instance used by the ``Updater`` and ``Dispatcher`` that contains new updates which can + be used to insert updates. Default is ``False``. + pass_job_queue (optional[bool]): If set to ``True``, a keyword argument called + ``job_queue`` will be passed to the callback function. It will be a ``JobQueue`` + instance created by the ``Updater`` which can be used to schedule new jobs. + Default is ``False``. """ def __init__(self, @@ -50,8 +55,11 @@ def __init__(self, callback, allow_edited=False, pass_args=False, - pass_update_queue=False): - super(CommandHandler, self).__init__(callback, pass_update_queue) + pass_update_queue=False, + pass_job_queue=False): + super(CommandHandler, self).__init__(callback, + pass_update_queue=pass_update_queue, + pass_job_queue=pass_job_queue) self.command = command self.allow_edited = allow_edited self.pass_args = pass_args diff --git a/telegram/ext/dispatcher.py b/telegram/ext/dispatcher.py index 25298262b04..fc7e6d5d40b 100644 --- a/telegram/ext/dispatcher.py +++ b/telegram/ext/dispatcher.py @@ -94,11 +94,16 @@ class Dispatcher(object): handlers update_queue (Queue): The synchronized queue that will contain the updates. + job_queue (Optional[telegram.ext.JobQueue]): The ``JobQueue`` instance to pass onto handler + callbacks + workers (Optional[int]): Number of maximum concurrent worker threads for the ``@run_async`` + decorator """ - def __init__(self, bot, update_queue, workers=4, exception_event=None): + def __init__(self, bot, update_queue, workers=4, exception_event=None, job_queue=None): self.bot = bot self.update_queue = update_queue + self.job_queue = job_queue self.handlers = {} """:type: dict[int, list[Handler]""" diff --git a/telegram/ext/handler.py b/telegram/ext/handler.py index 964bbb98aca..8c0fb2e8b3f 100644 --- a/telegram/ext/handler.py +++ b/telegram/ext/handler.py @@ -31,14 +31,20 @@ class Handler(object): callback (function): A function that takes ``bot, update`` as positional arguments. It will be called when the ``check_update`` has determined that an update should be processed by this handler. - pass_update_queue (optional[bool]): If the callback should be passed - the update queue as a keyword argument called ``update_queue``. It - can be used to insert updates. Default is ``False`` + pass_update_queue (optional[bool]): If set to ``True``, a keyword argument called + ``update_queue`` will be passed to the callback function. It will be the ``Queue`` + instance used by the ``Updater`` and ``Dispatcher`` that contains new updates which can + be used to insert updates. Default is ``False``. + pass_job_queue (optional[bool]): If set to ``True``, a keyword argument called + ``job_queue`` will be passed to the callback function. It will be a ``JobQueue`` + instance created by the ``Updater`` which can be used to schedule new jobs. + Default is ``False``. """ - def __init__(self, callback, pass_update_queue=False): + def __init__(self, callback, pass_update_queue=False, pass_job_queue=False): self.callback = callback self.pass_update_queue = pass_update_queue + self.pass_job_queue = pass_job_queue def check_update(self, update): """ @@ -77,6 +83,8 @@ def collect_optional_args(self, dispatcher): optional_args = dict() if self.pass_update_queue: optional_args['update_queue'] = dispatcher.update_queue + if self.pass_job_queue: + optional_args['job_queue'] = dispatcher.job_queue return optional_args diff --git a/telegram/ext/inlinequeryhandler.py b/telegram/ext/inlinequeryhandler.py index 12cedfe6139..64695105d8e 100644 --- a/telegram/ext/inlinequeryhandler.py +++ b/telegram/ext/inlinequeryhandler.py @@ -31,13 +31,20 @@ class InlineQueryHandler(Handler): callback (function): A function that takes ``bot, update`` as positional arguments. It will be called when the ``check_update`` has determined that an update should be processed by this handler. - pass_update_queue (optional[bool]): If the handler should be passed the - update queue as a keyword argument called ``update_queue``. It can - be used to insert updates. Default is ``False`` + pass_update_queue (optional[bool]): If set to ``True``, a keyword argument called + ``update_queue`` will be passed to the callback function. It will be the ``Queue`` + instance used by the ``Updater`` and ``Dispatcher`` that contains new updates which can + be used to insert updates. Default is ``False``. + pass_job_queue (optional[bool]): If set to ``True``, a keyword argument called + ``job_queue`` will be passed to the callback function. It will be a ``JobQueue`` + instance created by the ``Updater`` which can be used to schedule new jobs. + Default is ``False``. """ - def __init__(self, callback, pass_update_queue=False): - super(InlineQueryHandler, self).__init__(callback, pass_update_queue) + def __init__(self, callback, pass_update_queue=False, pass_job_queue=False): + super(InlineQueryHandler, self).__init__(callback, + pass_update_queue=pass_update_queue, + pass_job_queue=pass_job_queue) def check_update(self, update): return isinstance(update, Update) and update.inline_query diff --git a/telegram/ext/jobqueue.py b/telegram/ext/jobqueue.py index 616468399e2..60fe921b988 100644 --- a/telegram/ext/jobqueue.py +++ b/telegram/ext/jobqueue.py @@ -16,139 +16,240 @@ # # You should have received a copy of the GNU Lesser Public License # along with this program. If not, see [http://www.gnu.org/licenses/]. -"""This module contains the class JobQueue.""" +"""This module contains the classes JobQueue and Job.""" import logging import time -from threading import Thread, Lock -from queue import PriorityQueue +from threading import Thread, Lock, Event +from queue import PriorityQueue, Empty class JobQueue(object): - """ - This class allows you to periodically perform tasks with the bot. + """This class allows you to periodically perform tasks with the bot. Attributes: - tick_interval (float): queue (PriorityQueue): bot (Bot): - running (bool): + prevent_autostart (Optional[bool]): If ``True``, the job queue will not be started + automatically. Defaults to ``False`` Args: bot (Bot): The bot instance that should be passed to the jobs - tick_interval (Optional[float]): The interval this queue should check - the newest task in seconds. Defaults to 1.0 """ - def __init__(self, bot, tick_interval=1.0): - self.tick_interval = tick_interval + def __init__(self, bot, prevent_autostart=False): self.queue = PriorityQueue() self.bot = bot - self.logger = logging.getLogger(__name__) - self.__lock = Lock() - self.running = False + self.logger = logging.getLogger(self.__class__.__name__) + self.__start_lock = Lock() + self.__next_peek_lock = Lock() # to protect self._next_peek & self.__tick + self.__tick = Event() + self.__thread = None + """:type: Thread""" + self._next_peek = None + """:type: float""" + self._running = False + + if not prevent_autostart: + self.logger.debug('Auto-starting %s', self.__class__.__name__) + self.start() - def put(self, run, interval, repeat=True, next_t=None, prevent_autostart=False): - """ - Queue a new job. If the JobQueue is not running, it will be started. + def put(self, job, next_t=None): + """Queue a new job. If the JobQueue is not running, it will be started. Args: - run (function): A function that takes the parameter `bot` - interval (float): The interval in seconds in which `run` should be - executed - repeat (Optional[bool]): If `False`, job will only be executed once - next_t (Optional[float]): Time in seconds in which run should be - executed first. Defaults to `interval` - prevent_autostart (Optional[bool]): If `True`, the job queue will - not be started automatically if it is not running. - """ - name = run.__name__ + job (Job): The ``Job`` instance representing the new job + next_t (Optional[float]): Time in seconds in which the job should be executed first. + Defaults to ``job.interval`` - job = JobQueue.Job() - job.run = run - job.interval = interval - job.name = name - job.repeat = repeat + """ + job.job_queue = self if next_t is None: - next_t = interval + next_t = job.interval - next_t += time.time() + now = time.time() + next_t += now - self.logger.debug('Putting a %s with t=%f' % (job.name, next_t)) + self.logger.debug('Putting job %s with t=%f', job.name, next_t) self.queue.put((next_t, job)) - if not self.running and not prevent_autostart: - self.logger.debug('Auto-starting JobQueue') - self.start() + # Wake up the loop if this job should be executed next + self._set_next_peek(next_t) + + def _set_next_peek(self, t): + """ + Set next peek if not defined or `t` is before next peek. + In case the next peek was set, also trigger the `self.__tick` event. + + """ + with self.__next_peek_lock: + if not self._next_peek or self._next_peek > t: + self._next_peek = t + self.__tick.set() def tick(self): """ - Run all jobs that are due and re-enqueue them with their interval + Run all jobs that are due and re-enqueue them with their interval. + """ now = time.time() - self.logger.debug('Ticking jobs with t=%f' % now) - while not self.queue.empty(): - t, j = self.queue.queue[0] - self.logger.debug('Peeked at %s with t=%f' % (j.name, t)) + self.logger.debug('Ticking jobs with t=%f', now) + + while True: + try: + t, job = self.queue.get(False) + except Empty: + break + + self.logger.debug('Peeked at %s with t=%f', job.name, t) + + if t > now: + # we can get here in two conditions: + # 1. At the second or later pass of the while loop, after we've already processed + # the job(s) we were supposed to at this time. + # 2. At the first iteration of the loop only if `self.put()` had triggered + # `self.__tick` because `self._next_peek` wasn't set + self.logger.debug("Next task isn't due yet. Finished!") + self.queue.put((t, job)) + self._set_next_peek(t) + break + + if job._remove.is_set(): + self.logger.debug('Removing job %s', job.name) + continue + + if job.enabled: + self.logger.debug('Running job %s', job.name) - if t < now: - self.queue.get() - self.logger.debug('Running job %s' % j.name) try: - j.run(self.bot) + job.run(self.bot) + except: - self.logger.exception('An uncaught error was raised while ' - 'executing job %s' % j.name) - if j.repeat: - self.put(j.run, j.interval) - continue + self.logger.exception('An uncaught error was raised while executing job %s', + job.name) + + else: + self.logger.debug('Skipping disabled job %s', job.name) - self.logger.debug('Next task isn\'t due yet. Finished!') - break + if job.repeat: + self.put(job) def start(self): """ Starts the job_queue thread. + """ - self.__lock.acquire() - if not self.running: - self.running = True - self.__lock.release() - job_queue_thread = Thread(target=self._start, name="job_queue") - job_queue_thread.start() - self.logger.debug('Job Queue thread started') + self.__start_lock.acquire() + + if not self._running: + self._running = True + self.__start_lock.release() + self.__thread = Thread(target=self._main_loop, name="job_queue") + self.__thread.start() + self.logger.debug('%s thread started', self.__class__.__name__) + else: - self.__lock.release() + self.__start_lock.release() - def _start(self): + def _main_loop(self): """ - Thread target of thread 'job_queue'. Runs in background and performs - ticks on the job queue. + Thread target of thread ``job_queue``. Runs in background and performs ticks on the job + queue. + """ - while self.running: + while self._running: + # self._next_peek may be (re)scheduled during self.tick() or self.put() + with self.__next_peek_lock: + tmout = self._next_peek and self._next_peek - time.time() + self._next_peek = None + self.__tick.clear() + + self.__tick.wait(tmout) + + # If we were woken up by self.stop(), just bail out + if not self._running: + break + self.tick() - time.sleep(self.tick_interval) - self.logger.debug('Job Queue thread stopped') + self.logger.debug('%s thread stopped', self.__class__.__name__) def stop(self): """ Stops the thread """ - with self.__lock: - self.running = False + with self.__start_lock: + self._running = False + + self.__tick.set() + if self.__thread is not None: + self.__thread.join() + + def jobs(self): + """Returns a tuple of all jobs that are currently in the ``JobQueue``""" + return tuple(job[1] for job in self.queue.queue if job) + + +class Job(object): + """This class encapsulates a Job - class Job(object): - """ Inner class that represents a job """ - interval = None - name = None - repeat = None + Attributes: + callback (function): + interval (float): + repeat (bool): + name (str): + enabled (bool): Boolean property that decides if this job is currently active + + Args: + callback (function): The callback function that should be executed by the Job. It should + take two parameters ``bot`` and ``job``, where ``job`` is the ``Job`` instance. It + can be used to terminate the job or modify its interval. + interval (float): The interval in which this job should execute its callback function in + seconds. + repeat (Optional[bool]): If this job should be periodically execute its callback function + (``True``) or only once (``False``). Defaults to ``True`` + context (Optional[object]): Additional data needed for the callback function. Can be + accessed through ``job.context`` in the callback. Defaults to ``None`` + + """ + job_queue = None + + def __init__(self, callback, interval, repeat=True, context=None): + self.callback = callback + self.interval = interval + self.repeat = repeat + self.context = context + + self.name = callback.__name__ + self._remove = Event() + self._enabled = Event() + self._enabled.set() + + def run(self, bot): + """Executes the callback function""" + self.callback(bot, self) + + def schedule_removal(self): + """ + Schedules this job for removal from the ``JobQueue``. It will be removed without executing + its callback function again. + + """ + self._remove.set() + + def is_enabled(self): + return self._enabled.is_set() + + def set_enabled(self, status): + if status: + self._enabled.set() + else: + self._enabled.clear() - def run(self): - pass + enabled = property(is_enabled, set_enabled) - def __lt__(self, other): - return False + def __lt__(self, other): + return False diff --git a/telegram/ext/messagehandler.py b/telegram/ext/messagehandler.py index ddef6bdb11b..36989e7d500 100644 --- a/telegram/ext/messagehandler.py +++ b/telegram/ext/messagehandler.py @@ -105,8 +105,15 @@ class MessageHandler(Handler): be used to insert updates. Default is ``False`` """ - def __init__(self, filters, callback, allow_edited=False, pass_update_queue=False): - super(MessageHandler, self).__init__(callback, pass_update_queue) + def __init__(self, + filters, + callback, + allow_edited=False, + pass_update_queue=False, + pass_job_queue=False): + super(MessageHandler, self).__init__(callback, + pass_update_queue=pass_update_queue, + pass_job_queue=pass_job_queue) self.filters = filters self.allow_edited = allow_edited diff --git a/telegram/ext/regexhandler.py b/telegram/ext/regexhandler.py index 958f8f6d427..7ef01a108ca 100644 --- a/telegram/ext/regexhandler.py +++ b/telegram/ext/regexhandler.py @@ -45,9 +45,14 @@ class RegexHandler(Handler): pass_groupdict (optional[bool]): If the callback should be passed the result of ``re.match(pattern, text).groupdict()`` as a keyword argument called ``groupdict``. Default is ``False`` - pass_update_queue (optional[bool]): If the handler should be passed the - update queue as a keyword argument called ``update_queue``. It can - be used to insert updates. Default is ``False`` + pass_update_queue (optional[bool]): If set to ``True``, a keyword argument called + ``update_queue`` will be passed to the callback function. It will be the ``Queue`` + instance used by the ``Updater`` and ``Dispatcher`` that contains new updates which can + be used to insert updates. Default is ``False``. + pass_job_queue (optional[bool]): If set to ``True``, a keyword argument called + ``job_queue`` will be passed to the callback function. It will be a ``JobQueue`` + instance created by the ``Updater`` which can be used to schedule new jobs. + Default is ``False``. """ def __init__(self, @@ -55,8 +60,11 @@ def __init__(self, callback, pass_groups=False, pass_groupdict=False, - pass_update_queue=False): - super(RegexHandler, self).__init__(callback, pass_update_queue) + pass_update_queue=False, + pass_job_queue=False): + super(RegexHandler, self).__init__(callback, + pass_update_queue=pass_update_queue, + pass_job_queue=pass_job_queue) if isinstance(pattern, string_types): pattern = re.compile(pattern) diff --git a/telegram/ext/stringcommandhandler.py b/telegram/ext/stringcommandhandler.py index 9d69f98ab3a..47b31500e4e 100644 --- a/telegram/ext/stringcommandhandler.py +++ b/telegram/ext/stringcommandhandler.py @@ -36,13 +36,25 @@ class StringCommandHandler(Handler): arguments passed to the command as a keyword argument called ` ``args``. It will contain a list of strings, which is the text following the command split on spaces. Default is ``False`` - pass_update_queue (optional[bool]): If the handler should be passed the - update queue as a keyword argument called ``update_queue``. It can - be used to insert updates. Default is ``False`` + pass_update_queue (optional[bool]): If set to ``True``, a keyword argument called + ``update_queue`` will be passed to the callback function. It will be the ``Queue`` + instance used by the ``Updater`` and ``Dispatcher`` that contains new updates which can + be used to insert updates. Default is ``False``. + pass_job_queue (optional[bool]): If set to ``True``, a keyword argument called + ``job_queue`` will be passed to the callback function. It will be a ``JobQueue`` + instance created by the ``Updater`` which can be used to schedule new jobs. + Default is ``False``. """ - def __init__(self, command, callback, pass_args=False, pass_update_queue=False): - super(StringCommandHandler, self).__init__(callback, pass_update_queue) + def __init__(self, + command, + callback, + pass_args=False, + pass_update_queue=False, + pass_job_queue=False): + super(StringCommandHandler, self).__init__(callback, + pass_update_queue=pass_update_queue, + pass_job_queue=pass_job_queue) self.command = command self.pass_args = pass_args diff --git a/telegram/ext/stringregexhandler.py b/telegram/ext/stringregexhandler.py index 5ec3896e8bd..c09489c7b27 100644 --- a/telegram/ext/stringregexhandler.py +++ b/telegram/ext/stringregexhandler.py @@ -44,9 +44,14 @@ class StringRegexHandler(Handler): pass_groupdict (optional[bool]): If the callback should be passed the result of ``re.match(pattern, update).groupdict()`` as a keyword argument called ``groupdict``. Default is ``False`` - pass_update_queue (optional[bool]): If the handler should be passed the - update queue as a keyword argument called ``update_queue``. It can - be used to insert updates. Default is ``False`` + pass_update_queue (optional[bool]): If set to ``True``, a keyword argument called + ``update_queue`` will be passed to the callback function. It will be the ``Queue`` + instance used by the ``Updater`` and ``Dispatcher`` that contains new updates which can + be used to insert updates. Default is ``False``. + pass_job_queue (optional[bool]): If set to ``True``, a keyword argument called + ``job_queue`` will be passed to the callback function. It will be a ``JobQueue`` + instance created by the ``Updater`` which can be used to schedule new jobs. + Default is ``False``. """ def __init__(self, @@ -54,8 +59,11 @@ def __init__(self, callback, pass_groups=False, pass_groupdict=False, - pass_update_queue=False): - super(StringRegexHandler, self).__init__(callback, pass_update_queue) + pass_update_queue=False, + pass_job_queue=False): + super(StringRegexHandler, self).__init__(callback, + pass_update_queue=pass_update_queue, + pass_job_queue=pass_job_queue) if isinstance(pattern, string_types): pattern = re.compile(pattern) diff --git a/telegram/ext/typehandler.py b/telegram/ext/typehandler.py index f8ad76ceb97..7339b3b8966 100644 --- a/telegram/ext/typehandler.py +++ b/telegram/ext/typehandler.py @@ -34,13 +34,25 @@ class TypeHandler(Handler): has determined that an update should be processed by this handler. strict (optional[bool]): Use ``type`` instead of ``isinstance``. Default is ``False`` - pass_update_queue (optional[bool]): If the handler should be passed the - update queue as a keyword argument called ``update_queue``. It can - be used to insert updates. Default is ``False`` + pass_update_queue (optional[bool]): If set to ``True``, a keyword argument called + ``update_queue`` will be passed to the callback function. It will be the ``Queue`` + instance used by the ``Updater`` and ``Dispatcher`` that contains new updates which can + be used to insert updates. Default is ``False``. + pass_job_queue (optional[bool]): If set to ``True``, a keyword argument called + ``job_queue`` will be passed to the callback function. It will be a ``JobQueue`` + instance created by the ``Updater`` which can be used to schedule new jobs. + Default is ``False``. """ - def __init__(self, type, callback, strict=False, pass_update_queue=False): - super(TypeHandler, self).__init__(callback, pass_update_queue) + def __init__(self, + type, + callback, + strict=False, + pass_update_queue=False, + pass_job_queue=False): + super(TypeHandler, self).__init__(callback, + pass_update_queue=pass_update_queue, + pass_job_queue=pass_job_queue) self.type = type self.strict = strict diff --git a/telegram/ext/updater.py b/telegram/ext/updater.py index 3c3a190c466..83c79e11b8e 100644 --- a/telegram/ext/updater.py +++ b/telegram/ext/updater.py @@ -65,12 +65,7 @@ class Updater(object): ValueError: If both `token` and `bot` are passed or none of them. """ - def __init__(self, - token=None, - base_url=None, - workers=4, - bot=None, - job_queue_tick_interval=1.0): + def __init__(self, token=None, base_url=None, workers=4, bot=None): if (token is None) and (bot is None): raise ValueError('`token` or `bot` must be passed') if (token is not None) and (bot is not None): @@ -81,9 +76,13 @@ def __init__(self, else: self.bot = Bot(token, base_url) self.update_queue = Queue() - self.job_queue = JobQueue(self.bot, job_queue_tick_interval) + self.job_queue = JobQueue(self.bot) self.__exception_event = Event() - self.dispatcher = Dispatcher(self.bot, self.update_queue, workers, self.__exception_event) + self.dispatcher = Dispatcher(self.bot, + self.update_queue, + job_queue=self.job_queue, + workers=workers, + exception_event=self.__exception_event) self.last_update_id = 0 self.logger = logging.getLogger(__name__) self.running = False diff --git a/tests/test_jobqueue.py b/tests/test_jobqueue.py index 8cadd8e1d90..bdfb602fb0d 100644 --- a/tests/test_jobqueue.py +++ b/tests/test_jobqueue.py @@ -32,7 +32,7 @@ sys.path.append('.') from telegram.utils.request import stop_con_pool -from telegram.ext import JobQueue, Updater +from telegram.ext import JobQueue, Job, Updater from tests.base import BaseTest # Enable logging @@ -53,7 +53,7 @@ class JobQueueTest(BaseTest, unittest.TestCase): """ def setUp(self): - self.jq = JobQueue("Bot", tick_interval=0.005) + self.jq = JobQueue("Bot") self.result = 0 def tearDown(self): @@ -61,46 +61,108 @@ def tearDown(self): self.jq.stop() stop_con_pool() - def job1(self, bot): + def job1(self, bot, job): self.result += 1 - def job2(self, bot): + def job2(self, bot, job): raise Exception("Test Error") + def job3(self, bot, job): + self.result += 1 + job.schedule_removal() + + def job4(self, bot, job): + self.result += job.context + def test_basic(self): - self.jq.put(self.job1, 0.1) + self.jq.put(Job(self.job1, 0.1)) sleep(1.5) self.assertGreaterEqual(self.result, 10) + def test_job_with_context(self): + self.jq.put(Job(self.job4, 0.1, context=5)) + sleep(1.5) + self.assertGreaterEqual(self.result, 50) + def test_noRepeat(self): - self.jq.put(self.job1, 0.1, repeat=False) + self.jq.put(Job(self.job1, 0.1, repeat=False)) sleep(0.5) self.assertEqual(1, self.result) def test_nextT(self): - self.jq.put(self.job1, 0.1, next_t=0.5) + self.jq.put(Job(self.job1, 0.1), next_t=0.5) sleep(0.45) self.assertEqual(0, self.result) sleep(0.1) self.assertEqual(1, self.result) def test_multiple(self): - self.jq.put(self.job1, 0.1, repeat=False) - self.jq.put(self.job1, 0.2, repeat=False) - self.jq.put(self.job1, 0.4) + self.jq.put(Job(self.job1, 0.1, repeat=False)) + self.jq.put(Job(self.job1, 0.2, repeat=False)) + self.jq.put(Job(self.job1, 0.4)) sleep(1) self.assertEqual(4, self.result) + def test_disabled(self): + j0 = Job(self.job1, 0.1) + j1 = Job(self.job1, 0.2) + + self.jq.put(j0) + self.jq.put(Job(self.job1, 0.4)) + self.jq.put(j1) + + j0.enabled = False + j1.enabled = False + + sleep(1) + self.assertEqual(2, self.result) + + def test_schedule_removal(self): + j0 = Job(self.job1, 0.1) + j1 = Job(self.job1, 0.2) + + self.jq.put(j0) + self.jq.put(Job(self.job1, 0.4)) + self.jq.put(j1) + + j0.schedule_removal() + j1.schedule_removal() + + sleep(1) + self.assertEqual(2, self.result) + + def test_schedule_removal_from_within(self): + self.jq.put(Job(self.job1, 0.4)) + self.jq.put(Job(self.job3, 0.2)) + + sleep(1) + self.assertEqual(3, self.result) + + def test_longer_first(self): + self.jq.put(Job(self.job1, 0.2, repeat=False)) + self.jq.put(Job(self.job1, 0.1, repeat=False)) + sleep(0.15) + self.assertEqual(1, self.result) + def test_error(self): - self.jq.put(self.job2, 0.1) - self.jq.put(self.job1, 0.2) + self.jq.put(Job(self.job2, 0.1)) + self.jq.put(Job(self.job1, 0.2)) self.jq.start() - sleep(0.4) - self.assertEqual(1, self.result) + sleep(0.5) + self.assertEqual(2, self.result) + + def test_jobs_tuple(self): + self.jq.stop() + jobs = tuple(Job(self.job1, t) for t in range(5, 25)) + + for job in jobs: + self.jq.put(job) + + self.assertTupleEqual(jobs, self.jq.jobs()) def test_inUpdater(self): - u = Updater(bot="MockBot", job_queue_tick_interval=0.005) - u.job_queue.put(self.job1, 0.5) + u = Updater(bot="MockBot") + u.job_queue.put(Job(self.job1, 0.5)) sleep(0.75) self.assertEqual(1, self.result) u.stop() diff --git a/tests/test_updater.py b/tests/test_updater.py index 95df1051fa8..c5d60649526 100644 --- a/tests/test_updater.py +++ b/tests/test_updater.py @@ -72,6 +72,11 @@ class UpdaterTest(BaseTest, unittest.TestCase): WebhookHandler """ + updater = None + received_message = None + message_count = None + lock = None + def setUp(self): self.updater = None self.received_message = None @@ -123,9 +128,12 @@ def regexGroupHandlerTest(self, bot, update, groups, groupdict): self.received_message = (groups, groupdict) self.message_count += 1 - def additionalArgsTest(self, bot, update, update_queue, args): + def additionalArgsTest(self, bot, update, update_queue, job_queue, args): + job_queue.put(Job(lambda bot, job: job.schedule_removal(), 0.1)) + self.received_message = update self.message_count += 1 + if args[0] == 'resend': update_queue.put('/test5 noresend') elif args[0] == 'noresend': @@ -151,13 +159,13 @@ def test_addRemoveTelegramMessageHandler(self): d = self.updater.dispatcher from telegram.ext import Filters handler = MessageHandler([Filters.text], self.telegramHandlerTest) - d.addHandler(handler) + d.add_handler(handler) self.updater.start_polling(0.01) sleep(.1) self.assertEqual(self.received_message, 'Test') # Remove handler - d.removeHandler(handler) + d.remove_handler(handler) self.reset() self.updater.bot.send_messages = 1 @@ -188,7 +196,7 @@ def test_editedMessageHandler(self): def test_addTelegramMessageHandlerMultipleMessages(self): self._setup_updater('Multiple', 100) - self.updater.dispatcher.addHandler(MessageHandler([], self.telegramHandlerTest)) + self.updater.dispatcher.add_handler(MessageHandler([], self.telegramHandlerTest)) self.updater.start_polling(0.0) sleep(2) self.assertEqual(self.received_message, 'Multiple') @@ -199,13 +207,13 @@ def test_addRemoveTelegramRegexHandler(self): d = self.updater.dispatcher regobj = re.compile('Te.*') handler = RegexHandler(regobj, self.telegramHandlerTest) - self.updater.dispatcher.addHandler(handler) + self.updater.dispatcher.add_handler(handler) self.updater.start_polling(0.01) sleep(.1) self.assertEqual(self.received_message, 'Test2') # Remove handler - d.removeHandler(handler) + d.remove_handler(handler) self.reset() self.updater.bot.send_messages = 1 @@ -216,13 +224,13 @@ def test_addRemoveTelegramCommandHandler(self): self._setup_updater('/test') d = self.updater.dispatcher handler = CommandHandler('test', self.telegramHandlerTest) - self.updater.dispatcher.addHandler(handler) + self.updater.dispatcher.add_handler(handler) self.updater.start_polling(0.01) sleep(.1) self.assertEqual(self.received_message, '/test') # Remove handler - d.removeHandler(handler) + d.remove_handler(handler) self.reset() self.updater.bot.send_messages = 1 @@ -252,14 +260,14 @@ def test_addRemoveStringRegexHandler(self): self._setup_updater('', messages=0) d = self.updater.dispatcher handler = StringRegexHandler('Te.*', self.stringHandlerTest) - d.addHandler(handler) + d.add_handler(handler) queue = self.updater.start_polling(0.01) queue.put('Test3') sleep(.1) self.assertEqual(self.received_message, 'Test3') # Remove handler - d.removeHandler(handler) + d.remove_handler(handler) self.reset() queue.put('Test3') @@ -270,7 +278,7 @@ def test_addRemoveStringCommandHandler(self): self._setup_updater('', messages=0) d = self.updater.dispatcher handler = StringCommandHandler('test3', self.stringHandlerTest) - d.addHandler(handler) + d.add_handler(handler) queue = self.updater.start_polling(0.01) queue.put('/test3') @@ -278,7 +286,7 @@ def test_addRemoveStringCommandHandler(self): self.assertEqual(self.received_message, '/test3') # Remove handler - d.removeHandler(handler) + d.remove_handler(handler) self.reset() queue.put('/test3') @@ -288,7 +296,7 @@ def test_addRemoveStringCommandHandler(self): def test_addRemoveErrorHandler(self): self._setup_updater('', messages=0) d = self.updater.dispatcher - d.addErrorHandler(self.errorHandlerTest) + d.add_error_handler(self.errorHandlerTest) queue = self.updater.start_polling(0.01) error = TelegramError("Unauthorized.") queue.put(error) @@ -296,7 +304,7 @@ def test_addRemoveErrorHandler(self): self.assertEqual(self.received_message, "Unauthorized.") # Remove handler - d.removeErrorHandler(self.errorHandlerTest) + d.remove_error_handler(self.errorHandlerTest) self.reset() queue.put(error) @@ -307,8 +315,8 @@ def test_errorInHandler(self): self._setup_updater('', messages=0) d = self.updater.dispatcher handler = StringRegexHandler('.*', self.errorRaisingHandlerTest) - d.addHandler(handler) - self.updater.dispatcher.addErrorHandler(self.errorHandlerTest) + d.add_handler(handler) + self.updater.dispatcher.add_error_handler(self.errorHandlerTest) queue = self.updater.start_polling(0.01) queue.put('Test Error 1') @@ -319,7 +327,7 @@ def test_cleanBeforeStart(self): self._setup_updater('') d = self.updater.dispatcher handler = MessageHandler([], self.telegramHandlerTest) - d.addHandler(handler) + d.add_handler(handler) self.updater.start_polling(0.01, clean=True) sleep(.1) self.assertEqual(self.message_count, 0) @@ -328,7 +336,7 @@ def test_cleanBeforeStart(self): def test_errorOnGetUpdates(self): self._setup_updater('', raise_error=True) d = self.updater.dispatcher - d.addErrorHandler(self.errorHandlerTest) + d.add_error_handler(self.errorHandlerTest) self.updater.start_polling(0.01) sleep(.1) self.assertEqual(self.received_message, "Test Error 2") @@ -337,7 +345,7 @@ def test_addRemoveTypeHandler(self): self._setup_updater('', messages=0) d = self.updater.dispatcher handler = TypeHandler(dict, self.stringHandlerTest) - d.addHandler(handler) + d.add_handler(handler) queue = self.updater.start_polling(0.01) payload = {"Test": 42} queue.put(payload) @@ -345,7 +353,7 @@ def test_addRemoveTypeHandler(self): self.assertEqual(self.received_message, payload) # Remove handler - d.removeHandler(handler) + d.remove_handler(handler) self.reset() queue.put(payload) @@ -357,8 +365,8 @@ def test_addRemoveInlineQueryHandler(self): d = self.updater.dispatcher handler = InlineQueryHandler(self.telegramInlineHandlerTest) handler2 = ChosenInlineResultHandler(self.telegramInlineHandlerTest) - d.addHandler(handler) - d.addHandler(handler2) + d.add_handler(handler) + d.add_handler(handler2) queue = self.updater.start_polling(0.01) update = Update(update_id=0, inline_query="testquery") update2 = Update(update_id=0, chosen_inline_result="testresult") @@ -371,8 +379,8 @@ def test_addRemoveInlineQueryHandler(self): self.assertEqual(self.received_message[1], "testresult") # Remove handler - d.removeHandler(handler) - d.removeHandler(handler2) + d.remove_handler(handler) + d.remove_handler(handler2) self.reset() queue.put(update) @@ -383,7 +391,7 @@ def test_addRemoveCallbackQueryHandler(self): self._setup_updater('', messages=0) d = self.updater.dispatcher handler = CallbackQueryHandler(self.telegramCallbackHandlerTest) - d.addHandler(handler) + d.add_handler(handler) queue = self.updater.start_polling(0.01) update = Update(update_id=0, callback_query="testcallback") queue.put(update) @@ -391,7 +399,7 @@ def test_addRemoveCallbackQueryHandler(self): self.assertEqual(self.received_message, "testcallback") # Remove handler - d.removeHandler(handler) + d.remove_handler(handler) self.reset() queue.put(update) @@ -402,7 +410,7 @@ def test_runAsync(self): self._setup_updater('Test5', messages=2) d = self.updater.dispatcher handler = MessageHandler([], self.asyncHandlerTest) - d.addHandler(handler) + d.add_handler(handler) self.updater.start_polling(0.01) sleep(1.2) self.assertEqual(self.received_message, 'Test5') @@ -413,8 +421,9 @@ def test_additionalArgs(self): handler = StringCommandHandler('test5', self.additionalArgsTest, pass_update_queue=True, + pass_job_queue=True, pass_args=True) - self.updater.dispatcher.addHandler(handler) + self.updater.dispatcher.add_handler(handler) queue = self.updater.start_polling(0.01) queue.put('/test5 resend') @@ -429,7 +438,7 @@ def test_regexGroupHandler(self): self.regexGroupHandlerTest, pass_groupdict=True, pass_groups=True) - d.addHandler(handler) + d.add_handler(handler) queue = self.updater.start_polling(0.01) queue.put('This is a test message for regex group matching.') sleep(.1) @@ -440,7 +449,7 @@ def test_runAsyncWithAdditionalArgs(self): self._setup_updater('Test6', messages=2) d = self.updater.dispatcher handler = MessageHandler([], self.asyncAdditionalHandlerTest, pass_update_queue=True) - d.addHandler(handler) + d.add_handler(handler) self.updater.start_polling(0.01) sleep(1.2) self.assertEqual(self.received_message, 'Test6') @@ -450,7 +459,7 @@ def test_webhook(self): self._setup_updater('', messages=0) d = self.updater.dispatcher handler = MessageHandler([], self.telegramHandlerTest) - d.addHandler(handler) + d.add_handler(handler) ip = '127.0.0.1' port = randrange(1024, 49152) # Select random port for travis @@ -500,7 +509,7 @@ def test_webhook_no_ssl(self): self._setup_updater('', messages=0) d = self.updater.dispatcher handler = MessageHandler([], self.telegramHandlerTest) - d.addHandler(handler) + d.add_handler(handler) ip = '127.0.0.1' port = randrange(1024, 49152) # Select random port for travis