-
Notifications
You must be signed in to change notification settings - Fork 5.7k
Improve timeouts handling in conversation handlers #2417
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
19 commits
Select commit
Hold shift + click to select a range
c90abef
Handle promise states in conversation timeout
starry-shivam e90c0ae
warn if nested conversation & timeout
starry-shivam 4ee4955
Add notes and test for conversation_timeout
starry-shivam 0c6c8f4
Try to fix pre-commit
starry-shivam 1d368cc
Test promise exception
starry-shivam 38f5f21
Welp
starry-shivam e3cee40
improve docs
starry-shivam 448e748
typo
starry-shivam a2f9b91
Merge branch 'master' into conv-timeout
Bibo-Joshi 31896ac
try to fix codecov
starry-shivam 1e5984e
refactor timeout logic with promise.add_done_cb
starry-shivam 3015712
Merge branch 'conv-timeout' of https://github.com/python-telegram-bot…
starry-shivam 23dddfe
small fix
starry-shivam a1e858c
Address review
starry-shivam d8555d5
Fix some type hinting
Bibo-Joshi ef9815e
Few fixes
starry-shivam 9bede76
Merge branch 'conv-timeout' of https://github.com/python-telegram-bot…
starry-shivam aeceab6
fix tests
starry-shivam 9eba935
minor nitpick
starry-shivam File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -21,8 +21,10 @@ | |||||
|
||||||
import logging | ||||||
import warnings | ||||||
import functools | ||||||
import datetime | ||||||
from threading import Lock | ||||||
from typing import TYPE_CHECKING, Dict, List, NoReturn, Optional, Tuple, cast, ClassVar | ||||||
from typing import TYPE_CHECKING, Dict, List, NoReturn, Optional, Union, Tuple, cast, ClassVar | ||||||
|
||||||
from telegram import Update | ||||||
from telegram.ext import ( | ||||||
|
@@ -143,6 +145,13 @@ class ConversationHandler(Handler[Update]): | |||||
received update and the corresponding ``context`` will be handled by ALL the handler's | ||||||
who's :attr:`check_update` method returns :obj:`True` that are in the state | ||||||
:attr:`ConversationHandler.TIMEOUT`. | ||||||
|
||||||
Note: | ||||||
Using `conversation_timeout` with nested conversations is currently not | ||||||
supported. You can still try to use it, but it will likely behave differently | ||||||
from what you expect. | ||||||
|
||||||
|
||||||
name (:obj:`str`, optional): The name for this conversationhandler. Required for | ||||||
persistence. | ||||||
persistent (:obj:`bool`, optional): If the conversations dict for this handler should be | ||||||
|
@@ -215,7 +224,7 @@ def __init__( | |||||
per_chat: bool = True, | ||||||
per_user: bool = True, | ||||||
per_message: bool = False, | ||||||
conversation_timeout: int = None, | ||||||
conversation_timeout: Union[float, datetime.timedelta] = None, | ||||||
name: str = None, | ||||||
persistent: bool = False, | ||||||
map_to_parent: Dict[object, object] = None, | ||||||
|
@@ -291,6 +300,16 @@ def __init__( | |||||
) | ||||||
break | ||||||
|
||||||
if self.conversation_timeout: | ||||||
for handler in all_handlers: | ||||||
if isinstance(handler, self.__class__): | ||||||
warnings.warn( | ||||||
"Using `conversation_timeout` with nested conversations is currently not " | ||||||
"supported. You can still try to use it, but it will likely behave " | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
|
||||||
"differently from what you expect." | ||||||
) | ||||||
break | ||||||
|
||||||
if self.run_async: | ||||||
for handler in all_handlers: | ||||||
handler.run_async = True | ||||||
|
@@ -352,7 +371,9 @@ def per_message(self, value: object) -> NoReturn: | |||||
raise ValueError('You can not assign a new value to per_message after initialization.') | ||||||
|
||||||
@property | ||||||
def conversation_timeout(self) -> Optional[int]: | ||||||
def conversation_timeout( | ||||||
self, | ||||||
) -> Optional[Union[float, datetime.timedelta]]: | ||||||
return self._conversation_timeout | ||||||
|
||||||
@conversation_timeout.setter | ||||||
|
@@ -423,6 +444,45 @@ def _get_key(self, update: Update) -> Tuple[int, ...]: | |||||
|
||||||
return tuple(key) | ||||||
|
||||||
def _resolve_promise(self, state: Tuple) -> object: | ||||||
Bibo-Joshi marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
old_state, new_state = state | ||||||
try: | ||||||
res = new_state.result(0) | ||||||
res = res if res is not None else old_state | ||||||
except Exception as exc: | ||||||
self.logger.exception("Promise function raised exception") | ||||||
self.logger.exception("%s", exc) | ||||||
res = old_state | ||||||
finally: | ||||||
if res is None and old_state is None: | ||||||
res = self.END | ||||||
return res | ||||||
|
||||||
def _schedule_job( | ||||||
self, | ||||||
new_state: object, | ||||||
dispatcher: 'Dispatcher', | ||||||
update: Update, | ||||||
context: Optional[CallbackContext], | ||||||
conversation_key: Tuple[int, ...], | ||||||
) -> None: | ||||||
if new_state != self.END: | ||||||
try: | ||||||
# both job_queue & conversation_timeout are checked before calling _schedule_job | ||||||
j_queue = dispatcher.job_queue | ||||||
self.timeout_jobs[conversation_key] = j_queue.run_once( # type: ignore[union-attr] | ||||||
self._trigger_timeout, | ||||||
self.conversation_timeout, # type: ignore[arg-type] | ||||||
context=_ConversationTimeoutContext( | ||||||
conversation_key, update, dispatcher, context | ||||||
), | ||||||
) | ||||||
except Exception as exc: | ||||||
self.logger.exception( | ||||||
"Failed to schedule timeout job due to the following exception:" | ||||||
) | ||||||
self.logger.exception("%s", exc) | ||||||
|
||||||
def check_update(self, update: object) -> CheckUpdateType: # pylint: disable=R0911 | ||||||
""" | ||||||
Determines whether an update should be handled by this conversationhandler, and if so in | ||||||
|
@@ -455,21 +515,14 @@ def check_update(self, update: object) -> CheckUpdateType: # pylint: disable=R0 | |||||
if isinstance(state, tuple) and len(state) == 2 and isinstance(state[1], Promise): | ||||||
self.logger.debug('waiting for promise...') | ||||||
|
||||||
old_state, new_state = state | ||||||
if new_state.done.wait(0): | ||||||
try: | ||||||
res = new_state.result(0) | ||||||
res = res if res is not None else old_state | ||||||
except Exception as exc: | ||||||
self.logger.exception("Promise function raised exception") | ||||||
self.logger.exception("%s", exc) | ||||||
res = old_state | ||||||
finally: | ||||||
if res is None and old_state is None: | ||||||
res = self.END | ||||||
self.update_state(res, key) | ||||||
with self._conversations_lock: | ||||||
state = self.conversations.get(key) | ||||||
# check if promise is finished or not | ||||||
Bibo-Joshi marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
if state[1].done.wait(0): | ||||||
res = self._resolve_promise(state) | ||||||
self.update_state(res, key) | ||||||
with self._conversations_lock: | ||||||
state = self.conversations.get(key) | ||||||
|
||||||
# if not then handle WAITING state instead | ||||||
else: | ||||||
hdlrs = self.states.get(self.WAITING, []) | ||||||
for hdlr in hdlrs: | ||||||
|
@@ -551,15 +604,27 @@ def handle_update( # type: ignore[override] | |||||
new_state = exception.state | ||||||
raise_dp_handler_stop = True | ||||||
with self._timeout_jobs_lock: | ||||||
if self.conversation_timeout and new_state != self.END and dispatcher.job_queue: | ||||||
# Add the new timeout job | ||||||
self.timeout_jobs[conversation_key] = dispatcher.job_queue.run_once( | ||||||
self._trigger_timeout, # type: ignore[arg-type] | ||||||
self.conversation_timeout, | ||||||
context=_ConversationTimeoutContext( | ||||||
conversation_key, update, dispatcher, context | ||||||
), | ||||||
) | ||||||
if self.conversation_timeout: | ||||||
if dispatcher.job_queue is not None: | ||||||
# Add the new timeout job | ||||||
if isinstance(new_state, Promise): | ||||||
new_state.add_done_callback( | ||||||
functools.partial( | ||||||
self._schedule_job, | ||||||
dispatcher=dispatcher, | ||||||
update=update, | ||||||
context=context, | ||||||
conversation_key=conversation_key, | ||||||
) | ||||||
) | ||||||
elif new_state != self.END: | ||||||
self._schedule_job( | ||||||
new_state, dispatcher, update, context, conversation_key | ||||||
) | ||||||
else: | ||||||
self.logger.warning( | ||||||
"Ignoring `conversation_timeout` because the Dispatcher has no JobQueue." | ||||||
) | ||||||
|
||||||
if isinstance(self.map_to_parent, dict) and new_state in self.map_to_parent: | ||||||
self.update_state(self.END, conversation_key) | ||||||
|
@@ -597,35 +662,35 @@ def update_state(self, new_state: object, key: Tuple[int, ...]) -> None: | |||||
if self.persistent and self.persistence and self.name: | ||||||
self.persistence.update_conversation(self.name, key, new_state) | ||||||
|
||||||
def _trigger_timeout(self, context: _ConversationTimeoutContext, job: 'Job' = None) -> None: | ||||||
def _trigger_timeout(self, context: CallbackContext, job: 'Job' = None) -> None: | ||||||
self.logger.debug('conversation timeout was triggered!') | ||||||
|
||||||
# Backward compatibility with bots that do not use CallbackContext | ||||||
callback_context = None | ||||||
if isinstance(context, CallbackContext): | ||||||
job = context.job | ||||||
ctxt = cast(_ConversationTimeoutContext, job.context) # type: ignore[union-attr] | ||||||
else: | ||||||
ctxt = cast(_ConversationTimeoutContext, job.context) | ||||||
|
||||||
context = job.context # type:ignore[union-attr,assignment] | ||||||
callback_context = context.callback_context | ||||||
callback_context = ctxt.callback_context | ||||||
|
||||||
with self._timeout_jobs_lock: | ||||||
found_job = self.timeout_jobs[context.conversation_key] | ||||||
found_job = self.timeout_jobs[ctxt.conversation_key] | ||||||
if found_job is not job: | ||||||
# The timeout has been canceled in handle_update | ||||||
# The timeout has been cancelled in handle_update | ||||||
starry-shivam marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||
return | ||||||
del self.timeout_jobs[context.conversation_key] | ||||||
del self.timeout_jobs[ctxt.conversation_key] | ||||||
|
||||||
handlers = self.states.get(self.TIMEOUT, []) | ||||||
for handler in handlers: | ||||||
check = handler.check_update(context.update) | ||||||
check = handler.check_update(ctxt.update) | ||||||
if check is not None and check is not False: | ||||||
try: | ||||||
handler.handle_update( | ||||||
context.update, context.dispatcher, check, callback_context | ||||||
) | ||||||
handler.handle_update(ctxt.update, ctxt.dispatcher, check, callback_context) | ||||||
except DispatcherHandlerStop: | ||||||
self.logger.warning( | ||||||
'DispatcherHandlerStop in TIMEOUT state of ' | ||||||
'ConversationHandler has no effect. Ignoring.' | ||||||
) | ||||||
self.update_state(self.END, context.conversation_key) | ||||||
|
||||||
self.update_state(self.END, ctxt.conversation_key) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.