-
-
Notifications
You must be signed in to change notification settings - Fork 122
/
Copy pathconfig_spec.rb
258 lines (225 loc) · 7.8 KB
/
config_spec.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
# frozen_string_literal: true
describe Rdkafka::Config do
context "logger" do
it "should have a default logger" do
expect(Rdkafka::Config.logger).to be_a Logger
end
it "should set the logger" do
logger = Logger.new(STDOUT)
expect(Rdkafka::Config.logger).not_to eq logger
Rdkafka::Config.logger = logger
expect(Rdkafka::Config.logger).to eq logger
end
it "should not accept a nil logger" do
expect {
Rdkafka::Config.logger = nil
}.to raise_error(Rdkafka::Config::NoLoggerError)
end
it "supports logging queue" do
log = StringIO.new
Rdkafka::Config.logger = Logger.new(log)
Rdkafka::Config.ensure_log_thread
Rdkafka::Config.log_queue << [Logger::FATAL, "I love testing"]
20.times do
break if log.string != ""
sleep 0.05
end
expect(log.string).to include "FATAL -- : I love testing"
end
unless RUBY_PLATFORM == 'java'
it "expect to start new logger thread after fork and work" do
reader, writer = IO.pipe
pid = fork do
$stdout.reopen(writer)
Rdkafka::Config.logger = Logger.new($stdout)
reader.close
producer = rdkafka_producer_config(debug: 'all').producer
producer.close
writer.close
sleep(1)
end
writer.close
Process.wait(pid)
output = reader.read
expect(output.split("\n").size).to be >= 20
end
end
end
context "statistics callback" do
context "with a proc/lambda" do
it "should set the callback" do
expect {
Rdkafka::Config.statistics_callback = lambda do |stats|
puts stats
end
}.not_to raise_error
expect(Rdkafka::Config.statistics_callback).to respond_to :call
end
end
context "with a callable object" do
it "should set the callback" do
callback = Class.new do
def call(stats); end
end
expect {
Rdkafka::Config.statistics_callback = callback.new
}.not_to raise_error
expect(Rdkafka::Config.statistics_callback).to respond_to :call
end
end
it "should not accept a callback that's not callable" do
expect {
Rdkafka::Config.statistics_callback = 'a string'
}.to raise_error(TypeError)
end
end
context "error callback" do
context "with a proc/lambda" do
it "should set the callback" do
expect {
Rdkafka::Config.error_callback = lambda do |error|
puts error
end
}.not_to raise_error
expect(Rdkafka::Config.error_callback).to respond_to :call
end
end
context "with a callable object" do
it "should set the callback" do
callback = Class.new do
def call(stats); end
end
expect {
Rdkafka::Config.error_callback = callback.new
}.not_to raise_error
expect(Rdkafka::Config.error_callback).to respond_to :call
end
end
it "should not accept a callback that's not callable" do
expect {
Rdkafka::Config.error_callback = 'a string'
}.to raise_error(TypeError)
end
end
context "oauthbearer calllback" do
context "with a proc/lambda" do
it "should set the callback" do
expect {
Rdkafka::Config.oauthbearer_token_refresh_callback = lambda do |config, client_name|
puts config
puts client_name
end
}.not_to raise_error
expect(Rdkafka::Config.oauthbearer_token_refresh_callback).to respond_to :call
end
end
context "with a callable object" do
it "should set the callback" do
callback = Class.new do
def call(config, client_name); end
end
expect {
Rdkafka::Config.oauthbearer_token_refresh_callback = callback.new
}.not_to raise_error
expect(Rdkafka::Config.oauthbearer_token_refresh_callback).to respond_to :call
end
end
it "should not accept a callback that's not callable" do
expect {
Rdkafka::Config.oauthbearer_token_refresh_callback = 'not a callback'
}.to raise_error(TypeError)
end
end
context "configuration" do
it "should store configuration" do
config = Rdkafka::Config.new
config[:"key"] = 'value'
expect(config[:"key"]).to eq 'value'
end
it "should use default configuration" do
config = Rdkafka::Config.new
expect(config[:"api.version.request"]).to eq nil
end
it "should create a consumer with valid config" do
consumer = rdkafka_consumer_config.consumer
expect(consumer).to be_a Rdkafka::Consumer
consumer.close
end
it "should create a consumer with consumer_poll_set set to false" do
config = rdkafka_consumer_config
config.consumer_poll_set = false
consumer = config.consumer
expect(consumer).to be_a Rdkafka::Consumer
consumer.close
end
it "should raise an error when creating a consumer with invalid config" do
config = Rdkafka::Config.new('invalid.key' => 'value')
expect {
config.consumer
}.to raise_error(Rdkafka::Config::ConfigError, "No such configuration property: \"invalid.key\"")
end
it "should raise an error when creating a consumer with a nil key in the config" do
config = Rdkafka::Config.new(nil => 'value')
expect {
config.consumer
}.to raise_error(Rdkafka::Config::ConfigError, "No such configuration property: \"\"")
end
it "should treat a nil value as blank" do
config = Rdkafka::Config.new('security.protocol' => nil)
expect {
config.consumer
config.producer
}.to raise_error(Rdkafka::Config::ConfigError, "Configuration property \"security.protocol\" cannot be set to empty value")
end
it "should create a producer with valid config" do
producer = rdkafka_consumer_config.producer
expect(producer).to be_a Rdkafka::Producer
producer.close
end
it "should raise an error when creating a producer with invalid config" do
config = Rdkafka::Config.new('invalid.key' => 'value')
expect {
config.producer
}.to raise_error(Rdkafka::Config::ConfigError, "No such configuration property: \"invalid.key\"")
end
it "allows string partitioner key" do
expect(Rdkafka::Producer).to receive(:new).with(kind_of(Rdkafka::NativeKafka), "murmur2").and_call_original
config = Rdkafka::Config.new("partitioner" => "murmur2")
config.producer.close
end
it "allows symbol partitioner key" do
expect(Rdkafka::Producer).to receive(:new).with(kind_of(Rdkafka::NativeKafka), "murmur2").and_call_original
config = Rdkafka::Config.new(:partitioner => "murmur2")
config.producer.close
end
it "should allow configuring zstd compression" do
config = Rdkafka::Config.new('compression.codec' => 'zstd')
begin
producer = config.producer
expect(producer).to be_a Rdkafka::Producer
producer.close
rescue Rdkafka::Config::ConfigError => ex
pending "Zstd compression not supported on this machine"
raise ex
end
end
it "should raise an error when client creation fails for a consumer" do
config = Rdkafka::Config.new(
"security.protocol" => "SSL",
"ssl.ca.location" => "/nonsense"
)
expect {
config.consumer
}.to raise_error(Rdkafka::Config::ClientCreationError, /ssl.ca.location failed(.*)/)
end
it "should raise an error when client creation fails for a producer" do
config = Rdkafka::Config.new(
"security.protocol" => "SSL",
"ssl.ca.location" => "/nonsense"
)
expect {
config.producer
}.to raise_error(Rdkafka::Config::ClientCreationError, /ssl.ca.location failed(.*)/)
end
end
end