Skip to content

Conversation

Joel-hanson
Copy link
Contributor

This pull request refactors the demo application's deployment and Kafka integration for improved reliability and modern best practices. The main changes include switching to a KRaft-based Kafka setup in Docker, updating configuration to use container networking, and refactoring the PeriodicProducer logic for better resource management and error handling.

Deployment and Kafka setup:

  • Migrated the Docker Compose configuration to use a single KRaft-based Kafka broker (apache/kafka:latest) instead of separate Zookeeper and Kafka containers, including health checks and updated environment variables for KRaft operation. The demo app now waits for Kafka health before starting. (docker-compose.yml, docker-compose.ymlL1-R34)
  • Added a multi-stage Dockerfile to build the Java application with Maven and run it in a slim OpenJDK container for production use. (Dockerfile, DockerfileR1-R22)
  • Updated kafka.properties to use the container name (kafka:9092) for bootstrap servers, ensuring correct connectivity when running in Docker. (kafka.properties, kafka.propertiesL1-R1)

Codebase improvements:

  • Refactored PeriodicProducer.java to use Vert.x timers instead of TimeoutStream, improved Kafka producer setup with explicit serializers, streamlined config handling, and added proper cleanup in the stop() method to close the producer gracefully. (src/main/java/kafka/vertx/demo/PeriodicProducer.java, [1] [2]
  • Changed the WebSocket server to bind to 0.0.0.0 for compatibility with Docker container networking. (src/main/java/kafka/vertx/demo/WebSocketServer.java, src/main/java/kafka/vertx/demo/WebSocketServer.javaL97-R96)

Checklist

  • Automated tests exist
  • Documentation exists link
  • Local unit tests performed
  • Sufficient logging/trace
  • Desired commit message set as PR title and commit description set above

@Joel-hanson Joel-hanson requested a review from Copilot August 20, 2025 05:44
Copilot

This comment was marked as outdated.

@aswinayyolath
Copy link
Contributor

please can you fix Git Lint issue?

@Joel-hanson Joel-hanson force-pushed the refactor-producer branch 4 times, most recently from 34cf782 to 10e3b26 Compare August 20, 2025 05:57
@Joel-hanson Joel-hanson requested a review from Copilot August 20, 2025 05:57
Copilot

This comment was marked as outdated.

@Joel-hanson Joel-hanson force-pushed the refactor-producer branch 2 times, most recently from bdd4f76 to 53109ff Compare August 20, 2025 08:56
@Joel-hanson Joel-hanson requested a review from Copilot August 20, 2025 08:57
Copy link

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR refactors the demo application to use modern Kafka deployment with KRaft mode instead of Zookeeper, improves Docker containerization, and enhances the producer code with better resource management and error handling.

  • Migrated from Zookeeper-based Kafka to KRaft-based single broker setup for simplified deployment
  • Added multi-stage Dockerfile for containerized application builds and deployment
  • Refactored PeriodicProducer to use Vert.x timers instead of TimeoutStream with proper resource cleanup

Reviewed Changes

Copilot reviewed 5 out of 5 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
docker-compose.yml Replaced Zookeeper+Kafka with single KRaft broker and added containerized demo app
Dockerfile Added multi-stage build for Maven compilation and slim runtime container
kafka.properties Updated bootstrap servers to use container networking (broker:9092)
PeriodicProducer.java Refactored to use Vert.x timers, improved config handling, and added proper cleanup
WebSocketServer.java Updated server binding and simplified Kafka consumer subscription logic

Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.

"bin/zookeeper-server-start.sh config/zookeeper.properties"
]
broker:
image: apache/kafka:latest
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this the best option...it can break builds when latest changes. is pinning a version the best way forward?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes that make sense. Will change that

private static final long PRODUCE_INTERVAL_MS = Duration.ofSeconds(2).toMillis();

private KafkaProducer<String, String> kafkaProducer;
private long timerId = -1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can't we use something like

private static final long TIMER_NOT_SET = -1L;
private long timerId = TIMER_NOT_SET;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right now timerId is initialized with TIMER_NOT_SET, but later in STOP_ACTION it’s reset using the raw -1. This duplicates the Placeholder value . For clarity and maintainability, isn't it better to use the TIMER_NOT_SET constant consistently instead of mixing it with a literal.

public void stop() {
if (kafkaProducer != null) {
kafkaProducer.close()
.onComplete(ar -> logger.info("KafkaProducer closed: {}", ar.succeeded() ? "success" : ar.cause()));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will ar.cause()) be null is success ??

logger.info("KafkaProducer closed: {}", ar.succeeded() ? "success" : "failure", ar.cause());

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, that won't get called

case WebSocketServer.STOP_ACTION:
if (timerId != TIMER_NOT_SET) {
vertx.cancelTimer(timerId);
timerId = -1;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

timerId = TIMER_NOT_SET;  


private KafkaProducer<String, String> kafkaProducer;
private long timerId = -1;
private long TIMER_NOT_SET = -1;
private String customMessage;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will only get val when START_ACTION is received... So I tjink, If for some reason produceKafkaRecord() is ever called before the first START_ACTION, customMessage will be null.. so in that case producer could send null messages to Kafka is that ok? can't we initialize customMessage like

private String customMessage = "Hello World";

@@ -140,11 +138,7 @@ private void handleProduceSocket(ServerWebSocket webSocket) {

private void handleConsumeSocket(ServerWebSocket webSocket) {
KafkaConsumer<String, JsonObject> kafkaConsumer = KafkaConsumer.create(vertx, kafkaConfig);

kafkaConsumer.exceptionHandler(err -> logger.error("Kafka error", err));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add it back for better error visibility?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, you are right

* Added new Dockerfile and docker compose file to test locally

Resolves: #0

Signed-off-by: Joel Hanson <joelhanson025@gmail.com>
@Joel-hanson Joel-hanson merged commit ec03822 into ibm-messaging:master Aug 21, 2025
2 checks passed
@Joel-hanson Joel-hanson deleted the refactor-producer branch August 21, 2025 04:50
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants