Skip to content

Commit ee7e86e

Browse files
author
Mark Roberts
committed
Update example.py to compile, add friendly load_example.py
1 parent e5fdc1c commit ee7e86e

File tree

2 files changed

+92
-13
lines changed

2 files changed

+92
-13
lines changed

example.py

100644100755
+35-13
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,45 @@
1-
import logging
1+
#!/usr/bin/env python
2+
import threading, logging, time
23

3-
from kafka.client import KafkaClient, FetchRequest, ProduceRequest
4+
from kafka.client import KafkaClient
45
from kafka.consumer import SimpleConsumer
56
from kafka.producer import SimpleProducer
67

7-
def produce_example(client):
8-
producer = SimpleProducer(client, "my-topic")
9-
producer.send_messages("test")
8+
class Producer(threading.Thread):
9+
daemon = True
1010

11-
def consume_example(client):
12-
consumer = SimpleConsumer(client, "test-group", "my-topic")
13-
for message in consumer:
14-
print(message)
11+
def run(self):
12+
client = KafkaClient("localhost", 9092)
13+
producer = SimpleProducer(client)
14+
15+
while True:
16+
producer.send_messages('my-topic', "test")
17+
producer.send_messages('my-topic', "\xc2Hola, mundo!")
18+
19+
time.sleep(1)
20+
21+
22+
class Consumer(threading.Thread):
23+
daemon = True
24+
25+
def run(self):
26+
client = KafkaClient("localhost", 9092)
27+
consumer = SimpleConsumer(client, "test-group", "my-topic")
28+
29+
for message in consumer:
30+
print(message)
1531

1632
def main():
17-
client = KafkaClient("localhost", 9092)
18-
produce_example(client)
19-
consume_example(client)
33+
threads = [
34+
Producer(),
35+
Consumer()
36+
]
37+
38+
for t in threads:
39+
t.start()
40+
41+
time.sleep(5)
2042

2143
if __name__ == "__main__":
22-
logging.basicConfig(level=logging.DEBUG)
44+
logging.basicConfig(level=logging.WARN)
2345
main()

load_example.py

+57
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
#!/usr/bin/env python
2+
import threading, logging, time, collections
3+
4+
from kafka.client import KafkaClient
5+
from kafka.consumer import SimpleConsumer
6+
from kafka.producer import SimpleProducer
7+
8+
msg_size = 524288
9+
10+
class Producer(threading.Thread):
11+
daemon = True
12+
big_msg = "1" * msg_size
13+
14+
def run(self):
15+
client = KafkaClient("localhost", 9092)
16+
producer = SimpleProducer(client)
17+
self.sent = 0
18+
19+
while True:
20+
producer.send_messages('my-topic', self.big_msg)
21+
self.sent += 1
22+
23+
24+
class Consumer(threading.Thread):
25+
daemon = True
26+
27+
def run(self):
28+
client = KafkaClient("localhost", 9092)
29+
consumer = SimpleConsumer(client, "test-group", "my-topic",
30+
max_buffer_size = None,
31+
)
32+
self.valid = 0
33+
self.invalid = 0
34+
35+
for message in consumer:
36+
if len(message.message.value) == msg_size:
37+
self.valid += 1
38+
else:
39+
self.invalid += 1
40+
41+
def main():
42+
threads = [
43+
Producer(),
44+
Consumer()
45+
]
46+
47+
for t in threads:
48+
t.start()
49+
50+
time.sleep(10)
51+
print 'Messages sent: %d' % threads[0].sent
52+
print 'Messages recvd: %d' % threads[1].valid
53+
print 'Messages invalid: %d' % threads[1].invalid
54+
55+
if __name__ == "__main__":
56+
logging.basicConfig(level=logging.DEBUG)
57+
main()

0 commit comments

Comments
 (0)