|
| 1 | +#pylint: skip-file |
| 2 | +import struct |
| 3 | + |
| 4 | +import pytest |
| 5 | +import six |
| 6 | + |
| 7 | +from kafka.protocol.api import RequestHeader |
| 8 | +from kafka.protocol.commit import GroupCoordinatorRequest |
| 9 | +from kafka.protocol.message import Message, MessageSet |
| 10 | + |
| 11 | + |
| 12 | +def test_create_message(): |
| 13 | + payload = b'test' |
| 14 | + key = b'key' |
| 15 | + msg = Message(payload, key=key) |
| 16 | + assert msg.magic == 0 |
| 17 | + assert msg.attributes == 0 |
| 18 | + assert msg.key == key |
| 19 | + assert msg.value == payload |
| 20 | + |
| 21 | + |
| 22 | +def test_encode_message_v0(): |
| 23 | + message = Message(b'test', key=b'key') |
| 24 | + encoded = message.encode() |
| 25 | + expect = b''.join([ |
| 26 | + struct.pack('>i', -1427009701), # CRC |
| 27 | + struct.pack('>bb', 0, 0), # Magic, flags |
| 28 | + struct.pack('>i', 3), # Length of key |
| 29 | + b'key', # key |
| 30 | + struct.pack('>i', 4), # Length of value |
| 31 | + b'test', # value |
| 32 | + ]) |
| 33 | + assert encoded == expect |
| 34 | + |
| 35 | + |
| 36 | +def test_encode_message_v1(): |
| 37 | + message = Message(b'test', key=b'key', magic=1, timestamp=1234) |
| 38 | + encoded = message.encode() |
| 39 | + expect = b''.join([ |
| 40 | + struct.pack('>i', 1331087195), # CRC |
| 41 | + struct.pack('>bb', 1, 0), # Magic, flags |
| 42 | + struct.pack('>q', 1234), # Timestamp |
| 43 | + struct.pack('>i', 3), # Length of key |
| 44 | + b'key', # key |
| 45 | + struct.pack('>i', 4), # Length of value |
| 46 | + b'test', # value |
| 47 | + ]) |
| 48 | + assert encoded == expect |
| 49 | + |
| 50 | + |
| 51 | +def test_decode_message(): |
| 52 | + encoded = b''.join([ |
| 53 | + struct.pack('>i', -1427009701), # CRC |
| 54 | + struct.pack('>bb', 0, 0), # Magic, flags |
| 55 | + struct.pack('>i', 3), # Length of key |
| 56 | + b'key', # key |
| 57 | + struct.pack('>i', 4), # Length of value |
| 58 | + b'test', # value |
| 59 | + ]) |
| 60 | + decoded_message = Message.decode(encoded) |
| 61 | + msg = Message(b'test', key=b'key') |
| 62 | + msg.encode() # crc is recalculated during encoding |
| 63 | + assert decoded_message == msg |
| 64 | + |
| 65 | + |
| 66 | +def test_encode_message_set(): |
| 67 | + messages = [ |
| 68 | + Message(b'v1', key=b'k1'), |
| 69 | + Message(b'v2', key=b'k2') |
| 70 | + ] |
| 71 | + encoded = MessageSet.encode([(0, msg.encode()) |
| 72 | + for msg in messages], |
| 73 | + size=False) |
| 74 | + expect = b''.join([ |
| 75 | + struct.pack('>q', 0), # MsgSet Offset |
| 76 | + struct.pack('>i', 18), # Msg Size |
| 77 | + struct.pack('>i', 1474775406), # CRC |
| 78 | + struct.pack('>bb', 0, 0), # Magic, flags |
| 79 | + struct.pack('>i', 2), # Length of key |
| 80 | + b'k1', # Key |
| 81 | + struct.pack('>i', 2), # Length of value |
| 82 | + b'v1', # Value |
| 83 | + |
| 84 | + struct.pack('>q', 0), # MsgSet Offset |
| 85 | + struct.pack('>i', 18), # Msg Size |
| 86 | + struct.pack('>i', -16383415), # CRC |
| 87 | + struct.pack('>bb', 0, 0), # Magic, flags |
| 88 | + struct.pack('>i', 2), # Length of key |
| 89 | + b'k2', # Key |
| 90 | + struct.pack('>i', 2), # Length of value |
| 91 | + b'v2', # Value |
| 92 | + ]) |
| 93 | + assert encoded == expect |
| 94 | + |
| 95 | + |
| 96 | +def test_decode_message_set(): |
| 97 | + encoded = b''.join([ |
| 98 | + struct.pack('>q', 0), # MsgSet Offset |
| 99 | + struct.pack('>i', 18), # Msg Size |
| 100 | + struct.pack('>i', 1474775406), # CRC |
| 101 | + struct.pack('>bb', 0, 0), # Magic, flags |
| 102 | + struct.pack('>i', 2), # Length of key |
| 103 | + b'k1', # Key |
| 104 | + struct.pack('>i', 2), # Length of value |
| 105 | + b'v1', # Value |
| 106 | + |
| 107 | + struct.pack('>q', 1), # MsgSet Offset |
| 108 | + struct.pack('>i', 18), # Msg Size |
| 109 | + struct.pack('>i', -16383415), # CRC |
| 110 | + struct.pack('>bb', 0, 0), # Magic, flags |
| 111 | + struct.pack('>i', 2), # Length of key |
| 112 | + b'k2', # Key |
| 113 | + struct.pack('>i', 2), # Length of value |
| 114 | + b'v2', # Value |
| 115 | + ]) |
| 116 | + |
| 117 | + msgs = MessageSet.decode(encoded, bytes_to_read=len(encoded)) |
| 118 | + assert len(msgs) == 2 |
| 119 | + msg1, msg2 = msgs |
| 120 | + |
| 121 | + returned_offset1, message1_size, decoded_message1 = msg1 |
| 122 | + returned_offset2, message2_size, decoded_message2 = msg2 |
| 123 | + |
| 124 | + assert returned_offset1 == 0 |
| 125 | + message1 = Message(b'v1', key=b'k1') |
| 126 | + message1.encode() |
| 127 | + assert decoded_message1 == message1 |
| 128 | + |
| 129 | + assert returned_offset2 == 1 |
| 130 | + message2 = Message(b'v2', key=b'k2') |
| 131 | + message2.encode() |
| 132 | + assert decoded_message2 == message2 |
| 133 | + |
| 134 | + |
| 135 | +def test_encode_message_header(): |
| 136 | + expect = b''.join([ |
| 137 | + struct.pack('>h', 10), # API Key |
| 138 | + struct.pack('>h', 0), # API Version |
| 139 | + struct.pack('>i', 4), # Correlation Id |
| 140 | + struct.pack('>h', len('client3')), # Length of clientId |
| 141 | + b'client3', # ClientId |
| 142 | + ]) |
| 143 | + |
| 144 | + req = GroupCoordinatorRequest[0]('foo') |
| 145 | + header = RequestHeader(req, correlation_id=4, client_id='client3') |
| 146 | + assert header.encode() == expect |
0 commit comments