Skip to content

Commit 2ea6ed2

Browse files
authored
Update kafka-tutorial-5_kafka-api.md
1 parent a731777 commit 2ea6ed2

File tree

1 file changed

+16
-4
lines changed

1 file changed

+16
-4
lines changed

notes/MicroService/kafka/kafka-tutorial-5_kafka-api.md

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,10 @@
1818

1919

2020

21+
22+
23+
24+
2125
### 2. 异步发送 API
2226

2327
- **KafkaProducer** 需要创建一个生产者对象,用来发送数据
@@ -32,6 +36,8 @@
3236
</dependency>
3337
```
3438

39+
#### (1)不带回调函数的异步(AsyncProducer)
40+
3541
```java
3642
package com.tian.kafka.producer;
3743

@@ -70,6 +76,8 @@ public class AsyncProducer {
7076
}
7177
```
7278

79+
#### (2)带回调函数的异步(CallbackProducer)
80+
7381
```java
7482
package com.tian.kafka.producer;
7583

@@ -79,13 +87,13 @@ import org.apache.kafka.common.serialization.StringSerializer;
7987
import java.util.Properties;
8088

8189
/**
82-
* 带回调函数的异步 Producer API
90+
* 带回调函数的异步Producer API
8391
*/
8492
public class CallbackProducer {
8593
public static void main(String[] args) {
8694
Properties props = new Properties();
8795
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
88-
"hadoop101:9092,hadoop102:9092,hadoop103:9092");
96+
"192.168.72.133:9092");
8997
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
9098
StringSerializer.class.getName());
9199
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
@@ -105,8 +113,9 @@ public class CallbackProducer {
105113
else e.printStackTrace();
106114
}
107115
});
108-
producer.close();
116+
109117
}
118+
producer.close();
110119
}
111120
}
112121
```
@@ -115,6 +124,8 @@ public class CallbackProducer {
115124

116125
### 3. 同步发送 API
117126

127+
#### (1)同步发送(SyncProducer)
128+
118129
```java
119130
package com.tian.kafka.producer;
120131

@@ -339,7 +350,7 @@ public class AsyncManualCommitOffset {
339350
### 3. 自定义存储 offset
340351

341352
  Kafka 0.9 版本之前,offset 存储在 zookeeper,0.9 版本之后,默认将 offset 存储在 Kafka 的一个内置的 topic 中。除此之外,Kafka 还可以选择自定义存储 offset。
342-
  Offset 的维护是相当繁琐的,因为需要考虑到消费者的 Rebalance。
353+
  offset 的维护是相当繁琐的,因为需要考虑到消费者的 Rebalance。
343354
  当有新的消费者加入消费者组、已有的消费者推出消费者组或者所订阅的主题的分区发生变化,就会触发到分区的重新分配,重新分配的过程叫做 Rebalance。
344355
  消费者发生 Rebalance 之后,每个消费者消费的分区就会发生变化。因此消费者要首先获取到自己被重新分配到的分区,并且定位到每个分区最近提交的 offset 位置继续消费。
345356
  要实现自定义存储 offset,需要借助 ConsumerRebalanceListener,以下为示例代码,其中提交和获取 offset 的方法,需要根据所选的 offset 存储系统自行实现。
@@ -368,6 +379,7 @@ public class CustomConsumer {
368379
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
369380

370381
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
382+
// 消费者订阅主题
371383
consumer.subscribe(Arrays.asList("first"), new ConsumerRebalanceListener() {
372384

373385
// 该方法会在 Rebalance 之前调用

0 commit comments

Comments
 (0)