diff --git a/.github/workflows/pr.yaml b/.github/workflows/pr.yaml new file mode 100644 index 000000000..0e5488491 --- /dev/null +++ b/.github/workflows/pr.yaml @@ -0,0 +1,22 @@ +name: Pull Request +on: + - pull_request +jobs: + test-build: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: [3.7] + steps: + - uses: actions/checkout@v1 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v1 + with: + python-version: ${{ matrix.python-version }} + - name: Upgrade pip + run: | + python -m pip install --upgrade pip wheel + - name: Build wheel + run: | + export BOTO_CONFIG=/dev/null + python setup.py bdist_wheel diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml new file mode 100644 index 000000000..202562270 --- /dev/null +++ b/.github/workflows/release.yaml @@ -0,0 +1,30 @@ +name: Release +on: + release: + types: [published] +jobs: + publish: + runs-on: ubuntu-latest + strategy: + matrix: + python-version: [3.7] + steps: + - uses: actions/checkout@v1 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v1 + with: + python-version: ${{ matrix.python-version }} + - name: Upgrade pip & wheel + run: python -m pip install --upgrade pip wheel + - name: Set wheel version + run: echo "KAFKA_PYTHON_VERSION=${GITHUB_REF#refs/*/}" >> $GITHUB_ENV + - name: Build wheel + run: | + export BOTO_CONFIG=/dev/null + python setup.py bdist_wheel + - name: Publish wheels + uses: inmar/actions-dpn-python-publish@v1.0.0 + with: + pypi_hostname: ${{ secrets.DPN_PYPI_HOSTNAME }} + pypi_username: ${{ secrets.DPN_PYPI_USERNAME }} + pypi_password: ${{ secrets.DPN_PYPI_PASSWORD }} diff --git a/.whitesource b/.whitesource new file mode 100644 index 000000000..e33dfd56a --- /dev/null +++ b/.whitesource @@ -0,0 +1,3 @@ +{ + "settingsInheritedFrom": "inmar/whitesource-config@master" +} \ No newline at end of file diff --git a/docs/conf.py b/docs/conf.py index efa8d0807..b3506d00f 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -50,15 +50,8 @@ project = u'kafka-python' copyright = u'2016 -- Dana Powers, David Arthur, and Contributors' -# The version info for the project you're documenting, acts as replacement for -# |version| and |release|, also used in various other places throughout the -# built documents. -# -# The short X.Y version. -exec(open('../kafka/version.py').read()) - # The full version, including alpha/beta/rc tags. -release = __version__ +release = "" # The language for content autogenerated by Sphinx. Refer to documentation # for a list of supported languages. diff --git a/kafka/client_async.py b/kafka/client_async.py index 58f22d4ec..9085a4a70 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -154,6 +154,12 @@ class KafkaClient(object): sasl mechanism handshake. Default: one of bootstrap servers sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider instance. (See kafka.oauth.abstract). Default: None + keep_warm (int): The level of extra work to do to keep connections warm. + 0: Nothing extra. After bootstrapping, no extra connections will be + made, and idle connections will be closed. + 1: Reopen idle connections after closing (after connections_max_idle_ms) + 2: In addition to (1), after bootstrapping, connect to all leader brokers + immediately rather than waiting for first send. """ DEFAULT_CONFIG = { @@ -192,7 +198,8 @@ class KafkaClient(object): 'sasl_plain_password': None, 'sasl_kerberos_service_name': 'kafka', 'sasl_kerberos_domain_name': None, - 'sasl_oauth_token_provider': None + 'sasl_oauth_token_provider': None, + 'keep_warm': 0, } def __init__(self, **configs): @@ -837,6 +844,11 @@ def _maybe_refresh_metadata(self, wakeup=False): log.debug("Sending metadata request %s to node %s", request, node_id) future = self.send(node_id, request, wakeup=wakeup) future.add_callback(self.cluster.update_metadata) + if self.config["keep_warm"] > 1: + def keep_connections_warm(cluster): + for broker in cluster.brokers(): + self.maybe_connect(broker.nodeId, wakeup=wakeup) + self.cluster.add_listener(keep_connections_warm) future.add_errback(self.cluster.failed_update) self._metadata_refresh_in_progress = True @@ -952,6 +964,10 @@ def _maybe_close_oldest_connection(self): log.info('Closing idle connection %s, last active %d ms ago', conn_id, idle_ms) self.close(node_id=conn_id) + if self.config['keep_warm'] > 0: + log.info('Reopening idle connection %s', conn_id) + self._maybe_connect(node_id=conn_id) + def bootstrap_connected(self): """Return True if a bootstrap node is connected""" for node_id in self._conns: diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index ea010c59a..a5fd19df9 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -281,6 +281,12 @@ class KafkaProducer(object): sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider instance. (See kafka.oauth.abstract). Default: None kafka_client (callable): Custom class / callable for creating KafkaClient instances + keep_warm (int): The level of extra work to do to keep connections warm. + 0: Nothing extra. After bootstrapping, no extra connections will be + made, and idle connections will be closed. + 1: Reopen idle connections after closing (after connections_max_idle_ms) + 2: In addition to (1), after bootstrapping, connect to all leader brokers + immediately rather than waiting for first send. Note: Configuration parameters are described in more detail at @@ -335,6 +341,7 @@ class KafkaProducer(object): 'sasl_kerberos_domain_name': None, 'sasl_oauth_token_provider': None, 'kafka_client': KafkaClient, + 'keep_warm': 0, } _COMPRESSORS = { diff --git a/kafka/version.py b/kafka/version.py index 06306bd1f..9e97c2a1d 100644 --- a/kafka/version.py +++ b/kafka/version.py @@ -1 +1 @@ -__version__ = '2.0.3-dev' +__version__ = "2.1.0rc0" diff --git a/setup.py b/setup.py index fe8a594f3..9174ed91e 100644 --- a/setup.py +++ b/setup.py @@ -3,10 +3,6 @@ from setuptools import setup, Command, find_packages -# Pull version from source without importing -# since we can't import something we haven't built yet :) -exec(open('kafka/version.py').read()) - class Tox(Command): @@ -33,8 +29,7 @@ def run(cls): setup( name="kafka-python", - version=__version__, - + version=os.environ.get("KAFKA_PYTHON_VERSION"), tests_require=test_require, extras_require={ "crc32c": ["crc32c"],