18
18
19
19
20
20
21
+
22
+
23
+
24
+
21
25
### 2. 异步发送 API
22
26
23
27
- ** KafkaProducer** 需要创建一个生产者对象,用来发送数据
32
36
</dependency >
33
37
```
34
38
39
+ #### (1)不带回调函数的异步(AsyncProducer)
40
+
35
41
``` java
36
42
package com.tian.kafka.producer ;
37
43
@@ -70,6 +76,8 @@ public class AsyncProducer {
70
76
}
71
77
```
72
78
79
+ #### (2)带回调函数的异步(CallbackProducer)
80
+
73
81
``` java
74
82
package com.tian.kafka.producer ;
75
83
@@ -79,13 +87,13 @@ import org.apache.kafka.common.serialization.StringSerializer;
79
87
import java.util.Properties ;
80
88
81
89
/**
82
- * 带回调函数的异步 Producer API
90
+ * 带回调函数的异步Producer API
83
91
*/
84
92
public class CallbackProducer {
85
93
public static void main (String [] args ) {
86
94
Properties props = new Properties ();
87
95
props. put(ProducerConfig . BOOTSTRAP_SERVERS_CONFIG ,
88
- " hadoop101:9092,hadoop102:9092,hadoop103 :9092" );
96
+ " 192.168.72.133 :9092" );
89
97
props. put(ProducerConfig . KEY_SERIALIZER_CLASS_CONFIG ,
90
98
StringSerializer . class. getName());
91
99
props. put(ProducerConfig . VALUE_SERIALIZER_CLASS_CONFIG ,
@@ -105,8 +113,9 @@ public class CallbackProducer {
105
113
else e. printStackTrace();
106
114
}
107
115
});
108
- producer . close();
116
+
109
117
}
118
+ producer. close();
110
119
}
111
120
}
112
121
```
@@ -115,6 +124,8 @@ public class CallbackProducer {
115
124
116
125
### 3. 同步发送 API
117
126
127
+ #### (1)同步发送(SyncProducer)
128
+
118
129
``` java
119
130
package com.tian.kafka.producer ;
120
131
@@ -339,7 +350,7 @@ public class AsyncManualCommitOffset {
339
350
### 3. 自定义存储 offset
340
351
341
352
Kafka 0.9 版本之前,offset 存储在 zookeeper,0.9 版本之后,默认将 offset 存储在 Kafka 的一个内置的 topic 中。除此之外,Kafka 还可以选择自定义存储 offset。
342
- Offset 的维护是相当繁琐的,因为需要考虑到消费者的 Rebalance。
353
+ offset 的维护是相当繁琐的,因为需要考虑到消费者的 Rebalance。
343
354
当有新的消费者加入消费者组、已有的消费者推出消费者组或者所订阅的主题的分区发生变化,就会触发到分区的重新分配,重新分配的过程叫做 Rebalance。
344
355
消费者发生 Rebalance 之后,每个消费者消费的分区就会发生变化。因此消费者要首先获取到自己被重新分配到的分区,并且定位到每个分区最近提交的 offset 位置继续消费。
345
356
要实现自定义存储 offset,需要借助 ConsumerRebalanceListener,以下为示例代码,其中提交和获取 offset 的方法,需要根据所选的 offset 存储系统自行实现。
@@ -368,6 +379,7 @@ public class CustomConsumer {
368
379
props. put(" value.deserializer" , " org.apache.kafka.common.serialization.StringDeserializer" );
369
380
370
381
KafkaConsumer<String , String > consumer = new KafkaConsumer<> (props);
382
+ // 消费者订阅主题
371
383
consumer. subscribe(Arrays . asList(" first" ), new ConsumerRebalanceListener () {
372
384
373
385
// 该方法会在 Rebalance 之前调用
0 commit comments