8
8
9
9
import kafka .errors as Errors
10
10
from kafka .future import Future
11
- from kafka .protocol .commit import (GroupCoordinatorRequest ,
12
- OffsetCommitRequest_v2 as OffsetCommitRequest )
11
+ from kafka .protocol .commit import GroupCoordinatorRequest , OffsetCommitRequest
13
12
from kafka .protocol .group import (HeartbeatRequest , JoinGroupRequest ,
14
13
LeaveGroupRequest , SyncGroupRequest )
15
14
from .heartbeat import Heartbeat
@@ -79,8 +78,8 @@ def __init__(self, client, **configs):
79
78
self .config [key ] = configs [key ]
80
79
81
80
self ._client = client
82
- self .generation = OffsetCommitRequest .DEFAULT_GENERATION_ID
83
- self .member_id = JoinGroupRequest .UNKNOWN_MEMBER_ID
81
+ self .generation = OffsetCommitRequest [ 2 ] .DEFAULT_GENERATION_ID
82
+ self .member_id = JoinGroupRequest [ 0 ] .UNKNOWN_MEMBER_ID
84
83
self .group_id = self .config ['group_id' ]
85
84
self .coordinator_id = None
86
85
self .rejoin_needed = True
@@ -269,7 +268,7 @@ def _send_join_group_request(self):
269
268
270
269
# send a join group request to the coordinator
271
270
log .info ("(Re-)joining group %s" , self .group_id )
272
- request = JoinGroupRequest (
271
+ request = JoinGroupRequest [ 0 ] (
273
272
self .group_id ,
274
273
self .config ['session_timeout_ms' ],
275
274
self .member_id ,
@@ -324,7 +323,7 @@ def _handle_join_group_response(self, future, response):
324
323
elif error_type is Errors .UnknownMemberIdError :
325
324
# reset the member id and retry immediately
326
325
error = error_type (self .member_id )
327
- self .member_id = JoinGroupRequest .UNKNOWN_MEMBER_ID
326
+ self .member_id = JoinGroupRequest [ 0 ] .UNKNOWN_MEMBER_ID
328
327
log .debug ("Attempt to join group %s failed due to unknown member id" ,
329
328
self .group_id )
330
329
future .failure (error )
@@ -354,7 +353,7 @@ def _handle_join_group_response(self, future, response):
354
353
355
354
def _on_join_follower (self ):
356
355
# send follower's sync group with an empty assignment
357
- request = SyncGroupRequest (
356
+ request = SyncGroupRequest [ 0 ] (
358
357
self .group_id ,
359
358
self .generation ,
360
359
self .member_id ,
@@ -381,7 +380,7 @@ def _on_join_leader(self, response):
381
380
except Exception as e :
382
381
return Future ().failure (e )
383
382
384
- request = SyncGroupRequest (
383
+ request = SyncGroupRequest [ 0 ] (
385
384
self .group_id ,
386
385
self .generation ,
387
386
self .member_id ,
@@ -425,7 +424,7 @@ def _handle_sync_group_response(self, future, response):
425
424
Errors .IllegalGenerationError ):
426
425
error = error_type ()
427
426
log .debug ("SyncGroup for group %s failed due to %s" , self .group_id , error )
428
- self .member_id = JoinGroupRequest .UNKNOWN_MEMBER_ID
427
+ self .member_id = JoinGroupRequest [ 0 ] .UNKNOWN_MEMBER_ID
429
428
future .failure (error )
430
429
elif error_type in (Errors .GroupCoordinatorNotAvailableError ,
431
430
Errors .NotCoordinatorForGroupError ):
@@ -450,7 +449,7 @@ def _send_group_coordinator_request(self):
450
449
451
450
log .debug ("Sending group coordinator request for group %s to broker %s" ,
452
451
self .group_id , node_id )
453
- request = GroupCoordinatorRequest (self .group_id )
452
+ request = GroupCoordinatorRequest [ 0 ] (self .group_id )
454
453
future = Future ()
455
454
_f = self ._client .send (node_id , request )
456
455
_f .add_callback (self ._handle_group_coordinator_response , future )
@@ -514,14 +513,14 @@ def close(self):
514
513
if not self .coordinator_unknown () and self .generation > 0 :
515
514
# this is a minimal effort attempt to leave the group. we do not
516
515
# attempt any resending if the request fails or times out.
517
- request = LeaveGroupRequest (self .group_id , self .member_id )
516
+ request = LeaveGroupRequest [ 0 ] (self .group_id , self .member_id )
518
517
future = self ._client .send (self .coordinator_id , request )
519
518
future .add_callback (self ._handle_leave_group_response )
520
519
future .add_errback (log .error , "LeaveGroup request failed: %s" )
521
520
self ._client .poll (future = future )
522
521
523
- self .generation = OffsetCommitRequest .DEFAULT_GENERATION_ID
524
- self .member_id = JoinGroupRequest .UNKNOWN_MEMBER_ID
522
+ self .generation = OffsetCommitRequest [ 2 ] .DEFAULT_GENERATION_ID
523
+ self .member_id = JoinGroupRequest [ 0 ] .UNKNOWN_MEMBER_ID
525
524
self .rejoin_needed = True
526
525
527
526
def _handle_leave_group_response (self , response ):
@@ -533,7 +532,7 @@ def _handle_leave_group_response(self, response):
533
532
534
533
def _send_heartbeat_request (self ):
535
534
"""Send a heartbeat request"""
536
- request = HeartbeatRequest (self .group_id , self .generation , self .member_id )
535
+ request = HeartbeatRequest [ 0 ] (self .group_id , self .generation , self .member_id )
537
536
log .debug ("Heartbeat: %s[%s] %s" , request .group , request .generation_id , request .member_id ) #pylint: disable-msg=no-member
538
537
future = Future ()
539
538
_f = self ._client .send (self .coordinator_id , request )
@@ -569,7 +568,7 @@ def _handle_heartbeat_response(self, future, response):
569
568
elif error_type is Errors .UnknownMemberIdError :
570
569
log .warning ("Heartbeat: local member_id was not recognized;"
571
570
" this consumer needs to re-join" )
572
- self .member_id = JoinGroupRequest .UNKNOWN_MEMBER_ID
571
+ self .member_id = JoinGroupRequest [ 0 ] .UNKNOWN_MEMBER_ID
573
572
self .rejoin_needed = True
574
573
future .failure (error_type )
575
574
elif error_type is Errors .GroupAuthorizationFailedError :
0 commit comments