-
-
Notifications
You must be signed in to change notification settings - Fork 122
/
Copy pathspec_helper.rb
170 lines (152 loc) · 3.92 KB
/
spec_helper.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
# frozen_string_literal: true
# Coverage is collected only outside CI: when the CI environment variable is
# exactly "true", SimpleCov is neither required nor started.
unless ENV["CI"] == "true"
  require "simplecov"

  # Keep the specs themselves out of the coverage report.
  SimpleCov.start { add_filter "/spec/" }
end
require "pry"
require "rspec"
require "rdkafka"
require "timeout"
require "securerandom"
# Baseline settings that the config helpers in this file start from.
#
# @return [Hash{Symbol => String}] a newly built hash on every call, with
#   "bootstrap.servers" set to localhost:9092
def rdkafka_base_config
  { "bootstrap.servers": "localhost:9092" }
end
# Builds a plain Rdkafka::Config from the base settings.
#
# @param config_overrides [Hash] settings layered on top of the base config;
#   an override wins when a key is present in both
# @return [Rdkafka::Config]
def rdkafka_config(config_overrides = {})
  Rdkafka::Config.new(rdkafka_base_config.merge(config_overrides))
end
# Builds an Rdkafka::Config for consumers used in specs.
#
# On top of the base settings it reads from the earliest offset, disables
# partition EOF events and assigns a unique "ruby-test-<uuid>" group id per
# call. Setting the DEBUG_CONSUMER environment variable adds a librdkafka
# debug setting.
#
# @param config_overrides [Hash] settings merged last, so they win over
#   everything set here
# @return [Rdkafka::Config]
def rdkafka_consumer_config(config_overrides = {})
  settings = rdkafka_base_config.merge(
    "auto.offset.reset": "earliest",
    "enable.partition.eof": false,
    "group.id": "ruby-test-#{SecureRandom.uuid}"
  )
  settings[:debug] = "cgrp,topic,fetch" if ENV["DEBUG_CONSUMER"]

  Rdkafka::Config.new(settings.merge(config_overrides))
end
# Builds an Rdkafka::Config for producers used in specs.
#
# Starts from the base settings; setting the DEBUG_PRODUCER environment
# variable adds a librdkafka debug setting.
#
# @param config_overrides [Hash] settings merged last, so they win over
#   everything set here
# @return [Rdkafka::Config]
def rdkafka_producer_config(config_overrides = {})
  settings = rdkafka_base_config
  settings[:debug] = "broker,topic,msg" if ENV["DEBUG_PRODUCER"]

  Rdkafka::Config.new(settings.merge(config_overrides))
end
# Creates a native client handle through Rdkafka::Config's non-public
# native_config / native_kafka methods (hence the use of `send`).
#
# Note that the settings come from rdkafka_consumer_config while the handle
# is requested with the :rd_kafka_producer type.
#
# @return whatever Rdkafka::Config#native_kafka returns (a native handle —
#   exact type not visible from this file)
def new_native_client
  config = rdkafka_consumer_config
  native_config = config.send(:native_config)

  config.send(:native_kafka, native_config, :rd_kafka_producer)
end
# Creates a native topic handle via Rdkafka::Bindings.rd_kafka_topic_new.
#
# @param topic_name [String] name of the topic (defaults to "topic_name")
# @param native_client native client handle the topic is created on (required)
# @return whatever Rdkafka::Bindings.rd_kafka_topic_new returns
def new_native_topic(topic_name = "topic_name", native_client:)
  # The third argument is always nil (presumably "no topic-specific config" —
  # confirm against librdkafka's rd_kafka_topic_new).
  Rdkafka::Bindings.rd_kafka_topic_new(native_client, topic_name, nil)
end
# Polls a consumer until the message described by a delivery report arrives.
#
# @param topic [String] topic the consumer is subscribed to before polling
# @param delivery_report [#partition, #offset] identifies the expected message
# @param timeout_in_seconds [Numeric] how long to keep polling; fractional
#   values are honoured
# @param consumer [#subscribe, #poll, nil] consumer to poll with. When nil, a
#   throwaway consumer is built from rdkafka_consumer_config and closed before
#   returning; a caller-supplied consumer is left open.
# @return the first polled message whose partition and offset both match the
#   delivery report
# @raise [RuntimeError] when the timeout elapses without a matching message
def wait_for_message(topic:, delivery_report:, timeout_in_seconds: 30, consumer: nil)
  new_consumer = consumer.nil?
  consumer ||= rdkafka_consumer_config.consumer
  consumer.subscribe(topic)

  # Use a monotonic, sub-second deadline. Whole-second wall-clock arithmetic
  # could expire up to a second early and is affected by clock adjustments.
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout_in_seconds

  loop do
    if deadline <= Process.clock_gettime(Process::CLOCK_MONOTONIC)
      raise "Timeout of #{timeout_in_seconds} seconds reached in wait_for_message"
    end

    message = consumer.poll(100)
    next unless message

    if message.partition == delivery_report.partition &&
       message.offset == delivery_report.offset
      return message
    end
  end
