首頁 > 軟體

Springboot整合kafka的範例程式碼

2022-02-11 19:07:25

1. 整合kafka

1、引入依賴

 <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

2、設定yml檔案

spring:
  application:
    name: demo

  kafka:
    bootstrap-servers: 52.82.98.209:10903,52.82.98.209:10904
    producer: # producer 生產者
      retries: 0 # 重試次數
      acks: 1 # 應答級別:多少個分割區副本備份完成時向生產者傳送ack確認(可選0、1、all/-1)
      batch-size: 16384 # 批次大小
      buffer-memory: 33554432 # 生產端緩衝區大小
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
#      value-serializer: com.itheima.demo.config.MySerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer: # consumer消費者
      group-id: javagroup # 預設的消費組ID
      enable-auto-commit: true # 是否自動提交offset
      auto-commit-interval: 100  # 提交offset延時(接收到訊息後多久提交offset)
      # earliest:當各分割區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,從頭開始消費
      # latest:當各分割區下有已提交的offset時,從提交的offset開始消費;無提交的offset時,消費新產生的該分割區下的資料
      # none:topic各分割區都存在已提交的offset時,從offset後開始消費;只要有一個分割區不存在已提交的offset,則丟擲異常
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#      value-deserializer: com.itheima.demo.config.MyDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

3、啟動專案

2. 訊息傳送

2.1 傳送型別

KafkaTemplate呼叫send時預設採用非同步傳送,如果需要同步獲取傳送結果,呼叫get方法

非同步傳送生產者:

@RestController
public class KafkaProducer {
    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;

    @GetMapping("/kafka/test/{msg}")
    public void sendMessage(@PathVariable("msg") String msg) {
        Message message = new Message();
        message.setMessage(msg);
        kafkaTemplate.send("test", JSON.toJSONString(message));
    }
}

同步傳送生產者:

//測試同步傳送與監聽
@RestController
public class AsyncProducer {
    private final static Logger logger = LoggerFactory.getLogger(AsyncProducer.class);
    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;

    //同步傳送
    @GetMapping("/kafka/sync/{msg}")
    public void sync(@PathVariable("msg") String msg) throws Exception {
        Message message = new Message();
        message.setMessage(msg);
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("test", JSON.toJSONString(message));
        //注意,可以設定等待時間,超出後,不再等候結果
        SendResult<String, Object> result = future.get(3,TimeUnit.SECONDS);
        logger.info("send result:{}",result.getProducerRecord().value());
    }
}

消費者:

@Component
public class KafkaConsumer {
    private final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);

    //不指定group,預設取yml裡設定的
    @KafkaListener(topics = {"test"})
    public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {
        Optional<?> optional = Optional.ofNullable(consumerRecord.value());
        if (optional.isPresent()) {
            Object msg = optional.get();
            logger.info("message:{}", msg);
        }
    }
}

那麼我們怎麼看出來同步傳送和非同步傳送的區別呢?

①首先在伺服器上,將kafka暫停服務。
②在swagger傳送訊息

調同步傳送:請求被阻斷,一直等待,超時後返回錯誤

而調非同步傳送的(預設傳送介面),請求立刻返回。

那麼,非同步傳送的訊息怎麼確認傳送情況呢?
我們使用註冊監聽
即新建一個類:KafkaListener.java

@Configuration
public class KafkaListener {
    private final static Logger logger = LoggerFactory.getLogger(KafkaListener.class);

    @Autowired
    KafkaTemplate kafkaTemplate;
    //設定監聽
    @PostConstruct
    private void listener() {
        kafkaTemplate.setProducerListener(new ProducerListener<String, Object>() {
            @Override
            public void onSuccess(ProducerRecord<String, Object> producerRecord, RecordMetadata recordMetadata) {
                logger.info("ok,message={}", producerRecord.value());
            }
            public void onError(ProducerRecord<String, Object> producerRecord, Exception exception) {
                logger.error("error!message={}", producerRecord.value());
        });
    }
}

檢視控制檯,等待一段時間後,非同步傳送失敗的訊息會被回撥給註冊過的listener

如果是正常傳送非同步訊息,則會獲得該訊息。可以看到,在內部類 KafkaListener$1 中,即註冊的Listener的訊息。

2.2 序列化

消費者使用:KafkaConsumer.java

@Component
public class KafkaConsumer {
    private final Logger logger = LoggerFactory.getLogger(KafkaConsumer.class);

