A standard way to connect to Kafka to consume and produce.
This is a private gem, so it is not hosted on fury.io.
As a developer, you will need to create an account on fury.io and be added to the organization. Get your repo access token from the management site, and configure Bundler to use it:
bundle config https://gem.fury.io/omniboard/ PeRSonAl-SeCrEt-ToKeN
Get the organization's account access token from the management site, and configure the environment on CI or an application:
export BUNDLE_GEM__FURY__IO=PeRSonAl-SeCrEt-ToKeN
Add this to your application's Gemfile, then run bundle
:
source "https://gem.fury.io/omniboard/" do
gem 'kafka_connection'
end
Never place a repository access token in the Gemfile, or commit it to the repo anywhere else. The default instructions that Gemfury provides do this, but we use the instructions for Bundler 1.8+.
This gem requires the following environment variables:
KAFKA_BROKERS
: Comma-separated list of brokers. E.g. "kafka+ssl://hostname:9092"KAFKA_CA
: The CA certificate in PEM format. (Required if a kafka+ssl broker is specified.)KAFKA_CERT
: The client's certificate in PEM format. (Required if a kafka+ssl broker is specified.)KAFKA_PRIVATE_KEY
: The client's private key in PEM format. (Required if a kafka+ssl broker is specified.)
The PEM-format keys are multi-line values and must not have their lines concatenated. If your environment does not make it easy to set variables containing newlines, you can use the string "\n" (acually containing a backslash) in place of newline characters.
KAFKA_TOPIC_PREFIX
: Optional. If present, any topic names used with the consumer or producer will be prefixed with this string.
To create a connection to Kafka:
kafka_connection = KafkaConnection.new(
app_name: "my_great_project", # Used as part of the Kafka client identifier
env_name: Rails.env.to_s.downcase,
)
To produce:
kafka_producer = kafka_connection.producer
kafka_producer.produce("My great log entry", topic: "topic-name")
kafka_producer.deliver_messages
# Or use a pool of connections:
# (in config/initializers/kafka.rb):
max_threads = ENV.fetch("RAILS_MAX_THREADS") { 5 }.to_i
$kafka_connection_pool = KafkaConnection.pool(
size: max_threads,
timeout: 5,
app_name: "my_great_project",
env_name: Rails.env.to_s.downcase,
)
# (anywhere in the project):
$kafka_connection_pool.with do |kafka_connection|
kafka_producer = kafka_connection.producer
kafka_producer.produce("My great log entry", topic: "topic-name")
kafka_producer.deliver_messages
end
To consume:
# `group_id` is the consumer group name; multiple processes with the same value will
# share the topic(s) (and each get different partitions).
kafka_consumer = kafka_connection.consumer(group_id: "#{Rails.env.to_s.downcase.downcase}.#{self.class.name}")
kafka_consumer.subscribe "topic-name"
kafka_consumer.each_message do |message|
process_message(message)
end
After checking out the repo, run bin/setup
to install dependencies. Then, run rake spec
to run the tests. You can also run bin/console
for an interactive prompt that will allow you to experiment.
To install this gem onto your local machine, run bundle exec rake install
. To release a new version, update the version number in version.rb
, and then run bundle exec rake release
, which will create a git tag for the version, push git commits and tags, and push the .gem
file to rubygems.org.
Bug reports and pull requests are welcome.
The gem is available as open source under the terms of the MIT License.