diff --git a/pom.xml b/pom.xml
index c0a0239..641f801 100644
--- a/pom.xml
+++ b/pom.xml
@@ -56,6 +56,11 @@
org.springframework.boot
spring-boot-starter-security
+
+ org.apache.kafka
+ kafka-clients
+ 3.1.0
+
org.elasticsearch
elasticsearch
diff --git a/src/main/java/com/example/easynotes/controller/NoteController.java b/src/main/java/com/example/easynotes/controller/NoteController.java
index 1eb2742..805fe55 100644
--- a/src/main/java/com/example/easynotes/controller/NoteController.java
+++ b/src/main/java/com/example/easynotes/controller/NoteController.java
@@ -1,6 +1,8 @@
package com.example.easynotes.controller;
import com.example.easynotes.exception.ResourceNotFoundException;
+import com.example.easynotes.kafka.consumer.NotesEventConsumer;
+import com.example.easynotes.kafka.producer.NotesEventProducer;
import com.example.easynotes.model.Note;
import com.example.easynotes.model.SearchNote;
import com.example.easynotes.repository.NoteRepository;
@@ -11,6 +13,7 @@
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
+import javax.annotation.PostConstruct;
import javax.validation.Valid;
import java.util.ArrayList;
@@ -29,11 +32,23 @@ public class NoteController {
@Autowired
NoteRepository noteRepository;
+ @Autowired
+ NotesEventProducer eventProducer;
+ @Autowired
+ NotesEventConsumer eventConsumer;
+
@Autowired
esInterface es;
private static final Logger logger = LoggerFactory.getLogger(NoteController.class);
+ @PostConstruct
+ public void init() {
+ // Start consuming events in a separate thread
+ Thread consumerThread = new Thread(() -> eventConsumer.consumeEvents());
+ consumerThread.start();
+ }
+
@CrossOrigin(origins = "*")
@GetMapping("/notes")
public List getAllNotes(@RequestParam(defaultValue="10", name="limit") String limit,
@@ -54,6 +69,7 @@ public List getAllNotes(@RequestParam(defaultValue="10", name="limit") Str
public Note createNote(@Valid @RequestBody Note note) {
Note res = noteRepository.save(note);
es.addToES(res);
+ eventProducer.sendEvent(res.getContent());
return res;
}
diff --git a/src/main/java/com/example/easynotes/kafka/consumer/NotesEventConsumer.java b/src/main/java/com/example/easynotes/kafka/consumer/NotesEventConsumer.java
new file mode 100644
index 0000000..920a0c5
--- /dev/null
+++ b/src/main/java/com/example/easynotes/kafka/consumer/NotesEventConsumer.java
@@ -0,0 +1,43 @@
+package com.example.easynotes.kafka.consumer;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.springframework.stereotype.Service;
+
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Properties;
+
+@Service
+public class NotesEventConsumer {
+ private final KafkaConsumer consumer;
+ // private final String topic;
+
+ public NotesEventConsumer() {
+ // this.topic = "notes-repo";
+ Properties props = new Properties();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // Change to your Kafka broker address
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, "notes-group"); // Consumer group id
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+ this.consumer = new KafkaConsumer<>(props);
+ }
+
+ public void consumeEvents() {
+ String topic = "notes-repo";
+ consumer.subscribe(Collections.singletonList(topic));
+
+ while (true) {
+ ConsumerRecords records = consumer.poll(Duration.ofMillis(100));
+ for (ConsumerRecord record : records) {
+ System.out.println("[+][+][+]Received message: " + record.value() + "[+][+][+]");
+ }
+ }
+ }
+
+ public void close() {
+ consumer.close();
+ }
+}
diff --git a/src/main/java/com/example/easynotes/kafka/producer/NotesEventProducer.java b/src/main/java/com/example/easynotes/kafka/producer/NotesEventProducer.java
new file mode 100644
index 0000000..3764a20
--- /dev/null
+++ b/src/main/java/com/example/easynotes/kafka/producer/NotesEventProducer.java
@@ -0,0 +1,31 @@
+package com.example.easynotes.kafka.producer;
+
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.springframework.stereotype.Service;
+
+import java.util.Properties;
+
+@Service
+public class NotesEventProducer {
+ private final KafkaProducer producer;
+ // private final String topic;
+
+ public NotesEventProducer() {
+ Properties props = new Properties();
+ props.put("bootstrap.servers", "localhost:9092"); // Change to your Kafka broker address
+ props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
+ this.producer = new KafkaProducer<>(props);
+ }
+
+ public void sendEvent(String message) {
+ String topic = "notes-repo";
+ producer.send(new ProducerRecord<>(topic, message));
+ }
+
+ public void close() {
+ producer.close();
+ }
+}