diff --git a/pom.xml b/pom.xml index c0a0239..641f801 100644 --- a/pom.xml +++ b/pom.xml @@ -56,6 +56,11 @@ org.springframework.boot spring-boot-starter-security + + org.apache.kafka + kafka-clients + 3.1.0 + org.elasticsearch elasticsearch diff --git a/src/main/java/com/example/easynotes/controller/NoteController.java b/src/main/java/com/example/easynotes/controller/NoteController.java index 1eb2742..805fe55 100644 --- a/src/main/java/com/example/easynotes/controller/NoteController.java +++ b/src/main/java/com/example/easynotes/controller/NoteController.java @@ -1,6 +1,8 @@ package com.example.easynotes.controller; import com.example.easynotes.exception.ResourceNotFoundException; +import com.example.easynotes.kafka.consumer.NotesEventConsumer; +import com.example.easynotes.kafka.producer.NotesEventProducer; import com.example.easynotes.model.Note; import com.example.easynotes.model.SearchNote; import com.example.easynotes.repository.NoteRepository; @@ -11,6 +13,7 @@ import org.springframework.http.ResponseEntity; import org.springframework.web.bind.annotation.*; +import javax.annotation.PostConstruct; import javax.validation.Valid; import java.util.ArrayList; @@ -29,11 +32,23 @@ public class NoteController { @Autowired NoteRepository noteRepository; + @Autowired + NotesEventProducer eventProducer; + @Autowired + NotesEventConsumer eventConsumer; + @Autowired esInterface es; private static final Logger logger = LoggerFactory.getLogger(NoteController.class); + @PostConstruct + public void init() { + // Start consuming events in a separate thread + Thread consumerThread = new Thread(() -> eventConsumer.consumeEvents()); + consumerThread.start(); + } + @CrossOrigin(origins = "*") @GetMapping("/notes") public List getAllNotes(@RequestParam(defaultValue="10", name="limit") String limit, @@ -54,6 +69,7 @@ public List getAllNotes(@RequestParam(defaultValue="10", name="limit") Str public Note createNote(@Valid @RequestBody Note note) { Note res = noteRepository.save(note); es.addToES(res); + eventProducer.sendEvent(res.getContent()); return res; } diff --git a/src/main/java/com/example/easynotes/kafka/consumer/NotesEventConsumer.java b/src/main/java/com/example/easynotes/kafka/consumer/NotesEventConsumer.java new file mode 100644 index 0000000..920a0c5 --- /dev/null +++ b/src/main/java/com/example/easynotes/kafka/consumer/NotesEventConsumer.java @@ -0,0 +1,43 @@ +package com.example.easynotes.kafka.consumer; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; +import org.apache.kafka.clients.consumer.KafkaConsumer; +import org.springframework.stereotype.Service; + +import java.time.Duration; +import java.util.Collections; +import java.util.Properties; + +@Service +public class NotesEventConsumer { + private final KafkaConsumer consumer; + // private final String topic; + + public NotesEventConsumer() { + // this.topic = "notes-repo"; + Properties props = new Properties(); + props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // Change to your Kafka broker address + props.put(ConsumerConfig.GROUP_ID_CONFIG, "notes-group"); // Consumer group id + props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + this.consumer = new KafkaConsumer<>(props); + } + + public void consumeEvents() { + String topic = "notes-repo"; + consumer.subscribe(Collections.singletonList(topic)); + + while (true) { + ConsumerRecords records = consumer.poll(Duration.ofMillis(100)); + for (ConsumerRecord record : records) { + System.out.println("[+][+][+]Received message: " + record.value() + "[+][+][+]"); + } + } + } + + public void close() { + consumer.close(); + } +} diff --git a/src/main/java/com/example/easynotes/kafka/producer/NotesEventProducer.java b/src/main/java/com/example/easynotes/kafka/producer/NotesEventProducer.java new file mode 100644 index 0000000..3764a20 --- /dev/null +++ b/src/main/java/com/example/easynotes/kafka/producer/NotesEventProducer.java @@ -0,0 +1,31 @@ +package com.example.easynotes.kafka.producer; + + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.springframework.stereotype.Service; + +import java.util.Properties; + +@Service +public class NotesEventProducer { + private final KafkaProducer producer; + // private final String topic; + + public NotesEventProducer() { + Properties props = new Properties(); + props.put("bootstrap.servers", "localhost:9092"); // Change to your Kafka broker address + props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); + this.producer = new KafkaProducer<>(props); + } + + public void sendEvent(String message) { + String topic = "notes-repo"; + producer.send(new ProducerRecord<>(topic, message)); + } + + public void close() { + producer.close(); + } +}