We recently switched to using the Confluent Terraform provider to generate the configuration data for our applications into our secret store, which is then loaded directly into our apps via environment variables.
Before the change our BOOTSTRAP_SERVERS env var looked like this:
pkc-wtf25.us-east-2.aws.confluent.cloud:9092
After the change, the output of the Terraform resource's bootstrap_endpoint attribute looks like this:
Our Java applications absorbed this change without noticing, but the Python apps using kafka-python are running into an issue: the get_ip_port_afi function doesn't recognize that broker.host may carry an XXX:// prefix.
[2025-05-01 17:04:55,456] ERROR [mysdk.app.container.kafka.kafka_listener.listen:90] Error while consuming message: invalid literal for int() with base 10: '//pkc-wtf25.us-east-2.aws.confluent.cloud'
Traceback (most recent call last):
File "/app/.venv/lib/python3.12/site-packages/mysdk/app/container/kafka/kafka_listener.py", line 44, in listen
topic_to_msgs: dict[str, list[ConsumerRecord]] = consumer.poll(
^^^^^^^^^^^^^^
File "/app/.venv/lib/python3.12/site-packages/kafka/consumer/group.py", line 693, in poll
records = self._poll_once(inner_timeout_ms(), max_records, update_offsets=update_offsets)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/app/.venv/lib/python3.12/site-packages/kafka/consumer/group.py", line 713, in _poll_once
if not self._coordinator.poll(timeout_ms=inner_timeout_ms()):
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/app/.venv/lib/python3.12/site-packages/kafka/coordinator/consumer.py", line 279, in poll
self.ensure_coordinator_ready(timeout_ms=inner_timeout_ms())
File "/app/.venv/lib/python3.12/site-packages/kafka/coordinator/base.py", line 269, in ensure_coordinator_ready
self._client.poll(future=future, timeout_ms=inner_timeout_ms())
File "/app/.venv/lib/python3.12/site-packages/kafka/client_async.py", line 667, in poll
metadata_timeout_ms = self._maybe_refresh_metadata()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/app/.venv/lib/python3.12/site-packages/kafka/client_async.py", line 965, in _maybe_refresh_metadata
if not self._init_connect(node_id):
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/app/.venv/lib/python3.12/site-packages/kafka/client_async.py", line 443, in _init_connect
host, port, afi = get_ip_port_afi(broker.host)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/app/.venv/lib/python3.12/site-packages/kafka/conn.py", line 1483, in get_ip_port_afi
port = int(port)
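Until the parsing is more tolerant, one client-side workaround is to strip any protocol prefix from the endpoint list before handing it to kafka-python. A minimal sketch; the strip_protocol_prefix helper and the comma-splitting of BOOTSTRAP_SERVERS are assumptions about how the env var is consumed, not part of kafka-python:

```python
import os

def strip_protocol_prefix(endpoint: str) -> str:
    """Drop a leading 'XXX://' security-protocol prefix, if present.

    kafka-python expects bare host:port entries, so
    'SASL_SSL://broker.example:9092' becomes 'broker.example:9092'.
    """
    _, sep, rest = endpoint.partition("://")
    return rest if sep else endpoint

# Hypothetical usage: normalize each entry of a comma-separated
# BOOTSTRAP_SERVERS value before constructing the consumer.
raw = os.environ.get("BOOTSTRAP_SERVERS", "")
servers = [strip_protocol_prefix(s.strip()) for s in raw.split(",") if s.strip()]
```

Endpoints that already lack a prefix pass through unchanged, so the same code works before and after the Terraform change.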
Yea, I think this is undocumented behavior of the Java client; I don't see it anywhere in the documentation: https://kafka.apache.org/documentation/#consumerconfigs_bootstrap.servers. From the source code it appears that the "protocol://" portion is parsed but ignored. Clients require separate configuration for security.protocol (plus likely more to configure the SASL auth specifics). Also, the broker MetadataResponse returns only host + port, so internally clients use a single global SASL/SSL configuration, I think.
So this seems like a bug in the Terraform provider, unless the value is intended for Kafka broker configuration, which does use the protocol in protocol://host:port (e.g., in listeners=...). In any event, it may be worth flagging for them as well; it wouldn't surprise me if other non-Java clients had problems with this too.