Skip to content

Key Error: Single Broker #183

Closed
Closed
@jshaw86

Description

@jshaw86

Hi, I think this is related to #113, #150, #174. I'm prototyping a kafka environment with a single broker and 3 node zookeeper cluster. I installed the master version since this was a known issue but I'm still getting a key error when producing messages with SimpleProducer.

This cluster just had the topic created with the command line tool and here is the describe:

bin/kafka-topics.sh --describe --topic test --zookeeper XXX.XXX.XXX.XXX:YYYY
Topic:test      PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: test     Partition: 0    Leader: 1       Replicas: 1     Isr: 1

Error:

client WARNING No partitions for test
Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/gevent/greenlet.py", line 327, in run
    result = self._run(*self.args, **self.kwargs)
  File "/opt/logparser_deploy/logparser/core.py", line 247, in _run
    response = self._kafka_producer.send_messages(self.TOPIC, json.dumps(data))
  File "/usr/local/lib/python2.7/dist-packages/kafka/producer.py", line 230, in send_messages
    partition = self._next_partition(topic)
  File "/usr/local/lib/python2.7/dist-packages/kafka/producer.py", line 219, in _next_partition
    self.partition_cycles[topic] = cycle(self.client.topic_partitions[topic])
KeyError: 'test'

Connect Code:

from kafka.client import KafkaClient                                                                                                                                             
from kafka.producer import SimpleProducer
for i in range(0,self.KAFKA_CONNECT_RETRIES):
       try:
          self._kafka_client = KafkaClient(self._kafka_addr)
       except Exception as e:
          logging.error("KAFKA ERROR: %s %s" % (e,self._kafka_addr))
       else:
          # To send messages synchronously
          self._kafka_producer = SimpleProducer(self._kafka_client)
          return True

return False

Send Code:

self._kafka_producer.send_messages(self.TOPIC, json.dumps(data))

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions