Skip to content

Commit 6ec81dd

Browse files
jh0kertsnoam
authored andcommitted
move cleaning updates to bootstrapping phase (python-telegram-bot#282)
1 parent 252cafb commit 6ec81dd

File tree

2 files changed

+64
-60
lines changed

2 files changed

+64
-60
lines changed

telegram/ext/updater.py

Lines changed: 56 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,23 @@ def __init__(self,
9696
self.__threads = []
9797
""":type: list[Thread]"""
9898

99+
def _init_thread(self, target, name, *args, **kwargs):
100+
thr = Thread(target=self._thread_wrapper, name=name,
101+
args=(target,) + args, kwargs=kwargs)
102+
thr.start()
103+
self.__threads.append(thr)
104+
105+
def _thread_wrapper(self, target, *args, **kwargs):
106+
thr_name = current_thread().name
107+
self.logger.debug('{0} - started'.format(thr_name))
108+
try:
109+
target(*args, **kwargs)
110+
except Exception:
111+
self.__exception_event.set()
112+
self.logger.exception('unhandled exception')
113+
raise
114+
self.logger.debug('{0} - ended'.format(thr_name))
115+
99116
def start_polling(self, poll_interval=0.0, timeout=10, network_delay=2,
100117
clean=False, bootstrap_retries=0):
101118
"""
@@ -123,35 +140,16 @@ def start_polling(self, poll_interval=0.0, timeout=10, network_delay=2,
123140
with self.__lock:
124141
if not self.running:
125142
self.running = True
126-
if clean:
127-
self._clean_updates()
128143

129144
# Create & start threads
130145
self._init_thread(self.dispatcher.start, "dispatcher")
131146
self._init_thread(self._start_polling, "updater",
132147
poll_interval, timeout, network_delay,
133-
bootstrap_retries)
148+
bootstrap_retries, clean)
134149

135150
# Return the update queue so the main thread can insert updates
136151
return self.update_queue
137152

138-
def _init_thread(self, target, name, *args, **kwargs):
139-
thr = Thread(target=self._thread_wrapper, name=name,
140-
args=(target,) + args, kwargs=kwargs)
141-
thr.start()
142-
self.__threads.append(thr)
143-
144-
def _thread_wrapper(self, target, *args, **kwargs):
145-
thr_name = current_thread().name
146-
self.logger.debug('{0} - started'.format(thr_name))
147-
try:
148-
target(*args, **kwargs)
149-
except Exception:
150-
self.__exception_event.set()
151-
self.logger.exception('unhandled exception')
152-
raise
153-
self.logger.debug('{0} - ended'.format(thr_name))
154-
155153
def start_webhook(self,
156154
listen='127.0.0.1',
157155
port=80,
@@ -194,20 +192,18 @@ def start_webhook(self,
194192
with self.__lock:
195193
if not self.running:
196194
self.running = True
197-
if clean:
198-
self._clean_updates()
199195

200196
# Create & start threads
201197
self._init_thread(self.dispatcher.start, "dispatcher"),
202198
self._init_thread(self._start_webhook, "updater", listen,
203199
port, url_path, cert, key, bootstrap_retries,
204-
webhook_url)
200+
clean, webhook_url)
205201

206202
# Return the update queue so the main thread can insert updates
207203
return self.update_queue
208204

209205
def _start_polling(self, poll_interval, timeout, network_delay,
210-
bootstrap_retries):
206+
bootstrap_retries, clean):
211207
"""
212208
Thread target of thread 'updater'. Runs in background, pulls
213209
updates from Telegram and inserts them in the update queue of the
@@ -217,7 +213,7 @@ def _start_polling(self, poll_interval, timeout, network_delay,
217213
cur_interval = poll_interval
218214
self.logger.debug('Updater thread started')
219215

220-
self._set_webhook(None, bootstrap_retries, None)
216+
self._bootstrap(bootstrap_retries, clean=clean, webhook_url='')
221217

222218
while self.running:
223219
try:
@@ -249,28 +245,6 @@ def _start_polling(self, poll_interval, timeout, network_delay,
249245

250246
sleep(cur_interval)
251247

252-
def _set_webhook(self, webhook_url, max_retries, cert):
253-
retries = 0
254-
while 1:
255-
try:
256-
# Remove webhook
257-
self.bot.setWebhook(webhook_url=webhook_url,
258-
certificate=cert)
259-
except (Unauthorized, InvalidToken):
260-
raise
261-
except TelegramError:
262-
msg = 'failed to set webhook; try={0} max_retries={1}'.format(
263-
retries, max_retries)
264-
if max_retries < 0 or retries < max_retries:
265-
self.logger.info(msg)
266-
retries += 1
267-
else:
268-
self.logger.exception(msg)
269-
raise
270-
else:
271-
break
272-
sleep(1)
273-
274248
@staticmethod
275249
def _increase_poll_interval(current_interval):
276250
# increase waiting times on subsequent errors up to 30secs
@@ -283,7 +257,7 @@ def _increase_poll_interval(current_interval):
283257
return current_interval
284258

285259
def _start_webhook(self, listen, port, url_path, cert, key,
286-
bootstrap_retries, webhook_url):
260+
bootstrap_retries, clean, webhook_url):
287261
self.logger.debug('Updater thread started')
288262
use_ssl = cert is not None and key is not None
289263
if not url_path.startswith('/'):
@@ -300,8 +274,11 @@ def _start_webhook(self, listen, port, url_path, cert, key,
300274
if not webhook_url:
301275
webhook_url = self._gen_webhook_url(listen, port, url_path)
302276

303-
self._set_webhook(webhook_url, bootstrap_retries,
304-
open(cert, 'rb'))
277+
self._bootstrap(max_retries=bootstrap_retries, clean=clean,
278+
webhook_url=webhook_url, cert=open(cert, 'rb'))
279+
elif clean:
280+
self.logger.warning("cleaning updates is not supported if "
281+
"SSL-termination happens elsewhere; skipping")
305282

306283
self.httpd.serve_forever(poll_interval=1)
307284

@@ -326,12 +303,40 @@ def _check_ssl_cert(self, cert, key):
326303
else:
327304
raise TelegramError('SSL Certificate invalid')
328305

329-
def _gen_webhook_url(self, listen, port, url_path):
306+
@staticmethod
307+
def _gen_webhook_url(listen, port, url_path):
330308
return 'https://{listen}:{port}{path}'.format(
331309
listen=listen,
332310
port=port,
333311
path=url_path)
334312

313+
def _bootstrap(self, max_retries, clean, webhook_url, cert=None):
314+
retries = 0
315+
while True:
316+
317+
try:
318+
if clean:
319+
# Disable webhook for cleaning
320+
self.bot.setWebhook(webhook_url='')
321+
self._clean_updates()
322+
323+
self.bot.setWebhook(webhook_url=webhook_url,
324+
certificate=cert)
325+
except (Unauthorized, InvalidToken):
326+
raise
327+
except TelegramError:
328+
msg = 'error in bootstrap phase; try={0} max_retries={1}'\
329+
.format(retries, max_retries)
330+
if max_retries < 0 or retries < max_retries:
331+
self.logger.warning(msg)
332+
retries += 1
333+
else:
334+
self.logger.exception(msg)
335+
raise
336+
else:
337+
break
338+
sleep(1)
339+
335340
def _clean_updates(self):
336341
self.logger.debug('Cleaning updates from Telegram server')
337342
updates = self.bot.getUpdates()

tests/test_updater.py

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -489,7 +489,7 @@ def test_bootstrap_retries_success(self):
489489
retries = 3
490490
self._setup_updater('', messages=0, bootstrap_retries=retries)
491491

492-
self.updater._set_webhook('path', retries, None)
492+
self.updater._bootstrap(retries, False, 'path', None)
493493
self.assertEqual(self.updater.bot.bootstrap_attempts, retries)
494494

495495
def test_bootstrap_retries_unauth(self):
@@ -499,8 +499,8 @@ def test_bootstrap_retries_unauth(self):
499499
bootstrap_retries=retries,
500500
bootstrap_err=Unauthorized())
501501

502-
self.assertRaises(Unauthorized, self.updater._set_webhook, 'path',
503-
retries, None)
502+
self.assertRaises(Unauthorized, self.updater._bootstrap,
503+
retries, False, 'path', None)
504504
self.assertEqual(self.updater.bot.bootstrap_attempts, 1)
505505

506506
def test_bootstrap_retries_invalid_token(self):
@@ -510,17 +510,16 @@ def test_bootstrap_retries_invalid_token(self):
510510
bootstrap_retries=retries,
511511
bootstrap_err=InvalidToken())
512512

513-
self.assertRaises(InvalidToken, self.updater._set_webhook, 'path',
514-
retries, None)
513+
self.assertRaises(InvalidToken, self.updater._bootstrap,
514+
retries, False, 'path', None)
515515
self.assertEqual(self.updater.bot.bootstrap_attempts, 1)
516516

517517
def test_bootstrap_retries_fail(self):
518518
retries = 1
519519
self._setup_updater('', messages=0, bootstrap_retries=retries)
520520

521-
self.assertRaisesRegexp(TelegramError, 'test',
522-
self.updater._set_webhook, 'path', retries - 1,
523-
None)
521+
self.assertRaisesRegexp(TelegramError, 'test', self.updater._bootstrap,
522+
retries - 1, False, 'path', None)
524523
self.assertEqual(self.updater.bot.bootstrap_attempts, 1)
525524

526525
def test_webhook_invalid_posts(self):
@@ -529,7 +528,7 @@ def test_webhook_invalid_posts(self):
529528
ip = '127.0.0.1'
530529
port = randrange(1024, 49152) # select random port for travis
531530
thr = Thread(target=self.updater._start_webhook,
532-
args=(ip, port, '', None, None, 0, None))
531+
args=(ip, port, '', None, None, 0, False, None))
533532
thr.start()
534533

535534
sleep(0.5)

0 commit comments

Comments
 (0)