<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
本教學將介紹如何在 Spring Boot 應用程式中使用 Kafka。Kafka 是一個分散式的釋出-訂閱訊息系統,它可以處理大量資料並提供高吞吐量。
在本教學中,我們將使用 Spring Boot 2.5.4 和 Kafka 2.8.0。
在 pom.xml 中新增以下依賴項:
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.8.0</version> </dependency>
在 application.yml
檔案中新增以下設定:
sping: kafka: bootstrap-servers: localhost:9092 consumer: group-id: my-group auto-offset-reset: earliest producer: value-serializer: org.apache.kafka.common.serialization.StringSerializer key-serializer: org.apache.kafka.common.serialization.StringSerializer
這裡我們設定了 Kafka 的服務地址為 localhost:9092
,設定了一個消費者組 ID 為 my-group
,並設定了一個最早的偏移量來讀取訊息。在生產者方面,我們設定了訊息序列化程式為 StringSerializer
。
現在,我們將建立一個 Kafka 生產者,用於傳送訊息到 Kafka 伺服器。在這裡,我們將建立一個 RESTful 端點,用於接收 POST 請求並將訊息傳送到 Kafka。
首先,我們將建立一個 KafkaProducerConfig
類,用於設定 Kafka 生產者:
@Configuration public class KafkaProducerConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Bean public Map<String, Object> producerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return props; } @Bean public ProducerFactory<String, String> producerFactory() { return new DefaultKafkaProducerFactory<>(producerConfigs()); } @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } }
在上面的程式碼中,我們使用 @Configuration
註解將 KafkaProducerConfig
類宣告為設定類。然後,我們使用 @Value
註解注入組態檔中的 bootstrap-servers
屬性。
接下來,我們建立了一個 producerConfigs
方法,用於設定 Kafka 生產者的設定。在這裡,我們設定了 BOOTSTRAP_SERVERS_CONFIG
、KEY_SERIALIZER_CLASS_CONFIG
和 VALUE_SERIALIZER_CLASS_CONFIG
三個屬性。
然後,我們建立了一個 producerFactory
方法,用於建立 Kafka 生產者工廠。在這裡,我們使用了 DefaultKafkaProducerFactory
類,並傳遞了我們的設定。
最後,我們建立了一個 kafkaTemplate
方法,用於建立 KafkaTemplate
範例。在這裡,我們使用了剛剛建立的生產者工廠作為引數,然後返回 KafkaTemplate
範例。
接下來,我們將建立一個 RESTful 端點,用於接收 POST 請求並將訊息傳送到 Kafka。在這裡,我們將使用 @RestController
註解建立一個 RESTful 控制器:
@RestController public class KafkaController { @Autowired private KafkaTemplate<String, String> kafkaTemplate; @PostMapping("/send") public void sendMessage(@RequestBody String message) { kafkaTemplate.send("my-topic", message); } }
在上面的程式碼中,我們使用 @Autowired
註解將 KafkaTemplate
範例注入到 KafkaController
類中。然後,我們建立了一個 sendMessage
方法,用於傳送訊息到 Kafka。
在這裡,我們使用 kafkaTemplate.send
方法傳送訊息到 my-topic
主題。send 方法返回一個 ListenableFuture
物件,用於非同步處理結果。
現在,我們將建立一個 Kafka 消費者,用於從 Kafka 伺服器接收訊息。在這裡,我們將建立一個消費者組,並將其設定為從 my-topic
主題讀取訊息。
首先,我們將建立一個 KafkaConsumerConfig
類,用於設定 Kafka 消費者:
@Configuration @EnableKafka public class KafkaConsumerConfig { @Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers; @Value("${spring.kafka.consumer.group-id}") private String groupId; @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return props; } @Bean public ConsumerFactory<String, String> consumerFactory() { return new DefaultKafkaConsumerFactory<>(consumerConfigs()); } @Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } }
在上面的程式碼中,我們使用 @Configuration
註解將 KafkaConsumerConfig
類宣告為設定類,並使用 @EnableKafka
註解啟用 Kafka。
然後,我們使用 @Value
註解注入組態檔中的 bootstrap-servers
和 consumer.group-id
屬性。
接下來,我們建立了一個 consumerConfigs
方法,用於設定 Kafka 消費者的設定。在這裡,我們設定了 BOOTSTRAP_SERVERS_CONFIG、GROUP_ID_CONFIG
、AUTO_OFFSET_RESET_CONFIG
、KEY_DESERIALIZER_CLASS_CONFIG
和 VALUE_DESERIALIZER_CLASS_CONFIG
五個屬性。
然後,我們建立了一個 consumerFactory
方法,用於建立 Kafka 消費者工廠。在這裡,我們使用了 DefaultKafkaConsumerFactory
類,並傳遞了我們的設定。
最後,我們建立了一個 kafkaListenerContainerFactory
方法,用於建立一個 ConcurrentKafkaListenerContainerFactory
範例。在這裡,我們將消費者工廠注入到 kafkaListenerContainerFactory
範例中。
接下來,我們將建立一個 Kafka 消費者類 KafkaConsumer
,用於監聽 my-topic
主題並接收訊息:
@Service public class KafkaConsumer { @KafkaListener(topics = "my-topic", groupId = "my-group-id") public void consume(String message) { System.out.println("Received message: " + message); } }
在上面的程式碼中,我們使用 @KafkaListener
註解宣告了一個消費者方法,用於接收從 my-topic
主題中讀取的訊息。在這裡,我們將消費者組 ID 設定為 my-group-id
。
現在,我們已經完成了 Kafka 生產者和消費者的設定。我們可以使用 mvn spring-boot:run
命令啟動應用程式,並使用 curl 命令傳送 POST 請求到 http://localhost:8080/send
端點,以將訊息傳送到 Kafka。然後,我們可以在控制檯上檢視消費者接收到的訊息。
這就是使用 Spring Boot 和 Kafka 的基本設定。我們可以根據需要進行更改和擴充套件,以滿足特定的需求。
以上就是Spring Boot整合Kafka教學詳解的詳細內容,更多關於Spring Boot整合Kafka的資料請關注it145.com其它相關文章!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45