@@ -31,21 +31,27 @@ def run_queue(queue, beaver_config, logger=None):
31
31
break
32
32
33
33
if int (time .time ()) - last_update_time > queue_timeout :
34
- logger .info ('Queue timeout of "{0}" seconds exceeded, stopping queue' .format (queue_timeout ))
34
+ logger .info ('Main consumer queue timeout of "{0}" seconds exceeded, stopping queue' .format (queue_timeout ))
35
35
break
36
36
37
37
try :
38
38
if queue .full ():
39
- logger .debug ("Queue is full" )
39
+ logger .error ("Main consumer queue is full" )
40
+
40
41
else :
41
- logger .debug ("Queue Size is: " + str (queue .qsize ()))
42
- command , data = queue .get (block = True , timeout = wait_timeout )
43
- if command == "callback" :
44
- last_update_time = int (time .time ())
45
- logger .debug ('Last update time now {0}' .format (last_update_time ))
42
+ if count == 1000 :
43
+ logger .debug ("Main consumer queue Size is: " + str (queue .qsize ()))
44
+ count = 0
45
+ command , data = queue .get (block = True , timeout = wait_timeout )
46
+ if command == "callback" :
47
+ last_update_time = int (time .time ())
48
+ logger .debug ('Last update time now {0}' .format (last_update_time ))
46
49
except Queue .Empty :
47
- logger .debug ('No data' )
48
- continue
50
+ if not queue .empty ():
51
+ logger .error ('Recieved timeout from main consumer queue - stopping queue' )
52
+ break
53
+ else :
54
+ logger .debug ('No data' )
49
55
50
56
if command == 'callback' :
51
57
if data .get ('ignore_empty' , False ):
@@ -67,15 +73,15 @@ def run_queue(queue, beaver_config, logger=None):
67
73
try :
68
74
transport .callback (** data )
69
75
count += 1
70
- logger .debug ("Number of transports: " + str (count ))
71
76
break
72
- except TransportException :
77
+ except TransportException , e :
73
78
failure_count = failure_count + 1
74
79
if failure_count > beaver_config .get ('max_failure' ):
75
80
failure_count = beaver_config .get ('max_failure' )
76
81
77
82
sleep_time = beaver_config .get ('respawn_delay' ) ** failure_count
78
83
logger .info ('Caught transport exception, reconnecting in %d seconds' % sleep_time )
84
+ logger .debug (e )
79
85
80
86
try :
81
87
transport .invalidate ()
0 commit comments