Skip to content

Commit ba4ec47

Browse files
committed
Merge pull request dpkp#487 from dpkp/kafka_version_tests
Refactor kafka_versions to support arbitrary operators (> >= < <= ! =)
2 parents 1856063 + 4bd20aa commit ba4ec47

5 files changed

+47
-56
lines changed

test/test_client_integration.py

Lines changed: 1 addition & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -27,7 +27,6 @@ def tearDownClass(cls): # noqa
2727
cls.server.close()
2828
cls.zk.close()
2929

30-
@kafka_versions("all")
3130
def test_consume_none(self):
3231
fetch = FetchRequest(self.bytes_topic, 0, 0, 1024)
3332

@@ -39,7 +38,6 @@ def test_consume_none(self):
3938
messages = list(fetch_resp.messages)
4039
self.assertEqual(len(messages), 0)
4140

42-
@kafka_versions("all")
4341
def test_ensure_topic_exists(self):
4442

4543
# assume that self.topic was created by setUp
@@ -50,7 +48,6 @@ def test_ensure_topic_exists(self):
5048
with self.assertRaises(KafkaTimeoutError):
5149
self.client.ensure_topic_exists(b"this_topic_doesnt_exist", timeout=0)
5250

53-
@kafka_versions('all')
5451
def test_send_produce_request_maintains_request_response_order(self):
5552

5653
self.client.ensure_topic_exists(b'foo')
@@ -83,7 +80,7 @@ def test_send_produce_request_maintains_request_response_order(self):
8380
# Offset Tests #
8481
####################
8582

86-
@kafka_versions("0.8.1", "0.8.1.1", "0.8.2.1")
83+
@kafka_versions('>=0.8.1')
8784
def test_commit_fetch_offsets(self):
8885
req = OffsetCommitRequest(self.bytes_topic, 0, 42, b"metadata")
8986
(resp,) = self.client.send_offset_commit_request(b"group", [req])

test/test_consumer_integration.py

Lines changed: 5 additions & 20 deletions
Original file line number | Diff line number | Diff line change
@@ -78,7 +78,6 @@ def kafka_consumer(self, **configs):
7878
**configs)
7979
return consumer
8080

81-
@kafka_versions("all")
8281
def test_simple_consumer(self):
8382
self.send_messages(0, range(0, 100))
8483
self.send_messages(1, range(100, 200))
@@ -90,7 +89,6 @@ def test_simple_consumer(self):
9089

9190
consumer.stop()
9291

93-
@kafka_versions('all')
9492
def test_simple_consumer_smallest_offset_reset(self):
9593
self.send_messages(0, range(0, 100))
9694
self.send_messages(1, range(100, 200))
@@ -102,7 +100,6 @@ def test_simple_consumer_smallest_offset_reset(self):
102100
# messages from beginning.
103101
self.assert_message_count([message for message in consumer], 200)
104102

105-
@kafka_versions('all')
106103
def test_simple_consumer_largest_offset_reset(self):
107104
self.send_messages(0, range(0, 100))
108105
self.send_messages(1, range(100, 200))
@@ -120,7 +117,6 @@ def test_simple_consumer_largest_offset_reset(self):
120117
# Since the offset is set to largest we should read all the new messages.
121118
self.assert_message_count([message for message in consumer], 200)
122119

123-
@kafka_versions('all')
124120
def test_simple_consumer_no_reset(self):
125121
self.send_messages(0, range(0, 100))
126122
self.send_messages(1, range(100, 200))
@@ -132,7 +128,7 @@ def test_simple_consumer_no_reset(self):
132128
with self.assertRaises(OffsetOutOfRangeError):
133129
consumer.get_message()
134130

135-
@kafka_versions("0.8.1", "0.8.1.1", "0.8.2.1")
131+
@kafka_versions('>=0.8.1')
136132
def test_simple_consumer_load_initial_offsets(self):
137133
self.send_messages(0, range(0, 100))
138134
self.send_messages(1, range(100, 200))
@@ -149,7 +145,6 @@ def test_simple_consumer_load_initial_offsets(self):
149145
consumer = self.consumer(auto_commit=False)
150146
self.assertEqual(consumer.offsets, {0: 51, 1: 101})
151147

