@@ -157,11 +157,11 @@ def set_request(self, request):
157
157
158
158
class PredictionIOHttpConnection (object ):
159
159
160
- def __init__ (self , host , https = True ):
160
+ def __init__ (self , host , https = True , timeout = 5 ):
161
161
if https : # https connection
162
- self ._connection = httplib .HTTPSConnection (host )
162
+ self ._connection = httplib .HTTPSConnection (host , timeout = timeout )
163
163
else :
164
- self ._connection = httplib .HTTPConnection (host )
164
+ self ._connection = httplib .HTTPConnection (host , timeout = timeout )
165
165
166
166
def connect (self ):
167
167
self ._connection .connect ()
@@ -245,7 +245,7 @@ def request(self, method, url, body={}, headers={}):
245
245
return response # AsyncResponse object
246
246
247
247
248
- def connection_worker (host , request_queue , https = True , loop = True ):
248
+ def connection_worker (host , request_queue , https = True , timeout = 5 , loop = True ):
249
249
"""worker function which establishes connection and wait for request jobs
250
250
from the request_queue
251
251
@@ -257,11 +257,12 @@ def connection_worker(host, request_queue, https=True, loop=True):
257
257
DELETE
258
258
KILL
259
259
https: HTTPS (True) or HTTP (False)
260
+ timeout: timeout for HTTP connection attempts and requests in seconds
260
261
loop: This worker function stays in a loop waiting for request
261
262
For testing purpose only. should always be set to True.
262
263
"""
263
264
264
- connect = PredictionIOHttpConnection (host , https )
265
+ connect = PredictionIOHttpConnection (host , https , timeout )
265
266
266
267
# loop waiting for job form request queue
267
268
killed = not loop
@@ -307,27 +308,29 @@ class Connection(object):
307
308
spawn multiple connection_worker threads to handle jobs in the queue q
308
309
"""
309
310
310
- def __init__ (self , host , threads = 1 , qsize = 0 , https = True ):
311
+ def __init__ (self , host , threads = 1 , qsize = 0 , https = True , timeout = 5 ):
311
312
"""constructor
312
313
313
314
Args:
314
315
host: host of the server.
315
316
threads: type int, number of threads to be spawn
316
317
qsize: size of the queue q
317
318
https: indicate it is httpS (True) or http connection (False)
319
+ timeout: timeout for HTTP connection attempts and requests in seconds
318
320
"""
319
321
self .host = host
320
322
self .https = https
321
323
self .q = Queue .Queue (qsize ) # if qsize=0, means infinite
322
324
self .threads = threads
325
+ self .timeout = timeout
323
326
# start thread based on threads number
324
327
self .tid = {} # dictionary of thread object
325
328
326
329
for i in xrange (threads ):
327
330
tname = "PredictionIOThread-%s" % i # thread name
328
331
self .tid [i ] = threading .Thread (
329
332
target = connection_worker , name = tname ,
330
- args = ( self .host , self .q , self .https ) )
333
+ kwargs = { 'host' : self .host , 'request_queue' : self .q , 'https' : self .https , 'timeout' : self . timeout } )
331
334
self .tid [i ].setDaemon (True )
332
335
self .tid [i ].start ()
333
336
0 commit comments