首頁 > 軟體

Spring Boot整合Kafka教學詳解

2023-03-11 06:02:07

正文

本教學將介紹如何在 Spring Boot 應用程式中使用 Kafka。Kafka 是一個分散式的釋出-訂閱訊息系統,它可以處理大量資料並提供高吞吐量。

在本教學中,我們將使用 Spring Boot 2.5.4Kafka 2.8.0

步驟一:新增依賴項

在 pom.xml 中新增以下依賴項:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.8.0</version>
</dependency>

步驟二:設定 Kafka

application.yml 檔案中新增以下設定:

sping:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: my-group
      auto-offset-reset: earliest
    producer:
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      key-serializer: org.apache.kafka.common.serialization.StringSerializer

這裡我們設定了 Kafka 的服務地址為 localhost:9092,設定了一個消費者組 ID 為 my-group,並設定了一個最早的偏移量來讀取訊息。在生產者方面,我們設定了訊息序列化程式為 StringSerializer

步驟三:建立一個生產者

現在,我們將建立一個 Kafka 生產者,用於傳送訊息到 Kafka 伺服器。在這裡,我們將建立一個 RESTful 端點,用於接收 POST 請求並將訊息傳送到 Kafka。

首先,我們將建立一個 KafkaProducerConfig 類,用於設定 Kafka 生產者:

@Configuration
public class KafkaProducerConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

在上面的程式碼中,我們使用 @Configuration 註解將 KafkaProducerConfig 類宣告為設定類。然後,我們使用 @Value 註解注入組態檔中的 bootstrap-servers 屬性。

接下來,我們建立了一個 producerConfigs 方法,用於設定 Kafka 生產者的設定。在這裡,我們設定了 BOOTSTRAP_SERVERS_CONFIGKEY_SERIALIZER_CLASS_CONFIGVALUE_SERIALIZER_CLASS_CONFIG 三個屬性。

然後,我們建立了一個 producerFactory 方法,用於建立 Kafka 生產者工廠。在這裡,我們使用了 DefaultKafkaProducerFactory 類,並傳遞了我們的設定。

最後,我們建立了一個 kafkaTemplate 方法,用於建立 KafkaTemplate 範例。在這裡,我們使用了剛剛建立的生產者工廠作為引數,然後返回 KafkaTemplate 範例。

接下來,我們將建立一個 RESTful 端點,用於接收 POST 請求並將訊息傳送到 Kafka。在這裡,我們將使用 @RestController 註解建立一個 RESTful 控制器:

@RestController
public class KafkaController {
    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;
    @PostMapping("/send")
    public void sendMessage(@RequestBody String message) {
        kafkaTemplate.send("my-topic", message);
    }
}

在上面的程式碼中,我們使用 @Autowired 註解將 KafkaTemplate 範例注入到 KafkaController 類中。然後,我們建立了一個 sendMessage 方法,用於傳送訊息到 Kafka。

在這裡,我們使用 kafkaTemplate.send 方法傳送訊息到 my-topic 主題。send 方法返回一個 ListenableFuture 物件,用於非同步處理結果。

步驟四:建立一個消費者

現在,我們將建立一個 Kafka 消費者,用於從 Kafka 伺服器接收訊息。在這裡,我們將建立一個消費者組,並將其設定為從 my-topic 主題讀取訊息。

首先,我們將建立一個 KafkaConsumerConfig 類,用於設定 Kafka 消費者:

@Configuration
@EnableKafka
public class KafkaConsumerConfig {
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    @Value("${spring.kafka.consumer.group-id}")
    private String groupId;
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

在上面的程式碼中,我們使用 @Configuration 註解將 KafkaConsumerConfig 類宣告為設定類,並使用 @EnableKafka 註解啟用 Kafka。

然後,我們使用 @Value 註解注入組態檔中的 bootstrap-serversconsumer.group-id 屬性。

接下來,我們建立了一個 consumerConfigs 方法,用於設定 Kafka 消費者的設定。在這裡,我們設定了 BOOTSTRAP_SERVERS_CONFIG、GROUP_ID_CONFIGAUTO_OFFSET_RESET_CONFIGKEY_DESERIALIZER_CLASS_CONFIGVALUE_DESERIALIZER_CLASS_CONFIG 五個屬性。

然後,我們建立了一個 consumerFactory 方法,用於建立 Kafka 消費者工廠。在這裡,我們使用了 DefaultKafkaConsumerFactory 類,並傳遞了我們的設定。

最後,我們建立了一個 kafkaListenerContainerFactory 方法,用於建立一個 ConcurrentKafkaListenerContainerFactory 範例。在這裡,我們將消費者工廠注入到 kafkaListenerContainerFactory 範例中。

接下來,我們將建立一個 Kafka 消費者類 KafkaConsumer,用於監聽 my-topic 主題並接收訊息:

@Service
public class KafkaConsumer {
    @KafkaListener(topics = "my-topic", groupId = "my-group-id")
    public void consume(String message) {
        System.out.println("Received message: " + message);
    }
}

在上面的程式碼中,我們使用 @KafkaListener 註解宣告了一個消費者方法,用於接收從 my-topic 主題中讀取的訊息。在這裡,我們將消費者組 ID 設定為 my-group-id

現在,我們已經完成了 Kafka 生產者和消費者的設定。我們可以使用 mvn spring-boot:run 命令啟動應用程式,並使用 curl 命令傳送 POST 請求到 http://localhost:8080/send 端點,以將訊息傳送到 Kafka。然後,我們可以在控制檯上檢視消費者接收到的訊息。

這就是使用 Spring Boot 和 Kafka 的基本設定。我們可以根據需要進行更改和擴充套件,以滿足特定的需求。

以上就是Spring Boot整合Kafka教學詳解的詳細內容,更多關於Spring Boot整合Kafka的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com