-
-
Notifications
You must be signed in to change notification settings - Fork 122
/
Copy pathbindings_spec.rb
223 lines (188 loc) · 7.38 KB
/
bindings_spec.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
# frozen_string_literal: true
require 'zlib'
describe Rdkafka::Bindings do
it "should load librdkafka" do
expect(Rdkafka::Bindings.ffi_libraries.map(&:name).first).to include "librdkafka"
end
describe ".lib_extension" do
it "should know the lib extension for darwin" do
stub_const('RbConfig::CONFIG', 'host_os' =>'darwin')
expect(Rdkafka::Bindings.lib_extension).to eq "dylib"
end
it "should know the lib extension for linux" do
stub_const('RbConfig::CONFIG', 'host_os' =>'linux')
expect(Rdkafka::Bindings.lib_extension).to eq "so"
end
end
it "should successfully call librdkafka" do
expect {
Rdkafka::Bindings.rd_kafka_conf_new
}.not_to raise_error
end
describe "log callback" do
let(:log_queue) { Rdkafka::Config.log_queue }
before do
allow(log_queue).to receive(:<<)
end
it "should log fatal messages" do
Rdkafka::Bindings::LogCallback.call(nil, 0, nil, "log line")
expect(log_queue).to have_received(:<<).with([Logger::FATAL, "rdkafka: log line"])
end
it "should log fatal messages" do
Rdkafka::Bindings::LogCallback.call(nil, 1, nil, "log line")
expect(log_queue).to have_received(:<<).with([Logger::FATAL, "rdkafka: log line"])
end
it "should log fatal messages" do
Rdkafka::Bindings::LogCallback.call(nil, 2, nil, "log line")
expect(log_queue).to have_received(:<<).with([Logger::FATAL, "rdkafka: log line"])
end
it "should log error messages" do
Rdkafka::Bindings::LogCallback.call(nil, 3, nil, "log line")
expect(log_queue).to have_received(:<<).with([Logger::ERROR, "rdkafka: log line"])
end
it "should log warning messages" do
Rdkafka::Bindings::LogCallback.call(nil, 4, nil, "log line")
expect(log_queue).to have_received(:<<).with([Logger::WARN, "rdkafka: log line"])
end
it "should log info messages" do
Rdkafka::Bindings::LogCallback.call(nil, 5, nil, "log line")
expect(log_queue).to have_received(:<<).with([Logger::INFO, "rdkafka: log line"])
end
it "should log info messages" do
Rdkafka::Bindings::LogCallback.call(nil, 6, nil, "log line")
expect(log_queue).to have_received(:<<).with([Logger::INFO, "rdkafka: log line"])
end
it "should log debug messages" do
Rdkafka::Bindings::LogCallback.call(nil, 7, nil, "log line")
expect(log_queue).to have_received(:<<).with([Logger::DEBUG, "rdkafka: log line"])
end
it "should log unknown messages" do
Rdkafka::Bindings::LogCallback.call(nil, 100, nil, "log line")
expect(log_queue).to have_received(:<<).with([Logger::UNKNOWN, "rdkafka: log line"])
end
end
describe "partitioner" do
let(:partition_key) { ('a'..'z').to_a.shuffle.take(15).join('') }
let(:partition_count) { rand(50) + 1 }
it "should return the same partition for a similar string and the same partition count" do
result_1 = Rdkafka::Bindings.partitioner(partition_key, partition_count)
result_2 = Rdkafka::Bindings.partitioner(partition_key, partition_count)
expect(result_1).to eq(result_2)
end
it "should match the old partitioner" do
result_1 = Rdkafka::Bindings.partitioner(partition_key, partition_count)
result_2 = (Zlib.crc32(partition_key) % partition_count)
expect(result_1).to eq(result_2)
end
it "should return the partition calculated by the specified partitioner" do
result_1 = Rdkafka::Bindings.partitioner(partition_key, partition_count, "murmur2")
ptr = FFI::MemoryPointer.from_string(partition_key)
result_2 = Rdkafka::Bindings.rd_kafka_msg_partitioner_murmur2(nil, ptr, partition_key.size, partition_count, nil, nil)
expect(result_1).to eq(result_2)
end
end
describe "stats callback" do
context "without a stats callback" do
it "should do nothing" do
expect {
Rdkafka::Bindings::StatsCallback.call(nil, "{}", 2, nil)
}.not_to raise_error
end
end
context "with a stats callback" do
before do
Rdkafka::Config.statistics_callback = lambda do |stats|
$received_stats = stats
end
end
it "should call the stats callback with a stats hash" do
Rdkafka::Bindings::StatsCallback.call(nil, "{\"received\":1}", 13, nil)
expect($received_stats).to eq({'received' => 1})
end
end
end
describe "error callback" do
context "without an error callback" do
it "should do nothing" do
expect {
Rdkafka::Bindings::ErrorCallback.call(nil, 1, "error", nil)
}.not_to raise_error
end
end
context "with an error callback" do
before do
Rdkafka::Config.error_callback = lambda do |error|
$received_error = error
end
end
it "should call the error callback with an Rdkafka::Error" do
Rdkafka::Bindings::ErrorCallback.call(nil, 8, "Broker not available", nil)
expect($received_error.code).to eq(:broker_not_available)
expect($received_error.broker_message).to eq("Broker not available")
end
end
end
describe "oauthbearer set token" do
context "with args" do
before do
DEFAULT_TOKEN_EXPIRY_SECONDS = 900
$token_value = "token"
$md_lifetime_ms = Time.now.to_i*1000 + DEFAULT_TOKEN_EXPIRY_SECONDS * 1000
$md_principal_name = "kafka-cluster"
$extensions = nil
$extension_size = 0
$error_buffer = FFI::MemoryPointer.from_string(" " * 256)
end
it "should set token or capture failure" do
RdKafkaTestConsumer.with do |consumer_ptr|
response = Rdkafka::Bindings.rd_kafka_oauthbearer_set_token(consumer_ptr, $token_value, $md_lifetime_ms, $md_principal_name, $extensions, $extension_size, $error_buffer, 256)
expect(response).to eq(Rdkafka::Bindings::RD_KAFKA_RESP_ERR__STATE)
expect($error_buffer.read_string).to eq("SASL/OAUTHBEARER is not the configured authentication mechanism")
end
end
end
end
describe "oauthbearer set token failure" do
context "without args" do
it "should fail" do
expect {
Rdkafka::Bindings.rd_kafka_oauthbearer_set_token_failure
}.to raise_error(ArgumentError)
end
end
context "with args" do
it "should succeed" do
expect {
errstr = "error"
RdKafkaTestConsumer.with do |consumer_ptr|
Rdkafka::Bindings.rd_kafka_oauthbearer_set_token_failure(consumer_ptr, errstr)
end
}.to_not raise_error
end
end
end
describe "oauthbearer callback" do
context "without an oauthbearer callback" do
it "should do nothing" do
expect {
Rdkafka::Bindings::OAuthbearerTokenRefreshCallback.call(nil, "", nil)
}.not_to raise_error
end
end
context "with an oauthbearer callback" do
before do
Rdkafka::Config.oauthbearer_token_refresh_callback = lambda do |config, client_name|
$received_config = config
$received_client_name = client_name
end
end
it "should call the oauth bearer callback and receive config and client name" do
RdKafkaTestConsumer.with do |consumer_ptr|
Rdkafka::Bindings::OAuthbearerTokenRefreshCallback.call(consumer_ptr, "{}", nil)
expect($received_config).to eq("{}")
expect($received_client_name).to match(/consumer/)
end
end
end
end
end