Skip to content

Commit b48c714

Browse files
author
Roberto Gandolfo Hashioka
committed
Merge pull request #3 from docker-hub/rename_maxsize_param_name
Renamed the maxsize param name to match the potential upstream version
2 parents 97c07a0 + 704f608 commit b48c714

File tree

3 files changed

+12
-12
lines changed

3 files changed

+12
-12
lines changed

kafka/producer/base.py

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,7 @@ class Producer(object):
101101
batch_send: If True, messages are send in batches
102102
batch_send_every_n: If set, messages are send in batches of this size
103103
batch_send_every_t: If set, messages are send after this timeout
104-
maxsize: sets the upper-bound limit on the number of items that can be placed in the queue
104+
async_queue_maxsize: sets the upper-bound limit on the number of items that can be placed in the queue
105105
"""
106106

107107
ACK_NOT_REQUIRED = 0 # No ack is required
@@ -117,13 +117,13 @@ def __init__(self, client, async=False,
117117
batch_send=False,
118118
batch_send_every_n=BATCH_SEND_MSG_COUNT,
119119
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
120-
maxsize=None):
120+
async_queue_maxsize=None):
121121

122122
if batch_send:
123123
async = True
124124
assert batch_send_every_n > 0
125125
assert batch_send_every_t > 0
126-
assert maxsize >= 0
126+
assert async_queue_maxsize >= 0
127127
else:
128128
batch_send_every_n = 1
129129
batch_send_every_t = 3600
@@ -134,8 +134,8 @@ def __init__(self, client, async=False,
134134
self.ack_timeout = ack_timeout
135135
self.stopped = False
136136

137-
if maxsize is None:
138-
maxsize = ASYNC_QUEUE_MAXSIZE
137+
if async_queue_maxsize is None:
138+
async_queue_maxsize = ASYNC_QUEUE_MAXSIZE
139139

140140
if codec is None:
141141
codec = CODEC_NONE
@@ -148,7 +148,7 @@ def __init__(self, client, async=False,
148148
log.warning("async producer does not guarantee message delivery!")
149149
log.warning("Current implementation does not retry Failed messages")
150150
log.warning("Use at your own risk! (or help improve with a PR!)")
151-
self.queue = Queue(maxsize) # Messages are sent through this queue
151+
self.queue = Queue(async_queue_maxsize) # Messages are sent through this queue
152152
self.thread_stop_event = Event()
153153
self.thread = Thread(target=_send_upstream,
154154
args=(self.queue,

kafka/producer/simple.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ class SimpleProducer(Producer):
2424
client: The Kafka client instance to use
2525
2626
Keyword Arguments:
27-
maxsize: sets the upper-bound limit on the number of items that can be placed in the queue
27+
async_queue_maxsize: sets the upper-bound limit on the number of items that can be placed in the queue
2828
async: If True, the messages are sent asynchronously via another
2929
thread (process). We will not wait for a response to these
3030
req_acks: A value indicating the acknowledgements that the server must
@@ -47,14 +47,14 @@ def __init__(self, client, async=False,
4747
batch_send_every_n=BATCH_SEND_MSG_COUNT,
4848
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
4949
random_start=True,
50-
maxsize=None):
50+
async_queue_maxsize=None):
5151
self.partition_cycles = {}
5252
self.random_start = random_start
5353
super(SimpleProducer, self).__init__(client, async, req_acks,
5454
ack_timeout, codec, batch_send,
5555
batch_send_every_n,
5656
batch_send_every_t,
57-
maxsize)
57+
async_queue_maxsize)
5858

5959
def _next_partition(self, topic):
6060
if topic not in self.partition_cycles:

test/test_producer.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ def partitions(topic):
4848
@patch('kafka.producer.base._send_upstream')
4949
def test_producer_async_queue_overfilled_batch_send(self, mock):
5050
queue_size = 2
51-
producer = Producer(MagicMock(), batch_send=True, maxsize=queue_size)
51+
producer = Producer(MagicMock(), batch_send=True, async_queue_maxsize=queue_size)
5252

5353
topic = b'test-topic'
5454
partition = 0
@@ -64,7 +64,7 @@ def test_producer_async_queue_overfilled_batch_send(self, mock):
6464
@patch('kafka.producer.base._send_upstream')
6565
def test_producer_async_queue_overfilled(self, mock):
6666
queue_size = 2
67-
producer = Producer(MagicMock(), async=True, maxsize=queue_size)
67+
producer = Producer(MagicMock(), async=True, async_queue_maxsize=queue_size)
6868

6969
topic = b'test-topic'
7070
partition = 0
@@ -79,7 +79,7 @@ def test_producer_async_queue_overfilled(self, mock):
7979

8080
def test_producer_async_queue_normal(self):
8181
queue_size = 4
82-
producer = Producer(MagicMock(), async=True, maxsize=queue_size)
82+
producer = Producer(MagicMock(), async=True, async_queue_maxsize=queue_size)
8383

8484
topic = b'test-topic'
8585
partition = 0

0 commit comments

Comments
 (0)