Skip to content

Commit aa5bde6

Browse files
committed
Add some simple message protocol tests
1 parent 54eb264 commit aa5bde6

File tree

1 file changed

+146
-0
lines changed

1 file changed

+146
-0
lines changed

test/test_protocol.py

Lines changed: 146 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,146 @@
1+
#pylint: skip-file
2+
import struct
3+
4+
import pytest
5+
import six
6+
7+
from kafka.protocol.api import RequestHeader
8+
from kafka.protocol.commit import GroupCoordinatorRequest
9+
from kafka.protocol.message import Message, MessageSet
10+
11+
12+
def test_create_message():
13+
payload = b'test'
14+
key = b'key'
15+
msg = Message(payload, key=key)
16+
assert msg.magic == 0
17+
assert msg.attributes == 0
18+
assert msg.key == key
19+
assert msg.value == payload
20+
21+
22+
def test_encode_message_v0():
23+
message = Message(b'test', key=b'key')
24+
encoded = message.encode()
25+
expect = b''.join([
26+
struct.pack('>i', -1427009701), # CRC
27+
struct.pack('>bb', 0, 0), # Magic, flags
28+
struct.pack('>i', 3), # Length of key
29+
b'key', # key
30+
struct.pack('>i', 4), # Length of value
31+
b'test', # value
32+
])
33+
assert encoded == expect
34+
35+
36+
def test_encode_message_v1():
37+
message = Message(b'test', key=b'key', magic=1, timestamp=1234)
38+
encoded = message.encode()
39+
expect = b''.join([
40+
struct.pack('>i', 1331087195), # CRC
41+
struct.pack('>bb', 1, 0), # Magic, flags
42+
struct.pack('>q', 1234), # Timestamp
43+
struct.pack('>i', 3), # Length of key
44+
b'key', # key
45+
struct.pack('>i', 4), # Length of value
46+
b'test', # value
47+
])
48+
assert encoded == expect
49+
50+
51+
def test_decode_message():
52+
encoded = b''.join([
53+
struct.pack('>i', -1427009701), # CRC
54+
struct.pack('>bb', 0, 0), # Magic, flags
55+
struct.pack('>i', 3), # Length of key
56+
b'key', # key
57+
struct.pack('>i', 4), # Length of value
58+
b'test', # value
59+
])
60+
decoded_message = Message.decode(encoded)
61+
msg = Message(b'test', key=b'key')
62+
msg.encode() # crc is recalculated during encoding
63+
assert decoded_message == msg
64+
65+
66+
def test_encode_message_set():
67+
messages = [
68+
Message(b'v1', key=b'k1'),
69+
Message(b'v2', key=b'k2')
70+
]
71+
encoded = MessageSet.encode([(0, msg.encode())
72+
for msg in messages],
73+
size=False)
74+
expect = b''.join([
75+
struct.pack('>q', 0), # MsgSet Offset
76+
struct.pack('>i', 18), # Msg Size
77+
struct.pack('>i', 1474775406), # CRC
78+
struct.pack('>bb', 0, 0), # Magic, flags
79+
struct.pack('>i', 2), # Length of key
80+
b'k1', # Key
81+
struct.pack('>i', 2), # Length of value
82+
b'v1', # Value
83+
84+
struct.pack('>q', 0), # MsgSet Offset
85+
struct.pack('>i', 18), # Msg Size
86+
struct.pack('>i', -16383415), # CRC
87+
struct.pack('>bb', 0, 0), # Magic, flags
88+
struct.pack('>i', 2), # Length of key
89+
b'k2', # Key
90+
struct.pack('>i', 2), # Length of value
91+
b'v2', # Value
92+
])
93+
assert encoded == expect
94+
95+
96+
def test_decode_message_set():
97+
encoded = b''.join([
98+
struct.pack('>q', 0), # MsgSet Offset
99+
struct.pack('>i', 18), # Msg Size
100+
struct.pack('>i', 1474775406), # CRC
101+
struct.pack('>bb', 0, 0), # Magic, flags
102+
struct.pack('>i', 2), # Length of key
103+
b'k1', # Key
104+
struct.pack('>i', 2), # Length of value
105+
b'v1', # Value
106+
107+
struct.pack('>q', 1), # MsgSet Offset
108+
struct.pack('>i', 18), # Msg Size
109+
struct.pack('>i', -16383415), # CRC
110+
struct.pack('>bb', 0, 0), # Magic, flags
111+
struct.pack('>i', 2), # Length of key
112+
b'k2', # Key
113+
struct.pack('>i', 2), # Length of value
114+
b'v2', # Value
115+
])
116+
117+
msgs = MessageSet.decode(encoded, bytes_to_read=len(encoded))
118+
assert len(msgs) == 2
119+
msg1, msg2 = msgs
120+
121+
returned_offset1, message1_size, decoded_message1 = msg1
122+
returned_offset2, message2_size, decoded_message2 = msg2
123+
124+
assert returned_offset1 == 0
125+
message1 = Message(b'v1', key=b'k1')
126+
message1.encode()
127+
assert decoded_message1 == message1
128+
129+
assert returned_offset2 == 1
130+
message2 = Message(b'v2', key=b'k2')
131+
message2.encode()
132+
assert decoded_message2 == message2
133+
134+
135+
def test_encode_message_header():
136+
expect = b''.join([
137+
struct.pack('>h', 10), # API Key
138+
struct.pack('>h', 0), # API Version
139+
struct.pack('>i', 4), # Correlation Id
140+
struct.pack('>h', len('client3')), # Length of clientId
141+
b'client3', # ClientId
142+
])
143+
144+
req = GroupCoordinatorRequest[0]('foo')
145+
header = RequestHeader(req, correlation_id=4, client_id='client3')
146+
assert header.encode() == expect

0 commit comments

Comments
 (0)