diff --git a/docs/source/telegram.ext.delayqueue.rst b/docs/source/telegram.ext.delayqueue.rst index ee79b849478..7426a923c74 100644 --- a/docs/source/telegram.ext.delayqueue.rst +++ b/docs/source/telegram.ext.delayqueue.rst @@ -4,4 +4,3 @@ telegram.ext.DelayQueue .. autoclass:: telegram.ext.DelayQueue :members: :show-inheritance: - :special-members: diff --git a/docs/source/telegram.ext.messagequeue.rst b/docs/source/telegram.ext.messagequeue.rst index 98bcb6e6357..f9ff9721044 100644 --- a/docs/source/telegram.ext.messagequeue.rst +++ b/docs/source/telegram.ext.messagequeue.rst @@ -4,4 +4,3 @@ telegram.ext.MessageQueue .. autoclass:: telegram.ext.MessageQueue :members: :show-inheritance: - :special-members: diff --git a/setup.cfg b/setup.cfg index 1aebfe10b12..4b750862aa0 100644 --- a/setup.cfg +++ b/setup.cfg @@ -20,6 +20,7 @@ ignore=vendor [pylint.message-control] disable = C0330,R0801,R0913,R0904,R0903,R0902,W0511,C0116,C0115,W0703,R0914,R0914,C0302,R0912,R0915,R0401 +ignored-argument-names = _.*|^ignored_|^unused_|delay_queue [tool:pytest] testpaths = tests diff --git a/telegram/bot.py b/telegram/bot.py index c74e40121e8..a10d5cb76a3 100644 --- a/telegram/bot.py +++ b/telegram/bot.py @@ -36,6 +36,7 @@ TypeVar, Union, no_type_check, + cast, ) from decorator import decorate @@ -103,7 +104,7 @@ from telegram.utils.types import FileInput, JSONDict if TYPE_CHECKING: - from telegram.ext import Defaults + from telegram.ext import Defaults, MessageQueue RT = TypeVar('RT') @@ -113,10 +114,10 @@ def info(func: Callable[..., RT]) -> Callable[..., RT]: @functools.wraps(func) def decorator(self: 'Bot', *args: Any, **kwargs: Any) -> RT: if not self.bot: - self.get_me() + self.get_me(delay_queue=None) if self._commands is None: - self.get_my_commands() + self.get_my_commands(delay_queue=None) result = func(self, *args, **kwargs) return result @@ -124,12 +125,10 @@ def decorator(self: 'Bot', *args: Any, **kwargs: Any) -> RT: return decorator -def log( - func: Callable[..., RT], *args: Any, **kwargs: Any # pylint: disable=W0613 -) -> Callable[..., RT]: +def log(func: Callable[..., RT]) -> Callable[..., RT]: logger = logging.getLogger(func.__module__) - def decorator(self: 'Bot', *args: Any, **kwargs: Any) -> RT: # pylint: disable=W0613 + def decorator(_: Callable, *args: Any, **kwargs: Any) -> RT: # pylint: disable=W0613 logger.debug('Entering: %s', func.__name__) result = func(*args, **kwargs) logger.debug(result) @@ -139,6 +138,60 @@ def decorator(self: 'Bot', *args: Any, **kwargs: Any) -> RT: # pylint: disable= return decorate(func, decorator) +def mq(func: Callable[..., RT]) -> Callable[..., RT]: + logger = logging.getLogger(func.__module__) + + def decorator(_: Callable, *args: Any, **kwargs: Any) -> RT: + self = cast('Bot', args[0]) + arg_spec = inspect.getfullargspec(func) + idx = arg_spec.args.index('delay_queue') + delay_queue = args[idx] + + if not self.message_queue or not self.message_queue.running: + if delay_queue: + logger.warning( + 'Ignoring call to MessageQueue, because it is either not set or not running.' + ) + return func(*args, **kwargs) + + if not delay_queue: + return func(*args, **kwargs) + + if delay_queue == self.message_queue.DEFAULT_QUEUE_NAME: + # For default queue, check if we're in a group setting or not + chat_id: Union[str, int] = '' + if 'chat_id' in arg_spec.args: + idx = arg_spec.args.index('chat_id') + chat_id = args[idx] + + if not chat_id: + is_group = False + elif isinstance(chat_id, str) and chat_id.startswith('@'): + is_group = True + else: + try: + is_group = int(chat_id) < 0 + except ValueError: + is_group = False + + logger.debug( + 'Processing MessageQueue call with chat id %s through the %s queue.', + chat_id, + 'group' if is_group else 'default', + ) + + queue = self.message_queue.GROUP_QUEUE_NAME if is_group else delay_queue + return self.message_queue.put( # type: ignore[return-value] + func, queue, *args, **kwargs + ) + + return self.message_queue.put( # type: ignore[return-value] + func, delay_queue, *args, **kwargs + ) + + return decorate(func, decorator) + + class Bot(TelegramObject): """This object represents a Telegram Bot. @@ -152,12 +205,19 @@ class Bot(TelegramObject): private_key_password (:obj:`bytes`, optional): Password for above private key. defaults (:class:`telegram.ext.Defaults`, optional): An object containing default values to be used if not set explicitly in the bot methods. + message_queue (:class:`telegram.ext.MessageQueue`, optional): A message queue to pass + requests through in order to avoid flood limits. Note: - Most bot methods have the argument ``api_kwargs`` which allows to pass arbitrary keywords - to the Telegram API. This can be used to access new features of the API before they were - incorporated into PTB. However, this is not guaranteed to work, i.e. it will fail for - passing files. + * Most bot methods have the argument ``api_kwargs`` which allows to pass arbitrary keywords + to the Telegram API. This can be used to access new features of the API before they were + incorporated into PTB. However, this is not guaranteed to work, i.e. it will fail for + passing files. + * Most bot methods have the argument ``delay_queue`` which allows you to pass the request + to the specified delay queue of the :attr:`message_queue`. This will have an effect only, + if the :class:`telegram.ext.MessageQueue` is set and running. When passing a request + through the :attr:`message_queue`, the bot method will return a + :class:`telegram.utils.Promise` instead of the documented return value. """ @@ -186,6 +246,9 @@ def __new__(cls, *args: Any, **kwargs: Any) -> 'Bot': # pylint: disable=W0613 for kwarg_name in needs_default if (getattr(defaults, kwarg_name) is not DEFAULT_NONE) } + # ... do some special casing for delay_queue because that may depend on the method + if 'delay_queue' in default_kwargs: + default_kwargs['delay_queue'] = defaults.delay_queue_per_method[method_name] # ... apply the defaults using a partial if default_kwargs: setattr(instance, method_name, functools.partial(method, **default_kwargs)) @@ -201,11 +264,12 @@ def __init__( private_key: bytes = None, private_key_password: bytes = None, defaults: 'Defaults' = None, + message_queue: 'MessageQueue' = None, ): self.token = self._validate_token(token) - # Gather default self.defaults = defaults + self.message_queue = message_queue if base_url is None: base_url = 'https://api.telegram.org/bot' @@ -370,7 +434,10 @@ def name(self) -> str: return f'@{self.username}' @log - def get_me(self, timeout: int = None, api_kwargs: JSONDict = None) -> Optional[User]: + @mq + def get_me( + self, timeout: int = None, api_kwargs: JSONDict = None, delay_queue: str = None + ) -> Optional[User]: """A simple method for testing your bot's auth token. Requires no parameters. Args: @@ -379,6 +446,8 @@ def get_me(self, timeout: int = None, api_kwargs: JSONDict = None) -> Optional[U the connection pool). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :class:`telegram.User`: A :class:`telegram.User` instance representing that bot if the @@ -395,6 +464,7 @@ def get_me(self, timeout: int = None, api_kwargs: JSONDict = None) -> Optional[U return self.bot @log + @mq def send_message( self, chat_id: Union[int, str], @@ -408,6 +478,7 @@ def send_message( api_kwargs: JSONDict = None, allow_sending_without_reply: bool = None, entities: Union[List[MessageEntity], Tuple[MessageEntity, ...]] = None, + delay_queue: str = None, ) -> Optional[Message]: """Use this method to send text messages. @@ -437,6 +508,8 @@ def send_message( the connection pool). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :class:`telegram.Message`: On success, the sent message is returned. @@ -466,12 +539,14 @@ def send_message( ) @log + @mq def delete_message( self, chat_id: Union[str, int], message_id: Union[str, int], timeout: float = None, api_kwargs: JSONDict = None, + delay_queue: str = None, ) -> bool: """ Use this method to delete a message, including service messages, with the following @@ -496,6 +571,8 @@ def delete_message( the connection pool). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :obj:`bool`: On success, :obj:`True` is returned. @@ -511,6 +588,7 @@ def delete_message( return result # type: ignore[return-value] @log + @mq def forward_message( self, chat_id: Union[int, str], @@ -519,6 +597,7 @@ def forward_message( disable_notification: bool = False, timeout: float = None, api_kwargs: JSONDict = None, + delay_queue: str = None, ) -> Optional[Message]: """Use this method to forward messages of any kind. @@ -535,6 +614,8 @@ def forward_message( the connection pool). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :class:`telegram.Message`: On success, the sent Message is returned. @@ -561,6 +642,7 @@ def forward_message( ) @log + @mq def send_photo( self, chat_id: int, @@ -575,6 +657,7 @@ def send_photo( allow_sending_without_reply: bool = None, caption_entities: Union[List[MessageEntity], Tuple[MessageEntity, ...]] = None, filename: str = None, + delay_queue: str = None, ) -> Optional[Message]: """Use this method to send photos. @@ -619,6 +702,8 @@ def send_photo( timeout (:obj:`int` | :obj:`float`, optional): Send file timeout (default: 20 seconds). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :class:`telegram.Message`: On success, the sent Message is returned. @@ -651,6 +736,7 @@ def send_photo( ) @log + @mq def send_audio( self, chat_id: Union[int, str], @@ -669,6 +755,7 @@ def send_audio( allow_sending_without_reply: bool = None, caption_entities: Union[List[MessageEntity], Tuple[MessageEntity, ...]] = None, filename: str = None, + delay_queue: str = None, ) -> Optional[Message]: """ Use this method to send audio files, if you want Telegram clients to display them in the @@ -732,6 +819,8 @@ def send_audio( timeout (:obj:`int` | :obj:`float`, optional): Send file timeout (default: 20 seconds). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :class:`telegram.Message`: On success, the sent Message is returned. @@ -772,6 +861,7 @@ def send_audio( ) @log + @mq def send_document( self, chat_id: Union[int, str], @@ -788,6 +878,7 @@ def send_document( disable_content_type_detection: bool = None, allow_sending_without_reply: bool = None, caption_entities: Union[List[MessageEntity], Tuple[MessageEntity, ...]] = None, + delay_queue: str = None, ) -> Optional[Message]: """ Use this method to send general files. @@ -845,6 +936,8 @@ def send_document( timeout (:obj:`int` | :obj:`float`, optional): Send file timeout (default: 20 seconds). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :class:`telegram.Message`: On success, the sent Message is returned. @@ -881,6 +974,7 @@ def send_document( ) @log + @mq def send_sticker( self, chat_id: Union[int, str], @@ -891,6 +985,7 @@ def send_sticker( timeout: float = 20, api_kwargs: JSONDict = None, allow_sending_without_reply: bool = None, + delay_queue: str = None, ) -> Optional[Message]: """ Use this method to send static .WEBP or animated .TGS stickers. @@ -923,6 +1018,8 @@ def send_sticker( timeout (:obj:`int` | :obj:`float`, optional): Send file timeout (default: 20 seconds). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :class:`telegram.Message`: On success, the sent Message is returned. @@ -945,6 +1042,7 @@ def send_sticker( ) @log + @mq def send_video( self, chat_id: Union[int, str], @@ -964,6 +1062,7 @@ def send_video( allow_sending_without_reply: bool = None, caption_entities: Union[List[MessageEntity], Tuple[MessageEntity, ...]] = None, filename: str = None, + delay_queue: str = None, ) -> Optional[Message]: """ Use this method to send video files, Telegram clients support mp4 videos @@ -1030,6 +1129,8 @@ def send_video( timeout (:obj:`int` | :obj:`float`, optional): Send file timeout (default: 20 seconds). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :class:`telegram.Message`: On success, the sent Message is returned. @@ -1072,6 +1173,7 @@ def send_video( ) @log + @mq def send_video_note( self, chat_id: Union[int, str], @@ -1086,6 +1188,7 @@ def send_video_note( api_kwargs: JSONDict = None, allow_sending_without_reply: bool = None, filename: str = None, + delay_queue: str = None, ) -> Optional[Message]: """ As of v.4.0, Telegram clients support rounded square mp4 videos of up to 1 minute long. @@ -1139,6 +1242,8 @@ def send_video_note( timeout (:obj:`int` | :obj:`float`, optional): Send file timeout (default: 20 seconds). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :class:`telegram.Message`: On success, the sent Message is returned. @@ -1171,6 +1276,7 @@ def send_video_note( ) @log + @mq def send_animation( self, chat_id: Union[int, str], @@ -1189,6 +1295,7 @@ def send_animation( allow_sending_without_reply: bool = None, caption_entities: Union[List[MessageEntity], Tuple[MessageEntity, ...]] = None, filename: str = None, + delay_queue: str = None, ) -> Optional[Message]: """ Use this method to send animation files (GIF or H.264/MPEG-4 AVC video without sound). @@ -1249,6 +1356,8 @@ def send_animation( timeout (:obj:`int` | :obj:`float`, optional): Send file timeout (default: 20 seconds). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :class:`telegram.Message`: On success, the sent Message is returned. @@ -1289,6 +1398,7 @@ def send_animation( ) @log + @mq def send_voice( self, chat_id: Union[int, str], @@ -1304,6 +1414,7 @@ def send_voice( allow_sending_without_reply: bool = None, caption_entities: Union[List[MessageEntity], Tuple[MessageEntity, ...]] = None, filename: str = None, + delay_queue: str = None, ) -> Optional[Message]: """ Use this method to send audio files, if you want Telegram clients to display the file @@ -1353,6 +1464,8 @@ def send_voice( timeout (:obj:`int` | :obj:`float`, optional): Send file timeout (default: 20 seconds). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :class:`telegram.Message`: On success, the sent Message is returned. @@ -1387,6 +1500,7 @@ def send_voice( ) @log + @mq def send_media_group( self, chat_id: Union[int, str], @@ -1396,6 +1510,7 @@ def send_media_group( timeout: float = 20, api_kwargs: JSONDict = None, allow_sending_without_reply: bool = None, + delay_queue: str = None, ) -> List[Optional[Message]]: """Use this method to send a group of photos or videos as an album. @@ -1414,6 +1529,8 @@ def send_media_group( timeout (:obj:`int` | :obj:`float`, optional): Send file timeout (default: 20 seconds). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: List[:class:`telegram.Message`]: An array of the sent Messages. @@ -1446,6 +1563,7 @@ def send_media_group( return [Message.de_json(res, self) for res in result] # type: ignore @log + @mq def send_location( self, chat_id: Union[int, str], @@ -1462,6 +1580,7 @@ def send_location( heading: int = None, proximity_alert_radius: int = None, allow_sending_without_reply: bool = None, + delay_queue: str = None, ) -> Optional[Message]: """Use this method to send point on the map. @@ -1497,6 +1616,8 @@ def send_location( the connection pool). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :class:`telegram.Message`: On success, the sent Message is returned. @@ -1542,6 +1663,7 @@ def send_location( ) @log + @mq def edit_message_live_location( self, chat_id: Union[str, int] = None, @@ -1556,6 +1678,7 @@ def edit_message_live_location( horizontal_accuracy: float = None, heading: int = None, proximity_alert_radius: int = None, + delay_queue: str = None, ) -> Union[Optional[Message], bool]: """Use this method to edit live location messages sent by the bot or via the bot (for inline bots). A location can be edited until its :attr:`live_period` expires or @@ -1589,6 +1712,8 @@ def edit_message_live_location( the connection pool). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :class:`telegram.Message`: On success, if edited message is not an inline message, the @@ -1631,6 +1756,7 @@ def edit_message_live_location( ) @log + @mq def stop_message_live_location( self, chat_id: Union[str, int] = None, @@ -1639,6 +1765,7 @@ def stop_message_live_location( reply_markup: InlineKeyboardMarkup = None, timeout: float = None, api_kwargs: JSONDict = None, + delay_queue: str = None, ) -> Union[Optional[Message], bool]: """Use this method to stop updating a live location message sent by the bot or via the bot (for inline bots) before live_period expires. @@ -1658,6 +1785,8 @@ def stop_message_live_location( the connection pool). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :class:`telegram.Message`: On success, if edited message is sent by the bot, the @@ -1681,6 +1810,7 @@ def stop_message_live_location( ) @log + @mq def send_venue( self, chat_id: Union[int, str], @@ -1699,6 +1829,7 @@ def send_venue( google_place_id: str = None, google_place_type: str = None, allow_sending_without_reply: bool = None, + delay_queue: str = None, ) -> Optional[Message]: """Use this method to send information about a venue. @@ -1740,6 +1871,8 @@ def send_venue( the connection pool). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :class:`telegram.Message`: On success, the sent Message is returned. @@ -1793,6 +1926,7 @@ def send_venue( ) @log + @mq def send_contact( self, chat_id: Union[int, str], @@ -1807,6 +1941,7 @@ def send_contact( vcard: str = None, api_kwargs: JSONDict = None, allow_sending_without_reply: bool = None, + delay_queue: str = None, ) -> Optional[Message]: """Use this method to send phone contacts. @@ -1837,6 +1972,8 @@ def send_contact( the connection pool). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :class:`telegram.Message`: On success, the sent Message is returned. @@ -1879,6 +2016,7 @@ def send_contact( ) @log + @mq def send_game( self, chat_id: Union[int, str], @@ -1889,6 +2027,7 @@ def send_game( timeout: float = None, api_kwargs: JSONDict = None, allow_sending_without_reply: bool = None, + delay_queue: str = None, ) -> Optional[Message]: """Use this method to send a game. @@ -1911,6 +2050,8 @@ def send_game( the connection pool). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :class:`telegram.Message`: On success, the sent Message is returned. @@ -1933,12 +2074,14 @@ def send_game( ) @log + @mq def send_chat_action( self, chat_id: Union[str, int], action: str, timeout: float = None, api_kwargs: JSONDict = None, + delay_queue: str = None, ) -> bool: """ Use this method when you need to tell the user that something is happening on the bot's @@ -1957,6 +2100,8 @@ def send_chat_action( the connection pool). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :obj:`bool`: On success, :obj:`True` is returned. @@ -1972,6 +2117,7 @@ def send_chat_action( return result # type: ignore[return-value] @log + @mq def answer_inline_query( self, inline_query_id: str, @@ -1984,6 +2130,7 @@ def answer_inline_query( timeout: float = None, current_offset: str = None, api_kwargs: JSONDict = None, + delay_queue: str = None, ) -> bool: """ Use this method to send answers to an inline query. No more than 50 results per query are @@ -2025,6 +2172,8 @@ def answer_inline_query( the connection pool). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Example: An inline bot that sends YouTube videos can ask the user to connect the bot to their @@ -2128,6 +2277,7 @@ def _set_defaults(res): ) @log + @mq def get_user_profile_photos( self, user_id: Union[str, int], @@ -2135,6 +2285,7 @@ def get_user_profile_photos( limit: int = 100, timeout: float = None, api_kwargs: JSONDict = None, + delay_queue: str = None, ) -> Optional[UserProfilePhotos]: """Use this method to get a list of profile pictures for a user. @@ -2149,6 +2300,8 @@ def get_user_profile_photos( the connection pool). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :class:`telegram.UserProfilePhotos` @@ -2169,6 +2322,7 @@ def get_user_profile_photos( return UserProfilePhotos.de_json(result, self) # type: ignore @log + @mq def get_file( self, file_id: Union[ @@ -2176,6 +2330,7 @@ def get_file( ], timeout: float = None, api_kwargs: JSONDict = None, + delay_queue: str = None, ) -> File: """ Use this method to get basic info about a file and prepare it for downloading. For the @@ -2202,6 +2357,8 @@ def get_file( the connection pool). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :class:`telegram.File` @@ -2229,6 +2386,7 @@ def get_file( return File.de_json(result, self) # type: ignore @log + @mq def kick_chat_member( self, chat_id: Union[str, int], @@ -2236,6 +2394,7 @@ def kick_chat_member( timeout: float = None, until_date: Union[int, datetime] = None, api_kwargs: JSONDict = None, + delay_queue: str = None, ) -> bool: """ Use this method to kick a user from a group or a supergroup or a channel. In the case of @@ -2257,6 +2416,8 @@ def kick_chat_member( bot will be used. api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :obj:`bool` On success, :obj:`True` is returned. @@ -2279,6 +2440,7 @@ def kick_chat_member( return result # type: ignore[return-value] @log + @mq def unban_chat_member( self, chat_id: Union[str, int], @@ -2286,6 +2448,7 @@ def unban_chat_member( timeout: float = None, api_kwargs: JSONDict = None, only_if_banned: bool = None, + delay_queue: str = None, ) -> bool: """Use this method to unban a previously kicked user in a supergroup or channel. @@ -2305,6 +2468,8 @@ def unban_chat_member( the connection pool). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :obj:`bool` On success, :obj:`True` is returned. @@ -2323,6 +2488,7 @@ def unban_chat_member( return result # type: ignore[return-value] @log + @mq def answer_callback_query( self, callback_query_id: str, @@ -2332,6 +2498,7 @@ def answer_callback_query( cache_time: int = None, timeout: float = None, api_kwargs: JSONDict = None, + delay_queue: str = None, ) -> bool: """ Use this method to send answers to callback queries sent from inline keyboards. The answer @@ -2362,6 +2529,8 @@ def answer_callback_query( the connection pool). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :obj:`bool` On success, :obj:`True` is returned. @@ -2386,6 +2555,7 @@ def answer_callback_query( return result # type: ignore[return-value] @log + @mq def edit_message_text( self, text: str, @@ -2398,6 +2568,7 @@ def edit_message_text( timeout: float = None, api_kwargs: JSONDict = None, entities: Union[List[MessageEntity], Tuple[MessageEntity, ...]] = None, + delay_queue: str = None, ) -> Union[Optional[Message], bool]: """ Use this method to edit text and game messages. @@ -2425,6 +2596,8 @@ def edit_message_text( the connection pool). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :class:`telegram.Message`: On success, if edited message is not an inline message, the @@ -2458,6 +2631,7 @@ def edit_message_text( ) @log + @mq def edit_message_caption( self, chat_id: Union[str, int] = None, @@ -2469,6 +2643,7 @@ def edit_message_caption( parse_mode: str = None, api_kwargs: JSONDict = None, caption_entities: Union[List[MessageEntity], Tuple[MessageEntity, ...]] = None, + delay_queue: str = None, ) -> Union[Message, bool]: """ Use this method to edit captions of messages. @@ -2496,6 +2671,8 @@ def edit_message_caption( the connection pool). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :class:`telegram.Message`: On success, if edited message is not an inline message, the @@ -2535,6 +2712,7 @@ def edit_message_caption( ) @log + @mq def edit_message_media( self, chat_id: Union[str, int] = None, @@ -2544,6 +2722,7 @@ def edit_message_media( reply_markup: InlineKeyboardMarkup = None, timeout: float = None, api_kwargs: JSONDict = None, + delay_queue: str = None, ) -> Union[Message, bool]: """ Use this method to edit animation, audio, document, photo, or video messages. If a message @@ -2569,6 +2748,8 @@ def edit_message_media( the connection pool). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :class:`telegram.Message`: On success, if edited message is sent by the bot, the @@ -2602,6 +2783,7 @@ def edit_message_media( ) @log + @mq def edit_message_reply_markup( self, chat_id: Union[str, int] = None, @@ -2610,6 +2792,7 @@ def edit_message_reply_markup( reply_markup: Optional[InlineKeyboardMarkup] = None, timeout: float = None, api_kwargs: JSONDict = None, + delay_queue: str = None, ) -> Union[Message, bool]: """ Use this method to edit only the reply markup of messages sent by the bot or via the bot @@ -2630,6 +2813,8 @@ def edit_message_reply_markup( the connection pool). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :class:`telegram.Message`: On success, if edited message is not an inline message, the @@ -2663,6 +2848,7 @@ def edit_message_reply_markup( ) @log + @mq def get_updates( self, offset: int = None, @@ -2671,6 +2857,7 @@ def get_updates( read_latency: float = 2.0, allowed_updates: List[str] = None, api_kwargs: JSONDict = None, + delay_queue: str = None, ) -> List[Update]: """Use this method to receive incoming updates using long polling. @@ -2697,6 +2884,8 @@ def get_updates( updates may be received for a short period of time. api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Note: 1. This method will not work if an outgoing webhook is set up. @@ -2868,8 +3057,13 @@ def delete_webhook( return result # type: ignore[return-value] @log + @mq def leave_chat( - self, chat_id: Union[str, int], timeout: float = None, api_kwargs: JSONDict = None + self, + chat_id: Union[str, int], + timeout: float = None, + api_kwargs: JSONDict = None, + delay_queue: str = None, ) -> bool: """Use this method for your bot to leave a group, supergroup or channel. @@ -2881,6 +3075,8 @@ def leave_chat( the connection pool). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :obj:`bool` On success, :obj:`True` is returned. @@ -2896,8 +3092,13 @@ def leave_chat( return result # type: ignore[return-value] @log + @mq def get_chat( - self, chat_id: Union[str, int], timeout: float = None, api_kwargs: JSONDict = None + self, + chat_id: Union[str, int], + timeout: float = None, + api_kwargs: JSONDict = None, + delay_queue: str = None, ) -> Chat: """ Use this method to get up to date information about the chat (current name of the user for @@ -2911,6 +3112,8 @@ def get_chat( the connection pool). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :class:`telegram.Chat` @@ -2929,8 +3132,13 @@ def get_chat( return Chat.de_json(result, self) # type: ignore @log + @mq def get_chat_administrators( - self, chat_id: Union[str, int], timeout: float = None, api_kwargs: JSONDict = None + self, + chat_id: Union[str, int], + timeout: float = None, + api_kwargs: JSONDict = None, + delay_queue: str = None, ) -> List[ChatMember]: """ Use this method to get a list of administrators in a chat. @@ -2943,6 +3151,8 @@ def get_chat_administrators( the connection pool). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: List[:class:`telegram.ChatMember`]: On success, returns a list of ``ChatMember`` @@ -2961,8 +3171,13 @@ def get_chat_administrators( return [ChatMember.de_json(x, self) for x in result] # type: ignore @log + @mq def get_chat_members_count( - self, chat_id: Union[str, int], timeout: float = None, api_kwargs: JSONDict = None + self, + chat_id: Union[str, int], + timeout: float = None, + api_kwargs: JSONDict = None, + delay_queue: str = None, ) -> int: """Use this method to get the number of members in a chat. @@ -2974,6 +3189,8 @@ def get_chat_members_count( the connection pool). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :obj:`int`: Number of members in the chat. @@ -2989,12 +3206,14 @@ def get_chat_members_count( return result # type: ignore[return-value] @log + @mq def get_chat_member( self, chat_id: Union[str, int], user_id: Union[str, int], timeout: float = None, api_kwargs: JSONDict = None, + delay_queue: str = None, ) -> ChatMember: """Use this method to get information about a member of a chat. @@ -3007,6 +3226,8 @@ def get_chat_member( the connection pool). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :class:`telegram.ChatMember` @@ -3022,12 +3243,14 @@ def get_chat_member( return ChatMember.de_json(result, self) # type: ignore @log + @mq def set_chat_sticker_set( self, chat_id: Union[str, int], sticker_set_name: str, timeout: float = None, api_kwargs: JSONDict = None, + delay_queue: str = None, ) -> bool: """Use this method to set a new group sticker set for a supergroup. The bot must be an administrator in the chat for this to work and must have the appropriate @@ -3044,6 +3267,8 @@ def set_chat_sticker_set( the connection pool). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :obj:`bool`: On success, :obj:`True` is returned. @@ -3055,8 +3280,13 @@ def set_chat_sticker_set( return result # type: ignore[return-value] @log + @mq def delete_chat_sticker_set( - self, chat_id: Union[str, int], timeout: float = None, api_kwargs: JSONDict = None + self, + chat_id: Union[str, int], + timeout: float = None, + api_kwargs: JSONDict = None, + delay_queue: str = None, ) -> bool: """Use this method to delete a group sticker set from a supergroup. The bot must be an administrator in the chat for this to work and must have the appropriate admin rights. @@ -3071,6 +3301,8 @@ def delete_chat_sticker_set( the connection pool). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :obj:`bool`: On success, :obj:`True` is returned. @@ -3081,6 +3313,7 @@ def delete_chat_sticker_set( return result # type: ignore[return-value] + @log def get_webhook_info(self, timeout: float = None, api_kwargs: JSONDict = None) -> WebhookInfo: """Use this method to get current webhook status. Requires no parameters. @@ -3102,6 +3335,7 @@ def get_webhook_info(self, timeout: float = None, api_kwargs: JSONDict = None) - return WebhookInfo.de_json(result, self) # type: ignore @log + @mq def set_game_score( self, user_id: Union[int, str], @@ -3113,6 +3347,7 @@ def set_game_score( disable_edit_message: bool = None, timeout: float = None, api_kwargs: JSONDict = None, + delay_queue: str = None, ) -> Union[Message, bool]: """ Use this method to set the score of the specified user in a game. @@ -3135,6 +3370,8 @@ def set_game_score( the connection pool). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :class:`telegram.Message`: The edited message, or if the message wasn't sent by the bot @@ -3166,6 +3403,7 @@ def set_game_score( ) @log + @mq def get_game_high_scores( self, user_id: Union[int, str], @@ -3174,6 +3412,7 @@ def get_game_high_scores( inline_message_id: Union[str, int] = None, timeout: float = None, api_kwargs: JSONDict = None, + delay_queue: str = None, ) -> List[GameHighScore]: """ Use this method to get data for high score tables. Will return the score of the specified @@ -3192,6 +3431,8 @@ def get_game_high_scores( the connection pool). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: List[:class:`telegram.GameHighScore`] @@ -3214,6 +3455,7 @@ def get_game_high_scores( return [GameHighScore.de_json(hs, self) for hs in result] # type: ignore @log + @mq def send_invoice( self, chat_id: Union[int, str], @@ -3242,6 +3484,7 @@ def send_invoice( timeout: float = None, api_kwargs: JSONDict = None, allow_sending_without_reply: bool = None, + delay_queue: str = None, ) -> Message: """Use this method to send invoices. @@ -3297,6 +3540,8 @@ def send_invoice( the connection pool). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :class:`telegram.Message`: On success, the sent Message is returned. @@ -3355,6 +3600,7 @@ def send_invoice( ) @log + @mq def answer_shipping_query( self, shipping_query_id: str, @@ -3363,6 +3609,7 @@ def answer_shipping_query( error_message: str = None, timeout: float = None, api_kwargs: JSONDict = None, + delay_queue: str = None, ) -> bool: """ If you sent an invoice requesting a shipping address and the parameter is_flexible was @@ -3385,6 +3632,8 @@ def answer_shipping_query( the connection pool). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :obj:`bool`: On success, :obj:`True` is returned. @@ -3420,6 +3669,7 @@ def answer_shipping_query( return result # type: ignore[return-value] @log + @mq def answer_pre_checkout_query( self, pre_checkout_query_id: str, @@ -3427,6 +3677,7 @@ def answer_pre_checkout_query( error_message: str = None, timeout: float = None, api_kwargs: JSONDict = None, + delay_queue: str = None, ) -> bool: """ Once the user has confirmed their payment and shipping details, the Bot API sends the final @@ -3452,6 +3703,8 @@ def answer_pre_checkout_query( the connection pool). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :obj:`bool`: On success, :obj:`True` is returned. @@ -3479,6 +3732,7 @@ def answer_pre_checkout_query( return result # type: ignore[return-value] @log + @mq def restrict_chat_member( self, chat_id: Union[str, int], @@ -3487,6 +3741,7 @@ def restrict_chat_member( until_date: Union[int, datetime] = None, timeout: float = None, api_kwargs: JSONDict = None, + delay_queue: str = None, ) -> bool: """ Use this method to restrict a user in a supergroup. The bot must be an administrator in @@ -3515,6 +3770,8 @@ def restrict_chat_member( the connection pool). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :obj:`bool`: On success, :obj:`True` is returned. @@ -3540,6 +3797,7 @@ def restrict_chat_member( return result # type: ignore[return-value] @log + @mq def promote_chat_member( self, chat_id: Union[str, int], @@ -3555,6 +3813,7 @@ def promote_chat_member( timeout: float = None, api_kwargs: JSONDict = None, is_anonymous: bool = None, + delay_queue: str = None, ) -> bool: """ Use this method to promote or demote a user in a supergroup or a channel. The bot must be @@ -3590,6 +3849,8 @@ def promote_chat_member( the connection pool). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :obj:`bool`: On success, :obj:`True` is returned. @@ -3624,12 +3885,14 @@ def promote_chat_member( return result # type: ignore[return-value] @log + @mq def set_chat_permissions( self, chat_id: Union[str, int], permissions: ChatPermissions, timeout: float = None, api_kwargs: JSONDict = None, + delay_queue: str = None, ) -> bool: """ Use this method to set default chat permissions for all members. The bot must be an @@ -3645,6 +3908,8 @@ def set_chat_permissions( the connection pool). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :obj:`bool`: On success, :obj:`True` is returned. @@ -3660,6 +3925,7 @@ def set_chat_permissions( return result # type: ignore[return-value] @log + @mq def set_chat_administrator_custom_title( self, chat_id: Union[int, str], @@ -3667,6 +3933,7 @@ def set_chat_administrator_custom_title( custom_title: str, timeout: float = None, api_kwargs: JSONDict = None, + delay_queue: str = None, ) -> bool: """ Use this method to set a custom title for administrators promoted by the bot in a @@ -3683,6 +3950,8 @@ def set_chat_administrator_custom_title( the connection pool). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :obj:`bool`: On success, :obj:`True` is returned. @@ -3700,8 +3969,13 @@ def set_chat_administrator_custom_title( return result # type: ignore[return-value] @log + @mq def export_chat_invite_link( - self, chat_id: Union[str, int], timeout: float = None, api_kwargs: JSONDict = None + self, + chat_id: Union[str, int], + timeout: float = None, + api_kwargs: JSONDict = None, + delay_queue: str = None, ) -> str: """ Use this method to generate a new invite link for a chat; any previously generated link @@ -3716,6 +3990,8 @@ def export_chat_invite_link( the connection pool). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :obj:`str`: New invite link on success. @@ -3731,12 +4007,14 @@ def export_chat_invite_link( return result # type: ignore[return-value] @log + @mq def set_chat_photo( self, chat_id: Union[str, int], photo: FileInput, timeout: float = 20, api_kwargs: JSONDict = None, + delay_queue: str = None, ) -> bool: """Use this method to set a new profile photo for the chat. @@ -3755,6 +4033,8 @@ def set_chat_photo( the connection pool). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :obj:`bool`: On success, :obj:`True` is returned. @@ -3770,8 +4050,13 @@ def set_chat_photo( return result # type: ignore[return-value] @log + @mq def delete_chat_photo( - self, chat_id: Union[str, int], timeout: float = None, api_kwargs: JSONDict = None + self, + chat_id: Union[str, int], + timeout: float = None, + api_kwargs: JSONDict = None, + delay_queue: str = None, ) -> bool: """ Use this method to delete a chat photo. Photos can't be changed for private chats. The bot @@ -3786,6 +4071,8 @@ def delete_chat_photo( the connection pool). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :obj:`bool`: On success, :obj:`True` is returned. @@ -3801,12 +4088,14 @@ def delete_chat_photo( return result # type: ignore[return-value] @log + @mq def set_chat_title( self, chat_id: Union[str, int], title: str, timeout: float = None, api_kwargs: JSONDict = None, + delay_queue: str = None, ) -> bool: """ Use this method to change the title of a chat. Titles can't be changed for private chats. @@ -3822,6 +4111,8 @@ def set_chat_title( the connection pool). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :obj:`bool`: On success, :obj:`True` is returned. @@ -3837,12 +4128,14 @@ def set_chat_title( return result # type: ignore[return-value] @log + @mq def set_chat_description( self, chat_id: Union[str, int], description: str, timeout: float = None, api_kwargs: JSONDict = None, + delay_queue: str = None, ) -> bool: """ Use this method to change the description of a group, a supergroup or a channel. The bot @@ -3858,6 +4151,8 @@ def set_chat_description( the connection pool). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :obj:`bool`: On success, :obj:`True` is returned. @@ -3873,6 +4168,7 @@ def set_chat_description( return result # type: ignore[return-value] @log + @mq def pin_chat_message( self, chat_id: Union[str, int], @@ -3880,6 +4176,7 @@ def pin_chat_message( disable_notification: bool = None, timeout: float = None, api_kwargs: JSONDict = None, + delay_queue: str = None, ) -> bool: """ Use this method to add a message to the list of pinned messages in a chat. If the @@ -3899,6 +4196,8 @@ def pin_chat_message( the connection pool). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :obj:`bool`: On success, :obj:`True` is returned. @@ -3917,12 +4216,14 @@ def pin_chat_message( ) @log + @mq def unpin_chat_message( self, chat_id: Union[str, int], timeout: float = None, api_kwargs: JSONDict = None, message_id: Union[str, int] = None, + delay_queue: str = None, ) -> bool: """ Use this method to remove a message from the list of pinned messages in a chat. If the @@ -3940,6 +4241,8 @@ def unpin_chat_message( the connection pool). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :obj:`bool`: On success, :obj:`True` is returned. @@ -3994,8 +4297,13 @@ def unpin_all_chat_messages( ) @log + @mq def get_sticker_set( - self, name: str, timeout: float = None, api_kwargs: JSONDict = None + self, + name: str, + timeout: float = None, + api_kwargs: JSONDict = None, + delay_queue: str = None, ) -> StickerSet: """Use this method to get a sticker set. @@ -4006,6 +4314,8 @@ def get_sticker_set( creation of the connection pool). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :class:`telegram.StickerSet` @@ -4021,12 +4331,14 @@ def get_sticker_set( return StickerSet.de_json(result, self) # type: ignore @log + @mq def upload_sticker_file( self, user_id: Union[str, int], png_sticker: FileInput, timeout: float = 20, api_kwargs: JSONDict = None, + delay_queue: str = None, ) -> File: """ Use this method to upload a .png file with a sticker for later use in @@ -4051,6 +4363,8 @@ def upload_sticker_file( creation of the connection pool). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :class:`telegram.File`: On success, the uploaded File is returned. @@ -4066,6 +4380,7 @@ def upload_sticker_file( return File.de_json(result, self) # type: ignore @log + @mq def create_new_sticker_set( self, user_id: Union[str, int], @@ -4078,6 +4393,7 @@ def create_new_sticker_set( timeout: float = 20, tgs_sticker: FileInput = None, api_kwargs: JSONDict = None, + delay_queue: str = None, ) -> bool: """ Use this method to create new sticker set owned by a user. @@ -4129,6 +4445,8 @@ def create_new_sticker_set( creation of the connection pool). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :obj:`bool`: On success, :obj:`True` is returned. @@ -4155,6 +4473,7 @@ def create_new_sticker_set( return result # type: ignore[return-value] @log + @mq def add_sticker_to_set( self, user_id: Union[str, int], @@ -4165,6 +4484,7 @@ def add_sticker_to_set( timeout: float = 20, tgs_sticker: FileInput = None, api_kwargs: JSONDict = None, + delay_queue: str = None, ) -> bool: """ Use this method to add a new sticker to a set created by the bot. @@ -4210,6 +4530,8 @@ def add_sticker_to_set( creation of the connection pool). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :obj:`bool`: On success, :obj:`True` is returned. @@ -4234,8 +4556,14 @@ def add_sticker_to_set( return result # type: ignore[return-value] @log + @mq def set_sticker_position_in_set( - self, sticker: str, position: int, timeout: float = None, api_kwargs: JSONDict = None + self, + sticker: str, + position: int, + timeout: float = None, + api_kwargs: JSONDict = None, + delay_queue: str = None, ) -> bool: """Use this method to move a sticker in a set created by the bot to a specific position. @@ -4247,6 +4575,8 @@ def set_sticker_position_in_set( creation of the connection pool). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :obj:`bool`: On success, :obj:`True` is returned. @@ -4264,8 +4594,13 @@ def set_sticker_position_in_set( return result # type: ignore[return-value] @log + @mq def delete_sticker_from_set( - self, sticker: str, timeout: float = None, api_kwargs: JSONDict = None + self, + sticker: str, + timeout: float = None, + api_kwargs: JSONDict = None, + delay_queue: str = None, ) -> bool: """Use this method to delete a sticker from a set created by the bot. @@ -4276,6 +4611,8 @@ def delete_sticker_from_set( creation of the connection pool). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :obj:`bool`: On success, :obj:`True` is returned. @@ -4291,6 +4628,7 @@ def delete_sticker_from_set( return result # type: ignore[return-value] @log + @mq def set_sticker_set_thumb( self, name: str, @@ -4298,6 +4636,7 @@ def set_sticker_set_thumb( thumb: FileInput = None, timeout: float = None, api_kwargs: JSONDict = None, + delay_queue: str = None, ) -> bool: """Use this method to set the thumbnail of a sticker set. Animated thumbnails can be set for animated sticker sets only. @@ -4325,6 +4664,8 @@ def set_sticker_set_thumb( creation of the connection pool). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :obj:`bool`: On success, :obj:`True` is returned. @@ -4343,12 +4684,14 @@ def set_sticker_set_thumb( return result # type: ignore[return-value] @log + @mq def set_passport_data_errors( self, user_id: Union[str, int], errors: List[PassportElementError], timeout: float = None, api_kwargs: JSONDict = None, + delay_queue: str = None, ) -> bool: """ Informs a user that some of the Telegram Passport elements they provided contains errors. @@ -4369,6 +4712,8 @@ def set_passport_data_errors( creation of the connection pool). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :obj:`bool`: On success, :obj:`True` is returned. @@ -4384,6 +4729,7 @@ def set_passport_data_errors( return result # type: ignore[return-value] @log + @mq def send_poll( self, chat_id: Union[int, str], @@ -4405,6 +4751,7 @@ def send_poll( api_kwargs: JSONDict = None, allow_sending_without_reply: bool = None, explanation_entities: Union[List[MessageEntity], Tuple[MessageEntity, ...]] = None, + delay_queue: str = None, ) -> Message: """ Use this method to send a native poll. @@ -4455,6 +4802,8 @@ def send_poll( the connection pool). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :class:`telegram.Message`: On success, the sent Message is returned. @@ -4508,6 +4857,7 @@ def send_poll( ) @log + @mq def stop_poll( self, chat_id: Union[int, str], @@ -4515,6 +4865,7 @@ def stop_poll( reply_markup: InlineKeyboardMarkup = None, timeout: float = None, api_kwargs: JSONDict = None, + delay_queue: str = None, ) -> Poll: """ Use this method to stop a poll which was sent by the bot. @@ -4530,6 +4881,8 @@ def stop_poll( the connection pool). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :class:`telegram.Poll`: On success, the stopped Poll with the final results is @@ -4554,6 +4907,7 @@ def stop_poll( return Poll.de_json(result, self) # type: ignore @log + @mq def send_dice( self, chat_id: Union[int, str], @@ -4564,6 +4918,7 @@ def send_dice( emoji: str = None, api_kwargs: JSONDict = None, allow_sending_without_reply: bool = None, + delay_queue: str = None, ) -> Message: """ Use this method to send an animated emoji, which will have a random value. On success, the @@ -4589,6 +4944,8 @@ def send_dice( the connection pool). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :class:`telegram.Message`: On success, the sent Message is returned. @@ -4616,8 +4973,9 @@ def send_dice( ) @log + @mq def get_my_commands( - self, timeout: float = None, api_kwargs: JSONDict = None + self, timeout: float = None, api_kwargs: JSONDict = None, delay_queue: str = None ) -> List[BotCommand]: """ Use this method to get the current list of the bot's commands. @@ -4628,6 +4986,8 @@ def get_my_commands( the connection pool). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: List[:class:`telegram.BotCommand]`: On success, the commands set for the bot @@ -4643,11 +5003,13 @@ def get_my_commands( return self._commands @log + @mq def set_my_commands( self, commands: List[Union[BotCommand, Tuple[str, str]]], timeout: float = None, api_kwargs: JSONDict = None, + delay_queue: str = None, ) -> bool: """ Use this method to change the list of the bot's commands. @@ -4661,6 +5023,8 @@ def set_my_commands( the connection pool). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :obj:`True`: On success @@ -4717,6 +5081,7 @@ def close(self) -> bool: return self._post('close') # type: ignore[return-value] @log + @mq def copy_message( self, chat_id: Union[int, str], @@ -4731,6 +5096,7 @@ def copy_message( reply_markup: ReplyMarkup = None, timeout: float = None, api_kwargs: JSONDict = None, + delay_queue: str = None, ) -> Optional[MessageId]: """ Use this method to copy messages of any kind. The method is analogous to the method @@ -4764,6 +5130,8 @@ def copy_message( the connection pool). api_kwargs (:obj:`dict`, optional): Arbitrary keyword arguments to be passed to the Telegram API. + delay_queue (:obj:`str`, optional): The name of the delay queue to pass this request + through. Defaults to :obj:`None`. Returns: :class:`telegram.MessageId`: On success diff --git a/telegram/ext/__init__.py b/telegram/ext/__init__.py index b614e292c74..58a4d238a80 100644 --- a/telegram/ext/__init__.py +++ b/telegram/ext/__init__.py @@ -18,6 +18,7 @@ # along with this program. If not, see [http://www.gnu.org/licenses/]. """Extensions over the Telegram Bot API to facilitate bot making""" +from .messagequeue import MessageQueue, DelayQueue, DelayQueueError from .basepersistence import BasePersistence from .picklepersistence import PicklePersistence from .dictpersistence import DictPersistence @@ -39,8 +40,6 @@ from .conversationhandler import ConversationHandler from .precheckoutqueryhandler import PreCheckoutQueryHandler from .shippingqueryhandler import ShippingQueryHandler -from .messagequeue import MessageQueue -from .messagequeue import DelayQueue from .pollanswerhandler import PollAnswerHandler from .pollhandler import PollHandler from .defaults import Defaults @@ -69,6 +68,7 @@ 'ShippingQueryHandler', 'MessageQueue', 'DelayQueue', + 'DelayQueueError', 'DispatcherHandlerStop', 'run_async', 'CallbackContext', diff --git a/telegram/ext/defaults.py b/telegram/ext/defaults.py index d39bc03e435..6e0f088a36d 100644 --- a/telegram/ext/defaults.py +++ b/telegram/ext/defaults.py @@ -18,7 +18,8 @@ # along with this program. If not, see [http://www.gnu.org/licenses/]. # pylint: disable=R0201, E0401 """This module contains the class Defaults, which allows to pass default values to Updater.""" -from typing import Any, NoReturn, Optional, Union +from collections import defaultdict +from typing import Union, Optional, Any, NoReturn, Dict, DefaultDict import pytz @@ -48,6 +49,12 @@ class Defaults: run_async (:obj:`bool`): Optional. Default setting for the ``run_async`` parameter of handlers and error handlers registered through :meth:`Dispatcher.add_handler` and :meth:`Dispatcher.add_error_handler`. + delay_queue (:obj:`str`, optional): A :class:`telegram.ext.DelayQueue` the bots + :class:`telegram.ext.MessageQueue` should use. + delay_queue_per_method (Dict[:obj:`str`, :obj:`str`], optional): A dictionary specifying + for each bot method a :class:`telegram.ext.DelayQueue` the bots + :class:`telegram.ext.MessageQueue` should use. Methods not specified here will use + :attr:`delay_queue`. Parameters: parse_mode (:obj:`str`, optional): Send Markdown or HTML, if you want Telegram apps to show @@ -71,6 +78,12 @@ class Defaults: run_async (:obj:`bool`, optional): Default setting for the ``run_async`` parameter of handlers and error handlers registered through :meth:`Dispatcher.add_handler` and :meth:`Dispatcher.add_error_handler`. Defaults to :obj:`False`. + delay_queue (:obj:`str`, optional): A :class:`telegram.ext.DelayQueue` the bots + :class:`telegram.ext.MessageQueue` should use. Defaults to :obj:`None`. + delay_queue_per_method (Dict[:obj:`str`, :obj:`str`], optional): A dictionary specifying + for each bot method a :class:`telegram.ext.DelayQueue` the bots + :class:`telegram.ext.MessageQueue` should use. Methods not specified here will use + :attr:`delay_queue`. Defaults to :obj:`None`. """ def __init__( @@ -85,6 +98,8 @@ def __init__( tzinfo: pytz.BaseTzInfo = pytz.utc, run_async: bool = False, allow_sending_without_reply: bool = None, + delay_queue: str = None, + delay_queue_per_method: Dict[str, Optional[str]] = None, ): self._parse_mode = parse_mode self._disable_notification = disable_notification @@ -94,6 +109,10 @@ def __init__( self._quote = quote self._tzinfo = tzinfo self._run_async = run_async + self._delay_queue = delay_queue + self._delay_queue_per_method: DefaultDict[str, Optional[str]] = defaultdict( + lambda: self.delay_queue, delay_queue_per_method or {} + ) @property def parse_mode(self) -> Optional[str]: @@ -183,6 +202,28 @@ def run_async(self, value: Any) -> NoReturn: "not have any effect." ) + @property + def delay_queue(self) -> Optional[str]: + return self._delay_queue + + @delay_queue.setter + def delay_queue(self, value: Any) -> NoReturn: + raise AttributeError( + "You can not assign a new value to defaults after because it would " + "not have any effect." + ) + + @property + def delay_queue_per_method(self) -> DefaultDict[str, Optional[str]]: + return self._delay_queue_per_method + + @delay_queue_per_method.setter + def delay_queue_per_method(self, value: Any) -> NoReturn: + raise AttributeError( + "You can not assign a new value to defaults after because it would " + "not have any effect." + ) + def __hash__(self) -> int: return hash( ( @@ -194,6 +235,8 @@ def __hash__(self) -> int: self._quote, self._tzinfo, self._run_async, + self._delay_queue, + ((key, value) for key, value in self._delay_queue_per_method.items()), ) ) diff --git a/telegram/ext/dispatcher.py b/telegram/ext/dispatcher.py index acee97c657e..e6c5ba7c80a 100644 --- a/telegram/ext/dispatcher.py +++ b/telegram/ext/dispatcher.py @@ -256,35 +256,38 @@ def _pooled(self) -> None: promise.run() - if not promise.exception: - self.update_persistence(update=promise.update) - continue + self.post_process_promise(promise) - if isinstance(promise.exception, DispatcherHandlerStop): - self.logger.warning( - 'DispatcherHandlerStop is not supported with async functions; func: %s', - promise.pooled_function.__name__, - ) - continue + def post_process_promise(self, promise: Promise) -> None: + if not promise.exception: + self.update_persistence(update=promise.update) + return - # Avoid infinite recursion of error handlers. - if promise.pooled_function in self.error_handlers: - self.logger.error('An uncaught error was raised while handling the error.') - continue + if isinstance(promise.exception, DispatcherHandlerStop): + self.logger.warning( + 'DispatcherHandlerStop is not supported with async functions; func: %s', + promise.pooled_function.__name__, + ) + return - # Don't perform error handling for a `Promise` with deactivated error handling. This - # should happen only via the deprecated `@run_async` decorator or `Promises` created - # within error handlers - if not promise.error_handling: - self.logger.error('A promise with deactivated error handling raised an error.') - continue + # Avoid infinite recursion of error handlers. + if promise.pooled_function in self.error_handlers: + self.logger.error('An uncaught error was raised while handling the error.') + return - # If we arrive here, an exception happened in the promise and was neither - # DispatcherHandlerStop nor raised by an error handler. So we can and must handle it - try: - self.dispatch_error(promise.update, promise.exception, promise=promise) - except Exception: - self.logger.exception('An uncaught error was raised while handling the error.') + # Don't perform error handling for a `Promise` with deactivated error handling. This + # should happen only via the deprecated `@run_async` decorator or `Promises` created + # within error handlers + if not promise.error_handling: + self.logger.error('A promise with deactivated error handling raised an error.') + return + + # If we arrive here, an exception happened in the promise and was neither + # DispatcherHandlerStop nor raised by an error handler. So we can and must handle it + try: + self.dispatch_error(promise.update, promise.exception, promise=promise) + except Exception: + self.logger.exception('An uncaught error was raised while handling the error.') def run_async( self, func: Callable[..., Any], *args: Any, update: Any = None, **kwargs: Any diff --git a/telegram/ext/messagequeue.py b/telegram/ext/messagequeue.py index 5a785fa882d..3a91ac34d87 100644 --- a/telegram/ext/messagequeue.py +++ b/telegram/ext/messagequeue.py @@ -20,96 +20,159 @@ # You should have received a copy of the GNU Lesser Public License # along with this program. If not, see [http://www.gnu.org/licenses/] """A throughput-limiting message processor for Telegram bots.""" + import functools -import queue as q +import logging import threading import time -from typing import TYPE_CHECKING, Any, Callable, List, NoReturn +import warnings +from queue import PriorityQueue + +from typing import Callable, Any, TYPE_CHECKING, List, NoReturn, ClassVar, Dict, Optional +from telegram.utils.deprecate import TelegramDeprecationWarning from telegram.utils.promise import Promise +from telegram import constants if TYPE_CHECKING: from telegram import Bot - -# We need to count < 1s intervals, so the most accurate timer is needed -curtime = time.perf_counter + from telegram.ext import Dispatcher class DelayQueueError(RuntimeError): """Indicates processing errors.""" +@functools.total_ordering +class PriorityWrapper: + def __init__(self, priority: int, promise: Promise): + self.priority = priority + self.promise = promise + + def __lt__(self, other: object) -> bool: + if not isinstance(other, PriorityWrapper): + raise NotImplementedError + return self.priority < other.priority + + def __eq__(self, other: object) -> bool: + if not isinstance(other, PriorityWrapper): + raise NotImplementedError + return self.priority == other.priority + + class DelayQueue(threading.Thread): """ Processes callbacks from queue with specified throughput limits. Creates a separate thread to process callbacks with delays. - Attributes: - burst_limit (:obj:`int`): Number of maximum callbacks to process per time-window. - time_limit (:obj:`int`): Defines width of time-window used when each processing limit is - calculated. - exc_route (:obj:`callable`): A callable, accepting 1 positional argument; used to route - exceptions from processor thread to main thread; - name (:obj:`str`): Thread's name. + .. versionchanged:: 13.2 + DelayQueue was almost completely overhauled in v13.2. Please read the docs carefully, if + you're upgrading from lower versions. + + Note: + For most use cases, the :attr:`parent` argument should be set to + :attr:`MessageQueue.default_queue` to ensure that the global flood limits are not + exceeded. Args: - queue (:obj:`Queue`, optional): Used to pass callbacks to thread. Creates ``Queue`` - implicitly if not provided. + parent (:class:`telegram.ext.DelayQueue`, optional): Pass another delay queue to put all + requests through that delay queue after they were processed by this queue. Defaults to + :obj:`None`. burst_limit (:obj:`int`, optional): Number of maximum callbacks to process per time-window - defined by :attr:`time_limit_ms`. Defaults to 30. + defined by :attr:`time_limit_ms`. Defaults to + :attr:`telegram.constants.MAX_MESSAGES_PER_SECOND`. time_limit_ms (:obj:`int`, optional): Defines width of time-window used when each processing limit is calculated. Defaults to 1000. - exc_route (:obj:`callable`, optional): A callable, accepting 1 positional argument; used to - route exceptions from processor thread to main thread; is called on `Exception` + error_handler (:obj:`callable`, optional): A callable, accepting 1 positional argument. + Used to route exceptions from processor thread to main thread. Is called on `Exception` subclass exceptions. If not provided, exceptions are routed through dummy handler, - which re-raises them. + which re-raises them. If :attr:`dispatcher` is set, error handling will *always* be + done by the dispatcher. + exc_route (:obj:`callable`, optional): Deprecated alias of :attr:`error_handler`. autostart (:obj:`bool`, optional): If :obj:`True`, processor is started immediately after object's creation; if :obj:`False`, should be started manually by `start` method. Defaults to :obj:`True`. name (:obj:`str`, optional): Thread's name. Defaults to ``'DelayQueue-N'``, where N is sequential number of object created. + priority (:obj:`int`, optional): Priority of the delay queue. Higher priority callbacks are + processed before lower priority callbacks, even if scheduled later. Higher number means + lower priority (i.e. ``priority = 0`` is processed *before* ``priority = 1``). Only + relevant, if the delay queue has a :attr:`parent``. Defaults to ``0``. + + Attributes: + burst_limit (:obj:`int`): Number of maximum callbacks to process per time-window. + time_limit (:obj:`int`): Defines width of time-window used when each processing limit is + calculated. + name (:obj:`str`): Thread's name. + error_handler (:obj:`callable`): Optional. A callable, accepting 1 positional argument. + Used to route exceptions from processor thread to main thread. + dispatcher (:class:`telegram.ext.Disptacher`): Optional. The dispatcher to use for error + handling. """ - _instcnt = 0 # instance counter + INSTANCE_COUNT: ClassVar[int] = 0 # instance counter def __init__( self, - queue: q.Queue = None, - burst_limit: int = 30, + burst_limit: int = constants.MAX_MESSAGES_PER_SECOND, time_limit_ms: int = 1000, exc_route: Callable[[Exception], None] = None, autostart: bool = True, name: str = None, + parent: 'DelayQueue' = None, + error_handler: Callable[[Exception], None] = None, + priority: int = 0, ): - self._queue = queue if queue is not None else q.Queue() + self.logger = logging.getLogger(__name__) self.burst_limit = burst_limit self.time_limit = time_limit_ms / 1000 - self.exc_route = exc_route if exc_route is not None else self._default_exception_handler + self.parent = parent + self.dispatcher: Optional['Dispatcher'] = None + self.priority = priority + self._queue: 'PriorityQueue[PriorityWrapper]' = PriorityQueue() + + if exc_route and error_handler: + raise ValueError('Only one of exc_route or error_handler can be passed.') + if exc_route: + warnings.warn( + 'The exc_route argument is deprecated. Use error_handler instead.', + TelegramDeprecationWarning, + stacklevel=2, + ) + self.exc_route = exc_route or error_handler or self._default_exception_handler + self.__exit_req = False # flag to gently exit thread - self.__class__._instcnt += 1 + self.__class__.INSTANCE_COUNT += 1 + if name is None: - name = f'{self.__class__.__name__}-{self.__class__._instcnt}' + name = f'{self.__class__.__name__}-{self.__class__.INSTANCE_COUNT}' super().__init__(name=name) - self.daemon = False + if autostart: # immediately start processing super().start() - def run(self) -> None: + def set_dispatcher(self, dispatcher: 'Dispatcher') -> None: """ - Do not use the method except for unthreaded testing purposes, the method normally is - automatically called by autostart argument. + Sets the dispatcher to use for error handling. + Args: + dispatcher (:class:`telegram.ext.Dispatcher`): The dispatcher. """ + self.dispatcher = dispatcher + def run(self) -> None: times: List[float] = [] # used to store each callable processing time + while True: - item = self._queue.get() + promise = self._queue.get().promise if self.__exit_req: return # shutdown thread + # delay routine now = time.perf_counter() t_delta = now - self.time_limit # calculate early to improve perf. + if times and t_delta > times[-1]: # if last call was before the limit time-window # used to impr. perf. in long-interval calls case @@ -120,12 +183,18 @@ def run(self) -> None: times.append(now) if len(times) >= self.burst_limit: # if throughput limit was hit time.sleep(times[1] - t_delta) + # finally process one - try: - func, args, kwargs = item - func(*args, **kwargs) - except Exception as exc: # re-route any exceptions - self.exc_route(exc) # to prevent thread exit + if self.parent: + # put through parent, if specified + self.parent.put(promise=promise, priority=self.priority) + else: + promise.run() + # error handling + if self.dispatcher: + self.dispatcher.post_process_promise(promise) + elif promise.exception: + self.exc_route(promise.exception) # re-route any exceptions def stop(self, timeout: float = None) -> None: """Used to gently stop processor and shutdown its thread. @@ -140,136 +209,229 @@ def stop(self, timeout: float = None) -> None: """ self.__exit_req = True # gently request - self._queue.put(None) # put something to unfreeze if frozen + self._queue.put( + PriorityWrapper( + 0, Promise(PriorityQueue, [], {}) # put something to unfreeze if frozen + ) + ) + self.logger.debug('Waiting for DelayQueue %s to shut down.', self.name) super().join(timeout=timeout) + self.logger.debug('DelayQueue %s shut down.', self.name) @staticmethod def _default_exception_handler(exc: Exception) -> NoReturn: - """ - Dummy exception handler which re-raises exception in thread. Could be possibly overwritten - by subclasses. - - """ - raise exc - def __call__(self, func: Callable, *args: Any, **kwargs: Any) -> None: - """Used to process callbacks in throughput-limiting thread through queue. + def put( + self, + func: Callable = None, + args: Any = None, + kwargs: Any = None, + promise: Promise = None, + priority: int = 0, + ) -> Promise: + """Used to process callbacks in throughput-limiting thread through queue. You must either + pass a :class:`telegram.utils.Promise` or all of ``func``, ``args`` and ``kwargs``. Args: - func (:obj:`callable`): The actual function (or any callable) that is processed through - queue. - *args (:obj:`list`): Variable-length `func` arguments. - **kwargs (:obj:`dict`): Arbitrary keyword-arguments to `func`. + func (:obj:`callable`, optional): The actual function (or any callable) that is + processed through queue. + args (:obj:`list`, optional): Variable-length `func` arguments. + kwargs (:obj:`dict`, optional): Arbitrary keyword-arguments to `func`. + promise (:class:`telegram.utils.Promise`, optional): A promise. + priority (:obj:`int`, optional): Priority of the callback. Defaults to ``0``. """ + if not bool(promise) ^ all(v is not None for v in [func, args, kwargs]): + raise ValueError('You must pass either a promise or all all func, args, kwargs.') if not self.is_alive() or self.__exit_req: raise DelayQueueError('Could not process callback in stopped thread') - self._queue.put((func, args, kwargs)) + + if not promise: + promise = Promise(func, args, kwargs) # type: ignore[arg-type] + self._queue.put(PriorityWrapper(priority, promise)) + return promise -# The most straightforward way to implement this is to use 2 sequential delay -# queues, like on classic delay chain schematics in electronics. -# So, message path is: -# msg --> group delay if group msg, else no delay --> normal msg delay --> out -# This way OS threading scheduler cares of timings accuracy. -# (see time.time, time.clock, time.perf_counter, time.sleep @ docs.python.org) class MessageQueue: """ Implements callback processing with proper delays to avoid hitting Telegram's message limits. - Contains two ``DelayQueue``, for group and for all messages, interconnected in delay chain. - Callables are processed through *group* ``DelayQueue``, then through *all* ``DelayQueue`` for - group-type messages. For non-group messages, only the *all* ``DelayQueue`` is used. + By default contains two :class:`telegram.ext.DelayQueue` instances, for general requests and + group requests where the default delay queue is the parent of the group requests one. + + .. versionchanged:: 13.2 + MessageQueue was almost completely overhauled in v13.2. Please read the docs carefully, if + you're upgrading from lower versions. Args: all_burst_limit (:obj:`int`, optional): Number of maximum *all-type* callbacks to process - per time-window defined by :attr:`all_time_limit_ms`. Defaults to 30. + per time-window defined by :attr:`all_time_limit_ms`. Defaults to + :attr:`telegram.constants.MAX_MESSAGES_PER_SECOND`. all_time_limit_ms (:obj:`int`, optional): Defines width of *all-type* time-window used when each processing limit is calculated. Defaults to 1000 ms. group_burst_limit (:obj:`int`, optional): Number of maximum *group-type* callbacks to - process per time-window defined by :attr:`group_time_limit_ms`. Defaults to 20. + process per time-window defined by :attr:`group_time_limit_ms`. Defaults to + :attr:`telegram.constants.MAX_MESSAGES_PER_MINUTE_PER_GROUP`. group_time_limit_ms (:obj:`int`, optional): Defines width of *group-type* time-window used when each processing limit is calculated. Defaults to 60000 ms. - exc_route (:obj:`callable`, optional): A callable, accepting one positional argument; used - to route exceptions from processor threads to main thread; is called on ``Exception`` + error_handler (:obj:`callable`, optional): A callable, accepting 1 positional argument. + Used to route exceptions from processor thread to main thread. Is called on `Exception` subclass exceptions. If not provided, exceptions are routed through dummy handler, - which re-raises them. - autostart (:obj:`bool`, optional): If :obj:`True`, processors are started immediately after - object's creation; if :obj:`False`, should be started manually by :attr:`start` method. - Defaults to :obj:`True`. + which re-raises them. If :attr:`dispatcher` is set, error handling will *always* be + done by the dispatcher. + exc_route (:obj:`callable`, optional): Deprecated alias of :attr:`error_handler`. + autostart (:obj:`bool`, optional): If :obj:`True`, both default delay queues are started + immediately after object's creation. Defaults to :obj:`True`. + + Attributes: + running (:obj:`bool`): Whether this message queue has started it's delay queues or not. + dispatcher (:class:`telegram.ext.Disptacher`): Optional. The Dispatcher to use for error + handling. + delay_queues (Dict[:obj:`str`, :class:`telegram.ext.DelayQueue`]): A dictionary containing + all registered delay queues, where the keys are the names of the delay queues. By + default includes bot :attr:`default_queue` and :attr:`group_queue` under the keys + :attr:`DEFAULT_QUEUE_NAME` and :attr:`GROUP_QUEUE_NAME`, respectively. """ def __init__( self, - all_burst_limit: int = 30, + all_burst_limit: int = constants.MAX_MESSAGES_PER_SECOND, all_time_limit_ms: int = 1000, - group_burst_limit: int = 20, + group_burst_limit: int = constants.MAX_MESSAGES_PER_MINUTE_PER_GROUP, group_time_limit_ms: int = 60000, exc_route: Callable[[Exception], None] = None, autostart: bool = True, + error_handler: Callable[[Exception], None] = None, ): - # create according delay queues, use composition - self._all_delayq = DelayQueue( - burst_limit=all_burst_limit, - time_limit_ms=all_time_limit_ms, - exc_route=exc_route, - autostart=autostart, - ) - self._group_delayq = DelayQueue( + self.running = autostart + self.dispatcher: Optional['Dispatcher'] = None + + if exc_route and error_handler: + raise ValueError('Only one of exc_route or error_handler can be passed.') + if exc_route: + warnings.warn( + 'The exc_route argument is deprecated. Use error_handler instead.', + TelegramDeprecationWarning, + stacklevel=2, + ) + + self.delay_queues: Dict[str, DelayQueue] = { + self.DEFAULT_QUEUE_NAME: DelayQueue( + burst_limit=all_burst_limit, + time_limit_ms=all_time_limit_ms, + error_handler=exc_route or error_handler, + autostart=autostart, + name=self.DEFAULT_QUEUE_NAME, + ) + } + self.delay_queues[self.GROUP_QUEUE_NAME] = DelayQueue( burst_limit=group_burst_limit, time_limit_ms=group_time_limit_ms, - exc_route=exc_route, + error_handler=exc_route or error_handler, autostart=autostart, + name=self.GROUP_QUEUE_NAME, + parent=self.delay_queues[self.DEFAULT_QUEUE_NAME], ) + def add_delay_queue(self, delay_queue: DelayQueue) -> None: + """ + Adds a new :class:`telegram.ext.DelayQueue` to this message queue. If the message queue is + already running, also starts the delay queue. Also takes care of setting the + :class:`telegram.ext.Dispatcher`, if :attr:`dispatcher` is set. + + Args: + delay_queue (:class:`telegram.ext.DelayQueue`): The delay queue to add. + """ + self.delay_queues[delay_queue.name] = delay_queue + if self.dispatcher: + delay_queue.set_dispatcher(self.dispatcher) + if self.running and not delay_queue.is_alive(): + delay_queue.start() + + def remove_delay_queue(self, name: str, timeout: float = None) -> None: + """ + Removes the :class:`telegram.ext.DelayQueue` with the given name. If the message queue is + still running, also stops the delay queue. + + Args: + name (:obj:`str`): The name of the delay queue to remove. + timeout (:obj:`float`, optional): The timeout to pass to + :meth:`telegram.ext.DelayQueue.stop`. + """ + delay_queue = self.delay_queues.pop(name) + if self.running and delay_queue.is_alive(): + delay_queue.stop(timeout) + def start(self) -> None: - """Method is used to manually start the ``MessageQueue`` processing.""" - self._all_delayq.start() - self._group_delayq.start() + """Starts the all :class:`telegram.ext.DelayQueue` registered for this message queue.""" + self.running = True + for delay_queue in self.delay_queues.values(): + delay_queue.start() def stop(self, timeout: float = None) -> None: - self._group_delayq.stop(timeout=timeout) - self._all_delayq.stop(timeout=timeout) + """ + Stops the all :class:`telegram.ext.DelayQueue` registered for this message queue. - stop.__doc__ = DelayQueue.stop.__doc__ or '' # reuse docstring if any + Args: + timeout (:obj:`float`, optional): The timeout to pass to + :meth:`telegram.ext.DelayQueue.stop`. + """ + self.running = False + for delay_queue in self.delay_queues.values(): + delay_queue.stop(timeout) - def __call__(self, promise: Callable, is_group_msg: bool = False) -> Callable: + def put(self, func: Callable, delay_queue: str, *args: Any, **kwargs: Any) -> Promise: """ - Processes callables in throughput-limiting queues to avoid hitting limits (specified with - :attr:`burst_limit` and :attr:`time_limit`. + Processes callables in throughput-limiting queues to avoid hitting limits. Args: - promise (:obj:`callable`): Mainly the ``telegram.utils.promise.Promise`` (see Notes for - other callables), that is processed in delay queues. - is_group_msg (:obj:`bool`, optional): Defines whether ``promise`` would be processed in - group*+*all* ``DelayQueue``s (if set to :obj:`True`), or only through *all* - ``DelayQueue`` (if set to :obj:`False`), resulting in needed delays to avoid - hitting specified limits. Defaults to :obj:`False`. - - Note: - Method is designed to accept ``telegram.utils.promise.Promise`` as ``promise`` - argument, but other callables could be used too. For example, lambdas or simple - functions could be used to wrap original func to be called with needed args. In that - case, be sure that either wrapper func does not raise outside exceptions or the proper - :attr:`exc_route` handler is provided. + func (:obj:`callable`): The callable to process + delay_queue (:obj:`str`): The name of the :class:`telegram.ext.DelayQueue` to use. + *args (:obj:`tuple`, optional): Arguments to ``func``. + **kwargs (:obj:`dict`, optional): Keyword arguments to ``func``. Returns: - :obj:`callable`: Used as ``promise`` argument. + :class:`telegram.ext.Promise`. """ + return self.delay_queues[delay_queue].put(func, args, kwargs) - if not is_group_msg: # ignore middle group delay - self._all_delayq(promise) - else: # use middle group delay - self._group_delayq(self._all_delayq, promise) - return promise + def set_dispatcher(self, dispatcher: 'Dispatcher') -> None: + """ + Sets the dispatcher to use for error handling. + + Args: + dispatcher (:class:`telegram.ext.Dispatcher`): The dispatcher. + """ + self.dispatcher = dispatcher + + DEFAULT_QUEUE_NAME: ClassVar[str] = 'default_delay_queue' + """:obj:`str`: The default delay queues name.""" + GROUP_QUEUE_NAME: ClassVar[str] = 'group_delay_queue' + """:obj:`str`: The name of the default delay queue for group requests.""" + + @property + def default_queue(self) -> DelayQueue: + """ + Shortcut for ``MessageQueue.delay_queues[MessageQueue.DEFAULT_QUEUE_NAME]``. + """ + return self.delay_queues[self.DEFAULT_QUEUE_NAME] + + @property + def group_queue(self) -> DelayQueue: + """ + Shortcut for ``MessageQueue.delay_queues[MessageQueue.GROUP_QUEUE_NAME]``. + """ + return self.delay_queues[self.GROUP_QUEUE_NAME] def queuedmessage(method: Callable) -> Callable: """A decorator to be used with :attr:`telegram.Bot` send* methods. + .. deprecated:: 13.2 + Note: As it probably wouldn't be a good idea to make this decorator a property, it has been coded as decorator function, so it implies that first positional argument to wrapped MUST be @@ -301,14 +463,26 @@ def queuedmessage(method: Callable) -> Callable: @functools.wraps(method) def wrapped(self: 'Bot', *args: Any, **kwargs: Any) -> Any: + warnings.warn( + 'The @queuedmessage decorator is deprecated. Use the `delay_queue` parameter of' + 'the various bot methods instead.', + TelegramDeprecationWarning, + stacklevel=2, + ) + # pylint: disable=W0212 queued = kwargs.pop( 'queued', self._is_messages_queued_default # type: ignore[attr-defined] ) - isgroup = kwargs.pop('isgroup', False) + is_group = kwargs.pop('isgroup', False) if queued: - prom = Promise(method, (self,) + args, kwargs) - return self._msg_queue(prom, isgroup) # type: ignore[attr-defined] + if not is_group: + return self._msg_queue.put( # type: ignore[attr-defined] + method, MessageQueue.DEFAULT_QUEUE_NAME, self, *args, **kwargs + ) + return self._msg_queue.put( # type: ignore[attr-defined] + method, MessageQueue.GROUP_QUEUE_NAME, self, *args, **kwargs + ) return method(self, *args, **kwargs) return wrapped diff --git a/telegram/ext/updater.py b/telegram/ext/updater.py index 4f258885005..8ab14a525a1 100644 --- a/telegram/ext/updater.py +++ b/telegram/ext/updater.py @@ -29,7 +29,7 @@ from telegram import Bot, TelegramError from telegram.error import InvalidToken, RetryAfter, TimedOut, Unauthorized -from telegram.ext import Dispatcher, JobQueue +from telegram.ext import Dispatcher, JobQueue, MessageQueue from telegram.utils.deprecate import TelegramDeprecationWarning from telegram.utils.helpers import get_signal_name from telegram.utils.request import Request @@ -94,6 +94,8 @@ class Updater: used). defaults (:class:`telegram.ext.Defaults`, optional): An object containing default values to be used if not set explicitly in the bot methods. + message_queue (:class:`telegram.ext.MessageQueue`, optional): A message queue to use with + the bot. Will be started automatically and the dispatcher will be set. Note: * You must supply either a :attr:`bot` or a :attr:`token` argument. @@ -122,6 +124,7 @@ def __init__( use_context: bool = True, dispatcher: Dispatcher = None, base_file_url: str = None, + message_queue: MessageQueue = None, ): if defaults and bot: @@ -181,6 +184,7 @@ def __init__( private_key=private_key, private_key_password=private_key_password, defaults=defaults, + message_queue=message_queue, ) self.update_queue: Queue = Queue() self.job_queue = JobQueue() @@ -196,6 +200,10 @@ def __init__( use_context=use_context, ) self.job_queue.set_dispatcher(self.dispatcher) + if self.bot.message_queue: + self.bot.message_queue.set_dispatcher(self.dispatcher) + if not self.bot.message_queue.running: + self.bot.message_queue.start() else: con_pool_size = dispatcher.workers + 4 @@ -634,6 +642,7 @@ def stop(self) -> None: self.running = False self._stop_httpd() + self._stop_message_queue() self._stop_dispatcher() self._join_threads() @@ -657,6 +666,11 @@ def _stop_dispatcher(self) -> None: self.logger.debug('Requesting Dispatcher to stop...') self.dispatcher.stop() + def _stop_message_queue(self) -> None: + if self.bot.message_queue: + self.logger.debug('Requesting MessageQueue to stop...') + self.bot.message_queue.stop() + @no_type_check def _join_threads(self) -> None: for thr in self.__threads: diff --git a/tests/test_bot.py b/tests/test_bot.py index 718f0b4a36d..be2c647d22e 100644 --- a/tests/test_bot.py +++ b/tests/test_bot.py @@ -16,6 +16,7 @@ # # You should have received a copy of the GNU Lesser Public License # along with this program. If not, see [http://www.gnu.org/licenses/]. +import logging import time import datetime as dtm from pathlib import Path @@ -43,10 +44,13 @@ Dice, MessageEntity, ParseMode, + Message, ) from telegram.constants import MAX_INLINE_QUERY_RESULTS from telegram.error import BadRequest, InvalidToken, NetworkError, RetryAfter +from telegram.ext import MessageQueue, DelayQueue from telegram.utils.helpers import from_timestamp, escape_markdown, to_timestamp +from telegram.utils.promise import Promise from tests.conftest import expect_bad_request BASE_TIME = time.time() @@ -94,6 +98,15 @@ def inline_results(): return inline_results_callback() +@pytest.fixture(scope='function') +def mq_bot(bot, monkeypatch): + bot.message_queue = MessageQueue() + bot.message_queue.add_delay_queue(DelayQueue(name='custom_dq')) + yield bot + bot.message_queue.stop() + bot.message_queue = None + + class TestBot: @pytest.mark.parametrize( 'token', @@ -1804,3 +1817,127 @@ def test_copy_message_with_default(self, default_bot, chat_id, media_message): assert len(message.caption_entities) == 1 else: assert len(message.caption_entities) == 0 + + @pytest.mark.parametrize( + 'chat_id,expected', + [ + ('123', 'default'), + (123, 'default'), + ('-123', 'group'), + (-123, 'group'), + ('@supergroup', 'group'), + ('foobar', 'default'), + ], + ) + def test_message_queue_group_chat_detection( + self, mq_bot, monkeypatch, chat_id, expected, caplog + ): + def _post(*args, **kwargs): + pass + + monkeypatch.setattr(mq_bot, '_post', _post) + with caplog.at_level(logging.DEBUG): + result = mq_bot.send_message( + chat_id, 'text', delay_queue=MessageQueue.DEFAULT_QUEUE_NAME + ) + assert isinstance(result, Promise) + assert len(caplog.records) == 4 + assert expected in caplog.records[1].getMessage() + + with caplog.at_level(logging.DEBUG): + result = mq_bot.send_message( + text='text', chat_id=chat_id, delay_queue=MessageQueue.DEFAULT_QUEUE_NAME + ) + assert isinstance(result, Promise) + assert len(caplog.records) == 8 + assert expected in caplog.records[5].getMessage() + + with caplog.at_level(logging.DEBUG): + assert isinstance(mq_bot.get_me(delay_queue=MessageQueue.DEFAULT_QUEUE_NAME), Promise) + assert len(caplog.records) == 12 + assert 'default' in caplog.records[9].getMessage() + + def test_stopped_message_queue(self, mq_bot, caplog): + mq_bot.message_queue.stop() + with caplog.at_level(logging.DEBUG): + result = mq_bot.get_me(delay_queue=MessageQueue.DEFAULT_QUEUE_NAME) + assert not isinstance(result, Promise) + assert len(caplog.records) >= 2 + assert 'Ignoring call to MessageQueue' in caplog.records[1].getMessage() + + def test_no_message_queue(self, bot, caplog): + with caplog.at_level(logging.DEBUG): + result = bot.get_me(delay_queue=MessageQueue.DEFAULT_QUEUE_NAME) + assert not isinstance(result, Promise) + assert len(caplog.records) >= 2 + assert 'Ignoring call to MessageQueue' in caplog.records[1].getMessage() + + def test_message_queue_custom_delay_queue(self, chat_id, mq_bot, monkeypatch): + test_flag = False + orig_put = mq_bot.message_queue.delay_queues['custom_dq'].put + + def put(*args, **kwargs): + nonlocal test_flag + test_flag = True + return orig_put(*args, **kwargs) + + result = mq_bot.send_message(chat_id, 'general kenobi') + assert not isinstance(result, Promise) + + monkeypatch.setattr(mq_bot.message_queue.delay_queues['custom_dq'], 'put', put) + result = mq_bot.send_message(chat_id, 'hello there', delay_queue='custom_dq') + assert isinstance(result, Promise) + assert test_flag + + @pytest.mark.parametrize( + 'default_bot', + [ + { + 'delay_queue': MessageQueue.DEFAULT_QUEUE_NAME, + 'delay_queue_per_method': { + 'send_dice': MessageQueue.GROUP_QUEUE_NAME, + 'send_poll': None, + }, + } + ], + indirect=True, + ) + def test_message_queue_with_defaults(self, chat_id, default_bot, monkeypatch): + default_bot.message_queue = MessageQueue() + + default_counter = 0 + group_counter = 0 + orig_default_put = default_bot.message_queue.default_queue.put + orig_group_put = default_bot.message_queue.default_queue.put + + def default_put(*args, **kwargs): + nonlocal default_counter + default_counter += 1 + return orig_default_put(*args, **kwargs) + + def group_put(*args, **kwargs): + nonlocal group_counter + group_counter += 1 + return orig_group_put(*args, **kwargs) + + try: + monkeypatch.setattr(default_bot.message_queue.default_queue, 'put', default_put) + monkeypatch.setattr(default_bot.message_queue.group_queue, 'put', group_put) + + result = default_bot.send_message(chat_id, 'general kenobi') + assert isinstance(result, Promise) + assert default_counter == 1 + assert group_counter == 0 + + result = default_bot.send_poll(chat_id, 'question', options=['1', '2']) + assert isinstance(result, Message) + assert default_counter == 1 + assert group_counter == 0 + + result = default_bot.send_dice(chat_id) + assert isinstance(result, Promise) + assert default_counter == 1 + assert group_counter == 1 + finally: + default_bot.message_queue.stop() + default_bot.message_queue = None diff --git a/tests/test_defaults.py b/tests/test_defaults.py index 23059b70459..ca22d63cd15 100644 --- a/tests/test_defaults.py +++ b/tests/test_defaults.py @@ -43,6 +43,10 @@ def test_data_assignment(self, cdp): defaults.tzinfo = True with pytest.raises(AttributeError): defaults.run_async = True + with pytest.raises(AttributeError): + defaults.delay_queue = True + with pytest.raises(AttributeError): + defaults.delay_queue_per_method = True def test_equality(self): a = Defaults(parse_mode='HTML', quote=True) diff --git a/tests/test_messagequeue.py b/tests/test_messagequeue.py index f9ebfc90159..942f6555924 100644 --- a/tests/test_messagequeue.py +++ b/tests/test_messagequeue.py @@ -16,55 +16,378 @@ # # You should have received a copy of the GNU Lesser Public License # along with this program. If not, see [http://www.gnu.org/licenses/]. - -import os from time import sleep, perf_counter import pytest -import telegram.ext.messagequeue as mq +from telegram import Bot +from telegram.ext import MessageQueue, DelayQueue, DelayQueueError +from telegram.ext.messagequeue import queuedmessage -@pytest.mark.skipif( - os.getenv('GITHUB_ACTIONS', False) and os.name == 'nt', - reason="On windows precise timings are not accurate.", -) class TestDelayQueue: N = 128 burst_limit = 30 time_limit_ms = 1000 margin_ms = 0 - testtimes = [] + test_times = [] + test_flag = None + + @pytest.fixture(autouse=True) + def reset(self): + DelayQueue.INSTANCE_COUNT = 0 + self.test_flag = None def call(self): - self.testtimes.append(perf_counter()) + self.test_times.append(perf_counter()) + + def callback_raises_exception(self): + raise DelayQueueError('TestError') + + def test_auto_start_false(self): + delay_queue = DelayQueue(autostart=False) + assert not delay_queue.is_alive() - def test_delayqueue_limits(self): - dsp = mq.DelayQueue( + def test_name(self): + delay_queue = DelayQueue(autostart=False) + assert delay_queue.name == 'DelayQueue-1' + delay_queue = DelayQueue(autostart=False) + assert delay_queue.name == 'DelayQueue-2' + delay_queue = DelayQueue(name='test_queue', autostart=False) + assert delay_queue.name == 'test_queue' + + def test_exc_route_deprecation(self, recwarn): + with pytest.raises(ValueError, match='Only one of exc_route or '): + DelayQueue(exc_route=True, error_handler=True, autostart=False) + + DelayQueue(exc_route=True, autostart=False) + assert len(recwarn) == 1 + assert str(recwarn[0].message).startswith('The exc_route argument is') + + def test_delay_queue_limits(self): + delay_queue = DelayQueue( burst_limit=self.burst_limit, time_limit_ms=self.time_limit_ms, autostart=True ) - assert dsp.is_alive() is True + assert delay_queue.is_alive() is True + + try: + for _ in range(self.N): + delay_queue.put(self.call, [], {}) + + start_time = perf_counter() + # wait up to 20 sec more than needed + app_end_time = ( + (self.N * self.burst_limit / (1000 * self.time_limit_ms)) + start_time + 20 + ) + while not delay_queue._queue.empty() and perf_counter() < app_end_time: + sleep(0.5) + assert delay_queue._queue.empty() is True # check loop exit condition + + delay_queue.stop() + assert delay_queue.is_alive() is False + + assert self.test_times or self.N == 0 + passes, fails = [], [] + delta = (self.time_limit_ms - self.margin_ms) / 1000 + for start, stop in enumerate(range(self.burst_limit + 1, len(self.test_times))): + part = self.test_times[start:stop] + if (part[-1] - part[0]) >= delta: + passes.append(part) + else: + fails.append(part) + assert fails == [] + finally: + delay_queue.stop() + + def test_with_priority(self): + parent_queue = DelayQueue() + high_priority_queue = DelayQueue(parent=parent_queue, priority=0) + low_priority_queue = DelayQueue(parent=parent_queue, priority=1) + high_priority_count = 0 + low_priority_count = 0 + event_list = [] + + def low_priority_callback(): + nonlocal low_priority_count + nonlocal event_list + event_list.append((low_priority_count, 'low')) + low_priority_count += 1 + + def high_priority_callback(): + nonlocal high_priority_count + nonlocal event_list + event_list.append((high_priority_count, 'high')) + high_priority_count += 1 + + # enqueue low priority first + for _ in range(3): + low_priority_queue.put(low_priority_callback, args=[], kwargs={}) - for _ in range(self.N): - dsp(self.call) + # enqueue high priority second + for _ in range(3): + high_priority_queue.put(high_priority_callback, args=[], kwargs={}) - starttime = perf_counter() - # wait up to 20 sec more than needed - app_endtime = (self.N * self.burst_limit / (1000 * self.time_limit_ms)) + starttime + 20 - while not dsp._queue.empty() and perf_counter() < app_endtime: + try: sleep(1) - assert dsp._queue.empty() is True # check loop exit condition - - dsp.stop() - assert dsp.is_alive() is False - - assert self.testtimes or self.N == 0 - passes, fails = [], [] - delta = (self.time_limit_ms - self.margin_ms) / 1000 - for start, stop in enumerate(range(self.burst_limit + 1, len(self.testtimes))): - part = self.testtimes[start:stop] - if (part[-1] - part[0]) >= delta: - passes.append(part) - else: - fails.append(part) - assert fails == [] + # high priority events should be handled first + assert event_list == [ + (0, 'high'), + (1, 'high'), + (2, 'high'), + (0, 'low'), + (1, 'low'), + (2, 'low'), + ] + finally: + parent_queue.stop() + low_priority_queue.stop() + high_priority_queue.stop() + + def test_put_errors(self): + delay_queue = DelayQueue(autostart=False) + with pytest.raises(DelayQueueError, match='stopped thread'): + delay_queue.put(promise=True) + + delay_queue.start() + try: + with pytest.raises(ValueError, match='You must pass either'): + delay_queue.put() + with pytest.raises(ValueError, match='You must pass either'): + delay_queue.put(promise=True, args=True, kwargs=True, func=True) + with pytest.raises(ValueError, match='You must pass either'): + delay_queue.put(args=True) + finally: + delay_queue.stop() + + def test_default_error_handler_without_dispatcher(self, monkeypatch): + @staticmethod + def exc_route(exception): + self.test_flag = ( + isinstance(exception, DelayQueueError) and str(exception) == 'TestError' + ) + + monkeypatch.setattr(DelayQueue, '_default_exception_handler', exc_route) + + delay_queue = DelayQueue() + try: + delay_queue.put(self.callback_raises_exception, [], {}) + sleep(0.5) + assert self.test_flag + finally: + delay_queue.stop() + + def test_custom_error_handler_without_dispatcher(self): + def exc_route(exception): + self.test_flag = ( + isinstance(exception, DelayQueueError) and str(exception) == 'TestError' + ) + + delay_queue = DelayQueue(exc_route=exc_route) + try: + delay_queue.put(self.callback_raises_exception, [], {}) + sleep(0.5) + assert self.test_flag + finally: + delay_queue.stop() + + def test_custom_error_handler_with_dispatcher(self, cdp): + def error_handler(_, context): + self.test_flag = ( + isinstance(context.error, DelayQueueError) and str(context.error) == 'TestError' + ) + + cdp.add_error_handler(error_handler) + delay_queue = DelayQueue() + delay_queue.set_dispatcher(cdp) + try: + delay_queue.put(self.callback_raises_exception, [], {}) + sleep(0.5) + assert self.test_flag + finally: + delay_queue.stop() + + def test_parent(self, monkeypatch): + def put(*args, **kwargs): + self.test_flag = bool(kwargs.pop('promise', False)) + + parent = DelayQueue(name='parent') + monkeypatch.setattr(parent, 'put', put) + + delay_queue = DelayQueue(parent=parent) + try: + delay_queue.put(self.call, [], {}) + sleep(0.5) + assert self.test_flag + finally: + parent.stop() + delay_queue.stop() + + +class TestMessageQueue: + test_flag = None + + @pytest.fixture(autouse=True) + def reset(self): + DelayQueue.INSTANCE_COUNT = 0 + self.test_flag = None + + def call(self, arg, kwarg=None): + self.test_flag = arg == 1 and kwarg == 'foo' + + def callback_raises_exception(self): + raise DelayQueueError('TestError') + + def test_auto_start_false(self): + message_queue = MessageQueue(autostart=False) + assert not any(thread.is_alive() for thread in message_queue.delay_queues.values()) + + def test_exc_route_deprecation(self, recwarn): + with pytest.raises(ValueError, match='Only one of exc_route or '): + MessageQueue(exc_route=True, error_handler=True, autostart=False) + + MessageQueue(exc_route=True, autostart=False) + assert len(recwarn) == 1 + assert str(recwarn[0].message).startswith('The exc_route argument is') + + def test_add_delay_queue_autostart_false(self): + message_queue = MessageQueue(autostart=False) + delay_queue = DelayQueue(autostart=False, name='dq') + try: + message_queue.add_delay_queue(delay_queue) + assert 'dq' in message_queue.delay_queues + assert not any(thread.is_alive() for thread in message_queue.delay_queues.values()) + message_queue.start() + assert all(thread.is_alive() for thread in message_queue.delay_queues.values()) + + message_queue.stop() + assert not any(thread.is_alive() for thread in message_queue.delay_queues.values()) + finally: + delay_queue.stop() + message_queue.stop() + + @pytest.mark.parametrize('autostart', [True, False]) + def test_add_delay_queue_autostart_true(self, autostart): + message_queue = MessageQueue() + delay_queue = DelayQueue(name='dq', autostart=autostart) + try: + message_queue.add_delay_queue(delay_queue) + assert 'dq' in message_queue.delay_queues + assert delay_queue.is_alive() + assert all(thread.is_alive() for thread in message_queue.delay_queues.values()) + + message_queue.stop() + assert not any(thread.is_alive() for thread in message_queue.delay_queues.values()) + finally: + delay_queue.stop() + message_queue.stop() + + def test_add_delay_queue_dispatcher(self, dp): + message_queue = MessageQueue(autostart=False) + message_queue.set_dispatcher(dispatcher=dp) + delay_queue = DelayQueue(autostart=False, name='dq') + message_queue.add_delay_queue(delay_queue) + assert delay_queue.dispatcher is dp + + @pytest.mark.parametrize('autostart', [True, False]) + def test_remove_delay_queue(self, autostart): + message_queue = MessageQueue(autostart=autostart) + delay_queue = DelayQueue(name='dq') + try: + message_queue.add_delay_queue(delay_queue) + assert 'dq' in message_queue.delay_queues + assert delay_queue.is_alive() + + message_queue.remove_delay_queue('dq') + assert 'dq' not in message_queue.delay_queues + if autostart: + assert not delay_queue.is_alive() + finally: + delay_queue.stop() + if autostart: + message_queue.stop() + + def test_put(self): + group_flag = None + + message_queue = MessageQueue() + original_put = message_queue.default_queue.put + + def put(*args, **kwargs): + nonlocal group_flag + group_flag = True + return original_put(*args, **kwargs) + + message_queue.group_queue.put = put + + try: + message_queue.put(self.call, MessageQueue.GROUP_QUEUE_NAME, 1, kwarg='foo') + sleep(0.5) + assert self.test_flag is True + # make sure that group queue was called, too + assert group_flag is True + finally: + message_queue.group_queue.put = original_put + message_queue.stop() + + +@pytest.fixture(scope='function') +def mq_bot(bot, monkeypatch): + class MQBot(Bot): + def __init__(self, *args, **kwargs): + self.test = None + self.default_count = 0 + self.group_count = 0 + super().__init__(*args, **kwargs) + # below 2 attributes should be provided for decorator usage + self._is_messages_queued_default = True + self._msg_queue = MessageQueue() + + @queuedmessage + def test_method(self, input, *args, **kwargs): + self.test = input + + bot = MQBot(token=bot.token) + + orig_default_put = bot._msg_queue.default_queue.put + orig_group_put = bot._msg_queue.group_queue.put + + def step_default_counter(*args, **kwargs): + orig_default_put(*args, **kwargs) + bot.default_count += 1 + + def step_group_counter(*args, **kwargs): + orig_group_put(*args, **kwargs) + bot.group_count += 1 + + monkeypatch.setattr(bot._msg_queue.default_queue, 'put', step_default_counter) + monkeypatch.setattr(bot._msg_queue.group_queue, 'put', step_group_counter) + yield bot + bot._msg_queue.stop() + + +class TestDecorator: + def test_queued_kwarg(self, mq_bot): + mq_bot.test_method('received', queued=False) + sleep(0.5) + assert mq_bot.default_count == 0 + assert mq_bot.group_count == 0 + assert mq_bot.test == 'received' + + mq_bot.test_method('received1') + sleep(0.5) + assert mq_bot.default_count == 1 + assert mq_bot.group_count == 0 + assert mq_bot.test == 'received1' + + def test_isgroup_kwarg(self, mq_bot): + mq_bot.test_method('received', isgroup=False) + sleep(0.5) + assert mq_bot.default_count == 1 + assert mq_bot.group_count == 0 + assert mq_bot.test == 'received' + + mq_bot.test_method('received1', isgroup=True) + sleep(0.5) + assert mq_bot.default_count == 2 + assert mq_bot.group_count == 1 + assert mq_bot.test == 'received1' diff --git a/tests/test_official.py b/tests/test_official.py index 56f830bb781..e6c4c9cbc7f 100644 --- a/tests/test_official.py +++ b/tests/test_official.py @@ -36,6 +36,7 @@ 'timeout', 'bot', 'api_kwargs', + 'delay_queue', } diff --git a/tests/test_updater.py b/tests/test_updater.py index 745836acd3d..18b949761cc 100644 --- a/tests/test_updater.py +++ b/tests/test_updater.py @@ -38,13 +38,13 @@ from telegram import TelegramError, Message, User, Chat, Update, Bot from telegram.error import Unauthorized, InvalidToken, TimedOut, RetryAfter -from telegram.ext import Updater, Dispatcher, DictPersistence, Defaults +from telegram.ext import Updater, Dispatcher, DictPersistence, Defaults, MessageQueue from telegram.utils.deprecate import TelegramDeprecationWarning from telegram.utils.webhookhandler import WebhookServer signalskip = pytest.mark.skipif( sys.platform == 'win32', - reason='Can\'t send signals without stopping ' 'whole process on windows', + reason='Can\'t send signals without stopping whole process on windows', ) @@ -583,3 +583,27 @@ def test_mutual_exclude_use_context_dispatcher(self): def test_defaults_warning(self, bot): with pytest.warns(TelegramDeprecationWarning, match='no effect when a Bot is passed'): Updater(bot=bot, defaults=Defaults()) + + def test_message_queue(self, bot, caplog): + updater = Updater(bot.token, message_queue=MessageQueue()) + updater.running = True + try: + assert updater.bot.message_queue.dispatcher is updater.dispatcher + with caplog.at_level(logging.DEBUG): + updater.stop() + assert caplog.records[1].getMessage() == 'Requesting MessageQueue to stop...' + assert not updater.bot.message_queue.running + finally: + updater.stop() + + updater = Updater(bot.token, message_queue=MessageQueue(autostart=False)) + updater.running = True + try: + assert updater.bot.message_queue.running + assert updater.bot.message_queue.dispatcher is updater.dispatcher + with caplog.at_level(logging.DEBUG): + updater.stop() + assert caplog.records[1].getMessage() == 'Requesting MessageQueue to stop...' + assert not updater.bot.message_queue.running + finally: + updater.stop()