首頁 > 軟體

Java Kafka分割區傳送及消費實戰

2022-07-14 14:04:03

前言

Kafka是現在非常熱門的分散式訊息佇列,常用於微服務間非同步通訊,業務解耦等場景。kafka的效能非常強大,但是單個微服務吞吐效能是有上限的,我們就會用到分散式微服務,多消費者多生產者進行資料處理,保證效能同時也能根據業務量進行橫向拓展,對於同一個微服務的多個範例,輸入輸出的topic是同一個,這時候我們就可以利用Kafka分割區消費來解決這個問題。

業務場景

我們開發的是一個物聯網系統,大量裝置接入到平臺實時傳送資料,有秒級資料和分鐘級別資料等等,處理流程包含接入、處理、儲存,這三個模組間就是使用kafka進行資料流轉,資料處理模組中包含多個微服務,單條資料會經歷多次處理,部分業務耗時較長,導致在高頻率接收到資料時候單體服務無法達到吞吐平衡,於是對於這些服務進行了分散式部署,多個範例進行消費處理。

業務實現

不指定分割區

我們在給kafka傳送訊息時候,如果不指定分割區,是不需要手動建立topic的,傳送時沒有topic,kafka會自動建立一個分割區為1的topic,如下:

@Service
public class ProductService {

    @Autowired
    private KafkaTemplate kafkaTemplate;
    
    public void send(String msg, String topic) {
        kafkaTemplate.send(topic, msg);
    }
}

指定分割區

topic分割區初始化及設定

指定分割區傳送時候,如果未設定topic分割區數,指定>0的分割區,會提示分割區不存在,這時候我們就需要提前建立好topic及分割區

手動建立,服務啟動前,使用kafka tool手動建立topic 不推薦 x

自動建立,服務啟動時,使用KafkaClient建立 推薦 √

/**
 * 初始化多分割區的topic 基於springboot2
 */
@Component
public void TopicInitRunner implements ApplicationRunner {

    @Autowired
    private AdminClient adminClient;
    
    @Override
    public void run(ApplicationArguments args) throws Exception {
        // 通過組態檔讀取自定義設定的topic名及分割區數 省略...
        // Key topic V 分割區數
        Map<String, Integer> topicPartitionMap = new HashMap<>();
        for (Map.Entry<String, Integer> e : topicPartitionMap.entrySet()) {
            createTopic(e.getKey(), e.getValue());
        }
        
    }

    public void createTopic(String topic, int partition) {
        NewTopic newTopic = new NewTopic(topic, partition);
        adminClient.createTopics(Lists.newArrayList(newTopic));
    }
}

/**
 * 設定類參考 基於springboot2
 * 如果只進行普通的單訊息傳送 無需新增此設定到專案中
 */
@Configuration
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;

    @Bean
    public AdminClient adminClient() {
        return AdminClient.create(kafkaAdmin().getConfig());
    }

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> props = Maps.newHashMap();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        return new KafkaAdmin(props);
    }
}

生產者分割區傳送方案

上面講到如何初始化分割區topic,這時候我們的kafka環境已經準備好了,我們先使用TopicInitRunner為我們建立一個名稱為 partition-topic 分割區數為三,現在講一講如何均勻的講訊息傳送的每個分割區上,如何保證多消費者範例是負載均衡的,具體方案如下:

  • 1.因為每條訊息都是裝置上傳的,都會有裝置id,先給每個裝置生成一個自增號,這樣1000個裝置,每個裝置就會有0到999的自增號,放到快取中,每次根據訊息中的裝置id獲取到該裝置的自增號
  • 2.使用自增號對分割區數進行取模操作,程式碼實現如下:
public class ProductService {
    /**
     * data為需要傳送的資料
     */
    public void partitionSend(String topic, int partition, JSONObject data) {
         // 獲取裝置id
        String deviceId = data.getString("deviceId");
        // 獲取自增數 如果是新裝置會建立一個並放入快取中
        int inc = getDeviceInc(deviceId);
        // 如果分割區數為3 裝置自增id為1 取模結果為1 就是傳送到1分割區 這樣1000個裝置就可以保證每個分割區傳送資料量是1000 / 3
        int targetPartition = Math.floorMod(inc, partition);
        // 分割區傳送時候 需要指定一個唯一k 可以使用uuid或者百度提供的雪花演演算法獲取id 字串即可
        kafkaTemplate.send(topic, partition, getUuid(), data.toJSONString());
    }
}

消費者

我們講到消費者使用分散式部署,一個微服務有多個範例,我們只需要按照服務監聽的topic分割區數建立對應數目的服務範例即可,這樣kafka就會自動分配對應分割區的資料到每個範例。

我們採取批次消費,進一步提高服務吞吐效能,消費及設定程式碼如下,組態檔參考springbootkafka設定即可,主要設計kafka服務設定,消費及生產設定,比較核心的是

@Component
public class DataListener {

    @Autowired
    private MongoTemplate mongoTemplate;

    /**
     * 站點報文監聽消費
     *
     * @param records
     */
    @KafkaListener(topics = "partition-topic", containerFactory = "batchConsumerFactory")
    public void iotSiteHistoryMessageConsumer(List<ConsumerRecord<String, String>> records) {        
    }
    
    /**
     * 消費者設定
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = Maps.newHashMap();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
    
    /**
     * 批次消費設定
     */
    @Bean
    public KafkaListenerContainerFactory batchConsumerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        factory.setBatchListener(true);
        return factory;
    }
}

到此這篇關於Java Kafka分割區傳送及消費實戰的文章就介紹到這了,更多相關Kafka分割區傳送及消費內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com