Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,11 @@
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-security</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.1.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
Expand Down
16 changes: 16 additions & 0 deletions src/main/java/com/example/easynotes/controller/NoteController.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.example.easynotes.controller;

import com.example.easynotes.exception.ResourceNotFoundException;
import com.example.easynotes.kafka.consumer.NotesEventConsumer;
import com.example.easynotes.kafka.producer.NotesEventProducer;
import com.example.easynotes.model.Note;
import com.example.easynotes.model.SearchNote;
import com.example.easynotes.repository.NoteRepository;
Expand All @@ -11,6 +13,7 @@
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;

import javax.annotation.PostConstruct;
import javax.validation.Valid;

import java.util.ArrayList;
Expand All @@ -29,11 +32,23 @@ public class NoteController {
@Autowired
NoteRepository noteRepository;

@Autowired
NotesEventProducer eventProducer;
@Autowired
NotesEventConsumer eventConsumer;

@Autowired
esInterface es;

private static final Logger logger = LoggerFactory.getLogger(NoteController.class);

@PostConstruct
public void init() {
// Start consuming events in a separate thread
Thread consumerThread = new Thread(() -> eventConsumer.consumeEvents());
consumerThread.start();
}

@CrossOrigin(origins = "*")
@GetMapping("/notes")
public List<Note> getAllNotes(@RequestParam(defaultValue="10", name="limit") String limit,
Expand All @@ -54,6 +69,7 @@ public List<Note> getAllNotes(@RequestParam(defaultValue="10", name="limit") Str
public Note createNote(@Valid @RequestBody Note note) {
Note res = noteRepository.save(note);
es.addToES(res);
eventProducer.sendEvent(res.getContent());
return res;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package com.example.easynotes.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.stereotype.Service;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

@Service
public class NotesEventConsumer {
private final KafkaConsumer<String, String> consumer;
// private final String topic;

public NotesEventConsumer() {
// this.topic = "notes-repo";
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // Change to your Kafka broker address
props.put(ConsumerConfig.GROUP_ID_CONFIG, "notes-group"); // Consumer group id
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
this.consumer = new KafkaConsumer<>(props);
}

public void consumeEvents() {
String topic = "notes-repo";
consumer.subscribe(Collections.singletonList(topic));

while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.println("[+][+][+]Received message: " + record.value() + "[+][+][+]");
}
}
}

public void close() {
consumer.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package com.example.easynotes.kafka.producer;


import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.springframework.stereotype.Service;

import java.util.Properties;

@Service
public class NotesEventProducer {
private final KafkaProducer<String, String> producer;
// private final String topic;

public NotesEventProducer() {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // Change to your Kafka broker address
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
this.producer = new KafkaProducer<>(props);
}

public void sendEvent(String message) {
String topic = "notes-repo";
producer.send(new ProducerRecord<>(topic, message));
}

public void close() {
producer.close();
}
}