152-
@kafka_versions("all")
153148
def test_simple_consumer__seek(self):
154149
self.send_messages(0, range(0, 100))
155150
self.send_messages(1, range(100, 200))
@@ -180,7 +175,6 @@ def test_simple_consumer__seek(self):
180175

181176
consumer.stop()
182177

183-
@kafka_versions("all")
184178
def test_simple_consumer_blocking(self):
185179
consumer = self.consumer()
186180

@@ -214,7 +208,6 @@ def test_simple_consumer_blocking(self):
214208

215209
consumer.stop()
216210

217-
@kafka_versions("all")
218211
def test_simple_consumer_pending(self):
219212
# make sure that we start with no pending messages
220213
consumer = self.consumer()
@@ -242,7 +235,6 @@ def test_simple_consumer_pending(self):
242235
self.assertEquals(set([0, 1]), set([pending_part1, pending_part2]))
243236
consumer.stop()
244237

245-
@kafka_versions("all")
246238
def test_multi_process_consumer(self):
247239
# Produce 100 messages to partitions 0 and 1
248240
self.send_messages(0, range(0, 100))
@@ -254,7 +246,6 @@ def test_multi_process_consumer(self):
254246

255247
consumer.stop()
256248

257-
@kafka_versions("all")
258249
def test_multi_process_consumer_blocking(self):
259250
consumer = self.consumer(consumer = MultiProcessConsumer)
260251

@@ -292,7 +283,6 @@ def test_multi_process_consumer_blocking(self):
292283

293284
consumer.stop()
294285

295-
@kafka_versions("all")
296286
def test_multi_proc_pending(self):
297287
self.send_messages(0, range(0, 10))
298288
self.send_messages(1, range(10, 20))
@@ -308,7 +298,7 @@ def test_multi_proc_pending(self):
308298

309299
consumer.stop()
310300

311-
@kafka_versions("0.8.1", "0.8.1.1", "0.8.2.1")
301+
@kafka_versions('>=0.8.1')
312302
def test_multi_process_consumer_load_initial_offsets(self):
313303
self.send_messages(0, range(0, 10))
314304
self.send_messages(1, range(10, 20))
@@ -326,7 +316,6 @@ def test_multi_process_consumer_load_initial_offsets(self):
326316
auto_commit=False)
327317
self.assertEqual(consumer.offsets, {0: 5, 1: 15})
328318

329-
@kafka_versions("all")
330319
def test_large_messages(self):
331320
# Produce 10 "normal" size messages
332321
small_messages = self.send_messages(0, [ str(x) for x in range(10) ])
@@ -343,7 +332,6 @@ def test_large_messages(self):
343332

344333
consumer.stop()
345334

