<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
1、引入依賴
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>
2、設定yml檔案
spring: application: name: demo kafka: bootstrap-servers: 52.82.98.209:10903,52.82.98.209:10904 producer: # producer 生產者 retries: 0 # 重試次數 acks: 1 # 應答級別:多少個分割區副本備份完成時向生產者傳送ack確認(可選0、1、all/-1) batch-size: 16384 # 批次大小 buffer-memory: 33554432 # 生產端緩衝區大小 key-serializer: org.apache.kafka.common.serialization.StringSerializer # value-serializer: com.itheima.demo.config.MySerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: # consumer消費者 group-id: javagroup # 預設的消費組ID enable-auto-commit: true # 是否自動提交offset auto-commit-interval: 100 # 提交offset延時(接收到訊息後多久提交offset) # earliest:當各分割區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費 # latest:當各分割區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分割區下的資料 # none:topic各分割區都存在已提交的offset時,從offset後開始消費;只要有一個分割區不存在已提交的offset,則丟擲異常 auto-offset-reset: latest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer # value-deserializer: com.itheima.demo.config.MyDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
3、啟動專案
KafkaTemplate呼叫send時預設採用非同步傳送,如果需要同步獲取傳送結果,呼叫get方法
非同步傳送生產者:
@RestController public class KafkaProducer { @Resource private KafkaTemplate<String, Object> kafkaTemplate; @GetMapping("/kafka/test/{msg}") public void sendMessage(@PathVariable("msg") String msg) { Message message = new Message(); message.setMessage(msg); kafkaTemplate.send("test", JSON.toJSONString(message)); } }
同步傳送生產者:
//測試同步傳送與監聽 @RestController public class AsyncProducer { private final static Logger logger = LoggerFactory.getLogger(AsyncProducer.class); @Resource private KafkaTemplate<String, Object> kafkaTemplate; //同步傳送 @GetMapping("/kafka/sync/{msg}") public void sync(@PathVariable("msg") String msg) throws Exception { Message message = new Message(); message.setMessage(msg); ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("test", JSON.toJSONString(message)); //注意,可以設定等待時間,超出後,不再等候結果 SendResult<String, Object> result = future.get(3,TimeUnit.SECONDS); logger.info("send result:{}",result.getProducerRecord().value()); } }
消費者:
@Component public class KafkaConsumer { private final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class); //不指定group,預設取yml裡設定的 @KafkaListener(topics = {"test"}) public void onMessage1(ConsumerRecord<?, ?> consumerRecord) { Optional<?> optional = Optional.ofNullable(consumerRecord.value()); if (optional.isPresent()) { Object msg = optional.get(); logger.info("message:{}", msg); } } }
那麼我們怎麼看出來同步傳送和非同步傳送的區別呢?
①首先在伺服器上,將kafka暫停服務。
②在swagger傳送訊息
調同步傳送:請求被阻斷,一直等待,超時後返回錯誤
而調非同步傳送的(預設傳送介面),請求立刻返回。
那麼,非同步傳送的訊息怎麼確認傳送情況呢?
我們使用註冊監聽
即新建一個類:KafkaListener.java
@Configuration public class KafkaListener { private final static Logger logger = LoggerFactory.getLogger(KafkaListener.class); @Autowired KafkaTemplate kafkaTemplate; //設定監聽 @PostConstruct private void listener() { kafkaTemplate.setProducerListener(new ProducerListener<String, Object>() { @Override public void onSuccess(ProducerRecord<String, Object> producerRecord, RecordMetadata recordMetadata) { logger.info("ok,message={}", producerRecord.value()); } public void onError(ProducerRecord<String, Object> producerRecord, Exception exception) { logger.error("error!message={}", producerRecord.value()); }); } }
檢視控制檯,等待一段時間後,非同步傳送失敗的訊息會被回撥給註冊過的listener
如果是正常傳送非同步訊息,則會獲得該訊息。可以看到,在內部類 KafkaListener$1 中,即註冊的Listener的訊息。
消費者使用:KafkaConsumer.java
@Component public class KafkaConsumer { private final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class); //不指定group,預設取yml裡設定的 @KafkaListener(topics = {"test"}) public void onMessage1(ConsumerRecord<?, ?> consumerRecord) { Optional<?> optional = Optional.ofNullable(consumerRecord.value()); if (optional.isPresent()) { Object msg = optional.get(); logger.info("message:{}", msg); } } }
1)序列化詳解
org.apache.kafka.common.serialization.StringSerializer
)org.apache.kafka.common.serialization.Serializer
)2)自定義序列化
自己實現,實現對應的介面即可,有以下方法:
public interface Serializer<T> extends Closeable { default void configure(Map<String, ?> configs, Boolean isKey) { } //理論上,只實現這個即可正常執行 byte[] serialize(String var1, T var2); //預設調上面的方法 default byte[] serialize(String topic, Headers headers, T data) { return this.serialize(topic, data); } default void close() { } }
我們來自己實現一個序列化器:MySerializer.java
public class MySerializer implements Serializer { @Override public byte[] serialize(String s, Object o) { String json = JSON.toJSONString(o); return json.getBytes(); } }
3)解碼MyDeserializer.java
,實現方式與編碼器幾乎一樣.
public class MyDeserializer implements Deserializer { private final static Logger logger = LoggerFactory.getLogger(MyDeserializer.class); @Override public Object deserialize(String s, byte[] bytes) { try { String json = new String(bytes,"utf-8"); return JSON.parse(json); } catch (UnsupportedEncodingException e) { e.printStackTrace(); } return null; } }
4)在yaml中設定自己的編碼器、解碼器
再次收發,訊息正常
分割區策略決定了訊息根據key投放到哪個分割區,也是順序消費保障的基石。
1)驗證預設分割區規則
傳送者程式碼參考:PartitionProducer.java
//測試分割區傳送 @RestController public class PartitionProducer { @Resource private KafkaTemplate<String, Object> kafkaTemplate; // 指定分割區傳送 // 不管你key是什麼,到同一個分割區 @GetMapping("/kafka/partitionSend/{key}") public void setPartition(@PathVariable("key") String key) { kafkaTemplate.send("test", 0, key, "key=" + key + ",msg=指定0號分割區"); } // 指定key傳送,不指定分割區 // 根據key做hash,相同的key到同一個分割區 @GetMapping("/kafka/keysend/{key}") public void setKey(@PathVariable("key") String key) { kafkaTemplate.send("test", key, "key=" + key + ",msg=不指定分割區"); }
消費者程式碼使用:PartitionConsumer.java
@Component public class PartitionConsumer { private final Logger logger = LoggerFactory.getLogger(PartitionConsumer.class); //分割區消費 @KafkaListener(topics = {"test"},topicPattern = "0") public void onMessage(ConsumerRecord<?, ?> consumerRecord) { Optional<?> optional = Optional.ofNullable(consumerRecord.value()); if (optional.isPresent()) { Object msg = optional.get(); logger.info("partition=0,message:[{}]", msg); } } @KafkaListener(topics = {"test"},topicPattern = "1") public void onMessage1(ConsumerRecord<?, ?> consumerRecord) { Optional<?> optional = Optional.ofNullable(consumerRecord.value()); if (optional.isPresent()) { Object msg = optional.get(); logger.info("partition=1,message:[{}]", msg); } } }
通過swagger存取setKey(也就是隻給了key的方法):
可以看到key相同的被hash到了同一個分割區
再存取setPartition來設定分割區號0來傳送:
可以看到無論key是什麼,都是分割區0來消費
2)自定義分割區
參考程式碼:MyPartitioner.java , MyPartitionTemplate.java。
傳送使用:MyPartitionProducer.java。
public class MyPartitioner implements Partitioner { @Override public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { // 定義自己的分割區策略 // 如果key以0開頭,發到0號分割區 // 其他都扔到1號分割區 String keyStr = key+""; if (keyStr.startsWith("0")){ return 0; }else { return 1; } } public void close() { public void configure(Map<String, ?> map) { }
@Configuration public class MyPartitionTemplate { private final Logger logger = LoggerFactory.getLogger(this.getClass()); @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; KafkaTemplate kafkaTemplate; @PostConstruct public void setKafkaTemplate() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); //注意分割區器在這裡!!! props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class); this.kafkaTemplate = new KafkaTemplate<String, String>(new DefaultKafkaProducerFactory<>(props)); } public KafkaTemplate getKafkaTemplate(){ return kafkaTemplate; }
//測試自定義分割區傳送 @RestController public class MyPartitionProducer { @Autowired MyPartitionTemplate template; // 使用0開頭和其他任意字母開頭的key傳送訊息 // 看控制檯的輸出,在哪個分割區裡? @GetMapping("/kafka/myPartitionSend/{key}") public void setPartition(@PathVariable("key") String key) { template.getKafkaTemplate().send("test", key,"key="+key+",msg=自定義分割區策略"); } }
使用swagger,傳送0開頭和非0開頭兩種key
傳送者使用:KafkaProducer.java
@RestController public class KafkaProducer { @Resource private KafkaTemplate<String, Object> kafkaTemplate; @GetMapping("/kafka/test/{msg}") public void sendMessage(@PathVariable("msg") String msg) { Message message = new Message(); message.setMessage(msg); kafkaTemplate.send("test", JSON.toJSONString(message)); } }
1)程式碼參考:GroupConsumer.java,Listener拷貝3份,分別賦予兩組group,驗證分組消費:
//測試組消費 @Component public class GroupConsumer { private final Logger logger = LoggerFactory.getLogger(GroupConsumer.class); //組1,消費者1 @KafkaListener(topics = {"test"},groupId = "group1") public void onMessage1(ConsumerRecord<?, ?> consumerRecord) { Optional<?> optional = Optional.ofNullable(consumerRecord.value()); if (optional.isPresent()) { Object msg = optional.get(); logger.info("group:group1-1 , message:{}", msg); } } //組1,消費者2 public void onMessage2(ConsumerRecord<?, ?> consumerRecord) { logger.info("group:group1-2 , message:{}", msg); //組2,只有一個消費者 @KafkaListener(topics = {"test"},groupId = "group2") public void onMessage3(ConsumerRecord<?, ?> consumerRecord) { logger.info("group:group2 , message:{}", msg); }
2)啟動
3)通過swagger傳送2條訊息
4)消費端閒置
注意分割區數與消費者數的搭配,如果 ( 消費者數 > 分割區數量 ),將會出現消費者閒置(因為一個分割區只能分配給一個消費者),浪費資源!
驗證方式:
停掉專案,刪掉test主題,重新建一個 ,這次只給它分配一個分割區。
重新傳送兩條訊息,試一試
1)自動提交
前面的案例中,我們設定了以下兩個選項,則kafka會按延時設定自動提交
enable-auto-commit: true # 是否自動提交offset auto-commit-interval: 100 # 提交offset延時(接收到訊息後多久提交offset,預設單位為ms)
2)手動提交
有些時候,我們需要手動控制偏移量的提交時機,比如確保訊息嚴格消費後再提交,以防止丟失或重複。
下面我們自己定義設定,覆蓋上面的引數
程式碼參考:MyOffsetConfig.java
@Configuration public class MyOffsetConfig { private final Logger logger = LoggerFactory.getLogger(this.getClass()); @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Bean public KafkaListenerContainerFactory<?> manualKafkaListenerContainerFactory() { Map<String, Object> configProps = new HashMap<>(); configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); // 注意這裡!!!設定手動提交 configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(configProps)); // ack模式: // AckMode針對ENABLE_AUTO_COMMIT_CONFIG=false時生效,有以下幾種: // // RECORD // 每處理一條commit一次 // BATCH(預設) // 每次poll的時候批次提交一次,頻率取決於每次poll的呼叫頻率 // TIME // 每次間隔ackTime的時間去commit(跟auto commit interval有什麼區別呢?) // COUNT // 累積達到ackCount次的ack去commit // COUNT_TIME // ackTime或ackCount哪個條件先滿足,就commit // MANUAL // listener負責ack,但是背後也是批次上去 // MANUAL_IMMEDIATE // listner負責ack,每呼叫一次,就立即commit factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE); return factory; } }
然後通過在消費端的Consumer來提交偏移量
MyOffsetConsumer:
@Component public class MyOffsetConsumer { private final Logger logger = LoggerFactory.getLogger(this.getClass()); @KafkaListener(topics = "test", groupId = "myoffset-group-1", containerFactory = "manualKafkaListenerContainerFactory") public void manualCommit(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, Consumer consumer, Acknowledgment ack) { logger.info("手動提交偏移量 , partition={}, msg={}", partition, message); // 同步提交 consumer.commitSync(); //非同步提交 //consumer.commitAsync(); // ack提交也可以,會按設定的ack策略走(參考MyOffsetConfig.java裡的ack模式) // ack.acknowledge(); } @KafkaListener(topics = "test", groupId = "myoffset-group-2", containerFactory = "manualKafkaListenerContainerFactory") public void noCommit(@Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic, Consumer consumer, Acknowledgment ack) { logger.info("忘記提交偏移量, partition={}, msg={}", partition, message); // 不做commit! /** * 現實狀況: * commitSync和commitAsync組合使用 * <p> * 手工提交非同步 consumer.commitAsync(); * 手工同步提交 consumer.commitSync() * commitSync()方法提交最後一個偏移量。在成功提交或碰到無怯恢復的錯誤之前, * commitSync()會一直重試,但是commitAsync()不會。 * 一般情況下,針對偶爾出現的提交失敗,不進行重試不會有太大問題 * 因為如果提交失敗是因為臨時問題導致的,那麼後續的提交總會有成功的。 * 但如果這是發生在關閉消費者或再均衡前的最後一次提交,就要確保能夠提交成功。否則就會造成重複消費 * 因此,在消費者關閉前一般會組合使用commitAsync()和commitSync()。 */ // @KafkaListener(topics = "test", groupId = "myoffset-group-3",containerFactory = "manualKafkaListenerContainerFactory") public void manualOffset(@Payload String message, try { logger.info("同步非同步搭配 , partition={}, msg={}", partition, message); //先非同步提交 consumer.commitAsync(); //繼續做別的事 } catch (Exception e) { System.out.println("commit failed"); } finally { try { consumer.commitSync(); } finally { consumer.close(); } } * 甚至可以手動提交,指定任意位置的偏移量 * 不推薦日常使用!!! // @KafkaListener(topics = "test", groupId = "myoffset-group-4",containerFactory = "manualKafkaListenerContainerFactory") public void offset(ConsumerRecord record, Consumer consumer) { logger.info("手動指定任意偏移量, partition={}, msg={}", record.partition(), record); Map<TopicPartition, OffsetAndMetadata> currentOffset = new HashMap<>(); currentOffset.put(new TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1)); consumer.commitSync(currentOffset); }
3)重複消費問題
如果手動提交模式被開啟,一定不要忘記提交偏移量。否則會造成重複消費!
用km將test主題刪除,新建一個test空主題。方便觀察訊息偏移 註釋掉其他Consumer的Component註解,只保留當前MyOffsetConsumer.java 啟動專案,使用swagger的KafkaProducer傳送連續幾條訊息 留心控制檯,都能消費,沒問題:
但是!重啟專案:
無論重啟多少次,不提交偏移量的消費組,會重複消費一遍!!!
再通過命令列查詢偏移量
4)經驗與總結
commitSync()方法,即同步提交,會提交最後一個偏移量。在成功提交或碰到無怯恢復的錯誤之前,commitSync()會一直重試,但是commitAsync()不會。
這就造成一個陷阱:
如果非同步提交,針對偶爾出現的提交失敗,不進行重試不會有太大問題,因為如果提交失敗是因為臨時問題導致的,那麼後續的提交總會有成功的。只要成功一次,偏移量就會提交上去。
但是!如果這是發生在關閉消費者時的最後一次提交,就要確保能夠提交成功,如果還沒提交完就停掉了程序。就會造成重複消費!
因此,在消費者關閉前一般會組合使用commitAsync()和commitSync()。
詳細程式碼參考:MyOffsetConsumer.manualOffset()
到此這篇關於Springboot整合kafka的文章就介紹到這了,更多相關Springboot整合kafka內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45