Closed
Description
Hi, I think this is related to #113, #150, #174. I'm prototyping a kafka environment with a single broker and 3 node zookeeper cluster. I installed the master version since this was a known issue but I'm still getting a key error when producing messages with SimpleProducer.
This cluster just had the topic created with the command line tool and here is the describe:
bin/kafka-topics.sh --describe --topic test --zookeeper XXX.XXX.XXX.XXX:YYYY
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Error:
client WARNING No partitions for test
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/gevent/greenlet.py", line 327, in run
result = self._run(*self.args, **self.kwargs)
File "/opt/logparser_deploy/logparser/core.py", line 247, in _run
response = self._kafka_producer.send_messages(self.TOPIC, json.dumps(data))
File "/usr/local/lib/python2.7/dist-packages/kafka/producer.py", line 230, in send_messages
partition = self._next_partition(topic)
File "/usr/local/lib/python2.7/dist-packages/kafka/producer.py", line 219, in _next_partition
self.partition_cycles[topic] = cycle(self.client.topic_partitions[topic])
KeyError: 'test'
Connect Code:
from kafka.client import KafkaClient
from kafka.producer import SimpleProducer
for i in range(0,self.KAFKA_CONNECT_RETRIES):
try:
self._kafka_client = KafkaClient(self._kafka_addr)
except Exception as e:
logging.error("KAFKA ERROR: %s %s" % (e,self._kafka_addr))
else:
# To send messages synchronously
self._kafka_producer = SimpleProducer(self._kafka_client)
return True
return False
Send Code:
self._kafka_producer.send_messages(self.TOPIC, json.dumps(data))
Metadata
Metadata
Assignees
Labels
No labels