diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS new file mode 100644 index 00000000..bb880693 --- /dev/null +++ b/.github/CODEOWNERS @@ -0,0 +1,3 @@ +/.github @mensfeld +/.github/workflows/ @mensfeld +/.github/actions/ @mensfeld diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b7de7e73..3e0b9222 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -6,9 +6,14 @@ concurrency: on: pull_request: + branches: [ main, master ] push: + branches: [ main, master ] schedule: - - cron: '0 1 * * *' + - cron: '0 1 * * *' + +permissions: + contents: read env: BUNDLE_RETRY: 6 @@ -26,20 +31,27 @@ jobs: - '3.3' - '3.2' - '3.1' + - 'jruby-10.0' include: - ruby: '3.4' coverage: 'true' + - ruby: 'jruby-10.0' + continue-on-error: true + steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 + with: + fetch-depth: 0 + - name: Install package dependencies run: "[ -e $APT_DEPS ] || sudo apt-get install -y --no-install-recommends $APT_DEPS" - - name: Start Kafka with docker compose + - name: Start Kafka with Docker Compose run: | docker compose up -d || (sleep 5 && docker compose up -d) - name: Set up Ruby - uses: ruby/setup-ruby@v1 + uses: ruby/setup-ruby@ca041f971d66735f3e5ff1e21cc13e2d51e7e535 # v1.233.0 with: ruby-version: ${{matrix.ruby}} bundler-cache: true @@ -47,15 +59,14 @@ jobs: - name: Run all specs env: GITHUB_COVERAGE: ${{matrix.coverage}} - + continue-on-error: ${{ matrix.continue-on-error || false }} # Use the matrix value if present run: | set -e - bundle install --path vendor/bundle + bundle install --jobs 4 --retry 3 cd ext && bundle exec rake cd .. 
bundle exec rspec - macos_build: timeout-minutes: 30 runs-on: macos-latest @@ -67,17 +78,22 @@ - '3.3' - '3.2' - '3.1' + - 'jruby-10.0' + include: + - ruby: 'jruby-10.0' + continue-on-error: true steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 - name: Set up Ruby - uses: ruby/setup-ruby@v1 + uses: ruby/setup-ruby@ca041f971d66735f3e5ff1e21cc13e2d51e7e535 # v1.233.0 with: ruby-version: ${{matrix.ruby}} bundler-cache: false - name: Build rdkafka-ruby + continue-on-error: ${{ matrix.continue-on-error || false }} run: | set -e - bundle install --path vendor/bundle + bundle install --jobs 4 --retry 3 cd ext && bundle exec rake diff --git a/.github/workflows/verify-action-pins.yml b/.github/workflows/verify-action-pins.yml new file mode 100644 index 00000000..69c1a13d --- /dev/null +++ b/.github/workflows/verify-action-pins.yml @@ -0,0 +1,16 @@ +name: Verify Action Pins +on: + pull_request: + paths: + - '.github/workflows/**' +jobs: + verify: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2 + - name: Check SHA pins + run: | + if grep -E -r "uses: .*/.*@(v[0-9]+|main|master)($|[[:space:]])" --include="*.yml" --include="*.yaml" .github/workflows/ | grep -v "#"; then + echo "::error::Actions should use SHA pins, not tags or branch names" + exit 1 + fi diff --git a/.ruby-version b/.ruby-version index 47b322c9..6cb9d3dd 100644 --- a/.ruby-version +++ b/.ruby-version @@ -1 +1 @@ -3.4.1 +3.4.3 diff --git a/CHANGELOG.md b/CHANGELOG.md index 6bf0a9d5..de194bf2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,13 @@ # Rdkafka Changelog +## 0.22.0 (Unreleased) +- [Enhancement] Align default configuration with deprecation warnings of librdkafka `2.10.0`. +- [Enhancement] Bump librdkafka to `2.10.0`. +- [Enhancement] Support producing and consuming of headers with multiple values (KIP-82).
+- [Enhancement] Allow native Kafka customization poll time. +- [Enhancement] Roll out experimental jruby support. +- [Enhancement] Add `logger` gem to dependencies since moved out of Ruby. + ## 0.21.0 (2025-02-13) - [Enhancement] Bump librdkafka to `2.8.0` diff --git a/README.md b/README.md index bf7d4345..98bcaf80 100644 --- a/README.md +++ b/README.md @@ -163,6 +163,7 @@ bundle exec rake produce_messages | rdkafka-ruby | librdkafka | patches | |-|-|-| +| 0.22.0 (Unreleased) | 2.10.0 (2025-04-18) | yes | | 0.21.0 (2025-02-13) | 2.8.0 (2025-01-07) | yes | | 0.20.0 (2025-01-07) | 2.6.1 (2024-11-18) | yes | | 0.19.0 (2024-10-01) | 2.5.3 (2024-09-02) | yes | diff --git a/dist/librdkafka-2.8.0.tar.gz b/dist/librdkafka-2.10.0.tar.gz similarity index 51% rename from dist/librdkafka-2.8.0.tar.gz rename to dist/librdkafka-2.10.0.tar.gz index a6ccaf45..05ebea46 100644 Binary files a/dist/librdkafka-2.8.0.tar.gz and b/dist/librdkafka-2.10.0.tar.gz differ diff --git a/docker-compose.yml b/docker-compose.yml index 4fe0461f..90c40594 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,7 +1,7 @@ services: kafka: container_name: kafka - image: confluentinc/cp-kafka:7.8.1 + image: confluentinc/cp-kafka:7.9.0 ports: - 9092:9092 diff --git a/lib/rdkafka/config.rb b/lib/rdkafka/config.rb index 22c2f8fb..fdddb590 100644 --- a/lib/rdkafka/config.rb +++ b/lib/rdkafka/config.rb @@ -129,10 +129,7 @@ def self.opaques end # Default config that can be overwritten. - DEFAULT_CONFIG = { - # Request api version so advanced features work - :"api.version.request" => true - }.freeze + DEFAULT_CONFIG = {}.freeze # Required config that cannot be overwritten. REQUIRED_CONFIG = { @@ -233,11 +230,12 @@ def consumer(native_kafka_auto_start: true) # # @param native_kafka_auto_start [Boolean] should the native kafka operations be started # automatically. Defaults to true. Set to false only when doing complex initialization. 
+ # @param native_kafka_poll_timeout_ms [Integer] ms poll time of the native Kafka # @return [Producer] The created producer # # @raise [ConfigError] When the configuration contains invalid options # @raise [ClientCreationError] When the native client cannot be created - def producer(native_kafka_auto_start: true) + def producer(native_kafka_auto_start: true, native_kafka_poll_timeout_ms: 100) # Create opaque opaque = Opaque.new # Create Kafka config @@ -254,7 +252,8 @@ def producer(native_kafka_auto_start: true) kafka, run_polling_thread: true, opaque: opaque, - auto_start: native_kafka_auto_start + auto_start: native_kafka_auto_start, + timeout_ms: native_kafka_poll_timeout_ms ), partitioner_name ).tap do |producer| @@ -266,11 +265,12 @@ def producer(native_kafka_auto_start: true) # # @param native_kafka_auto_start [Boolean] should the native kafka operations be started # automatically. Defaults to true. Set to false only when doing complex initialization. + # @param native_kafka_poll_timeout_ms [Integer] ms poll time of the native Kafka # @return [Admin] The created admin instance # # @raise [ConfigError] When the configuration contains invalid options # @raise [ClientCreationError] When the native client cannot be created - def admin(native_kafka_auto_start: true) + def admin(native_kafka_auto_start: true, native_kafka_poll_timeout_ms: 100) opaque = Opaque.new config = native_config(opaque) Rdkafka::Bindings.rd_kafka_conf_set_background_event_cb(config, Rdkafka::Callbacks::BackgroundEventCallbackFunction) @@ -282,7 +282,8 @@ def admin(native_kafka_auto_start: true) kafka, run_polling_thread: true, opaque: opaque, - auto_start: native_kafka_auto_start + auto_start: native_kafka_auto_start, + timeout_ms: native_kafka_poll_timeout_ms ) ) end diff --git a/lib/rdkafka/consumer/headers.rb b/lib/rdkafka/consumer/headers.rb index e0af326d..c2873e49 100644 --- a/lib/rdkafka/consumer/headers.rb +++ b/lib/rdkafka/consumer/headers.rb @@ -7,11 +7,13 @@ module Headers 
EMPTY_HEADERS = {}.freeze # Reads a librdkafka native message's headers and returns them as a Ruby Hash + # where each key maps to either a String (single value) or Array (multiple values) + # to support duplicate headers per KIP-82 # # @private # # @param [Rdkafka::Bindings::Message] native_message - # @return [Hash<String, String>] headers Hash for the native_message + # @return [Hash<String, String|Array<String>>] headers Hash for the native_message # @raise [Rdkafka::RdkafkaError] when fail to read headers def self.from_native(native_message) headers_ptrptr = FFI::MemoryPointer.new(:pointer) @@ -53,10 +55,20 @@ def self.from_native(native_message) size = size_ptr[:value] value_ptr = value_ptrptr.read_pointer - value = value_ptr.read_string(size) - headers[name] = value + value = value_ptr.read_string(size) + if headers.key?(name) + # If we've seen this header before, convert to array if needed and append + if headers[name].is_a?(Array) + headers[name] << value + else + headers[name] = [headers[name], value] + end + else + # First occurrence - store as single value + headers[name] = value + end idx += 1 end diff --git a/lib/rdkafka/native_kafka.rb b/lib/rdkafka/native_kafka.rb index 8bf88d4f..ee16a595 100644 --- a/lib/rdkafka/native_kafka.rb +++ b/lib/rdkafka/native_kafka.rb @@ -4,7 +4,7 @@ module Rdkafka # @private # A wrapper around a native kafka that polls and cleanly exits class NativeKafka - def initialize(inner, run_polling_thread:, opaque:, auto_start: true) + def initialize(inner, run_polling_thread:, opaque:, auto_start: true, timeout_ms: 100) @inner = inner @opaque = opaque # Lock around external access @@ -30,6 +30,8 @@ def initialize(inner, run_polling_thread:, opaque:, auto_start: true) @run_polling_thread = run_polling_thread + @timeout_ms = timeout_ms + start if auto_start @closing = false @@ -50,7 +52,7 @@ def start @polling_thread = Thread.new do loop do @poll_mutex.synchronize do - Rdkafka::Bindings.rd_kafka_poll(@inner, 100) + Rdkafka::Bindings.rd_kafka_poll(@inner, @timeout_ms) end # Exit thread if closing and the poll
queue is empty diff --git a/lib/rdkafka/producer.rb b/lib/rdkafka/producer.rb index 74168210..cd8b715a 100644 --- a/lib/rdkafka/producer.rb +++ b/lib/rdkafka/producer.rb @@ -247,7 +247,7 @@ def partition_count(topic) # @param partition [Integer,nil] Optional partition to produce to # @param partition_key [String, nil] Optional partition key based on which partition assignment can happen # @param timestamp [Time,Integer,nil] Optional timestamp of this message. Integer timestamp is in milliseconds since Jan 1 1970. - # @param headers [Hash<String,String>] Optional message headers + # @param headers [Hash<String,String|Array<String>>] Optional message headers. Values can be either a single string or an array of strings to support duplicate headers per KIP-82 # @param label [Object, nil] a label that can be assigned when producing a message that will be part of the delivery handle and the delivery report # @param topic_config [Hash] topic config for given message dispatch. Allows to send messages to topics with different configuration # @@ -339,11 +339,23 @@ def produce( if headers headers.each do |key0, value0| key = key0.to_s - value = value0.to_s - args << :int << Rdkafka::Bindings::RD_KAFKA_VTYPE_HEADER - args << :string << key - args << :pointer << value - args << :size_t << value.bytesize + if value0.is_a?(Array) + # Handle array of values per KIP-82 + value0.each do |value| + value = value.to_s + args << :int << Rdkafka::Bindings::RD_KAFKA_VTYPE_HEADER + args << :string << key + args << :pointer << value + args << :size_t << value.bytesize + end + else + # Handle single value + value = value0.to_s + args << :int << Rdkafka::Bindings::RD_KAFKA_VTYPE_HEADER + args << :string << key + args << :pointer << value + args << :size_t << value.bytesize + end end end diff --git a/lib/rdkafka/version.rb b/lib/rdkafka/version.rb index f4662457..1ce3ebc1 100644 --- a/lib/rdkafka/version.rb +++ b/lib/rdkafka/version.rb @@ -1,7 +1,7 @@ # frozen_string_literal: true module Rdkafka - VERSION = "0.21.0" -
LIBRDKAFKA_VERSION = "2.8.0" - LIBRDKAFKA_SOURCE_SHA256 = "5bd1c46f63265f31c6bfcedcde78703f77d28238eadf23821c2b43fc30be3e25" + VERSION = "0.22.0" + LIBRDKAFKA_VERSION = "2.10.0" + LIBRDKAFKA_SOURCE_SHA256 = "004b1cc2685d1d6d416b90b426a0a9d27327a214c6b807df6f9ea5887346ba3a" end diff --git a/rdkafka.gemspec b/rdkafka.gemspec index 40997fdc..c3511610 100644 --- a/rdkafka.gemspec +++ b/rdkafka.gemspec @@ -26,6 +26,7 @@ Gem::Specification.new do |gem| gem.add_dependency 'ffi', '~> 1.15' gem.add_dependency 'mini_portile2', '~> 2.6' gem.add_dependency 'rake', '> 12' + gem.add_dependency 'logger', '>= 1.5' gem.add_development_dependency 'pry' gem.add_development_dependency 'rspec', '~> 3.5' diff --git a/renovate.json b/renovate.json index 39a2b6e9..29ed7d43 100644 --- a/renovate.json +++ b/renovate.json @@ -1,6 +1,18 @@ { "$schema": "https://docs.renovatebot.com/renovate-schema.json", "extends": [ - "config:base" + "config:recommended" + ], + "github-actions": { + "enabled": true, + "pinDigests": true + }, + "packageRules": [ + { + "matchManagers": [ + "github-actions" + ], + "minimumReleaseAge": "7 days" + } ] } diff --git a/spec/rdkafka/admin_spec.rb b/spec/rdkafka/admin_spec.rb index d501e6e2..284054a3 100644 --- a/spec/rdkafka/admin_spec.rb +++ b/spec/rdkafka/admin_spec.rb @@ -34,7 +34,7 @@ describe '#describe_errors' do let(:errors) { admin.class.describe_errors } - it { expect(errors.size).to eq(170) } + it { expect(errors.size).to eq(172) } it { expect(errors[-184]).to eq(code: -184, description: 'Local: Queue full', name: '_QUEUE_FULL') } it { expect(errors[21]).to eq(code: 21, description: 'Broker: Invalid required acks value', name: 'INVALID_REQUIRED_ACKS') } end @@ -738,17 +738,19 @@ end end - context "when operating from a fork" do - # @see https://github.com/ffi/ffi/issues/1114 - it 'expect to be able to create topics and run other admin operations without hanging' do - # If the FFI issue is not mitigated, this will hang forever - pid = fork do - admin - 
.create_topic(topic_name, topic_partition_count, topic_replication_factor) - .wait - end + unless RUBY_PLATFORM == 'java' + context "when operating from a fork" do + # @see https://github.com/ffi/ffi/issues/1114 + it 'expect to be able to create topics and run other admin operations without hanging' do + # If the FFI issue is not mitigated, this will hang forever + pid = fork do + admin + .create_topic(topic_name, topic_partition_count, topic_replication_factor) + .wait + end - Process.wait(pid) + Process.wait(pid) + end end end end diff --git a/spec/rdkafka/bindings_spec.rb b/spec/rdkafka/bindings_spec.rb index 5d569664..b1b10421 100644 --- a/spec/rdkafka/bindings_spec.rb +++ b/spec/rdkafka/bindings_spec.rb @@ -149,15 +149,6 @@ end describe "oauthbearer set token" do - - context "without args" do - it "should raise argument error" do - expect { - Rdkafka::Bindings.rd_kafka_oauthbearer_set_token - }.to raise_error(ArgumentError) - end - end - context "with args" do before do DEFAULT_TOKEN_EXPIRY_SECONDS = 900 diff --git a/spec/rdkafka/config_spec.rb b/spec/rdkafka/config_spec.rb index a188d858..79fdae7b 100644 --- a/spec/rdkafka/config_spec.rb +++ b/spec/rdkafka/config_spec.rb @@ -33,23 +33,25 @@ expect(log.string).to include "FATAL -- : I love testing" end - it "expect to start new logger thread after fork and work" do - reader, writer = IO.pipe - - pid = fork do - $stdout.reopen(writer) - Rdkafka::Config.logger = Logger.new($stdout) - reader.close - producer = rdkafka_producer_config(debug: 'all').producer - producer.close + unless RUBY_PLATFORM == 'java' + it "expect to start new logger thread after fork and work" do + reader, writer = IO.pipe + + pid = fork do + $stdout.reopen(writer) + Rdkafka::Config.logger = Logger.new($stdout) + reader.close + producer = rdkafka_producer_config(debug: 'all').producer + producer.close + writer.close + sleep(1) + end + writer.close - sleep(1) + Process.wait(pid) + output = reader.read + expect(output.split("\n").size).to be 
>= 20 end - - writer.close - Process.wait(pid) - output = reader.read - expect(output.split("\n").size).to be >= 20 end end @@ -157,7 +159,7 @@ def call(config, client_name); end it "should use default configuration" do config = Rdkafka::Config.new - expect(config[:"api.version.request"]).to eq true + expect(config[:"api.version.request"]).to eq nil end it "should create a consumer with valid config" do diff --git a/spec/rdkafka/consumer/headers_spec.rb b/spec/rdkafka/consumer/headers_spec.rb index f467ec6d..8e855aef 100644 --- a/spec/rdkafka/consumer/headers_spec.rb +++ b/spec/rdkafka/consumer/headers_spec.rb @@ -3,7 +3,7 @@ describe Rdkafka::Consumer::Headers do let(:headers) do { # Note String keys! - "version" => "2.1.3", + "version" => ["2.1.3", "2.1.4"], "type" => "String" } end @@ -17,27 +17,39 @@ Rdkafka::Bindings::RD_KAFKA_RESP_ERR_NO_ERROR end + # First version header expect(Rdkafka::Bindings).to \ receive(:rd_kafka_header_get_all) .with(headers_ptr, 0, anything, anything, anything) do |_, _, name_ptrptr, value_ptrptr, size_ptr| - expect(name_ptrptr).to receive(:read_pointer).and_return(double("pointer 0", read_string_to_null: headers.keys[0])) - expect(size_ptr).to receive(:[]).with(:value).and_return(headers.keys[0].size) - expect(value_ptrptr).to receive(:read_pointer).and_return(double("value pointer 0", read_string: headers.values[0])) + expect(name_ptrptr).to receive(:read_pointer).and_return(double("pointer 0", read_string_to_null: "version")) + expect(size_ptr).to receive(:[]).with(:value).and_return(headers["version"][0].size) + expect(value_ptrptr).to receive(:read_pointer).and_return(double("value pointer 0", read_string: headers["version"][0])) Rdkafka::Bindings::RD_KAFKA_RESP_ERR_NO_ERROR end + # Second version header expect(Rdkafka::Bindings).to \ receive(:rd_kafka_header_get_all) .with(headers_ptr, 1, anything, anything, anything) do |_, _, name_ptrptr, value_ptrptr, size_ptr| - expect(name_ptrptr).to 
receive(:read_pointer).and_return(double("pointer 1", read_string_to_null: headers.keys[1])) - expect(size_ptr).to receive(:[]).with(:value).and_return(headers.keys[1].size) - expect(value_ptrptr).to receive(:read_pointer).and_return(double("value pointer 1", read_string: headers.values[1])) + expect(name_ptrptr).to receive(:read_pointer).and_return(double("pointer 1", read_string_to_null: "version")) + expect(size_ptr).to receive(:[]).with(:value).and_return(headers["version"][1].size) + expect(value_ptrptr).to receive(:read_pointer).and_return(double("value pointer 1", read_string: headers["version"][1])) Rdkafka::Bindings::RD_KAFKA_RESP_ERR_NO_ERROR end + # Single type header expect(Rdkafka::Bindings).to \ receive(:rd_kafka_header_get_all) - .with(headers_ptr, 2, anything, anything, anything) + .with(headers_ptr, 2, anything, anything, anything) do |_, _, name_ptrptr, value_ptrptr, size_ptr| + expect(name_ptrptr).to receive(:read_pointer).and_return(double("pointer 2", read_string_to_null: "type")) + expect(size_ptr).to receive(:[]).with(:value).and_return(headers["type"].size) + expect(value_ptrptr).to receive(:read_pointer).and_return(double("value pointer 2", read_string: headers["type"])) + Rdkafka::Bindings::RD_KAFKA_RESP_ERR_NO_ERROR + end + + expect(Rdkafka::Bindings).to \ + receive(:rd_kafka_header_get_all) + .with(headers_ptr, 3, anything, anything, anything) .and_return(Rdkafka::Bindings::RD_KAFKA_RESP_ERR__NOENT) end @@ -46,8 +58,12 @@ it { is_expected.to eq(headers) } it { is_expected.to be_frozen } - it 'allows String key' do - expect(subject['version']).to eq("2.1.3") + it 'returns array for duplicate headers' do + expect(subject['version']).to eq(["2.1.3", "2.1.4"]) + end + + it 'returns string for single headers' do + expect(subject['type']).to eq("String") end it 'does not support symbols mappings' do diff --git a/spec/rdkafka/producer_spec.rb b/spec/rdkafka/producer_spec.rb index d86f6895..cf36094f 100644 --- a/spec/rdkafka/producer_spec.rb +++ 
b/spec/rdkafka/producer_spec.rb @@ -263,8 +263,6 @@ def call(_, handle) expect(message.partition).to eq 1 expect(message.payload).to eq "payload" expect(message.key).to eq "key" - # Since api.version.request is on by default we will get - # the message creation timestamp if it's not set. expect(message.timestamp).to be_within(10).of(Time.now) end @@ -819,4 +817,44 @@ def call(_, handle) end end end + + describe "#produce with headers" do + it "should produce a message with array headers" do + headers = { + "version" => ["2.1.3", "2.1.4"], + "type" => "String" + } + + report = producer.produce( + topic: "consume_test_topic", + key: "key headers", + headers: headers + ).wait + + message = wait_for_message(topic: "consume_test_topic", consumer: consumer, delivery_report: report) + expect(message).to be + expect(message.key).to eq('key headers') + expect(message.headers['type']).to eq('String') + expect(message.headers['version']).to eq(["2.1.3", "2.1.4"]) + end + + it "should produce a message with single value headers" do + headers = { + "version" => "2.1.3", + "type" => "String" + } + + report = producer.produce( + topic: "consume_test_topic", + key: "key headers", + headers: headers + ).wait + + message = wait_for_message(topic: "consume_test_topic", consumer: consumer, delivery_report: report) + expect(message).to be + expect(message.key).to eq('key headers') + expect(message.headers['type']).to eq('String') + expect(message.headers['version']).to eq('2.1.3') + end + end end diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index 0f2a02f3..6c39cec0 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -15,9 +15,7 @@ def rdkafka_base_config { - :"api.version.request" => false, - :"broker.version.fallback" => "1.0", - :"bootstrap.servers" => "localhost:9092", + :"bootstrap.servers" => "localhost:9092" } end