From fd865fdeb04d7f794e439ab18eea9a0d9fc9b54e Mon Sep 17 00:00:00 2001 From: bruce szalwinski Date: Sun, 4 Feb 2024 22:05:20 -0600 Subject: [PATCH 01/32] configure oauthbearer_token_refresh_callback --- .gitignore | 1 + lib/rdkafka/bindings.rb | 10 ++++++++++ lib/rdkafka/config.rb | 23 +++++++++++++++++++++++ 3 files changed, 34 insertions(+) diff --git a/.gitignore b/.gitignore index eeca9c78..82062c83 100644 --- a/.gitignore +++ b/.gitignore @@ -10,3 +10,4 @@ ext/librdkafka.* doc coverage vendor +.idea/ diff --git a/lib/rdkafka/bindings.rb b/lib/rdkafka/bindings.rb index 18edcf56..d24263ae 100644 --- a/lib/rdkafka/bindings.rb +++ b/lib/rdkafka/bindings.rb @@ -111,6 +111,8 @@ class TopicPartitionList < FFI::Struct callback :error_cb, [:pointer, :int, :string, :pointer], :void attach_function :rd_kafka_conf_set_error_cb, [:pointer, :error_cb], :void attach_function :rd_kafka_rebalance_protocol, [:pointer], :string + callback :oauthbearer_token_refresh_cb, [:pointer, :string, :pointer], :void + attach_function :rd_kafka_conf_set_oauthbearer_token_refresh_cb, [:pointer, :oauthbearer_token_refresh_cb], :void # Log queue attach_function :rd_kafka_set_log_queue, [:pointer, :pointer], :void @@ -159,6 +161,14 @@ class TopicPartitionList < FFI::Struct end end + OAuthbearerTokenRefreshCallback = FFI::Function.new( + :void, [:pointer, :string, :pointer] + ) do |_client_ptr, _config, _opaque| + if Rdkafka::Config.oauthbearer_token_refresh_callback + Rdkafka::Config.oauthbearer_token_refresh_callback.call() + end + end + # Handle enum :kafka_type, [ diff --git a/lib/rdkafka/config.rb b/lib/rdkafka/config.rb index 3fa60df3..16f4674f 100644 --- a/lib/rdkafka/config.rb +++ b/lib/rdkafka/config.rb @@ -15,6 +15,8 @@ class Config @@opaques = ObjectSpace::WeakMap.new # @private @@log_queue = Queue.new + # @private + @@oauthbearer_token_refresh_callback = nil Thread.start do loop do @@ -87,6 +89,24 @@ def self.error_callback @@error_callback end + # Sets the SASL/OAUTHBEARER token refresh callback. + # This callback will be triggered when it is time to refresh the client's OAUTHBEARER token + # + # @param callback [Proc, #call] The callback + # + # @return [nil] + def self.oauthbearer_token_refresh_callback=(callback) + raise TypeError.new("Callback has to be callable") unless callback.respond_to?(:call) || callback == nil + @@oauthbearer_token_refresh_callback = callback + end + + # Returns the current oauthbearer_token_refresh_callback callback, by default this is nil. + # + # @return [Proc, nil] + def self.oauthbearer_token_refresh_callback + @@oauthbearer_token_refresh_callback + end + # @private def self.opaques @@opaques @@ -283,6 +303,9 @@ def native_config(opaque = nil) # Set error callback Rdkafka::Bindings.rd_kafka_conf_set_error_cb(config, Rdkafka::Bindings::ErrorCallback) + + # Set oauth callback + Rdkafka::Bindings.rd_kafka_conf_set_oauthbearer_token_refresh_cb(config, Rdkafka::Bindings::OAuthbearerTokenRefreshCallback) end end From 33499508e6edd0917b9f87b7c179d7cbf825b2a1 Mon Sep 17 00:00:00 2001 From: bruce szalwinski Date: Mon, 5 Feb 2024 14:05:42 -0600 Subject: [PATCH 02/32] config specs --- spec/rdkafka/config_spec.rb | 31 +++++++++++++++++++++++++++++++ 1 file changed, 31 insertions(+) diff --git a/spec/rdkafka/config_spec.rb b/spec/rdkafka/config_spec.rb index eeaa6503..61ac87cc 100644 --- a/spec/rdkafka/config_spec.rb +++ b/spec/rdkafka/config_spec.rb @@ -95,6 +95,37 @@ def call(stats); end end end + context "oauthbearer calllback" do + context "with a proc/lambda" do + it "should set the callback" do + expect { + Rdkafka::Config.oauthbearer_token_refresh_callback = lambda do |oauth| + puts oauth + end + }.not_to raise_error + expect(Rdkafka::Config.oauthbearer_token_refresh_callback).to respond_to :call + end + end + + context "with a callable object" do + it "should set the callback" do + callback = Class.new do + def call(oauth); end + end + + expect { + Rdkafka::Config.oauthbearer_token_refresh_callback = callback.new + }.not_to raise_error + expect(Rdkafka::Config.oauthbearer_token_refresh_callback).to respond_to :call + end + end + + it "should not accept a callback that's not callable" do + + end + + end + context "configuration" do it "should store configuration" do config = Rdkafka::Config.new From 7177f5e42f5666b17aec712ffb16199fa389a30e Mon Sep 17 00:00:00 2001 From: bruce szalwinski Date: Mon, 5 Feb 2024 14:27:59 -0600 Subject: [PATCH 03/32] config specs --- lib/rdkafka/bindings.rb | 4 ++-- spec/rdkafka/bindings_spec.rb | 24 ++++++++++++++++++++++++ spec/rdkafka/config_spec.rb | 5 +++-- 3 files changed, 29 insertions(+), 4 deletions(-) diff --git a/lib/rdkafka/bindings.rb b/lib/rdkafka/bindings.rb index d24263ae..9951ac23 100644 --- a/lib/rdkafka/bindings.rb +++ b/lib/rdkafka/bindings.rb @@ -163,9 +163,9 @@ class TopicPartitionList < FFI::Struct OAuthbearerTokenRefreshCallback = FFI::Function.new( :void, [:pointer, :string, :pointer] - ) do |_client_ptr, _config, _opaque| + ) do |_client_ptr, config, _opaque| if Rdkafka::Config.oauthbearer_token_refresh_callback - Rdkafka::Config.oauthbearer_token_refresh_callback.call() + Rdkafka::Config.oauthbearer_token_refresh_callback.call(config) end end diff --git a/spec/rdkafka/bindings_spec.rb b/spec/rdkafka/bindings_spec.rb index 87dc0269..6336b672 100644 --- a/spec/rdkafka/bindings_spec.rb +++ b/spec/rdkafka/bindings_spec.rb @@ -132,4 +132,28 @@ end end end + + describe "oauthbearer callback" do + context "without an oauthbearer callback" do + it "should do nothing" do + expect { + Rdkafka::Bindings::OAuthbearerTokenRefreshCallback.call(nil, "", nil) + }.not_to raise_error + end + end + + context "with an oauthbearer callback" do + before do + Rdkafka::Config.oauthbearer_token_refresh_callback = lambda do |oauth| + $received_oauth = oauth + end + end + + it "should call the oauth bearer callback with an ???" do + Rdkafka::Bindings::OAuthbearerTokenRefreshCallback.call(nil, "oauth", nil) + expect($received_oauth) + + end + end + end end diff --git a/spec/rdkafka/config_spec.rb b/spec/rdkafka/config_spec.rb index 61ac87cc..871fab5a 100644 --- a/spec/rdkafka/config_spec.rb +++ b/spec/rdkafka/config_spec.rb @@ -121,9 +121,10 @@ def call(oauth); end end it "should not accept a callback that's not callable" do - + expect { + Rdkafka::Config.oauthbearer_token_refresh_callback = 'not a callback' + }.to raise_error(TypeError) end - end context "configuration" do From c55d383ddbaae34bfd02918bb611c65b71f7f87c Mon Sep 17 00:00:00 2001 From: bruce szalwinski Date: Wed, 7 Feb 2024 13:36:59 -0600 Subject: [PATCH 04/32] add set_token / set_token_failure bindings --- lib/rdkafka/bindings.rb | 3 ++- spec/rdkafka/bindings_spec.rb | 13 +++++++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/lib/rdkafka/bindings.rb b/lib/rdkafka/bindings.rb index 9951ac23..dfc23be6 100644 --- a/lib/rdkafka/bindings.rb +++ b/lib/rdkafka/bindings.rb @@ -113,7 +113,8 @@ class TopicPartitionList < FFI::Struct attach_function :rd_kafka_rebalance_protocol, [:pointer], :string callback :oauthbearer_token_refresh_cb, [:pointer, :string, :pointer], :void attach_function :rd_kafka_conf_set_oauthbearer_token_refresh_cb, [:pointer, :oauthbearer_token_refresh_cb], :void - + attach_function :rd_kafka_oauthbearer_set_token, [:pointer, :string, :int, :pointer, :pointer, :int, :string, :int ], :int + attach_function :rd_kafka_oauthbearer_set_token_failure, [:pointer, :pointer], :int # Log queue attach_function :rd_kafka_set_log_queue, [:pointer, :pointer], :void attach_function :rd_kafka_queue_get_main, [:pointer], :pointer diff --git a/spec/rdkafka/bindings_spec.rb b/spec/rdkafka/bindings_spec.rb index 6336b672..2adc32e1 100644 --- a/spec/rdkafka/bindings_spec.rb +++ b/spec/rdkafka/bindings_spec.rb @@ -134,6 +134,19 @@ end describe "oauthbearer callback" do + + it "should fail to call librdkafka.rd_kafka_oauthbearer_set_token when args not given" do + expect { + Rdkafka::Bindings.rd_kafka_oauthbearer_set_token + }.to raise_error + end + + it "should fail to call librdkafka.rd_kafka_oauthbearer_set_token_failure when args not given" do + expect { + Rdkafka::Bindings.rd_kafka_oauthbearer_set_token_failure + }.to raise_error + end + context "without an oauthbearer callback" do it "should do nothing" do expect { From cd8367669355c7a719a00cf79c2baed6426cf99a Mon Sep 17 00:00:00 2001 From: bruce szalwinski Date: Wed, 7 Feb 2024 14:42:18 -0600 Subject: [PATCH 05/32] add set_token / set_token_failure bindings --- spec/rdkafka/bindings_spec.rb | 36 +++++++++++++++++++++++++++++++++-- 1 file changed, 34 insertions(+), 2 deletions(-) diff --git a/spec/rdkafka/bindings_spec.rb b/spec/rdkafka/bindings_spec.rb index 2adc32e1..491ff28e 100644 --- a/spec/rdkafka/bindings_spec.rb +++ b/spec/rdkafka/bindings_spec.rb @@ -138,13 +138,45 @@ it "should fail to call librdkafka.rd_kafka_oauthbearer_set_token when args not given" do expect { Rdkafka::Bindings.rd_kafka_oauthbearer_set_token - }.to raise_error + }.to raise_error(ArgumentError) + end + + it "should successfully call librdkafka.rd_kafka_oauthbearer_set_token when args given" do + expect { + handle = Rdkafka::Bindings.rd_kafka_new( + :rd_kafka_consumer, + nil, + nil, + 0 + ) + token_value = "token" + md_lifetime_ms = 1000 + md_principal_name = "principal" + extensions = nil + extension_size = 0 + errstr = nil + errstr_size = 0 + Rdkafka::Bindings.rd_kafka_oauthbearer_set_token(handle, token_value, md_lifetime_ms, md_principal_name, extensions, extension_size, errstr, errstr_size) + }.not_to raise_error end it "should fail to call librdkafka.rd_kafka_oauthbearer_set_token_failure when args not given" do expect { Rdkafka::Bindings.rd_kafka_oauthbearer_set_token_failure - }.to raise_error + }.to raise_error(ArgumentError) + end + + it "should successfully call librdkafka.rd_kafka_oauthbearer_set_token_failure when args are given" do + expect { + handle = Rdkafka::Bindings.rd_kafka_new( + :rd_kafka_consumer, + nil, + nil, + 0 + ) + errstr = "error" + Rdkafka::Bindings.rd_kafka_oauthbearer_set_token_failure(handle, errstr) + }.to_not raise_error end context "without an oauthbearer callback" do From 0bf54f5dc8b3e3acb40dd341e9a38fad1f8128f2 Mon Sep 17 00:00:00 2001 From: bruce szalwinski Date: Wed, 7 Feb 2024 16:38:29 -0600 Subject: [PATCH 06/32] pass client pointer to callback --- lib/rdkafka/bindings.rb | 4 ++-- spec/rdkafka/bindings_spec.rb | 26 ++++++++++++++++---------- 2 files changed, 18 insertions(+), 12 deletions(-) diff --git a/lib/rdkafka/bindings.rb b/lib/rdkafka/bindings.rb index 43504d09..3cd5e80a 100644 --- a/lib/rdkafka/bindings.rb +++ b/lib/rdkafka/bindings.rb @@ -166,9 +166,9 @@ class TopicPartitionList < FFI::Struct OAuthbearerTokenRefreshCallback = FFI::Function.new( :void, [:pointer, :string, :pointer] - ) do |_client_ptr, config, _opaque| + ) do |client_ptr, config, _opaque| if Rdkafka::Config.oauthbearer_token_refresh_callback - Rdkafka::Config.oauthbearer_token_refresh_callback.call(config) + Rdkafka::Config.oauthbearer_token_refresh_callback.call(client_ptr, config) end end diff --git a/spec/rdkafka/bindings_spec.rb b/spec/rdkafka/bindings_spec.rb index 491ff28e..80495bdc 100644 --- a/spec/rdkafka/bindings_spec.rb +++ b/spec/rdkafka/bindings_spec.rb @@ -143,7 +143,7 @@ it "should successfully call librdkafka.rd_kafka_oauthbearer_set_token when args given" do expect { - handle = Rdkafka::Bindings.rd_kafka_new( + client_ptr = Rdkafka::Bindings.rd_kafka_new( :rd_kafka_consumer, nil, nil, @@ -156,7 +156,7 @@ extension_size = 0 errstr = nil errstr_size = 0 - Rdkafka::Bindings.rd_kafka_oauthbearer_set_token(handle, token_value, md_lifetime_ms, md_principal_name, extensions, extension_size, errstr, errstr_size) + Rdkafka::Bindings.rd_kafka_oauthbearer_set_token(client_ptr, token_value, md_lifetime_ms, md_principal_name, extensions, extension_size, errstr, errstr_size) }.not_to raise_error end @@ -168,14 +168,14 @@ it "should successfully call librdkafka.rd_kafka_oauthbearer_set_token_failure when args are given" do expect { - handle = Rdkafka::Bindings.rd_kafka_new( + client_ptr = Rdkafka::Bindings.rd_kafka_new( :rd_kafka_consumer, nil, nil, 0 ) errstr = "error" - Rdkafka::Bindings.rd_kafka_oauthbearer_set_token_failure(handle, errstr) + Rdkafka::Bindings.rd_kafka_oauthbearer_set_token_failure(client_ptr, errstr) }.to_not raise_error end @@ -189,15 +189,21 @@ context "with an oauthbearer callback" do before do - Rdkafka::Config.oauthbearer_token_refresh_callback = lambda do |oauth| - $received_oauth = oauth + Rdkafka::Config.oauthbearer_token_refresh_callback = lambda do |client, config| + $received_client = client + $received_config = config end end - it "should call the oauth bearer callback with an ???" do - Rdkafka::Bindings::OAuthbearerTokenRefreshCallback.call(nil, "oauth", nil) - expect($received_oauth) - + it "should call the oauth bearer callback" do + client_ptr = Rdkafka::Bindings.rd_kafka_new( + :rd_kafka_consumer, + nil, + nil, + 0 + ) + Rdkafka::Bindings::OAuthbearerTokenRefreshCallback.call(client_ptr, "oauth", nil) + expect($received_config).to eq("oauth") end end end From ac2121d36577eb9300abbf081ceeae7d3e745f0c Mon Sep 17 00:00:00 2001 From: bruce szalwinski Date: Wed, 14 Feb 2024 14:28:38 -0600 Subject: [PATCH 07/32] adjust types for set token --- lib/rdkafka/bindings.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/rdkafka/bindings.rb b/lib/rdkafka/bindings.rb index 3cd5e80a..2f9f68d4 100644 --- a/lib/rdkafka/bindings.rb +++ b/lib/rdkafka/bindings.rb @@ -113,7 +113,7 @@ class TopicPartitionList < FFI::Struct attach_function :rd_kafka_rebalance_protocol, [:pointer], :string callback :oauthbearer_token_refresh_cb, [:pointer, :string, :pointer], :void attach_function :rd_kafka_conf_set_oauthbearer_token_refresh_cb, [:pointer, :oauthbearer_token_refresh_cb], :void - attach_function :rd_kafka_oauthbearer_set_token, [:pointer, :string, :int, :pointer, :pointer, :int, :string, :int ], :int + attach_function :rd_kafka_oauthbearer_set_token, [:pointer, :string, :int, :string, :pointer, :int, :pointer, :int ], :int attach_function :rd_kafka_oauthbearer_set_token_failure, [:pointer, :pointer], :int # Log queue attach_function :rd_kafka_set_log_queue, [:pointer, :pointer], :void From d889648ee6b8be849b51a6702224bb0a5f782a47 Mon Sep 17 00:00:00 2001 From: bruce szalwinski Date: Wed, 14 Feb 2024 14:29:40 -0600 Subject: [PATCH 08/32] adjust types for set token --- lib/rdkafka/bindings.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/rdkafka/bindings.rb b/lib/rdkafka/bindings.rb index 2f9f68d4..931f88e9 100644 --- a/lib/rdkafka/bindings.rb +++ b/lib/rdkafka/bindings.rb @@ -114,7 +114,7 @@ class TopicPartitionList < FFI::Struct callback :oauthbearer_token_refresh_cb, [:pointer, :string, :pointer], :void attach_function :rd_kafka_conf_set_oauthbearer_token_refresh_cb, [:pointer, :oauthbearer_token_refresh_cb], :void attach_function :rd_kafka_oauthbearer_set_token, [:pointer, :string, :int, :string, :pointer, :int, :pointer, :int ], :int - attach_function :rd_kafka_oauthbearer_set_token_failure, [:pointer, :pointer], :int + attach_function :rd_kafka_oauthbearer_set_token_failure, [:pointer, :string], :int # Log queue attach_function :rd_kafka_set_log_queue, [:pointer, :pointer], :void attach_function :rd_kafka_queue_get_main, [:pointer], :pointer From c84db709586e9ce487d9e6fb502ebf1d153416ea Mon Sep 17 00:00:00 2001 From: bruce szalwinski Date: Wed, 14 Feb 2024 17:29:15 -0600 Subject: [PATCH 09/32] back to string --- lib/rdkafka/bindings.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/rdkafka/bindings.rb b/lib/rdkafka/bindings.rb index 931f88e9..84461e51 100644 --- a/lib/rdkafka/bindings.rb +++ b/lib/rdkafka/bindings.rb @@ -113,7 +113,7 @@ class TopicPartitionList < FFI::Struct attach_function :rd_kafka_rebalance_protocol, [:pointer], :string callback :oauthbearer_token_refresh_cb, [:pointer, :string, :pointer], :void attach_function :rd_kafka_conf_set_oauthbearer_token_refresh_cb, [:pointer, :oauthbearer_token_refresh_cb], :void - attach_function :rd_kafka_oauthbearer_set_token, [:pointer, :string, :int, :string, :pointer, :int, :pointer, :int ], :int + attach_function :rd_kafka_oauthbearer_set_token, [:pointer, :string, :int, :string, :pointer, :int, :string, :int ], :int attach_function :rd_kafka_oauthbearer_set_token_failure, [:pointer, :string], :int # Log queue attach_function :rd_kafka_set_log_queue, [:pointer, :pointer], :void From b315d0ba388c00770ce5a59e1e3444607d8af7ab Mon Sep 17 00:00:00 2001 From: bruce szalwinski Date: Wed, 14 Feb 2024 17:45:36 -0600 Subject: [PATCH 10/32] back to string --- lib/rdkafka/bindings.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/rdkafka/bindings.rb b/lib/rdkafka/bindings.rb index 84461e51..c82e65bf 100644 --- a/lib/rdkafka/bindings.rb +++ b/lib/rdkafka/bindings.rb @@ -113,7 +113,7 @@ class TopicPartitionList < FFI::Struct attach_function :rd_kafka_rebalance_protocol, [:pointer], :string callback :oauthbearer_token_refresh_cb, [:pointer, :string, :pointer], :void attach_function :rd_kafka_conf_set_oauthbearer_token_refresh_cb, [:pointer, :oauthbearer_token_refresh_cb], :void - attach_function :rd_kafka_oauthbearer_set_token, [:pointer, :string, :int, :string, :pointer, :int, :string, :int ], :int + attach_function :rd_kafka_oauthbearer_set_token, [:pointer, :string, :int, :pointer, :pointer, :int, :string, :int ], :int attach_function :rd_kafka_oauthbearer_set_token_failure, [:pointer, :string], :int # Log queue attach_function :rd_kafka_set_log_queue, [:pointer, :pointer], :void From 1cad89a4f82770012e8ccf87e8623f1c1c494777 Mon Sep 17 00:00:00 2001 From: bruce szalwinski Date: Wed, 14 Feb 2024 21:09:40 -0600 Subject: [PATCH 11/32] back to pointer --- lib/rdkafka/bindings.rb | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/lib/rdkafka/bindings.rb b/lib/rdkafka/bindings.rb index c82e65bf..b587aa67 100644 --- a/lib/rdkafka/bindings.rb +++ b/lib/rdkafka/bindings.rb @@ -17,6 +17,7 @@ def self.lib_extension RD_KAFKA_RESP_ERR__ASSIGN_PARTITIONS = -175 RD_KAFKA_RESP_ERR__REVOKE_PARTITIONS = -174 + RD_KAFKA_RESP_ERR__STATE = -172 RD_KAFKA_RESP_ERR__NOENT = -156 RD_KAFKA_RESP_ERR_NO_ERROR = 0 @@ -113,7 +114,7 @@ class TopicPartitionList < FFI::Struct attach_function :rd_kafka_rebalance_protocol, [:pointer], :string callback :oauthbearer_token_refresh_cb, [:pointer, :string, :pointer], :void attach_function :rd_kafka_conf_set_oauthbearer_token_refresh_cb, [:pointer, :oauthbearer_token_refresh_cb], :void - attach_function :rd_kafka_oauthbearer_set_token, [:pointer, :string, :int, :pointer, :pointer, :int, :string, :int ], :int + attach_function :rd_kafka_oauthbearer_set_token, [:pointer, :string, :int, :pointer, :pointer, :int, :pointer, :int ], :int attach_function :rd_kafka_oauthbearer_set_token_failure, [:pointer, :string], :int # Log queue attach_function :rd_kafka_set_log_queue, [:pointer, :pointer], :void From b1bd10e2b9d2860c9c4b5a486dea58ace6046f6b Mon Sep 17 00:00:00 2001 From: bruce szalwinski Date: Thu, 15 Feb 2024 08:33:43 -0600 Subject: [PATCH 12/32] int -> int64 --- lib/rdkafka/bindings.rb | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/rdkafka/bindings.rb b/lib/rdkafka/bindings.rb index b587aa67..66490e5c 100644 --- a/lib/rdkafka/bindings.rb +++ b/lib/rdkafka/bindings.rb @@ -114,7 +114,7 @@ class TopicPartitionList < FFI::Struct attach_function :rd_kafka_rebalance_protocol, [:pointer], :string callback :oauthbearer_token_refresh_cb, [:pointer, :string, :pointer], :void attach_function :rd_kafka_conf_set_oauthbearer_token_refresh_cb, [:pointer, :oauthbearer_token_refresh_cb], :void - attach_function :rd_kafka_oauthbearer_set_token, [:pointer, :string, :int, :pointer, :pointer, :int, :pointer, :int ], :int + attach_function :rd_kafka_oauthbearer_set_token, [:pointer, :string, :int64, :pointer, :pointer, :int, :pointer, :int ], :int attach_function :rd_kafka_oauthbearer_set_token_failure, [:pointer, :string], :int # Log queue attach_function :rd_kafka_set_log_queue, [:pointer, :pointer], :void From a133f6ae1972dffe52230f7c31fb5c519657c467 Mon Sep 17 00:00:00 2001 From: bruce szalwinski Date: Thu, 15 Feb 2024 10:42:40 -0600 Subject: [PATCH 13/32] cleanup set token tests --- .gitignore | 1 + lib/rdkafka/bindings.rb | 2 +- spec/rdkafka/bindings_spec.rb | 83 +++++++++++++++++++++-------------- 3 files changed, 53 insertions(+), 33 deletions(-) diff --git a/.gitignore b/.gitignore index 82062c83..0a017d0b 100644 --- a/.gitignore +++ b/.gitignore @@ -11,3 +11,4 @@ doc coverage vendor .idea/ +out/ diff --git a/lib/rdkafka/bindings.rb b/lib/rdkafka/bindings.rb index 66490e5c..ac377544 100644 --- a/lib/rdkafka/bindings.rb +++ b/lib/rdkafka/bindings.rb @@ -114,7 +114,7 @@ class TopicPartitionList < FFI::Struct attach_function :rd_kafka_rebalance_protocol, [:pointer], :string callback :oauthbearer_token_refresh_cb, [:pointer, :string, :pointer], :void attach_function :rd_kafka_conf_set_oauthbearer_token_refresh_cb, [:pointer, :oauthbearer_token_refresh_cb], :void - attach_function :rd_kafka_oauthbearer_set_token, [:pointer, :string, :int64, :pointer, :pointer, :int, :pointer, :int ], :int + attach_function :rd_kafka_oauthbearer_set_token, [:pointer, :string, :int64, :pointer, :pointer, :int, :pointer, :int], :int attach_function :rd_kafka_oauthbearer_set_token_failure, [:pointer, :string], :int # Log queue attach_function :rd_kafka_set_log_queue, [:pointer, :pointer], :void diff --git a/spec/rdkafka/bindings_spec.rb b/spec/rdkafka/bindings_spec.rb index 80495bdc..96db9ce1 100644 --- a/spec/rdkafka/bindings_spec.rb +++ b/spec/rdkafka/bindings_spec.rb @@ -133,51 +133,70 @@ end end - describe "oauthbearer callback" do + describe "oauthbearer set token" do - it "should fail to call librdkafka.rd_kafka_oauthbearer_set_token when args not given" do - expect { - Rdkafka::Bindings.rd_kafka_oauthbearer_set_token - }.to raise_error(ArgumentError) + context "without args" do + it "should raise argument error" do + expect { + Rdkafka::Bindings.rd_kafka_oauthbearer_set_token + }.to raise_error(ArgumentError) + end end - it "should successfully call librdkafka.rd_kafka_oauthbearer_set_token when args given" do - expect { - client_ptr = Rdkafka::Bindings.rd_kafka_new( + context "with args" do + before do + $client_ptr = Rdkafka::Bindings.rd_kafka_new( :rd_kafka_consumer, nil, nil, 0 ) - token_value = "token" - md_lifetime_ms = 1000 - md_principal_name = "principal" - extensions = nil - extension_size = 0 - errstr = nil - errstr_size = 0 - Rdkafka::Bindings.rd_kafka_oauthbearer_set_token(client_ptr, token_value, md_lifetime_ms, md_principal_name, extensions, extension_size, errstr, errstr_size) - }.not_to raise_error + DEFAULT_TOKEN_EXPIRY_SECONDS = 900 + $token_value = "token" + $md_lifetime_ms = Time.now.to_i*1000 + DEFAULT_TOKEN_EXPIRY_SECONDS * 1000 + $md_principal_name = "kafka-cluster" + $extensions = nil + $extension_size = 0 + $error_buffer = FFI::MemoryPointer.from_string(" " * 256) + end + + it "should set token or capture failure" do + response = Rdkafka::Bindings.rd_kafka_oauthbearer_set_token($client_ptr, $token_value, $md_lifetime_ms, $md_principal_name, $extensions, $extension_size, $error_buffer, 256) + expect(response).to eq(Rdkafka::Bindings::RD_KAFKA_RESP_ERR__STATE) + expect($error_buffer.read_string).to eq("SASL/OAUTHBEARER is not the configured authentication mechanism") + end end + end + + describe "oauthbearer set token failure" do - it "should fail to call librdkafka.rd_kafka_oauthbearer_set_token_failure when args not given" do - expect { - Rdkafka::Bindings.rd_kafka_oauthbearer_set_token_failure - }.to raise_error(ArgumentError) + context "without args" do + + it "should fail" do + expect { + Rdkafka::Bindings.rd_kafka_oauthbearer_set_token_failure + }.to raise_error(ArgumentError) + end end - it "should successfully call librdkafka.rd_kafka_oauthbearer_set_token_failure when args are given" do - expect { - client_ptr = Rdkafka::Bindings.rd_kafka_new( - :rd_kafka_consumer, - nil, - nil, - 0 - ) - errstr = "error" - Rdkafka::Bindings.rd_kafka_oauthbearer_set_token_failure(client_ptr, errstr) - }.to_not raise_error + context "with args" do + + it "should succeed" do + expect { + client_ptr = Rdkafka::Bindings.rd_kafka_new( + :rd_kafka_consumer, + nil, + nil, + 0 + ) + errstr = "error" + Rdkafka::Bindings.rd_kafka_oauthbearer_set_token_failure(client_ptr, errstr) + }.to_not raise_error + end end + end + + describe "oauthbearer callback" do context "without an oauthbearer callback" do it "should do nothing" do From 387e5c902c5f703fad581308d694b92aa3230d4e Mon Sep 17 00:00:00 2001 From: bruce szalwinski Date: Thu, 29 Feb 2024 17:24:31 -0600 Subject: [PATCH 14/32] expose oauthbearer_set_token on producer --- lib/rdkafka/producer.rb | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/lib/rdkafka/producer.rb b/lib/rdkafka/producer.rb index 279506b3..7068a30a 100644 --- a/lib/rdkafka/producer.rb +++ b/lib/rdkafka/producer.rb @@ -295,6 +295,31 @@ def arity(callback) callback.method(:call).arity end + # Set the OAuthBearer token + # + # @param token [String] The token value + # @param lifetime_ms [Integer] The token lifetime in milliseconds since the epoch + # @param principal_name [String] The principal name + # @param extensions [String] The token extensions + # @param extension_size [Integer] The token extensions size + # @return [nil] + # @raise [Rdkafka::RdkafkaError] when setting the token fails + def oauthbearer_set_token(token, lifetime_ms, principal_name, extensions, extension_size) + error_buffer = FFI::MemoryPointer.from_string(" " * 256) + @native_kafka.with_inner do |inner| + response = Rdkafka::Bindings.rd_kafka_oauthbearer_set_token( + inner, token, lifetime_ms, principal_name, + extensions, extension_size, error_buffer, 256 + ) + if response != 0 + Rdkafka::Bindings.rd_kafka_oauthbearer_set_token_failure( + inner, + "Failed to set token: #{error_buffer.read_string}" + ) + end + end + end + private # Ensures, no operations can happen on a closed producer From a15d95d4a82c9eb0001dc9410f1e3838d5cf0a15 Mon Sep 17 00:00:00 2001 From: bruce szalwinski Date: Thu, 29 Feb 2024 22:14:45 -0600 Subject: [PATCH 15/32] oauthbearer_set_token specs --- lib/rdkafka/producer.rb | 4 +++- spec/rdkafka/producer_spec.rb | 26 ++++++++++++++++++++++++++ 2 files changed, 29 insertions(+), 1 deletion(-) diff --git a/lib/rdkafka/producer.rb b/lib/rdkafka/producer.rb index 7068a30a..972da840 100644 --- a/lib/rdkafka/producer.rb +++ b/lib/rdkafka/producer.rb @@ -304,7 +304,7 @@ def arity(callback) # @param extension_size [Integer] The token extensions size # @return [nil] # @raise [Rdkafka::RdkafkaError] when setting the token fails - def oauthbearer_set_token(token, lifetime_ms, principal_name, extensions, extension_size) + def oauthbearer_set_token(token:, lifetime_ms:, principal_name:, extensions:nil, extension_size:0) error_buffer = FFI::MemoryPointer.from_string(" " * 256) @native_kafka.with_inner do |inner| response = Rdkafka::Bindings.rd_kafka_oauthbearer_set_token( @@ -317,6 +317,8 @@ def oauthbearer_set_token(token, lifetime_ms, principal_name, extensions, extens "Failed to set token: #{error_buffer.read_string}" ) end + + response end end diff --git a/spec/rdkafka/producer_spec.rb b/spec/rdkafka/producer_spec.rb index a856463f..2979a6ae 100644 --- a/spec/rdkafka/producer_spec.rb +++ b/spec/rdkafka/producer_spec.rb @@ -4,6 +4,9 @@ describe Rdkafka::Producer do let(:producer) { rdkafka_producer_config.producer } + let(:producer_sasl) { rdkafka_producer_config({ + "security.protocol": "sasl_ssl", + "sasl.mechanisms": 'OAUTHBEARER'}).producer } let(:consumer) { rdkafka_consumer_config.consumer } after do @@ -734,4 +737,27 @@ def call(_, handle) end end end + + describe '#oauthbearer_set_token' + context 'when sasl not configured' do + it 'should return RD_KAFKA_RESP_ERR__STATE' do + response = producer.oauthbearer_set_token( + token: "foo", + lifetime_ms: Time.now.to_i*1000 + 900 * 1000, + principal_name: "kafka-cluster" + ) + expect(response).to eq(Rdkafka::Bindings::RD_KAFKA_RESP_ERR__STATE) + end + end + + context 'when sasl configured' do + it 'should succeed' do + response = producer_sasl.oauthbearer_set_token( + token: "foo", + lifetime_ms: Time.now.to_i*1000 + 900 * 1000, + principal_name: "kafka-cluster" + ) + expect(response).to eq(0) + end + end end From f98687a0cd42df2c91ec7626b29002834e5b9ab4 Mon Sep 17 00:00:00 2001 From: bruce szalwinski Date: Thu, 29 Feb 2024 22:45:00 -0600 Subject: [PATCH 16/32] flatten extensions --- lib/rdkafka/producer.rb | 26 +++++++++++++++++--------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/lib/rdkafka/producer.rb b/lib/rdkafka/producer.rb index 972da840..aab73d96 100644 --- a/lib/rdkafka/producer.rb +++ b/lib/rdkafka/producer.rb @@ -297,19 +297,17 @@ def arity(callback) # Set the OAuthBearer token # - # @param token [String] The token value - # @param lifetime_ms [Integer] The token lifetime in milliseconds since the epoch - # @param principal_name [String] The principal name - # @param extensions [String] The token extensions - # @param extension_size [Integer] The token extensions size - # @return [nil] - # @raise [Rdkafka::RdkafkaError] when setting the token fails - def oauthbearer_set_token(token:, lifetime_ms:, principal_name:, extensions:nil, extension_size:0) + # @param token [String] the mandatory token value to set, often (but not necessarily) a JWS compact serialization as per https://tools.ietf.org/html/rfc7515#section-3.1. + # @param lifetime_ms [Integer] when the token expires, in terms of the number of milliseconds since the epoch. See https://currentmillis.com/. + # @param principal_name [String] the mandatory Kafka principal name associated with the token. + # @param extensions [Hash] optional SASL extensions key-value pairs to be communicated to the broker as additional key-value pairs during the initial client response as per https://tools.ietf.org/html/rfc7628#section-3.1. + # @return [Integer] 0 on success + def oauthbearer_set_token(token:, lifetime_ms:, principal_name:, extensions:nil) error_buffer = FFI::MemoryPointer.from_string(" " * 256) @native_kafka.with_inner do |inner| response = Rdkafka::Bindings.rd_kafka_oauthbearer_set_token( inner, token, lifetime_ms, principal_name, - extensions, extension_size, error_buffer, 256 + flatten_extensions(extensions), extension_size(extensions), error_buffer, 256 ) if response != 0 Rdkafka::Bindings.rd_kafka_oauthbearer_set_token_failure( @@ -331,5 +329,15 @@ def oauthbearer_set_token(token:, lifetime_ms:, principal_name:, extensions:nil, def closed_producer_check(method) raise Rdkafka::ClosedProducerError.new(method) if closed? end + + def flatten_extensions(extensions) + return nil unless extensions + "\x01#{extensions.map {|e| e.join("=")}.join("\x01")}" + end + + def extension_size(extensions) + return 0 unless extensions + extensions.size*2 + end end end From dcfa9ef2c16bd82a00deabbeb2156f3616ceda10 Mon Sep 17 00:00:00 2001 From: bruce szalwinski Date: Fri, 1 Mar 2024 10:27:11 -0600 Subject: [PATCH 17/32] add set token ability to consumer and admin, refactor into helper --- lib/rdkafka/admin.rb | 9 ++++++++ lib/rdkafka/consumer.rb | 9 ++++++++ lib/rdkafka/helpers/oauth.rb | 41 +++++++++++++++++++++++++++++++++++ lib/rdkafka/producer.rb | 40 ++++++---------------------------- spec/rdkafka/admin_spec.rb | 28 ++++++++++++++++++++++++ spec/rdkafka/consumer_spec.rb | 28 ++++++++++++++++++++++++ spec/rdkafka/producer_spec.rb | 4 +++- 7 files changed, 125 insertions(+), 34 deletions(-) create mode 100644 lib/rdkafka/helpers/oauth.rb diff --git a/lib/rdkafka/admin.rb b/lib/rdkafka/admin.rb index 22ee3b90..68337036 100644 --- a/lib/rdkafka/admin.rb +++ b/lib/rdkafka/admin.rb @@ -605,6 +605,15 @@ def describe_acl(resource_type:, resource_name:, resource_pattern_type:, princip describe_acl_handle end + def oauthbearer_set_token(token:, lifetime_ms:, principal_name:, extensions: nil) + Helpers::OAuth.oauthbearer_set_token( + token: token, + lifetime_ms: lifetime_ms, + principal_name: principal_name, + extensions: extensions + ) + end + private def closed_admin_check(method) diff --git a/lib/rdkafka/consumer.rb b/lib/rdkafka/consumer.rb index b3d8ad43..6e3dcedf 100644 --- a/lib/rdkafka/consumer.rb +++ b/lib/rdkafka/consumer.rb @@ -688,6 +688,15 @@ def consumer_group_metadata_pointer end end + def oauthbearer_set_token(token:, lifetime_ms:, principal_name:, extensions: nil) + Helpers::OAuth.oauthbearer_set_token( + token: token, + lifetime_ms: lifetime_ms, + principal_name: principal_name, + extensions: extensions + ) + end + private def closed_consumer_check(method) diff --git a/lib/rdkafka/helpers/oauth.rb b/lib/rdkafka/helpers/oauth.rb new file mode 100644 index 00000000..d0adf151 --- /dev/null +++ b/lib/rdkafka/helpers/oauth.rb @@ -0,0 +1,41 @@ +module Rdkafka + module Helpers + module OAuth + # Set the OAuthBearer token + # + # @param token [String] the mandatory token value to set, often (but not necessarily) a JWS compact serialization as per https://tools.ietf.org/html/rfc7515#section-3.1. + # @param lifetime_ms [Integer] when the token expires, in terms of the number of milliseconds since the epoch. See https://currentmillis.com/. + # @param principal_name [String] the mandatory Kafka principal name associated with the token. + # @param extensions [Hash] optional SASL extensions key-value pairs to be communicated to the broker as additional key-value pairs during the initial client response as per https://tools.ietf.org/html/rfc7628#section-3.1. + # @return [Integer] 0 on success + def oauthbearer_set_token(token:, lifetime_ms:, principal_name:, extensions: nil) + error_buffer = FFI::MemoryPointer.from_string(" " * 256) + @native_kafka.with_inner do |inner| + response = Rdkafka::Bindings.rd_kafka_oauthbearer_set_token( + inner, token, lifetime_ms, principal_name, + flatten_extensions(extensions), extension_size(extensions), error_buffer, 256 + ) + if response != 0 + Rdkafka::Bindings.rd_kafka_oauthbearer_set_token_failure( + inner, + "Failed to set token: #{error_buffer.read_string}" + ) + end + + response + end + end + + # Flatten the extensions hash into a string according to the spec, https://datatracker.ietf.org/doc/html/rfc7628#section-3.1 + def flatten_extensions(extensions) + return nil unless extensions + "\x01#{extensions.map { |e| e.join("=") }.join("\x01")}" + end + + def extension_size(extensions) + return 0 unless extensions + extensions.size * 2 + end + end + end +end diff --git a/lib/rdkafka/producer.rb b/lib/rdkafka/producer.rb index aab73d96..4850ab5b 100644 --- a/lib/rdkafka/producer.rb +++ b/lib/rdkafka/producer.rb @@ -295,29 +295,13 @@ def arity(callback) callback.method(:call).arity end - # Set the OAuthBearer token - # - # @param token [String] the mandatory token value to set, often (but not necessarily) a JWS compact serialization as per https://tools.ietf.org/html/rfc7515#section-3.1. - # @param lifetime_ms [Integer] when the token expires, in terms of the number of milliseconds since the epoch. See https://currentmillis.com/. - # @param principal_name [String] the mandatory Kafka principal name associated with the token. - # @param extensions [Hash] optional SASL extensions key-value pairs to be communicated to the broker as additional key-value pairs during the initial client response as per https://tools.ietf.org/html/rfc7628#section-3.1. - # @return [Integer] 0 on success - def oauthbearer_set_token(token:, lifetime_ms:, principal_name:, extensions:nil) - error_buffer = FFI::MemoryPointer.from_string(" " * 256) - @native_kafka.with_inner do |inner| - response = Rdkafka::Bindings.rd_kafka_oauthbearer_set_token( - inner, token, lifetime_ms, principal_name, - flatten_extensions(extensions), extension_size(extensions), error_buffer, 256 - ) - if response != 0 - Rdkafka::Bindings.rd_kafka_oauthbearer_set_token_failure( - inner, - "Failed to set token: #{error_buffer.read_string}" - ) - end - - response - end + def oauthbearer_set_token(token:, lifetime_ms:, principal_name:, extensions: nil) + Helpers::OAuth.oauthbearer_set_token( + token: token, + lifetime_ms: lifetime_ms, + principal_name: principal_name, + extensions: extensions + ) end private @@ -329,15 +313,5 @@ def oauthbearer_set_token(token:, lifetime_ms:, principal_name:, extensions:nil) def closed_producer_check(method) raise Rdkafka::ClosedProducerError.new(method) if closed? end - - def flatten_extensions(extensions) - return nil unless extensions - "\x01#{extensions.map {|e| e.join("=")}.join("\x01")}" - end - - def extension_size(extensions) - return 0 unless extensions - extensions.size*2 - end end end diff --git a/spec/rdkafka/admin_spec.rb b/spec/rdkafka/admin_spec.rb index 620f32bd..f74f1c94 100644 --- a/spec/rdkafka/admin_spec.rb +++ b/spec/rdkafka/admin_spec.rb @@ -4,7 +4,11 @@ describe Rdkafka::Admin do let(:config) { rdkafka_config } + let(:config_sasl) { rdkafka_config({ + "security.protocol": "sasl_ssl", + "sasl.mechanisms": 'OAUTHBEARER'})} let(:admin) { config.admin } + let(:admin_sasl) { config_sasl.admin } after do # Registry should always end up being empty @@ -404,4 +408,28 @@ end end end + + describe '#oauthbearer_set_token' do + context 'when sasl not configured' do + it 'should return RD_KAFKA_RESP_ERR__STATE' do + response = producer.oauthbearer_set_token( + token: "foo", + lifetime_ms: Time.now.to_i*1000 + 900 * 1000, + principal_name: "kafka-cluster" + ) + expect(response).to eq(Rdkafka::Bindings::RD_KAFKA_RESP_ERR__STATE) + end + end + + context 'when sasl configured' do + it 'should succeed' do + response = producer_sasl.oauthbearer_set_token( + token: "foo", + lifetime_ms: Time.now.to_i*1000 + 900 * 1000, + principal_name: "kafka-cluster" + ) + expect(response).to eq(0) + end + end + end end diff --git a/spec/rdkafka/consumer_spec.rb b/spec/rdkafka/consumer_spec.rb index 3d3c89cf..de69f44e 100644 --- a/spec/rdkafka/consumer_spec.rb +++ b/spec/rdkafka/consumer_spec.rb @@ -6,9 +6,13 @@ describe Rdkafka::Consumer do let(:consumer) { rdkafka_consumer_config.consumer } let(:producer) { rdkafka_producer_config.producer } + let(:consumer_sasl) { rdkafka_producer_config({ + "security.protocol": "sasl_ssl", + "sasl.mechanisms": 'OAUTHBEARER'}).producer } after { consumer.close } after { producer.close } + after { consumer_sasl.close } describe '#name' do it { expect(consumer.name).to include('rdkafka#consumer-') } @@ -1301,4 +1305,28 @@ def collect(name, list) ]) end end + + describe '#oauthbearer_set_token' do + context 'when sasl not configured' do + it 'should return RD_KAFKA_RESP_ERR__STATE' do + response = consumer.oauthbearer_set_token( + token: "foo", + lifetime_ms: Time.now.to_i*1000 + 900 * 1000, + principal_name: "kafka-cluster" + ) + expect(response).to eq(Rdkafka::Bindings::RD_KAFKA_RESP_ERR__STATE) + end + end + + context 'when sasl configured' do + it 'should succeed' do + response = consumer_sasl.oauthbearer_set_token( + token: "foo", + lifetime_ms: Time.now.to_i*1000 + 900 * 1000, + principal_name: "kafka-cluster" + ) + expect(response).to eq(0) + end + end + end end diff --git a/spec/rdkafka/producer_spec.rb b/spec/rdkafka/producer_spec.rb index 2979a6ae..5bb379c0 100644 --- a/spec/rdkafka/producer_spec.rb +++ b/spec/rdkafka/producer_spec.rb @@ -14,6 +14,7 @@ registry = Rdkafka::Producer::DeliveryHandle::REGISTRY expect(registry).to be_empty, registry.inspect producer.close + producer_sasl.close consumer.close end @@ -738,7 +739,7 @@ def call(_, handle) end end - describe '#oauthbearer_set_token' + describe '#oauthbearer_set_token' do context 'when sasl not configured' do it 'should return RD_KAFKA_RESP_ERR__STATE' do response = producer.oauthbearer_set_token( @@ -760,4 +761,5 @@ def call(_, handle) expect(response).to eq(0) end end + end end From 5705fee6d42d74446459e54c776da63693e7615b Mon Sep 17 00:00:00 2001 From: bruce szalwinski Date: Fri, 1 Mar 2024 11:11:00 -0600 Subject: [PATCH 18/32] pass native-kafka as client, make helper a class --- lib/rdkafka/admin.rb | 5 ++++- lib/rdkafka/consumer.rb | 5 ++++- lib/rdkafka/helpers/oauth.rb | 10 +++++++--- lib/rdkafka/producer.rb | 5 ++++- spec/rdkafka/admin_spec.rb | 5 +++-- 5 files changed, 22 insertions(+), 8 deletions(-) diff --git a/lib/rdkafka/admin.rb b/lib/rdkafka/admin.rb index 68337036..de7b56f8 100644 --- a/lib/rdkafka/admin.rb +++ b/lib/rdkafka/admin.rb @@ -1,5 +1,7 @@ # frozen_string_literal: true +require_relative 'helpers/oauth' + module Rdkafka class Admin # @private @@ -606,7 +608,8 @@ def describe_acl(resource_type:, resource_name:, resource_pattern_type:, princip end def oauthbearer_set_token(token:, lifetime_ms:, principal_name:, extensions: nil) - Helpers::OAuth.oauthbearer_set_token( + Helpers::OAuth.new.oauthbearer_set_token( + client: @native_kafka, token: token, lifetime_ms: lifetime_ms, principal_name: principal_name, diff --git a/lib/rdkafka/consumer.rb b/lib/rdkafka/consumer.rb index 6e3dcedf..2983f6fb 100644 --- a/lib/rdkafka/consumer.rb +++ b/lib/rdkafka/consumer.rb @@ -1,5 +1,7 @@ # frozen_string_literal: true +require_relative 'helpers/oauth' + module Rdkafka # A consumer of Kafka messages. It uses the high-level consumer approach where the Kafka # brokers automatically assign partitions and load balance partitions over consumers that @@ -689,7 +691,8 @@ def consumer_group_metadata_pointer end def oauthbearer_set_token(token:, lifetime_ms:, principal_name:, extensions: nil) - Helpers::OAuth.oauthbearer_set_token( + Helpers::OAuth.new.oauthbearer_set_token( + client: @native_kafka, token: token, lifetime_ms: lifetime_ms, principal_name: principal_name, diff --git a/lib/rdkafka/helpers/oauth.rb b/lib/rdkafka/helpers/oauth.rb index d0adf151..11207c09 100644 --- a/lib/rdkafka/helpers/oauth.rb +++ b/lib/rdkafka/helpers/oauth.rb @@ -1,6 +1,8 @@ module Rdkafka module Helpers - module OAuth + + class OAuth + # Set the OAuthBearer token # # @param token [String] the mandatory token value to set, often (but not necessarily) a JWS compact serialization as per https://tools.ietf.org/html/rfc7515#section-3.1. @@ -8,9 +10,9 @@ module OAuth # @param principal_name [String] the mandatory Kafka principal name associated with the token. # @param extensions [Hash] optional SASL extensions key-value pairs to be communicated to the broker as additional key-value pairs during the initial client response as per https://tools.ietf.org/html/rfc7628#section-3.1. # @return [Integer] 0 on success - def oauthbearer_set_token(token:, lifetime_ms:, principal_name:, extensions: nil) + def oauthbearer_set_token(client:, token:, lifetime_ms:, principal_name:, extensions: nil) error_buffer = FFI::MemoryPointer.from_string(" " * 256) - @native_kafka.with_inner do |inner| + client.with_inner do |inner| response = Rdkafka::Bindings.rd_kafka_oauthbearer_set_token( inner, token, lifetime_ms, principal_name, flatten_extensions(extensions), extension_size(extensions), error_buffer, 256 @@ -26,6 +28,8 @@ def oauthbearer_set_token(token:, lifetime_ms:, principal_name:, extensions: nil end end + private + # Flatten the extensions hash into a string according to the spec, https://datatracker.ietf.org/doc/html/rfc7628#section-3.1 def flatten_extensions(extensions) return nil unless extensions diff --git a/lib/rdkafka/producer.rb b/lib/rdkafka/producer.rb index 4850ab5b..ef535fa7 100644 --- a/lib/rdkafka/producer.rb +++ b/lib/rdkafka/producer.rb @@ -1,5 +1,7 @@ # frozen_string_literal: true +require_relative 'helpers/oauth' + module Rdkafka # A producer for Kafka messages. To create a producer set up a {Config} and call {Config#producer producer} on that. class Producer @@ -296,7 +298,8 @@ def arity(callback) end def oauthbearer_set_token(token:, lifetime_ms:, principal_name:, extensions: nil) - Helpers::OAuth.oauthbearer_set_token( + Helpers::OAuth.new.oauthbearer_set_token( + client: @native_kafka, token: token, lifetime_ms: lifetime_ms, principal_name: principal_name, diff --git a/spec/rdkafka/admin_spec.rb b/spec/rdkafka/admin_spec.rb index f74f1c94..441aa152 100644 --- a/spec/rdkafka/admin_spec.rb +++ b/spec/rdkafka/admin_spec.rb @@ -18,6 +18,7 @@ expect(Rdkafka::Admin::CreateAclHandle::REGISTRY).to be_empty expect(Rdkafka::Admin::DeleteAclHandle::REGISTRY).to be_empty admin.close + admin_sasl.close end let(:topic_name) { "test-topic-#{Random.new.rand(0..1_000_000)}" } @@ -412,7 +413,7 @@ describe '#oauthbearer_set_token' do context 'when sasl not configured' do it 'should return RD_KAFKA_RESP_ERR__STATE' do - response = producer.oauthbearer_set_token( + response = admin.oauthbearer_set_token( token: "foo", lifetime_ms: Time.now.to_i*1000 + 900 * 1000, principal_name: "kafka-cluster" @@ -423,7 +424,7 @@ context 'when sasl configured' do it 'should succeed' do - response = producer_sasl.oauthbearer_set_token( + response = admin_sasl.oauthbearer_set_token( token: "foo", lifetime_ms: Time.now.to_i*1000 + 900 * 1000, principal_name: "kafka-cluster" From b3b9d8b23cf4b0bad59ca89a44fe6c34dc38fc8b Mon Sep 17 00:00:00 2001 From: bruce szalwinski Date: Fri, 1 Mar 2024 12:25:07 -0600 Subject: [PATCH 19/32] expect type of client --- spec/rdkafka/bindings_spec.rb | 2 ++ 1 file changed, 2 insertions(+) diff --git a/spec/rdkafka/bindings_spec.rb b/spec/rdkafka/bindings_spec.rb index 96db9ce1..4fe04800 100644 --- a/spec/rdkafka/bindings_spec.rb +++ b/spec/rdkafka/bindings_spec.rb @@ -222,6 +222,8 @@ 0 ) Rdkafka::Bindings::OAuthbearerTokenRefreshCallback.call(client_ptr, "oauth", nil) + expect($received_client).to eq(client_ptr) + expect($received_client).to be_instance_of(FFI::Pointer) expect($received_config).to eq("oauth") end end From 7ecc6e5d34604169724e31e319d2af60fa2775c1 Mon Sep 17 00:00:00 2001 From: bruce szalwinski Date: Fri, 1 Mar 2024 13:30:47 -0600 Subject: [PATCH 20/32] pass client-id in oauth callback --- lib/rdkafka/bindings.rb | 8 ++++---- spec/rdkafka/bindings_spec.rb | 19 ++++++------------- spec/rdkafka/config_spec.rb | 3 ++- 3 files changed, 12 insertions(+), 18 deletions(-) diff --git a/lib/rdkafka/bindings.rb b/lib/rdkafka/bindings.rb index ac377544..e6a67639 100644 --- a/lib/rdkafka/bindings.rb +++ b/lib/rdkafka/bindings.rb @@ -112,7 +112,7 @@ class TopicPartitionList < FFI::Struct callback :error_cb, [:pointer, :int, :string, :pointer], :void attach_function :rd_kafka_conf_set_error_cb, [:pointer, :error_cb], :void attach_function :rd_kafka_rebalance_protocol, [:pointer], :string - callback :oauthbearer_token_refresh_cb, [:pointer, :string, :pointer], :void + callback :oauthbearer_token_refresh_cb, [:pointer, :string, :pointer, :string], :void attach_function :rd_kafka_conf_set_oauthbearer_token_refresh_cb, [:pointer, :oauthbearer_token_refresh_cb], :void attach_function :rd_kafka_oauthbearer_set_token, [:pointer, :string, :int64, :pointer, :pointer, :int, :pointer, :int], :int attach_function :rd_kafka_oauthbearer_set_token_failure, [:pointer, :string], :int @@ -166,10 +166,10 @@ class TopicPartitionList < FFI::Struct end OAuthbearerTokenRefreshCallback = FFI::Function.new( - :void, [:pointer, :string, :pointer] - ) do |client_ptr, config, _opaque| + :void, [:pointer, :string, :pointer, :string] + ) do |_client_ptr, config, _opaque, instance_id| if Rdkafka::Config.oauthbearer_token_refresh_callback - Rdkafka::Config.oauthbearer_token_refresh_callback.call(client_ptr, config) + Rdkafka::Config.oauthbearer_token_refresh_callback.call(config, instance_id) end end diff --git a/spec/rdkafka/bindings_spec.rb b/spec/rdkafka/bindings_spec.rb index 4fe04800..b3e873cf 100644 --- a/spec/rdkafka/bindings_spec.rb +++ b/spec/rdkafka/bindings_spec.rb @@ -201,30 +201,23 @@ context "without an oauthbearer callback" do it "should do nothing" do expect { - Rdkafka::Bindings::OAuthbearerTokenRefreshCallback.call(nil, "", nil) + Rdkafka::Bindings::OAuthbearerTokenRefreshCallback.call(nil, "", nil, "client_id") }.not_to raise_error end end context "with an oauthbearer callback" do before do - Rdkafka::Config.oauthbearer_token_refresh_callback = lambda do |client, config| - $received_client = client + Rdkafka::Config.oauthbearer_token_refresh_callback = lambda do |config, instance_id| $received_config = config + $received_instance_id = instance_id end end it "should call the oauth bearer callback" do - client_ptr = Rdkafka::Bindings.rd_kafka_new( - :rd_kafka_consumer, - nil, - nil, - 0 - ) - Rdkafka::Bindings::OAuthbearerTokenRefreshCallback.call(client_ptr, "oauth", nil) - expect($received_client).to eq(client_ptr) - expect($received_client).to be_instance_of(FFI::Pointer) - expect($received_config).to eq("oauth") + Rdkafka::Bindings::OAuthbearerTokenRefreshCallback.call(nil, nil, nil, "consumer_id") + expect($received_config).to eq(nil) + expect($received_instance_id).to eq("consumer_id") end end end diff --git a/spec/rdkafka/config_spec.rb b/spec/rdkafka/config_spec.rb index 94aef4a2..8206f9c3 100644 --- a/spec/rdkafka/config_spec.rb +++ b/spec/rdkafka/config_spec.rb @@ -119,8 +119,9 @@ def call(stats); end context "with a proc/lambda" do it "should set the callback" do expect { - Rdkafka::Config.oauthbearer_token_refresh_callback = lambda do |oauth| + Rdkafka::Config.oauthbearer_token_refresh_callback = lambda do |oauth, instance_id| puts oauth + puts instance_id end }.not_to raise_error expect(Rdkafka::Config.oauthbearer_token_refresh_callback).to respond_to :call From 947eb2cb501ff3d326695935c7a77ca1f84cba1b Mon Sep 17 00:00:00 2001 From: bruce szalwinski Date: Fri, 1 Mar 2024 13:43:09 -0600 Subject: [PATCH 21/32] refactor helper include --- lib/rdkafka.rb | 1 + lib/rdkafka/admin.rb | 2 -- lib/rdkafka/consumer.rb | 2 -- lib/rdkafka/producer.rb | 2 -- 4 files changed, 1 insertion(+), 6 deletions(-) diff --git a/lib/rdkafka.rb b/lib/rdkafka.rb index 0bd1d56c..cc907274 100644 --- a/lib/rdkafka.rb +++ b/lib/rdkafka.rb @@ -7,6 +7,7 @@ require "rdkafka/version" require "rdkafka/helpers/time" +require "rdkafka/helpers/oauth" require "rdkafka/abstract_handle" require "rdkafka/admin" require "rdkafka/admin/create_topic_handle" diff --git a/lib/rdkafka/admin.rb b/lib/rdkafka/admin.rb index de7b56f8..f791fa26 100644 --- a/lib/rdkafka/admin.rb +++ b/lib/rdkafka/admin.rb @@ -1,7 +1,5 @@ # frozen_string_literal: true -require_relative 'helpers/oauth' - module Rdkafka class Admin # @private diff --git a/lib/rdkafka/consumer.rb b/lib/rdkafka/consumer.rb index 2983f6fb..0b6f6423 100644 --- a/lib/rdkafka/consumer.rb +++ b/lib/rdkafka/consumer.rb @@ -1,7 +1,5 @@ # frozen_string_literal: true -require_relative 'helpers/oauth' - module Rdkafka # A consumer of Kafka messages. It uses the high-level consumer approach where the Kafka # brokers automatically assign partitions and load balance partitions over consumers that diff --git a/lib/rdkafka/producer.rb b/lib/rdkafka/producer.rb index ef535fa7..02f489ee 100644 --- a/lib/rdkafka/producer.rb +++ b/lib/rdkafka/producer.rb @@ -1,7 +1,5 @@ # frozen_string_literal: true -require_relative 'helpers/oauth' - module Rdkafka # A producer for Kafka messages. To create a producer set up a {Config} and call {Config#producer producer} on that. class Producer From 172e34935d2e7e347d66864f1123ca0f400c5686 Mon Sep 17 00:00:00 2001 From: bruce szalwinski Date: Fri, 1 Mar 2024 17:02:46 -0600 Subject: [PATCH 22/32] comment out specs for oauthbearer_set_token when sasl is configured --- spec/rdkafka/admin_spec.rb | 29 ++++++++++++++--------------- spec/rdkafka/consumer_spec.rb | 27 +++++++++++++-------------- spec/rdkafka/producer_spec.rb | 30 ++++++++++++++++-------------- 3 files changed, 43 insertions(+), 43 deletions(-) diff --git a/spec/rdkafka/admin_spec.rb b/spec/rdkafka/admin_spec.rb index 441aa152..58525da8 100644 --- a/spec/rdkafka/admin_spec.rb +++ b/spec/rdkafka/admin_spec.rb @@ -4,11 +4,7 @@ describe Rdkafka::Admin do let(:config) { rdkafka_config } - let(:config_sasl) { rdkafka_config({ - "security.protocol": "sasl_ssl", - "sasl.mechanisms": 'OAUTHBEARER'})} let(:admin) { config.admin } - let(:admin_sasl) { config_sasl.admin } after do # Registry should always end up being empty @@ -18,7 +14,6 @@ expect(Rdkafka::Admin::CreateAclHandle::REGISTRY).to be_empty expect(Rdkafka::Admin::DeleteAclHandle::REGISTRY).to be_empty admin.close - admin_sasl.close end let(:topic_name) { "test-topic-#{Random.new.rand(0..1_000_000)}" } @@ -422,15 +417,19 @@ end end - context 'when sasl configured' do - it 'should succeed' do - response = admin_sasl.oauthbearer_set_token( - token: "foo", - lifetime_ms: Time.now.to_i*1000 + 900 * 1000, - principal_name: "kafka-cluster" - ) - expect(response).to eq(0) - end - end + # context 'when sasl configured' do + # it 'should succeed' do + # let(:config_sasl) { rdkafka_config({ + # "security.protocol": "sasl_ssl", + # "sasl.mechanisms": 'OAUTHBEARER'})} + # let(:admin_sasl) { config_sasl.admin } + # response = admin_sasl.oauthbearer_set_token( + # token: "foo", + # lifetime_ms: Time.now.to_i*1000 + 900 * 1000, + # principal_name: "kafka-cluster" + # ) + # expect(response).to eq(0) + # end + # end end end diff --git a/spec/rdkafka/consumer_spec.rb b/spec/rdkafka/consumer_spec.rb index de69f44e..dbd35720 100644 --- a/spec/rdkafka/consumer_spec.rb +++ b/spec/rdkafka/consumer_spec.rb @@ -6,13 +6,9 @@ describe Rdkafka::Consumer do let(:consumer) { rdkafka_consumer_config.consumer } let(:producer) { rdkafka_producer_config.producer } - let(:consumer_sasl) { rdkafka_producer_config({ - "security.protocol": "sasl_ssl", - "sasl.mechanisms": 'OAUTHBEARER'}).producer } after { consumer.close } after { producer.close } - after { consumer_sasl.close } describe '#name' do it { expect(consumer.name).to include('rdkafka#consumer-') } @@ -1318,15 +1314,18 @@ def collect(name, list) end end - context 'when sasl configured' do - it 'should succeed' do - response = consumer_sasl.oauthbearer_set_token( - token: "foo", - lifetime_ms: Time.now.to_i*1000 + 900 * 1000, - principal_name: "kafka-cluster" - ) - expect(response).to eq(0) - end - end + # context 'when sasl configured' do + # it 'should succeed' do + # consumer_sasl = rdkafka_producer_config({ + # "security.protocol": "sasl_ssl", + # "sasl.mechanisms": 'OAUTHBEARER'}).consumer + # response = consumer_sasl.oauthbearer_set_token( + # token: "foo", + # lifetime_ms: Time.now.to_i*1000 + 900 * 1000, + # principal_name: "kafka-cluster" + # ) + # expect(response).to eq(0) + # end + # end end end diff --git a/spec/rdkafka/producer_spec.rb b/spec/rdkafka/producer_spec.rb index 5bb379c0..f9982b9c 100644 --- a/spec/rdkafka/producer_spec.rb +++ b/spec/rdkafka/producer_spec.rb @@ -4,9 +4,6 @@ describe Rdkafka::Producer do let(:producer) { rdkafka_producer_config.producer } - let(:producer_sasl) { rdkafka_producer_config({ - "security.protocol": "sasl_ssl", - "sasl.mechanisms": 'OAUTHBEARER'}).producer } let(:consumer) { rdkafka_consumer_config.consumer } after do @@ -14,7 +11,6 @@ registry = Rdkafka::Producer::DeliveryHandle::REGISTRY expect(registry).to be_empty, registry.inspect producer.close - producer_sasl.close consumer.close end @@ -751,15 +747,21 @@ def call(_, handle) end end - context 'when sasl configured' do - it 'should succeed' do - response = producer_sasl.oauthbearer_set_token( - token: "foo", - lifetime_ms: Time.now.to_i*1000 + 900 * 1000, - principal_name: "kafka-cluster" - ) - expect(response).to eq(0) - end - end + # context 'when sasl configured' do + # it 'should succeed' do + # producer_sasl = rdkafka_producer_config( + # { + # "security.protocol": "sasl_ssl", + # "sasl.mechanisms": 'OAUTHBEARER' + # } + # ).producer + # response = producer_sasl.oauthbearer_set_token( + # token: "foo", + # lifetime_ms: Time.now.to_i*1000 + 900 * 1000, + # principal_name: "kafka-cluster" + # ) + # expect(response).to eq(0) + # end + # end end end From 0a6babf2b7558d6e83ffdb62f6e2dfa21dbb4314 Mon Sep 17 00:00:00 2001 From: bruce szalwinski Date: Sat, 2 Mar 2024 12:15:50 -0600 Subject: [PATCH 23/32] remove client id for now --- lib/rdkafka/bindings.rb | 8 ++++---- spec/rdkafka/admin_spec.rb | 29 +++++++++++++++-------------- spec/rdkafka/consumer_spec.rb | 27 ++++++++++++++------------- spec/rdkafka/producer_spec.rb | 32 ++++++++++++++++---------------- 4 files changed, 49 insertions(+), 47 deletions(-) diff --git a/lib/rdkafka/bindings.rb b/lib/rdkafka/bindings.rb index e6a67639..ac377544 100644 --- a/lib/rdkafka/bindings.rb +++ b/lib/rdkafka/bindings.rb @@ -112,7 +112,7 @@ class TopicPartitionList < FFI::Struct callback :error_cb, [:pointer, :int, :string, :pointer], :void attach_function :rd_kafka_conf_set_error_cb, [:pointer, :error_cb], :void attach_function :rd_kafka_rebalance_protocol, [:pointer], :string - callback :oauthbearer_token_refresh_cb, [:pointer, :string, :pointer, :string], :void + callback :oauthbearer_token_refresh_cb, [:pointer, :string, :pointer], :void attach_function :rd_kafka_conf_set_oauthbearer_token_refresh_cb, [:pointer, :oauthbearer_token_refresh_cb], :void attach_function :rd_kafka_oauthbearer_set_token, [:pointer, :string, :int64, :pointer, :pointer, :int, :pointer, :int], :int attach_function :rd_kafka_oauthbearer_set_token_failure, [:pointer, :string], :int @@ -166,10 +166,10 @@ class TopicPartitionList < FFI::Struct end OAuthbearerTokenRefreshCallback = FFI::Function.new( - :void, [:pointer, :string, :pointer, :string] - ) do |_client_ptr, config, _opaque, instance_id| + :void, [:pointer, :string, :pointer] + ) do |client_ptr, config, _opaque| if Rdkafka::Config.oauthbearer_token_refresh_callback - Rdkafka::Config.oauthbearer_token_refresh_callback.call(config, instance_id) + Rdkafka::Config.oauthbearer_token_refresh_callback.call(client_ptr, config) end end diff --git a/spec/rdkafka/admin_spec.rb b/spec/rdkafka/admin_spec.rb index 58525da8..14c846a9 100644 --- a/spec/rdkafka/admin_spec.rb +++ b/spec/rdkafka/admin_spec.rb @@ -417,19 +417,20 @@ end end - # context 'when sasl configured' do - # it 'should succeed' do - # let(:config_sasl) { rdkafka_config({ - # "security.protocol": "sasl_ssl", - # "sasl.mechanisms": 'OAUTHBEARER'})} - # let(:admin_sasl) { config_sasl.admin } - # response = admin_sasl.oauthbearer_set_token( - # token: "foo", - # lifetime_ms: Time.now.to_i*1000 + 900 * 1000, - # principal_name: "kafka-cluster" - # ) - # expect(response).to eq(0) - # end - # end + context 'when sasl configured' do + it 'should succeed' do + config_sasl = rdkafka_config( + "security.protocol": "sasl_ssl", + "sasl.mechanisms": 'OAUTHBEARER' + ) + admin_sasl = config_sasl.admin + response = admin_sasl.oauthbearer_set_token( + token: "foo", + lifetime_ms: Time.now.to_i*1000 + 900 * 1000, + principal_name: "kafka-cluster" + ) + expect(response).to eq(0) + end + end end end diff --git a/spec/rdkafka/consumer_spec.rb b/spec/rdkafka/consumer_spec.rb index dbd35720..0b850ceb 100644 --- a/spec/rdkafka/consumer_spec.rb +++ b/spec/rdkafka/consumer_spec.rb @@ -1314,18 +1314,19 @@ def collect(name, list) end end - # context 'when sasl configured' do - # it 'should succeed' do - # consumer_sasl = rdkafka_producer_config({ - # "security.protocol": "sasl_ssl", - # "sasl.mechanisms": 'OAUTHBEARER'}).consumer - # response = consumer_sasl.oauthbearer_set_token( - # token: "foo", - # lifetime_ms: Time.now.to_i*1000 + 900 * 1000, - # principal_name: "kafka-cluster" - # ) - # expect(response).to eq(0) - # end - # end + context 'when sasl configured' do + it 'should succeed' do + consumer_sasl = rdkafka_producer_config( + "security.protocol": "sasl_ssl", + "sasl.mechanisms": 'OAUTHBEARER' + ).consumer + response = consumer_sasl.oauthbearer_set_token( + token: "foo", + lifetime_ms: Time.now.to_i*1000 + 900 * 1000, + principal_name: "kafka-cluster" + ) + expect(response).to eq(0) + end + end end end diff --git a/spec/rdkafka/producer_spec.rb b/spec/rdkafka/producer_spec.rb index f9982b9c..f6ea3ef4 100644 --- a/spec/rdkafka/producer_spec.rb +++ b/spec/rdkafka/producer_spec.rb @@ -747,21 +747,21 @@ def call(_, handle) end end - # context 'when sasl configured' do - # it 'should succeed' do - # producer_sasl = rdkafka_producer_config( - # { - # "security.protocol": "sasl_ssl", - # "sasl.mechanisms": 'OAUTHBEARER' - # } - # ).producer - # response = producer_sasl.oauthbearer_set_token( - # token: "foo", - # lifetime_ms: Time.now.to_i*1000 + 900 * 1000, - # principal_name: "kafka-cluster" - # ) - # expect(response).to eq(0) - # end - # end + context 'when sasl configured' do + it 'should succeed' do + producer_sasl = rdkafka_producer_config( + { + "security.protocol": "sasl_ssl", + "sasl.mechanisms": 'OAUTHBEARER' + } + ).producer + response = producer_sasl.oauthbearer_set_token( + token: "foo", + lifetime_ms: Time.now.to_i*1000 + 900 * 1000, + principal_name: "kafka-cluster" + ) + expect(response).to eq(0) + end + end end end From c989728031df2ea35a38e4ff406208a32864f552 Mon Sep 17 00:00:00 2001 From: bruce szalwinski Date: Sun, 3 Mar 2024 15:17:09 -0600 Subject: [PATCH 24/32] pass client-name to oauth callback --- lib/rdkafka/bindings.rb | 3 ++- spec/rdkafka/bindings_spec.rb | 20 +++++++++++++------- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/lib/rdkafka/bindings.rb b/lib/rdkafka/bindings.rb index ac377544..906c7bae 100644 --- a/lib/rdkafka/bindings.rb +++ b/lib/rdkafka/bindings.rb @@ -169,7 +169,8 @@ class TopicPartitionList < FFI::Struct :void, [:pointer, :string, :pointer] ) do |client_ptr, config, _opaque| if Rdkafka::Config.oauthbearer_token_refresh_callback - Rdkafka::Config.oauthbearer_token_refresh_callback.call(client_ptr, config) + client_name = !client_ptr.null? ? Rdkafka::Bindings.rd_kafka_name(client_ptr) : nil + Rdkafka::Config.oauthbearer_token_refresh_callback.call(config, client_name) end end diff --git a/spec/rdkafka/bindings_spec.rb b/spec/rdkafka/bindings_spec.rb index b3e873cf..53ea2ac1 100644 --- a/spec/rdkafka/bindings_spec.rb +++ b/spec/rdkafka/bindings_spec.rb @@ -201,23 +201,29 @@ context "without an oauthbearer callback" do it "should do nothing" do expect { - Rdkafka::Bindings::OAuthbearerTokenRefreshCallback.call(nil, "", nil, "client_id") + Rdkafka::Bindings::OAuthbearerTokenRefreshCallback.call(nil, "", nil) }.not_to raise_error end end context "with an oauthbearer callback" do before do - Rdkafka::Config.oauthbearer_token_refresh_callback = lambda do |config, instance_id| + $client_ptr = Rdkafka::Bindings.rd_kafka_new( + :rd_kafka_consumer, + nil, + nil, + 0 + ) + Rdkafka::Config.oauthbearer_token_refresh_callback = lambda do |config, client_name| $received_config = config - $received_instance_id = instance_id + $received_client_name = client_name end end - it "should call the oauth bearer callback" do - Rdkafka::Bindings::OAuthbearerTokenRefreshCallback.call(nil, nil, nil, "consumer_id") - expect($received_config).to eq(nil) - expect($received_instance_id).to eq("consumer_id") + it "should call the oauth bearer callback and receive config and client name" do + Rdkafka::Bindings::OAuthbearerTokenRefreshCallback.call($client_ptr, "{}", nil) + expect($received_config).to eq("{}") + expect($received_client_name).to match(/consumer/) end end end From e51542cacce7a3ef238844331a00cab991a38985 Mon Sep 17 00:00:00 2001 From: bruce szalwinski Date: Sun, 3 Mar 2024 15:32:46 -0600 Subject: [PATCH 25/32] make args match across tests --- spec/rdkafka/config_spec.rb | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/spec/rdkafka/config_spec.rb b/spec/rdkafka/config_spec.rb index 8206f9c3..a188d858 100644 --- a/spec/rdkafka/config_spec.rb +++ b/spec/rdkafka/config_spec.rb @@ -119,9 +119,9 @@ def call(stats); end context "with a proc/lambda" do it "should set the callback" do expect { - Rdkafka::Config.oauthbearer_token_refresh_callback = lambda do |oauth, instance_id| - puts oauth - puts instance_id + Rdkafka::Config.oauthbearer_token_refresh_callback = lambda do |config, client_name| + puts config + puts client_name end }.not_to raise_error expect(Rdkafka::Config.oauthbearer_token_refresh_callback).to respond_to :call @@ -131,7 +131,7 @@ def call(stats); end context "with a callable object" do it "should set the callback" do callback = Class.new do - def call(oauth); end + def call(config, client_name); end end expect { From c545a1c0ff0119f871ce7d8aaab54a7177de4d45 Mon Sep 17 00:00:00 2001 From: bruce szalwinski Date: Mon, 4 Mar 2024 14:15:15 -0600 Subject: [PATCH 26/32] refactor oauthbearer_set_token as a mixin --- lib/rdkafka/admin.rb | 12 ++---------- lib/rdkafka/bindings.rb | 22 ++++++++++++++++++++-- lib/rdkafka/consumer.rb | 11 +---------- lib/rdkafka/helpers/oauth.rb | 6 +++--- lib/rdkafka/producer.rb | 11 +---------- 5 files changed, 27 insertions(+), 35 deletions(-) diff --git a/lib/rdkafka/admin.rb b/lib/rdkafka/admin.rb index f791fa26..49afe0f3 100644 --- a/lib/rdkafka/admin.rb +++ b/lib/rdkafka/admin.rb @@ -2,6 +2,8 @@ module Rdkafka class Admin + include Helpers::OAuth + # @private def initialize(native_kafka) @native_kafka = native_kafka @@ -605,16 +607,6 @@ def describe_acl(resource_type:, resource_name:, resource_pattern_type:, princip describe_acl_handle end - def oauthbearer_set_token(token:, lifetime_ms:, principal_name:, extensions: nil) - Helpers::OAuth.new.oauthbearer_set_token( - client: @native_kafka, - token: token, - lifetime_ms: lifetime_ms, - principal_name: principal_name, - extensions: extensions - ) - end - private def closed_admin_check(method) diff --git a/lib/rdkafka/bindings.rb b/lib/rdkafka/bindings.rb index 906c7bae..bb4ab62a 100644 --- a/lib/rdkafka/bindings.rb +++ b/lib/rdkafka/bindings.rb @@ -165,12 +165,30 @@ class TopicPartitionList < FFI::Struct end end + # Callbacks are currently global and contextless. + # The same callback object is assigned for statistics, errors, and potentially OAuth regardless of the instance to which it is associated. + # This means that the callback will be called for all instances, and the callback must be able to determine which instance it is associated with. + # The instance name will be provided in the callback, allowing the callback to reference the correct instance. + # + # An example of how to use the instance name in the callback is given below. + # The `refresh_token` is configured as the `oauthbearer_token_refresh_callback`. + # `instances` is a map of client names to client instances, maintained by the user. + # + # ``` + # def refresh_token(config, client_name) + # client = instances[client_name] + # client.oauthbearer_set_token( + # token: 'new-token-value', + # lifetime_ms: token-lifetime-ms, + # principal_name: 'principal-name' + # ) + # end + # ``` OAuthbearerTokenRefreshCallback = FFI::Function.new( :void, [:pointer, :string, :pointer] ) do |client_ptr, config, _opaque| if Rdkafka::Config.oauthbearer_token_refresh_callback - client_name = !client_ptr.null? ? Rdkafka::Bindings.rd_kafka_name(client_ptr) : nil - Rdkafka::Config.oauthbearer_token_refresh_callback.call(config, client_name) + Rdkafka::Config.oauthbearer_token_refresh_callback.call(config, Rdkafka::Bindings.rd_kafka_name(client_ptr)) end end diff --git a/lib/rdkafka/consumer.rb b/lib/rdkafka/consumer.rb index 0b6f6423..5409e238 100644 --- a/lib/rdkafka/consumer.rb +++ b/lib/rdkafka/consumer.rb @@ -13,6 +13,7 @@ module Rdkafka class Consumer include Enumerable include Helpers::Time + include Helpers::OAuth # @private def initialize(native_kafka) @@ -688,16 +689,6 @@ def consumer_group_metadata_pointer end end - def oauthbearer_set_token(token:, lifetime_ms:, principal_name:, extensions: nil) - Helpers::OAuth.new.oauthbearer_set_token( - client: @native_kafka, - token: token, - lifetime_ms: lifetime_ms, - principal_name: principal_name, - extensions: extensions - ) - end - private def closed_consumer_check(method) diff --git a/lib/rdkafka/helpers/oauth.rb b/lib/rdkafka/helpers/oauth.rb index 11207c09..1e8868e0 100644 --- a/lib/rdkafka/helpers/oauth.rb +++ b/lib/rdkafka/helpers/oauth.rb @@ -1,7 +1,7 @@ module Rdkafka module Helpers - class OAuth + module OAuth # Set the OAuthBearer token # @@ -10,9 +10,9 @@ class OAuth # @param principal_name [String] the mandatory Kafka principal name associated with the token. # @param extensions [Hash] optional SASL extensions key-value pairs to be communicated to the broker as additional key-value pairs during the initial client response as per https://tools.ietf.org/html/rfc7628#section-3.1. # @return [Integer] 0 on success - def oauthbearer_set_token(client:, token:, lifetime_ms:, principal_name:, extensions: nil) + def oauthbearer_set_token(token:, lifetime_ms:, principal_name:, extensions: nil) error_buffer = FFI::MemoryPointer.from_string(" " * 256) - client.with_inner do |inner| + @native_kafka.with_inner do |inner| response = Rdkafka::Bindings.rd_kafka_oauthbearer_set_token( inner, token, lifetime_ms, principal_name, flatten_extensions(extensions), extension_size(extensions), error_buffer, 256 diff --git a/lib/rdkafka/producer.rb b/lib/rdkafka/producer.rb index 02f489ee..fe421ec2 100644 --- a/lib/rdkafka/producer.rb +++ b/lib/rdkafka/producer.rb @@ -4,6 +4,7 @@ module Rdkafka # A producer for Kafka messages. To create a producer set up a {Config} and call {Config#producer producer} on that. class Producer include Helpers::Time + include Helpers::OAuth # Cache partitions count for 30 seconds PARTITIONS_COUNT_TTL = 30 @@ -295,16 +296,6 @@ def arity(callback) callback.method(:call).arity end - def oauthbearer_set_token(token:, lifetime_ms:, principal_name:, extensions: nil) - Helpers::OAuth.new.oauthbearer_set_token( - client: @native_kafka, - token: token, - lifetime_ms: lifetime_ms, - principal_name: principal_name, - extensions: extensions - ) - end - private # Ensures, no operations can happen on a closed producer From 2ef64c329ac32d023b05fd37475e816afe4d5ef2 Mon Sep 17 00:00:00 2001 From: bruce szalwinski Date: Sat, 9 Mar 2024 15:09:37 -0600 Subject: [PATCH 27/32] close and free --- spec/rdkafka/admin_spec.rb | 14 +++++++++++--- spec/rdkafka/bindings_spec.rb | 28 +++++++++++++++++++++------- spec/rdkafka/consumer_spec.rb | 14 +++++++++++--- 3 files changed, 43 insertions(+), 13 deletions(-) diff --git a/spec/rdkafka/admin_spec.rb b/spec/rdkafka/admin_spec.rb index 14c846a9..7721a564 100644 --- a/spec/rdkafka/admin_spec.rb +++ b/spec/rdkafka/admin_spec.rb @@ -418,13 +418,21 @@ end context 'when sasl configured' do - it 'should succeed' do + before do config_sasl = rdkafka_config( "security.protocol": "sasl_ssl", "sasl.mechanisms": 'OAUTHBEARER' ) - admin_sasl = config_sasl.admin - response = admin_sasl.oauthbearer_set_token( + $admin_sasl = config_sasl.admin + end + + after do + $admin_sasl.close + end + + it 'should succeed' do + + response = $admin_sasl.oauthbearer_set_token( token: "foo", lifetime_ms: Time.now.to_i*1000 + 900 * 1000, principal_name: "kafka-cluster" diff --git a/spec/rdkafka/bindings_spec.rb b/spec/rdkafka/bindings_spec.rb index 53ea2ac1..6592140f 100644 --- a/spec/rdkafka/bindings_spec.rb +++ b/spec/rdkafka/bindings_spec.rb @@ -160,6 +160,10 @@ $error_buffer = FFI::MemoryPointer.from_string(" " * 256) end + after do + $client_ptr.free + end + it "should set token or capture failure" do response = Rdkafka::Bindings.rd_kafka_oauthbearer_set_token($client_ptr, $token_value, $md_lifetime_ms, $md_principal_name, $extensions, $extension_size, $error_buffer, 256) expect(response).to eq(Rdkafka::Bindings::RD_KAFKA_RESP_ERR__STATE) @@ -180,17 +184,23 @@ end context "with args" do + before do + $client_ptr = Rdkafka::Bindings.rd_kafka_new( + :rd_kafka_consumer, + nil, + nil, + 0 + ) + end + + after do + $client_ptr.free + end it "should succeed" do expect { - client_ptr = Rdkafka::Bindings.rd_kafka_new( - :rd_kafka_consumer, - nil, - nil, - 0 - ) errstr = "error" - Rdkafka::Bindings.rd_kafka_oauthbearer_set_token_failure(client_ptr, errstr) + Rdkafka::Bindings.rd_kafka_oauthbearer_set_token_failure($client_ptr, errstr) }.to_not raise_error end end @@ -220,6 +230,10 @@ end end + after do + $client_ptr.free + end + it "should call the oauth bearer callback and receive config and client name" do Rdkafka::Bindings::OAuthbearerTokenRefreshCallback.call($client_ptr, "{}", nil) expect($received_config).to eq("{}") diff --git a/spec/rdkafka/consumer_spec.rb b/spec/rdkafka/consumer_spec.rb index 0b850ceb..6b4f9eeb 100644 --- a/spec/rdkafka/consumer_spec.rb +++ b/spec/rdkafka/consumer_spec.rb @@ -1315,12 +1315,20 @@ def collect(name, list) end context 'when sasl configured' do - it 'should succeed' do - consumer_sasl = rdkafka_producer_config( + before do + $consumer_sasl = rdkafka_producer_config( "security.protocol": "sasl_ssl", "sasl.mechanisms": 'OAUTHBEARER' ).consumer - response = consumer_sasl.oauthbearer_set_token( + end + + after do + $consumer_sasl.close + end + + it 'should succeed' do + + response = $consumer_sasl.oauthbearer_set_token( token: "foo", lifetime_ms: Time.now.to_i*1000 + 900 * 1000, principal_name: "kafka-cluster" From 90c387259e7a145a12911cd8756aec06c56b2a08 Mon Sep 17 00:00:00 2001 From: bruce szalwinski Date: Sun, 10 Mar 2024 11:56:42 -0500 Subject: [PATCH 28/32] close instead of free --- spec/rdkafka/bindings_spec.rb | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/spec/rdkafka/bindings_spec.rb b/spec/rdkafka/bindings_spec.rb index 6592140f..cb431360 100644 --- a/spec/rdkafka/bindings_spec.rb +++ b/spec/rdkafka/bindings_spec.rb @@ -161,7 +161,7 @@ end after do - $client_ptr.free + Rdkafka::Bindings.rd_kafka_consumer_close($client_ptr) end it "should set token or capture failure" do @@ -194,7 +194,7 @@ end after do - $client_ptr.free + Rdkafka::Bindings.rd_kafka_consumer_close($client_ptr) end it "should succeed" do @@ -231,7 +231,7 @@ end after do - $client_ptr.free + Rdkafka::Bindings.rd_kafka_consumer_close($client_ptr) end it "should call the oauth bearer callback and receive config and client name" do From 0ef23503e3e430cec137cf4f77bc657ad2e6f330 Mon Sep 17 00:00:00 2001 From: bruce szalwinski Date: Sun, 10 Mar 2024 12:23:08 -0500 Subject: [PATCH 29/32] add TestConsumer that closes and destroys when done --- spec/rdkafka/bindings_spec.rb | 49 ++++++++--------------------------- spec/spec_helper.rb | 15 +++++++++++ 2 files changed, 26 insertions(+), 38 deletions(-) diff --git a/spec/rdkafka/bindings_spec.rb b/spec/rdkafka/bindings_spec.rb index cb431360..7757bf80 100644 --- a/spec/rdkafka/bindings_spec.rb +++ b/spec/rdkafka/bindings_spec.rb @@ -145,12 +145,6 @@ context "with args" do before do - $client_ptr = Rdkafka::Bindings.rd_kafka_new( - :rd_kafka_consumer, - nil, - nil, - 0 - ) DEFAULT_TOKEN_EXPIRY_SECONDS = 900 $token_value = "token" $md_lifetime_ms = Time.now.to_i*1000 + DEFAULT_TOKEN_EXPIRY_SECONDS * 1000 @@ -160,14 +154,12 @@ $error_buffer = FFI::MemoryPointer.from_string(" " * 256) end - after do - Rdkafka::Bindings.rd_kafka_consumer_close($client_ptr) - end - it "should set token or capture failure" do - response = Rdkafka::Bindings.rd_kafka_oauthbearer_set_token($client_ptr, $token_value, $md_lifetime_ms, $md_principal_name, $extensions, $extension_size, $error_buffer, 256) + TestConsumer.with do |consumer_ptr| + response = Rdkafka::Bindings.rd_kafka_oauthbearer_set_token(consumer_ptr, $token_value, $md_lifetime_ms, $md_principal_name, $extensions, $extension_size, $error_buffer, 256) expect(response).to eq(Rdkafka::Bindings::RD_KAFKA_RESP_ERR__STATE) expect($error_buffer.read_string).to eq("SASL/OAUTHBEARER is not the configured authentication mechanism") + end end end end @@ -184,23 +176,12 @@ end context "with args" do - before do - $client_ptr = Rdkafka::Bindings.rd_kafka_new( - :rd_kafka_consumer, - nil, - nil, - 0 - ) - end - - after do - Rdkafka::Bindings.rd_kafka_consumer_close($client_ptr) - end - it "should succeed" do expect { errstr = "error" - Rdkafka::Bindings.rd_kafka_oauthbearer_set_token_failure($client_ptr, errstr) + TestConsumer.with do |consumer_ptr| + Rdkafka::Bindings.rd_kafka_oauthbearer_set_token_failure(consumer_ptr, errstr) + end }.to_not raise_error end end @@ -218,26 +199,18 @@ context "with an oauthbearer callback" do before do - $client_ptr = Rdkafka::Bindings.rd_kafka_new( - :rd_kafka_consumer, - nil, - nil, - 0 - ) Rdkafka::Config.oauthbearer_token_refresh_callback = lambda do |config, client_name| $received_config = config $received_client_name = client_name end end - after do - Rdkafka::Bindings.rd_kafka_consumer_close($client_ptr) - end - it "should call the oauth bearer callback and receive config and client name" do - Rdkafka::Bindings::OAuthbearerTokenRefreshCallback.call($client_ptr, "{}", nil) - expect($received_config).to eq("{}") - expect($received_client_name).to match(/consumer/) + TestConsumer.with do |consumer_ptr| + Rdkafka::Bindings::OAuthbearerTokenRefreshCallback.call(consumer_ptr, "{}", nil) + expect($received_config).to eq("{}") + expect($received_client_name).to match(/consumer/) + end end end end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 1ce43e73..0850186b 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -155,3 +155,18 @@ def notify_listener(listener, &block) end end end + +class TestConsumer + def self.with + consumer = Rdkafka::Bindings.rd_kafka_new( + :rd_kafka_consumer, + nil, + nil, + 0 + ) + yield consumer + ensure + Rdkafka::Bindings.rd_kafka_consumer_close(consumer) + Rdkafka::Bindings.rd_kafka_destroy(consumer) + end +end From 8a093ed872fc875c6fc25f8a047812fa3511ddcd Mon Sep 17 00:00:00 2001 From: bruce szalwinski Date: Tue, 12 Mar 2024 09:42:32 -0500 Subject: [PATCH 30/32] names are hard --- spec/rdkafka/bindings_spec.rb | 6 +++--- spec/spec_helper.rb | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/spec/rdkafka/bindings_spec.rb b/spec/rdkafka/bindings_spec.rb index 7757bf80..1cbd7cba 100644 --- a/spec/rdkafka/bindings_spec.rb +++ b/spec/rdkafka/bindings_spec.rb @@ -155,7 +155,7 @@ end it "should set token or capture failure" do - TestConsumer.with do |consumer_ptr| + RdKafkaTestConsumer.with do |consumer_ptr| response = Rdkafka::Bindings.rd_kafka_oauthbearer_set_token(consumer_ptr, $token_value, $md_lifetime_ms, $md_principal_name, $extensions, $extension_size, $error_buffer, 256) expect(response).to eq(Rdkafka::Bindings::RD_KAFKA_RESP_ERR__STATE) expect($error_buffer.read_string).to eq("SASL/OAUTHBEARER is not the configured authentication mechanism") @@ -179,7 +179,7 @@ it "should succeed" do expect { errstr = "error" - TestConsumer.with do |consumer_ptr| + RdKafkaTestConsumer.with do |consumer_ptr| Rdkafka::Bindings.rd_kafka_oauthbearer_set_token_failure(consumer_ptr, errstr) end }.to_not raise_error @@ -206,7 +206,7 @@ end it "should call the oauth bearer callback and receive config and client name" do - TestConsumer.with do |consumer_ptr| + RdKafkaTestConsumer.with do |consumer_ptr| Rdkafka::Bindings::OAuthbearerTokenRefreshCallback.call(consumer_ptr, "{}", nil) expect($received_config).to eq("{}") expect($received_client_name).to match(/consumer/) diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 0850186b..a2b35732 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -156,7 +156,7 @@ def notify_listener(listener, &block) end end -class TestConsumer +class RdKafkaTestConsumer def self.with consumer = Rdkafka::Bindings.rd_kafka_new( :rd_kafka_consumer, From 86cba7079fdf81a738e42aecf481935135d1ddb7 Mon Sep 17 00:00:00 2001 From: bruce szalwinski Date: Tue, 12 Mar 2024 09:42:44 -0500 Subject: [PATCH 31/32] clarify callback scope --- lib/rdkafka/bindings.rb | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/lib/rdkafka/bindings.rb b/lib/rdkafka/bindings.rb index bb4ab62a..092f8b0f 100644 --- a/lib/rdkafka/bindings.rb +++ b/lib/rdkafka/bindings.rb @@ -165,9 +165,8 @@ class TopicPartitionList < FFI::Struct end end - # Callbacks are currently global and contextless. - # The same callback object is assigned for statistics, errors, and potentially OAuth regardless of the instance to which it is associated. - # This means that the callback will be called for all instances, and the callback must be able to determine which instance it is associated with. + # The OAuth callback is currently global and contextless. + # This means that the callback will be called for all instances, and the callback must be able to determine to which instance it is associated. # The instance name will be provided in the callback, allowing the callback to reference the correct instance. # # An example of how to use the instance name in the callback is given below. From a8df2abbad2bcafd8e37be985f53905a6779bf7a Mon Sep 17 00:00:00 2001 From: bruce szalwinski Date: Tue, 19 Mar 2024 08:55:55 -0500 Subject: [PATCH 32/32] docs on extension_size math --- lib/rdkafka/helpers/oauth.rb | 2 ++ 1 file changed, 2 insertions(+) diff --git a/lib/rdkafka/helpers/oauth.rb b/lib/rdkafka/helpers/oauth.rb index 1e8868e0..4e4a46f5 100644 --- a/lib/rdkafka/helpers/oauth.rb +++ b/lib/rdkafka/helpers/oauth.rb @@ -36,6 +36,8 @@ def flatten_extensions(extensions) "\x01#{extensions.map { |e| e.join("=") }.join("\x01")}" end + # extension_size is the number of keys + values which should be a non-negative even number + # https://github.com/confluentinc/librdkafka/blob/master/src/rdkafka_sasl_oauthbearer.c#L327-L347 def extension_size(extensions) return 0 unless extensions extensions.size * 2