From ccb4d900dde832388e4a1b94722531d91bdf0770 Mon Sep 17 00:00:00 2001 From: Jeremy Fehr Date: Mon, 13 May 2024 16:32:03 -0700 Subject: [PATCH 01/15] feat: restore defaults present < 3.6.0, but retain customizability --- src/functions_framework/_http/gunicorn.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/functions_framework/_http/gunicorn.py b/src/functions_framework/_http/gunicorn.py index 009a06b7..050f766c 100644 --- a/src/functions_framework/_http/gunicorn.py +++ b/src/functions_framework/_http/gunicorn.py @@ -21,9 +21,9 @@ class GunicornApplication(gunicorn.app.base.BaseApplication): def __init__(self, app, host, port, debug, **options): self.options = { "bind": "%s:%s" % (host, port), - "workers": os.environ.get("WORKERS", (os.cpu_count() or 1) * 4), - "threads": os.environ.get("THREADS", 1), - "timeout": os.environ.get("CLOUD_RUN_TIMEOUT_SECONDS", 300), + "workers": os.environ.get("WORKERS", 1), + "threads": os.environ.get("THREADS", (os.cpu_count() or 1) * 4), + "timeout": os.environ.get("CLOUD_RUN_TIMEOUT_SECONDS", 0), "loglevel": "error", "limit_request_line": 0, } From dcb6eb16950d325468c793377ea199187a3133d1 Mon Sep 17 00:00:00 2001 From: Jeremy Fehr Date: Thu, 16 May 2024 08:03:23 -0700 Subject: [PATCH 02/15] revert the test, too --- tests/test_http.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_http.py b/tests/test_http.py index 0a46fbea..fa488e16 100644 --- a/tests/test_http.py +++ b/tests/test_http.py @@ -105,9 +105,9 @@ def test_gunicorn_application(debug): } assert gunicorn_app.cfg.bind == ["1.2.3.4:1234"] - assert gunicorn_app.cfg.workers == os.cpu_count() * 4 - assert gunicorn_app.cfg.threads == 1 - assert gunicorn_app.cfg.timeout == 300 + assert gunicorn_app.cfg.workers == 1 + assert gunicorn_app.cfg.threads == os.cpu_count() * 4 + assert gunicorn_app.cfg.timeout == 0 assert gunicorn_app.load() == app From 0242768d9e2f424786e38559a86193cd9a4ba73c Mon Sep 17 00:00:00 2001 From: Jeremy Fehr Date: Thu, 16 May 2024 08:18:25 -0700 Subject: [PATCH 03/15] also restore this assert :) --- tests/test_http.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_http.py b/tests/test_http.py index fa488e16..fbfac9d2 100644 --- a/tests/test_http.py +++ b/tests/test_http.py @@ -97,9 +97,9 @@ def test_gunicorn_application(debug): assert gunicorn_app.app == app assert gunicorn_app.options == { "bind": "%s:%s" % (host, port), - "workers": os.cpu_count() * 4, - "threads": 1, - "timeout": 300, + "workers": 1, + "threads": os.cpu_count() * 4, + "timeout": 0, "loglevel": "error", "limit_request_line": 0, } From 60611f56c3b7e310b54565003d1e26309fcc2f86 Mon Sep 17 00:00:00 2001 From: Jeremy Fehr Date: Thu, 16 May 2024 19:43:44 -0700 Subject: [PATCH 04/15] feat: (opt-in): terminate handling of work when the request has already timed out MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Overhead-free (or at least very cheap). The “timeout” gunicorn config means drastically different things for sync and non-sync workers: Workers silent for more than this many seconds are killed and restarted. Value is a positive number or 0. Setting it to 0 has the effect of infinite timeouts by disabling timeouts for all workers entirely. Generally, the default of thirty seconds should suffice. Only set this noticeably higher if you’re sure of the repercussions for sync workers. For the non sync workers it just means that the worker process is still communicating and is not tied to the length of time required to handle a single request. So. For cases where threads = 1 (user set or our defaults), we’ll use the sync worker and let the regular timeout functionality do its thing. For cases where threads > 1, we’re using the gthread worker, and timeout means something completely different and not really user-observable. So we’ll leave the communication timeout (default gunicorn “timeout”) at 30 seconds, but create our own gthread-derived worker class to use instead, which terminates request handling (with no mind to gunicorn’s “graceful shutdown” config), to emulate GCFv1. The arbiter spawns these workers, so we have to maintain some sort of global timeout state for us to read in our custom gthread worker. In the future, we should consider letting the user adjust the graceful shutdown seconds. But the default of 30 seems like it’s worked fine historically, so it’s hard to argue for changing it. IIUC, this means that on gen 2, there’s a small behavior difference for the sync workers compared to gen 1, in that gen 2 sync worker workloads will get an extra 30 seconds of timeout to gracefully shut down. I don’t think monkeying with this config and opting-in to sync workers is very common, though, so let’s not worry about it here; everyone should be on the gthread path outlined above. --- playground/main.py | 15 ++ src/functions_framework/_http/gunicorn.py | 41 +++++- src/functions_framework/exceptions.py | 6 +- src/functions_framework/request_timeout.py | 44 ++++++ tests/test_functions/timeout/main.py | 12 ++ tests/test_timeouts.py | 162 +++++++++++++++++++++ tox.ini | 1 + 7 files changed, 274 insertions(+), 7 deletions(-) create mode 100644 playground/main.py create mode 100644 src/functions_framework/request_timeout.py create mode 100644 tests/test_functions/timeout/main.py create mode 100644 tests/test_timeouts.py diff --git a/playground/main.py b/playground/main.py new file mode 100644 index 00000000..d31e4704 --- /dev/null +++ b/playground/main.py @@ -0,0 +1,15 @@ +import functions_framework +import logging +import time + +logger = logging.getLogger(__name__) + + +@functions_framework.http +def main(request): + timeout = 2 + for _ in range(timeout * 10): + time.sleep(0.1) + logger.info("logging message after timeout elapsed") + return "Hello, world!" + diff --git a/src/functions_framework/_http/gunicorn.py b/src/functions_framework/_http/gunicorn.py index 050f766c..22847e50 100644 --- a/src/functions_framework/_http/gunicorn.py +++ b/src/functions_framework/_http/gunicorn.py @@ -1,4 +1,4 @@ -# Copyright 2020 Google LLC +# Copyright 2024 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,21 +12,40 @@ # See the License for the specific language governing permissions and # limitations under the License. +import gunicorn.app.base import os -import gunicorn.app.base +from gunicorn.workers.gthread import ThreadWorker +from ..request_timeout import ThreadingTimeout + +# global for use in our custom gthread worker; the gunicorn arbiter spawns these +# and it's not possible to inject (and self.timeout means something different to +# async workers!) +TIMEOUT_SECONDS = int(os.environ.get("CLOUD_RUN_TIMEOUT_SECONDS", 0)) class GunicornApplication(gunicorn.app.base.BaseApplication): def __init__(self, app, host, port, debug, **options): + threads = int(os.environ.get("THREADS", (os.cpu_count() or 1) * 4)) + self.options = { "bind": "%s:%s" % (host, port), - "workers": os.environ.get("WORKERS", 1), - "threads": os.environ.get("THREADS", (os.cpu_count() or 1) * 4), - "timeout": os.environ.get("CLOUD_RUN_TIMEOUT_SECONDS", 0), - "loglevel": "error", + "workers": int(os.environ.get("WORKERS", 1)), + "threads": threads, + "loglevel": os.environ.get("GUNICORN_LOG_LEVEL", "error"), "limit_request_line": 0, } + + if TIMEOUT_SECONDS > 0: + if threads > 1 and _is_truthy( + os.environ.get("THREADED_TIMEOUT_ENABLED", "False") + ): + self.options["worker_class"] = ( + "functions_framework._http.gunicorn.GThreadWorkerWithTimeoutSupport" + ) + else: + self.options["timeout"] = TIMEOUT_SECONDS + self.options.update(options) self.app = app @@ -38,3 +57,13 @@ def load_config(self): def load(self): return self.app + + +class GThreadWorkerWithTimeoutSupport(ThreadWorker): + def handle_request(self, req, conn): + with ThreadingTimeout(TIMEOUT_SECONDS, swallow_exc=False): + super(GThreadWorkerWithTimeoutSupport, self).handle_request(req, conn) + + +def _is_truthy(s): + return str(s).lower() in ("yes", "y", "1", "yeah", "true", "t") diff --git a/src/functions_framework/exceptions.py b/src/functions_framework/exceptions.py index 671a28a4..c0c3768e 100644 --- a/src/functions_framework/exceptions.py +++ b/src/functions_framework/exceptions.py @@ -1,4 +1,4 @@ -# Copyright 2020 Google LLC +# Copyright 2024 Google LLC # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -35,3 +35,7 @@ class MissingTargetException(FunctionsFrameworkException): class EventConversionException(FunctionsFrameworkException): pass + + +class RequestTimeoutException(FunctionsFrameworkException): + pass \ No newline at end of file diff --git a/src/functions_framework/request_timeout.py b/src/functions_framework/request_timeout.py new file mode 100644 index 00000000..e7a8e9a6 --- /dev/null +++ b/src/functions_framework/request_timeout.py @@ -0,0 +1,44 @@ +import ctypes +import logging +import threading + +from .exceptions import RequestTimeoutException + + +logger = logging.getLogger(__name__) + + +class ThreadingTimeout(object): + def __init__(self, seconds, swallow_exc=True): + self.seconds = seconds + self.swallow_exc = swallow_exc + self.target_tid = threading.current_thread().ident + self.timer = None + + def __enter__(self): + self.timer = threading.Timer(self.seconds, self._raise_exc) + self.timer.start() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.timer.cancel() + if exc_type is RequestTimeoutException: + logger.warning( + "Request handling exceeded {0} seconds timeout; terminating request handling...".format( + self.seconds + ), + exc_info=(exc_type, exc_val, exc_tb), + ) + return False + + def _raise_exc(self): + ret = ctypes.pythonapi.PyThreadState_SetAsyncExc( + ctypes.c_long(self.target_tid), ctypes.py_object(RequestTimeoutException) + ) + if ret == 0: + raise ValueError("Invalid thread ID {}".format(self.target_tid)) + elif ret > 1: + ctypes.pythonapi.PyThreadState_SetAsyncExc( + ctypes.c_long(self.target_tid), None + ) + raise SystemError("PyThreadState_SetAsyncExc failed") diff --git a/tests/test_functions/timeout/main.py b/tests/test_functions/timeout/main.py new file mode 100644 index 00000000..e089a779 --- /dev/null +++ b/tests/test_functions/timeout/main.py @@ -0,0 +1,12 @@ +import logging +import time + +logger = logging.getLogger(__name__) + + +def function(request): + # sleep for 1200 total ms (1.8 sec) + for _ in range(12): + time.sleep(0.1) + logger.info("some extra logging message") + return "success", 200 diff --git a/tests/test_timeouts.py b/tests/test_timeouts.py new file mode 100644 index 00000000..692e877a --- /dev/null +++ b/tests/test_timeouts.py @@ -0,0 +1,162 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import os +import pathlib +import pytest +import requests +import socket +import time + +from multiprocessing import Process + +import functions_framework._http.gunicorn +from functions_framework import create_app + +TEST_FUNCTIONS_DIR = pathlib.Path(__file__).resolve().parent / "test_functions" + + +@pytest.mark.skip +def test_no_timeout_allows_request_processing_to_finish(): + source = TEST_FUNCTIONS_DIR / "timeout" / "main.py" + target = "function" + + app = create_app(target, source) + + host = "0.0.0.0" + port = "8080" + options = {} + + gunicorn_app = functions_framework._http.gunicorn.GunicornApplication( + app, host, port, False, **options + ) + + os.environ.clear() + + gunicorn_p = Process(target=gunicorn_app.run) + gunicorn_p.start() + + _wait_for_port(host, port) + + result = requests.get("http://{}:{}/".format(host, port)) + + gunicorn_p.kill() + + assert result.status_code == 200 + + +@pytest.mark.skip +def test_timeout_but_not_threaded_timeout_enabled_does_not_kill(): + + source = TEST_FUNCTIONS_DIR / "timeout" / "main.py" + target = "function" + + app = create_app(target, source) + + host = "0.0.0.0" + port = "8081" + options = {} + + gunicorn_app = functions_framework._http.gunicorn.GunicornApplication( + app, host, port, False, **options + ) + + os.environ.clear() + os.environ['CLOUD_RUN_TIMEOUT_SECONDS'] = "1" + os.environ['THREADED_TIMEOUT_ENABLED'] = "false" + + gunicorn_p = Process(target=gunicorn_app.run) + gunicorn_p.start() + + _wait_for_port(host, port) + + result = requests.get("http://{}:{}/".format(host, port)) + + gunicorn_p.kill() + + assert result.status_code == 200 + + +def test_timeout_and_threaded_timeout_enabled_kills(monkeypatch): + monkeypatch.setenv('CLOUD_RUN_TIMEOUT_SECONDS', 1) + monkeypatch.setenv('THREADED_TIMEOUT_ENABLED', "true") + source = TEST_FUNCTIONS_DIR / "timeout" / "main.py" + target = "function" + + app = create_app(target, source) + + host = "0.0.0.0" + port = "8082" + options = {} + + gunicorn_app = functions_framework._http.gunicorn.GunicornApplication( + app, host, port, False, **options + ) + + gunicorn_app.run() + gunicorn_p = Process(target=gunicorn_app.run) + gunicorn_p.start() + + _wait_for_port(host, port) + + result = requests.get("http://{}:{}/".format(host, port)) + + gunicorn_p.kill() + + assert ( + result.status_code == 500 + ) # TODO: this should be 504, where do i have to translate this? + + +def test_timeout_and_threaded_timeout_enabled_but_timeout_not_exceeded_doesnt_kill(monkeypatch): + monkeypatch.setenv('CLOUD_RUN_TIMEOUT_SECONDS', "2") + monkeypatch.setenv('THREADED_TIMEOUT_ENABLED', "true") + source = TEST_FUNCTIONS_DIR / "timeout" / "main.py" + target = "function" + + app = create_app(target, source) + + host = "0.0.0.0" + port = "8083" + options = {} + + gunicorn_app = functions_framework._http.gunicorn.GunicornApplication( + app, host, port, False, **options + ) + + gunicorn_p = Process(target=gunicorn_app.run) + gunicorn_p.start() + + _wait_for_port(host, port) + + result = requests.get("http://{}:{}/".format(host, port)) + + gunicorn_p.kill() + + assert result.status_code == 200 + + +@pytest.mark.skip +def _wait_for_port(host, port, timeout=10): + start_time = time.perf_counter() + while True: + try: + with socket.create_connection((host, port), timeout=timeout): + break + except OSError as ex: + time.sleep(0.01) + if time.perf_counter() - start_time >= timeout: + raise TimeoutError( + "Waited too long for the port {} on host {} to start accepting " + "connections.".format(port, host) + ) from ex \ No newline at end of file diff --git a/tox.ini b/tox.ini index e8c555b5..6ba6d3b4 100644 --- a/tox.ini +++ b/tox.ini @@ -5,6 +5,7 @@ envlist = py{35,36,37,38,39,310}-{ubuntu-latest,macos-latest,windows-latest},lin usedevelop = true deps = docker + pytest-asyncio pytest-cov pytest-integration pretend From 11284f08b7fae562e19ae6e778e56f3be6bfc232 Mon Sep 17 00:00:00 2001 From: Jeremy Fehr Date: Fri, 17 May 2024 10:05:00 -0700 Subject: [PATCH 05/15] fix tests --- src/functions_framework/_http/gunicorn.py | 6 +- tests/test_timeouts.py | 103 +++++++++++++--------- 2 files changed, 68 insertions(+), 41 deletions(-) diff --git a/src/functions_framework/_http/gunicorn.py b/src/functions_framework/_http/gunicorn.py index 22847e50..581bcac3 100644 --- a/src/functions_framework/_http/gunicorn.py +++ b/src/functions_framework/_http/gunicorn.py @@ -21,13 +21,17 @@ # global for use in our custom gthread worker; the gunicorn arbiter spawns these # and it's not possible to inject (and self.timeout means something different to # async workers!) -TIMEOUT_SECONDS = int(os.environ.get("CLOUD_RUN_TIMEOUT_SECONDS", 0)) +# set/managed in gunicorn application init for test-friendliness +TIMEOUT_SECONDS = None class GunicornApplication(gunicorn.app.base.BaseApplication): def __init__(self, app, host, port, debug, **options): threads = int(os.environ.get("THREADS", (os.cpu_count() or 1) * 4)) + global TIMEOUT_SECONDS + TIMEOUT_SECONDS = int(os.environ.get("CLOUD_RUN_TIMEOUT_SECONDS", 0)) + self.options = { "bind": "%s:%s" % (host, port), "workers": int(os.environ.get("WORKERS", 1)), diff --git a/tests/test_timeouts.py b/tests/test_timeouts.py index 692e877a..9713f4ac 100644 --- a/tests/test_timeouts.py +++ b/tests/test_timeouts.py @@ -24,130 +24,136 @@ from functions_framework import create_app TEST_FUNCTIONS_DIR = pathlib.Path(__file__).resolve().parent / "test_functions" +TEST_HOST = "0.0.0.0" +TEST_PORT = "8080" + + +@pytest.fixture(autouse=True) +def run_around_tests(): + _wait_for_no_listen(TEST_HOST, TEST_PORT) + yield -@pytest.mark.skip def test_no_timeout_allows_request_processing_to_finish(): source = TEST_FUNCTIONS_DIR / "timeout" / "main.py" target = "function" app = create_app(target, source) - host = "0.0.0.0" - port = "8080" options = {} gunicorn_app = functions_framework._http.gunicorn.GunicornApplication( - app, host, port, False, **options + app, TEST_HOST, TEST_PORT, False, **options ) - os.environ.clear() - gunicorn_p = Process(target=gunicorn_app.run) gunicorn_p.start() - _wait_for_port(host, port) + _wait_for_listen(TEST_HOST, TEST_PORT) - result = requests.get("http://{}:{}/".format(host, port)) + result = requests.get("http://{}:{}/".format(TEST_HOST, TEST_PORT)) gunicorn_p.kill() + gunicorn_p.join() assert result.status_code == 200 -@pytest.mark.skip -def test_timeout_but_not_threaded_timeout_enabled_does_not_kill(): - +def test_timeout_but_not_threaded_timeout_enabled_does_not_kill(monkeypatch): + monkeypatch.setenv("CLOUD_RUN_TIMEOUT_SECONDS", "1") + monkeypatch.setenv("THREADED_TIMEOUT_ENABLED", "false") source = TEST_FUNCTIONS_DIR / "timeout" / "main.py" target = "function" app = create_app(target, source) - host = "0.0.0.0" - port = "8081" options = {} gunicorn_app = functions_framework._http.gunicorn.GunicornApplication( - app, host, port, False, **options + app, TEST_HOST, TEST_PORT, False, **options ) os.environ.clear() - os.environ['CLOUD_RUN_TIMEOUT_SECONDS'] = "1" - os.environ['THREADED_TIMEOUT_ENABLED'] = "false" + os.environ["CLOUD_RUN_TIMEOUT_SECONDS"] = "1" + os.environ["THREADED_TIMEOUT_ENABLED"] = "false" gunicorn_p = Process(target=gunicorn_app.run) gunicorn_p.start() - _wait_for_port(host, port) + _wait_for_listen(TEST_HOST, TEST_PORT) - result = requests.get("http://{}:{}/".format(host, port)) + result = requests.get("http://{}:{}/".format(TEST_HOST, TEST_PORT)) gunicorn_p.kill() + gunicorn_p.join() assert result.status_code == 200 def test_timeout_and_threaded_timeout_enabled_kills(monkeypatch): - monkeypatch.setenv('CLOUD_RUN_TIMEOUT_SECONDS', 1) - monkeypatch.setenv('THREADED_TIMEOUT_ENABLED', "true") + monkeypatch.setenv("CLOUD_RUN_TIMEOUT_SECONDS", "1") + monkeypatch.setenv("THREADED_TIMEOUT_ENABLED", "true") source = TEST_FUNCTIONS_DIR / "timeout" / "main.py" target = "function" app = create_app(target, source) - host = "0.0.0.0" - port = "8082" options = {} gunicorn_app = functions_framework._http.gunicorn.GunicornApplication( - app, host, port, False, **options + app, TEST_HOST, TEST_PORT, False, **options ) - gunicorn_app.run() gunicorn_p = Process(target=gunicorn_app.run) gunicorn_p.start() - _wait_for_port(host, port) + _wait_for_listen(TEST_HOST, TEST_PORT) - result = requests.get("http://{}:{}/".format(host, port)) + result = requests.get("http://{}:{}/".format(TEST_HOST, TEST_PORT)) gunicorn_p.kill() + gunicorn_p.join() - assert ( - result.status_code == 500 - ) # TODO: this should be 504, where do i have to translate this? + # Any exception raised in execution is a 500 error. Cloud Functions 1st gen and + # 2nd gen/Cloud Run infrastructure doing the timeout will return a 408 (gen 1) + # or 504 (gen 2/CR) at the infrastructure layer when request timeouts happen, + # and this code will only be available to the user in logs. + assert result.status_code == 500 -def test_timeout_and_threaded_timeout_enabled_but_timeout_not_exceeded_doesnt_kill(monkeypatch): - monkeypatch.setenv('CLOUD_RUN_TIMEOUT_SECONDS', "2") - monkeypatch.setenv('THREADED_TIMEOUT_ENABLED', "true") +def test_timeout_and_threaded_timeout_enabled_but_timeout_not_exceeded_doesnt_kill( + monkeypatch, +): + monkeypatch.setenv("CLOUD_RUN_TIMEOUT_SECONDS", "2") + monkeypatch.setenv("THREADED_TIMEOUT_ENABLED", "true") source = TEST_FUNCTIONS_DIR / "timeout" / "main.py" target = "function" app = create_app(target, source) - host = "0.0.0.0" - port = "8083" options = {} gunicorn_app = functions_framework._http.gunicorn.GunicornApplication( - app, host, port, False, **options + app, TEST_HOST, TEST_PORT, False, **options ) gunicorn_p = Process(target=gunicorn_app.run) gunicorn_p.start() - _wait_for_port(host, port) + _wait_for_listen(TEST_HOST, TEST_PORT) - result = requests.get("http://{}:{}/".format(host, port)) + result = requests.get("http://{}:{}/".format(TEST_HOST, TEST_PORT)) gunicorn_p.kill() + gunicorn_p.join() assert result.status_code == 200 @pytest.mark.skip -def _wait_for_port(host, port, timeout=10): +def _wait_for_listen(host, port, timeout=10): + # Used in tests to make sure that the gunicorn app has booted and is + # listening before sending a test request start_time = time.perf_counter() while True: try: @@ -157,6 +163,23 @@ def _wait_for_port(host, port, timeout=10): time.sleep(0.01) if time.perf_counter() - start_time >= timeout: raise TimeoutError( - "Waited too long for the port {} on host {} to start accepting " + "Waited too long for port {} on host {} to start accepting " "connections.".format(port, host) - ) from ex \ No newline at end of file + ) from ex + + +@pytest.mark.skip +def _wait_for_no_listen(host, port, timeout=10): + # Used in tests to make sure that the + start_time = time.perf_counter() + while True: + try: + with socket.create_connection((host, port), timeout=timeout): + time.sleep(0.01) + if time.perf_counter() - start_time >= timeout: + raise TimeoutError( + "Waited too long for port {} on host {} to stop accepting " + "connections.".format(port, host) + ) + except OSError as ex: + break From af1f2fb4ccac3ed24e63dfbcc53f506d5fbc0663 Mon Sep 17 00:00:00 2001 From: Jeremy Fehr Date: Fri, 17 May 2024 11:03:45 -0700 Subject: [PATCH 06/15] small test fixes give up on coverage support for things that are tested in different processes, or in gthread, because it looks like pytest-cov gave up on support for these, where as coverage has out-of-the-box support --- src/functions_framework/_http/gunicorn.py | 25 +++++++++++----------- src/functions_framework/request_timeout.py | 5 ++--- tests/test_timeouts.py | 20 +++++++++-------- 3 files changed, 26 insertions(+), 24 deletions(-) diff --git a/src/functions_framework/_http/gunicorn.py b/src/functions_framework/_http/gunicorn.py index 581bcac3..a96c2a7b 100644 --- a/src/functions_framework/_http/gunicorn.py +++ b/src/functions_framework/_http/gunicorn.py @@ -40,15 +40,16 @@ def __init__(self, app, host, port, debug, **options): "limit_request_line": 0, } - if TIMEOUT_SECONDS > 0: - if threads > 1 and _is_truthy( - os.environ.get("THREADED_TIMEOUT_ENABLED", "False") - ): - self.options["worker_class"] = ( - "functions_framework._http.gunicorn.GThreadWorkerWithTimeoutSupport" - ) - else: - self.options["timeout"] = TIMEOUT_SECONDS + if ( + TIMEOUT_SECONDS > 0 + and threads > 1 + and _is_truthy(os.environ.get("THREADED_TIMEOUT_ENABLED", "False")) + ): # pragma: no cover + self.options["worker_class"] = ( + "functions_framework._http.gunicorn.GThreadWorkerWithTimeoutSupport" + ) + else: + self.options["timeout"] = TIMEOUT_SECONDS self.options.update(options) self.app = app @@ -63,11 +64,11 @@ def load(self): return self.app -class GThreadWorkerWithTimeoutSupport(ThreadWorker): +class GThreadWorkerWithTimeoutSupport(ThreadWorker): # pragma: no cover def handle_request(self, req, conn): - with ThreadingTimeout(TIMEOUT_SECONDS, swallow_exc=False): + with ThreadingTimeout(TIMEOUT_SECONDS): super(GThreadWorkerWithTimeoutSupport, self).handle_request(req, conn) -def _is_truthy(s): +def _is_truthy(s): # pragma: no cover return str(s).lower() in ("yes", "y", "1", "yeah", "true", "t") diff --git a/src/functions_framework/request_timeout.py b/src/functions_framework/request_timeout.py index e7a8e9a6..535dfefd 100644 --- a/src/functions_framework/request_timeout.py +++ b/src/functions_framework/request_timeout.py @@ -8,10 +8,9 @@ logger = logging.getLogger(__name__) -class ThreadingTimeout(object): - def __init__(self, seconds, swallow_exc=True): +class ThreadingTimeout(object): # pragma: no cover + def __init__(self, seconds): self.seconds = seconds - self.swallow_exc = swallow_exc self.target_tid = threading.current_thread().ident self.timer = None diff --git a/tests/test_timeouts.py b/tests/test_timeouts.py index 9713f4ac..5ab697f7 100644 --- a/tests/test_timeouts.py +++ b/tests/test_timeouts.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import os import pathlib import pytest import requests @@ -30,10 +29,14 @@ @pytest.fixture(autouse=True) def run_around_tests(): + # the test samples test also listens on 8080, so let's be good stewards of + # the port and make sure it's free _wait_for_no_listen(TEST_HOST, TEST_PORT) yield + _wait_for_no_listen(TEST_HOST, TEST_PORT) +@pytest.mark.slow_integration_test def test_no_timeout_allows_request_processing_to_finish(): source = TEST_FUNCTIONS_DIR / "timeout" / "main.py" target = "function" @@ -53,12 +56,13 @@ def test_no_timeout_allows_request_processing_to_finish(): result = requests.get("http://{}:{}/".format(TEST_HOST, TEST_PORT)) - gunicorn_p.kill() + gunicorn_p.terminate() gunicorn_p.join() assert result.status_code == 200 +@pytest.mark.slow_integration_test def test_timeout_but_not_threaded_timeout_enabled_does_not_kill(monkeypatch): monkeypatch.setenv("CLOUD_RUN_TIMEOUT_SECONDS", "1") monkeypatch.setenv("THREADED_TIMEOUT_ENABLED", "false") @@ -73,10 +77,6 @@ def test_timeout_but_not_threaded_timeout_enabled_does_not_kill(monkeypatch): app, TEST_HOST, TEST_PORT, False, **options ) - os.environ.clear() - os.environ["CLOUD_RUN_TIMEOUT_SECONDS"] = "1" - os.environ["THREADED_TIMEOUT_ENABLED"] = "false" - gunicorn_p = Process(target=gunicorn_app.run) gunicorn_p.start() @@ -84,12 +84,13 @@ def test_timeout_but_not_threaded_timeout_enabled_does_not_kill(monkeypatch): result = requests.get("http://{}:{}/".format(TEST_HOST, TEST_PORT)) - gunicorn_p.kill() + gunicorn_p.terminate() gunicorn_p.join() assert result.status_code == 200 +@pytest.mark.slow_integration_test def test_timeout_and_threaded_timeout_enabled_kills(monkeypatch): monkeypatch.setenv("CLOUD_RUN_TIMEOUT_SECONDS", "1") monkeypatch.setenv("THREADED_TIMEOUT_ENABLED", "true") @@ -111,7 +112,7 @@ def test_timeout_and_threaded_timeout_enabled_kills(monkeypatch): result = requests.get("http://{}:{}/".format(TEST_HOST, TEST_PORT)) - gunicorn_p.kill() + gunicorn_p.terminate() gunicorn_p.join() # Any exception raised in execution is a 500 error. Cloud Functions 1st gen and @@ -121,6 +122,7 @@ def test_timeout_and_threaded_timeout_enabled_kills(monkeypatch): assert result.status_code == 500 +@pytest.mark.slow_integration_test def test_timeout_and_threaded_timeout_enabled_but_timeout_not_exceeded_doesnt_kill( monkeypatch, ): @@ -144,7 +146,7 @@ def test_timeout_and_threaded_timeout_enabled_but_timeout_not_exceeded_doesnt_ki result = requests.get("http://{}:{}/".format(TEST_HOST, TEST_PORT)) - gunicorn_p.kill() + gunicorn_p.terminate() gunicorn_p.join() assert result.status_code == 200 From 1e18b82207329eda5123ec0acdce0c3791396b7d Mon Sep 17 00:00:00 2001 From: Jeremy Fehr Date: Fri, 17 May 2024 11:07:15 -0700 Subject: [PATCH 07/15] format --- src/functions_framework/exceptions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/functions_framework/exceptions.py b/src/functions_framework/exceptions.py index c0c3768e..81b9f8f0 100644 --- a/src/functions_framework/exceptions.py +++ b/src/functions_framework/exceptions.py @@ -38,4 +38,4 @@ class EventConversionException(FunctionsFrameworkException): class RequestTimeoutException(FunctionsFrameworkException): - pass \ No newline at end of file + pass From 5d170a8a92ff864eafec993bf6e95fc432f50f76 Mon Sep 17 00:00:00 2001 From: Jeremy Fehr Date: Fri, 17 May 2024 11:12:00 -0700 Subject: [PATCH 08/15] isort everything --- examples/cloud_run_cloud_events/send_cloud_event.py | 2 +- playground/main.py | 3 ++- src/functions_framework/_http/gunicorn.py | 4 +++- src/functions_framework/request_timeout.py | 1 - tests/test_timeouts.py | 6 ++++-- 5 files changed, 10 insertions(+), 6 deletions(-) diff --git a/examples/cloud_run_cloud_events/send_cloud_event.py b/examples/cloud_run_cloud_events/send_cloud_event.py index b523c31a..c08b8f93 100644 --- a/examples/cloud_run_cloud_events/send_cloud_event.py +++ b/examples/cloud_run_cloud_events/send_cloud_event.py @@ -13,9 +13,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from cloudevents.http import CloudEvent, to_structured import requests +from cloudevents.http import CloudEvent, to_structured # Create a cloudevent using https://github.com/cloudevents/sdk-python # Note we only need source and type because the cloudevents constructor by diff --git a/playground/main.py b/playground/main.py index d31e4704..b974ddbc 100644 --- a/playground/main.py +++ b/playground/main.py @@ -1,7 +1,8 @@ -import functions_framework import logging import time +import functions_framework + logger = logging.getLogger(__name__) diff --git a/src/functions_framework/_http/gunicorn.py b/src/functions_framework/_http/gunicorn.py index a96c2a7b..fe0141a1 100644 --- a/src/functions_framework/_http/gunicorn.py +++ b/src/functions_framework/_http/gunicorn.py @@ -12,10 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -import gunicorn.app.base import os +import gunicorn.app.base + from gunicorn.workers.gthread import ThreadWorker + from ..request_timeout import ThreadingTimeout # global for use in our custom gthread worker; the gunicorn arbiter spawns these diff --git a/src/functions_framework/request_timeout.py b/src/functions_framework/request_timeout.py index 535dfefd..ed34f66e 100644 --- a/src/functions_framework/request_timeout.py +++ b/src/functions_framework/request_timeout.py @@ -4,7 +4,6 @@ from .exceptions import RequestTimeoutException - logger = logging.getLogger(__name__) diff --git a/tests/test_timeouts.py b/tests/test_timeouts.py index 5ab697f7..aa2fff85 100644 --- a/tests/test_timeouts.py +++ b/tests/test_timeouts.py @@ -12,14 +12,16 @@ # See the License for the specific language governing permissions and # limitations under the License. import pathlib -import pytest -import requests import socket import time from multiprocessing import Process +import pytest +import requests + import functions_framework._http.gunicorn + from functions_framework import create_app TEST_FUNCTIONS_DIR = pathlib.Path(__file__).resolve().parent / "test_functions" From b143694cfe50de76ebcaefd686ca9defd9f4c8fc Mon Sep 17 00:00:00 2001 From: Jeremy Fehr Date: Fri, 17 May 2024 11:21:02 -0700 Subject: [PATCH 09/15] skip tests on mac there's something test-specific about how mac pickles functions for execution in multiprocessing.Process which is causing problems. it seems somewhere in the innards of flask and gunicorn and macos... since this feature is opt-in anyway, let's just skip testing darwin. --- tests/test_timeouts.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/test_timeouts.py b/tests/test_timeouts.py index aa2fff85..51338586 100644 --- a/tests/test_timeouts.py +++ b/tests/test_timeouts.py @@ -38,6 +38,7 @@ def run_around_tests(): _wait_for_no_listen(TEST_HOST, TEST_PORT) +@pytest.mark.skipif("platform.system() == 'Darwin'") @pytest.mark.slow_integration_test def test_no_timeout_allows_request_processing_to_finish(): source = TEST_FUNCTIONS_DIR / "timeout" / "main.py" @@ -64,6 +65,7 @@ def test_no_timeout_allows_request_processing_to_finish(): assert result.status_code == 200 +@pytest.mark.skipif("platform.system() == 'Darwin'") @pytest.mark.slow_integration_test def test_timeout_but_not_threaded_timeout_enabled_does_not_kill(monkeypatch): monkeypatch.setenv("CLOUD_RUN_TIMEOUT_SECONDS", "1") @@ -92,6 +94,7 @@ def test_timeout_but_not_threaded_timeout_enabled_does_not_kill(monkeypatch): assert result.status_code == 200 +@pytest.mark.skipif("platform.system() == 'Darwin'") @pytest.mark.slow_integration_test def test_timeout_and_threaded_timeout_enabled_kills(monkeypatch): monkeypatch.setenv("CLOUD_RUN_TIMEOUT_SECONDS", "1") @@ -124,6 +127,7 @@ def test_timeout_and_threaded_timeout_enabled_kills(monkeypatch): assert result.status_code == 500 +@pytest.mark.skipif("platform.system() == 'Darwin'") @pytest.mark.slow_integration_test def test_timeout_and_threaded_timeout_enabled_but_timeout_not_exceeded_doesnt_kill( monkeypatch, From fbffb7dbc0ecfcf3ee8e1de7138e229678c2eff2 Mon Sep 17 00:00:00 2001 From: Jeremy Fehr Date: Fri, 17 May 2024 11:30:13 -0700 Subject: [PATCH 10/15] sort tuple of dicts in async tests before asserting causes flakes sometimes in workflows --- tests/test_execution_id.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/test_execution_id.py b/tests/test_execution_id.py index c650ee31..d34dfe63 100644 --- a/tests/test_execution_id.py +++ b/tests/test_execution_id.py @@ -378,4 +378,5 @@ async def test_maintains_execution_id_for_concurrent_requests(monkeypatch, capsy logs = record.err.strip().split("\n") logs_as_json = tuple(json.loads(log) for log in logs) - assert logs_as_json == expected_logs + sort_key = lambda d: d['message'] + assert sorted(logs_as_json, key=sort_key) == sorted(expected_logs, key=sort_key) From 6f7d6c7295cb6e3bf0f9ac68eb4e414794affb12 Mon Sep 17 00:00:00 2001 From: Jeremy Fehr Date: Fri, 17 May 2024 11:31:51 -0700 Subject: [PATCH 11/15] use double-quotes --- tests/test_execution_id.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_execution_id.py b/tests/test_execution_id.py index d34dfe63..a2601817 100644 --- a/tests/test_execution_id.py +++ b/tests/test_execution_id.py @@ -378,5 +378,5 @@ async def test_maintains_execution_id_for_concurrent_requests(monkeypatch, capsy logs = record.err.strip().split("\n") logs_as_json = tuple(json.loads(log) for log in logs) - sort_key = lambda d: d['message'] + sort_key = lambda d: d["message"] assert sorted(logs_as_json, key=sort_key) == sorted(expected_logs, key=sort_key) From 25c6e7d7fbcd2e1dde240cba0c11488c25a1cef7 Mon Sep 17 00:00:00 2001 From: Jeremy Fehr Date: Fri, 17 May 2024 11:40:52 -0700 Subject: [PATCH 12/15] also skip tests on windows - this is all built for gunicorn, there's no value adding it for windows anyway --- tests/test_timeouts.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/test_timeouts.py b/tests/test_timeouts.py index 51338586..15afad62 100644 --- a/tests/test_timeouts.py +++ b/tests/test_timeouts.py @@ -38,6 +38,7 @@ def run_around_tests(): _wait_for_no_listen(TEST_HOST, TEST_PORT) +@pytest.mark.skipif("platform.system() == 'Windows'") @pytest.mark.skipif("platform.system() == 'Darwin'") @pytest.mark.slow_integration_test def test_no_timeout_allows_request_processing_to_finish(): @@ -65,6 +66,7 @@ def test_no_timeout_allows_request_processing_to_finish(): assert result.status_code == 200 +@pytest.mark.skipif("platform.system() == 'Windows'") @pytest.mark.skipif("platform.system() == 'Darwin'") @pytest.mark.slow_integration_test def test_timeout_but_not_threaded_timeout_enabled_does_not_kill(monkeypatch): @@ -94,6 +96,7 @@ def test_timeout_but_not_threaded_timeout_enabled_does_not_kill(monkeypatch): assert result.status_code == 200 +@pytest.mark.skipif("platform.system() == 'Windows'") @pytest.mark.skipif("platform.system() == 'Darwin'") @pytest.mark.slow_integration_test def test_timeout_and_threaded_timeout_enabled_kills(monkeypatch): @@ -127,6 +130,7 @@ def test_timeout_and_threaded_timeout_enabled_kills(monkeypatch): assert result.status_code == 500 +@pytest.mark.skipif("platform.system() == 'Windows'") @pytest.mark.skipif("platform.system() == 'Darwin'") @pytest.mark.slow_integration_test def test_timeout_and_threaded_timeout_enabled_but_timeout_not_exceeded_doesnt_kill( From 7a9acbe82788864d150fa5429e60ac27bac396d4 Mon Sep 17 00:00:00 2001 From: Jeremy Fehr Date: Fri, 17 May 2024 11:47:02 -0700 Subject: [PATCH 13/15] skip import on windows --- tests/test_timeouts.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/tests/test_timeouts.py b/tests/test_timeouts.py index 15afad62..5df0fd0c 100644 --- a/tests/test_timeouts.py +++ b/tests/test_timeouts.py @@ -20,7 +20,8 @@ import pytest import requests -import functions_framework._http.gunicorn +ff_gunicorn = pytest.importorskip("functions_framework._http.gunicorn") + from functions_framework import create_app @@ -49,7 +50,7 @@ def test_no_timeout_allows_request_processing_to_finish(): options = {} - gunicorn_app = functions_framework._http.gunicorn.GunicornApplication( + gunicorn_app = ff_gunicorn.GunicornApplication( app, TEST_HOST, TEST_PORT, False, **options ) @@ -79,7 +80,7 @@ def test_timeout_but_not_threaded_timeout_enabled_does_not_kill(monkeypatch): options = {} - gunicorn_app = functions_framework._http.gunicorn.GunicornApplication( + gunicorn_app = ff_gunicorn.GunicornApplication( app, TEST_HOST, TEST_PORT, False, **options ) @@ -109,7 +110,7 @@ def test_timeout_and_threaded_timeout_enabled_kills(monkeypatch): options = {} - gunicorn_app = functions_framework._http.gunicorn.GunicornApplication( + gunicorn_app = ff_gunicorn.GunicornApplication( app, TEST_HOST, TEST_PORT, False, **options ) @@ -145,7 +146,7 @@ def test_timeout_and_threaded_timeout_enabled_but_timeout_not_exceeded_doesnt_ki options = {} - gunicorn_app = functions_framework._http.gunicorn.GunicornApplication( + gunicorn_app = ff_gunicorn.GunicornApplication( app, TEST_HOST, TEST_PORT, False, **options ) From 0a70fb9cc34bbdfed96c7fac7863fddbf09604f8 Mon Sep 17 00:00:00 2001 From: Jeremy Fehr Date: Fri, 17 May 2024 13:00:12 -0700 Subject: [PATCH 14/15] easy stuff --- src/functions_framework/_http/gunicorn.py | 6 +----- tests/test_functions/timeout/main.py | 2 +- tests/test_timeouts.py | 3 ++- 3 files changed, 4 insertions(+), 7 deletions(-) diff --git a/src/functions_framework/_http/gunicorn.py b/src/functions_framework/_http/gunicorn.py index fe0141a1..92cad90e 100644 --- a/src/functions_framework/_http/gunicorn.py +++ b/src/functions_framework/_http/gunicorn.py @@ -45,7 +45,7 @@ def __init__(self, app, host, port, debug, **options): if ( TIMEOUT_SECONDS > 0 and threads > 1 - and _is_truthy(os.environ.get("THREADED_TIMEOUT_ENABLED", "False")) + and (os.environ.get("THREADED_TIMEOUT_ENABLED", "False").lower() == "true") ): # pragma: no cover self.options["worker_class"] = ( "functions_framework._http.gunicorn.GThreadWorkerWithTimeoutSupport" @@ -70,7 +70,3 @@ class GThreadWorkerWithTimeoutSupport(ThreadWorker): # pragma: no cover def handle_request(self, req, conn): with ThreadingTimeout(TIMEOUT_SECONDS): super(GThreadWorkerWithTimeoutSupport, self).handle_request(req, conn) - - -def _is_truthy(s): # pragma: no cover - return str(s).lower() in ("yes", "y", "1", "yeah", "true", "t") diff --git a/tests/test_functions/timeout/main.py b/tests/test_functions/timeout/main.py index e089a779..09efeb88 100644 --- a/tests/test_functions/timeout/main.py +++ b/tests/test_functions/timeout/main.py @@ -5,7 +5,7 @@ def function(request): - # sleep for 1200 total ms (1.8 sec) + # sleep for 1200 total ms (1.2 sec) for _ in range(12): time.sleep(0.1) logger.info("some extra logging message") diff --git a/tests/test_timeouts.py b/tests/test_timeouts.py index 5df0fd0c..1fbd6884 100644 --- a/tests/test_timeouts.py +++ b/tests/test_timeouts.py @@ -183,7 +183,8 @@ def _wait_for_listen(host, port, timeout=10): @pytest.mark.skip def _wait_for_no_listen(host, port, timeout=10): - # Used in tests to make sure that the + # Used in tests to make sure that the port is actually free after + # the process binding to it should have been killed start_time = time.perf_counter() while True: try: From 7e559e55e524e3722b183af542752ce5545cdd0d Mon Sep 17 00:00:00 2001 From: Jeremy Fehr Date: Fri, 17 May 2024 13:04:55 -0700 Subject: [PATCH 15/15] add a few tests for sync worker timeouts these shouldn't have changed with this commit --- tests/test_timeouts.py | 66 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/tests/test_timeouts.py b/tests/test_timeouts.py index 1fbd6884..9637de67 100644 --- a/tests/test_timeouts.py +++ b/tests/test_timeouts.py @@ -163,6 +163,72 @@ def test_timeout_and_threaded_timeout_enabled_but_timeout_not_exceeded_doesnt_ki assert result.status_code == 200 +@pytest.mark.skipif("platform.system() == 'Windows'") +@pytest.mark.skipif("platform.system() == 'Darwin'") +@pytest.mark.slow_integration_test +def test_timeout_sync_worker_kills_on_timeout( + monkeypatch, +): + monkeypatch.setenv("CLOUD_RUN_TIMEOUT_SECONDS", "1") + monkeypatch.setenv("WORKERS", 2) + monkeypatch.setenv("THREADS", 1) + source = TEST_FUNCTIONS_DIR / "timeout" / "main.py" + target = "function" + + app = create_app(target, source) + + options = {} + + gunicorn_app = ff_gunicorn.GunicornApplication( + app, TEST_HOST, TEST_PORT, False, **options + ) + + gunicorn_p = Process(target=gunicorn_app.run) + gunicorn_p.start() + + _wait_for_listen(TEST_HOST, TEST_PORT) + + result = requests.get("http://{}:{}/".format(TEST_HOST, TEST_PORT)) + + gunicorn_p.terminate() + gunicorn_p.join() + + assert result.status_code == 500 + + +@pytest.mark.skipif("platform.system() == 'Windows'") +@pytest.mark.skipif("platform.system() == 'Darwin'") +@pytest.mark.slow_integration_test +def test_timeout_sync_worker_does_not_kill_if_less_than_timeout( + monkeypatch, +): + monkeypatch.setenv("CLOUD_RUN_TIMEOUT_SECONDS", "2") + monkeypatch.setenv("WORKERS", 2) + monkeypatch.setenv("THREADS", 1) + source = TEST_FUNCTIONS_DIR / "timeout" / "main.py" + target = "function" + + app = create_app(target, source) + + options = {} + + gunicorn_app = ff_gunicorn.GunicornApplication( + app, TEST_HOST, TEST_PORT, False, **options + ) + + gunicorn_p = Process(target=gunicorn_app.run) + gunicorn_p.start() + + _wait_for_listen(TEST_HOST, TEST_PORT) + + result = requests.get("http://{}:{}/".format(TEST_HOST, TEST_PORT)) + + gunicorn_p.terminate() + gunicorn_p.join() + + assert result.status_code == 200 + + @pytest.mark.skip def _wait_for_listen(host, port, timeout=10): # Used in tests to make sure that the gunicorn app has booted and is