refactor: update producer code to work with latest packages #373
Conversation
Please can you fix the Git Lint issue?
Pull Request Overview
This PR refactors the demo application to use modern Kafka deployment with KRaft mode instead of Zookeeper, improves Docker containerization, and enhances the producer code with better resource management and error handling.
- Migrated from Zookeeper-based Kafka to KRaft-based single broker setup for simplified deployment
- Added multi-stage Dockerfile for containerized application builds and deployment
- Refactored PeriodicProducer to use Vert.x timers instead of TimeoutStream with proper resource cleanup
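The start/stop lifecycle described in the last bullet can be sketched in plain Java. This is an illustrative analogy, not the PR's actual code: it uses a ScheduledExecutorService where the real PeriodicProducer uses vertx.setPeriodic(...) and vertx.cancelTimer(id), but the resource-management pattern is the same.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Sketch of the start/stop lifecycle the PR applies to PeriodicProducer.
// ScheduledExecutorService stands in for Vert.x: in the real code,
// vertx.setPeriodic(...) returns a timer id and vertx.cancelTimer(id)
// cancels it when STOP_ACTION arrives.
public class PeriodicSketch {
  private static final long PRODUCE_INTERVAL_MS = 2000; // matches Duration.ofSeconds(2)
  private final ScheduledExecutorService scheduler =
      Executors.newSingleThreadScheduledExecutor();
  private ScheduledFuture<?> task; // analogue of the stored timerId

  public void start(Runnable produce) {
    if (task == null || task.isCancelled()) {
      task = scheduler.scheduleAtFixedRate(produce, 0, PRODUCE_INTERVAL_MS, TimeUnit.MILLISECONDS);
    }
  }

  public void stop() {
    if (task != null) {
      task.cancel(false); // mirrors vertx.cancelTimer(timerId)
      task = null;        // mirrors resetting timerId to its sentinel value
    }
    scheduler.shutdown(); // mirrors closing the KafkaProducer in stop()
  }
}
```

The key point of the refactor is that the handle returned at start time is stored so it can be cancelled deterministically at stop time, rather than leaking a running timer.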
Reviewed Changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 4 comments.
Show a summary per file
File | Description |
---|---|
docker-compose.yml | Replaced Zookeeper+Kafka with single KRaft broker and added containerized demo app |
Dockerfile | Added multi-stage build for Maven compilation and slim runtime container |
kafka.properties | Updated bootstrap servers to use container networking (broker:9092) |
PeriodicProducer.java | Refactored to use Vert.x timers, improved config handling, and added proper cleanup |
WebSocketServer.java | Updated server binding and simplified Kafka consumer subscription logic |
docker-compose.yml (outdated):

      "bin/zookeeper-server-start.sh config/zookeeper.properties"
    ]
  broker:
    image: apache/kafka:latest
Is this the best option? It can break builds when latest changes. Is pinning a version the best way forward?
Yes, that makes sense. Will change that.
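For illustration, a pinned tag would look like the fragment below. The exact version is an assumption here; pick whichever release the project targets.

```yaml
# docker-compose.yml (sketch): pin the broker image instead of using :latest
  broker:
    image: apache/kafka:3.9.0   # pinned, assumed version; update deliberately
```

Pinning trades automatic updates for reproducible builds: the image only changes when the tag in the compose file is bumped.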
private static final long PRODUCE_INTERVAL_MS = Duration.ofSeconds(2).toMillis();

private KafkaProducer<String, String> kafkaProducer;
private long timerId = -1;
Can't we use something like:
private static final long TIMER_NOT_SET = -1L;
private long timerId = TIMER_NOT_SET;
Right now timerId is initialized with TIMER_NOT_SET, but later in STOP_ACTION it's reset using the raw -1. This duplicates the placeholder value. For clarity and maintainability, wouldn't it be better to use the TIMER_NOT_SET constant consistently instead of mixing it with a literal?
public void stop() {
  if (kafkaProducer != null) {
    kafkaProducer.close()
      .onComplete(ar -> logger.info("KafkaProducer closed: {}", ar.succeeded() ? "success" : ar.cause()));
Will ar.cause() be null on success?

logger.info("KafkaProducer closed: {}", ar.succeeded() ? "success" : "failure", ar.cause());
No, that won't get called
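The behavior under discussion can be shown without Vert.x or a logging framework. A minimal sketch, assuming hypothetical stand-ins: the boolean mirrors ar.succeeded() and the Throwable mirrors ar.cause(), which is null when the close succeeds, so the cause should only appear in the failure branch.

```java
// CloseLogDemo is a hypothetical stand-in for the close-callback logging
// discussed above; it is not the PR's actual code.
public class CloseLogDemo {
  // 'succeeded' mirrors ar.succeeded(); 'cause' mirrors ar.cause(),
  // which is null on a successful close, so it must not be dereferenced
  // in the success branch.
  public static String formatCloseLog(boolean succeeded, Throwable cause) {
    return succeeded
        ? "KafkaProducer closed: success"
        : "KafkaProducer closed: failure (" + cause.getMessage() + ")";
  }

  public static void main(String[] args) {
    System.out.println(formatCloseLog(true, null));
    System.out.println(formatCloseLog(false, new RuntimeException("broker unreachable")));
  }
}
```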
case WebSocketServer.STOP_ACTION:
  if (timerId != TIMER_NOT_SET) {
    vertx.cancelTimer(timerId);
    timerId = -1;
timerId = TIMER_NOT_SET;

private KafkaProducer<String, String> kafkaProducer;
private long timerId = -1;
private long TIMER_NOT_SET = -1;
private String customMessage;
This only gets a value when START_ACTION is received. So I think, if for some reason produceKafkaRecord() is ever called before the first START_ACTION, customMessage will be null, and the producer could send null messages to Kafka. Is that OK? Can't we initialize customMessage like:
private String customMessage = "Hello World";
@@ -140,11 +138,7 @@ private void handleProduceSocket(ServerWebSocket webSocket) {

private void handleConsumeSocket(ServerWebSocket webSocket) {
  KafkaConsumer<String, JsonObject> kafkaConsumer = KafkaConsumer.create(vertx, kafkaConfig);

  kafkaConsumer.exceptionHandler(err -> logger.error("Kafka error", err));
Should we add it back for better error visibility?
Yes, you are right.
* Added new Dockerfile and docker compose file to test locally

  Resolves: #0
  Signed-off-by: Joel Hanson <joelhanson025@gmail.com>
This pull request refactors the demo application's deployment and Kafka integration for improved reliability and modern best practices. The main changes include switching to a KRaft-based Kafka setup in Docker, updating configuration to use container networking, and refactoring the PeriodicProducer logic for better resource management and error handling.

Deployment and Kafka setup:

- Switched docker-compose.yml to a single KRaft-based Kafka broker (apache/kafka:latest) instead of separate Zookeeper and Kafka containers, including health checks and updated environment variables for KRaft operation. The demo app now waits for Kafka health before starting. (docker-compose.yml)
- Added a Dockerfile to build the Java application with Maven and run it in a slim OpenJDK container for production use. (Dockerfile)
- Updated kafka.properties to use the container name (kafka:9092) for bootstrap servers, ensuring correct connectivity when running in Docker. (kafka.properties)

Codebase improvements:

- Refactored PeriodicProducer.java to use Vert.x timers instead of TimeoutStream, improved Kafka producer setup with explicit serializers, streamlined config handling, and added proper cleanup in the stop() method to close the producer gracefully. (src/main/java/kafka/vertx/demo/PeriodicProducer.java)
- The WebSocket server now binds to 0.0.0.0 for compatibility with Docker container networking. (src/main/java/kafka/vertx/demo/WebSocketServer.java)

Checklist