-
-
Notifications
You must be signed in to change notification settings - Fork 122
/
Copy pathmetadata_spec.rb
79 lines (65 loc) · 2.78 KB
/
metadata_spec.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
# frozen_string_literal: true
require "securerandom"
describe Rdkafka::Metadata do
let(:config) { rdkafka_consumer_config }
let(:native_config) { config.send(:native_config) }
let(:native_kafka) { config.send(:native_kafka, native_config, :rd_kafka_consumer) }
after do
Rdkafka::Bindings.rd_kafka_consumer_close(native_kafka)
Rdkafka::Bindings.rd_kafka_destroy(native_kafka)
end
context "passing in a topic name" do
context "that is non-existent topic" do
let(:topic_name) { SecureRandom.uuid.to_s }
it "raises an appropriate exception" do
expect {
described_class.new(native_kafka, topic_name)
}.to raise_exception(Rdkafka::RdkafkaError, "Broker: Unknown topic or partition (unknown_topic_or_part)")
end
end
context "that is one of our test topics" do
subject { described_class.new(native_kafka, topic_name) }
let(:topic_name) { "partitioner_test_topic" }
it "#brokers returns our single broker" do
expect(subject.brokers.length).to eq(1)
expect(subject.brokers[0][:broker_id]).to eq(1)
expect(subject.brokers[0][:broker_name]).to eq("127.0.0.1")
expect(subject.brokers[0][:broker_port]).to eq(9092)
end
it "#topics returns data on our test topic" do
expect(subject.topics.length).to eq(1)
expect(subject.topics[0][:partition_count]).to eq(25)
expect(subject.topics[0][:partitions].length).to eq(25)
expect(subject.topics[0][:topic_name]).to eq(topic_name)
end
end
end
context "not passing in a topic name" do
subject { described_class.new(native_kafka, topic_name) }
let(:topic_name) { nil }
let(:test_topics) {
%w(consume_test_topic empty_test_topic load_test_topic produce_test_topic rake_test_topic watermarks_test_topic partitioner_test_topic)
} # Test topics crated in spec_helper.rb
it "#brokers returns our single broker" do
expect(subject.brokers.length).to eq(1)
expect(subject.brokers[0][:broker_id]).to eq(1)
expect(subject.brokers[0][:broker_name]).to eq("127.0.0.1")
expect(subject.brokers[0][:broker_port]).to eq(9092)
end
it "#topics returns data about all of our test topics" do
result = subject.topics.map { |topic| topic[:topic_name] }
expect(result).to include(*test_topics)
end
end
context "when a non-zero error code is returned" do
let(:topic_name) { SecureRandom.uuid.to_s }
before do
allow(Rdkafka::Bindings).to receive(:rd_kafka_metadata).and_return(-165)
end
it "creating the instance raises an exception" do
expect {
described_class.new(native_kafka, topic_name)
}.to raise_error(Rdkafka::RdkafkaError, /Local: Required feature not supported by broker \(unsupported_feature\)/)
end
end
end