Skip to content

Pass failing jobs to error handlers #2692

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Oct 3, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/source/telegram.ext.job.rst
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,4 @@ telegram.ext.Job
.. autoclass:: telegram.ext.Job
:members:
:show-inheritance:
:special-members: __call__
15 changes: 12 additions & 3 deletions telegram/ext/callbackcontext.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,11 @@ class CallbackContext(Generic[UD, CD, BD]):
that raised the error. Only present when the raising function was run asynchronously
using :meth:`telegram.ext.Dispatcher.run_async`.
job (:class:`telegram.ext.Job`): Optional. The job which originated this callback.
Only present when passed to the callback of :class:`telegram.ext.Job`.
Only present when passed to the callback of :class:`telegram.ext.Job` or in error
handlers if the error is caused by a job.

.. versionchanged:: 14.0
:attr:`job` is now also present in error handlers if the error is caused by a job.

"""

Expand Down Expand Up @@ -231,6 +235,7 @@ def from_error(
dispatcher: 'Dispatcher[CCT, UD, CD, BD]',
async_args: Union[List, Tuple] = None,
async_kwargs: Dict[str, object] = None,
job: 'Job' = None,
) -> 'CCT':
"""
Constructs an instance of :class:`telegram.ext.CallbackContext` to be passed to the error
Expand All @@ -244,12 +249,15 @@ def from_error(
error (:obj:`Exception`): The error.
dispatcher (:class:`telegram.ext.Dispatcher`): The dispatcher associated with this
context.
async_args (List[:obj:`object`]): Optional. Positional arguments of the function that
async_args (List[:obj:`object`], optional): Positional arguments of the function that
raised the error. Pass only when the raising function was run asynchronously using
:meth:`telegram.ext.Dispatcher.run_async`.
async_kwargs (Dict[:obj:`str`, :obj:`object`]): Optional. Keyword arguments of the
async_kwargs (Dict[:obj:`str`, :obj:`object`], optional): Keyword arguments of the
function that raised the error. Pass only when the raising function was run
asynchronously using :meth:`telegram.ext.Dispatcher.run_async`.
job (:class:`telegram.ext.Job`, optional): The job associated with the error.

.. versionadded:: 14.0

Returns:
:class:`telegram.ext.CallbackContext`
Expand All @@ -258,6 +266,7 @@ def from_error(
self.error = error
self.async_args = async_args
self.async_kwargs = async_kwargs
self.job = job
return self

@classmethod
Expand Down
14 changes: 11 additions & 3 deletions telegram/ext/dispatcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,7 @@

if TYPE_CHECKING:
from telegram import Bot
from telegram.ext import JobQueue
from telegram.ext.callbackcontext import CallbackContext
from telegram.ext import JobQueue, Job, CallbackContext

DEFAULT_GROUP: int = 0

Expand Down Expand Up @@ -668,6 +667,7 @@ def dispatch_error(
update: Optional[object],
error: Exception,
promise: Promise = None,
job: 'Job' = None,
) -> bool:
"""Dispatches an error by passing it to all error handlers registered with
:meth:`add_error_handler`. If one of the error handlers raises
Expand All @@ -686,6 +686,9 @@ def dispatch_error(
error (:obj:`Exception`): The error that was raised.
promise (:class:`telegram.utils.Promise`, optional): The promise whose pooled function
raised the error.
job (:class:`telegram.ext.Job`, optional): The job that caused the error.

.. versionadded:: 14.0

Returns:
:obj:`bool`: :obj:`True` if one of the error handlers raised
Expand All @@ -697,7 +700,12 @@ def dispatch_error(
if self.error_handlers:
for callback, run_async in self.error_handlers.items(): # pylint: disable=W0621
context = self.context_types.context.from_error(
update, error, self, async_args=async_args, async_kwargs=async_kwargs
update=update,
error=error,
dispatcher=self,
async_args=async_args,
async_kwargs=async_kwargs,
job=job,
)
if run_async:
self.run_async(callback, update, context, update=update)
Expand Down
90 changes: 45 additions & 45 deletions telegram/ext/jobqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,16 @@
"""This module contains the classes JobQueue and Job."""

import datetime
import logging
from typing import TYPE_CHECKING, Callable, List, Optional, Tuple, Union, cast, overload
from typing import TYPE_CHECKING, Callable, Optional, Tuple, Union, cast, overload

import pytz
from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED, JobEvent
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.job import Job as APSJob

from telegram.ext.callbackcontext import CallbackContext
from telegram.utils.types import JSONDict

if TYPE_CHECKING:
from telegram.ext import Dispatcher
from telegram.ext import Dispatcher, CallbackContext
import apscheduler.job # noqa: F401


Expand All @@ -44,35 +41,15 @@ class JobQueue:

"""

__slots__ = ('_dispatcher', 'logger', 'scheduler')
__slots__ = ('_dispatcher', 'scheduler')

def __init__(self) -> None:
self._dispatcher: 'Dispatcher' = None # type: ignore[assignment]
self.logger = logging.getLogger(self.__class__.__name__)
self.scheduler = BackgroundScheduler(timezone=pytz.utc)
self.scheduler.add_listener(
self._update_persistence, mask=EVENT_JOB_EXECUTED | EVENT_JOB_ERROR
)

# Dispatch errors and don't log them in the APS logger
def aps_log_filter(record): # type: ignore
return 'raised an exception' not in record.msg

logging.getLogger('apscheduler.executors.default').addFilter(aps_log_filter)
self.scheduler.add_listener(self._dispatch_error, EVENT_JOB_ERROR)

def _build_args(self, job: 'Job') -> List[CallbackContext]:
return [self._dispatcher.context_types.context.from_job(job, self._dispatcher)]

def _tz_now(self) -> datetime.datetime:
return datetime.datetime.now(self.scheduler.timezone)

def _update_persistence(self, _: JobEvent) -> None:
self._dispatcher.update_persistence()

def _dispatch_error(self, event: JobEvent) -> None:
self._dispatcher.dispatch_error(None, event.exception)

@overload
def _parse_time_input(self, time: None, shift_day: bool = False) -> None:
...
Expand Down Expand Up @@ -169,11 +146,11 @@ def run_once(
date_time = self._parse_time_input(when, shift_day=True)

j = self.scheduler.add_job(
callback,
job,
name=name,
trigger='date',
run_date=date_time,
args=self._build_args(job),
args=(self._dispatcher,),
timezone=date_time.tzinfo or self.scheduler.timezone,
**job_kwargs,
)
Expand Down Expand Up @@ -261,9 +238,9 @@ def run_repeating(
interval = interval.total_seconds()

j = self.scheduler.add_job(
callback,
job,
trigger='interval',
args=self._build_args(job),
args=(self._dispatcher,),
start_date=dt_first,
end_date=dt_last,
seconds=interval,
Expand Down Expand Up @@ -317,9 +294,9 @@ def run_monthly(
job = Job(callback, context, name, self)

j = self.scheduler.add_job(
callback,
job,
trigger='cron',
args=self._build_args(job),
args=(self._dispatcher,),
name=name,
day='last' if day == -1 else day,
hour=when.hour,
Expand Down Expand Up @@ -374,9 +351,9 @@ def run_daily(
job = Job(callback, context, name, self)

j = self.scheduler.add_job(
callback,
job,
name=name,
args=self._build_args(job),
args=(self._dispatcher,),
trigger='cron',
day_of_week=','.join([str(d) for d in days]),
hour=time.hour,
Expand Down Expand Up @@ -416,7 +393,7 @@ def run_custom(
name = name or callback.__name__
job = Job(callback, context, name, self)

j = self.scheduler.add_job(callback, args=self._build_args(job), name=name, **job_kwargs)
j = self.scheduler.add_job(job, args=(self._dispatcher,), name=name, **job_kwargs)

job.job = j
return job
Expand All @@ -434,7 +411,7 @@ def stop(self) -> None:
def jobs(self) -> Tuple['Job', ...]:
"""Returns a tuple of all *scheduled* jobs that are currently in the ``JobQueue``."""
return tuple(
Job._from_aps_job(job, self) # pylint: disable=W0212
Job._from_aps_job(job) # pylint: disable=protected-access
for job in self.scheduler.get_jobs()
)

Expand Down Expand Up @@ -509,11 +486,39 @@ def __init__(
self.job = cast(APSJob, job) # skipcq: PTC-W0052

def run(self, dispatcher: 'Dispatcher') -> None:
"""Executes the callback function independently of the jobs schedule."""
"""Executes the callback function independently of the jobs schedule. Also calls
:meth:`telegram.ext.Dispatcher.update_persistence`.

.. versionchaged:: 14.0
Calls :meth:`telegram.ext.Dispatcher.update_persistence`.

Args:
dispatcher (:class:`telegram.ext.Dispatcher`): The dispatcher this job is associated
with.
"""
try:
self.callback(dispatcher.context_types.context.from_job(self, dispatcher))
except Exception as exc:
dispatcher.dispatch_error(None, exc)
dispatcher.dispatch_error(None, exc, job=self)
finally:
dispatcher.update_persistence(None)

def __call__(self, dispatcher: 'Dispatcher') -> None:
"""Shortcut for::

job.run(dispatcher)

Warning:
The fact that jobs are callable should be considered an implementation detail and not
as part of PTBs public API.

.. versionadded:: 14.0

Args:
dispatcher (:class:`telegram.ext.Dispatcher`): The dispatcher this job is associated
with.
"""
self.run(dispatcher=dispatcher)

def schedule_removal(self) -> None:
"""
Expand Down Expand Up @@ -551,13 +556,8 @@ def next_t(self) -> Optional[datetime.datetime]:
return self.job.next_run_time

@classmethod
def _from_aps_job(cls, job: APSJob, job_queue: JobQueue) -> 'Job':
# context based callbacks
if len(job.args) == 1:
context = job.args[0].job.context
else:
context = job.args[1].context
return cls(job.func, context=context, name=job.name, job_queue=job_queue, job=job)
def _from_aps_job(cls, job: APSJob) -> 'Job':
return job.func

def __getattr__(self, item: str) -> object:
return getattr(self.job, item)
Expand Down
8 changes: 5 additions & 3 deletions tests/test_jobqueue.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def job_context_based_callback(self, context):
self.result += 1

def error_handler_context(self, update, context):
self.received_error = str(context.error)
self.received_error = (str(context.error), context.job)

def error_handler_raise_error(self, *args):
raise Exception('Failing bigly')
Expand Down Expand Up @@ -426,10 +426,12 @@ def test_dispatch_error_context(self, job_queue, dp):

job = job_queue.run_once(self.job_with_exception, 0.05)
sleep(0.1)
assert self.received_error == 'Test Error'
assert self.received_error[0] == 'Test Error'
assert self.received_error[1] is job
self.received_error = None
job.run(dp)
assert self.received_error == 'Test Error'
assert self.received_error[0] == 'Test Error'
assert self.received_error[1] is job

# Remove handler
dp.remove_error_handler(self.error_handler_context)
Expand Down