-
-
Notifications
You must be signed in to change notification settings - Fork 122
/
Copy pathmessage.rb
86 lines (72 loc) · 2.5 KB
/
message.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
# frozen_string_literal: true
module Rdkafka
class Consumer
# A message that was consumed from a topic.
class Message
# The topic this message was consumed from
# @return [String]
attr_reader :topic
# The partition this message was consumed from
# @return [Integer]
attr_reader :partition
# This message's payload
# @return [String, nil]
attr_reader :payload
# This message's key
# @return [String, nil]
attr_reader :key
# This message's offset in its partition
# @return [Integer]
attr_reader :offset
# This message's timestamp, if provided by the broker
# @return [Time, nil]
attr_reader :timestamp
# @return [Hash<String, String>] a message headers
attr_reader :headers
# @private
def initialize(native_message)
# Set topic
unless native_message[:rkt].null?
@topic = Rdkafka::Bindings.rd_kafka_topic_name(native_message[:rkt])
end
# Set partition
@partition = native_message[:partition]
# Set payload
unless native_message[:payload].null?
@payload = native_message[:payload].read_string(native_message[:len])
end
# Set key
unless native_message[:key].null?
@key = native_message[:key].read_string(native_message[:key_len])
end
# Set offset
@offset = native_message[:offset]
# Set timestamp
raw_timestamp = Rdkafka::Bindings.rd_kafka_message_timestamp(native_message, nil)
@timestamp = if raw_timestamp && raw_timestamp > -1
# Calculate seconds and microseconds
seconds = raw_timestamp / 1000
milliseconds = (raw_timestamp - seconds * 1000) * 1000
Time.at(seconds, milliseconds)
else
nil
end
@headers = Headers.from_native(native_message)
end
# Human readable representation of this message.
# @return [String]
def to_s
is_headers = @headers.empty? ? "" : ", headers #{headers.size}"
"<Message in '#{topic}' with key '#{truncate(key)}', payload '#{truncate(payload)}', partition #{partition}, offset #{offset}, timestamp #{timestamp}#{is_headers}>"
end
def truncate(string)
if string && string.length > 40
"#{string[0..39]}..."
else
string
end
end
private
end
end
end