-
-
Notifications
You must be signed in to change notification settings - Fork 122
/
Copy pathnative_kafka_spec.rb
130 lines (95 loc) · 3.18 KB
/
native_kafka_spec.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
# frozen_string_literal: true
# Specs for Rdkafka::NativeKafka: construction with a polling thread, access to
# the wrapped native handle, `close` / `closed?` behaviour and the finalizer.
#
# `Thread.new` is stubbed to return a double, so the examples only verify how
# the returned thread object is configured and signalled.
describe Rdkafka::NativeKafka do
  let(:config) { rdkafka_producer_config }
  let(:native) { config.send(:native_kafka, config.send(:native_config), :rd_kafka_producer) }
  let(:thread) { double(Thread) }
  let(:opaque) { Rdkafka::Opaque.new }

  subject(:client) { described_class.new(native, run_polling_thread: true, opaque: opaque) }

  before do
    # Fixed client name so the expected thread name below is deterministic.
    allow(Rdkafka::Bindings).to receive(:rd_kafka_name).and_return('producer-1')
    # Hand back the double instead of a real thread.
    allow(Thread).to receive(:new).and_return(thread)

    # Permissive defaults for every message the examples expect the thread
    # double to receive; individual examples tighten these with `expect`.
    allow(thread).to receive(:name=).with("rdkafka.native_kafka#producer-1")
    allow(thread).to receive(:[]=).with(:closing, anything)
    allow(thread).to receive(:join)
    allow(thread).to receive(:abort_on_exception=).with(anything)
  end

  # Always close the client so no example leaves it open.
  after { client.close }

  context "defaults" do
    it "sets the thread name" do
      expect(thread).to receive(:name=).with("rdkafka.native_kafka#producer-1")
      client
    end

    it "sets the thread to abort on exception" do
      expect(thread).to receive(:abort_on_exception=).with(true)
      client
    end

    it "sets the thread `closing` flag to false" do
      expect(thread).to receive(:[]=).with(:closing, false)
      client
    end
  end

  context "the polling thread" do
    it "is created" do
      expect(Thread).to receive(:new)
      client
    end
  end

  it "exposes the inner client" do
    client.with_inner do |inner|
      expect(inner).to eq(native)
    end
  end

  context "when client was not yet closed (`nil`)" do
    it "is not closed" do
      expect(client.closed?).to eq(false)
    end

    context "and attempt to close" do
      it "calls the `destroy` binding" do
        expect(Rdkafka::Bindings).to receive(:rd_kafka_destroy).with(native).and_call_original
        client.close
      end

      it "indicates to the polling thread that it is closing" do
        expect(thread).to receive(:[]=).with(:closing, true)
        client.close
      end

      it "joins the polling thread" do
        expect(thread).to receive(:join)
        client.close
      end

      it "closes and unassign the native client" do
        client.close
        expect(client.closed?).to eq(true)
      end
    end
  end

  context "when client was already closed" do
    # First close happens here; the examples below exercise a second close.
    before { client.close }

    it "is closed" do
      expect(client.closed?).to eq(true)
    end

    context "and attempt to close again" do
      it "does not call the `destroy` binding" do
        # Must target the same binding that the first close is expected to
        # call (`rd_kafka_destroy`, see "calls the `destroy` binding" above);
        # asserting on a different binding would pass even on a double destroy.
        expect(Rdkafka::Bindings).not_to receive(:rd_kafka_destroy)
        client.close
      end

      it "does not indicate to the polling thread that it is closing" do
        expect(thread).not_to receive(:[]=).with(:closing, true)
        client.close
      end

      it "does not join the polling thread" do
        expect(thread).not_to receive(:join)
        client.close
      end

      it "does not close and unassign the native client again" do
        client.close
        expect(client.closed?).to eq(true)
      end
    end
  end

  it "provides a finalizer that closes the native kafka client" do
    expect(client.closed?).to eq(false)

    # The finalizer's argument is not expected to matter, hence the dummy id.
    client.finalizer.call("some-ignored-object-id")

    expect(client.closed?).to eq(true)
  end
end