@@ -96,6 +96,23 @@ def __init__(self,
96
96
self .__threads = []
97
97
""":type: list[Thread]"""
98
98
99
+ def _init_thread (self , target , name , * args , ** kwargs ):
100
+ thr = Thread (target = self ._thread_wrapper , name = name ,
101
+ args = (target ,) + args , kwargs = kwargs )
102
+ thr .start ()
103
+ self .__threads .append (thr )
104
+
105
+ def _thread_wrapper (self , target , * args , ** kwargs ):
106
+ thr_name = current_thread ().name
107
+ self .logger .debug ('{0} - started' .format (thr_name ))
108
+ try :
109
+ target (* args , ** kwargs )
110
+ except Exception :
111
+ self .__exception_event .set ()
112
+ self .logger .exception ('unhandled exception' )
113
+ raise
114
+ self .logger .debug ('{0} - ended' .format (thr_name ))
115
+
99
116
def start_polling (self , poll_interval = 0.0 , timeout = 10 , network_delay = 2 ,
100
117
clean = False , bootstrap_retries = 0 ):
101
118
"""
@@ -123,35 +140,16 @@ def start_polling(self, poll_interval=0.0, timeout=10, network_delay=2,
123
140
with self .__lock :
124
141
if not self .running :
125
142
self .running = True
126
- if clean :
127
- self ._clean_updates ()
128
143
129
144
# Create & start threads
130
145
self ._init_thread (self .dispatcher .start , "dispatcher" )
131
146
self ._init_thread (self ._start_polling , "updater" ,
132
147
poll_interval , timeout , network_delay ,
133
- bootstrap_retries )
148
+ bootstrap_retries , clean )
134
149
135
150
# Return the update queue so the main thread can insert updates
136
151
return self .update_queue
137
152
138
- def _init_thread (self , target , name , * args , ** kwargs ):
139
- thr = Thread (target = self ._thread_wrapper , name = name ,
140
- args = (target ,) + args , kwargs = kwargs )
141
- thr .start ()
142
- self .__threads .append (thr )
143
-
144
- def _thread_wrapper (self , target , * args , ** kwargs ):
145
- thr_name = current_thread ().name
146
- self .logger .debug ('{0} - started' .format (thr_name ))
147
- try :
148
- target (* args , ** kwargs )
149
- except Exception :
150
- self .__exception_event .set ()
151
- self .logger .exception ('unhandled exception' )
152
- raise
153
- self .logger .debug ('{0} - ended' .format (thr_name ))
154
-
155
153
def start_webhook (self ,
156
154
listen = '127.0.0.1' ,
157
155
port = 80 ,
@@ -194,20 +192,18 @@ def start_webhook(self,
194
192
with self .__lock :
195
193
if not self .running :
196
194
self .running = True
197
- if clean :
198
- self ._clean_updates ()
199
195
200
196
# Create & start threads
201
197
self ._init_thread (self .dispatcher .start , "dispatcher" ),
202
198
self ._init_thread (self ._start_webhook , "updater" , listen ,
203
199
port , url_path , cert , key , bootstrap_retries ,
204
- webhook_url )
200
+ clean , webhook_url )
205
201
206
202
# Return the update queue so the main thread can insert updates
207
203
return self .update_queue
208
204
209
205
def _start_polling (self , poll_interval , timeout , network_delay ,
210
- bootstrap_retries ):
206
+ bootstrap_retries , clean ):
211
207
"""
212
208
Thread target of thread 'updater'. Runs in background, pulls
213
209
updates from Telegram and inserts them in the update queue of the
@@ -217,7 +213,7 @@ def _start_polling(self, poll_interval, timeout, network_delay,
217
213
cur_interval = poll_interval
218
214
self .logger .debug ('Updater thread started' )
219
215
220
- self ._set_webhook ( None , bootstrap_retries , None )
216
+ self ._bootstrap ( bootstrap_retries , clean = clean , webhook_url = '' )
221
217
222
218
while self .running :
223
219
try :
@@ -249,28 +245,6 @@ def _start_polling(self, poll_interval, timeout, network_delay,
249
245
250
246
sleep (cur_interval )
251
247
252
- def _set_webhook (self , webhook_url , max_retries , cert ):
253
- retries = 0
254
- while 1 :
255
- try :
256
- # Remove webhook
257
- self .bot .setWebhook (webhook_url = webhook_url ,
258
- certificate = cert )
259
- except (Unauthorized , InvalidToken ):
260
- raise
261
- except TelegramError :
262
- msg = 'failed to set webhook; try={0} max_retries={1}' .format (
263
- retries , max_retries )
264
- if max_retries < 0 or retries < max_retries :
265
- self .logger .info (msg )
266
- retries += 1
267
- else :
268
- self .logger .exception (msg )
269
- raise
270
- else :
271
- break
272
- sleep (1 )
273
-
274
248
@staticmethod
275
249
def _increase_poll_interval (current_interval ):
276
250
# increase waiting times on subsequent errors up to 30secs
@@ -283,7 +257,7 @@ def _increase_poll_interval(current_interval):
283
257
return current_interval
284
258
285
259
def _start_webhook (self , listen , port , url_path , cert , key ,
286
- bootstrap_retries , webhook_url ):
260
+ bootstrap_retries , clean , webhook_url ):
287
261
self .logger .debug ('Updater thread started' )
288
262
use_ssl = cert is not None and key is not None
289
263
if not url_path .startswith ('/' ):
@@ -300,8 +274,11 @@ def _start_webhook(self, listen, port, url_path, cert, key,
300
274
if not webhook_url :
301
275
webhook_url = self ._gen_webhook_url (listen , port , url_path )
302
276
303
- self ._set_webhook (webhook_url , bootstrap_retries ,
304
- open (cert , 'rb' ))
277
+ self ._bootstrap (max_retries = bootstrap_retries , clean = clean ,
278
+ webhook_url = webhook_url , cert = open (cert , 'rb' ))
279
+ elif clean :
280
+ self .logger .warning ("cleaning updates is not supported if "
281
+ "SSL-termination happens elsewhere; skipping" )
305
282
306
283
self .httpd .serve_forever (poll_interval = 1 )
307
284
@@ -326,12 +303,40 @@ def _check_ssl_cert(self, cert, key):
326
303
else :
327
304
raise TelegramError ('SSL Certificate invalid' )
328
305
329
- def _gen_webhook_url (self , listen , port , url_path ):
306
+ @staticmethod
307
+ def _gen_webhook_url (listen , port , url_path ):
330
308
return 'https://{listen}:{port}{path}' .format (
331
309
listen = listen ,
332
310
port = port ,
333
311
path = url_path )
334
312
313
+ def _bootstrap (self , max_retries , clean , webhook_url , cert = None ):
314
+ retries = 0
315
+ while True :
316
+
317
+ try :
318
+ if clean :
319
+ # Disable webhook for cleaning
320
+ self .bot .setWebhook (webhook_url = '' )
321
+ self ._clean_updates ()
322
+
323
+ self .bot .setWebhook (webhook_url = webhook_url ,
324
+ certificate = cert )
325
+ except (Unauthorized , InvalidToken ):
326
+ raise
327
+ except TelegramError :
328
+ msg = 'error in bootstrap phase; try={0} max_retries={1}' \
329
+ .format (retries , max_retries )
330
+ if max_retries < 0 or retries < max_retries :
331
+ self .logger .warning (msg )
332
+ retries += 1
333
+ else :
334
+ self .logger .exception (msg )
335
+ raise
336
+ else :
337
+ break
338
+ sleep (1 )
339
+
335
340
def _clean_updates (self ):
336
341
self .logger .debug ('Cleaning updates from Telegram server' )
337
342
updates = self .bot .getUpdates ()
0 commit comments