From c90abef1f63379e8d70d3ca73cd4772d214ed331 Mon Sep 17 00:00:00 2001 From: starry69 Date: Fri, 5 Mar 2021 07:11:58 +0530 Subject: [PATCH 01/16] Handle promise states in conversation timeout Signed-off-by: starry69 --- telegram/ext/conversationhandler.py | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/telegram/ext/conversationhandler.py b/telegram/ext/conversationhandler.py index ae615935cab..fd287a3a4e2 100644 --- a/telegram/ext/conversationhandler.py +++ b/telegram/ext/conversationhandler.py @@ -608,10 +608,18 @@ def _trigger_timeout(self, context: _ConversationTimeoutContext, job: 'Job' = No context = job.context # type:ignore[union-attr,assignment] callback_context = context.callback_context + # async conversations can return CH.END without telling the + # current timeout handlers, make sure to not call timeout + # handlers in this case + state = self.conversations[context.conversation_key] + if isinstance(state, tuple) and isinstance(state[1], Promise): + if state.done and state.result() == self.END: + return + with self._timeout_jobs_lock: found_job = self.timeout_jobs[context.conversation_key] if found_job is not job: - # The timeout has been canceled in handle_update + # The timeout has been cancelled in handle_update return del self.timeout_jobs[context.conversation_key] From e90c0aeb2ab241210db47ebfce9ba845fd7e6061 Mon Sep 17 00:00:00 2001 From: starry69 Date: Fri, 5 Mar 2021 16:42:33 +0530 Subject: [PATCH 02/16] warn if nested conversation & timeout Signed-off-by: starry69 --- telegram/ext/conversationhandler.py | 11 ++++++++++- tests/test_conversationhandler.py | 23 +++++++++++++++++++++++ 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/telegram/ext/conversationhandler.py b/telegram/ext/conversationhandler.py index fd287a3a4e2..bf238af8b9e 100644 --- a/telegram/ext/conversationhandler.py +++ b/telegram/ext/conversationhandler.py @@ -291,6 +291,15 @@ def __init__( ) break + if self.conversation_timeout: + for handler in all_handlers: + if isinstance(handler, self.__class__): + warnings.warn( + "Setting `conversation_timeout` may lead to unexpected " + "behaviour when using nested conversation handlers." + ) + break + if self.run_async: for handler in all_handlers: handler.run_async = True @@ -613,7 +622,7 @@ def _trigger_timeout(self, context: _ConversationTimeoutContext, job: 'Job' = No # handlers in this case state = self.conversations[context.conversation_key] if isinstance(state, tuple) and isinstance(state[1], Promise): - if state.done and state.result() == self.END: + if state[1].done and state[1].result() == self.END: return with self._timeout_jobs_lock: diff --git a/tests/test_conversationhandler.py b/tests/test_conversationhandler.py index f8db5dafa4e..834c16c61c2 100644 --- a/tests/test_conversationhandler.py +++ b/tests/test_conversationhandler.py @@ -1126,6 +1126,29 @@ def slowbrew(_bot, update): assert handler.conversations.get((self.group.id, user1.id)) is None assert self.is_timeout + def test_conversation_timeout_warning_only_shown_once(self, recwarn): + ConversationHandler( + entry_points=self.entry_points, + states={ + self.THIRSTY: [ + ConversationHandler( + entry_points=self.entry_points, + states={ + self.BREWING: [CommandHandler('startCoding', self.code)], + }, + fallbacks=self.fallbacks, + ) + ] + }, + fallbacks=self.fallbacks, + conversation_timeout=100, + ) + assert len(recwarn) == 1 + assert str(recwarn[0].message) == ( + "Setting `conversation_timeout` may lead to unexpected " + "behaviour when using nested conversation handlers." + ) + def test_per_message_warning_is_only_shown_once(self, recwarn): ConversationHandler( entry_points=self.entry_points, From 4ee495512439294a8e7e07b855adf2bd9137f300 Mon Sep 17 00:00:00 2001 From: starry69 Date: Sat, 6 Mar 2021 14:03:35 +0530 Subject: [PATCH 03/16] Add notes and test for conversation_timeout Signed-off-by: starry69 --- telegram/ext/conversationhandler.py | 52 +++++++++++++++++++---------- tests/test_conversationhandler.py | 50 +++++++++++++++++++++++++++ 2 files changed, 84 insertions(+), 18 deletions(-) diff --git a/telegram/ext/conversationhandler.py b/telegram/ext/conversationhandler.py index bf238af8b9e..5f907a491a2 100644 --- a/telegram/ext/conversationhandler.py +++ b/telegram/ext/conversationhandler.py @@ -143,6 +143,12 @@ class ConversationHandler(Handler[Update]): received update and the corresponding ``context`` will be handled by ALL the handler's who's :attr:`check_update` method returns :obj:`True` that are in the state :attr:`ConversationHandler.TIMEOUT`. + + Note: + * `conversation_timeout` with `run_async` may fail, if the :obj:`Promise` takes longer to + finish. + * Using `conversation_timeout` with nested conversations may cause unexpected behaviour. + name (:obj:`str`, optional): The name for this conversationhandler. Required for persistence. persistent (:obj:`bool`, optional): If the conversations dict for this handler should be @@ -432,6 +438,20 @@ def _get_key(self, update: Update) -> Tuple[int, ...]: return tuple(key) + def _resovle_promomise(self, state: Tuple) -> object: + old_state, new_state = state + try: + res = new_state.result(0) + res = res if res is not None else old_state + except Exception as exc: + self.logger.exception("Promise function raised exception") + self.logger.exception("%s", exc) + res = old_state + finally: + if res is None and old_state is None: + res = self.END + return res + def check_update(self, update: object) -> CheckUpdateType: # pylint: disable=R0911 """ Determines whether an update should be handled by this conversationhandler, and if so in @@ -464,21 +484,14 @@ def check_update(self, update: object) -> CheckUpdateType: # pylint: disable=R0 if isinstance(state, tuple) and len(state) == 2 and isinstance(state[1], Promise): self.logger.debug('waiting for promise...') - old_state, new_state = state - if new_state.done.wait(0): - try: - res = new_state.result(0) - res = res if res is not None else old_state - except Exception as exc: - self.logger.exception("Promise function raised exception") - self.logger.exception("%s", exc) - res = old_state - finally: - if res is None and old_state is None: - res = self.END - self.update_state(res, key) - with self._conversations_lock: - state = self.conversations.get(key) + # check if promise is finished or not + if state[1].done.wait(0): + res = self._resovle_promomise(state) + self.update_state(res, key) + with self._conversations_lock: + state = self.conversations.get(key) + + # if not then handle WAITING state instead else: hdlrs = self.states.get(self.WAITING, []) for hdlr in hdlrs: @@ -621,9 +634,12 @@ def _trigger_timeout(self, context: _ConversationTimeoutContext, job: 'Job' = No # current timeout handlers, make sure to not call timeout # handlers in this case state = self.conversations[context.conversation_key] - if isinstance(state, tuple) and isinstance(state[1], Promise): - if state[1].done and state[1].result() == self.END: - return + if ( + isinstance(state, tuple) + and isinstance(state[1], Promise) + and self._resovle_promomise(state) == self.END + ): + return with self._timeout_jobs_lock: found_job = self.timeout_jobs[context.conversation_key] diff --git a/tests/test_conversationhandler.py b/tests/test_conversationhandler.py index 834c16c61c2..0d79c034df8 100644 --- a/tests/test_conversationhandler.py +++ b/tests/test_conversationhandler.py @@ -789,6 +789,56 @@ def test_conversation_timeout(self, dp, bot, user1): sleep(0.7) assert handler.conversations.get((self.group.id, user1.id)) is None + def test_timeout_not_triggered_on_conv_end_async(self, bot, dp, user1): + def timeout(*a, **kw): + self.test_flag = True + + self.states.update({ConversationHandler.TIMEOUT: [TypeHandler(Update, timeout)]}) + handler = ConversationHandler( + entry_points=self.entry_points, + states=self.states, + fallbacks=self.fallbacks, + conversation_timeout=0.75, + run_async=True, + ) + dp.add_handler(handler) + + message = Message( + 0, + None, + self.group, + from_user=user1, + text='/start', + entities=[ + MessageEntity(type=MessageEntity.BOT_COMMAND, offset=0, length=len('/start')) + ], + bot=bot, + ) + # start the conversation + dp.process_update(Update(update_id=0, message=message)) + sleep(0.1) + message.text = '/brew' + message.entities[0].length = len('/brew') + dp.process_update(Update(update_id=1, message=message)) + sleep(0.1) + message.text = '/pourCoffee' + message.entities[0].length = len('/pourCoffee') + dp.process_update(Update(update_id=2, message=message)) + sleep(0.1) + message.text = '/end' + message.entities[0].length = len('/end') + dp.process_update(Update(update_id=3, message=message)) + sleep(0.1) + message.text = 'resolve promise pls' + message.entities[0].length = len('resolve promise pls') + dp.process_update(Update(update_id=4, message=message)) + sleep(0.1) + # assert promise got resolved + assert handler.conversations.get((self.group.id, user1.id)) is None + sleep(1) + # assert timeout handler didn't got called + assert self.test_flag is False + def test_conversation_timeout_dispatcher_handler_stop(self, dp, bot, user1, caplog): handler = ConversationHandler( entry_points=self.entry_points, From 0c6c8f49b0c14a1e2b706ea24ed26afbdb613b70 Mon Sep 17 00:00:00 2001 From: starry69 Date: Sat, 6 Mar 2021 14:19:07 +0530 Subject: [PATCH 04/16] Try to fix pre-commit Signed-off-by: starry69 --- telegram/ext/conversationhandler.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/telegram/ext/conversationhandler.py b/telegram/ext/conversationhandler.py index 5f907a491a2..00169965944 100644 --- a/telegram/ext/conversationhandler.py +++ b/telegram/ext/conversationhandler.py @@ -145,9 +145,10 @@ class ConversationHandler(Handler[Update]): :attr:`ConversationHandler.TIMEOUT`. Note: - * `conversation_timeout` with `run_async` may fail, if the :obj:`Promise` takes longer to - finish. - * Using `conversation_timeout` with nested conversations may cause unexpected behaviour. + * `conversation_timeout` with `run_async` may fail, if the :obj:`Promise` + takes longer to finish. + * Using `conversation_timeout` with nested conversations may cause unexpected + behaviour. name (:obj:`str`, optional): The name for this conversationhandler. Required for persistence. @@ -450,7 +451,7 @@ def _resovle_promomise(self, state: Tuple) -> object: finally: if res is None and old_state is None: res = self.END - return res + return res def check_update(self, update: object) -> CheckUpdateType: # pylint: disable=R0911 """ From 1d368cc0658f2dfa78967cee792ea6e0215e0485 Mon Sep 17 00:00:00 2001 From: starry69 Date: Sun, 7 Mar 2021 09:03:13 +0530 Subject: [PATCH 05/16] Test promise exception Signed-off-by: starry69 --- tests/test_conversationhandler.py | 42 +++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/tests/test_conversationhandler.py b/tests/test_conversationhandler.py index 0d79c034df8..fc4cab592ac 100644 --- a/tests/test_conversationhandler.py +++ b/tests/test_conversationhandler.py @@ -753,6 +753,48 @@ def test_all_update_types(self, dp, bot, user1): assert not handler.check_update(Update(0, pre_checkout_query=pre_checkout_query)) assert not handler.check_update(Update(0, shipping_query=shipping_query)) + def test_promise_exception(self, dp, bot, user1, caplog): + def conv_entry(*a, **kw): + return 1 + + def raise_error(*a, **kw): + raise Exception("Oh no") + + handler = ConversationHandler( + entry_points=[CommandHandler("start", conv_entry)], + states={1: [MessageHandler(Filters.all, raise_error)]}, + fallbacks=self.fallbacks, + run_async=True, + ) + dp.add_handler(handler) + + message = Message( + 0, + None, + self.group, + from_user=user1, + text='/start', + entities=[ + MessageEntity(type=MessageEntity.BOT_COMMAND, offset=0, length=len('/start')) + ], + bot=bot, + ) + # start the conversation + dp.process_update(Update(update_id=0, message=message)) + sleep(0.1) + message.text = "error" + dp.process_update(Update(update_id=0, message=message)) + sleep(0.1) + message.text = "resolve promise pls" + caplog.clear() + with caplog.at_level(logging.ERROR): + dp.process_update(Update(update_id=0, message=message)) + sleep(0.5) + assert len(caplog.records) == 3 + assert caplog.records[0].message == "Promise function raised exception" + # assert res is old state + assert handler.conversations.get((self.group.id, user1.id))[0] == 1 + def test_conversation_timeout(self, dp, bot, user1): handler = ConversationHandler( entry_points=self.entry_points, From 38f5f21fd51e8a7729613ab3a01cebb9c85b11fe Mon Sep 17 00:00:00 2001 From: starry69 Date: Sun, 7 Mar 2021 09:48:38 +0530 Subject: [PATCH 06/16] Welp Signed-off-by: starry69 --- tests/test_conversationhandler.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_conversationhandler.py b/tests/test_conversationhandler.py index fc4cab592ac..7ebb5fc53a3 100644 --- a/tests/test_conversationhandler.py +++ b/tests/test_conversationhandler.py @@ -792,6 +792,7 @@ def raise_error(*a, **kw): sleep(0.5) assert len(caplog.records) == 3 assert caplog.records[0].message == "Promise function raised exception" + assert caplog.records[1].message == "Oh no" # assert res is old state assert handler.conversations.get((self.group.id, user1.id))[0] == 1 From e3cee4049a2532e4d6d53019953e8d22f01bbd10 Mon Sep 17 00:00:00 2001 From: starry69 Date: Sun, 7 Mar 2021 15:49:03 +0530 Subject: [PATCH 07/16] improve docs Signed-off-by: starry69 --- telegram/ext/conversationhandler.py | 16 ++++++++++------ tests/test_conversationhandler.py | 22 ++++++++++++++++------ 2 files changed, 26 insertions(+), 12 deletions(-) diff --git a/telegram/ext/conversationhandler.py b/telegram/ext/conversationhandler.py index 00169965944..b3750304b00 100644 --- a/telegram/ext/conversationhandler.py +++ b/telegram/ext/conversationhandler.py @@ -145,10 +145,13 @@ class ConversationHandler(Handler[Update]): :attr:`ConversationHandler.TIMEOUT`. Note: - * `conversation_timeout` with `run_async` may fail, if the :obj:`Promise` - takes longer to finish. - * Using `conversation_timeout` with nested conversations may cause unexpected - behaviour. + * `conversation_timeout` with `run_async` may fail, if the + :class:`telegram.ext.utils.promise.Promise` takes longer to finish than the + timeout. + * Using `conversation_timeout` with nested conversations is currently not + supported. You can still try to use it, but it will likely behave differently + from what you expect. + name (:obj:`str`, optional): The name for this conversationhandler. Required for persistence. @@ -302,8 +305,9 @@ def __init__( for handler in all_handlers: if isinstance(handler, self.__class__): warnings.warn( - "Setting `conversation_timeout` may lead to unexpected " - "behaviour when using nested conversation handlers." + "Using `conversation_timeout` with nested conversations is currently not " + "supported. You can still try to use it, but it will likely behave " + "differently from what you expect." ) break diff --git a/tests/test_conversationhandler.py b/tests/test_conversationhandler.py index 7ebb5fc53a3..7b1110bc506 100644 --- a/tests/test_conversationhandler.py +++ b/tests/test_conversationhandler.py @@ -758,7 +758,7 @@ def conv_entry(*a, **kw): return 1 def raise_error(*a, **kw): - raise Exception("Oh no") + raise Exception("promise exception") handler = ConversationHandler( entry_points=[CommandHandler("start", conv_entry)], @@ -792,7 +792,7 @@ def raise_error(*a, **kw): sleep(0.5) assert len(caplog.records) == 3 assert caplog.records[0].message == "Promise function raised exception" - assert caplog.records[1].message == "Oh no" + assert caplog.records[1].message == "promise exception" # assert res is old state assert handler.conversations.get((self.group.id, user1.id))[0] == 1 @@ -1227,19 +1227,29 @@ def test_conversation_timeout_warning_only_shown_once(self, recwarn): ConversationHandler( entry_points=self.entry_points, states={ - self.BREWING: [CommandHandler('startCoding', self.code)], + self.BREWING: [CommandHandler('pourCoffee', self.drink)], }, fallbacks=self.fallbacks, ) - ] + ], + self.DRINKING: [ + ConversationHandler( + entry_points=self.entry_points, + states={ + self.CODING: [CommandHandler('startCoding', self.code)], + }, + fallbacks=self.fallbacks, + ) + ], }, fallbacks=self.fallbacks, conversation_timeout=100, ) assert len(recwarn) == 1 assert str(recwarn[0].message) == ( - "Setting `conversation_timeout` may lead to unexpected " - "behaviour when using nested conversation handlers." + "Using `conversation_timeout` with nested conversations is currently not " + "supported. You can still try to use it, but it will likely behave " + "differently from what you expect." ) def test_per_message_warning_is_only_shown_once(self, recwarn): From 448e74864881376d23841b0188c64aad4058ceba Mon Sep 17 00:00:00 2001 From: starry69 Date: Sun, 7 Mar 2021 20:26:36 +0530 Subject: [PATCH 08/16] typo Signed-off-by: starry69 --- telegram/ext/conversationhandler.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/telegram/ext/conversationhandler.py b/telegram/ext/conversationhandler.py index b3750304b00..21ca30d9471 100644 --- a/telegram/ext/conversationhandler.py +++ b/telegram/ext/conversationhandler.py @@ -443,7 +443,7 @@ def _get_key(self, update: Update) -> Tuple[int, ...]: return tuple(key) - def _resovle_promomise(self, state: Tuple) -> object: + def _resolve_promise(self, state: Tuple) -> object: old_state, new_state = state try: res = new_state.result(0) @@ -491,7 +491,7 @@ def check_update(self, update: object) -> CheckUpdateType: # pylint: disable=R0 # check if promise is finished or not if state[1].done.wait(0): - res = self._resovle_promomise(state) + res = self._resolve_promise(state) self.update_state(res, key) with self._conversations_lock: state = self.conversations.get(key) @@ -642,7 +642,7 @@ def _trigger_timeout(self, context: _ConversationTimeoutContext, job: 'Job' = No if ( isinstance(state, tuple) and isinstance(state[1], Promise) - and self._resovle_promomise(state) == self.END + and self._resolve_promise(state) == self.END ): return From 31896acca57c20aeb8deee478ac22028a4645a46 Mon Sep 17 00:00:00 2001 From: starry69 Date: Thu, 11 Mar 2021 09:42:43 +0530 Subject: [PATCH 09/16] try to fix codecov Signed-off-by: starry69 --- tests/test_conversationhandler.py | 9 +-------- 1 file changed, 1 insertion(+), 8 deletions(-) diff --git a/tests/test_conversationhandler.py b/tests/test_conversationhandler.py index 7b1110bc506..fd3f806eb18 100644 --- a/tests/test_conversationhandler.py +++ b/tests/test_conversationhandler.py @@ -841,7 +841,7 @@ def timeout(*a, **kw): entry_points=self.entry_points, states=self.states, fallbacks=self.fallbacks, - conversation_timeout=0.75, + conversation_timeout=0.5, run_async=True, ) dp.add_handler(handler) @@ -871,13 +871,6 @@ def timeout(*a, **kw): message.text = '/end' message.entities[0].length = len('/end') dp.process_update(Update(update_id=3, message=message)) - sleep(0.1) - message.text = 'resolve promise pls' - message.entities[0].length = len('resolve promise pls') - dp.process_update(Update(update_id=4, message=message)) - sleep(0.1) - # assert promise got resolved - assert handler.conversations.get((self.group.id, user1.id)) is None sleep(1) # assert timeout handler didn't got called assert self.test_flag is False From 1e5984e9f21017f14414a6d74586663f072d0af9 Mon Sep 17 00:00:00 2001 From: starry69 Date: Sat, 20 Mar 2021 06:51:40 +0530 Subject: [PATCH 10/16] refactor timeout logic with promise.add_done_cb Signed-off-by: starry69 --- telegram/ext/conversationhandler.py | 48 +++++++++++++++++------------ telegram/ext/utils/promise.py | 19 +++++++++++- tests/test_promise.py | 13 ++++++++ 3 files changed, 60 insertions(+), 20 deletions(-) diff --git a/telegram/ext/conversationhandler.py b/telegram/ext/conversationhandler.py index 21ca30d9471..fa4e023715c 100644 --- a/telegram/ext/conversationhandler.py +++ b/telegram/ext/conversationhandler.py @@ -21,6 +21,7 @@ import logging import warnings +import functools from threading import Lock from typing import TYPE_CHECKING, Dict, List, NoReturn, Optional, Tuple, cast, ClassVar @@ -457,6 +458,21 @@ def _resolve_promise(self, state: Tuple) -> object: res = self.END return res + def _schedule_job( + self, + new_state: object, + dispatcher: 'Dispatcher', + update: Update, + context: Optional[CallbackContext], + conversation_key: Tuple[int, ...], + ) -> None: + if new_state != self.END: + self.timeout_jobs[conversation_key] = dispatcher.job_queue.run_once( + self._trigger_timeout, # type: ignore[arg-type] + self.conversation_timeout, + context=_ConversationTimeoutContext(conversation_key, update, dispatcher, context), + ) + def check_update(self, update: object) -> CheckUpdateType: # pylint: disable=R0911 """ Determines whether an update should be handled by this conversationhandler, and if so in @@ -578,15 +594,20 @@ def handle_update( # type: ignore[override] new_state = exception.state raise_dp_handler_stop = True with self._timeout_jobs_lock: - if self.conversation_timeout and new_state != self.END and dispatcher.job_queue: + if self.conversation_timeout and dispatcher.job_queue: # Add the new timeout job - self.timeout_jobs[conversation_key] = dispatcher.job_queue.run_once( - self._trigger_timeout, # type: ignore[arg-type] - self.conversation_timeout, - context=_ConversationTimeoutContext( - conversation_key, update, dispatcher, context - ), - ) + if isinstance(new_state, Promise): + new_state.add_done_callback( + functools.partial( + self._schedule_job, + dispatcher=dispatcher, + update=update, + context=context, + conversation_key=conversation_key, + ) + ) + else: + self._schedule_job(new_state, dispatcher, update, context, conversation_key) if isinstance(self.map_to_parent, dict) and new_state in self.map_to_parent: self.update_state(self.END, conversation_key) @@ -635,17 +656,6 @@ def _trigger_timeout(self, context: _ConversationTimeoutContext, job: 'Job' = No context = job.context # type:ignore[union-attr,assignment] callback_context = context.callback_context - # async conversations can return CH.END without telling the - # current timeout handlers, make sure to not call timeout - # handlers in this case - state = self.conversations[context.conversation_key] - if ( - isinstance(state, tuple) - and isinstance(state[1], Promise) - and self._resolve_promise(state) == self.END - ): - return - with self._timeout_jobs_lock: found_job = self.timeout_jobs[context.conversation_key] if found_job is not job: diff --git a/telegram/ext/utils/promise.py b/telegram/ext/utils/promise.py index 60442686af5..4978835074c 100644 --- a/telegram/ext/utils/promise.py +++ b/telegram/ext/utils/promise.py @@ -69,6 +69,7 @@ def __init__( self.update = update self.error_handling = error_handling self.done = Event() + self._done_callback: Optional[Callable] = None self._result: Optional[RT] = None self._exception: Optional[Exception] = None @@ -76,7 +77,11 @@ def run(self) -> None: """Calls the :attr:`pooled_function` callable.""" try: - self._result = self.pooled_function(*self.args, **self.kwargs) + if self._done_callback: + self._result = self._done_callback(self.result()) + self._done_callback = None + else: + self._result = self.pooled_function(*self.args, **self.kwargs) except Exception as exc: self._exception = exc @@ -106,6 +111,18 @@ def result(self, timeout: float = None) -> Optional[RT]: raise self._exception # pylint: disable=raising-bad-type return self._result + def add_done_callback(self, callback: Callable) -> None: + """ + Callback to be run when :class:`telegram.ext.utils.promise.Promise` becomes done. + + Args: + callback (:obj:`callable`): The callable that will be called when promise is done. + """ + if self.done.wait(0): + callback(self.result()) + else: + self._done_callback = callback + @property def exception(self) -> Optional[Exception]: """The exception raised by :attr:`pooled_function` or ``None`` if no exception has been diff --git a/tests/test_promise.py b/tests/test_promise.py index 46e3d29b65b..688f03af9c2 100644 --- a/tests/test_promise.py +++ b/tests/test_promise.py @@ -63,3 +63,16 @@ def callback(): with pytest.raises(TelegramError, match='Error'): promise.result() + + def test_done_callback(self): + def callback(): + return "done!" + + def done_callback(_): + self.test_flag = True + + promise = Promise(callback, [], {}) + promise.run() + promise.add_done_callback(done_callback) + assert promise.result() == "done!" + assert self.test_flag is True From 23dddfe5faa99177829ba908b7c31630e552100c Mon Sep 17 00:00:00 2001 From: starry69 Date: Sat, 20 Mar 2021 20:49:59 +0530 Subject: [PATCH 11/16] small fix Signed-off-by: starry69 --- telegram/ext/conversationhandler.py | 6 ++++-- telegram/ext/utils/promise.py | 8 +++----- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/telegram/ext/conversationhandler.py b/telegram/ext/conversationhandler.py index fa4e023715c..36b8c18b013 100644 --- a/telegram/ext/conversationhandler.py +++ b/telegram/ext/conversationhandler.py @@ -467,9 +467,11 @@ def _schedule_job( conversation_key: Tuple[int, ...], ) -> None: if new_state != self.END: - self.timeout_jobs[conversation_key] = dispatcher.job_queue.run_once( + self.timeout_jobs[ + conversation_key + ] = dispatcher.job_queue.run_once( # type:ignore[union-attr] self._trigger_timeout, # type: ignore[arg-type] - self.conversation_timeout, + self.conversation_timeout, # type: ignore[arg-type] context=_ConversationTimeoutContext(conversation_key, update, dispatcher, context), ) diff --git a/telegram/ext/utils/promise.py b/telegram/ext/utils/promise.py index 4978835074c..8a91f10d62b 100644 --- a/telegram/ext/utils/promise.py +++ b/telegram/ext/utils/promise.py @@ -77,17 +77,15 @@ def run(self) -> None: """Calls the :attr:`pooled_function` callable.""" try: - if self._done_callback: - self._result = self._done_callback(self.result()) - self._done_callback = None - else: - self._result = self.pooled_function(*self.args, **self.kwargs) + self._result = self.pooled_function(*self.args, **self.kwargs) except Exception as exc: self._exception = exc finally: self.done.set() + if self._done_callback: + self._done_callback(self.result()) def __call__(self) -> None: self.run() From a1e858c93507dd68de56daed6eae843787539da0 Mon Sep 17 00:00:00 2001 From: starry69 Date: Mon, 22 Mar 2021 15:22:01 +0530 Subject: [PATCH 12/16] Address review Signed-off-by: starry69 --- telegram/ext/conversationhandler.py | 61 ++++++++++++++----------- telegram/ext/utils/promise.py | 7 ++- tests/test_conversationhandler.py | 70 +++++++++++++++++++++++++++++ tests/test_promise.py | 44 +++++++++++++++++- 4 files changed, 155 insertions(+), 27 deletions(-) diff --git a/telegram/ext/conversationhandler.py b/telegram/ext/conversationhandler.py index 36b8c18b013..01ffaf4cac4 100644 --- a/telegram/ext/conversationhandler.py +++ b/telegram/ext/conversationhandler.py @@ -22,8 +22,9 @@ import logging import warnings import functools +import datetime from threading import Lock -from typing import TYPE_CHECKING, Dict, List, NoReturn, Optional, Tuple, cast, ClassVar +from typing import TYPE_CHECKING, Dict, List, NoReturn, Optional, Union, Tuple, cast, ClassVar from telegram import Update from telegram.ext import ( @@ -146,9 +147,6 @@ class ConversationHandler(Handler[Update]): :attr:`ConversationHandler.TIMEOUT`. Note: - * `conversation_timeout` with `run_async` may fail, if the - :class:`telegram.ext.utils.promise.Promise` takes longer to finish than the - timeout. * Using `conversation_timeout` with nested conversations is currently not supported. You can still try to use it, but it will likely behave differently from what you expect. @@ -226,7 +224,7 @@ def __init__( per_chat: bool = True, per_user: bool = True, per_message: bool = False, - conversation_timeout: int = None, + conversation_timeout: Union[float, datetime.timedelta] = None, name: str = None, persistent: bool = False, map_to_parent: Dict[object, object] = None, @@ -373,7 +371,9 @@ def per_message(self, value: object) -> NoReturn: raise ValueError('You can not assign a new value to per_message after initialization.') @property - def conversation_timeout(self) -> Optional[int]: + def conversation_timeout( + self, + ) -> Optional[Union[float, datetime.timedelta]]: return self._conversation_timeout @conversation_timeout.setter @@ -467,13 +467,19 @@ def _schedule_job( conversation_key: Tuple[int, ...], ) -> None: if new_state != self.END: - self.timeout_jobs[ - conversation_key - ] = dispatcher.job_queue.run_once( # type:ignore[union-attr] - self._trigger_timeout, # type: ignore[arg-type] - self.conversation_timeout, # type: ignore[arg-type] - context=_ConversationTimeoutContext(conversation_key, update, dispatcher, context), - ) + try: + # both job_queue & conversation_timeout are checked before calling _schedule_job + j_queue = dispatcher.job_queue + self.timeout_jobs[conversation_key] = j_queue.run_once( # type: ignore[union-attr] + self._trigger_timeout, # type: ignore[arg-type] + self.conversation_timeout, # type: ignore[arg-type] + context=_ConversationTimeoutContext( + conversation_key, update, dispatcher, context + ), + ) + except Exception as exc: + self.logger.exception("Failed to add timeout job due to exception") + self.logger.exception("%s", exc) def check_update(self, update: object) -> CheckUpdateType: # pylint: disable=R0911 """ @@ -596,20 +602,25 @@ def handle_update( # type: ignore[override] new_state = exception.state raise_dp_handler_stop = True with self._timeout_jobs_lock: - if self.conversation_timeout and dispatcher.job_queue: - # Add the new timeout job - if isinstance(new_state, Promise): - new_state.add_done_callback( - functools.partial( - self._schedule_job, - dispatcher=dispatcher, - update=update, - context=context, - conversation_key=conversation_key, + if self.conversation_timeout: + if dispatcher.job_queue is not None: + # Add the new timeout job + if isinstance(new_state, Promise): + new_state.add_done_callback( + functools.partial( + self._schedule_job, + dispatcher=dispatcher, + update=update, + context=context, + conversation_key=conversation_key, + ) + ) + else: + self._schedule_job( + new_state, dispatcher, update, context, conversation_key ) - ) else: - self._schedule_job(new_state, dispatcher, update, context, conversation_key) + self.logger.warning("`conversation_timeout` can't work without JobQueue!") if isinstance(self.map_to_parent, dict) and new_state in self.map_to_parent: self.update_state(self.END, conversation_key) diff --git a/telegram/ext/utils/promise.py b/telegram/ext/utils/promise.py index 8a91f10d62b..0fd01f9a2f2 100644 --- a/telegram/ext/utils/promise.py +++ b/telegram/ext/utils/promise.py @@ -85,7 +85,10 @@ def run(self) -> None: finally: self.done.set() if self._done_callback: - self._done_callback(self.result()) + try: + self._done_callback(self.result()) + except Exception: + pass def __call__(self) -> None: self.run() @@ -115,6 +118,8 @@ def add_done_callback(self, callback: Callable) -> None: Args: callback (:obj:`callable`): The callable that will be called when promise is done. + callback will be called by passing ``Promise.result()`` as only positional argument. + """ if self.done.wait(0): callback(self.result()) diff --git a/tests/test_conversationhandler.py b/tests/test_conversationhandler.py index fd3f806eb18..f0f422e2a1b 100644 --- a/tests/test_conversationhandler.py +++ b/tests/test_conversationhandler.py @@ -753,7 +753,77 @@ def test_all_update_types(self, dp, bot, user1): assert not handler.check_update(Update(0, pre_checkout_query=pre_checkout_query)) assert not handler.check_update(Update(0, shipping_query=shipping_query)) + def test_no_jobqueue_warning(self, dp, bot, user1, caplog): + handler = ConversationHandler( + entry_points=self.entry_points, + states=self.states, + fallbacks=self.fallbacks, + conversation_timeout=0.5, + ) + # save dp.job_queue in temp variable jqueue + # and then set dp.job_queue to None. + jqueue = dp.job_queue + dp.job_queue = None + dp.add_handler(handler) + + message = Message( + 0, + None, + self.group, + from_user=user1, + text='/start', + entities=[ + MessageEntity(type=MessageEntity.BOT_COMMAND, offset=0, length=len('/start')) + ], + bot=bot, + ) + + with caplog.at_level(logging.WARNING): + dp.process_update(Update(update_id=0, message=message)) + sleep(0.5) + assert len(caplog.records) == 1 + assert caplog.records[0].message == "`conversation_timeout` can't work without JobQueue!" + # now set dp.job_queue back to it's original value + dp.job_queue = jqueue + + def test_schedule_job_exception(self, dp, bot, user1, monkeypatch, caplog): + def mocked_run_once(*a, **kw): + raise Exception("job error") + + monkeypatch.setattr(dp.job_queue, "run_once", mocked_run_once) + handler = ConversationHandler( + entry_points=self.entry_points, + states=self.states, + fallbacks=self.fallbacks, + conversation_timeout=100, + ) + dp.add_handler(handler) + + message = Message( + 0, + None, + self.group, + from_user=user1, + text='/start', + entities=[ + MessageEntity(type=MessageEntity.BOT_COMMAND, offset=0, length=len('/start')) + ], + bot=bot, + ) + + with caplog.at_level(logging.ERROR): + dp.process_update(Update(update_id=0, message=message)) + sleep(0.5) + assert len(caplog.records) == 2 + assert caplog.records[0].message == "Failed to add timeout job due to exception" + assert caplog.records[1].message == "job error" + def test_promise_exception(self, dp, bot, user1, caplog): + """ + Here we make sure that when a run_async handle raises an + exception, the state isn't changed. + """ + def conv_entry(*a, **kw): return 1 diff --git a/tests/test_promise.py b/tests/test_promise.py index 688f03af9c2..7bb1f83e7e1 100644 --- a/tests/test_promise.py +++ b/tests/test_promise.py @@ -64,7 +64,7 @@ def callback(): with pytest.raises(TelegramError, match='Error'): promise.result() - def test_done_callback(self): + def test_done_cb_after_run(self): def callback(): return "done!" @@ -76,3 +76,45 @@ def done_callback(_): promise.add_done_callback(done_callback) assert promise.result() == "done!" assert self.test_flag is True + + def test_done_cb_after_run_excp(self): + def callback(): + return "done!" + + def done_callback(_): + raise Exception("Error!") + + promise = Promise(callback, [], {}) + promise.run() + assert promise.result() == "done!" + with pytest.raises(Exception) as err: + promise.add_done_callback(done_callback) + assert str(err) == "Error!" + + def test_done_cb_before_run(self): + def callback(): + return "done!" + + def done_callback(_): + self.test_flag = True + + promise = Promise(callback, [], {}) + promise.add_done_callback(done_callback) + assert promise.result(0) != "done!" + assert self.test_flag is False + promise.run() + assert promise.result() == "done!" + assert self.test_flag is True + + def test_done_cb_before_run_excp(self): + def callback(): + return "done!" + + def done_callback(_): + raise Exception("Error!") + + promise = Promise(callback, [], {}) + promise.add_done_callback(done_callback) + assert promise.result(0) != "done!" + promise.run() + assert promise.result() == "done!" From d8555d5c5762a8b7ad529ee3b0cf2304bc4b9e79 Mon Sep 17 00:00:00 2001 From: Hinrich Mahler Date: Wed, 24 Mar 2021 08:11:38 +0100 Subject: [PATCH 13/16] Fix some type hinting --- telegram/ext/conversationhandler.py | 26 +++++++++++++------------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/telegram/ext/conversationhandler.py b/telegram/ext/conversationhandler.py index 01ffaf4cac4..8799675c477 100644 --- a/telegram/ext/conversationhandler.py +++ b/telegram/ext/conversationhandler.py @@ -471,7 +471,7 @@ def _schedule_job( # both job_queue & conversation_timeout are checked before calling _schedule_job j_queue = dispatcher.job_queue self.timeout_jobs[conversation_key] = j_queue.run_once( # type: ignore[union-attr] - self._trigger_timeout, # type: ignore[arg-type] + self._trigger_timeout, self.conversation_timeout, # type: ignore[arg-type] context=_ConversationTimeoutContext( conversation_key, update, dispatcher, context @@ -658,35 +658,35 @@ def update_state(self, new_state: object, key: Tuple[int, ...]) -> None: if self.persistent and self.persistence and self.name: self.persistence.update_conversation(self.name, key, new_state) - def _trigger_timeout(self, context: _ConversationTimeoutContext, job: 'Job' = None) -> None: + def _trigger_timeout(self, context: CallbackContext, job: 'Job' = None) -> None: self.logger.debug('conversation timeout was triggered!') # Backward compatibility with bots that do not use CallbackContext - callback_context = None if isinstance(context, CallbackContext): - job = context.job + ctxt = cast( + _ConversationTimeoutContext, context.job.context # type: ignore[union-attr] + ) + else: + ctxt = cast(_ConversationTimeoutContext, job.context) - context = job.context # type:ignore[union-attr,assignment] - callback_context = context.callback_context + callback_context = ctxt.callback_context with self._timeout_jobs_lock: - found_job = self.timeout_jobs[context.conversation_key] + found_job = self.timeout_jobs[ctxt.conversation_key] if found_job is not job: # The timeout has been cancelled in handle_update return - del self.timeout_jobs[context.conversation_key] + del self.timeout_jobs[ctxt.conversation_key] handlers = self.states.get(self.TIMEOUT, []) for handler in handlers: - check = handler.check_update(context.update) + check = handler.check_update(ctxt.update) if check is not None and check is not False: try: - handler.handle_update( - context.update, context.dispatcher, check, callback_context - ) + handler.handle_update(ctxt.update, ctxt.dispatcher, check, callback_context) except DispatcherHandlerStop: self.logger.warning( 'DispatcherHandlerStop in TIMEOUT state of ' 'ConversationHandler has no effect. Ignoring.' ) - self.update_state(self.END, context.conversation_key) + self.update_state(self.END, ctxt.conversation_key) From ef9815e48ec86f01cc019fe506f4ae985dbbd85b Mon Sep 17 00:00:00 2001 From: starry69 Date: Wed, 24 Mar 2021 13:34:02 +0530 Subject: [PATCH 14/16] Few fixes Signed-off-by: starry69 --- telegram/ext/conversationhandler.py | 33 +++++++++++++++-------------- tests/test_conversationhandler.py | 10 +++++++-- tests/test_promise.py | 5 ++++- 3 files changed, 29 insertions(+), 19 deletions(-) diff --git a/telegram/ext/conversationhandler.py b/telegram/ext/conversationhandler.py index 01ffaf4cac4..ee8b45665cf 100644 --- a/telegram/ext/conversationhandler.py +++ b/telegram/ext/conversationhandler.py @@ -147,9 +147,9 @@ class ConversationHandler(Handler[Update]): :attr:`ConversationHandler.TIMEOUT`. Note: - * Using `conversation_timeout` with nested conversations is currently not - supported. You can still try to use it, but it will likely behave differently - from what you expect. + Using `conversation_timeout` with nested conversations is currently not + supported. You can still try to use it, but it will likely behave differently + from what you expect. name (:obj:`str`, optional): The name for this conversationhandler. Required for @@ -471,14 +471,16 @@ def _schedule_job( # both job_queue & conversation_timeout are checked before calling _schedule_job j_queue = dispatcher.job_queue self.timeout_jobs[conversation_key] = j_queue.run_once( # type: ignore[union-attr] - self._trigger_timeout, # type: ignore[arg-type] + self._trigger_timeout, self.conversation_timeout, # type: ignore[arg-type] context=_ConversationTimeoutContext( conversation_key, update, dispatcher, context ), ) except Exception as exc: - self.logger.exception("Failed to add timeout job due to exception") + self.logger.exception( + "Failed to schedule timeout job due to the following exception:" + ) self.logger.exception("%s", exc) def check_update(self, update: object) -> CheckUpdateType: # pylint: disable=R0911 @@ -620,7 +622,9 @@ def handle_update( # type: ignore[override] new_state, dispatcher, update, context, conversation_key ) else: - self.logger.warning("`conversation_timeout` can't work without JobQueue!") + self.logger.warning( + "Ignoring `conversation_timeout` because the Dispatcher has no JobQueue." + ) if isinstance(self.map_to_parent, dict) and new_state in self.map_to_parent: self.update_state(self.END, conversation_key) @@ -658,7 +662,7 @@ def update_state(self, new_state: object, key: Tuple[int, ...]) -> None: if self.persistent and self.persistence and self.name: self.persistence.update_conversation(self.name, key, new_state) - def _trigger_timeout(self, context: _ConversationTimeoutContext, job: 'Job' = None) -> None: + def _trigger_timeout(self, context: CallbackContext, job: 'Job' = None) -> None: self.logger.debug('conversation timeout was triggered!') # Backward compatibility with bots that do not use CallbackContext @@ -666,27 +670,24 @@ def _trigger_timeout(self, context: _ConversationTimeoutContext, job: 'Job' = No if isinstance(context, CallbackContext): job = context.job - context = job.context # type:ignore[union-attr,assignment] - callback_context = context.callback_context + callback_context = cntxt.callback_context with self._timeout_jobs_lock: - found_job = self.timeout_jobs[context.conversation_key] + found_job = self.timeout_jobs[cntxt.conversation_key] if found_job is not job: # The timeout has been cancelled in handle_update return - del self.timeout_jobs[context.conversation_key] + del self.timeout_jobs[cntxt.conversation_key] handlers = self.states.get(self.TIMEOUT, []) for handler in handlers: - check = handler.check_update(context.update) + check = handler.check_update(cntxt.update) if check is not None and check is not False: try: - handler.handle_update( - context.update, context.dispatcher, check, callback_context - ) + handler.handle_update(cntxt.update, cntxt.dispatcher, check, callback_context) except DispatcherHandlerStop: self.logger.warning( 'DispatcherHandlerStop in TIMEOUT state of ' 'ConversationHandler has no effect. Ignoring.' ) - self.update_state(self.END, context.conversation_key) + self.update_state(self.END, cntxt.conversation_key) diff --git a/tests/test_conversationhandler.py b/tests/test_conversationhandler.py index f0f422e2a1b..f8e73dc4346 100644 --- a/tests/test_conversationhandler.py +++ b/tests/test_conversationhandler.py @@ -782,7 +782,10 @@ def test_no_jobqueue_warning(self, dp, bot, user1, caplog): dp.process_update(Update(update_id=0, message=message)) sleep(0.5) assert len(caplog.records) == 1 - assert caplog.records[0].message == "`conversation_timeout` can't work without JobQueue!" + assert ( + caplog.records[0].message + == "Ignoring `conversation_timeout` because the Dispatcher has no JobQueue." + ) # now set dp.job_queue back to it's original value dp.job_queue = jqueue @@ -815,7 +818,10 @@ def mocked_run_once(*a, **kw): dp.process_update(Update(update_id=0, message=message)) sleep(0.5) assert len(caplog.records) == 2 - assert caplog.records[0].message == "Failed to add timeout job due to exception" + assert ( + caplog.records[0].message + == "Failed to schedule timeout job due to the following exception:" + ) assert caplog.records[1].message == "job error" def test_promise_exception(self, dp, bot, user1, caplog): diff --git a/tests/test_promise.py b/tests/test_promise.py index 7bb1f83e7e1..103358ca298 100644 --- a/tests/test_promise.py +++ b/tests/test_promise.py @@ -116,5 +116,8 @@ def done_callback(_): promise = Promise(callback, [], {}) promise.add_done_callback(done_callback) assert promise.result(0) != "done!" - promise.run() + try: + promise.run() + except Exception: + pytest.fail() assert promise.result() == "done!" From aeceab6b69ff8c0fd73048808be6ee01b1cb98fc Mon Sep 17 00:00:00 2001 From: starry69 Date: Wed, 24 Mar 2021 15:06:59 +0530 Subject: [PATCH 15/16] fix tests Signed-off-by: starry69 --- telegram/ext/conversationhandler.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/telegram/ext/conversationhandler.py b/telegram/ext/conversationhandler.py index 77a7882ff7e..e1d7704a25c 100644 --- a/telegram/ext/conversationhandler.py +++ b/telegram/ext/conversationhandler.py @@ -667,9 +667,8 @@ def _trigger_timeout(self, context: CallbackContext, job: 'Job' = None) -> None: # Backward compatibility with bots that do not use CallbackContext if isinstance(context, CallbackContext): - ctxt = cast( - _ConversationTimeoutContext, context.job.context # type: ignore[union-attr] - ) + job = context.job + ctxt = cast(_ConversationTimeoutContext, job.context) # type: ignore[union-attr] else: ctxt = cast(_ConversationTimeoutContext, job.context) From 9eba935b113524fea330f7fb17fcfd8db660a3c0 Mon Sep 17 00:00:00 2001 From: starry69 Date: Sat, 27 Mar 2021 14:38:00 +0530 Subject: [PATCH 16/16] minor nitpick Signed-off-by: starry69 --- telegram/ext/conversationhandler.py | 2 +- telegram/ext/utils/promise.py | 8 ++++++-- tests/test_promise.py | 14 ++++++++++---- 3 files changed, 17 insertions(+), 7 deletions(-) diff --git a/telegram/ext/conversationhandler.py b/telegram/ext/conversationhandler.py index e1d7704a25c..b2f1c91bf97 100644 --- a/telegram/ext/conversationhandler.py +++ b/telegram/ext/conversationhandler.py @@ -617,7 +617,7 @@ def handle_update( # type: ignore[override] conversation_key=conversation_key, ) ) - else: + elif new_state != self.END: self._schedule_job( new_state, dispatcher, update, context, conversation_key ) diff --git a/telegram/ext/utils/promise.py b/telegram/ext/utils/promise.py index 0fd01f9a2f2..48508e0747d 100644 --- a/telegram/ext/utils/promise.py +++ b/telegram/ext/utils/promise.py @@ -87,8 +87,12 @@ def run(self) -> None: if self._done_callback: try: self._done_callback(self.result()) - except Exception: - pass + except Exception as exc: + logger.warning( + "`done_callback` of a Promise raised the following exception." + " The exception won't be handled by error handlers." + ) + logger.warning("Full traceback:", exc_info=exc) def __call__(self) -> None: self.run() diff --git a/tests/test_promise.py b/tests/test_promise.py index 103358ca298..a0768b5c63e 100644 --- a/tests/test_promise.py +++ b/tests/test_promise.py @@ -16,6 +16,7 @@ # # You should have received a copy of the GNU Lesser Public License # along with this program. If not, see [http://www.gnu.org/licenses/]. +import logging import pytest from telegram import TelegramError @@ -106,7 +107,7 @@ def done_callback(_): assert promise.result() == "done!" assert self.test_flag is True - def test_done_cb_before_run_excp(self): + def test_done_cb_before_run_excp(self, caplog): def callback(): return "done!" @@ -116,8 +117,13 @@ def done_callback(_): promise = Promise(callback, [], {}) promise.add_done_callback(done_callback) assert promise.result(0) != "done!" - try: + caplog.clear() + with caplog.at_level(logging.WARNING): promise.run() - except Exception: - pytest.fail() + assert len(caplog.records) == 2 + assert caplog.records[0].message == ( + "`done_callback` of a Promise raised the following exception." + " The exception won't be handled by error handlers." + ) + assert caplog.records[1].message.startswith("Full traceback:") assert promise.result() == "done!"