forked from dpkp/kafka-python
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathqueue.py
219 lines (179 loc) · 7.44 KB
/
queue.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
from __future__ import absolute_import
from copy import copy
import logging
from multiprocessing import Process, Queue, Event
from Queue import Empty
import time
from kafka.client import KafkaClient, FetchRequest, ProduceRequest
log = logging.getLogger("kafka")
raise NotImplementedError("Still need to refactor this class")
class KafkaConsumerProcess(Process):
def __init__(self, client, topic, partition, out_queue, barrier,
consumer_fetch_size=1024, consumer_sleep=200):
self.client = copy(client)
self.topic = topic
self.partition = partition
self.out_queue = out_queue
self.barrier = barrier
self.consumer_fetch_size = consumer_fetch_size
self.consumer_sleep = consumer_sleep / 1000.
log.info("Initializing %s" % self)
Process.__init__(self)
def __str__(self):
return "[KafkaConsumerProcess: topic=%s, \
partition=%s, sleep=%s]" % \
(self.topic, self.partition, self.consumer_sleep)
def run(self):
self.barrier.wait()
log.info("Starting %s" % self)
fetchRequest = FetchRequest(self.topic, self.partition,
offset=0, size=self.consumer_fetch_size)
while True:
if self.barrier.is_set() is False:
log.info("Shutdown %s" % self)
self.client.close()
break
lastOffset = fetchRequest.offset
(messages, fetchRequest) = self.client.get_message_set(fetchRequest)
if fetchRequest.offset == lastOffset:
log.debug("No more data for this partition, "
"sleeping a bit (200ms)")
time.sleep(self.consumer_sleep)
continue
for message in messages:
self.out_queue.put(message)
class KafkaProducerProcess(Process):
def __init__(self, client, topic, in_queue, barrier,
producer_flush_buffer=500,
producer_flush_timeout=2000,
producer_timeout=100):
self.client = copy(client)
self.topic = topic
self.in_queue = in_queue
self.barrier = barrier
self.producer_flush_buffer = producer_flush_buffer
self.producer_flush_timeout = producer_flush_timeout / 1000.
self.producer_timeout = producer_timeout / 1000.
log.info("Initializing %s" % self)
Process.__init__(self)
def __str__(self):
return "[KafkaProducerProcess: topic=%s, \
flush_buffer=%s, flush_timeout=%s, timeout=%s]" % \
(self.topic,
self.producer_flush_buffer,
self.producer_flush_timeout,
self.producer_timeout)
def run(self):
self.barrier.wait()
log.info("Starting %s" % self)
messages = []
last_produce = time.time()
def flush(messages):
self.client.send_message_set(ProduceRequest(self.topic, -1,
messages))
del messages[:]
while True:
if self.barrier.is_set() is False:
log.info("Shutdown %s, flushing messages" % self)
flush(messages)
self.client.close()
break
if len(messages) > self.producer_flush_buffer:
log.debug("Message count threshold reached. Flushing messages")
flush(messages)
last_produce = time.time()
elif (time.time() - last_produce) > self.producer_flush_timeout:
log.debug("Producer timeout reached. Flushing messages")
flush(messages)
last_produce = time.time()
try:
msg = KafkaClient.create_message(
self.in_queue.get(True, self.producer_timeout))
messages.append(msg)
except Empty:
continue
class KafkaQueue(object):
def __init__(self, client, topic, partitions,
producer_config=None, consumer_config=None):
"""
KafkaQueue a Queue-like object backed by a Kafka producer and some
number of consumers
Messages are eagerly loaded by the consumer in batches of size
consumer_fetch_size.
Messages are buffered in the producer thread until
producer_flush_timeout or producer_flush_buffer is reached.
Params
======
client: KafkaClient object
topic: str, the topic name
partitions: list of ints, the partions to consume from
producer_config: dict, see below
consumer_config: dict, see below
Consumer Config
===============
consumer_fetch_size: int, number of bytes to fetch in one call
to Kafka. Default is 1024
consumer_sleep: int, time in milliseconds a consumer should sleep
when it reaches the end of a partition. Default is 200
Producer Config
===============
producer_timeout: int, time in milliseconds a producer should
wait for messages to enqueue for producing.
Default is 100
producer_flush_timeout: int, time in milliseconds a producer
should allow messages to accumulate before
sending to Kafka. Default is 2000
producer_flush_buffer: int, number of messages a producer should
allow to accumulate. Default is 500
"""
producer_config = {} if producer_config is None else producer_config
consumer_config = {} if consumer_config is None else consumer_config
self.in_queue = Queue()
self.out_queue = Queue()
self.consumers = []
self.barrier = Event()
# Initialize and start consumer threads
for partition in partitions:
consumer = KafkaConsumerProcess(client, topic, partition,
self.in_queue, self.barrier,
**consumer_config)
consumer.start()
self.consumers.append(consumer)
# Initialize and start producer thread
self.producer = KafkaProducerProcess(client, topic, self.out_queue,
self.barrier, **producer_config)
self.producer.start()
# Trigger everything to start
self.barrier.set()
def get(self, block=True, timeout=None):
"""
Consume a message from Kafka
Params
======
block: boolean, default True
timeout: int, number of seconds to wait when blocking, default None
Returns
=======
msg: str, the payload from Kafka
"""
return self.in_queue.get(block, timeout).payload
def put(self, msg, block=True, timeout=None):
"""
Send a message to Kafka
Params
======
msg: std, the message to send
block: boolean, default True
timeout: int, number of seconds to wait when blocking, default None
"""
self.out_queue.put(msg, block, timeout)
def close(self):
"""
Close the internal queues and Kafka consumers/producer
"""
self.in_queue.close()
self.out_queue.close()
self.barrier.clear()
self.producer.join()
for consumer in self.consumers:
consumer.join()