<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
Kafka是現在非常熱門的分散式訊息佇列,常用於微服務間非同步通訊,業務解耦等場景。kafka的效能非常強大,但是單個微服務吞吐效能是有上限的,我們就會用到分散式微服務,多消費者多生產者進行資料處理,保證效能同時也能根據業務量進行橫向拓展,對於同一個微服務的多個範例,輸入輸出的topic是同一個,這時候我們就可以利用Kafka分割區消費來解決這個問題。
我們開發的是一個物聯網系統,大量裝置接入到平臺實時傳送資料,有秒級資料和分鐘級別資料等等,處理流程包含接入、處理、儲存,這三個模組間就是使用kafka進行資料流轉,資料處理模組中包含多個微服務,單條資料會經歷多次處理,部分業務耗時較長,導致在高頻率接收到資料時候單體服務無法達到吞吐平衡,於是對於這些服務進行了分散式部署,多個範例進行消費處理。
我們在給kafka傳送訊息時候,如果不指定分割區,是不需要手動建立topic的,傳送時沒有topic,kafka會自動建立一個分割區為1的topic,如下:
@Service public class ProductService { @Autowired private KafkaTemplate kafkaTemplate; public void send(String msg, String topic) { kafkaTemplate.send(topic, msg); } }
指定分割區傳送時候,如果未設定topic分割區數,指定>0的分割區,會提示分割區不存在,這時候我們就需要提前建立好topic及分割區
手動建立,服務啟動前,使用kafka tool手動建立topic 不推薦 x
自動建立,服務啟動時,使用KafkaClient建立 推薦 √
/** * 初始化多分割區的topic 基於springboot2 */ @Component public void TopicInitRunner implements ApplicationRunner { @Autowired private AdminClient adminClient; @Override public void run(ApplicationArguments args) throws Exception { // 通過組態檔讀取自定義設定的topic名及分割區數 省略... // Key topic V 分割區數 Map<String, Integer> topicPartitionMap = new HashMap<>(); for (Map.Entry<String, Integer> e : topicPartitionMap.entrySet()) { createTopic(e.getKey(), e.getValue()); } } public void createTopic(String topic, int partition) { NewTopic newTopic = new NewTopic(topic, partition); adminClient.createTopics(Lists.newArrayList(newTopic)); } } /** * 設定類參考 基於springboot2 * 如果只進行普通的單訊息傳送 無需新增此設定到專案中 */ @Configuration public class KafkaConfig { @Value("${spring.kafka.bootstrap-servers}") private String servers; @Bean public AdminClient adminClient() { return AdminClient.create(kafkaAdmin().getConfig()); } @Bean public KafkaAdmin kafkaAdmin() { Map<String, Object> props = Maps.newHashMap(); props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, servers); return new KafkaAdmin(props); } }
上面講到如何初始化分割區topic,這時候我們的kafka環境已經準備好了,我們先使用TopicInitRunner為我們建立一個名稱為 partition-topic 分割區數為三,現在講一講如何均勻的講訊息傳送的每個分割區上,如何保證多消費者範例是負載均衡的,具體方案如下:
public class ProductService { /** * data為需要傳送的資料 */ public void partitionSend(String topic, int partition, JSONObject data) { // 獲取裝置id String deviceId = data.getString("deviceId"); // 獲取自增數 如果是新裝置會建立一個並放入快取中 int inc = getDeviceInc(deviceId); // 如果分割區數為3 裝置自增id為1 取模結果為1 就是傳送到1分割區 這樣1000個裝置就可以保證每個分割區傳送資料量是1000 / 3 int targetPartition = Math.floorMod(inc, partition); // 分割區傳送時候 需要指定一個唯一k 可以使用uuid或者百度提供的雪花演演算法獲取id 字串即可 kafkaTemplate.send(topic, partition, getUuid(), data.toJSONString()); } }
我們講到消費者使用分散式部署,一個微服務有多個範例,我們只需要按照服務監聽的topic分割區數建立對應數目的服務範例即可,這樣kafka就會自動分配對應分割區的資料到每個範例。
我們採取批次消費,進一步提高服務吞吐效能,消費及設定程式碼如下,組態檔參考springbootkafka設定即可,主要設計kafka服務設定,消費及生產設定,比較核心的是
@Component public class DataListener { @Autowired private MongoTemplate mongoTemplate; /** * 站點報文監聽消費 * * @param records */ @KafkaListener(topics = "partition-topic", containerFactory = "batchConsumerFactory") public void iotSiteHistoryMessageConsumer(List<ConsumerRecord<String, String>> records) { } /** * 消費者設定 */ @Bean public Map<String, Object> consumerConfigs() { Map<String, Object> props = Maps.newHashMap(); props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords); props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000); props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return props; } /** * 批次消費設定 */ @Bean public KafkaListenerContainerFactory batchConsumerFactory() { ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs())); factory.setBatchListener(true); return factory; } }
到此這篇關於Java Kafka分割區傳送及消費實戰的文章就介紹到這了,更多相關Kafka分割區傳送及消費內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45