4
4
import urllib3 # type: ignore
5
5
import certifi
6
6
import gzip
7
+ import time
7
8
8
9
from datetime import datetime , timedelta
9
10
10
11
from sentry_sdk .utils import Dsn , logger , capture_internal_exceptions , json_dumps
11
12
from sentry_sdk .worker import BackgroundWorker
12
- from sentry_sdk .envelope import Envelope
13
+ from sentry_sdk .envelope import Envelope , Item , PayloadRef
13
14
14
15
from sentry_sdk ._types import MYPY
15
16
@@ -131,6 +132,8 @@ def __init__(
131
132
self ._auth = self .parsed_dsn .to_auth ("sentry.python/%s" % VERSION )
132
133
self ._disabled_until = {} # type: Dict[DataCategory, datetime]
133
134
self ._retry = urllib3 .util .Retry ()
135
+ self ._event_loss_counters = {}
136
+ self ._last_event_loss_sent = None
134
137
135
138
self ._pool = self ._make_pool (
136
139
self .parsed_dsn ,
@@ -143,6 +146,13 @@ def __init__(
143
146
144
147
self .hub_cls = Hub
145
148
149
def record_lost_event(self, reason):
    # type: (str) -> None
    """Count one lost event under *reason*.

    The per-reason counters accumulate in ``self._event_loss_counters``
    and are flushed periodically (typically about once a minute).
    """
    # Deliberately not locked: a slightly inaccurate count under
    # concurrent increments is acceptable for best-effort stats.
    counters = self._event_loss_counters
    counters[reason] = counters.get(reason, 0) + 1
146
156
def _update_rate_limits (self , response ):
147
157
# type: (urllib3.HTTPResponse) -> None
148
158
@@ -207,7 +217,24 @@ def _send_request(
207
217
208
218
def on_dropped_event(self, reason):
    # type: (str) -> None
    """Hook invoked whenever an event is dropped.

    Forwards to :meth:`record_lost_event` so the loss is counted by reason.
    """
    self.record_lost_event(reason)
221
+
222
+ def _flush_stats (self , force = False ):
223
+ if not (force or self ._last_event_loss_sent is None or \
224
+ self ._last_event_loss_sent < time .time () - 60 ):
225
+ return
226
+ outcomes = self ._event_loss_counters
227
+ self ._event_loss_counters = {}
228
+
229
+ if outcomes :
230
+ self .capture_envelope (Envelope (
231
+ items = [Item (PayloadRef (json = {
232
+ "timestamp" : time .time (),
233
+ "outcomes" : outcomes ,
234
+ }), type = "sdk_outomes" )],
235
+ ))
236
+
237
+ self ._last_event_loss_sent = time .time ()
211
238
212
239
def _check_disabled (self , category ):
213
240
# type: (str) -> bool
@@ -254,9 +281,15 @@ def _send_envelope(
254
281
# type: (...) -> None
255
282
256
283
# remove all items from the envelope which are over quota
257
- envelope .items [:] = [
258
- x for x in envelope .items if not self ._check_disabled (x .data_category )
259
- ]
284
+ new_items = []
285
+ for item in envelope .items :
286
+ if self ._check_disabled (item .data_category ):
287
+ if item .data_category in ("transaction" , "error" , "default" ):
288
+ self .on_dropped_event ("self_rate_limits" )
289
+ else :
290
+ new_items .append (item )
291
+
292
+ envelope .items [:] = new_items
260
293
if not envelope .items :
261
294
return None
262
295
@@ -341,6 +374,8 @@ def send_event_wrapper():
341
374
if not self ._worker .submit (send_event_wrapper ):
342
375
self .on_dropped_event ("full_queue" )
343
376
377
+ self ._flush_stats ()
378
+
344
379
def capture_envelope (
345
380
self , envelope # type: Envelope
346
381
):
@@ -356,13 +391,16 @@ def send_envelope_wrapper():
356
391
if not self ._worker .submit (send_envelope_wrapper ):
357
392
self .on_dropped_event ("full_queue" )
358
393
394
+ self ._flush_stats ()
395
+
359
396
def flush (
360
397
self ,
361
398
timeout , # type: float
362
399
callback = None , # type: Optional[Any]
363
400
):
364
401
# type: (...) -> None
365
402
logger .debug ("Flushing HTTP transport" )
403
+ self ._flush_stats (force = True )
366
404
if timeout > 0 :
367
405
self ._worker .flush (timeout , callback )
368
406
0 commit comments