
feat: attempts with connection improvements #118

@@ -26,7 +26,7 @@ class CacheType(Enum):
DEFAULT_RESOLVER_TYPE = ResolverType.RPC
DEFAULT_RETRY_BACKOFF = 1000
DEFAULT_RETRY_BACKOFF_MAX = 120000
DEFAULT_RETRY_GRACE_ATTEMPTS = 5
DEFAULT_RETRY_GRACE_PERIOD_SECONDS = 5
DEFAULT_STREAM_DEADLINE = 600000
DEFAULT_TLS = False

@@ -41,7 +41,7 @@ class CacheType(Enum):
ENV_VAR_RESOLVER_TYPE = "FLAGD_RESOLVER"
ENV_VAR_RETRY_BACKOFF_MS = "FLAGD_RETRY_BACKOFF_MS"
ENV_VAR_RETRY_BACKOFF_MAX_MS = "FLAGD_RETRY_BACKOFF_MAX_MS"
ENV_VAR_RETRY_GRACE_ATTEMPTS = "FLAGD_RETRY_GRACE_ATTEMPTS"
ENV_VAR_RETRY_GRACE_PERIOD_SECONDS = "FLAGD_RETRY_GRACE_PERIOD"
ENV_VAR_STREAM_DEADLINE_MS = "FLAGD_STREAM_DEADLINE_MS"
ENV_VAR_TLS = "FLAGD_TLS"

@@ -81,7 +81,7 @@ def __init__( # noqa: PLR0913
offline_poll_interval_ms: typing.Optional[int] = None,
retry_backoff_ms: typing.Optional[int] = None,
retry_backoff_max_ms: typing.Optional[int] = None,
retry_grace_attempts: typing.Optional[int] = None,
retry_grace_period: typing.Optional[int] = None,
deadline_ms: typing.Optional[int] = None,
stream_deadline_ms: typing.Optional[int] = None,
keep_alive_time: typing.Optional[int] = None,
@@ -115,14 +115,16 @@ def __init__( # noqa: PLR0913
else retry_backoff_max_ms
)

self.retry_grace_attempts: int = (
self.retry_grace_period: int = (
int(
env_or_default(
ENV_VAR_RETRY_GRACE_ATTEMPTS, DEFAULT_RETRY_GRACE_ATTEMPTS, cast=int
ENV_VAR_RETRY_GRACE_PERIOD_SECONDS,
DEFAULT_RETRY_GRACE_PERIOD_SECONDS,
cast=int,
)
)
if retry_grace_attempts is None
else retry_grace_attempts
if retry_grace_period is None
else retry_grace_period
)

self.resolver = (
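The rename above changes both the default constant and the environment variable: the grace period is now expressed in seconds (`FLAGD_RETRY_GRACE_PERIOD`, default 5) rather than as an attempt count. The resolution order is unchanged: an explicit constructor argument wins, then the environment variable, then the default. A minimal sketch of that precedence, with `env_or_default` re-implemented here purely for illustration (the real helper may differ):

```python
import os
import typing

# Names mirror the constants in this diff; the helper below is an assumption
# about env_or_default's behaviour, not the provider's actual implementation.
DEFAULT_RETRY_GRACE_PERIOD_SECONDS = 5
ENV_VAR_RETRY_GRACE_PERIOD_SECONDS = "FLAGD_RETRY_GRACE_PERIOD"


def env_or_default(
    env_var: str,
    default: int,
    cast: typing.Callable[[str], int] = int,
) -> int:
    value = os.environ.get(env_var)
    return cast(value) if value is not None else default


def resolve_retry_grace_period(retry_grace_period: typing.Optional[int] = None) -> int:
    # Constructor argument wins, then the environment variable, then the default.
    if retry_grace_period is not None:
        return retry_grace_period
    return env_or_default(
        ENV_VAR_RETRY_GRACE_PERIOD_SECONDS,
        DEFAULT_RETRY_GRACE_PERIOD_SECONDS,
        cast=int,
    )
```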
@@ -53,7 +53,7 @@ def __init__( # noqa: PLR0913
cache_type: typing.Optional[CacheType] = None,
max_cache_size: typing.Optional[int] = None,
retry_backoff_max_ms: typing.Optional[int] = None,
retry_grace_attempts: typing.Optional[int] = None,
retry_grace_period: typing.Optional[int] = None,
):
"""
Create an instance of the FlagdProvider
@@ -84,7 +84,7 @@ def __init__( # noqa: PLR0913
deadline_ms=deadline,
retry_backoff_ms=retry_backoff_ms,
retry_backoff_max_ms=retry_backoff_max_ms,
retry_grace_attempts=retry_grace_attempts,
retry_grace_period=retry_grace_period,
resolver=resolver_type,
offline_flag_source_path=offline_flag_source_path,
stream_deadline_ms=stream_deadline_ms,
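`FlagdProvider` simply forwards the renamed keyword through to `Config`. A hedged usage sketch (the flag key and connection details are placeholders; the surrounding wiring follows standard OpenFeature SDK usage and is not part of this diff):

```python
from openfeature import api
from openfeature.contrib.provider.flagd import FlagdProvider

# After a disconnect the provider reports STALE immediately and only emits an
# ERROR event once this many seconds pass without a successful reconnect.
provider = FlagdProvider(
    host="localhost",
    port=8013,
    retry_grace_period=10,
)
api.set_provider(provider)

client = api.get_client()
enabled = client.get_boolean_value("my-flag", False)
```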
@@ -7,6 +7,7 @@
from cachebox import BaseCacheImpl, LRUCache
from google.protobuf.json_format import MessageToDict
from google.protobuf.struct_pb2 import Struct
from grpc import ChannelConnectivity

from openfeature.evaluation_context import EvaluationContext
from openfeature.event import ProviderEventDetails
@@ -47,6 +48,7 @@
[ProviderEventDetails], None
],
):
self.active = False
self.config = config
self.emit_provider_ready = emit_provider_ready
self.emit_provider_error = emit_provider_error
@@ -57,26 +59,30 @@
if self.config.cache == CacheType.LRU
else None
)
self.stub, self.channel = self._create_stub()
self.retry_backoff_seconds = config.retry_backoff_ms * 0.001
self.retry_backoff_max_seconds = config.retry_backoff_max_ms * 0.001
self.retry_grace_attempts = config.retry_grace_attempts

self.retry_grace_period = config.retry_grace_period
self.streamline_deadline_seconds = config.stream_deadline_ms * 0.001
self.deadline = config.deadline_ms * 0.001
self.connected = False

def _create_stub(
self,
) -> typing.Tuple[evaluation_pb2_grpc.ServiceStub, grpc.Channel]:
config = self.config
channel_factory = grpc.secure_channel if config.tls else grpc.insecure_channel
channel = channel_factory(

# Create the channel with the service config
options = [
("grpc.keepalive_time_ms", config.keep_alive_time),
("grpc.initial_reconnect_backoff_ms", config.retry_backoff_ms),
("grpc.max_reconnect_backoff_ms", config.retry_backoff_max_ms),
("grpc.min_reconnect_backoff_ms", config.deadline_ms),
]
self.channel = channel_factory(
f"{config.host}:{config.port}",
options=(("grpc.keepalive_time_ms", config.keep_alive_time),),
options=options,
)
stub = evaluation_pb2_grpc.ServiceStub(channel)
self.stub = evaluation_pb2_grpc.ServiceStub(self.channel)

return stub, channel
self.thread: typing.Optional[threading.Thread] = None
self.timer: typing.Optional[threading.Timer] = None

self.start_time = time.time()

def initialize(self, evaluation_context: EvaluationContext) -> None:
self.connect()
@@ -89,11 +95,12 @@

def connect(self) -> None:
self.active = True
self.thread = threading.Thread(
target=self.listen, daemon=True, name="FlagdGrpcServiceWorkerThread"
)
self.thread.start()

# Run monitoring in a separate thread
self.monitor_thread = threading.Thread(
target=self.monitor, daemon=True, name="FlagdGrpcServiceMonitorThread"
)
self.monitor_thread.start()
## block until ready or deadline reached
timeout = self.deadline + time.time()
while not self.connected and time.time() < timeout:
@@ -105,81 +112,87 @@
"Blocking init finished before data synced. Consider increasing startup deadline to avoid inconsistent evaluations."
)

def monitor(self) -> None:
self.channel.subscribe(self._state_change_callback, try_to_connect=True)

def _state_change_callback(self, new_state: ChannelConnectivity) -> None:
logger.debug(f"gRPC state change: {new_state}")
if new_state == ChannelConnectivity.READY:
if not self.thread or not self.thread.is_alive():
self.thread = threading.Thread(
target=self.listen,
daemon=True,
name="FlagdGrpcServiceWorkerThread",
)
self.thread.start()

if self.timer and self.timer.is_alive():
logger.debug("gRPC error timer expired")
self.timer.cancel()

elif new_state == ChannelConnectivity.TRANSIENT_FAILURE:
# this is the failed reconnect attempt so we are going into stale
self.emit_provider_stale(
ProviderEventDetails(
message="gRPC sync disconnected, reconnecting",
)
)
self.start_time = time.time()
# adding a timer, so we can emit the error event after time
self.timer = threading.Timer(self.retry_grace_period, self.emit_error)

logger.debug("gRPC error timer started")
self.timer.start()
self.connected = False

def emit_error(self) -> None:
logger.debug("gRPC error emitted")
if self.cache:
self.cache.clear()
self.emit_provider_error(
ProviderEventDetails(
message="gRPC sync disconnected, reconnecting",
error_code=ErrorCode.GENERAL,
)
)

def listen(self) -> None:
retry_delay = self.retry_backoff_seconds
logger.debug("gRPC starting listener thread")
call_args = (
{"timeout": self.streamline_deadline_seconds}
if self.streamline_deadline_seconds > 0
else {}
)
retry_counter = 0
while self.active:
request = evaluation_pb2.EventStreamRequest()
call_args["wait_for_ready"] = True
request = evaluation_pb2.EventStreamRequest()

# defining a never ending loop to recreate the stream
while self.active:
try:
logger.debug("Setting up gRPC sync flags connection")
for message in self.stub.EventStream(request, **call_args):
if message.type == "provider_ready":
if not self.connected:
self.emit_provider_ready(
ProviderEventDetails(
message="gRPC sync connection established"
)
self.connected = True
self.emit_provider_ready(
ProviderEventDetails(
message="gRPC sync connection established"
)
self.connected = True
retry_counter = 0
# reset retry delay after successful read
retry_delay = self.retry_backoff_seconds

)
elif message.type == "configuration_change":
data = MessageToDict(message)["data"]
self.handle_changed_flags(data)

if not self.active:
logger.info("Terminating gRPC sync thread")
return
except grpc.RpcError as e:
logger.error(f"SyncFlags stream error, {e.code()=} {e.details()=}")
# re-create the stub if there's a connection issue - otherwise reconnect does not work as expected
self.stub, self.channel = self._create_stub()
except grpc.RpcError as e: # noqa: PERF203
# this error log looks uninteresting, but without it the retry does not work as expected
logger.debug(f"SyncFlags stream error, {e.code()=} {e.details()=}")
except ParseError:
logger.exception(
f"Could not parse flag data using flagd syntax: {message=}"
)

self.connected = False
self.on_connection_error(retry_counter, retry_delay)

retry_delay = self.handle_retry(retry_counter, retry_delay)

retry_counter = retry_counter + 1

def handle_retry(self, retry_counter: int, retry_delay: float) -> float:
if retry_counter == 0:
logger.info("gRPC sync disconnected, reconnecting immediately")
else:
logger.info(f"gRPC sync disconnected, reconnecting in {retry_delay}s")
time.sleep(retry_delay)
retry_delay = min(1.1 * retry_delay, self.retry_backoff_max_seconds)
return retry_delay

def on_connection_error(self, retry_counter: int, retry_delay: float) -> None:
if retry_counter == self.retry_grace_attempts:
if self.cache:
self.cache.clear()
self.emit_provider_error(
ProviderEventDetails(
message=f"gRPC sync disconnected, reconnecting in {retry_delay}s",
error_code=ErrorCode.GENERAL,
)
)
elif retry_counter == 1:
self.emit_provider_stale(
ProviderEventDetails(
message=f"gRPC sync disconnected, reconnecting in {retry_delay}s",
)
)

def handle_changed_flags(self, data: typing.Any) -> None:
changed_flags = list(data["flags"].keys())

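Taken together, the grpc.py changes replace the attempt-counting logic (`handle_retry` / `on_connection_error`) with channel-state monitoring: the resolver subscribes to connectivity updates, lets gRPC's own reconnect backoff do the retrying (`wait_for_ready=True` plus the reconnect-backoff channel options), goes STALE as soon as the channel reports `TRANSIENT_FAILURE`, and only escalates to ERROR if a `threading.Timer` outlives the grace period before the channel is `READY` again. A standalone sketch of that state-to-event mapping (class and callback names here are illustrative, not the provider's API):

```python
import logging
import threading
import typing

logger = logging.getLogger(__name__)


class GracePeriodMonitor:
    """Emit 'stale' immediately on failure, 'error' only after the grace period."""

    def __init__(
        self,
        grace_period_seconds: float,
        emit_stale: typing.Callable[[str], None],
        emit_error: typing.Callable[[str], None],
    ) -> None:
        self.grace_period_seconds = grace_period_seconds
        self.emit_stale = emit_stale
        self.emit_error = emit_error
        self.timer: typing.Optional[threading.Timer] = None

    def on_transient_failure(self) -> None:
        # Connection dropped: report stale and arm the error timer.
        self.emit_stale("gRPC sync disconnected, reconnecting")
        self.timer = threading.Timer(self.grace_period_seconds, self._grace_expired)
        self.timer.start()

    def on_ready(self) -> None:
        # Reconnected within the grace period: cancel the pending error.
        if self.timer and self.timer.is_alive():
            self.timer.cancel()

    def _grace_expired(self) -> None:
        logger.debug("grace period expired without a reconnect")
        self.emit_error("gRPC sync disconnected")
```

Compared with counting failed attempts, a wall-clock grace period gives the same stale-to-error behaviour no matter how the reconnect backoff is tuned, which appears to be the motivation for changing the option from attempts to seconds.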
@@ -31,17 +31,19 @@ def __init__(
self.last_modified = 0.0
self.flag_data: typing.Mapping[str, Flag] = {}
self.load_data()
self.active = True
self.thread = threading.Thread(target=self.refresh_file, daemon=True)
self.thread.start()

def shutdown(self) -> None:
self.active = False
pass

def get_flag(self, key: str) -> typing.Optional[Flag]:
return self.flag_data.get(key)

def refresh_file(self) -> None:
while True:
while self.active:
time.sleep(self.poll_interval_seconds)
logger.debug("checking for new flag store contents from file")
last_modified = os.path.getmtime(self.file_path)
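The file-watcher change is the standard stoppable-poller pattern: the daemon thread re-checks an `active` flag on every iteration, so `shutdown()` can end the loop instead of leaving the thread polling forever. A minimal sketch of the pattern (simplified; the class name and reload hook are illustrative assumptions):

```python
import os
import threading
import time


class FilePoller:
    def __init__(self, file_path: str, poll_interval_seconds: float) -> None:
        self.file_path = file_path
        self.poll_interval_seconds = poll_interval_seconds
        self.last_modified = 0.0
        self.active = True
        self.thread = threading.Thread(target=self._refresh_loop, daemon=True)
        self.thread.start()

    def shutdown(self) -> None:
        # The loop observes the flag after its current sleep and exits.
        self.active = False

    def _refresh_loop(self) -> None:
        while self.active:
            time.sleep(self.poll_interval_seconds)
            modified = os.path.getmtime(self.file_path)
            if modified > self.last_modified:
                self.last_modified = modified
                self.reload()

    def reload(self) -> None:
        # Placeholder for re-reading and re-parsing the flag file.
        pass
```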
21 changes: 0 additions & 21 deletions providers/openfeature-provider-flagd/tests/e2e/conftest.py
@@ -1,8 +1,5 @@
import typing

import pytest
from testcontainers.core.container import DockerContainer
from tests.e2e.flagd_container import FlagdContainer
from tests.e2e.steps import * # noqa: F403

JsonPrimitive = typing.Union[str, bool, float, int]
@@ -18,21 +15,3 @@ def pytest_collection_modifyitems(config):
# this seems to not work with python 3.8
if hasattr(config.option, "markexpr") and config.option.markexpr == "":
config.option.markexpr = marker


@pytest.fixture(autouse=True, scope="module")
def setup(request, port, image):
container: DockerContainer = FlagdContainer(
image=image,
port=port,
)
# Setup code
c = container.start()

def fin():
c.stop()

# Teardown code
request.addfinalizer(fin)

return c.get_exposed_port(port)