3
3
from __future__ import print_function
4
4
5
5
import threading
6
- import time
7
6
8
7
try :
9
8
from queue import Queue , Full , Empty
15
14
16
15
__all__ = ["EventTime" , "FluentSender" ]
17
16
18
- _global_sender = None
19
-
20
- DEFAULT_QUEUE_TIMEOUT = 0.05
21
17
DEFAULT_QUEUE_MAXSIZE = 100
22
18
DEFAULT_QUEUE_CIRCULAR = False
23
19
20
+ _TOMBSTONE = object ()
21
+
22
+ _global_sender = None
23
+
24
24
25
25
def _set_global_sender (sender ): # pragma: no cover
26
26
""" [For testing] Function to set global sender directly
@@ -42,8 +42,9 @@ def close(): # pragma: no cover
42
42
get_global_sender ().close ()
43
43
44
44
45
- class CommunicatorThread (threading .Thread ):
46
- def __init__ (self , tag ,
45
+ class FluentSender (sender .FluentSender ):
46
+ def __init__ (self ,
47
+ tag ,
47
48
host = 'localhost' ,
48
49
port = 24224 ,
49
50
bufmax = 1 * 1024 * 1024 ,
@@ -52,76 +53,42 @@ def __init__(self, tag,
52
53
buffer_overflow_handler = None ,
53
54
nanosecond_precision = False ,
54
55
msgpack_kwargs = None ,
55
- queue_timeout = DEFAULT_QUEUE_TIMEOUT ,
56
56
queue_maxsize = DEFAULT_QUEUE_MAXSIZE ,
57
- queue_circular = DEFAULT_QUEUE_CIRCULAR , * args , ** kwargs ):
58
- super (CommunicatorThread , self ).__init__ (** kwargs )
59
- self ._queue = Queue (maxsize = queue_maxsize )
60
- self ._do_run = True
61
- self ._queue_timeout = queue_timeout
57
+ queue_circular = DEFAULT_QUEUE_CIRCULAR ,
58
+ ** kwargs ):
59
+ """
60
+ :param kwargs: This kwargs argument is not used in __init__. This will be removed in the next major version.
61
+ """
62
+ super (FluentSender , self ).__init__ (tag = tag , host = host , port = port , bufmax = bufmax , timeout = timeout ,
63
+ verbose = verbose , buffer_overflow_handler = buffer_overflow_handler ,
64
+ nanosecond_precision = nanosecond_precision ,
65
+ msgpack_kwargs = msgpack_kwargs ,
66
+ ** kwargs )
62
67
self ._queue_maxsize = queue_maxsize
63
68
self ._queue_circular = queue_circular
64
- self ._conn_close_lock = threading .Lock ()
65
- self ._sender = sender .FluentSender (tag = tag , host = host , port = port , bufmax = bufmax , timeout = timeout ,
66
- verbose = verbose , buffer_overflow_handler = buffer_overflow_handler ,
67
- nanosecond_precision = nanosecond_precision , msgpack_kwargs = msgpack_kwargs )
68
69
69
- def send (self , bytes_ ):
70
- if self ._queue_circular and self ._queue .full ():
71
- # discard oldest
72
- try :
73
- self ._queue .get (block = False )
74
- except Empty : # pragma: no cover
75
- pass
76
- try :
77
- self ._queue .put (bytes_ , block = (not self ._queue_circular ))
78
- except Full :
79
- return False
80
- return True
70
+ self ._thread_guard = threading .Event () # This ensures visibility across all variables
71
+ self ._closed = False
81
72
82
- def run (self ):
83
- while self ._do_run :
84
- try :
85
- bytes_ = self ._queue .get (block = True , timeout = self ._queue_timeout )
86
- except Empty :
87
- continue
88
- with self ._conn_close_lock :
89
- self ._sender ._send (bytes_ )
90
-
91
- def close (self , flush = True , discard = True ):
92
- if discard :
93
- while not self ._queue .empty ():
94
- try :
95
- self ._queue .get (block = False )
96
- except Empty :
97
- break
98
- while flush and (not self ._queue .empty ()):
99
- time .sleep (0.1 )
100
- self ._do_run = False
101
- self ._sender .close ()
102
-
103
- def _close (self ):
104
- with self ._conn_close_lock :
105
- self ._sender ._close ()
106
-
107
- @property
108
- def last_error (self ):
109
- return self ._sender .last_error
110
-
111
- @last_error .setter
112
- def last_error (self , err ):
113
- self ._sender .last_error = err
114
-
115
- def clear_last_error (self , _thread_id = None ):
116
- self ._sender .clear_last_error (_thread_id = _thread_id )
117
-
118
- @property
119
- def queue_timeout (self ):
120
- return self ._queue_timeout
121
-
122
- @queue_timeout .setter
123
- def queue_timeout (self , value ):
124
- self ._queue_timeout = value
73
+ self ._queue = Queue (maxsize = queue_maxsize )
74
+ self ._send_thread = threading .Thread (target = self ._send_loop ,
75
+ name = "AsyncFluentSender %d" % id (self ))
76
+ self ._send_thread .daemon = True
77
+ self ._send_thread .start ()
78
+
79
+ def close (self , flush = True ):
80
+ with self .lock :
81
+ if self ._closed :
82
+ return
83
+ self ._closed = True
84
+ if not flush :
85
+ while True :
86
+ try :
87
+ self ._queue .get (block = False )
88
+ except Empty :
89
+ break
90
+ self ._queue .put (_TOMBSTONE )
91
+ self ._send_thread .join ()
125
92
126
93
@property
127
94
def queue_maxsize (self ):
@@ -135,91 +102,35 @@ def queue_blocking(self):
135
102
def queue_circular (self ):
136
103
return self ._queue_circular
137
104
138
-
139
- class FluentSender (sender .FluentSender ):
140
- def __init__ (self ,
141
- tag ,
142
- host = 'localhost' ,
143
- port = 24224 ,
144
- bufmax = 1 * 1024 * 1024 ,
145
- timeout = 3.0 ,
146
- verbose = False ,
147
- buffer_overflow_handler = None ,
148
- nanosecond_precision = False ,
149
- msgpack_kwargs = None ,
150
- queue_timeout = DEFAULT_QUEUE_TIMEOUT ,
151
- queue_maxsize = DEFAULT_QUEUE_MAXSIZE ,
152
- queue_circular = DEFAULT_QUEUE_CIRCULAR ,
153
- ** kwargs ): # This kwargs argument is not used in __init__. This will be removed in the next major version.
154
- super (FluentSender , self ).__init__ (tag = tag , host = host , port = port , bufmax = bufmax , timeout = timeout ,
155
- verbose = verbose , buffer_overflow_handler = buffer_overflow_handler ,
156
- nanosecond_precision = nanosecond_precision , msgpack_kwargs = msgpack_kwargs ,
157
- ** kwargs )
158
- self ._communicator = CommunicatorThread (tag = tag , host = host , port = port , bufmax = bufmax , timeout = timeout ,
159
- verbose = verbose , buffer_overflow_handler = buffer_overflow_handler ,
160
- nanosecond_precision = nanosecond_precision , msgpack_kwargs = msgpack_kwargs ,
161
- queue_timeout = queue_timeout , queue_maxsize = queue_maxsize ,
162
- queue_circular = queue_circular )
163
- self ._communicator .start ()
164
-
165
105
def _send (self , bytes_ ):
166
- return self ._communicator .send (bytes_ = bytes_ )
167
-
168
- def _close (self ):
169
- # super(FluentSender, self)._close()
170
- self ._communicator ._close ()
171
-
172
- def _send_internal (self , bytes_ ):
173
- assert False # pragma: no cover
174
-
175
- def _send_data (self , bytes_ ):
176
- assert False # pragma: no cover
177
-
178
- # override reconnect, so we don't open a socket here (since it
179
- # will be opened by the CommunicatorThread)
180
- def _reconnect (self ):
181
- return
182
-
183
- def close (self ):
184
- self ._communicator .close (flush = True )
185
- self ._communicator .join ()
186
- return super (FluentSender , self ).close ()
187
-
188
- @property
189
- def last_error (self ):
190
- return self ._communicator .last_error
191
-
192
- @last_error .setter
193
- def last_error (self , err ):
194
- self ._communicator .last_error = err
195
-
196
- def clear_last_error (self , _thread_id = None ):
197
- self ._communicator .clear_last_error (_thread_id = _thread_id )
106
+ with self .lock :
107
+ if self ._closed :
108
+ return False
109
+ if self ._queue_circular and self ._queue .full ():
110
+ # discard oldest
111
+ try :
112
+ self ._queue .get (block = False )
113
+ except Empty : # pragma: no cover
114
+ pass
115
+ try :
116
+ self ._queue .put (bytes_ , block = (not self ._queue_circular ))
117
+ except Full : # pragma: no cover
118
+ return False # this actually can't happen
198
119
199
- @property
200
- def queue_timeout (self ):
201
- return self ._communicator .queue_timeout
120
+ return True
202
121
203
- @queue_timeout .setter
204
- def queue_timeout (self , value ):
205
- self ._communicator .queue_timeout = value
122
+ def _send_loop (self ):
123
+ send_internal = super (FluentSender , self )._send_internal
206
124
207
- @property
208
- def queue_maxsize (self ):
209
- return self ._communicator .queue_maxsize
210
-
211
- @property
212
- def queue_blocking (self ):
213
- return self ._communicator .queue_blocking
214
-
215
- @property
216
- def queue_circular (self ):
217
- return self ._communicator .queue_circular
125
+ try :
126
+ while True :
127
+ bytes_ = self ._queue .get (block = True )
128
+ if bytes_ is _TOMBSTONE :
129
+ break
218
130
219
- def __enter__ (self ):
220
- return self
131
+ send_internal (bytes_ )
132
+ finally :
133
+ self ._close ()
221
134
222
- def __exit__ (self , typ , value , traceback ):
223
- # give time to the comm. thread to send its queued messages
224
- time .sleep (0.2 )
135
+ def __exit__ (self , exc_type , exc_val , exc_tb ):
225
136
self .close ()
0 commit comments