Skip to content

Commit db66c57

Browse files
committed
feat(ourlogs): Batch logs into a single envelope
1 parent a2eaa8a commit db66c57

File tree

6 files changed

+336
-146
lines changed

6 files changed

+336
-146
lines changed

sentry_sdk/_log_batcher.py

Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
import json
2+
import os
3+
import random
4+
import threading
5+
from datetime import datetime, timezone
6+
from typing import Optional, List, Callable, TYPE_CHECKING, Any
7+
8+
from sentry_sdk.utils import format_timestamp
9+
from sentry_sdk.envelope import Envelope
10+
11+
if TYPE_CHECKING:
12+
from sentry_sdk._types import Log
13+
14+
15+
class LogBatcher:
16+
MAX_LOGS_BEFORE_FLUSH = 100
17+
FLUSH_WAIT_TIME = 5.0
18+
19+
def __init__(
20+
self,
21+
capture_func, # type: Callable[[Envelope], None]
22+
):
23+
# type: (...) -> None
24+
self._log_buffer = [] # type: List[Log]
25+
self._capture_func = capture_func
26+
self._running = True
27+
self._lock = threading.Lock()
28+
29+
self._flush_event = threading.Event() # type: threading.Event
30+
31+
self._flusher = None # type: Optional[threading.Thread]
32+
self._flusher_pid = None # type: Optional[int]
33+
34+
def _ensure_thread(self):
35+
# type: (...) -> bool
36+
"""For forking processes we might need to restart this thread.
37+
This ensures that our process actually has that thread running.
38+
"""
39+
if not self._running:
40+
return False
41+
42+
pid = os.getpid()
43+
if self._flusher_pid == pid:
44+
return True
45+
46+
with self._lock:
47+
# Double-checked locking
48+
if self._flusher_pid == pid:
49+
return True
50+
51+
self._flusher_pid = pid
52+
53+
self._flusher = threading.Thread(target=self._flush_loop)
54+
self._flusher.daemon = True
55+
56+
try:
57+
self._flusher.start()
58+
except RuntimeError:
59+
# Unfortunately at this point the interpreter is in a state that no
60+
# longer allows us to spawn a thread and we have to bail.
61+
self._running = False
62+
return False
63+
64+
return True
65+
66+
def _flush_loop(self):
67+
# type: (...) -> None
68+
while self._running:
69+
self._flush_event.wait(self.FLUSH_WAIT_TIME + random.random())
70+
self._flush()
71+
72+
def add(
73+
self,
74+
log, # type: Log
75+
):
76+
# type: (...) -> None
77+
if not self._ensure_thread() or self._flusher is None:
78+
return None
79+
80+
with self._lock:
81+
self._log_buffer.append(log)
82+
if len(self._log_buffer) >= self.MAX_LOGS_BEFORE_FLUSH:
83+
self._flush_event.set()
84+
85+
def kill(self):
86+
# type: (...) -> None
87+
if self._flusher is None:
88+
return
89+
90+
self._running = False
91+
self._flush_event.set()
92+
self._flusher = None
93+
94+
def flush(self):
95+
# type: (...) -> None
96+
self._flush()
97+
98+
@staticmethod
99+
def _log_to_otel(log):
100+
# type: (Log) -> Any
101+
def format_attribute(key, val):
102+
# type: (str, int | float | str | bool) -> Any
103+
if isinstance(val, bool):
104+
return {"key": key, "value": {"boolValue": val}}
105+
if isinstance(val, int):
106+
return {"key": key, "value": {"intValue": str(val)}}
107+
if isinstance(val, float):
108+
return {"key": key, "value": {"doubleValue": val}}
109+
if isinstance(val, str):
110+
return {"key": key, "value": {"stringValue": val}}
111+
return {"key": key, "value": {"stringValue": json.dumps(val)}}
112+
113+
otel_log = {
114+
"severityText": log["severity_text"],
115+
"severityNumber": log["severity_number"],
116+
"body": {"stringValue": log["body"]},
117+
"timeUnixNano": str(log["time_unix_nano"]),
118+
"attributes": [
119+
format_attribute(k, v) for (k, v) in log["attributes"].items()
120+
],
121+
}
122+
123+
if "trace_id" in log:
124+
otel_log["traceId"] = log["trace_id"]
125+
126+
return otel_log
127+
128+
def _flush(self):
129+
# type: (...) -> Optional[Envelope]
130+
131+
envelope = Envelope(
132+
headers={"sent_at": format_timestamp(datetime.now(timezone.utc))}
133+
)
134+
with self._lock:
135+
for log in self._log_buffer:
136+
envelope.add_log(self._log_to_otel(log))
137+
self._log_buffer.clear()
138+
if envelope.items:
139+
self._capture_func(envelope)
140+
return envelope
141+
return None

sentry_sdk/client.py

Lines changed: 15 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,3 @@
1-
import json
21
import os
32
import uuid
43
import random
@@ -65,6 +64,7 @@
6564
from sentry_sdk.session import Session
6665
from sentry_sdk.spotlight import SpotlightClient
6766
from sentry_sdk.transport import Transport
67+
from sentry_sdk._log_batcher import LogBatcher
6868

6969
I = TypeVar("I", bound=Integration) # noqa: E741
7070

@@ -178,6 +178,7 @@ def __init__(self, options=None):
178178
self.transport = None # type: Optional[Transport]
179179
self.monitor = None # type: Optional[Monitor]
180180
self.metrics_aggregator = None # type: Optional[MetricsAggregator]
181+
self.log_batcher = None # type: Optional[LogBatcher]
181182

