@@ -78,7 +78,6 @@ def kafka_consumer(self, **configs):
78
78
** configs )
79
79
return consumer
80
80
81
- @kafka_versions ("all" )
82
81
def test_simple_consumer (self ):
83
82
self .send_messages (0 , range (0 , 100 ))
84
83
self .send_messages (1 , range (100 , 200 ))
@@ -90,7 +89,6 @@ def test_simple_consumer(self):
90
89
91
90
consumer .stop ()
92
91
93
- @kafka_versions ('all' )
94
92
def test_simple_consumer_smallest_offset_reset (self ):
95
93
self .send_messages (0 , range (0 , 100 ))
96
94
self .send_messages (1 , range (100 , 200 ))
@@ -102,7 +100,6 @@ def test_simple_consumer_smallest_offset_reset(self):
102
100
# messages from beginning.
103
101
self .assert_message_count ([message for message in consumer ], 200 )
104
102
105
- @kafka_versions ('all' )
106
103
def test_simple_consumer_largest_offset_reset (self ):
107
104
self .send_messages (0 , range (0 , 100 ))
108
105
self .send_messages (1 , range (100 , 200 ))
@@ -120,7 +117,6 @@ def test_simple_consumer_largest_offset_reset(self):
120
117
# Since the offset is set to largest we should read all the new messages.
121
118
self .assert_message_count ([message for message in consumer ], 200 )
122
119
123
- @kafka_versions ('all' )
124
120
def test_simple_consumer_no_reset (self ):
125
121
self .send_messages (0 , range (0 , 100 ))
126
122
self .send_messages (1 , range (100 , 200 ))
@@ -132,7 +128,7 @@ def test_simple_consumer_no_reset(self):
132
128
with self .assertRaises (OffsetOutOfRangeError ):
133
129
consumer .get_message ()
134
130
135
- @kafka_versions (" 0.8.1" , "0.8.1.1" , "0.8.2.1" )
131
+ @kafka_versions ('>= 0.8.1' )
136
132
def test_simple_consumer_load_initial_offsets (self ):
137
133
self .send_messages (0 , range (0 , 100 ))
138
134
self .send_messages (1 , range (100 , 200 ))
@@ -149,7 +145,6 @@ def test_simple_consumer_load_initial_offsets(self):
149
145
consumer = self .consumer (auto_commit = False )
150
146
self .assertEqual (consumer .offsets , {0 : 51 , 1 : 101 })
151
147
152
- @kafka_versions ("all" )
153
148
def test_simple_consumer__seek (self ):
154
149
self .send_messages (0 , range (0 , 100 ))
155
150
self .send_messages (1 , range (100 , 200 ))
@@ -180,7 +175,6 @@ def test_simple_consumer__seek(self):
180
175
181
176
consumer .stop ()
182
177
183
- @kafka_versions ("all" )
184
178
def test_simple_consumer_blocking (self ):
185
179
consumer = self .consumer ()
186
180
@@ -214,7 +208,6 @@ def test_simple_consumer_blocking(self):
214
208
215
209
consumer .stop ()
216
210
217
- @kafka_versions ("all" )
218
211
def test_simple_consumer_pending (self ):
219
212
# make sure that we start with no pending messages
220
213
consumer = self .consumer ()
@@ -242,7 +235,6 @@ def test_simple_consumer_pending(self):
242
235
self .assertEquals (set ([0 , 1 ]), set ([pending_part1 , pending_part2 ]))
243
236
consumer .stop ()
244
237
245
- @kafka_versions ("all" )
246
238
def test_multi_process_consumer (self ):
247
239
# Produce 100 messages to partitions 0 and 1
248
240
self .send_messages (0 , range (0 , 100 ))
@@ -254,7 +246,6 @@ def test_multi_process_consumer(self):
254
246
255
247
consumer .stop ()
256
248
257
- @kafka_versions ("all" )
258
249
def test_multi_process_consumer_blocking (self ):
259
250
consumer = self .consumer (consumer = MultiProcessConsumer )
260
251
@@ -292,7 +283,6 @@ def test_multi_process_consumer_blocking(self):
292
283
293
284
consumer .stop ()
294
285
295
- @kafka_versions ("all" )
296
286
def test_multi_proc_pending (self ):
297
287
self .send_messages (0 , range (0 , 10 ))
298
288
self .send_messages (1 , range (10 , 20 ))
@@ -308,7 +298,7 @@ def test_multi_proc_pending(self):
308
298
309
299
consumer .stop ()
310
300
311
- @kafka_versions (" 0.8.1" , "0.8.1.1" , "0.8.2.1" )
301
+ @kafka_versions ('>= 0.8.1' )
312
302
def test_multi_process_consumer_load_initial_offsets (self ):
313
303
self .send_messages (0 , range (0 , 10 ))
314
304
self .send_messages (1 , range (10 , 20 ))
@@ -326,7 +316,6 @@ def test_multi_process_consumer_load_initial_offsets(self):
326
316
auto_commit = False )
327
317
self .assertEqual (consumer .offsets , {0 : 5 , 1 : 15 })
328
318
329
- @kafka_versions ("all" )
330
319
def test_large_messages (self ):
331
320
# Produce 10 "normal" size messages
332
321
small_messages = self .send_messages (0 , [ str (x ) for x in range (10 ) ])
@@ -343,7 +332,6 @@ def test_large_messages(self):
343
332
344
333
consumer .stop ()
345
334
346
- @kafka_versions ("all" )
347
335
def test_huge_messages (self ):
348
336
huge_message , = self .send_messages (0 , [
349
337
create_message (random_string (MAX_FETCH_BUFFER_SIZE_BYTES + 10 )),
@@ -374,7 +362,7 @@ def test_huge_messages(self):
374
362
375
363
big_consumer .stop ()
376
364
377
- @kafka_versions (" 0.8.1" , "0.8.1.1" , "0.8.2.1" )
365
+ @kafka_versions ('>= 0.8.1' )
378
366
def test_offset_behavior__resuming_behavior (self ):
379
367
self .send_messages (0 , range (0 , 100 ))
380
368
self .send_messages (1 , range (100 , 200 ))
@@ -401,7 +389,7 @@ def test_offset_behavior__resuming_behavior(self):
401
389
consumer1 .stop ()
402
390
consumer2 .stop ()
403
391
404
- @kafka_versions (" 0.8.1" , "0.8.1.1" , "0.8.2.1" )
392
+ @kafka_versions ('>= 0.8.1' )
405
393
def test_multi_process_offset_behavior__resuming_behavior (self ):
406
394
self .send_messages (0 , range (0 , 100 ))
407
395
self .send_messages (1 , range (100 , 200 ))
@@ -437,7 +425,6 @@ def test_multi_process_offset_behavior__resuming_behavior(self):
437
425
consumer2 .stop ()
438
426
439
427
# TODO: Make this a unit test -- should not require integration
440
- @kafka_versions ("all" )
441
428
def test_fetch_buffer_size (self ):
442
429
443
430
# Test parameters (see issue 135 / PR 136)
@@ -455,7 +442,6 @@ def test_fetch_buffer_size(self):
455
442
messages = [ message for message in consumer ]
456
443
self .assertEqual (len (messages ), 2 )
457
444
458
- @kafka_versions ("all" )
459
445
def test_kafka_consumer (self ):
460
446
self .send_messages (0 , range (0 , 100 ))
461
447
self .send_messages (1 , range (100 , 200 ))
@@ -476,7 +462,6 @@ def test_kafka_consumer(self):
476
462
self .assertEqual (len (messages [0 ]), 100 )
477
463
self .assertEqual (len (messages [1 ]), 100 )
478
464
479
- @kafka_versions ("all" )
480
465
def test_kafka_consumer__blocking (self ):
481
466
TIMEOUT_MS = 500
482
467
consumer = self .kafka_consumer (auto_offset_reset = 'smallest' ,
@@ -509,7 +494,7 @@ def test_kafka_consumer__blocking(self):
509
494
self .assertEqual (len (messages ), 5 )
510
495
self .assertGreaterEqual (t .interval , TIMEOUT_MS / 1000.0 )
511
496
512
- @kafka_versions (" 0.8.1" , "0.8.1.1" , "0.8.2.1" )
497
+ @kafka_versions ('>= 0.8.1' )
513
498
def test_kafka_consumer__offset_commit_resume (self ):
514
499
GROUP_ID = random_string (10 ).encode ('utf-8' )
515
500
0 commit comments