346-
@kafka_versions("all")
347335
def test_huge_messages(self):
348336
huge_message, = self.send_messages(0, [
349337
create_message(random_string(MAX_FETCH_BUFFER_SIZE_BYTES + 10)),
@@ -374,7 +362,7 @@ def test_huge_messages(self):
374362

375363
big_consumer.stop()
376364

377-
@kafka_versions("0.8.1", "0.8.1.1", "0.8.2.1")
365+
@kafka_versions('>=0.8.1')
378366
def test_offset_behavior__resuming_behavior(self):
379367
self.send_messages(0, range(0, 100))
380368
self.send_messages(1, range(100, 200))
@@ -401,7 +389,7 @@ def test_offset_behavior__resuming_behavior(self):
401389
consumer1.stop()
402390
consumer2.stop()
403391

404-
@kafka_versions("0.8.1", "0.8.1.1", "0.8.2.1")
392+
@kafka_versions('>=0.8.1')
405393
def test_multi_process_offset_behavior__resuming_behavior(self):
406394
self.send_messages(0, range(0, 100))
407395
self.send_messages(1, range(100, 200))
@@ -437,7 +425,6 @@ def test_multi_process_offset_behavior__resuming_behavior(self):
437425
consumer2.stop()
438426

439427
# TODO: Make this a unit test -- should not require integration
440-
@kafka_versions("all")
441428
def test_fetch_buffer_size(self):
442429

443430
# Test parameters (see issue 135 / PR 136)
@@ -455,7 +442,6 @@ def test_fetch_buffer_size(self):
455442
messages = [ message for message in consumer ]
456443
self.assertEqual(len(messages), 2)
457444

458-
@kafka_versions("all")
459445
def test_kafka_consumer(self):
460446
self.send_messages(0, range(0, 100))
461447
self.send_messages(1, range(100, 200))
@@ -476,7 +462,6 @@ def test_kafka_consumer(self):
476462
self.assertEqual(len(messages[0]), 100)
477463
self.assertEqual(len(messages[1]), 100)
478464

479-
@kafka_versions("all")
480465
def test_kafka_consumer__blocking(self):
481466
TIMEOUT_MS = 500
482467
consumer = self.kafka_consumer(auto_offset_reset='smallest',
@@ -509,7 +494,7 @@ def test_kafka_consumer__blocking(self):
509494
self.assertEqual(len(messages), 5)
510495
self.assertGreaterEqual(t.interval, TIMEOUT_MS / 1000.0 )
511496

512-
@kafka_versions("0.8.1", "0.8.1.1", "0.8.2.1")
497+
@kafka_versions('>=0.8.1')
513498
def test_kafka_consumer__offset_commit_resume(self):
514499
GROUP_ID = random_string(10).encode('utf-8')
515500

test/test_failover_integration.py

Lines changed: 2 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -8,9 +8,7 @@
88
from kafka.util import kafka_bytestring
99

1010
from test.fixtures import ZookeeperFixture, KafkaFixture
11-
from test.testutil import (
12-
KafkaIntegrationTestCase, kafka_versions, random_string
13-
)
11+
from test.testutil import KafkaIntegrationTestCase, random_string
1412

1513

1614
log = logging.getLogger(__name__)
@@ -21,7 +19,7 @@ class TestFailover(KafkaIntegrationTestCase):
2119

2220
def setUp(self):
2321
if not os.environ.get('KAFKA_VERSION'):
24-
return
22+
self.skipTest('integration test requires KAFKA_VERSION')
2523

2624
zk_chroot = random_string(10)
2725
replicas = 3
@@ -46,7 +44,6 @@ def tearDown(self):
4644
broker.close()
4745
self.zk.close()
4846

49-
@kafka_versions("all")
5047
def test_switch_leader(self):
5148
topic = self.topic
5249
partition = 0
@@ -94,7 +91,6 @@ def test_switch_leader(self):
9491
self.assert_message_count(topic, 201, partitions=(partition,),
9592
at_least=True)
9693

97-
@kafka_versions("all")
9894
def test_switch_leader_async(self):
9995
topic = self.topic
10096
partition = 0
@@ -142,7 +138,6 @@ def test_switch_leader_async(self):
142138
self.assert_message_count(topic, 21, partitions=(partition + 1,),
143139
at_least=True)
144140

145-
@kafka_versions("all")
146141
def test_switch_leader_keyed_producer(self):
147142
topic = self.topic
148143

@@ -180,7 +175,6 @@ def test_switch_leader_keyed_producer(self):
180175
msg = random_string(10).encode('utf-8')
181176
producer.send_messages(topic, key, msg)
182177

183-
@kafka_versions("all")
184178
def test_switch_leader_simple_consumer(self):
185179
producer = Producer(self.client, async=False)
186180
consumer = SimpleConsumer(self.client, None, self.topic, partitions=None, auto_commit=False, iter_timeout=10)

0 commit comments

Comments (0)