Skip to content

Problem using async=True #46

Closed
Closed
@watchforstock

Description

@watchforstock

Hi,

I'm having trouble using an asynchronous producer. I'm running the latest copy of the code from github.

I have a simple script which exhibits the problem:

from kafka.client import KafkaClient
from kafka.producer import SimpleProducer

if __name__ == '__main__':

    kafka = KafkaClient("192.168.15.136", 9092)
    producer = SimpleProducer(kafka, "test", async=True)

    producer.send_messages("Hello")

The log output is as follows:

Traceback (most recent call last):
  File "kafka_test.py", line 7, in <module>
    producer = SimpleProducer(kafka, "test", async=True)
  File "C:\Users\astock\envs\midmarket\lib\site-packages\kafka\producer.py", line 176, in __init__
    batch_send_every_t)
  File "C:\Users\astock\envs\midmarket\lib\site-packages\kafka\producer.py", line 72, in __init__
    self.proc.start()
  File "c:\python27\Lib\multiprocessing\process.py", line 130, in start
    self._popen = Popen(self)
  File "c:\python27\Lib\multiprocessing\forking.py", line 271, in __init__
    dump(process_obj, to_child, HIGHEST_PROTOCOL)
  File "c:\python27\Lib\multiprocessing\forking.py", line 193, in dump
    ForkingPickler(file, protocol).dump(obj)
  File "c:\python27\Lib\pickle.py", line 224, in dump
    self.save(obj)
  File "c:\python27\Lib\pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "c:\python27\Lib\pickle.py", line 419, in save_reduce
    save(state)
  File "c:\python27\Lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "c:\python27\Lib\pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "c:\python27\Lib\pickle.py", line 681, in _batch_setitems
    save(v)
  File "c:\python27\Lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "c:\python27\Lib\multiprocessing\forking.py", line 66, in dispatcher
    self.save_reduce(obj=obj, *rv)
  File "c:\python27\Lib\pickle.py", line 401, in save_reduce
    save(args)
  File "c:\python27\Lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "c:\python27\Lib\pickle.py", line 548, in save_tuple
    save(element)
  File "c:\python27\Lib\pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "c:\python27\Lib\pickle.py", line 419, in save_reduce
    save(state)
  File "c:\python27\Lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "c:\python27\Lib\pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "c:\python27\Lib\pickle.py", line 681, in _batch_setitems
    save(v)
  File "c:\python27\Lib\pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "c:\python27\Lib\pickle.py", line 419, in save_reduce
    save(state)
  File "c:\python27\Lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "c:\python27\Lib\pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "c:\python27\Lib\pickle.py", line 681, in _batch_setitems
    save(v)
  File "c:\python27\Lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "c:\python27\Lib\pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "c:\python27\Lib\pickle.py", line 686, in _batch_setitems
    save(v)
  File "c:\python27\Lib\pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "c:\python27\Lib\pickle.py", line 419, in save_reduce
    save(state)
  File "c:\python27\Lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "c:\python27\Lib\pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "c:\python27\Lib\pickle.py", line 681, in _batch_setitems
    save(v)
  File "c:\python27\Lib\pickle.py", line 331, in save
    self.save_reduce(obj=obj, *rv)
  File "c:\python27\Lib\pickle.py", line 419, in save_reduce
    save(state)
  File "c:\python27\Lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "c:\python27\Lib\pickle.py", line 548, in save_tuple
    save(element)
  File "c:\python27\Lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "c:\python27\Lib\pickle.py", line 649, in save_dict
    self._batch_setitems(obj.iteritems())
  File "c:\python27\Lib\pickle.py", line 681, in _batch_setitems
    save(v)
  File "c:\python27\Lib\pickle.py", line 286, in save
    f(self, obj) # Call unbound method with explicit self
  File "c:\python27\Lib\pickle.py", line 748, in save_global
    (obj, module, name))
pickle.PicklingError: Can't pickle <built-in method recvfrom_into of _socket.socket object at 0x02D36F48>: it's not found as __main__.recvfrom_into
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "c:\python27\Lib\multiprocessing\forking.py", line 374, in main
    self = load(from_parent)
  File "c:\python27\Lib\pickle.py", line 1378, in load
    return Unpickler(file).load()
  File "c:\python27\Lib\pickle.py", line 858, in load
    dispatch[key](self)
  File "c:\python27\Lib\pickle.py", line 880, in load_eof
    raise EOFError
EOFError

The same code works if I change async to False. This is running on Python 2.7.3 on Windows against Kafka 0.8.0. Any suggestions gratefully received

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions