Skip to content

Commit adbb297

Browse files
authored
更新 Spring Kafka 部分
1 parent 2ea6ed2 commit adbb297

File tree

1 file changed

+263
-0
lines changed

1 file changed

+263
-0
lines changed
Lines changed: 263 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,265 @@
11
# 深入浅出 Kafka(六)Spring Kafka API
22

3+
  在上一节中学习了如何通过 Kafka API 的方式进行生产者和消费者及其配置,但是主要是通过手动编写 Java 代码的方式实现。在项目开发中,我们主要会使用到 SpringBoot 框架,这里就将带你 SpringBoot 与 Kafka 整合,通过注解和配置的方式轻松集成。
4+
5+
  这里将列举,最常见的生产者和消费者使用方式,更完整的 API 文档,请转向 [Overview (Spring Kafka 2.2.8.RELEASE API)](https://docs.spring.io/spring-kafka/api/) 学习。
6+
7+
  送上本节 DEMO 项目代码,[点击预览](https://github.com/frank-lam/open-code-lab/tree/master/springboot/springboot-kafka-sample)
8+
9+
10+
11+
## 一、Producer
12+
13+
### 1. KafkaProducer
14+
15+
```java
16+
package cn.frankfeekr.springbootkafkasample.client;
17+
18+
import org.slf4j.Logger;
19+
import org.slf4j.LoggerFactory;
20+
import org.springframework.beans.factory.annotation.Autowired;
21+
import org.springframework.beans.factory.annotation.Value;
22+
import org.springframework.kafka.core.KafkaTemplate;
23+
import org.springframework.web.bind.annotation.GetMapping;
24+
import org.springframework.web.bind.annotation.RequestParam;
25+
import org.springframework.web.bind.annotation.RestController;
26+
27+
28+
@RestController
29+
public class KafkaProducer {
30+
31+
private static final Logger LOG = LoggerFactory.getLogger(KafkaProducer.class);
32+
33+
@Autowired
34+
private KafkaTemplate<String, String> kafkaTemplate;
35+
36+
@Value("${app.topic.foo}")
37+
private String topic;
38+
39+
40+
@GetMapping("demo")
41+
public String send(@RequestParam String msg){
42+
// LOG.info("sending message='{}' to topic='{}'", message, topic);
43+
kafkaTemplate.send(topic,"key", msg);
44+
return "send success";
45+
}
46+
}
47+
```
48+
49+
### 2. KafkaProducerConfig
50+
51+
```java
52+
package cn.frankfeekr.springbootkafkasample.client;
53+
54+
import org.apache.kafka.clients.producer.ProducerConfig;
55+
import org.apache.kafka.common.serialization.StringSerializer;
56+
import org.springframework.beans.factory.annotation.Value;
57+
import org.springframework.context.annotation.Bean;
58+
import org.springframework.context.annotation.Configuration;
59+
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
60+
import org.springframework.kafka.core.KafkaTemplate;
61+
import org.springframework.kafka.core.ProducerFactory;
62+
63+
import java.util.HashMap;
64+
import java.util.Map;
65+
66+
@Configuration
67+
public class KafkaSenderConfig {
68+
69+
@Value("${spring.kafka.bootstrap-servers}")
70+
private String bootstrapServers;
71+
72+
@Bean
73+
public Map<String, Object> producerConfigs() {
74+
Map<String, Object> props = new HashMap<>();
75+
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
76+
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
77+
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
78+
return props;
79+
}
80+
81+
@Bean
82+
public ProducerFactory<String, String> producerFactory() {
83+
return new DefaultKafkaProducerFactory<>(producerConfigs());
84+
}
85+
86+
@Bean
87+
public KafkaTemplate<String, String> kafkaTemplate() {
88+
return new KafkaTemplate<>(producerFactory());
89+
}
90+
91+
}
92+
```
93+
94+
95+
96+
## 二、Consumer
97+
98+
### 3. KafkaConsumer
99+
100+
```java
101+
package cn.frankfeekr.springbootkafkasample.client;
102+
103+
import org.apache.kafka.clients.consumer.Consumer;
104+
import org.apache.kafka.clients.consumer.ConsumerRecord;
105+
import org.slf4j.Logger;
106+
import org.slf4j.LoggerFactory;
107+
import org.springframework.kafka.annotation.KafkaListener;
108+
import org.springframework.kafka.support.Acknowledgment;
109+
import org.springframework.stereotype.Service;
110+
111+
@Service
112+
public class KafkaConsumer {
113+
114+
private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumer.class);
115+
116+
@KafkaListener(topics = "${app.topic.foo}")
117+
public void listen(ConsumerRecord<String, String> record, Acknowledgment ack, Consumer<?, ?> consumer) {
118+
LOG.warn("topic:{},key: {},partition:{}, value: {}, record: {}",record.topic(), record.key(),record.partition(), record.value(), record);
119+
if (record.topic().equalsIgnoreCase("test")){
120+
throw new RuntimeException();
121+
}
122+
System.out.println("提交 offset ");
123+
consumer.commitAsync();
124+
}
125+
}
126+
```
127+
128+
129+
130+
### 4. KafkaConsumerConfig
131+
132+
```java
133+
package cn.frankfeekr.springbootkafkasample.client;
134+
135+
import org.apache.kafka.clients.consumer.ConsumerConfig;
136+
import org.apache.kafka.common.serialization.StringDeserializer;
137+
import org.springframework.beans.factory.annotation.Value;
138+
import org.springframework.context.annotation.Bean;
139+
import org.springframework.context.annotation.Configuration;
140+
import org.springframework.kafka.annotation.EnableKafka;
141+
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
142+
import org.springframework.kafka.config.KafkaListenerContainerFactory;
143+
import org.springframework.kafka.core.ConsumerFactory;
144+
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
145+
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
146+
import org.springframework.kafka.listener.ContainerProperties;
147+
148+
import java.util.HashMap;
149+
import java.util.Map;
150+
151+
@EnableKafka
152+
@Configuration
153+
public class KafkaConsumerConfig {
154+
155+
@Value("${spring.kafka.bootstrap-servers}")
156+
private String bootstrapServers;
157+
158+
@Bean
159+
public Map<String, Object> consumerConfigs() {
160+
Map<String, Object> props = new HashMap<>();
161+
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
162+
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
163+
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
164+
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
165+
props.put(ConsumerConfig.GROUP_ID_CONFIG, "foo");
166+
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
167+
168+
return props;
169+
}
170+
171+
172+
@Bean
173+
public ConsumerFactory<String, String> consumerFactory() {
174+
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
175+
}
176+
177+
@Bean
178+
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
179+
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
180+
factory.setConcurrency(3);
181+
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
182+
factory.setConsumerFactory(consumerFactory());
183+
return factory;
184+
}
185+
186+
}
187+
```
188+
189+
190+
191+
## 三、配置文件
192+
193+
```yaml
194+
server:
195+
port: 9090
196+
spring:
197+
kafka:
198+
bootstrap-servers: 127.0.0.1:9092
199+
app:
200+
topic:
201+
foo: frankfeekr
202+
```
203+
204+
205+
206+
## 四、pom 依赖
207+
208+
```xml
209+
<?xml version="1.0" encoding="UTF-8"?>
210+
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
211+
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
212+
<modelVersion>4.0.0</modelVersion>
213+
<parent>
214+
<groupId>org.springframework.boot</groupId>
215+
<artifactId>spring-boot-starter-parent</artifactId>
216+
<version>2.1.7.RELEASE</version>
217+
<relativePath/> <!-- lookup parent from repository -->
218+
</parent>
219+
<groupId>cn.frankfeekr</groupId>
220+
<artifactId>springboot-kafka-sample</artifactId>
221+
<version>0.0.1-SNAPSHOT</version>
222+
<name>springboot-kafka-sample</name>
223+
<description>Demo project for Spring Boot</description>
224+
225+
<properties>
226+
<java.version>1.8</java.version>
227+
</properties>
228+
229+
<dependencies>
230+
<dependency>
231+
<groupId>org.springframework.boot</groupId>
232+
<artifactId>spring-boot-starter-web</artifactId>
233+
</dependency>
234+
235+
<dependency>
236+
<groupId>org.springframework.boot</groupId>
237+
<artifactId>spring-boot-starter-test</artifactId>
238+
<scope>test</scope>
239+
</dependency>
240+
241+
<dependency>
242+
<groupId>org.springframework.kafka</groupId>
243+
<artifactId>spring-kafka</artifactId>
244+
<version>2.2.7.RELEASE</version>
245+
</dependency>
246+
</dependencies>
247+
248+
<build>
249+
<plugins>
250+
<plugin>
251+
<groupId>org.springframework.boot</groupId>
252+
<artifactId>spring-boot-maven-plugin</artifactId>
253+
</plugin>
254+
</plugins>
255+
</build>
256+
257+
</project>
258+
```
259+
260+
261+
262+
## 参考资料
263+
264+
- [Spring for Apache Kafka](https://docs.spring.io/spring-kafka/reference/html/#si-kafka)
265+
- [SpringBoot整合Kafka实现发布订阅 - 简书](https://www.jianshu.com/p/7a284bf4efc9)

0 commit comments

Comments
 (0)