feat(aci): Separate Buffer for Workflows #97549

Draft · wants to merge 1 commit into master

feat(aci): Separate Buffer for Workflows
kcons committed Aug 9, 2025
commit ef15d429bda7f8fca4d98adda136c2928d29e0e8
4 changes: 0 additions & 4 deletions src/sentry/buffer/base.py
@@ -30,7 +30,6 @@ class Buffer(Service):
         "incr",
         "process",
         "process_pending",
-        "process_batch",
         "validate",
         "push_to_sorted_set",
         "push_to_hash",
@@ -140,9 +139,6 @@ def incr(
     def process_pending(self) -> None:
         return
 
-    def process_batch(self) -> None:
-        return
-
     def process(
         self,
         model: type[models.Model] | None,
32 changes: 0 additions & 32 deletions src/sentry/buffer/redis.py
@@ -56,32 +56,6 @@ def _validate_json_roundtrip(value: dict[str, Any], model: type[models.Model]) -
         logger.exception("buffer.invalid_value", extra={"value": value, "model": model})
 
 
-class BufferHookEvent(Enum):
-    FLUSH = "flush"
-
-
-class BufferHookRegistry:
-    def __init__(self, *args: Any, **kwargs: Any) -> None:
-        self._registry: dict[BufferHookEvent, Callable[..., Any]] = {}
-
-    def add_handler(self, key: BufferHookEvent, func: Callable[..., Any]) -> None:
-        self._registry[key] = func
-
-    def has(self, key: BufferHookEvent) -> bool:
-        return self._registry.get(key) is not None
-
-    def callback(self, buffer_hook_event: BufferHookEvent) -> bool:
-        try:
-            callback = self._registry[buffer_hook_event]
-        except KeyError:
-            logger.exception("buffer_hook_event.missing")
-
-        return callback()
-
-
-redis_buffer_registry = BufferHookRegistry()
-
-
 # Callable to get the queue name for the given model_key.
 # May return None to not assign a queue for the given model_key.
 ChooseQueueFunction = Callable[[str], str | None]
@@ -445,12 +419,6 @@ def get_hash_length(self, model: type[models.Model], field: dict[str, BufferFiel
         key = self._make_key(model, field)
         return self._execute_redis_operation(key, RedisOperation.HASH_LENGTH)
 
-    def process_batch(self) -> None:
-        try:
-            redis_buffer_registry.callback(BufferHookEvent.FLUSH)
-        except Exception:
-            logger.exception("process_batch.error")
-
     def incr(
         self,
         model: type[models.Model],
4 changes: 4 additions & 0 deletions src/sentry/conf/server.py
@@ -2088,6 +2088,10 @@ def custom_parameter_sort(parameter: dict) -> tuple[str, int]:
 SENTRY_BUFFER = "sentry.buffer.Buffer"
 SENTRY_BUFFER_OPTIONS: dict[str, str] = {}
 
+# Workflow Buffer backend
+SENTRY_WORKFLOW_BUFFER = "sentry.buffer.Buffer"
+SENTRY_WORKFLOW_BUFFER_OPTIONS: dict[str, str] = {}
+
 # Cache backend
 # XXX: We explicitly require the cache to be configured as its not optional
 # and causes serious confusion with the default django cache
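
As an illustration of what the new settings enable (not part of this diff): a deployment could point the workflow buffer at its own Redis cluster from sentry.conf.py. The backend path mirrors the stock Redis buffer; the "cluster" option name is an assumption here, following the existing SENTRY_BUFFER_OPTIONS convention.

# Hypothetical sentry.conf.py sketch -- assumes the stock Redis buffer backend
# and a separately provisioned "workflows" Redis cluster defined elsewhere.
SENTRY_WORKFLOW_BUFFER = "sentry.buffer.redis.RedisBuffer"
SENTRY_WORKFLOW_BUFFER_OPTIONS = {"cluster": "workflows"}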
6 changes: 6 additions & 0 deletions src/sentry/options/defaults.py
@@ -3147,6 +3147,12 @@
     default=[],
     flags=FLAG_AUTOMATOR_MODIFIABLE,
 )
+register(
+    "workflow_engine.buffer.use_new_buffer",
+    type=Bool,
+    default=False,
+    flags=FLAG_AUTOMATOR_MODIFIABLE,
+)
 
 # Restrict uptime issue creation for specific host provider identifiers. Items
 # in this list map to the `host_provider_id` column in the UptimeSubscription
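
A sketch of how the new flag is exercised (illustrative, not from this diff): get_backend() in the new sentry.workflow_engine.buffer module below reads the option on every call, so a test can flip it with Sentry's override_options helper. The test name and assertion are mine.

# Hypothetical test sketch; assumes sentry.testutils.helpers.override_options.
import sentry.workflow_engine.buffer as workflow_buffer
from sentry.testutils.helpers import override_options


def test_use_new_buffer_flag_switches_backend() -> None:
    with override_options({"workflow_engine.buffer.use_new_buffer": True}):
        # With the flag on, the dedicated workflow buffer is returned.
        assert workflow_buffer.get_backend() is workflow_buffer._backend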
41 changes: 24 additions & 17 deletions src/sentry/rules/processing/buffer_processing.py
@@ -9,11 +9,11 @@
 
 from celery import Task
 
-from sentry import buffer, options
-from sentry.buffer.base import BufferField
-from sentry.buffer.redis import BufferHookEvent, redis_buffer_registry
+from sentry import options
+from sentry.buffer.base import Buffer, BufferField
 from sentry.db import models
 from sentry.utils import metrics
+from sentry.utils.lazy_service_wrapper import LazyServiceWrapper
 from sentry.utils.registry import NoRegistrationExistsError, Registry
 
 logger = logging.getLogger("sentry.delayed_processing")
@@ -47,12 +47,19 @@ def hash_args(self) -> BufferHashKeys:
     def processing_task(self) -> Task:
         raise NotImplementedError
 
+    @staticmethod
+    def buffer_backend() -> LazyServiceWrapper[Buffer]:
+        raise NotImplementedError
+
 
 delayed_processing_registry = Registry[type[DelayedProcessingBase]]()
 
 
 def fetch_group_to_event_data(
-    project_id: int, model: type[models.Model], batch_key: str | None = None
+    buffer: LazyServiceWrapper[Buffer],
+    project_id: int,
+    model: type[models.Model],
+    batch_key: str | None = None,
 ) -> dict[str, str]:
     field: dict[str, models.Model | int | str] = {
         "project_id": project_id,
@@ -61,7 +68,7 @@ def fetch_group_to_event_data(
     if batch_key:
         field["batch_key"] = batch_key
 
-    return buffer.backend.get_hash(model=model, field=field)
+    return buffer.get_hash(model=model, field=field)
 
 
 def bucket_num_groups(num_groups: int) -> str:
@@ -71,7 +78,9 @@ def bucket_num_groups(num_groups: int) -> str:
         return "1"
 
 
-def process_in_batches(project_id: int, processing_type: str) -> None:
+def process_in_batches(
+    buffer: LazyServiceWrapper[Buffer], project_id: int, processing_type: str
+) -> None:
     """
     This will check the number of alertgroup_to_event_data items in the Redis buffer for a project.
 
@@ -100,7 +109,7 @@ def process_in_batches(project_id: int, processing_type: str) -> None:
     task = processing_info.processing_task
     filters: dict[str, BufferField] = asdict(hash_args.filters)
 
-    event_count = buffer.backend.get_hash_length(model=hash_args.model, field=filters)
+    event_count = buffer.get_hash_length(model=hash_args.model, field=filters)
     metrics.incr(
         f"{processing_type}.num_groups", tags={"num_groups": bucket_num_groups(event_count)}
     )
@@ -118,22 +127,22 @@ def process_in_batches(project_id: int, processing_type: str) -> None:
     )
 
     # if the dictionary is large, get the items and chunk them.
-    alertgroup_to_event_data = fetch_group_to_event_data(project_id, hash_args.model)
+    alertgroup_to_event_data = fetch_group_to_event_data(buffer, project_id, hash_args.model)
 
     with metrics.timer(f"{processing_type}.process_batch.duration"):
         items = iter(alertgroup_to_event_data.items())
 
         while batch := dict(islice(items, batch_size)):
             batch_key = str(uuid.uuid4())
 
-            buffer.backend.push_to_hash_bulk(
+            buffer.push_to_hash_bulk(
                 model=hash_args.model,
                 filters={**filters, "batch_key": batch_key},
                 data=batch,
             )
 
             # remove the batched items from the project alertgroup_to_event_data
-            buffer.backend.delete_hash(**asdict(hash_args), fields=list(batch.keys()))
+            buffer.delete_hash(**asdict(hash_args), fields=list(batch.keys()))
 
             task.apply_async(
                 kwargs={"project_id": project_id, "batch_key": batch_key},
@@ -150,14 +159,16 @@ def process_buffer() -> None:
             logger.info(log_name, extra={"option": handler.option})
             continue
 
+        buffer = handler.buffer_backend()
+
         with metrics.timer(f"{processing_type}.process_all_conditions.duration"):
             # We need to use a very fresh timestamp here; project scores (timestamps) are
             # updated with each relevant event, and some can be updated every few milliseconds.
             # The staler this timestamp, the more likely it'll miss some recently updated projects,
             # and the more likely we'll have frequently updated projects that are never actually
             # retrieved and processed here.
             fetch_time = datetime.now(tz=timezone.utc)
-            project_ids = buffer.backend.get_sorted_set(
+            project_ids = buffer.get_sorted_set(
                 handler.buffer_key, min=0, max=fetch_time.timestamp()
             )
             if should_emit_logs:
@@ -168,10 +179,6 @@ def process_buffer() -> None:
                 logger.info(log_name, extra={"project_ids": log_str})
 
             for project_id, _ in project_ids:
-                process_in_batches(project_id, processing_type)
-
-            buffer.backend.delete_key(handler.buffer_key, min=0, max=fetch_time.timestamp())
-
-
-if not redis_buffer_registry.has(BufferHookEvent.FLUSH):
-    redis_buffer_registry.add_handler(BufferHookEvent.FLUSH, process_buffer)
+                process_in_batches(buffer, project_id, processing_type)
+
+            buffer.delete_key(handler.buffer_key, min=0, max=fetch_time.timestamp())
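
To show the shape of the new buffer_backend() hook (a hypothetical handler, not code from this diff): each registered processing type now declares which buffer process_buffer() should drain. The registry key and class name are illustrative, and the other required members are elided.

# Hypothetical handler sketch; "my_type" and the class are stand-ins.
from sentry import buffer
from sentry.buffer.base import Buffer
from sentry.rules.processing.buffer_processing import (
    DelayedProcessingBase,
    delayed_processing_registry,
)
from sentry.utils.lazy_service_wrapper import LazyServiceWrapper


@delayed_processing_registry.register("my_type")
class MyDelayedProcessing(DelayedProcessingBase):
    # hash_args, processing_task, etc. elided for brevity.

    @staticmethod
    def buffer_backend() -> LazyServiceWrapper[Buffer]:
        # Rules-based handlers keep the global buffer; the workflow handler
        # returns sentry.workflow_engine.buffer.get_backend() instead.
        return buffer.backend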
7 changes: 6 additions & 1 deletion src/sentry/rules/processing/delayed_processing.py
@@ -12,7 +12,7 @@
 from django.db.models import OuterRef, Subquery
 
 from sentry import buffer, features, nodestore
-from sentry.buffer.base import BufferField
+from sentry.buffer.base import Buffer, BufferField
 from sentry.db import models
 from sentry.eventstore.models import Event, GroupEvent
 from sentry.issues.issue_occurrence import IssueOccurrence
@@ -52,6 +52,7 @@
 from sentry.utils import json, metrics
 from sentry.utils.dates import ensure_aware
 from sentry.utils.iterators import chunked
+from sentry.utils.lazy_service_wrapper import LazyServiceWrapper
 from sentry.utils.retries import ConditionalRetryPolicy, exponential_delay
 from sentry.utils.safe import safe_execute
 from sentry.workflow_engine.processors.log_util import track_batch_performance
@@ -787,3 +788,7 @@ def hash_args(self) -> BufferHashKeys:
     @property
     def processing_task(self) -> Task:
         return apply_delayed
+
+    @staticmethod
+    def buffer_backend() -> LazyServiceWrapper[Buffer]:
+        return buffer.backend
4 changes: 2 additions & 2 deletions src/sentry/tasks/process_buffer.py
@@ -52,13 +52,13 @@ def process_pending_batch() -> None:
     """
     Process pending buffers in a batch.
     """
-    from sentry import buffer
+    from sentry.rules.processing.buffer_processing import process_buffer
 
     lock = get_process_lock("process_pending_batch")
 
     try:
         with lock.acquire():
-            buffer.backend.process_batch()
+            process_buffer()
     except UnableToAcquireLock as error:
         logger.warning("process_pending_batch.fail", extra={"error": error})
 
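
The net control flow after this change, sketched as a comment trail (mine, for orientation):

# Illustrative call chain, not code from this PR:
#
#   process_pending_batch (celery task)
#     -> process_buffer()                        # buffer_processing.py
#          -> handler.buffer_backend()           # chosen per processing type
#          -> process_in_batches(buffer, ...)    # buffer passed explicitly
#
# The BufferHookEvent.FLUSH registry indirection is gone entirely.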
21 changes: 21 additions & 0 deletions src/sentry/workflow_engine/buffer/__init__.py
@@ -0,0 +1,21 @@
+from django.conf import settings
+
+import sentry.buffer as old_buffer
+from sentry import options
+from sentry.buffer.base import Buffer
+from sentry.utils.services import LazyServiceWrapper
+
+_backend = LazyServiceWrapper(
+    Buffer, settings.SENTRY_WORKFLOW_BUFFER, settings.SENTRY_WORKFLOW_BUFFER_OPTIONS
+)
+
+
+def validate_new_backend() -> None:
+    pass
+
+
+def get_backend() -> LazyServiceWrapper[Buffer]:
+    if options.get("workflow_engine.buffer.use_new_buffer"):
+        return _backend
+    else:
+        return old_buffer.backend
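
The call-site pattern the rest of this PR follows (a sketch; the real enqueue logic is in workflow.py below): resolve the backend via get_backend() at each use rather than caching it at import time, so flipping workflow_engine.buffer.use_new_buffer takes effect without a deploy.

# Sketch of the intended call-site usage; the project id is illustrative.
import sentry.workflow_engine.buffer as buffer
from sentry.workflow_engine.processors.workflow import WORKFLOW_ENGINE_BUFFER_LIST_KEY

backend = buffer.get_backend()  # re-reads the option on every call
backend.push_to_sorted_set(key=WORKFLOW_ENGINE_BUFFER_LIST_KEY, value=[123])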
7 changes: 4 additions & 3 deletions src/sentry/workflow_engine/processors/delayed_workflow.py
@@ -11,7 +11,8 @@
 from django.utils import timezone
 from pydantic import BaseModel, validator
 
-from sentry import buffer, features, nodestore, options
+import sentry.workflow_engine.buffer as buffer
+from sentry import features, nodestore, options
 from sentry.buffer.base import BufferField
 from sentry.db import models
 from sentry.eventstore.models import Event, GroupEvent
@@ -310,7 +311,7 @@ def fetch_group_to_event_data(
     if batch_key:
         field["batch_key"] = batch_key
 
-    return buffer.backend.get_hash(model=model, field=field)
+    return buffer.get_backend().get_hash(model=model, field=field)
 
 
 def fetch_workflows_envs(
@@ -771,7 +772,7 @@ def cleanup_redis_buffer(
     if batch_key:
         filters["batch_key"] = batch_key
 
-    buffer.backend.delete_hash(model=Workflow, filters=filters, fields=hashes_to_delete)
+    buffer.get_backend().delete_hash(model=Workflow, filters=filters, fields=hashes_to_delete)
 
 
 def repr_keys[T, V](d: dict[T, V]) -> dict[str, V]:
8 changes: 5 additions & 3 deletions src/sentry/workflow_engine/processors/workflow.py
@@ -8,11 +8,12 @@
 from django.db.models import Q
 from django.utils import timezone
 
-from sentry import buffer, features
+from sentry import features
 from sentry.eventstore.models import GroupEvent
 from sentry.models.activity import Activity
 from sentry.models.environment import Environment
 from sentry.utils import json
+from sentry.workflow_engine import buffer
 from sentry.workflow_engine.models import Action, DataConditionGroup, Detector, Workflow
 from sentry.workflow_engine.models.workflow_data_condition_group import WorkflowDataConditionGroup
 from sentry.workflow_engine.processors.action import filter_recently_fired_workflow_actions
@@ -114,8 +115,9 @@ def enqueue_workflows(
         sentry_sdk.set_tag("delayed_workflow_items", items)
         return
 
+    backend = buffer.get_backend()
     for project_id, queue_items in items_by_project_id.items():
-        buffer.backend.push_to_hash_bulk(
+        backend.push_to_hash_bulk(
             model=Workflow,
             filters={"project_id": project_id},
             data={queue_item.buffer_key(): queue_item.buffer_value() for queue_item in queue_items},
@@ -125,7 +127,7 @@ def enqueue_workflows(
 
     sentry_sdk.set_tag("delayed_workflow_items", items)
 
-    buffer.backend.push_to_sorted_set(
+    backend.push_to_sorted_set(
         key=WORKFLOW_ENGINE_BUFFER_LIST_KEY, value=list(items_by_project_id.keys())
     )
 
7 changes: 7 additions & 0 deletions src/sentry/workflow_engine/tasks/delayed_workflows.py
@@ -4,7 +4,9 @@
 
 from celery import Task
 
+import sentry.workflow_engine.buffer as buffer
 from sentry import options
+from sentry.buffer.base import Buffer
 from sentry.rules.processing.buffer_processing import (
     BufferHashKeys,
     DelayedProcessingBase,
@@ -16,6 +18,7 @@
 from sentry.taskworker.config import TaskworkerConfig
 from sentry.taskworker.namespaces import workflow_engine_tasks
 from sentry.taskworker.retry import Retry
+from sentry.utils.lazy_service_wrapper import LazyServiceWrapper
 from sentry.workflow_engine.models import Workflow
 from sentry.workflow_engine.processors.workflow import WORKFLOW_ENGINE_BUFFER_LIST_KEY
 from sentry.workflow_engine.utils import log_context
@@ -66,3 +69,7 @@ def processing_task(self) -> Task:
         if options.get("delayed_workflow.use_workflow_engine_pool"):
             return process_delayed_workflows_shim
         return process_delayed_workflows
+
+    @staticmethod
+    def buffer_backend() -> LazyServiceWrapper[Buffer]:
+        return buffer.get_backend()