diff --git a/AUTHORS.rst b/AUTHORS.rst index 3c8dae41fef..d2642262656 100644 --- a/AUTHORS.rst +++ b/AUTHORS.rst @@ -60,6 +60,7 @@ The following wonderful people contributed directly or indirectly to this projec - `Oleg Sushchenko `_ - `Or Bin `_ - `overquota `_ +- `Paolo Lammens `_ - `Patrick Hofmann `_ - `Paul Larsen `_ - `Pieter Schutz `_ diff --git a/telegram/ext/jobqueue.py b/telegram/ext/jobqueue.py index e3cfacfd9ae..625c1f4bdec 100644 --- a/telegram/ext/jobqueue.py +++ b/telegram/ext/jobqueue.py @@ -70,27 +70,19 @@ def __init__(self): def set_dispatcher(self, dispatcher): self._dispatcher = dispatcher - def _put(self, job, next_t=None, last_t=None): - if next_t is None: - next_t = job.interval - if next_t is None: - raise ValueError('next_t is None') - - if isinstance(next_t, datetime.datetime): - next_t = (next_t - datetime.datetime.now()).total_seconds() - - elif isinstance(next_t, datetime.time): - next_datetime = datetime.datetime.combine(datetime.date.today(), next_t) - - if datetime.datetime.now().time() > next_t: - next_datetime += datetime.timedelta(days=1) - - next_t = (next_datetime - datetime.datetime.now()).total_seconds() - - elif isinstance(next_t, datetime.timedelta): - next_t = next_t.total_seconds() + def _put(self, job, next_t=None, previous_t=None): + # """ + # Enqueues the job, scheduling its next run at ` + `. + # `` is the previous time the job was run, stored in `previous_t`, + # and defaults to `None`. + # `` is a time interval in seconds, and is calculated from the + # `next_t` kwarg (if given) or the job's interval. + # """ - next_t += last_t or time.time() + # get time at which to run: + next_t = _to_timestamp(next_t or job.interval, previous_t=previous_t) + if next_t is None: + raise ValueError("'next_t' is None'") self.logger.debug('Putting job %s with t=%f', job.name, next_t) @@ -136,7 +128,7 @@ def run_once(self, callback, when, context=None, name=None): self._put(job, next_t=when) return job - def run_repeating(self, callback, interval, first=None, context=None, name=None): + def run_repeating(self, callback, interval, first=None, last=None, context=None, name=None): """Creates a new ``Job`` that runs at specified intervals and adds it to the queue. Args: @@ -163,6 +155,11 @@ def run_repeating(self, callback, interval, first=None, context=None, name=None) tomorrow. Defaults to ``interval`` + last (:obj:`int` | :obj:`float` | :obj:`datetime.timedelta` | \ + :obj:`datetime.datetime` | :obj:`datetime.time`, optional): + Time in or at which the job should stop running. + This parameter will be interpreted depending on its type (same as for `first`). + If `None`, the job will run indefinitely. context (:obj:`object`, optional): Additional data needed for the callback function. Can be accessed through ``job.context`` in the callback. Defaults to ``None``. name (:obj:`str`, optional): The name of the new job. Defaults to @@ -176,6 +173,7 @@ def run_repeating(self, callback, interval, first=None, context=None, name=None) job = Job(callback, interval=interval, repeat=True, + finish_time=last, context=context, name=name, job_queue=self) @@ -248,6 +246,9 @@ def tick(self): self._set_next_peek(t) break + delay_buffer = 0.01 # tolerance for last + if job.finish_time is not None and now > job.finish_time + delay_buffer: + job.schedule_removal() # job shouldn't run anymore if job.removed: self.logger.debug('Removing job %s', job.name) continue @@ -266,9 +267,9 @@ def tick(self): self.logger.debug('Skipping disabled job %s', job.name) if job.repeat and not job.removed: - self._put(job, last_t=t) + self._put(job, previous_t=t) else: - self.logger.debug('Dropping non-repeating or removed job %s', job.name) + self.logger.debug('Dropping non-repeating, removed, or finished job %s', job.name) def start(self): """Starts the job_queue thread.""" @@ -359,6 +360,7 @@ def __init__(self, callback, interval=None, repeat=True, + finish_time=None, context=None, days=Days.EVERY_DAY, name=None, @@ -368,10 +370,12 @@ def __init__(self, self.context = context self.name = name or callback.__name__ - self._repeat = repeat + self._repeat = None self._interval = None + self._finish_time = None self.interval = interval self.repeat = repeat + self.finish_time = finish_time self._days = None self.days = days @@ -454,6 +458,20 @@ def repeat(self, repeat): raise ValueError("'repeat' can not be set to 'True' when no 'interval' is set") self._repeat = repeat + @property + def finish_time(self): + """:obj:`float`: Optional. Time at which the job should stop repeating.""" + return self._finish_time + + @finish_time.setter + def finish_time(self, time_): + if time_ is not None: + if not self.repeat: + raise ValueError("'finish_time' cannot be set if job doesn't repeat") + self._finish_time = _to_timestamp(time_) + else: + self._finish_time = None + @property def days(self): """Tuple[:obj:`int`]: Optional. Defines on which days of the week the job should run.""" @@ -488,3 +506,38 @@ def job_queue(self, job_queue): def __lt__(self, other): return False + + +def _to_timestamp(t, previous_t=None): + # """ + # Converts a given time object (i.e., `datetime.datetime`, + # `datetime.time`, `datetime.timedelta`, interval from now + # in seconds) to POSIX timestamp. Used for converting given + # kwargs like `first` to a uniform format. + # """ + + now = time.time() + previous_t = previous_t or now + + if t is None: + return None + + if isinstance(t, datetime.timedelta): + return previous_t + t.total_seconds() + + if isinstance(t, Number): + return previous_t + t + + if isinstance(t, datetime.time): + date = datetime.date.today() + + if t < datetime.datetime.today().time(): + date += datetime.timedelta(days=1) + + t = datetime.datetime.combine(date, t) + + if isinstance(t, datetime.datetime): + # replacement for datetime.datetime.timestamp() for py2 + return (t - datetime.datetime.fromtimestamp(now)).total_seconds() + now + + raise TypeError('Unable to convert to timestamp') diff --git a/tests/test_jobqueue.py b/tests/test_jobqueue.py index f0c7ac4051c..f4422d90ac7 100644 --- a/tests/test_jobqueue.py +++ b/tests/test_jobqueue.py @@ -98,6 +98,13 @@ def test_run_repeating_first(self, job_queue): sleep(0.07) assert self.result == 1 + def test_run_repeating_last(self, job_queue): + job_queue.run_repeating(self.job_run_once, 0.05, last=0.1) + sleep(0.12) + assert self.result == 2 + sleep(0.1) + assert self.result == 2 + def test_multiple(self, job_queue): job_queue.run_once(self.job_run_once, 0.01) job_queue.run_once(self.job_run_once, 0.02)