    //不指定group,預設取yml裡設定的
    @KafkaListener(topics = {"test"})
    public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {
        Optional<?> optional = Optional.ofNullable(consumerRecord.value());
        if (optional.isPresent()) {
            Object msg = optional.get();
            logger.info("message:{}", msg);
        }
    }
}

1)序列化詳解

  • 前面用到的是Kafka自帶的字串序列化器(org.apache.kafka.common.serialization.StringSerializer
  • 除此之外還有:ByteArray、ByteBuffer、Bytes、Double、Integer、Long 等
  • 這些序列化器都實現了介面(org.apache.kafka.common.serialization.Serializer
  • 基本上,可以滿足絕大多數場景

2)自定義序列化
自己實現,實現對應的介面即可,有以下方法:

public interface Serializer<T> extends Closeable {
	default void configure(Map<String, ?> configs, Boolean isKey) {
	}
	//理論上,只實現這個即可正常執行
	byte[] serialize(String var1, T var2);
	//預設調上面的方法
	default byte[] serialize(String topic, Headers headers, T data) {
		return this.serialize(topic, data);
	}
	default void close() {
	}
}

我們來自己實現一個序列化器:MySerializer.java

public class MySerializer implements Serializer {

    @Override
    public byte[] serialize(String s, Object o) {
        String json = JSON.toJSONString(o);
        return json.getBytes();
    }
}

3)解碼
MyDeserializer.java,實現方式與編碼器幾乎一樣.

public class MyDeserializer implements Deserializer {
    private final static Logger logger = LoggerFactory.getLogger(MyDeserializer.class);
    @Override
    public Object deserialize(String s, byte[] bytes) {
        try {
            String json = new String(bytes,"utf-8");
            return JSON.parse(json);
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
        }
        return null;
    }
}

4)在yaml中設定自己的編碼器、解碼器

再次收發,訊息正常

2.3 分割區策略

分割區策略決定了訊息根據key投放到哪個分割區,也是順序消費保障的基石。

  • 給定了分割區號,直接將資料傳送到指定的分割區裡面去
  • 沒有給定分割區號,給定資料的key值,通過key取上hashCode進行分割區
  • 既沒有給定分割區號,也沒有給定key值,直接輪循進行分割區(預設)
  • 自定義分割區,你想怎麼做就怎麼做

1)驗證預設分割區規則
傳送者程式碼參考:PartitionProducer.java

//測試分割區傳送
@RestController
public class PartitionProducer {
    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;

    //    指定分割區傳送
//    不管你key是什麼,到同一個分割區
    @GetMapping("/kafka/partitionSend/{key}")
    public void setPartition(@PathVariable("key") String key) {
        kafkaTemplate.send("test", 0, key, "key=" + key + ",msg=指定0號分割區");
    }
    //    指定key傳送,不指定分割區
//    根據key做hash,相同的key到同一個分割區
    @GetMapping("/kafka/keysend/{key}")
    public void setKey(@PathVariable("key") String key) {
        kafkaTemplate.send("test", key, "key=" + key + ",msg=不指定分割區");
}

消費者程式碼使用:PartitionConsumer.java

@Component
public class PartitionConsumer {
    private final Logger logger = LoggerFactory.getLogger(PartitionConsumer.class);

    //分割區消費
    @KafkaListener(topics = {"test"},topicPattern = "0")
    public void onMessage(ConsumerRecord<?, ?> consumerRecord) {
        Optional<?> optional = Optional.ofNullable(consumerRecord.value());
        if (optional.isPresent()) {
            Object msg = optional.get();
            logger.info("partition=0,message:[{}]", msg);
        }
    }
    @KafkaListener(topics = {"test"},topicPattern = "1")
    public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {
        Optional<?> optional = Optional.ofNullable(consumerRecord.value());
        if (optional.isPresent()) {
            Object msg = optional.get();
            logger.info("partition=1,message:[{}]", msg);
        }
    }
}

通過swagger存取setKey(也就是隻給了key的方法):

可以看到key相同的被hash到了同一個分割區

再存取setPartition來設定分割區號0來傳送:

可以看到無論key是什麼,都是分割區0來消費

2)自定義分割區
參考程式碼:MyPartitioner.java , MyPartitionTemplate.java。
傳送使用:MyPartitionProducer.java。

public class MyPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
//        定義自己的分割區策略
//                如果key以0開頭,發到0號分割區
//                其他都扔到1號分割區
        String keyStr = key+"";
        if (keyStr.startsWith("0")){
            return 0;
        }else {
            return 1;
        }
    }
    public void close() {
    public void configure(Map<String, ?> map) {
}
@Configuration
public class MyPartitionTemplate {
 
    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    KafkaTemplate kafkaTemplate;
    @PostConstruct
    public void setKafkaTemplate() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        //注意分割區器在這裡!!!
        props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, MyPartitioner.class);
        this.kafkaTemplate = new KafkaTemplate<String, String>(new DefaultKafkaProducerFactory<>(props));
    }
    public KafkaTemplate getKafkaTemplate(){
        return kafkaTemplate;
}
//測試自定義分割區傳送
@RestController
public class MyPartitionProducer {

    @Autowired
    MyPartitionTemplate template;

//    使用0開頭和其他任意字母開頭的key傳送訊息
//    看控制檯的輸出,在哪個分割區裡?
    @GetMapping("/kafka/myPartitionSend/{key}")
    public void setPartition(@PathVariable("key") String key) {
        template.getKafkaTemplate().send("test", key,"key="+key+",msg=自定義分割區策略");
    }
}

使用swagger,傳送0開頭和非0開頭兩種key

3. 訊息消費

3.1 訊息組別

傳送者使用:KafkaProducer.java

@RestController
public class KafkaProducer {
    @Resource
    private KafkaTemplate<String, Object> kafkaTemplate;

    @GetMapping("/kafka/test/{msg}")
    public void sendMessage(@PathVariable("msg") String msg) {
        Message message = new Message();
        message.setMessage(msg);
        kafkaTemplate.send("test", JSON.toJSONString(message));
    }
}

1)程式碼參考:GroupConsumer.java,Listener拷貝3份,分別賦予兩組group,驗證分組消費:

//測試組消費
@Component
public class GroupConsumer {
    private final Logger logger = LoggerFactory.getLogger(GroupConsumer.class);

    //組1,消費者1
    @KafkaListener(topics = {"test"},groupId = "group1")
    public void onMessage1(ConsumerRecord<?, ?> consumerRecord) {
        Optional<?> optional = Optional.ofNullable(consumerRecord.value());
        if (optional.isPresent()) {
            Object msg = optional.get();
            logger.info("group:group1-1 , message:{}", msg);
        }
    }
    //組1,消費者2
    public void onMessage2(ConsumerRecord<?, ?> consumerRecord) {
            logger.info("group:group1-2 , message:{}", msg);
    //組2,只有一個消費者
    @KafkaListener(topics = {"test"},groupId = "group2")
    public void onMessage3(ConsumerRecord<?, ?> consumerRecord) {
            logger.info("group:group2 , message:{}", msg);
}

2)啟動

3)通過swagger傳送2條訊息

  • 同一group下的兩個消費者,在group1均分訊息
  • group2下只有一個消費者,得到全部訊息

4)消費端閒置
注意分割區數與消費者數的搭配,如果 ( 消費者數 > 分割區數量 ),將會出現消費者閒置(因為一個分割區只能分配給一個消費者),浪費資源!

驗證方式:
停掉專案,刪掉test主題,重新建一個 ,這次只給它分配一個分割區。
重新傳送兩條訊息,試一試

  • group2可以消費到1、2兩條訊息
  • group1下有兩個消費者,但是隻分配給了 1 , 2這個程序被閒置

3.2 位移提交

1)自動提交
前面的案例中,我們設定了以下兩個選項,則kafka會按延時設定自動提交

enable-auto-commit: true # 是否自動提交offset
auto-commit-interval: 100 # 提交offset延時(接收到訊息後多久提交offset,預設單位為ms)

2)手動提交
有些時候,我們需要手動控制偏移量的提交時機,比如確保訊息嚴格消費後再提交,以防止丟失或重複。
下面我們自己定義設定,覆蓋上面的引數
程式碼參考:MyOffsetConfig.java

@Configuration
public class MyOffsetConfig {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;
    @Bean
    public KafkaListenerContainerFactory<?> manualKafkaListenerContainerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // 注意這裡!!!設定手動提交
        configProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(configProps));
        // ack模式:
        //          AckMode針對ENABLE_AUTO_COMMIT_CONFIG=false時生效,有以下幾種:
        //
        //          RECORD
        //          每處理一條commit一次
        //          BATCH(預設)
        //          每次poll的時候批次提交一次,頻率取決於每次poll的呼叫頻率
        //          TIME
        //          每次間隔ackTime的時間去commit(跟auto commit interval有什麼區別呢?)
        //          COUNT
        //          累積達到ackCount次的ack去commit
        //          COUNT_TIME
        //          ackTime或ackCount哪個條件先滿足,就commit
        //          MANUAL
        //          listener負責ack,但是背後也是批次上去
        //          MANUAL_IMMEDIATE
        //          listner負責ack,每呼叫一次,就立即commit
        factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
}

