
Confluent's provided bootstrap endpoint from terraform causes parsing/casting error on broker.host #2606

Closed
gregswift-pwell opened this issue May 1, 2025 · 1 comment · Fixed by #2608

@gregswift-pwell

We recently switched to using the Confluent Terraform provider to generate our applications' configuration data into our secret store, which is then loaded directly into our apps via environment variables.

Before the change, our BOOTSTRAP_SERVERS env var looked like this:

pkc-wtf25.us-east-2.aws.confluent.cloud:9092

After the change, the bootstrap_endpoint output from the Terraform resource looks like this:

SASL_SSL://pkc-wtf25.us-east-2.aws.confluent.cloud:9092

Our Java applications seem to have picked up this change without even noticing it. However, the Python apps that use kafka-python are failing because the get_ip_port_afi function doesn't account for a scheme prefix such as XXX:// on broker.host:

[2025-05-01 17:04:55,456] ERROR [mysdk.app.container.kafka.kafka_listener.listen:90] Error while consuming message: invalid literal for int() with base 10: '//pkc-wtf25.us-east-2.aws.confluent.cloud'
Traceback (most recent call last):
  File "/app/.venv/lib/python3.12/site-packages/mysdk/app/container/kafka/kafka_listener.py", line 44, in listen
    topic_to_msgs: dict[str, list[ConsumerRecord]] = consumer.poll(
                                                     ^^^^^^^^^^^^^^
  File "/app/.venv/lib/python3.12/site-packages/kafka/consumer/group.py", line 693, in poll
    records = self._poll_once(inner_timeout_ms(), max_records, update_offsets=update_offsets)
              ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/.venv/lib/python3.12/site-packages/kafka/consumer/group.py", line 713, in _poll_once
    if not self._coordinator.poll(timeout_ms=inner_timeout_ms()):
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/.venv/lib/python3.12/site-packages/kafka/coordinator/consumer.py", line 279, in poll
    self.ensure_coordinator_ready(timeout_ms=inner_timeout_ms())
  File "/app/.venv/lib/python3.12/site-packages/kafka/coordinator/base.py", line 269, in ensure_coordinator_ready
    self._client.poll(future=future, timeout_ms=inner_timeout_ms())
  File "/app/.venv/lib/python3.12/site-packages/kafka/client_async.py", line 667, in poll
    metadata_timeout_ms = self._maybe_refresh_metadata()
                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/.venv/lib/python3.12/site-packages/kafka/client_async.py", line 965, in _maybe_refresh_metadata
    if not self._init_connect(node_id):
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/.venv/lib/python3.12/site-packages/kafka/client_async.py", line 443, in _init_connect
    host, port, afi = get_ip_port_afi(broker.host)
                      ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/app/.venv/lib/python3.12/site-packages/kafka/conn.py", line 1483, in get_ip_port_afi
    port = int(port)
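The error message can be reproduced with a simplified host:port splitter. `split_host_port` is a hypothetical stand-in that only mimics splitting on the last colon; it is not kafka-python's actual get_ip_port_afi. The sequence it shows is an inference from the error text: a first split of the full bootstrap string keeps `SASL_SSL://...` as the host, and a second split of that host then treats `//pkc-...` as the port.

```python
def split_host_port(host_and_port: str) -> tuple[str, int]:
    """Hypothetical simplified splitter: split on the LAST colon, cast the tail to int.

    Not kafka-python's code; it only illustrates the failure mode.
    """
    host, port = host_and_port.strip().rsplit(":", 1)
    return host, int(port)


BOOTSTRAP = "SASL_SSL://pkc-wtf25.us-east-2.aws.confluent.cloud:9092"

# First pass "succeeds", but the scheme stays glued to the host.
host, port = split_host_port(BOOTSTRAP)
print(host, port)  # prints: SASL_SSL://pkc-wtf25.us-east-2.aws.confluent.cloud 9092

# Second pass on the stored host: the last colon is now the one in "SASL_SSL://".
try:
    split_host_port(host)
except ValueError as exc:
    print(exc)  # prints: invalid literal for int() with base 10: '//pkc-wtf25.us-east-2.aws.confluent.cloud'
```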
@dpkp
Owner

dpkp commented May 1, 2025

Yeah, I think this is undocumented behavior of the Java client; I don't see it anywhere in the documentation: https://kafka.apache.org/documentation/#consumerconfigs_bootstrap.servers . From the source code, it appears that the "protocol://" portion is parsed but ignored. Clients require a separate security.protocol setting (plus, likely, more settings for the SASL auth specifics). Also, the broker MetadataResponse returns only host + port, so I think clients internally use a single global configuration for SASL/SSL.
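On the application side, one possible workaround is to strip the scheme from each bootstrap entry and pass it to kafka-python as `security_protocol` instead. This is a sketch under assumptions: `bootstrap_kwargs` is a hypothetical helper, not part of kafka-python, and unlike the Java client (which reportedly ignores the scheme) it maps the scheme onto `security_protocol`. `bootstrap_servers` and `security_protocol` are existing kafka-python config names; the SASL settings still have to be supplied separately.

```python
def bootstrap_kwargs(endpoint: str) -> dict:
    """Hypothetical helper: turn 'PROTOCOL://host:port[,PROTOCOL://host:port...]'
    into kafka-python keyword arguments."""
    servers: list[str] = []
    protocols: set[str] = set()
    for entry in endpoint.split(","):
        entry = entry.strip()
        if not entry:
            continue  # tolerate trailing commas / empty entries
        scheme, sep, hostport = entry.partition("://")
        if sep:
            # Scheme present: remember it and keep only host:port.
            protocols.add(scheme.upper())
        else:
            # No scheme: the entry is already plain host:port.
            hostport = entry
        servers.append(hostport)

    kwargs: dict = {"bootstrap_servers": servers}
    if len(protocols) > 1:
        # The client uses one global security protocol, so a mixed list is ambiguous.
        raise ValueError(f"mixed protocols in bootstrap list: {sorted(protocols)}")
    if protocols:
        kwargs["security_protocol"] = protocols.pop()
    return kwargs
```

For example, `KafkaConsumer("my-topic", **bootstrap_kwargs(endpoint), sasl_mechanism="PLAIN", sasl_plain_username=..., sasl_plain_password=...)`; the topic name and the PLAIN mechanism are assumptions for illustration, and the credentials would come from the same secret store.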

So this seems like a bug in the Terraform provider, unless the value is intended for Kafka broker configuration, which does use the protocol value in protocol://host:port (e.g., in listeners=...). In any event, it may be worth flagging with them as well. It wouldn't surprise me if other non-Java clients had problems with this too.
