diff --git a/.gitignore b/.gitignore
index eeca9c78..0a017d0b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -10,3 +10,5 @@ ext/librdkafka.*
 doc
 coverage
 vendor
+.idea/
+out/
diff --git a/lib/rdkafka.rb b/lib/rdkafka.rb
index 0bd1d56c..cc907274 100644
--- a/lib/rdkafka.rb
+++ b/lib/rdkafka.rb
@@ -7,6 +7,7 @@
 require "rdkafka/version"
 require "rdkafka/helpers/time"
+require "rdkafka/helpers/oauth"
 require "rdkafka/abstract_handle"
 require "rdkafka/admin"
 require "rdkafka/admin/create_topic_handle"
diff --git a/lib/rdkafka/admin.rb b/lib/rdkafka/admin.rb
index 22ee3b90..49afe0f3 100644
--- a/lib/rdkafka/admin.rb
+++ b/lib/rdkafka/admin.rb
@@ -2,6 +2,8 @@
 module Rdkafka
   class Admin
+    include Helpers::OAuth
+
     # @private
     def initialize(native_kafka)
       @native_kafka = native_kafka
diff --git a/lib/rdkafka/bindings.rb b/lib/rdkafka/bindings.rb
index c21137e1..092f8b0f 100644
--- a/lib/rdkafka/bindings.rb
+++ b/lib/rdkafka/bindings.rb
@@ -17,6 +17,7 @@ def self.lib_extension
     RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS = -175
     RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS = -174
+    RD_KAFKA_RESP_ERR__STATE = -172
     RD_KAFKA_RESP_ERR__NOENT = -156
     RD_KAFKA_RESP_ERR_NO_ERROR = 0
@@ -111,7 +112,10 @@ class TopicPartitionList < FFI::Struct
     callback :error_cb, [:pointer, :int, :string, :pointer], :void
     attach_function :rd_kafka_conf_set_error_cb, [:pointer, :error_cb], :void
     attach_function :rd_kafka_rebalance_protocol, [:pointer], :string
-
+    callback :oauthbearer_token_refresh_cb, [:pointer, :string, :pointer], :void
+    attach_function :rd_kafka_conf_set_oauthbearer_token_refresh_cb, [:pointer, :oauthbearer_token_refresh_cb], :void
+    attach_function :rd_kafka_oauthbearer_set_token, [:pointer, :string, :int64, :pointer, :pointer, :int, :pointer, :int], :int
+    attach_function :rd_kafka_oauthbearer_set_token_failure, [:pointer, :string], :int
     # Log queue
     attach_function :rd_kafka_set_log_queue, [:pointer, :pointer], :void
     attach_function :rd_kafka_queue_get_main, [:pointer], :pointer
@@ -161,6 +165,32 @@ class TopicPartitionList < FFI::Struct
       end
     end
 
+    # The OAuth callback is currently global and contextless.
+    # This means the callback is invoked for all client instances, so it must be able to determine which instance it belongs to.
+    # The instance name is passed to the callback, allowing it to look up the correct instance.
+    #
+    # An example of how to use the instance name in the callback is given below.
+    # `refresh_token` is configured as the `oauthbearer_token_refresh_callback`.
+    # `instances` is a map of client names to client instances, maintained by the user.
+    #
+    # ```
+    # def refresh_token(config, client_name)
+    #   client = instances[client_name]
+    #   client.oauthbearer_set_token(
+    #     token: 'new-token-value',
+    #     lifetime_ms: token_lifetime_ms,
+    #     principal_name: 'principal-name'
+    #   )
+    # end
+    # ```
+    OAuthbearerTokenRefreshCallback = FFI::Function.new(
+      :void, [:pointer, :string, :pointer]
+    ) do |client_ptr, config, _opaque|
+      if Rdkafka::Config.oauthbearer_token_refresh_callback
+        Rdkafka::Config.oauthbearer_token_refresh_callback.call(config, Rdkafka::Bindings.rd_kafka_name(client_ptr))
+      end
+    end
+
     # Handle
     enum :kafka_type, [
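The comment above sketches how a single global callback can route a refresh to the right client. A slightly fuller sketch of such a handler follows; it is illustrative only: `instances` and `TokenProvider` are hypothetical application-side pieces (an instance registry and whatever issues tokens in your environment), not part of this PR.

```ruby
# Hypothetical refresh handler. `instances` maps librdkafka client names to the
# Rdkafka producer/consumer/admin objects the application created; TokenProvider
# stands in for whatever actually issues OAuth tokens.
def refresh_token(_config, client_name)
  client = instances[client_name]
  token  = TokenProvider.fetch # assumed to expose #value and #expires_at (a Time)

  client.oauthbearer_set_token(
    token: token.value,
    # librdkafka expects the absolute expiry as milliseconds since the epoch
    lifetime_ms: token.expires_at.to_i * 1000,
    principal_name: "kafka-cluster"
  )
end
```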
diff --git a/lib/rdkafka/config.rb b/lib/rdkafka/config.rb
index cbddfc0c..4c866912 100644
--- a/lib/rdkafka/config.rb
+++ b/lib/rdkafka/config.rb
@@ -15,12 +15,13 @@ class Config
     @@opaques = ObjectSpace::WeakMap.new
     # @private
     @@log_queue = Queue.new
-    # @private
     # We memoize thread on the first log flush
     # This allows us also to restart logger thread on forks
     @@log_thread = nil
     # @private
     @@log_mutex = Mutex.new
+    # @private
+    @@oauthbearer_token_refresh_callback = nil
 
     # Returns the current logger, by default this is a logger to stdout.
     #
@@ -104,6 +105,24 @@ def self.error_callback
       @@error_callback
     end
 
+    # Sets the SASL/OAUTHBEARER token refresh callback.
+    # This callback will be triggered when it is time to refresh the client's OAUTHBEARER token.
+    #
+    # @param callback [Proc, #call] The callback
+    #
+    # @return [nil]
+    def self.oauthbearer_token_refresh_callback=(callback)
+      raise TypeError.new("Callback has to be callable") unless callback.respond_to?(:call) || callback == nil
+      @@oauthbearer_token_refresh_callback = callback
+    end
+
+    # Returns the current oauthbearer_token_refresh_callback, by default this is nil.
+    #
+    # @return [Proc, nil]
+    def self.oauthbearer_token_refresh_callback
+      @@oauthbearer_token_refresh_callback
+    end
+
     # @private
     def self.opaques
       @@opaques
@@ -300,6 +319,9 @@ def native_config(opaque = nil)
 
         # Set error callback
         Rdkafka::Bindings.rd_kafka_conf_set_error_cb(config, Rdkafka::Bindings::ErrorCallback)
+
+        # Set oauth callback
+        Rdkafka::Bindings.rd_kafka_conf_set_oauthbearer_token_refresh_cb(config, Rdkafka::Bindings::OAuthbearerTokenRefreshCallback)
       end
     end
diff --git a/lib/rdkafka/consumer.rb b/lib/rdkafka/consumer.rb
index b3d8ad43..5409e238 100644
--- a/lib/rdkafka/consumer.rb
+++ b/lib/rdkafka/consumer.rb
@@ -13,6 +13,7 @@ module Rdkafka
   class Consumer
     include Enumerable
     include Helpers::Time
+    include Helpers::OAuth
 
     # @private
     def initialize(native_kafka)
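Given the setter added to `Config` above, registering the handler is a one-time, process-wide assignment. A minimal sketch, reusing the hypothetical `refresh_token` from the previous example; anything responding to `#call` is accepted:

```ruby
# The callback lives at class level on Rdkafka::Config, so it is global:
# it fires for every producer, consumer, and admin instance in the process.
Rdkafka::Config.oauthbearer_token_refresh_callback = method(:refresh_token)

# A lambda works just as well, since the setter only checks respond_to?(:call).
Rdkafka::Config.oauthbearer_token_refresh_callback = ->(config, client_name) do
  puts "token refresh requested for #{client_name} (#{config})"
end
```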
diff --git a/lib/rdkafka/helpers/oauth.rb b/lib/rdkafka/helpers/oauth.rb
new file mode 100644
index 00000000..4e4a46f5
--- /dev/null
+++ b/lib/rdkafka/helpers/oauth.rb
@@ -0,0 +1,47 @@
+module Rdkafka
+  module Helpers
+
+    module OAuth
+
+      # Set the OAuthBearer token
+      #
+      # @param token [String] the mandatory token value to set, often (but not necessarily) a JWS compact serialization as per https://tools.ietf.org/html/rfc7515#section-3.1.
+      # @param lifetime_ms [Integer] when the token expires, in terms of the number of milliseconds since the epoch. See https://currentmillis.com/.
+      # @param principal_name [String] the mandatory Kafka principal name associated with the token.
+      # @param extensions [Hash] optional SASL extensions key-value pairs to be communicated to the broker as additional key-value pairs during the initial client response as per https://tools.ietf.org/html/rfc7628#section-3.1.
+      # @return [Integer] 0 on success
+      def oauthbearer_set_token(token:, lifetime_ms:, principal_name:, extensions: nil)
+        error_buffer = FFI::MemoryPointer.from_string(" " * 256)
+        @native_kafka.with_inner do |inner|
+          response = Rdkafka::Bindings.rd_kafka_oauthbearer_set_token(
+            inner, token, lifetime_ms, principal_name,
+            flatten_extensions(extensions), extension_size(extensions), error_buffer, 256
+          )
+          if response != 0
+            Rdkafka::Bindings.rd_kafka_oauthbearer_set_token_failure(
+              inner,
+              "Failed to set token: #{error_buffer.read_string}"
+            )
+          end
+
+          response
+        end
+      end
+
+      private
+
+      # Flatten the extensions hash into a string according to the spec, https://datatracker.ietf.org/doc/html/rfc7628#section-3.1
+      def flatten_extensions(extensions)
+        return nil unless extensions
+        "\x01#{extensions.map { |e| e.join("=") }.join("\x01")}"
+      end
+
+      # extension_size is the number of keys + values which should be a non-negative even number
+      # https://github.com/confluentinc/librdkafka/blob/master/src/rdkafka_sasl_oauthbearer.c#L327-L347
+      def extension_size(extensions)
+        return 0 unless extensions
+        extensions.size * 2
+      end
+    end
+  end
+end
diff --git a/lib/rdkafka/producer.rb b/lib/rdkafka/producer.rb
index 279506b3..fe421ec2 100644
--- a/lib/rdkafka/producer.rb
+++ b/lib/rdkafka/producer.rb
@@ -4,6 +4,7 @@ module Rdkafka
   # A producer for Kafka messages. To create a producer set up a {Config} and call {Config#producer producer} on that.
   class Producer
     include Helpers::Time
+    include Helpers::OAuth
 
     # Cache partitions count for 30 seconds
     PARTITIONS_COUNT_TTL = 30
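For reference, the private helpers above encode an extensions Hash into the SASL/OAUTHBEARER wire format (key=value pairs separated by 0x01) plus a key/value count. A small illustration; the extension names and `jwt_string` are made up for the example:

```ruby
extensions = { "logicalCluster" => "lkc-123", "identityPoolId" => "pool-1" }

# flatten_extensions(extensions) #=> "\x01logicalCluster=lkc-123\x01identityPoolId=pool-1"
# extension_size(extensions)     #=> 4 (two keys plus two values)

producer.oauthbearer_set_token(
  token: jwt_string,                         # obtained from your token provider
  lifetime_ms: (Time.now.to_i + 900) * 1000, # expiry in epoch milliseconds
  principal_name: "kafka-cluster",
  extensions: extensions
)
```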
diff --git a/spec/rdkafka/admin_spec.rb b/spec/rdkafka/admin_spec.rb
index 620f32bd..7721a564 100644
--- a/spec/rdkafka/admin_spec.rb
+++ b/spec/rdkafka/admin_spec.rb
@@ -404,4 +404,41 @@
       end
     end
   end
+
+  describe '#oauthbearer_set_token' do
+    context 'when sasl not configured' do
+      it 'should return RD_KAFKA_RESP_ERR__STATE' do
+        response = admin.oauthbearer_set_token(
+          token: "foo",
+          lifetime_ms: Time.now.to_i*1000 + 900 * 1000,
+          principal_name: "kafka-cluster"
+        )
+        expect(response).to eq(Rdkafka::Bindings::RD_KAFKA_RESP_ERR__STATE)
+      end
+    end
+
+    context 'when sasl configured' do
+      before do
+        config_sasl = rdkafka_config(
+          "security.protocol": "sasl_ssl",
+          "sasl.mechanisms": 'OAUTHBEARER'
+        )
+        $admin_sasl = config_sasl.admin
+      end
+
+      after do
+        $admin_sasl.close
+      end
+
+      it 'should succeed' do
+        response = $admin_sasl.oauthbearer_set_token(
+          token: "foo",
+          lifetime_ms: Time.now.to_i*1000 + 900 * 1000,
+          principal_name: "kafka-cluster"
+        )
+        expect(response).to eq(0)
+      end
+    end
+  end
 end
diff --git a/spec/rdkafka/bindings_spec.rb b/spec/rdkafka/bindings_spec.rb
index 87dc0269..1cbd7cba 100644
--- a/spec/rdkafka/bindings_spec.rb
+++ b/spec/rdkafka/bindings_spec.rb
@@ -132,4 +132,86 @@
       end
     end
   end
+
+  describe "oauthbearer set token" do
+
+    context "without args" do
+      it "should raise argument error" do
+        expect {
+          Rdkafka::Bindings.rd_kafka_oauthbearer_set_token
+        }.to raise_error(ArgumentError)
+      end
+    end
+
+    context "with args" do
+      before do
+        DEFAULT_TOKEN_EXPIRY_SECONDS = 900
+        $token_value = "token"
+        $md_lifetime_ms = Time.now.to_i*1000 + DEFAULT_TOKEN_EXPIRY_SECONDS * 1000
+        $md_principal_name = "kafka-cluster"
+        $extensions = nil
+        $extension_size = 0
+        $error_buffer = FFI::MemoryPointer.from_string(" " * 256)
+      end
+
+      it "should set token or capture failure" do
+        RdKafkaTestConsumer.with do |consumer_ptr|
+          response = Rdkafka::Bindings.rd_kafka_oauthbearer_set_token(consumer_ptr, $token_value, $md_lifetime_ms, $md_principal_name, $extensions, $extension_size, $error_buffer, 256)
+          expect(response).to eq(Rdkafka::Bindings::RD_KAFKA_RESP_ERR__STATE)
+          expect($error_buffer.read_string).to eq("SASL/OAUTHBEARER is not the configured authentication mechanism")
+        end
+      end
+    end
+  end
+
+  describe "oauthbearer set token failure" do
+
+    context "without args" do
+
+      it "should fail" do
+        expect {
+          Rdkafka::Bindings.rd_kafka_oauthbearer_set_token_failure
+        }.to raise_error(ArgumentError)
+      end
+    end
+
+    context "with args" do
+      it "should succeed" do
+        expect {
+          errstr = "error"
+          RdKafkaTestConsumer.with do |consumer_ptr|
+            Rdkafka::Bindings.rd_kafka_oauthbearer_set_token_failure(consumer_ptr, errstr)
+          end
+        }.to_not raise_error
+      end
+    end
+  end
+
+  describe "oauthbearer callback" do
+
+    context "without an oauthbearer callback" do
+      it "should do nothing" do
+        expect {
+          Rdkafka::Bindings::OAuthbearerTokenRefreshCallback.call(nil, "", nil)
+        }.not_to raise_error
+      end
+    end
+
+    context "with an oauthbearer callback" do
+      before do
+        Rdkafka::Config.oauthbearer_token_refresh_callback = lambda do |config, client_name|
+          $received_config = config
+          $received_client_name = client_name
+        end
+      end
+
+      it "should call the oauth bearer callback and receive config and client name" do
+        RdKafkaTestConsumer.with do |consumer_ptr|
+          Rdkafka::Bindings::OAuthbearerTokenRefreshCallback.call(consumer_ptr, "{}", nil)
+          expect($received_config).to eq("{}")
+          expect($received_client_name).to match(/consumer/)
+        end
+      end
+    end
+  end
 end
diff --git a/spec/rdkafka/config_spec.rb b/spec/rdkafka/config_spec.rb
index 3c2a9259..a188d858 100644
--- a/spec/rdkafka/config_spec.rb
+++ b/spec/rdkafka/config_spec.rb
@@ -115,6 +115,39 @@ def call(stats); end
     end
   end
 
+  context "oauthbearer callback" do
+    context "with a proc/lambda" do
+      it "should set the callback" do
+        expect {
+          Rdkafka::Config.oauthbearer_token_refresh_callback = lambda do |config, client_name|
+            puts config
+            puts client_name
+          end
+        }.not_to raise_error
+        expect(Rdkafka::Config.oauthbearer_token_refresh_callback).to respond_to :call
+      end
+    end
+
+    context "with a callable object" do
+      it "should set the callback" do
+        callback = Class.new do
+          def call(config, client_name); end
+        end
+
+        expect {
+          Rdkafka::Config.oauthbearer_token_refresh_callback = callback.new
+        }.not_to raise_error
+        expect(Rdkafka::Config.oauthbearer_token_refresh_callback).to respond_to :call
+      end
+    end
+
+    it "should not accept a callback that's not callable" do
+      expect {
+        Rdkafka::Config.oauthbearer_token_refresh_callback = 'not a callback'
+      }.to raise_error(TypeError)
+    end
+  end
+
   context "configuration" do
     it "should store configuration" do
       config = Rdkafka::Config.new
diff --git a/spec/rdkafka/consumer_spec.rb b/spec/rdkafka/consumer_spec.rb
index 3d3c89cf..6b4f9eeb 100644
--- a/spec/rdkafka/consumer_spec.rb
+++ b/spec/rdkafka/consumer_spec.rb
@@ -1301,4 +1301,40 @@ def collect(name, list)
       ])
     end
   end
+
+  describe '#oauthbearer_set_token' do
+    context 'when sasl not configured' do
+      it 'should return RD_KAFKA_RESP_ERR__STATE' do
+        response = consumer.oauthbearer_set_token(
+          token: "foo",
+          lifetime_ms: Time.now.to_i*1000 + 900 * 1000,
+          principal_name: "kafka-cluster"
+        )
+        expect(response).to eq(Rdkafka::Bindings::RD_KAFKA_RESP_ERR__STATE)
+      end
+    end
+
+    context 'when sasl configured' do
+      before do
+        $consumer_sasl = rdkafka_producer_config(
+          "security.protocol": "sasl_ssl",
+          "sasl.mechanisms": 'OAUTHBEARER'
+        ).consumer
+      end
+
+      after do
+        $consumer_sasl.close
+      end
+
+      it 'should succeed' do
+        response = $consumer_sasl.oauthbearer_set_token(
+          token: "foo",
+          lifetime_ms: Time.now.to_i*1000 + 900 * 1000,
+          principal_name: "kafka-cluster"
+        )
+        expect(response).to eq(0)
+      end
+    end
+  end
 end
diff --git a/spec/rdkafka/producer_spec.rb b/spec/rdkafka/producer_spec.rb
index a856463f..f6ea3ef4 100644
--- a/spec/rdkafka/producer_spec.rb
+++ b/spec/rdkafka/producer_spec.rb
@@ -734,4 +734,34 @@ def call(_, handle)
       end
     end
   end
+
+  describe '#oauthbearer_set_token' do
+    context 'when sasl not configured' do
+      it 'should return RD_KAFKA_RESP_ERR__STATE' do
+        response = producer.oauthbearer_set_token(
+          token: "foo",
+          lifetime_ms: Time.now.to_i*1000 + 900 * 1000,
+          principal_name: "kafka-cluster"
+        )
+        expect(response).to eq(Rdkafka::Bindings::RD_KAFKA_RESP_ERR__STATE)
+      end
+    end
+
+    context 'when sasl configured' do
+      it 'should succeed' do
+        producer_sasl = rdkafka_producer_config(
+          {
+            "security.protocol": "sasl_ssl",
+            "sasl.mechanisms": 'OAUTHBEARER'
+          }
+        ).producer
+        response = producer_sasl.oauthbearer_set_token(
+          token: "foo",
+          lifetime_ms: Time.now.to_i*1000 + 900 * 1000,
+          principal_name: "kafka-cluster"
+        )
+        expect(response).to eq(0)
+      end
+    end
+  end
 end
diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb
index 0f3b6f07..0f2a02f3 100644
--- a/spec/spec_helper.rb
+++ b/spec/spec_helper.rb
@@ -155,3 +155,18 @@ def notify_listener(listener, &block)
     end
   end
 end
+
+class RdKafkaTestConsumer
+  def self.with
+    consumer = Rdkafka::Bindings.rd_kafka_new(
+      :rd_kafka_consumer,
+      nil,
+      nil,
+      0
+    )
+    yield consumer
+  ensure
+    Rdkafka::Bindings.rd_kafka_consumer_close(consumer)
+    Rdkafka::Bindings.rd_kafka_destroy(consumer)
+  end
+end
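Putting the pieces together, an end-to-end flow could look like the sketch below. It assumes the refresh callback from the earlier examples is already registered; the broker address, principal, and token values are placeholders.

```ruby
config = Rdkafka::Config.new(
  "bootstrap.servers": "broker:9093",
  "security.protocol": "sasl_ssl",
  "sasl.mechanisms": "OAUTHBEARER"
)

producer = config.producer

# As the specs above show, oauthbearer_set_token returns 0 once SASL/OAUTHBEARER is
# configured; without it the call returns Rdkafka::Bindings::RD_KAFKA_RESP_ERR__STATE (-172).
result = producer.oauthbearer_set_token(
  token: "initial-token-value",
  lifetime_ms: Time.now.to_i * 1000 + 900 * 1000,
  principal_name: "kafka-cluster"
)
raise "failed to set OAuth token" unless result.zero?

producer.produce(topic: "events", payload: "hello").wait
```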