ensure
  # `consumer` is still nil when building the throwaway consumer failed; the
  # safe navigation keeps that original error from being replaced by a
  # NoMethodError raised here.
  consumer&.close if new_consumer
end
# Waits for the consumer to receive a partition assignment.
#
# Checks up to 10 times, sleeping one second after each check that still
# finds an empty assignment. Gives up silently after the last attempt.
#
# @param consumer [#assignment] consumer whose assignment is inspected
# @return [nil, Integer] nil once an assignment is seen, 10 if none appeared
def wait_for_assignment(consumer)
  10.times do
    break unless consumer.assignment.empty?

    sleep 1
  end
end
# Waits for the consumer's partition assignment to become empty.
#
# Checks up to 10 times, sleeping one second after each check that still
# finds a non-empty assignment. Gives up silently after the last attempt.
#
# @param consumer [#assignment] consumer whose assignment is inspected
# @return [nil, Integer] nil once the assignment is empty, 10 if it never was
def wait_for_unassignment(consumer)
  10.times do
    remaining = consumer.assignment
    break if remaining.empty?

    sleep 1
  end
end
# Drives a consumer through a subscribe / poll / unsubscribe / close cycle,
# running an optional block in the middle.
#
# `consumer` is not a parameter: it is resolved in the calling context, so the
# caller must make a `consumer` method available (presumably an RSpec `let` —
# confirm against the specs that use this helper).
#
# @param listener accepted but never used by this helper
# @yield called once, after the first poll and before unsubscribing
# @return the result of `consumer.close`
def notify_listener(listener, &block)
  # Phase 1: join the topic, wait for partitions, poll once.
  consumer.subscribe("consume_test_topic")
  wait_for_assignment(consumer)
  consumer.poll(100)

  block&.call

  # Phase 2: leave the topic, wait for the partitions to go away, close.
  consumer.unsubscribe
  wait_for_unassignment(consumer)
  consumer.close
end
# Suite-wide RSpec setup: focus filtering, one-time topic creation and a
# per-example timeout.
RSpec.configure do |config|
  # Run only examples tagged `focus: true`; run everything when none are.
  config.filter_run focus: true
  config.run_all_when_everything_filtered = true

  config.before(:suite) do
    # Topic name => partition count for every topic created up front.
    partitions_by_topic = {
      consume_test_topic: 3,
      empty_test_topic: 3,
      load_test_topic: 3,
      produce_test_topic: 3,
      rake_test_topic: 3,
      watermarks_test_topic: 3,
      partitioner_test_topic: 25,
      example_topic: 1
    }

    admin = rdkafka_config.admin
    partitions_by_topic.each do |topic_name, partition_count|
      # The third argument is 1 for every topic (presumably the replication
      # factor — confirm against Admin#create_topic).
      handle = admin.create_topic(topic_name.to_s, partition_count, 1)
      begin
        handle.wait(max_wait_timeout: 1.0)
      rescue Rdkafka::RdkafkaError => error
        # A topic that already exists is fine; any other error is re-raised.
        raise unless error.message.match?(/topic_already_exists/)
      end
    end
    admin.close
  end

  # Abort any example that runs for more than a minute; it is probably stuck.
  config.around(:each) do |example|
    Timeout.timeout(60) { example.run }
  end
end
# Test helper that lends out a raw native consumer handle for the duration of
# a block and tears it down afterwards.
class RdKafkaTestConsumer
  # Creates a native consumer handle, yields it, then closes and destroys it.
  #
  # The handle is created before the protected region begins, so a failure
  # in rd_kafka_new propagates as-is and never triggers cleanup on a handle
  # that does not exist. Once the handle exists, cleanup runs even when the
  # block raises.
  #
  # @yieldparam consumer the handle returned by Rdkafka::Bindings.rd_kafka_new
  # @return the value of the block
  def self.with
    # Arguments after the client type are nil, nil, 0 (presumably config,
    # error buffer and buffer size — confirm against librdkafka's rd_kafka_new).
    consumer = Rdkafka::Bindings.rd_kafka_new(:rd_kafka_consumer, nil, nil, 0)

    begin
      yield consumer
    ensure
      Rdkafka::Bindings.rd_kafka_consumer_close(consumer)
      Rdkafka::Bindings.rd_kafka_destroy(consumer)
    end
  end
end