Skip to content

Console commands #167

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions example.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
from kafka.producer import SimpleProducer

def produce_example(client):
producer = SimpleProducer(client, "my-topic")
producer.send_messages("test")
producer = SimpleProducer(client)
producer.send_messages("my-topic", "test")

def consume_example(client):
consumer = SimpleConsumer(client, "test-group", "my-topic")
Expand Down
12 changes: 6 additions & 6 deletions kafka/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@
from kafka.protocol import (
create_message, create_gzip_message, create_snappy_message
)
from kafka.producer import SimpleProducer, KeyedProducer
from kafka.partitioner import RoundRobinPartitioner, HashedPartitioner
from kafka.consumer import SimpleConsumer, MultiProcessConsumer
from kafka.producer import ConsoleProducer, KeyedProducer, SimpleProducer
from kafka.partitioner import HashedPartitioner, RoundRobinPartitioner
from kafka.consumer import ConsoleConsumer, MultiProcessConsumer, SimpleConsumer

__all__ = [
'KafkaClient', 'KafkaConnection', 'SimpleProducer', 'KeyedProducer',
'RoundRobinPartitioner', 'HashedPartitioner', 'SimpleConsumer',
'MultiProcessConsumer', 'create_message', 'create_gzip_message',
'create_snappy_message'
'ConsoleProducer', 'RoundRobinPartitioner', 'HashedPartitioner',
'ConsoleConsumer', 'SimpleConsumer', 'MultiProcessConsumer',
'create_message', 'create_gzip_message', 'create_snappy_message'
]
35 changes: 35 additions & 0 deletions kafka/_script_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import logging
from optparse import OptionParser, Option, OptionValueError

from kafka import KafkaClient

logging.basicConfig()

# Add this attribute to easily check if the option is required
Option.ATTRS.append("required")

BROKER_OPTION = Option("-b", "--broker", dest="broker", required=True,
help="Required: The address of a kafka broker")
TOPIC_OPTION = Option("-t", "--topic", dest="topic", required=True,
help="Required: The topic to consume from")

def parse_options(*extra_options):
parser = OptionParser()
options = [BROKER_OPTION, TOPIC_OPTION] + list(extra_options)
parser.add_options(options)
(opts, args) = parser.parse_args()

missing = [o._long_opts[0] for o in options
if o.required and getattr(opts, o.dest) is None]
if missing:
parser.error("Missing required option(s) %s" % ", ".join(missing))

return opts

def get_client(broker, client_id=KafkaClient.CLIENT_ID):
try:
(host, port) = broker.split(':')
except ValueError:
raise OptionValueError("Broker should be in the form 'host:port'")

return KafkaClient(host, int(port), client_id)
6 changes: 6 additions & 0 deletions kafka/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -674,3 +674,9 @@ def get_messages(self, count=1, block=True, timeout=10):
self._auto_commit()

return messages


class ConsoleConsumer(SimpleConsumer):
def run(self):
for message in self:
print(message.message.value)
10 changes: 10 additions & 0 deletions kafka/producer.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,3 +255,13 @@ def send(self, topic, key, msg):

def __repr__(self):
return '<KeyedProducer batch=%s>' % self.async


class ConsoleProducer(SimpleProducer):
def run(self, topic):
import readline
while True:
try:
self.send_messages(topic, raw_input())
except EOFError:
break
21 changes: 21 additions & 0 deletions kafka/util.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from collections import defaultdict
import struct
from threading import Thread, Event
from time import sleep, time

from kafka.common import BufferUnderflowError

Expand Down Expand Up @@ -114,3 +115,23 @@ def stop(self):
self.active.set()
self.thread.join(self.t + 1)
self.timer = None

DEFAULT_TOPIC_CREATION_TIMEOUT_SECONDS = 30
def ensure_topic_creation(client, topic,
timeout=DEFAULT_TOPIC_CREATION_TIMEOUT_SECONDS):
if timeout is not None:
max_time = time() + timeout

while timeout is None or timeout > 0:
client.load_metadata_for_topics(topic)
if client.has_metadata_for_topic(topic):
return

if timeout is not None:
# If we have a timeout, reduce it to the appropriate value
timeout = max_time - time()

sleep(1)

raise RuntimeError("Unable to create topic %s" % topic)

28 changes: 28 additions & 0 deletions scripts/kp_consumer
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
#!/usr/bin/env python

from optparse import Option
import sys

from kafka import ConsoleConsumer
from kafka._script_utils import parse_options, get_client

GROUP_OPTION = Option("-g", "--group", dest="group", required=True,
help="Required: The consumer group")
CLIENT_ID = "kp_consumer"

def main():
options = parse_options(GROUP_OPTION)
client = get_client(options.broker, CLIENT_ID)
consumer = ConsoleConsumer(client, options.group, options.topic,
auto_commit=False)
try:
consumer.run()
except KeyboardInterrupt:
consumer.stop()
finally:
client.close()
print("Done!")
return 0

if __name__ == '__main__':
sys.exit(main())
22 changes: 22 additions & 0 deletions scripts/kp_create_topic
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#!/usr/bin/env python

import sys

from kafka.util import ensure_topic_creation
from kafka._script_utils import parse_options, get_client

CLIENT_ID = "kp_create_topic"

def main():
options = parse_options()
client = get_client(options.broker, CLIENT_ID)
try:
print "Creating topic %s..." % options.topic
ensure_topic_creation(client, options.topic)
finally:
client.close()
print("Done!")
return 0

if __name__ == '__main__':
sys.exit(main())
25 changes: 25 additions & 0 deletions scripts/kp_producer
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#!/usr/bin/env python

import sys

from kafka import ConsoleProducer
from kafka._script_utils import parse_options, get_client

CLIENT_ID = "kp_producer"

def main():
options = parse_options()
client = get_client(options.broker, CLIENT_ID)
producer = ConsoleProducer(client)
try:
producer.run(options.topic)
producer.stop()
except KeyboardInterrupt:
producer.stop()
finally:
client.close()
print("Done!")
return 0

if __name__ == '__main__':
sys.exit(main())
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def run(self):
cmdclass={"test": Tox},

packages=["kafka"],
scripts=["scripts/kp_consumer", "scripts/kp_producer", "scripts/kp_create_topic"],

author="David Arthur",
author_email="mumrah@gmail.com",
Expand Down
15 changes: 1 addition & 14 deletions test/test_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from kafka.common import * # noqa
from kafka.codec import has_gzip, has_snappy
from kafka.consumer import MAX_FETCH_BUFFER_SIZE_BYTES
from kafka.util import ensure_topic_creation
from .fixtures import ZookeeperFixture, KafkaFixture


Expand All @@ -17,20 +18,6 @@ def random_string(l):
return s


def ensure_topic_creation(client, topic_name):
times = 0
while True:
times += 1
client.load_metadata_for_topics(topic_name)
if client.has_metadata_for_topic(topic_name):
break
print "Waiting for %s topic to be created" % topic_name
time.sleep(1)

if times > 30:
raise Exception("Unable to create topic %s" % topic_name)


class KafkaTestCase(unittest.TestCase):
def setUp(self):
self.topic = "%s-%s" % (self.id()[self.id().rindex(".")+1:], random_string(10))
Expand Down