fix(rcm): publisher subscriber system (#5464)
Remote Configuration Publisher-Subscriber system.

# Motivation

A common Python web application uses a WSGI server (e.g., Gunicorn) that
employs multiple workers. For each worker, a Remote Config instance
polls new configuration from the Agent. In the context of ASM, the
typical scenario follows this flow:
- Remote Config enables the ASM_FEATURES product (with 1 click
activation). The RC starts polling.
- RC receives a payload to enable ASM_FEATURES. RC activates ASM and
registers new products (ASM, ASM_DATA, ASM_DD).
- RC receives a payload with all products and updates the WAF
configuration.

This flow takes around 10-15 seconds (i.e., with a polling interval of
5s, RC needs 2-3 requests to the agent to keep all information updated).

However, there is a problem with the Gunicorn application: Gunicorn may
occasionally restart a worker, which then needs to retrieve all RC data
again and repeat the flow above. During this interval, ASM with Remote
Config can behave unexpectedly: the new worker may let through a request
that should be blocked, because it has not yet applied all the new
configuration.
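
The staged flow above can be sketched as a small simulation. This is only an illustration under simplifying assumptions: `polls_until_protected` is a hypothetical helper, and it assumes a poll returns only the products that were registered before the poll started.

```python
POLL_INTERVAL_S = 5  # polling interval quoted in the text above

RULE_PRODUCTS = {"ASM", "ASM_DATA", "ASM_DD"}


def polls_until_protected():
    """Count the polls a fresh worker needs before every rule product is applied."""
    registered = {"ASM_FEATURES"}  # the only product known at start-up
    applied = set()
    polls = 0
    while not RULE_PRODUCTS <= applied:
        polls += 1
        # Assumption: a poll only carries products registered before it began.
        received = set(registered)
        if "ASM_FEATURES" in received:
            # 1-click activation registers the remaining products,
            # which can only arrive with the next poll.
            registered |= RULE_PRODUCTS
        applied |= received
    return polls


polls = polls_until_protected()
print(polls, "polls, about", polls * POLL_INTERVAL_S, "seconds")
```

Under these assumptions a restarted worker needs at least two polls, which matches the lower end of the 10-15 second window during which blocked requests can slip through.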

## Example
The following example runs a Gunicorn application.

```
ddtrace-run gunicorn -w 4 app:app
```

We use [Locust](https://locust.io/) to request the /block endpoint with
a blocked IP. Then, we kill a Gunicorn child process. As a result, we
have the following table:


![locust_1x](https://user-images.githubusercontent.com/6352942/230061832-fe98a741-22f9-443d-be04-8a0f688c7ecd.png)

```
/block: 19495 (request) 19325 (request blocked)
```

`19495 > 19325` means that 170 requests returned 200 instead of 403
(blocked).


# Description

Remote Configuration needs to keep all workers updated as soon as
possible. Therefore, Remote Configuration may start BEFORE the Gunicorn
server, in sitecustomize.py. It starts polling information from the RC
Agent and, for each new payload, shares that information with all child
processes through this Pub-sub system.

In addition to this, there are different Remote Configuration behaviors:

- When the Remote Configuration Client receives a new product target
file payload, we need to call a callback.
- When the Remote Configuration Client receives a new product target
file payload, we need to aggregate this target file data for each
product. After that, call the callback with all aggregated information.
- Remote Configuration may have a callback for each product.
- Remote Configuration may have a callback for one or more products.
- For each payload, Remote Configuration needs to execute specific
actions on the main process and a different action on child processes.
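
The aggregation behavior in the list above (merge the target files of each product, then call the callback once) can be sketched as follows. This is a minimal illustration, not the code in this commit: `merge_target_files` and the target file paths are hypothetical, and the rule that list values are concatenated is an assumption.

```python
def merge_target_files(target_files):
    """Merge per-target-file payloads into one dict.

    Assumption: list values are concatenated, other values are overwritten
    by the payload that sorts last.
    """
    merged = {}
    for path in sorted(target_files):  # sorted for a deterministic result
        payload = target_files[path]
        if not payload:
            continue  # an empty or missing payload contributes nothing
        for key, value in payload.items():
            if isinstance(value, list):
                existing = merged.get(key)
                base = existing if isinstance(existing, list) else []
                merged[key] = base + value
            else:
                merged[key] = value
    return merged


# Hypothetical target file paths and payloads, for illustration only.
files = {
    "datadog/2/ASM_DATA/blocked_ips/config": {"rules_data": [{"id": "blocked_ips"}]},
    "datadog/2/ASM_DATA/blocked_users/config": {"rules_data": [{"id": "blocked_users"}]},
    "datadog/2/ASM/custom/config": {"custom_rules": [{"id": "custom-1"}]},
}
merged = merge_target_files(files)
```

A single callback can then receive `merged` once per poll instead of being invoked once per target file.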

To achieve this goal, a Remote Configuration product may register a
PubSub instance. A PubSub class contains a publisher that receives the
Remote Configuration payload and shares it with the PubSub Subscriber
instance. The Subscriber starts a thread on each child process, waiting
for a new update of the data shared between the Publisher on the main
process and the child process. Remote Configuration creates one thread
listening to the main process for each instance of PubSub. To connect
the publisher and the child-process subscribers, we need a connector
class: Shared Memory or File. Each instance of PubSub works as a
singleton when Remote Configuration dispatches the callbacks. That means
that if we register the same instance of a PubSub class on different
products, we have only one thread waiting on the main process.
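
The publisher, subscriber and connector roles described above can be sketched with the standard library alone. This is a simplified illustration, not the classes added by this commit: `SharedMemoryConnector`, `Publisher`, `Subscriber` and `demo` are hypothetical names, the JSON encoding is an assumption, and the demo runs both sides in one process for brevity.

```python
import json
import threading
from multiprocessing import Array


class SharedMemoryConnector:
    """Hypothetical connector: JSON text in a fixed-size shared byte buffer."""

    def __init__(self, size=4096):
        # An Array created before fork() is visible to the child processes.
        self._buf = Array("c", size)

    def write(self, payload):
        data = json.dumps(payload, sort_keys=True).encode("utf-8")
        if len(data) >= len(self._buf):
            raise ValueError("payload does not fit in the shared buffer")
        self._buf.value = data

    def read(self):
        raw = self._buf.value
        return json.loads(raw.decode("utf-8")) if raw else None


class Publisher:
    """Main-process side: optionally preprocess a payload, then share it."""

    def __init__(self, connector, preprocess=None):
        self._connector = connector
        self._preprocess = preprocess

    def publish(self, payload):
        if self._preprocess is not None:
            payload = self._preprocess(payload)
        self._connector.write(payload)


class Subscriber(threading.Thread):
    """Worker side: poll the shared data and call back when it changes."""

    def __init__(self, connector, callback, interval=0.01):
        super().__init__(daemon=True)
        self._connector = connector
        self._callback = callback
        self._interval = interval
        self._last_seen = None
        self._halt = threading.Event()

    def run(self):
        while not self._halt.is_set():
            payload = self._connector.read()
            if payload is not None and payload != self._last_seen:
                self._last_seen = payload
                self._callback(payload)
            self._halt.wait(self._interval)

    def stop(self):
        self._halt.set()
        self.join()


def demo():
    connector = SharedMemoryConnector()
    received = []
    got_update = threading.Event()

    def callback(payload):
        received.append(payload)
        got_update.set()

    subscriber = Subscriber(connector, callback)
    subscriber.start()
    Publisher(connector).publish({"asm": {"enabled": True}})
    got_update.wait(timeout=2)
    subscriber.stop()
    return received
```

In a real Gunicorn deployment the connector would be created in the main process before the workers fork, and each worker would start its own `Subscriber` thread against the inherited buffer.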

Each DD Product (APM, ASM, DI, CI) may implement its PubSub Class.

Example 1: A callback for one or more Remote Configuration Products

AppSec needs to aggregate different products in the same callback for
all child processes.

```
class AppSecRC(PubSubMergeFirst):
    __shared_data = ConnectorSharedMemory()

    def __init__(self, _preprocess_results, callback, name="Default"):
        self._publisher = self.__publisher_class__(self.__shared_data, _preprocess_results)
        self._subscriber = self.__subscriber_class__(self.__shared_data, callback, name)

asm_callback = AppSecRC(preprocess_1click_activation, appsec_callback, "ASM")

remoteconfig_poller.register("ASM_PRODUCT", asm_callback)
remoteconfig_poller.register("ASM_FEATURES_PRODUCT", asm_callback)
```
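
Registering one instance under several products, as Example 1 does, can be sketched like this. `FakePubSub` and `Poller` are hypothetical stand-ins for illustration; they only show why a shared instance results in a single subscriber rather than one per product.

```python
class FakePubSub:
    """Stand-in for a PubSub instance; it only carries a name."""

    def __init__(self, name):
        self.name = name


class Poller:
    """Hypothetical registry that maps product names to PubSub instances."""

    def __init__(self):
        self._products = {}

    def register(self, product, pubsub):
        self._products[product] = pubsub

    def unique_pubsubs(self):
        # Deduplicate by identity: the same instance registered under
        # several products is dispatched (and subscribed) only once.
        seen = []
        for pubsub in self._products.values():
            if not any(pubsub is other for other in seen):
                seen.append(pubsub)
        return seen


poller = Poller()
asm = FakePubSub("ASM")
poller.register("ASM_PRODUCT", asm)
poller.register("ASM_FEATURES_PRODUCT", asm)
```

Here `poller.unique_pubsubs()` holds a single entry even though two products are registered.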

Example 2: One callback for each product

DI needs a separate callback for each product on all child processes.

```
class DynamicInstrumentationRC(PubSub):
    __shared_data = ConnectorSharedMemory()

    def __init__(self, _preprocess_results, callback, name="Default"):
        self._publisher = self.__publisher_class__(self.__shared_data, _preprocess_results)
        self._subscriber = self.__subscriber_class__(self.__shared_data, callback, name)


di_callback_1 = DynamicInstrumentationRC(callback=di_callback_1, name="ASM")
di_callback_2 = DynamicInstrumentationRC(callback=di_callback_2, name="ASM")

remoteconfig_poller.register("DI_1_PRODUCT", di_callback_1)
remoteconfig_poller.register("DI_2_PRODUCT", di_callback_2)
```

## Results
Following the previous example, if we run an application with Gunicorn
and kill a Gunicorn child worker, the result is:


![locust_with_refactor](https://user-images.githubusercontent.com/6352942/230055672-2401ec12-fd07-4783-8731-527559a493e1.png)
```
/block: 15070 (request) 15070 (request blocked)
```
`15070 == 15070` Hurray!

## Extra improvements of this refactor
If we compare both branches (1.x and this branch) with `docker stats`,
we see lower memory and CPU usage:

`1.x` branch:
```
CONTAINER ID   NAME                        CPU %     MEM USAGE / LIMIT     MEM %     NET I/O           BLOCK I/O    PIDS
e487b87bc5e0   showcase-flask_showcase_1   34.23%    204.1MiB / 31.07GiB   0.64%     44.8MB / 62.3MB   0B / 451kB   17
```

`avara1986/rc_refactor` branch:
```
CONTAINER ID   NAME                        CPU %     MEM USAGE / LIMIT     MEM %     NET I/O           BLOCK I/O        PIDS
b90ac73a23b6   showcase-flask_showcase_1   14.99%    169.2MiB / 31.07GiB   0.53%     31.3MB / 55.6MB   4.36MB / 303kB   18
```
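
The size of the improvement can be read off the two `docker stats` samples above. The snippet below only redoes that arithmetic; `reduction_pct` is a hypothetical helper, and each branch contributes a single sample, so the percentages are indicative rather than a benchmark.

```python
def reduction_pct(before, after):
    """Relative reduction from `before` to `after`, in percent (one decimal)."""
    return round((before - after) / before * 100, 1)


# Values copied from the two `docker stats` samples above.
cpu_reduction = reduction_pct(34.23, 14.99)   # CPU %
mem_reduction = reduction_pct(204.1, 169.2)   # MEM USAGE in MiB

print(cpu_reduction, mem_reduction)  # 56.2 17.1
```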

## PR related

This PR fixes this system test problem:
DataDog/system-tests#932


## Checklist

- [x] Change(s) are motivated and described in the PR description.
- [x] Testing strategy is described if automated tests are not included
in the PR.
- [x] Risk is outlined (performance impact, potential for breakage,
maintainability, etc).
- [x] Change is maintainable (easy to change, telemetry, documentation).
- [x] [Library release note
guidelines](https://ddtrace.readthedocs.io/en/stable/contributing.html#Release-Note-Guidelines)
are followed.
- [x] Documentation is included (in-code, generated user docs, [public
corp docs](https://github.com/DataDog/documentation/)).
- [x] PR description includes explicit acknowledgement/acceptance of the
performance implications of this PR as reported in the benchmarks PR
comment.

## Reviewer Checklist

- [x] Title is accurate.
- [x] No unnecessary changes are introduced.
- [x] Description motivates each change.
- [x] Avoids breaking
[API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces)
changes unless absolutely necessary.
- [x] Testing strategy adequately addresses listed risk(s).
- [x] Change is maintainable (easy to change, telemetry, documentation).
- [x] Release note makes sense to a user of the library.
- [x] Reviewer has explicitly acknowledged and discussed the performance
implications of this PR as reported in the benchmarks PR comment.

---------

Co-authored-by: Zachary Groves <32471391+ZStriker19@users.noreply.github.com>
avara1986 and ZStriker19 authored Jun 1, 2023
1 parent 5cd5f6a commit 9146377
Showing 27 changed files with 1,733 additions and 1,406 deletions.
222 changes: 134 additions & 88 deletions ddtrace/appsec/_remoteconfiguration.py
@@ -1,3 +1,4 @@
# -*- coding: utf-8 -*-
import os
from typing import TYPE_CHECKING

@@ -7,8 +8,11 @@
from ddtrace.appsec.utils import _appsec_rc_file_is_not_static
from ddtrace.constants import APPSEC_ENV
from ddtrace.internal.logger import get_logger
from ddtrace.internal.remoteconfig.client import RemoteConfigCallBack
from ddtrace.internal.remoteconfig.client import RemoteConfigCallBackAfterMerge
from ddtrace.internal.remoteconfig._connectors import PublisherSubscriberConnector
from ddtrace.internal.remoteconfig._publishers import RemoteConfigPublisherMergeDicts
from ddtrace.internal.remoteconfig._pubsub import PubSub
from ddtrace.internal.remoteconfig._subscribers import RemoteConfigSubscriber
from ddtrace.internal.remoteconfig.worker import remoteconfig_poller
from ddtrace.internal.utils.formats import asbool


@@ -18,49 +22,57 @@
# handling python 2.X import error
JSONDecodeError = ValueError # type: ignore


if TYPE_CHECKING: # pragma: no cover
from typing import Any
from typing import Dict

try:
from typing import Literal
except ImportError:
# Python < 3.8. The "type ignore" is to avoid a runtime check just to silence mypy.
from typing_extensions import Literal # type: ignore
from typing import Mapping
from typing import Optional
from typing import Union

from ddtrace import Tracer

log = get_logger(__name__)


class AppSecRC(PubSub):
__subscriber_class__ = RemoteConfigSubscriber
__publisher_class__ = RemoteConfigPublisherMergeDicts
__shared_data__ = PublisherSubscriberConnector()

def __init__(self, _preprocess_results, callback):
self._publisher = self.__publisher_class__(self.__shared_data__, _preprocess_results)
self._subscriber = self.__subscriber_class__(self.__shared_data__, callback, "ASM")


def enable_appsec_rc(test_tracer=None):
# type: (Optional[Tracer]) -> None
# Tracer is a parameter for testing propose
"""Remote config will be used by ASM libraries to receive four different updates from the backend.
Each update has it’s own product:
- ASM_FEATURES product - To allow users enable or disable ASM remotely
- ASM product - To allow clients to activate or deactivate rules
- ASM_DD product - To allow the library to receive rules updates
- ASM_DATA product - To allow the library to receive list of blocked IPs and users
If environment variable `DD_APPSEC_ENABLED` is not set, registering ASM_FEATURE can enable ASM remotely. If
it's set to true, we will register the rest of the products.
Parameters `test_tracer` and `start_subscribers` are needed for testing purposes
"""
# Import tracer here to avoid a circular import
if test_tracer is None:
from ddtrace import tracer
else:
tracer = test_tracer

asm_features_callback = RCAppSecFeaturesCallBack(tracer)
asm_dd_callback = RCASMDDCallBack(tracer)
asm_callback = RCAppSecCallBack(tracer)
log.debug("[%s][P: %s] Register ASM Remote Config Callback", os.getpid(), os.getppid())
asm_callback = AppSecRC(_preprocess_results_appsec_1click_activation, _appsec_callback)

if _appsec_rc_features_is_enabled():
from ddtrace.internal.remoteconfig import RemoteConfig

RemoteConfig.register(PRODUCTS.ASM_FEATURES, asm_features_callback)
remoteconfig_poller.register(PRODUCTS.ASM_FEATURES, asm_callback)

if tracer._appsec_enabled and _appsec_rc_file_is_not_static():
from ddtrace.internal.remoteconfig import RemoteConfig

RemoteConfig.register(PRODUCTS.ASM_DATA, asm_callback) # IP Blocking
RemoteConfig.register(PRODUCTS.ASM, asm_callback) # Exclusion Filters & Custom Rules
RemoteConfig.register(PRODUCTS.ASM_DD, asm_dd_callback) # DD Rules
remoteconfig_poller.register(PRODUCTS.ASM_DATA, asm_callback) # IP Blocking
remoteconfig_poller.register(PRODUCTS.ASM, asm_callback) # Exclusion Filters & Custom Rules
remoteconfig_poller.register(PRODUCTS.ASM_DD, asm_callback) # DD Rules


def _add_rules_to_list(features, feature, message, ruleset):
@@ -71,103 +83,137 @@ def _add_rules_to_list(features, feature, message, ruleset):
if ruleset.get(feature) is None:
ruleset[feature] = []
ruleset[feature] += rules
log.debug("Reloading Appsec %s: %s", message, rules)
log.debug("Reloading Appsec %s: %s", message, str(rules)[:20])
except JSONDecodeError:
log.error("ERROR Appsec %s: invalid JSON content from remote configuration", message)


def _appsec_rules_data(tracer, features):
# type: (Tracer, Mapping[str, Any]) -> bool
def _appsec_callback(features, test_tracer=None):
# type: (Mapping[str, Any], Optional[Tracer]) -> None
config = features.get("config", {})
_appsec_1click_activation(config, test_tracer)
_appsec_rules_data(config, test_tracer)


def _appsec_rules_data(features, test_tracer):
# type: (Mapping[str, Any], Optional[Tracer]) -> bool
# Tracer is a parameter for testing propose
# Import tracer here to avoid a circular import
if test_tracer is None:
from ddtrace import tracer
else:
tracer = test_tracer

if features and tracer._appsec_processor:
ruleset = {} # type: dict[str, Optional[list[Any]]]
_add_rules_to_list(features, "rules_data", "rules data", ruleset)
_add_rules_to_list(features, "custom_rules", "custom rules", ruleset)
_add_rules_to_list(features, "rules", "Datadog rules", ruleset)
_add_rules_to_list(features, "exclusions", "exclusion filters", ruleset)
_add_rules_to_list(features, "rules_override", "rules override", ruleset)
return tracer._appsec_processor._update_rules({k: v for k, v in ruleset.items() if v is not None})
if ruleset:
return tracer._appsec_processor._update_rules({k: v for k, v in ruleset.items() if v is not None})

return False


class RCASMDDCallBack(RemoteConfigCallBack):
def __init__(self, tracer):
# type: (Tracer) -> None
self.tracer = tracer

def __call__(self, metadata, features):
if features is not None:
_appsec_rules_data(self.tracer, features)

def _preprocess_results_appsec_1click_activation(features, pubsub_instance=None):
# type: (Any, Optional[PubSub]) -> Mapping[str, Any]
"""The main process has the responsibility to enable or disable the ASM products. The child processes don't
care about that, the children only need to know about payload content.
"""
if _appsec_rc_features_is_enabled():
log.debug(
"[%s][P: %s] Receiving ASM Remote Configuration ASM_FEATURES: %s",
os.getpid(),
os.getppid(),
features.get("asm", {}),
)

class RCAppSecFeaturesCallBack(RemoteConfigCallBack):
def __init__(self, tracer):
# type: (Tracer) -> None
self.tracer = tracer
if not pubsub_instance:
pubsub_instance = AppSecRC(_preprocess_results_appsec_1click_activation, _appsec_callback)

def __call__(self, metadata, features):
rc_appsec_enabled = None
if features is not None:
self._appsec_1click_activation(features)

def _appsec_1click_activation(self, features):
# type: (Union[Literal[False], Mapping[str, Any]]) -> None
"""This callback updates appsec enabled in tracer and config instances following this logic:
```
| DD_APPSEC_ENABLED | RC Enabled | Result |
|-------------------|------------|----------|
| <not set> | <not set> | Disabled |
| <not set> | false | Disabled |
| <not set> | true | Enabled |
| false | <not set> | Disabled |
| true | <not set> | Enabled |
| false | true | Disabled |
| true | true | Enabled |
```
"""
if APPSEC_ENV in os.environ:
rc_appsec_enabled = asbool(os.environ.get(APPSEC_ENV))
elif features == {}:
rc_appsec_enabled = False
else:
asm_features = features.get("asm", {})
if asm_features is not None:
rc_appsec_enabled = asm_features.get("enabled")
log.debug(
"[%s][P: %s] ASM Remote Configuration ASM_FEATURES. Appsec enabled: %s",
os.getpid(),
os.getppid(),
rc_appsec_enabled,
)
if rc_appsec_enabled is not None:
from ddtrace.appsec._constants import PRODUCTS

if rc_appsec_enabled and _appsec_rc_file_is_not_static():
remoteconfig_poller.register(PRODUCTS.ASM_DATA, pubsub_instance) # IP Blocking
remoteconfig_poller.register(PRODUCTS.ASM, pubsub_instance) # Exclusion Filters & Custom Rules
remoteconfig_poller.register(PRODUCTS.ASM_DD, pubsub_instance) # DD Rules
else:
remoteconfig_poller.unregister(PRODUCTS.ASM_DATA)
remoteconfig_poller.unregister(PRODUCTS.ASM)
remoteconfig_poller.unregister(PRODUCTS.ASM_DD)

features["asm"] = {"enabled": rc_appsec_enabled}
return features


def _appsec_1click_activation(features, test_tracer=None):
# type: (Mapping[str, Any], Optional[Tracer]) -> None
"""This callback updates appsec enabled in tracer and config instances following this logic:
```
| DD_APPSEC_ENABLED | RC Enabled | Result |
|-------------------|------------|----------|
| <not set> | <not set> | Disabled |
| <not set> | false | Disabled |
| <not set> | true | Enabled |
| false | <not set> | Disabled |
| true | <not set> | Enabled |
| false | true | Disabled |
| true | true | Enabled |
```
"""
if _appsec_rc_features_is_enabled():
# Tracer is a parameter for testing propose
# Import tracer here to avoid a circular import
if test_tracer is None:
from ddtrace import tracer
else:
tracer = test_tracer

log.debug("[%s][P: %s] ASM_FEATURES: %s", os.getpid(), os.getppid(), str(features)[:100])
if APPSEC_ENV in os.environ:
# no one click activation if var env is set
rc_appsec_enabled = asbool(os.environ.get(APPSEC_ENV))
elif features is False:
rc_appsec_enabled = False
else:
rc_appsec_enabled = features.get("asm", {}).get("enabled")
rc_appsec_enabled = features.get("asm", {}).get("enabled", False)

log.debug("APPSEC_ENABLED: %s", rc_appsec_enabled)
if rc_appsec_enabled is not None:
from ddtrace.appsec._constants import PRODUCTS
from ddtrace.internal.remoteconfig import RemoteConfig

log.debug("Updating ASM Remote Configuration ASM_FEATURES: %s", rc_appsec_enabled)
log.debug(
"[%s][P: %s] Updating ASM Remote Configuration ASM_FEATURES: %s",
os.getpid(),
os.getppid(),
rc_appsec_enabled,
)

if rc_appsec_enabled:
if _appsec_rc_file_is_not_static():
asm_dd_callback = RCASMDDCallBack(self.tracer)
asm_callback = RCAppSecCallBack(self.tracer)
RemoteConfig.register(PRODUCTS.ASM_DATA, asm_callback) # IP Blocking
RemoteConfig.register(PRODUCTS.ASM, asm_callback) # Exclusion Filters & Custom Rules
RemoteConfig.register(PRODUCTS.ASM_DD, asm_dd_callback) # DD Rules
if not self.tracer._appsec_enabled:
self.tracer.configure(appsec_enabled=True)
if not tracer._appsec_enabled:
tracer.configure(appsec_enabled=True)
else:
config._appsec_enabled = True

else:
if _appsec_rc_file_is_not_static():
RemoteConfig.unregister(PRODUCTS.ASM_DATA)
RemoteConfig.unregister(PRODUCTS.ASM)
RemoteConfig.unregister(PRODUCTS.ASM_DD)
if self.tracer._appsec_enabled:
self.tracer.configure(appsec_enabled=False)
if tracer._appsec_enabled:
tracer.configure(appsec_enabled=False)
else:
config._appsec_enabled = False


class RCAppSecCallBack(RemoteConfigCallBackAfterMerge):
def __init__(self, tracer):
# type: (Tracer) -> None
super(RCAppSecCallBack, self).__init__()
self.tracer = tracer

def __call__(self, target, features):
# type: (str, Any) -> None
if features is not None:
_appsec_rules_data(self.tracer, features)
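
The decision table in the `_appsec_1click_activation` docstring above can be written as a small pure function. This is a sketch for illustration, not the code in this commit: `resolve_appsec_enabled` and `_as_bool` are hypothetical helpers, with `_as_bool` a simplified stand-in for ddtrace's `asbool`.

```python
def _as_bool(value):
    """Simplified stand-in for ddtrace's asbool (assumption)."""
    return str(value).lower() in ("true", "1")


def resolve_appsec_enabled(env_value, rc_enabled):
    """Return the resulting ASM state.

    env_value:  the DD_APPSEC_ENABLED string, or None when the variable is not set
    rc_enabled: True, False, or None when Remote Config sent no value
    """
    if env_value is not None:
        # The environment variable wins: no 1-click activation when it is set.
        return _as_bool(env_value)
    return bool(rc_enabled)


# Rows of the docstring table: (DD_APPSEC_ENABLED, RC enabled, expected result)
TABLE = [
    (None, None, False),
    (None, False, False),
    (None, True, True),
    ("false", None, False),
    ("true", None, True),
    ("false", True, False),
    ("true", True, True),
]
```

Each row of `TABLE` corresponds to one line of the docstring table, with Disabled mapped to `False` and Enabled to `True`.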
15 changes: 15 additions & 0 deletions ddtrace/bootstrap/sitecustomize.py
@@ -49,6 +49,7 @@

log = get_logger(__name__)


if os.environ.get("DD_GEVENT_PATCH_ALL") is not None:
deprecate(
"The environment variable DD_GEVENT_PATCH_ALL is deprecated and will be removed in a future version. ",
@@ -248,6 +249,20 @@ def _(threading):
else:
log.debug("additional sitecustomize found in: %s", sys.path)

if asbool(os.environ.get("DD_REMOTE_CONFIGURATION_ENABLED", "true")):
from ddtrace.internal.remoteconfig.worker import remoteconfig_poller

remoteconfig_poller.enable()

should_start_appsec_remoteconfig = config._appsec_enabled or asbool(
os.environ.get("DD_REMOTE_CONFIGURATION_ENABLED", "true")
)

if should_start_appsec_remoteconfig:
from ddtrace.appsec._remoteconfiguration import enable_appsec_rc

enable_appsec_rc()

# Loading status used in tests to detect if the `sitecustomize` has been
# properly loaded without exceptions. This must be the last action in the module
# when the execution ends with a success.