-
Notifications
You must be signed in to change notification settings - Fork 5.8k
clean pending update based on Timedelta or datetime #1987
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
d9988e2
deb2515
d7254b1
158419d
bc5d964
b5db0c6
62015f4
07e22b1
01cf389
766cbe6
663aece
684280f
f766b01
1541d64
4804b8b
21e0226
cafb188
531b484
ce35971
33410d3
bd21699
99dd377
cb7bb3b
6c44b67
51953a6
e3f46b8
73adbdd
907e092
c887ac3
eae79ec
30e854f
091b7a9
72098b7
5f60741
2378587
233ba3c
cd7afca
2c8990e
641e1e7
5b6f522
60dad8e
5bf813c
cac4bf4
030b9ad
d3479c0
afe7b24
08a4717
0040c88
97f3197
82f8cee
92346a5
9f5a8ce
1787202
0c4208c
2043335
e4140d9
0e96ed5
d06ecf9
534ec26
fb082a7
7dce85d
e1144a9
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -22,8 +22,10 @@ | |
import ssl | ||
from threading import Thread, Lock, current_thread, Event | ||
from time import sleep | ||
from datetime import datetime, timedelta, timezone | ||
from signal import signal, SIGINT, SIGTERM, SIGABRT | ||
from queue import Queue | ||
from functools import partial | ||
|
||
from telegram import Bot, TelegramError | ||
from telegram.ext import Dispatcher, JobQueue | ||
|
@@ -233,8 +235,25 @@ def start_polling(self, | |
poll_interval (:obj:`float`, optional): Time to wait between polling updates from | ||
Telegram in seconds. Default is 0.0. | ||
timeout (:obj:`float`, optional): Passed to :attr:`telegram.Bot.get_updates`. | ||
clean (:obj:`bool`, optional): Whether to clean any pending updates on Telegram servers | ||
before actually starting to poll. Default is False. | ||
clean (:obj:`bool` | :obj:`datetime.timedelta` | :obj:`datetime.timedelta`, optional): | ||
Whether to clean any pending updates on Telegram servers before actually starting | ||
to poll. This parameter will be interpreted depending on its type. | ||
|
||
* :obj:`bool` ``True`` cleans all updates. Default is ``False``. | ||
* :obj:`datetime.timedelta` will be interpreted as "time before now" cut off. | ||
Pending updates older than the cut off will be cleaned up. | ||
:obj:`datetime.timedelta` is sign independent, both positive and negative deltas | ||
are interpreted as "in the past". | ||
* :obj:`datetime.datetime` will be interpreted as a specific date and time as | ||
cut off. Pending updates older than the cut off will be cleaned up. | ||
If the timezone (``datetime.tzinfo``) is ``None``, UTC will be assumed. | ||
|
||
Note: | ||
If :attr:`clean` is :obj:`datetime.timedelta` or :obj:`datetime.datetime` and | ||
if a :class:`telegram.Update.effective_message` is found with | ||
:attr:`telegram.Message.date` is ``None``, before the :obj:`datetime.timedelta` | ||
or :obj:`datetime.datetime` condition is met, all updates will pass through. | ||
|
||
bootstrap_retries (:obj:`int`, optional): Whether the bootstrapping phase of the | ||
`Updater` will retry on failures on the Telegram server. | ||
|
||
|
@@ -251,11 +270,38 @@ def start_polling(self, | |
Returns: | ||
:obj:`Queue`: The update queue that can be filled from the main thread. | ||
|
||
Raises: | ||
ValueError: if :attr:`clean` is :obj:`datetime.timedelta` and is < 1 second. | ||
ValueError: if :attr:`clean` is :obj:`datetime.datetime` is not a least 1 second older | ||
than `now()`. | ||
|
||
""" | ||
with self.__lock: | ||
if not self.running: | ||
self.running = True | ||
|
||
if isinstance(clean, timedelta): | ||
if clean.total_seconds() < 0: | ||
clean = clean * -1 | ||
|
||
if clean.total_seconds() < 1: | ||
raise ValueError('Clean as timedelta needs to be >= 1 second') | ||
else: | ||
# convert to datetime | ||
clean = datetime.now(tz=timezone.utc) - clean | ||
elif isinstance(clean, datetime): | ||
|
||
if ( | ||
clean.tzinfo is None or | ||
(clean.tzinfo is not None and clean.tzinfo.utcoffset(clean) is None) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What is the second check good for? If the datetime is aware it shouldn't matter what the UTC offset is? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. per PY 3.6.10 documentation (scroll down just a little to the datetime.timezone section the requirement to be 'aware' is to have both conditions. If UTC offset is
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. PY 3.8.3 documentation even has dedicated a paragraph on it
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I thought about changing it to There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We don't check |
||
): | ||
clean=clean.replace(tzinfo=timezone.utc) | ||
|
||
if clean > (datetime.now(tz=timezone.utc) - timedelta(seconds=1)): | ||
raise ValueError('Clean as datetime ("%s") needs to be at least 1 second' | ||
' older than "now"("%s")' % (clean, | ||
datetime.now(tz=timezone.utc))) | ||
|
||
# Create & start threads | ||
self.job_queue.start() | ||
dispatcher_ready = Event() | ||
|
@@ -291,8 +337,25 @@ def start_webhook(self, | |
url_path (:obj:`str`, optional): Path inside url. | ||
cert (:obj:`str`, optional): Path to the SSL certificate file. | ||
key (:obj:`str`, optional): Path to the SSL key file. | ||
clean (:obj:`bool`, optional): Whether to clean any pending updates on Telegram servers | ||
before actually starting the webhook. Default is ``False``. | ||
clean (:obj:`bool` | :obj:`datetime.timedelta` | :obj:`datetime.timedelta`, optional): | ||
Whether to clean any pending updates on Telegram servers before actually starting | ||
to poll. This parameter will be interpreted depending on its type. | ||
|
||
* :obj:`bool` ``True`` cleans all updates. Default is ``False``. | ||
* :obj:`datetime.timedelta` will be interpreted as "time before now" cut off. | ||
Pending updates older than the cut off will be cleaned up. | ||
:obj:`datetime.timedelta` is sign independent, both positive and negative deltas | ||
are interpreted as "in the past". | ||
* :obj:`datetime.datetime` will be interpreted as a specific date and time as | ||
cut off. Pending updates older than the cut off will be cleaned up. | ||
If the timezone (``datetime.tzinfo``) is ``None``, UTC will be assumed. | ||
|
||
Note: | ||
If :attr:`clean` is :obj:`datetime.timedelta` or :obj:`datetime.datetime` and | ||
if a :class:`telegram.Update.effective_message` is found with | ||
:attr:`telegram.Message.date` is ``None``, before the :obj:`datetime.timedelta` | ||
or :obj:`datetime.datetime` condition is met, all updates will pass through. | ||
|
||
bootstrap_retries (:obj:`int`, optional): Whether the bootstrapping phase of the | ||
`Updater` will retry on failures on the Telegram server. | ||
|
||
|
@@ -308,11 +371,37 @@ def start_webhook(self, | |
Returns: | ||
:obj:`Queue`: The update queue that can be filled from the main thread. | ||
|
||
Raises: | ||
ValueError: if :attr:`clean` is :obj:`datetime.timedelta` and is < 1 second. | ||
ValueError: if :attr:`clean` is :obj:`datetime.datetime` is not a least 1 second older | ||
than `now()`. | ||
|
||
""" | ||
with self.__lock: | ||
if not self.running: | ||
self.running = True | ||
|
||
if isinstance(clean, timedelta): | ||
if clean.total_seconds() < 0: | ||
clean = clean * -1 | ||
|
||
if clean.total_seconds() < 1: | ||
raise ValueError('Clean as timedelta needs to be >= 1 second') | ||
else: | ||
# convert to datetime | ||
clean = datetime.now(tz=timezone.utc) - clean | ||
elif isinstance(clean, datetime): | ||
if ( | ||
clean.tzinfo is None or | ||
(clean.tzinfo is not None and clean.tzinfo.utcoffset(clean) is None) | ||
): | ||
clean=clean.replace(tzinfo=timezone.utc) | ||
|
||
if clean > (datetime.now(tz=timezone.utc) - timedelta(seconds=1)): | ||
raise ValueError('Clean as datetime ("%s") needs to be at least 1 second' | ||
' older than "now"("%s")' % (clean, | ||
datetime.now(tz=timezone.utc))) | ||
|
||
# Create & start threads | ||
self.job_queue.start() | ||
self._init_thread(self.dispatcher.start, "dispatcher"), | ||
|
@@ -475,6 +564,23 @@ def bootstrap_clean_updates(): | |
updates = self.bot.get_updates(updates[-1].update_id + 1) | ||
return False | ||
|
||
def bootstrap_clean_updates_datetime(datetime_cutoff): | ||
self.logger.debug('Cleaning updates from Telegram server with datetime "%s"', | ||
datetime_cutoff) | ||
updates = self.bot.get_updates() | ||
|
||
# reversed as we just need to find the first msg that's too old | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we be sure that the updates are always ordered? I just don't know tbh. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🤦 assumption is the mother of f..... i got a sequential list every time i ran my tests (and a ran a lot of m... mumbling something about dates and time) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Note that the clean as bool part also assumes an ordered by update_id list. we could do maybe even better to put this in There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Mh … Trusting TG to send updates with incremental
Trusting that the date necessarily increasing with the |
||
for up in reversed(updates): | ||
if up.effective_message.date is None: | ||
# break out and leave all updates as is | ||
return False | ||
elif up.effective_message and (up.effective_message.date < datetime_cutoff): | ||
# break out, we want to process the 'next' and all following msg's | ||
updates = self.bot.get_updates(up.update_id + 1) | ||
return False | ||
|
||
return False | ||
|
||
def bootstrap_set_webhook(): | ||
self.bot.set_webhook(url=webhook_url, | ||
certificate=cert, | ||
|
@@ -500,11 +606,16 @@ def bootstrap_onerr_cb(exc): | |
retries[0] = 0 | ||
|
||
# Clean pending messages, if requested. | ||
if clean: | ||
if isinstance(clean, bool) and clean: | ||
self._network_loop_retry(bootstrap_clean_updates, bootstrap_onerr_cb, | ||
'bootstrap clean updates', bootstrap_interval) | ||
retries[0] = 0 | ||
sleep(1) | ||
elif isinstance(clean, datetime): | ||
bootstrap_clean_updates_datetime_p = partial(bootstrap_clean_updates_datetime, | ||
datetime_cutoff=clean) | ||
self._network_loop_retry(bootstrap_clean_updates_datetime_p, bootstrap_onerr_cb, | ||
'bootstrap clean updates datetime', bootstrap_interval) | ||
retries[0] = 0 | ||
|
||
# Restore/set webhook settings, if needed. Again, we don't know ahead if a webhook is set, | ||
# so we set it anyhow. | ||
|
Uh oh!
There was an error while loading. Please reload this page.