Closed
Description
Hi,
I'm having trouble using an asynchronous producer. I'm running the latest copy of the code from github.
I have a simple script which exhibits the problem:
from kafka.client import KafkaClient
from kafka.producer import SimpleProducer
if __name__ == '__main__':
kafka = KafkaClient("192.168.15.136", 9092)
producer = SimpleProducer(kafka, "test", async=True)
producer.send_messages("Hello")
The log output is as follows:
Traceback (most recent call last):
File "kafka_test.py", line 7, in <module>
producer = SimpleProducer(kafka, "test", async=True)
File "C:\Users\astock\envs\midmarket\lib\site-packages\kafka\producer.py", line 176, in __init__
batch_send_every_t)
File "C:\Users\astock\envs\midmarket\lib\site-packages\kafka\producer.py", line 72, in __init__
self.proc.start()
File "c:\python27\Lib\multiprocessing\process.py", line 130, in start
self._popen = Popen(self)
File "c:\python27\Lib\multiprocessing\forking.py", line 271, in __init__
dump(process_obj, to_child, HIGHEST_PROTOCOL)
File "c:\python27\Lib\multiprocessing\forking.py", line 193, in dump
ForkingPickler(file, protocol).dump(obj)
File "c:\python27\Lib\pickle.py", line 224, in dump
self.save(obj)
File "c:\python27\Lib\pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "c:\python27\Lib\pickle.py", line 419, in save_reduce
save(state)
File "c:\python27\Lib\pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "c:\python27\Lib\pickle.py", line 649, in save_dict
self._batch_setitems(obj.iteritems())
File "c:\python27\Lib\pickle.py", line 681, in _batch_setitems
save(v)
File "c:\python27\Lib\pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "c:\python27\Lib\multiprocessing\forking.py", line 66, in dispatcher
self.save_reduce(obj=obj, *rv)
File "c:\python27\Lib\pickle.py", line 401, in save_reduce
save(args)
File "c:\python27\Lib\pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "c:\python27\Lib\pickle.py", line 548, in save_tuple
save(element)
File "c:\python27\Lib\pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "c:\python27\Lib\pickle.py", line 419, in save_reduce
save(state)
File "c:\python27\Lib\pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "c:\python27\Lib\pickle.py", line 649, in save_dict
self._batch_setitems(obj.iteritems())
File "c:\python27\Lib\pickle.py", line 681, in _batch_setitems
save(v)
File "c:\python27\Lib\pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "c:\python27\Lib\pickle.py", line 419, in save_reduce
save(state)
File "c:\python27\Lib\pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "c:\python27\Lib\pickle.py", line 649, in save_dict
self._batch_setitems(obj.iteritems())
File "c:\python27\Lib\pickle.py", line 681, in _batch_setitems
save(v)
File "c:\python27\Lib\pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "c:\python27\Lib\pickle.py", line 649, in save_dict
self._batch_setitems(obj.iteritems())
File "c:\python27\Lib\pickle.py", line 686, in _batch_setitems
save(v)
File "c:\python27\Lib\pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "c:\python27\Lib\pickle.py", line 419, in save_reduce
save(state)
File "c:\python27\Lib\pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "c:\python27\Lib\pickle.py", line 649, in save_dict
self._batch_setitems(obj.iteritems())
File "c:\python27\Lib\pickle.py", line 681, in _batch_setitems
save(v)
File "c:\python27\Lib\pickle.py", line 331, in save
self.save_reduce(obj=obj, *rv)
File "c:\python27\Lib\pickle.py", line 419, in save_reduce
save(state)
File "c:\python27\Lib\pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "c:\python27\Lib\pickle.py", line 548, in save_tuple
save(element)
File "c:\python27\Lib\pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "c:\python27\Lib\pickle.py", line 649, in save_dict
self._batch_setitems(obj.iteritems())
File "c:\python27\Lib\pickle.py", line 681, in _batch_setitems
save(v)
File "c:\python27\Lib\pickle.py", line 286, in save
f(self, obj) # Call unbound method with explicit self
File "c:\python27\Lib\pickle.py", line 748, in save_global
(obj, module, name))
pickle.PicklingError: Can't pickle <built-in method recvfrom_into of _socket.socket object at 0x02D36F48>: it's not found as __main__.recvfrom_into
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "c:\python27\Lib\multiprocessing\forking.py", line 374, in main
self = load(from_parent)
File "c:\python27\Lib\pickle.py", line 1378, in load
return Unpickler(file).load()
File "c:\python27\Lib\pickle.py", line 858, in load
dispatch[key](self)
File "c:\python27\Lib\pickle.py", line 880, in load_eof
raise EOFError
EOFError
The same code works if I change async to False. This is running on Python 2.7.3 on Windows against Kafka 0.8.0. Any suggestions gratefully received
Metadata
Metadata
Assignees
Labels
No labels