Skip to content

Commit ad5cbe6

Browse files
committed
md: 08-客户端SpringBoot
1 parent a624e6a commit ad5cbe6

File tree

1 file changed

+346
-0
lines changed

1 file changed

+346
-0
lines changed
Lines changed: 346 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,346 @@
1+
# 08-客户端SpringBoot
2+
3+
## 一、生产者
4+
5+
### 1、配置POM
6+
7+
```xml
8+
<parent>
9+
<groupId>org.springframework.boot</groupId>
10+
<artifactId>spring-boot-starter-parent</artifactId>
11+
<version>3.1.3</version>
12+
<relativePath/>
13+
</parent>
14+
15+
<dependencies>
16+
<dependency>
17+
<groupId>org.springframework.boot</groupId>
18+
<artifactId>spring-boot-starter-web</artifactId>
19+
</dependency>
20+
21+
<dependency>
22+
<groupId>org.projectlombok</groupId>
23+
<artifactId>lombok</artifactId>
24+
<optional>true</optional>
25+
</dependency>
26+
<dependency>
27+
<groupId>org.springframework.boot</groupId>
28+
<artifactId>spring-boot-starter-test</artifactId>
29+
<scope>test</scope>
30+
</dependency>
31+
<!--spring-kafka-->
32+
<dependency>
33+
<groupId>org.springframework.kafka</groupId>
34+
<artifactId>spring-kafka</artifactId>
35+
</dependency>
36+
<!--hutool-->
37+
<dependency>
38+
<groupId>cn.hutool</groupId>
39+
<artifactId>hutool-all</artifactId>
40+
<version>5.8.19</version>
41+
</dependency>
42+
</dependencies>
43+
44+
<build>
45+
<plugins>
46+
<plugin>
47+
<groupId>org.springframework.boot</groupId>
48+
<artifactId>spring-boot-maven-plugin</artifactId>
49+
<configuration>
50+
<excludes>
51+
<exclude>
52+
<groupId>org.projectlombok</groupId>
53+
<artifactId>lombok</artifactId>
54+
</exclude>
55+
</excludes>
56+
</configuration>
57+
</plugin>
58+
</plugins>
59+
</build>
60+
```
61+
62+
63+
64+
### 2、配置YAML
65+
66+
```yaml
67+
spring:
68+
kafka:
69+
bootstrap-servers: 192.168.200.100:7000,192.168.200.100:8000,192.168.200.100:9000
70+
producer:
71+
key-serializer: org.apache.kafka.common.serialization.StringSerializer
72+
value-serializer: org.apache.kafka.common.serialization.StringSerializer
73+
```
74+
75+
76+
77+
### 3、主启动类
78+
79+
```java
80+
import org.springframework.boot.SpringApplication;
81+
import org.springframework.boot.autoconfigure.SpringBootApplication;
82+
83+
@SpringBootApplication
84+
public class KafkaMainTypeProducer {
85+
86+
public static void main(String[] args) {
87+
SpringApplication.run(KafkaMainType.class, args);
88+
}
89+
90+
}
91+
```
92+
93+
94+
95+
### 4、配置类创建主题
96+
97+
```java
98+
import org.apache.kafka.clients.admin.NewTopic;
99+
import org.springframework.context.annotation.Bean;
100+
import org.springframework.context.annotation.Configuration;
101+
import org.springframework.kafka.config.TopicBuilder;
102+
103+
@Configuration
104+
public class KafkaConfig {
105+
@Bean
106+
public NewTopic springTestTopic() {
107+
return TopicBuilder.name("topic-spring-boot") // 主题名称
108+
.partitions(3) // 分区数量
109+
.replicas(3) // 复制因子
110+
.build();
111+
}
112+
}
113+
```
114+
115+
116+
117+
到这里我们可以运行主启动类,看看主题是否创建成功
118+
119+
```shell
120+
kafka-topics.sh --bootstrap-server 192.168.200.100:7000 --list
121+
```
122+
123+
124+
125+
### 5、发送消息
126+
127+
#### ①命令行监听消息
128+
129+
```shell
130+
kafka-console-consumer.sh --bootstrap-server 192.168.200.100:7000,192.168.200.100:8000,192.168.200.100:9000 --topic topic-spring-boot --partition 0
131+
132+
kafka-console-consumer.sh --bootstrap-server 192.168.200.100:7000,192.168.200.100:8000,192.168.200.100:9000 --topic topic-spring-boot --partition 1
133+
134+
kafka-console-consumer.sh --bootstrap-server 192.168.200.100:7000,192.168.200.100:8000,192.168.200.100:9000 --topic topic-spring-boot --partition 2
135+
```
136+
137+
138+
139+
#### ②Java代码
140+
141+
```java
142+
import jakarta.annotation.Resource;
143+
import org.junit.jupiter.api.Test;
144+
import org.springframework.boot.test.context.SpringBootTest;
145+
import org.springframework.kafka.core.KafkaTemplate;
146+
147+
@SpringBootTest
148+
public class KafkaTest {
149+
150+
@Resource
151+
private KafkaTemplate kafkaTemplate;
152+
153+
@Test
154+
public void testSendMessage() {
155+
156+
String topicName = "topic-spring-boot";
157+
String message = "hello spring boot message";
158+
159+
kafkaTemplate.send(topicName, message);
160+
}
161+
162+
}
163+
```
164+
165+
166+
167+
## 二、消费者
168+
169+
### 1、配置POM
170+
171+
```xml
172+
<parent>
173+
<groupId>org.springframework.boot</groupId>
174+
<artifactId>spring-boot-starter-parent</artifactId>
175+
<version>3.1.3</version>
176+
<relativePath/>
177+
</parent>
178+
179+
<dependencies>
180+
<dependency>
181+
<groupId>org.springframework.boot</groupId>
182+
<artifactId>spring-boot-starter-web</artifactId>
183+
</dependency>
184+
185+
<dependency>
186+
<groupId>org.projectlombok</groupId>
187+
<artifactId>lombok</artifactId>
188+
<optional>true</optional>
189+
</dependency>
190+
<!--spring-kafka-->
191+
<dependency>
192+
<groupId>org.springframework.kafka</groupId>
193+
<artifactId>spring-kafka</artifactId>
194+
</dependency>
195+
<!--hutool-->
196+
<dependency>
197+
<groupId>cn.hutool</groupId>
198+
<artifactId>hutool-all</artifactId>
199+
<version>5.8.19</version>
200+
</dependency>
201+
</dependencies>
202+
203+
<build>
204+
<plugins>
205+
<plugin>
206+
<groupId>org.springframework.boot</groupId>
207+
<artifactId>spring-boot-maven-plugin</artifactId>
208+
<configuration>
209+
<excludes>
210+
<exclude>
211+
<groupId>org.projectlombok</groupId>
212+
<artifactId>lombok</artifactId>
213+
</exclude>
214+
</excludes>
215+
</configuration>
216+
</plugin>
217+
</plugins>
218+
</build>
219+
```
220+
221+
222+
223+
### 2、配置YAML
224+
225+
```yaml
226+
spring:
227+
Kafka:
228+
bootstrap-servers: 192.168.200.100:7000,192.168.200.100:8000,192.168.200.100:9000
229+
consumer:
230+
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
231+
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
232+
group-id: consumer-group
233+
```
234+
235+
236+
237+
### 3、主启动类
238+
239+
```java
240+
package com.atguigu.kafka;
241+
242+
import org.springframework.boot.SpringApplication;
243+
import org.springframework.boot.autoconfigure.SpringBootApplication;
244+
245+
@SpringBootApplication
246+
public class KafkaMainTypeConsumer {
247+
248+
public static void main(String[] args) {
249+
SpringApplication.run(KafkaMainTypeConsumer.class, args);
250+
}
251+
252+
}
253+
```
254+
255+
256+
257+
### 4、接收消息的监听器
258+
259+
```java
260+
import org.apache.kafka.clients.consumer.ConsumerRecord;
261+
import org.springframework.kafka.annotation.KafkaListener;
262+
import org.springframework.stereotype.Component;
263+
264+
@Component
265+
public class KafkaMessageListener {
266+
267+
@KafkaListener(topics = {"topic-spring-boot"})
268+
public void simpleConsumerPartition(ConsumerRecord<String, String> record) {
269+
System.out.println("进入simpleConsumer方法");
270+
System.out.printf(
271+
"分区 = %d, 偏移量 = %d, key = %s, 内容 = %s, 时间戳 = %d%n",
272+
record.partition(),
273+
record.offset(),
274+
record.key(),
275+
record.value(),
276+
record.timestamp()
277+
);
278+
}
279+
280+
}
281+
```
282+
283+
284+
285+
注意:这里我们没有指定具体接收哪个分区的消息,所以如果接收不到消息,那么就需要登录Zookeeper删除__consumer_offsets
286+
287+
```shell
288+
deleteall /brokers/topics/__consumer_offsets
289+
```
290+
291+
292+
293+
## 三、实体类对象类型的消息
294+
295+
### 1、创建实体类
296+
297+
```java
298+
import lombok.AllArgsConstructor;
299+
import lombok.Data;
300+
301+
@Data
302+
@AllArgsConstructor
303+
public class UserDTO {
304+
private String name;
305+
private Integer age;
306+
private String mobile;
307+
}
308+
```
309+
310+
311+
312+
### 2、发送消息的方法
313+
314+
```java
315+
@Test
316+
public void testSendEntity() {
317+
String topicName = "topic-spring-boot230628";
318+
UserDTO userDTO = new UserDTO("tom", 25, "12345343");
319+
320+
kafkaTemplate.send(topicName, userDTO);
321+
}
322+
```
323+
324+
325+
326+
### 3、异常
327+
328+
- 异常全类名:java.lang.ClassCastException
329+
- 异常信息:class com.atguigu.kafka.entity.UserDTO cannot be cast to class java.lang.String (com.atguigu.kafka.entity.UserDTO is in unnamed module of loader 'app'; java.lang.String is in module java.base of loader 'bootstrap')
330+
- 异常原因:目前使用的序列化器是StringSerializer,不支持非字符串序列化
331+
- 解决办法:把序列化器换成支持复杂类型的
332+
333+
334+
335+
### 4、修改YAML配置
336+
337+
```yaml
338+
spring:
339+
kafka:
340+
bootstrap-servers: 192.168.200.100:7000,192.168.200.100:8000,192.168.200.100:9000
341+
producer:
342+
key-serializer: org.apache.kafka.common.serialization.StringSerializer
343+
# value-serializer: org.apache.kafka.common.serialization.StringSerializer
344+
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
345+
```
346+

0 commit comments

Comments
 (0)