From 6814dc8bd3a8bafaf21668e019778e2e6e1b9fd2 Mon Sep 17 00:00:00 2001 From: mattsteen-inmar Date: Tue, 29 Dec 2020 09:56:57 -0500 Subject: [PATCH 1/8] WS-6221: add option to refresh idle connections rather than closing --- kafka/client_async.py | 14 +++++++++++++- kafka/producer/kafka.py | 8 ++++++++ 2 files changed, 21 insertions(+), 1 deletion(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index 58f22d4ec..9214744db 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -154,6 +154,13 @@ class KafkaClient(object): sasl mechanism handshake. Default: one of bootstrap servers sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider instance. (See kafka.oauth.abstract). Default: None + keep_warm (int): The level of extra work to do to keep connections warm. + 0: Nothing extra. After bootstrapping, no extra connections will be + made, and idle connections will be closed. + 1: In addition to (1), reopen idle connections after closing (after + connections_max_idle_ms) + 2: After bootstrapping, connect to all leader brokers immediately + rather than waiting for first send. """ DEFAULT_CONFIG = { @@ -192,7 +199,8 @@ class KafkaClient(object): 'sasl_plain_password': None, 'sasl_kerberos_service_name': 'kafka', 'sasl_kerberos_domain_name': None, - 'sasl_oauth_token_provider': None + 'sasl_oauth_token_provider': None, + 'keep_warm': 0, } def __init__(self, **configs): @@ -952,6 +960,10 @@ def _maybe_close_oldest_connection(self): log.info('Closing idle connection %s, last active %d ms ago', conn_id, idle_ms) self.close(node_id=conn_id) + if self.config['keep_warm'] > 0: + log.info('Reopening idle connection %s', conn_id) + self._maybe_connect(node_id=conn_id) + def bootstrap_connected(self): """Return True if a bootstrap node is connected""" for node_id in self._conns: diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index ea010c59a..77cbb9b76 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -281,6 +281,13 @@ class KafkaProducer(object): sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider instance. (See kafka.oauth.abstract). Default: None kafka_client (callable): Custom class / callable for creating KafkaClient instances + keep_warm (int): The level of extra work to do to keep connections warm. + 0: Nothing extra. After bootstrapping, no extra connections will be + made, and idle connections will be closed. + 1: In addition to (1), reopen idle connections after closing (after + connections_max_idle_ms) + 2: After bootstrapping, connect to all leader brokers immediately + rather than waiting for first send. Note: Configuration parameters are described in more detail at @@ -335,6 +342,7 @@ class KafkaProducer(object): 'sasl_kerberos_domain_name': None, 'sasl_oauth_token_provider': None, 'kafka_client': KafkaClient, + 'keep_warm': 0, } _COMPRESSORS = { From 4b564bc602e587b880a11f8949436d8cff37e55c Mon Sep 17 00:00:00 2001 From: mattsteen-inmar Date: Tue, 29 Dec 2020 11:05:05 -0500 Subject: [PATCH 2/8] WS-6221: add support for connecting to all brokers returned by metadata --- kafka/client_async.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/kafka/client_async.py b/kafka/client_async.py index 9214744db..631293762 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -845,6 +845,11 @@ def _maybe_refresh_metadata(self, wakeup=False): log.debug("Sending metadata request %s to node %s", request, node_id) future = self.send(node_id, request, wakeup=wakeup) future.add_callback(self.cluster.update_metadata) + if self.config["keep_warm"] > 1: + def keep_connections_warm(cluster): + for broker in cluster.brokers(): + self.maybe_connect(broker.nodeId, wakeup=wakeup) + self.cluster.add_listener(keep_connections_warm) future.add_errback(self.cluster.failed_update) self._metadata_refresh_in_progress = True From dd8144949883d81435a85e2a16dc98032386644a Mon Sep 17 00:00:00 2001 From: mattsteen-inmar Date: Tue, 29 Dec 2020 11:12:42 -0500 Subject: [PATCH 3/8] WS-6221: adding build automation --- .github/workflows/pr.yaml | 22 ++++++++++++++++++++++ .github/workflows/release.yaml | 30 ++++++++++++++++++++++++++++++ 2 files changed, 52 insertions(+) create mode 100644 .github/workflows/pr.yaml create mode 100644 .github/workflows/release.yaml diff --git a/.github/workflows/pr.yaml b/.github/workflows/pr.yaml new file mode 100644 index 000000000..0e5488491 --- /dev/null +++ b/.github/workflows/pr.yaml @@ -0,0 +1,22 @@ +name: Pull Request +on: + - pull_request +jobs: + test-build: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: [3.7] + steps: + - uses: actions/checkout@v1 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v1 + with: + python-version: ${{ matrix.python-version }} + - name: Upgrade pip + run: | + python -m pip install --upgrade pip wheel + - name: Build wheel + run: | + export BOTO_CONFIG=/dev/null + python setup.py bdist_wheel diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml new file mode 100644 index 000000000..df30f1e37 --- /dev/null +++ b/.github/workflows/release.yaml @@ -0,0 +1,30 @@ +name: Release +on: + release: + types: [published] +jobs: + publish: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: [3.7] + steps: + - uses: actions/checkout@v1 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v1 + with: + python-version: ${{ matrix.python-version }} + - name: Upgrade pip & wheel + run: python -m pip install --upgrade pip wheel + - name: Set wheel version + run: echo ::set-env name=KAFKA_PYTHON_VERSION::${GITHUB_REF#refs/*/} + - name: Build wheel + run: | + export BOTO_CONFIG=/dev/null + python setup.py bdist_wheel + - name: Publish wheels + uses: inmar/actions-dpn-python-publish@v1.0.0 + with: + pypi_hostname: ${{ secrets.DPN_PYPI_HOSTNAME }} + pypi_username: ${{ secrets.DPN_PYPI_USERNAME }} + pypi_password: ${{ secrets.DPN_PYPI_PASSWORD }} From 7257b952d57db2e26c3e3b9f2a2fa76f29d2b0b4 Mon Sep 17 00:00:00 2001 From: mattsteen-inmar Date: Tue, 29 Dec 2020 16:26:09 -0500 Subject: [PATCH 4/8] WS-6221: update action to remove set-env --- .github/workflows/release.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index df30f1e37..202562270 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -17,7 +17,7 @@ jobs: - name: Upgrade pip & wheel run: python -m pip install --upgrade pip wheel - name: Set wheel version - run: echo ::set-env name=KAFKA_PYTHON_VERSION::${GITHUB_REF#refs/*/} + run: echo "KAFKA_PYTHON_VERSION=${GITHUB_REF#refs/*/}" >> $GITHUB_ENV - name: Build wheel run: | export BOTO_CONFIG=/dev/null From f7a1665c90c4c8cedd41980821f8813cd419a909 Mon Sep 17 00:00:00 2001 From: mattsteen-inmar Date: Tue, 29 Dec 2020 16:33:40 -0500 Subject: [PATCH 5/8] WS-6221: update docstring --- kafka/client_async.py | 7 +++---- kafka/producer/kafka.py | 7 +++---- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index 631293762..9085a4a70 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -157,10 +157,9 @@ class KafkaClient(object): keep_warm (int): The level of extra work to do to keep connections warm. 0: Nothing extra. After bootstrapping, no extra connections will be made, and idle connections will be closed. - 1: In addition to (1), reopen idle connections after closing (after - connections_max_idle_ms) - 2: After bootstrapping, connect to all leader brokers immediately - rather than waiting for first send. + 1: Reopen idle connections after closing (after connections_max_idle_ms) + 2: In addition to (1), after bootstrapping, connect to all leader brokers + immediately rather than waiting for first send. """ DEFAULT_CONFIG = { diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 77cbb9b76..a5fd19df9 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -284,10 +284,9 @@ class KafkaProducer(object): keep_warm (int): The level of extra work to do to keep connections warm. 0: Nothing extra. After bootstrapping, no extra connections will be made, and idle connections will be closed. - 1: In addition to (1), reopen idle connections after closing (after - connections_max_idle_ms) - 2: After bootstrapping, connect to all leader brokers immediately - rather than waiting for first send. + 1: Reopen idle connections after closing (after connections_max_idle_ms) + 2: In addition to (1), after bootstrapping, connect to all leader brokers + immediately rather than waiting for first send. Note: Configuration parameters are described in more detail at From 820080eba86b04ee15ef24ddcf4cdee635202f72 Mon Sep 17 00:00:00 2001 From: mattsteen-inmar Date: Wed, 30 Dec 2020 08:30:50 -0500 Subject: [PATCH 6/8] WS-6221: let automation set the version for dpn purposes --- docs/conf.py | 9 +-------- kafka/version.py | 1 - setup.py | 7 +------ 3 files changed, 2 insertions(+), 15 deletions(-) delete mode 100644 kafka/version.py diff --git a/docs/conf.py b/docs/conf.py index efa8d0807..b3506d00f 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -50,15 +50,8 @@ project = u'kafka-python' copyright = u'2016 -- Dana Powers, David Arthur, and Contributors' -# The version info for the project you're documenting, acts as replacement for -# |version| and |release|, also used in various other places throughout the -# built documents. -# -# The short X.Y version. -exec(open('../kafka/version.py').read()) - # The full version, including alpha/beta/rc tags. -release = __version__ +release = "" # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. diff --git a/kafka/version.py b/kafka/version.py deleted file mode 100644 index 06306bd1f..000000000 --- a/kafka/version.py +++ /dev/null @@ -1 +0,0 @@ -__version__ = '2.0.3-dev' diff --git a/setup.py b/setup.py index fe8a594f3..9174ed91e 100644 --- a/setup.py +++ b/setup.py @@ -3,10 +3,6 @@ from setuptools import setup, Command, find_packages -# Pull version from source without importing -# since we can't import something we haven't built yet :) -exec(open('kafka/version.py').read()) - class Tox(Command): @@ -33,8 +29,7 @@ def run(cls): setup( name="kafka-python", - version=__version__, - + version=os.environ.get("KAFKA_PYTHON_VERSION"), tests_require=test_require, extras_require={ "crc32c": ["crc32c"], From 4b0453bd1d8ddd64745c777c54fcdb3ffdab70de Mon Sep 17 00:00:00 2001 From: mattsteen-inmar Date: Wed, 30 Dec 2020 16:13:42 -0500 Subject: [PATCH 7/8] WS-6221: restore version.py for use elsewhere in project --- kafka/version.py | 1 + 1 file changed, 1 insertion(+) create mode 100644 kafka/version.py diff --git a/kafka/version.py b/kafka/version.py new file mode 100644 index 000000000..9e97c2a1d --- /dev/null +++ b/kafka/version.py @@ -0,0 +1 @@ +__version__ = "2.1.0rc0" From 81460f73a21022953256e227f03073d9248425cc Mon Sep 17 00:00:00 2001 From: rverry <92880936+rverry@users.noreply.github.com> Date: Wed, 22 Dec 2021 16:33:33 -0500 Subject: [PATCH 8/8] Adding WhiteSource configuration file --- .whitesource | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 .whitesource diff --git a/.whitesource b/.whitesource new file mode 100644 index 000000000..e33dfd56a --- /dev/null +++ b/.whitesource @@ -0,0 +1,3 @@ +{ + "settingsInheritedFrom": "inmar/whitesource-config@master" +} \ No newline at end of file