然後通過在消費端的Consumer來提交偏移量
MyOffsetConsumer:

@Component
public class MyOffsetConsumer {
    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @KafkaListener(topics = "test", groupId = "myoffset-group-1", containerFactory = "manualKafkaListenerContainerFactory")
    public void manualCommit(@Payload String message,
                             @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                             @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                             Consumer consumer,
                             Acknowledgment ack) {
        logger.info("手動提交偏移量 , partition={}, msg={}", partition, message);
        // 同步提交
        consumer.commitSync();
        //非同步提交
        //consumer.commitAsync();
        // ack提交也可以,會按設定的ack策略走(參考MyOffsetConfig.java裡的ack模式)
        // ack.acknowledge();
    }
    @KafkaListener(topics = "test", groupId = "myoffset-group-2", containerFactory = "manualKafkaListenerContainerFactory")
    public void noCommit(@Payload String message,
                         @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                         @Header(KafkaHeaders.RECEIVED_TOPIC) String topic,
                         Consumer consumer,
                         Acknowledgment ack) {
        logger.info("忘記提交偏移量, partition={}, msg={}", partition, message);
        // 不做commit!
    /**
     * 現實狀況:
     * commitSync和commitAsync組合使用
     * <p>
     * 手工提交非同步 consumer.commitAsync();
     * 手工同步提交 consumer.commitSync()
     * commitSync()方法提交最後一個偏移量。在成功提交或碰到無怯恢復的錯誤之前,
     * commitSync()會一直重試,但是commitAsync()不會。
     * 一般情況下,針對偶爾出現的提交失敗,不進行重試不會有太大問題
     * 因為如果提交失敗是因為臨時問題導致的,那麼後續的提交總會有成功的。
     * 但如果這是發生在關閉消費者或再均衡前的最後一次提交,就要確保能夠提交成功。否則就會造成重複消費
     * 因此,在消費者關閉前一般會組合使用commitAsync()和commitSync()。
     */
//   @KafkaListener(topics = "test", groupId = "myoffset-group-3",containerFactory = "manualKafkaListenerContainerFactory")
    public void manualOffset(@Payload String message,
        try {
            logger.info("同步非同步搭配 , partition={}, msg={}", partition, message);
            //先非同步提交
            consumer.commitAsync();
            //繼續做別的事
        } catch (Exception e) {
            System.out.println("commit failed");
        } finally {
            try {
                consumer.commitSync();
            } finally {
                consumer.close();
            }
        }
     * 甚至可以手動提交,指定任意位置的偏移量
     * 不推薦日常使用!!!
//    @KafkaListener(topics = "test", groupId = "myoffset-group-4",containerFactory = "manualKafkaListenerContainerFactory")
    public void offset(ConsumerRecord record, Consumer consumer) {
        logger.info("手動指定任意偏移量, partition={}, msg={}", record.partition(), record);
        Map<TopicPartition, OffsetAndMetadata> currentOffset = new HashMap<>();
        currentOffset.put(new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1));
        consumer.commitSync(currentOffset);
}

3)重複消費問題
如果手動提交模式被開啟,一定不要忘記提交偏移量。否則會造成重複消費!

用km將test主題刪除,新建一個test空主題。方便觀察訊息偏移 註釋掉其他Consumer的Component註解,只保留當前MyOffsetConsumer.java 啟動專案,使用swagger的KafkaProducer傳送連續幾條訊息 留心控制檯,都能消費,沒問題:

但是!重啟專案:

無論重啟多少次,不提交偏移量的消費組,會重複消費一遍!!!

再通過命令列查詢偏移量

4)經驗與總結
commitSync()方法,即同步提交,會提交最後一個偏移量。在成功提交或碰到無怯恢復的錯誤之前,commitSync()會一直重試,但是commitAsync()不會。

這就造成一個陷阱:
如果非同步提交,針對偶爾出現的提交失敗,不進行重試不會有太大問題,因為如果提交失敗是因為臨時問題導致的,那麼後續的提交總會有成功的。只要成功一次,偏移量就會提交上去。

但是!如果這是發生在關閉消費者時的最後一次提交,就要確保能夠提交成功,如果還沒提交完就停掉了程序。就會造成重複消費!

因此,在消費者關閉前一般會組合使用commitAsync()和commitSync()。
詳細程式碼參考:MyOffsetConsumer.manualOffset()

到此這篇關於Springboot整合kafka的文章就介紹到這了,更多相關Springboot整合kafka內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com