<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
kafka是一個訊息佇列產品,基於Topic partitions的設計,能達到非常高的訊息傳送處理效能。Spring建立了一個專案Spring-kafka,封裝了Apache 的Kafka-client,用於在Spring專案裡快速整合kafka。除了簡單的收發訊息外,Spring-kafka還提供了很多高階功能,下面我們就來一一探祕這些用法。
專案地址:https://github.com/spring-projects/spring-kafka
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.2.6.RELEASE</version> </dependency>
spring.kafka.producer.bootstrap-servers=127.0.0.1:9092
/** * @author: kl @kailing.pub * @date: 2019/5/30 */ @SpringBootApplication @RestController public class Application { private final Logger logger = LoggerFactory.getLogger(Application.class); public static void main(String[] args) { SpringApplication.run(Application.class, args); } @Autowired private KafkaTemplate<Object, Object> template; @GetMapping("/send/{input}") public void sendFoo(@PathVariable String input) { this.template.send("topic_input", input); } @KafkaListener(id = "webGroup", topics = "topic_input") public void listen(String input) { logger.info("input value: {}" , input); } }
啟動應用後,在瀏覽器中輸入:http://localhost:8080/send/kl。就可以在控制檯看到有紀錄檔輸出了:input value: "kl"。基礎的使用就這麼簡單。傳送訊息時注入一個KafkaTemplate,接收訊息時新增一個@KafkaListener註解即可。
不過上面的程式碼能夠啟動成功,前提是你已經有了Kafka Server的服務環境,我們知道Kafka是由Scala + Zookeeper構建的,可以從官網下載部署包在本地部署。但是,我想告訴你,為了簡化開發環節驗證Kafka相關功能,Spring-Kafka-Test已經封裝了Kafka-test提供了註解式的一鍵開啟Kafka Server的功能,使用起來也是超級簡單。本文後面的所有測試用例的Kafka都是使用這種嵌入式服務提供的。
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka-test</artifactId> <version>2.2.6.RELEASE</version> <scope>test</scope> </dependency>
下面使用Junit測試用例,直接啟動一個Kafka Server服務,包含四個Broker節點。
@RunWith(SpringRunner.class) @SpringBootTest(classes = ApplicationTests.class) @EmbeddedKafka(count = 4,ports = {9092,9093,9094,9095}) public class ApplicationTests { @Test public void contextLoads()throws IOException { System.in.read(); } }
如上:只需要一個註解@EmbeddedKafka即可,就可以啟動一個功能完整的Kafka服務,是不是很酷。預設只寫註解不加引數的情況下,是建立一個隨機埠的Broker,在啟動的紀錄檔中會輸出具體的埠以及預設的一些設定項。不過這些我們在Kafka安裝包組態檔中的設定項,在註解引數中都可以設定,下面詳解下@EmbeddedKafka註解中的可設定引數 :
Kafka是多Broker架構的高可用服務,一個Topic對應多個partition,一個Partition可以有多個副本Replication,這些Replication副本儲存在多個Broker,用於高可用。但是,雖然存在多個分割區副本集,當前工作副本集卻只有一個,預設就是首次分配的副本集【首選副本】為Leader,負責寫入和讀取資料。當我們升級Broker或者更新Broker設定時需要重啟服務,這個時候需要將partition轉移到可用的Broker。下面涉及到三種情況
ports
:埠列表,是一個陣列。對應了count引數,有幾個Broker,就要對應幾個埠號
brokerProperties
:Broker引數設定,是一個陣列結構,支援如下方式進行Broker引數設定:
@EmbeddedKafka(brokerProperties = {"log.index.interval.bytes = 4096","num.io.threads = 8"})
okerPropertiesLocation
:Broker引數檔案設定
功能同上面的brokerProperties,只是Kafka Broker的可設定引數達182個之多,都像上面這樣設定肯定不是最優方案,所以提供了載入本地組態檔的功能,如:
@EmbeddedKafka(brokerPropertiesLocation = "classpath:application.properties")
預設情況下,如果在使用KafkaTemplate傳送訊息時,Topic不存在,會建立一個新的Topic,預設的分割區數和副本數為如下Broker引數來設定
num.partitions = 1 #預設Topic分割區數 num.replica.fetchers = 1 #預設副本數
/** * @author: kl @kailing.pub * @date: 2019/5/31 */ @Configuration public class KafkaConfig { @Bean public KafkaAdmin admin(KafkaProperties properties){ KafkaAdmin admin = new KafkaAdmin(properties.buildAdminProperties()); admin.setFatalIfBrokerNotAvailable(true); return admin; } @Bean public NewTopic topic2() { return new NewTopic("topic-kl", 1, (short) 1); } }
如果Kafka Broker支援(1.0.0或更高版本),則如果發現現有Topic的Partition 數少於設定的Partition 數,則會新增新的Partition分割區。關於KafkaAdmin有幾個常用的用法如下:
setFatalIfBrokerNotAvailable(true):預設這個值是False的,在Broker不可用時,不影響Spring 上下文的初始化。如果你覺得Broker不可用影響正常業務需要顯示的將這個值設定為True
setAutoCreate(false) : 預設值為True,也就是Kafka範例化後會自動建立已經範例化的NewTopic物件
initialize():當setAutoCreate為false時,需要我們程式顯示的呼叫admin的initialize()方法來初始化NewTopic物件
有時候我們在程式啟動時並不知道某個Topic需要多少Partition數合適,但是又不能一股腦的直接使用Broker的預設設定,這個時候就需要使用Kafka-Client自帶的AdminClient來進行處理。上面的Spring封裝的KafkaAdmin也是使用的AdminClient來處理的。如:
@Autowired private KafkaProperties properties; @Test public void testCreateToipc(){ AdminClient client = AdminClient.create(properties.buildAdminProperties()); if(client !=null){ try { Collection<NewTopic> newTopics = new ArrayList<>(1); newTopics.add(new NewTopic("topic-kl",1,(short) 1)); client.createTopics(newTopics); }catch (Throwable e){ e.printStackTrace(); }finally { client.close(); } } }
上面的這些建立Topic方式前提是你的spring boot版本到2.x以上了,因為spring-kafka2.x版本只支援spring boot2.x的版本。在1.x的版本中還沒有這些api。下面補充一種在程式中通過Kafka_2.10建立Topic的方式
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.10</artifactId> <version>0.8.2.2</version> </dependency>
@Test public void testCreateTopic()throws Exception{ ZkClient zkClient =new ZkClient("127.0.0.1:2181", 3000, 3000, ZKStringSerializer$.MODULE$) String topicName = "topic-kl"; int partitions = 1; int replication = 1; AdminUtils.createTopic(zkClient,topicName,partitions,replication,new Properties()); }
注意下ZkClient最後一個構造入參,是一個序列化反序列化的介面實現,博主測試如果不填的話,建立的Topic在ZK上的資料是有問題的,預設的Kafka實現也很簡單,就是做了字串UTF-8編碼處理。ZKStringSerializer$是Kafka中已經實現好的一個介面範例,是一個Scala的伴生物件,在Java中直接呼叫點MODULE$就可以得到一個範例
@Test public void testCreateTopic(){ String [] options= new String[]{ "--create", "--zookeeper","127.0.0.1:2181", "--replication-factor", "3", "--partitions", "3", "--topic", "topic-kl" }; TopicCommand.main(options); }
template.send("","").addCallback(new ListenableFutureCallback<SendResult<Object, Object>>() { @Override public void onFailure(Throwable throwable) { ...... } @Override public void onSuccess(SendResult<Object, Object> objectObjectSendResult) { .... } });
ListenableFuture<SendResult<Object,Object>> future = template.send("topic-kl","kl"); try { SendResult<Object,Object> result = future.get(); }catch (Throwable e){ e.printStackTrace(); }
預設情況下,Spring-kafka自動生成的KafkaTemplate範例,是不具有事務訊息傳送能力的。需要使用如下設定啟用事務特性。事務啟用後,所有的訊息傳送只能在發生事務的方法內執行了,不然就會拋一個沒有事務交易的異常
spring.kafka.producer.transaction-id-prefix=kafka_tx.
當傳送訊息有事務要求時,比如,當所有訊息傳送成功才算成功,如下面的例子:假設第一條消費傳送後,在發第二條訊息前出現了異常,那麼第一條已經傳送的訊息也會回滾。而且正常情況下,假設在訊息一傳送後休眠一段時間,在傳送第二條訊息,消費端也只有在事務方法執行完成後才會接收到訊息
@GetMapping("/send/{input}") public void sendFoo(@PathVariable String input) { template.executeInTransaction(t ->{ t.send("topic_input","kl"); if("error".equals(input)){ throw new RuntimeException("failed"); } t.send("topic_input","ckl"); return true; }); }
當事務特性啟用時,同樣,在方法上面加@Transactional註解也會生效
@GetMapping("/send/{input}") @Transactional(rollbackFor = RuntimeException.class) public void sendFoo(@PathVariable String input) { template.send("topic_input", "kl"); if ("error".equals(input)) { throw new RuntimeException("failed"); } template.send("topic_input", "ckl"); }
Spring-Kafka的事務訊息是基於Kafka提供的事務訊息功能的。而Kafka Broker預設的設定針對的三個或以上Broker高可用服務而設定的。這邊在測試的時候為了簡單方便,使用了嵌入式服務新建了一個單Broker的Kafka服務,出現了一些問題:如
1、事務紀錄檔副本集大於Broker數量,會拋如下異常:
Number of alive brokers '1' does not meet the required replication factor '3' for the transactions state topic (configured via 'transaction.state.log.replication.factor'). This error can be ignored if the cluster is starting up and not all brokers are up yet.
預設Broker的設定transaction.state.log.replication.factor=3,單節點只能調整為1
2、副本數小於副本同步佇列數目,會拋如下異常
Number of insync replicas for partition __transaction_state-13 is [1], below required minimum [2]
預設Broker的設定transaction.state.log.min.isr=2,單節點只能調整為1
ReplyingKafkaTemplate是KafkaTemplate的一個子類,除了繼承父類別的方法,新增了一個方法sendAndReceive,實現了訊息傳送回復語意
RequestReplyFuture sendAndReceive(ProducerRecord record);
也就是我傳送一條訊息,能夠拿到消費者給我返回的結果。就像傳統的RPC互動那樣。當訊息的傳送者需要知道訊息消費者的具體的消費情況,非常適合這個api。如,一條訊息中傳送一批資料,需要知道消費者成功處理了哪些資料。下面程式碼演示了怎麼整合以及使用ReplyingKafkaTemplate
/** * @author: kl @kailing.pub * @date: 2019/5/30 */ @SpringBootApplication @RestController public class Application { private final Logger logger = LoggerFactory.getLogger(Application.class); public static void main(String[] args) { SpringApplication.run(Application.class, args); } @Bean public ConcurrentMessageListenerContainer<String, String> repliesContainer(ConcurrentKafkaListenerContainerFactory<String, String> containerFactory) { ConcurrentMessageListenerContainer<String, String> repliesContainer = containerFactory.createContainer("replies"); repliesContainer.getContainerProperties().setGroupId("repliesGroup"); repliesContainer.setAutoStartup(false); return repliesContainer; } @Bean public ReplyingKafkaTemplate<String, String, String> replyingTemplate(ProducerFactory<String, String> pf, ConcurrentMessageListenerContainer<String, String> repliesContainer) { return new ReplyingKafkaTemplate(pf, repliesContainer); } @Bean public KafkaTemplate kafkaTemplate(ProducerFactory<String, String> pf) { return new KafkaTemplate(pf); } @Autowired private ReplyingKafkaTemplate template; @GetMapping("/send/{input}") @Transactional(rollbackFor = RuntimeException.class) public void sendFoo(@PathVariable String input) throws Exception { ProducerRecord<String, String> record = new ProducerRecord<>("topic-kl", input); RequestReplyFuture<String, String, String> replyFuture = template.sendAndReceive(record); ConsumerRecord<String, String> consumerRecord = replyFuture.get(); System.err.println("Return value: " + consumerRecord.value()); } @KafkaListener(id = "webGroup", topics = "topic-kl") @SendTo public String listen(String input) { logger.info("input value: {}", input); return "successful"; } }
前面在簡單整合中已經演示過了@KafkaListener接收訊息的能力,但是@KafkaListener的功能不止如此,其他的比較常見的,使用場景比較多的功能點如下:
@KafkaListener(id = "webGroup", topicPartitions = { @TopicPartition(topic = "topic1", partitions = {"0", "1"}), @TopicPartition(topic = "topic2", partitions = "0", partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100")) },concurrency = "6",errorHandler = "myErrorHandler") public String listen(String input) { logger.info("input value: {}", input); return "successful"; }
其他的註解引數都很好理解,errorHandler需要說明下,設定這個引數需要實現一個介面KafkaListenerErrorHandler。而且註解裡的設定,是你自定義實現範例在spring上下文中的Name。比如,上面設定為errorHandler = "myErrorHandler"。則在spring上線中應該存在這樣一個範例:
/** * @author: kl @kailing.pub * @date: 2019/5/31 */ @Service("myErrorHandler") public class MyKafkaListenerErrorHandler implements KafkaListenerErrorHandler { Logger logger =LoggerFactory.getLogger(getClass()); @Override public Object handleError(Message<?> message, ListenerExecutionFailedException exception) { logger.info(message.getPayload().toString()); return null; } @Override public Object handleError(Message<?> message, ListenerExecutionFailedException exception, Consumer<?, ?> consumer) { logger.info(message.getPayload().toString()); return null; } }
手動ACK模式,由業務邏輯控制提交偏移量。比如程式在消費時,有這種語意,特別異常情況下不確認ack,也就是不提交偏移量,那麼你只能使用手動Ack模式來做了。開啟手動首先需要關閉自動提交,然後設定下consumer的消費模式
spring.kafka.consumer.enable-auto-commit=false spring.kafka.listener.ack-mode=manual
上面的設定好後,在消費時,只需要在@KafkaListener監聽方法的入參加入Acknowledgment 即可,執行到ack.acknowledge()代表提交了偏移量
@KafkaListener(id = "webGroup", topics = "topic-kl") public String listen(String input, Acknowledgment ack) { logger.info("input value: {}", input); if ("kl".equals(input)) { ack.acknowledge(); } return "successful"; }
@KafkaListener註解的監聽器的生命週期是可以控制的,預設情況下,@KafkaListener的引數autoStartup = "true"。也就是自動啟動消費,但是也可以同過KafkaListenerEndpointRegistry來干預他的生命週期。KafkaListenerEndpointRegistry有三個動作方法分別如:start(),pause(),resume()/啟動,停止,繼續。如下程式碼詳細演示了這種功能。
/** * @author: kl @kailing.pub * @date: 2019/5/30 */ @SpringBootApplication @RestController public class Application { private final Logger logger = LoggerFactory.getLogger(Application.class); public static void main(String[] args) { SpringApplication.run(Application.class, args); } @Autowired private KafkaTemplate template; @GetMapping("/send/{input}") @Transactional(rollbackFor = RuntimeException.class) public void sendFoo(@PathVariable String input) throws Exception { ProducerRecord<String, String> record = new ProducerRecord<>("topic-kl", input); template.send(record); } @Autowired private KafkaListenerEndpointRegistry registry; @GetMapping("/stop/{listenerID}") public void stop(@PathVariable String listenerID){ registry.getListenerContainer(listenerID).pause(); } @GetMapping("/resume/{listenerID}") public void resume(@PathVariable String listenerID){ registry.getListenerContainer(listenerID).resume(); } @GetMapping("/start/{listenerID}") public void start(@PathVariable String listenerID){ registry.getListenerContainer(listenerID).start(); } @KafkaListener(id = "webGroup", topics = "topic-kl",autoStartup = "false") public String listen(String input) { logger.info("input value: {}", input); return "successful"; } }
在上面的程式碼中,listenerID就是@KafkaListener中的id值“webGroup”。專案啟動好後,分別執行如下url,就可以看到效果了。
先傳送一條訊息:http://localhost:8081/send/ckl。因為autoStartup = "false",所以並不會看到有訊息進入監聽器。
接著啟動監聽器:http://localhost:8081/start/webGroup。可以看到有一條訊息進來了。
暫停和繼續消費的效果使用類似方法就可以測試出來了。
前面的訊息傳送響應應用裡面已經見過@SendTo,其實除了做傳送響應語意外,@SendTo註解還可以帶一個引數,指定轉發的Topic佇列。常見的場景如,一個訊息需要做多重加工,不同的加工耗費的cup等資源不一致,那麼就可以通過跨不同Topic和部署在不同主機上的consumer來解決了。如:
@KafkaListener(id = "webGroup", topics = "topic-kl") @SendTo("topic-ckl") public String listen(String input) { logger.info("input value: {}", input); return input + "hello!"; } @KafkaListener(id = "webGroup1", topics = "topic-ckl") public void listen2(String input) { logger.info("input value: {}", input); }
除了上面談到的通過手動Ack模式來控制訊息偏移量外,其實Spring-kafka內部還封裝了可重試消費訊息的語意,也就是可以設定為當消費資料出現異常時,重試這個訊息。而且可以設定重試達到多少次後,讓訊息進入預定好的Topic。也就是死信佇列裡。下面程式碼演示了這種效果:
@Autowired private KafkaTemplate template; @Bean public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory( ConcurrentKafkaListenerContainerFactoryConfigurer configurer, ConsumerFactory<Object, Object> kafkaConsumerFactory, KafkaTemplate<Object, Object> template) { ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>(); configurer.configure(factory, kafkaConsumerFactory); //最大重試三次 factory.setErrorHandler(new SeekToCurrentErrorHandler(new DeadLetterPublishingRecoverer(template), 3)); return factory; } @GetMapping("/send/{input}") public void sendFoo(@PathVariable String input) { template.send("topic-kl", input); } @KafkaListener(id = "webGroup", topics = "topic-kl") public String listen(String input) { logger.info("input value: {}", input); throw new RuntimeException("dlt"); } @KafkaListener(id = "dltGroup", topics = "topic-kl.DLT") public void dltListen(String input) { logger.info("Received from DLT: " + input); }
上面應用,在topic-kl監聽到訊息會,會觸發執行時異常,然後監聽器會嘗試三次呼叫,當到達最大的重試次數後。訊息就會被丟掉重試死信佇列裡面去。死信佇列的Topic的規則是,業務Topic名字+“.DLT”。如上面業務Topic的name為“topic-kl”,那麼對應的死信佇列的Topic就是“topic-kl.DLT”
最近業務上使用了kafka用到了Spring-kafka,所以系統性的探索了下Spring-kafka的各種用法,發現了很多好玩很酷的特性,比如,一個註解開啟嵌入式的Kafka服務、像RPC呼叫一樣的傳送響應語意呼叫、事務訊息等功能。希望此博文能夠幫助那些正在使用Spring-kafka或即將使用的人少走一些彎路少踩一點坑。
以上就是深入研究spring boot整合kafka之spring-kafka底層原理的詳細內容,更多關於spring boot整合kafka底層原理的資料請關注it145.com其它相關文章!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45