182183
def __getstate__(self, *args, **kwargs):
183184
# type: (*Any, **Any) -> Any
@@ -375,6 +376,12 @@ def _capture_envelope(envelope):
375376
"Metrics not supported on Python 3.6 and lower with gevent."
376377
)
377378

379+
self.log_batcher = None
380+
if experiments.get("enable_logs", False):
381+
from sentry_sdk._log_batcher import LogBatcher
382+
383+
self.log_batcher = LogBatcher(capture_func=_capture_envelope)
384+
378385
max_request_body_size = ("always", "never", "small", "medium")
379386
if self.options["max_request_body_size"] not in max_request_body_size:
380387
raise ValueError(
@@ -451,6 +458,7 @@ def _capture_envelope(envelope):
451458
if (
452459
self.monitor
453460
or self.metrics_aggregator
461+
or self.log_batcher
454462
or has_profiling_enabled(self.options)
455463
or isinstance(self.transport, BaseHttpTransport)
456464
):
@@ -868,15 +876,11 @@ def capture_event(
868876

869877
def _capture_experimental_log(self, current_scope, log):
870878
# type: (Scope, Log) -> None
871-
logs_enabled = self.options["_experiments"].get("enable_sentry_logs", False)
879+
logs_enabled = self.options["_experiments"].get("enable_logs", False)
872880
if not logs_enabled:
873881
return
874882
isolation_scope = current_scope.get_isolation_scope()
875883

876-
headers = {
877-
"sent_at": format_timestamp(datetime.now(timezone.utc)),
878-
} # type: dict[str, object]
879-
880884
environment = self.options.get("environment")
881885
if environment is not None and "sentry.environment" not in log["attributes"]:
882886
log["attributes"]["sentry.environment"] = environment
@@ -913,46 +917,13 @@ def _capture_experimental_log(self, current_scope, log):
913917
f'[Sentry Logs] {log["body"]}',
914918
)
915919

916-
envelope = Envelope(headers=headers)
917-
918920
before_emit_log = self.options["_experiments"].get("before_emit_log")
919921
if before_emit_log is not None:
920922
log = before_emit_log(log, {})
921923
if log is None:
922924
return
923925

924-
def format_attribute(key, val):
925-
# type: (str, int | float | str | bool) -> Any
926-
if isinstance(val, bool):
927-
return {"key": key, "value": {"boolValue": val}}
928-
if isinstance(val, int):
929-
return {"key": key, "value": {"intValue": str(val)}}
930-
if isinstance(val, float):
931-
return {"key": key, "value": {"doubleValue": val}}
932-
if isinstance(val, str):
933-
return {"key": key, "value": {"stringValue": val}}
934-
return {"key": key, "value": {"stringValue": json.dumps(val)}}
935-
936-
otel_log = {
937-
"severityText": log["severity_text"],
938-
"severityNumber": log["severity_number"],
939-
"body": {"stringValue": log["body"]},
940-
"timeUnixNano": str(log["time_unix_nano"]),
941-
"attributes": [
942-
format_attribute(k, v) for (k, v) in log["attributes"].items()
943-
],
944-
}
945-
946-
if "trace_id" in log:
947-
otel_log["traceId"] = log["trace_id"]
948-
949-
envelope.add_log(otel_log) # TODO: batch these
950-
951-
if self.spotlight:
952-
self.spotlight.capture_envelope(envelope)
953-
954-
if self.transport is not None:
955-
self.transport.capture_envelope(envelope)
926+
self.log_batcher.add(log)
956927

957928
def capture_session(
958929
self, session # type: Session
@@ -1006,6 +977,8 @@ def close(
1006977
self.session_flusher.kill()
1007978
if self.metrics_aggregator is not None:
1008979
self.metrics_aggregator.kill()
980+
if self.log_batcher is not None:
981+
self.log_batcher.kill()
1009982
if self.monitor:
1010983
self.monitor.kill()
1011984
self.transport.kill()
@@ -1030,6 +1003,8 @@ def flush(
10301003
self.session_flusher.flush()
10311004
if self.metrics_aggregator is not None:
10321005
self.metrics_aggregator.flush()
1006+
if self.log_batcher is not None:
1007+
self.log_batcher.flush()
10331008
self.transport.flush(timeout=timeout, callback=callback)
10341009

10351010
def __enter__(self):

sentry_sdk/consts.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ class CompressionAlgo(Enum):
7878
Callable[[str, MetricValue, MeasurementUnit, MetricTags], bool]
7979
],
8080
"metric_code_locations": Optional[bool],
81-
"enable_sentry_logs": Optional[bool],
81+
"enable_logs": Optional[bool],
8282
},
8383
total=False,
8484
)

sentry_sdk/integrations/logging.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -348,7 +348,7 @@ def emit(self, record):
348348
if not client.is_active():
349349
return
350350

351-
if not client.options["_experiments"].get("enable_sentry_logs", False):
351+
if not client.options["_experiments"].get("enable_logs", False):
352352
return
353353

354354
SentryLogsHandler._capture_log_from_record(client, record)

sentry_sdk/logger.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,11 @@ def _capture_log(severity_text, severity_number, template, **kwargs):
1717
if "attributes" in kwargs:
1818
attrs.update(kwargs.pop("attributes"))
1919
for k, v in kwargs.items():
20-
attrs[f"sentry.message.parameters.{k}"] = v if isinstance(v, str) else repr(v)
20+
attrs[f"sentry.message.parameters.{k}"] = (
21+
v
22+
if (isinstance(v, str) or isinstance(v, int) or isinstance(v, bool))
23+
else repr(v)
24+
)
2125

2226
# noinspection PyProtectedMember
2327
client._capture_experimental_log(

0 commit comments

Comments
 (0)