<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
kafka的安裝需要依賴於jdk,需要在伺服器上提前安裝好該環境,這裡使用用jdk1.8。
官網地址:
較新的版本已自帶Zookeeper,無需額外下載。這裡使用3.2.0做演示。
注意要下載Binary downloads標籤下的tgz包,Source download標籤下的包為原始碼。無法直接執行,需要編譯。
使用ssh連線工具將kafka_2.12-3.2.0.tgz這個包上傳到雲伺服器上的一個目錄。
開啟命令列,進入到放有壓縮包的目錄,執行
tar -zxvf kafka_2.12-3.2.0.tgz
然後使用cd命令進入到/kafka_2.12-3.2.0/config/下,使用
vi server.properties
編輯組態檔。
刪除listeners和advertised前方的#號,改成如下設定:
listeners=PLAINTEXT://雲伺服器內網ip:9092(本地存取用本地ip) # 如果要提供外網存取則必須設定此項 advertised.listeners=PLAINTEXT://雲伺服器公網ip:9092(若要遠端存取需設定此項為雲伺服器的公網ip) # zookeeper連線地址,叢集設定格式為ip:port,ip:port,ip:port zookeeper.connect=雲伺服器公網ip:2181
在雲伺服器控制檯內進入安全組頁面,新增兩條新的入站規則,tcp/9092和tcp/2181
先檢視使用的防火牆型別iptables/firewalld
iptables操作命令
1.開啟/關閉/重啟防火牆 開啟防火牆(重啟後永久生效):chkconfig iptables on 關閉防火牆(重啟後永久生效):chkconfig iptables off 開啟防火牆(即時生效,重啟後失效):service iptables start 關閉防火牆(即時生效,重啟後失效):service iptables stop 重啟防火牆:service iptables restartd 2.檢視開啟的埠 /etc/init.d/iptables status 3.開啟埠 iptables -A INPUT -p tcp --dport 8080 -j ACCEPT 4.儲存並重啟防火牆 /etc/rc.d/init.d/iptables save /etc/init.d/iptables restart
Centos7預設安裝了firewalld,如果沒有安裝的話,可以使用 yum install firewalld firewalld-config進行安裝。
操作指令如下:
1.啟動防火牆 systemctl start firewalld 2.禁用防火牆 systemctl stop firewalld 3.設定開機啟動 systemctl enable firewalld 4.停止並禁用開機啟動 sytemctl disable firewalld 5.重啟防火牆 firewall-cmd --reload 6.檢視狀態 systemctl status firewalld或者 firewall-cmd --state 7.在指定區域開啟埠(記得重啟防火牆) firewall-cmd --zone=public --add-port=80/tcp(永久生效再加上 --permanent)
開啟tcp/9092和tcp/2181這兩個埠後,重啟防火牆,並檢視開放的埠確實生效。
cd命令進入kafka_2.12-3.2.0目錄下,執行
bin/zookeeper-server-start.sh config/zookeeper.properties
啟動zookeeper,不加-daemon方便排除啟動錯誤,新建一個shell視窗,進入該目錄再執行
bin/kafka-server-start.sh config/server.properties
啟動kafka,若列印紀錄檔未報錯,若未出現error紀錄檔,說明啟動成功。
查詢kafka下所有的topic bin/kafka-topics.sh --list --zookeeper ip:port 因為kafka使用zookeeper作為設定中心,一些topic資訊需要查詢該kafka對應的zookeeper 建立topic bin/kafka-topics.sh --create --zookeeper ip:port --replication-factor 1 --partitions 1 --topic test 開啟生產者 bin/kafka-console-producer.sh --broker-list cos100:9092 --topic test 開啟消費者 bin/kafka-console-consumer.sh --bootstrap-server cos100:9092 --topic test
在pom.xml檔案中引入kafka依賴
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.9.0</version> </dependency> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-clients</artifactId> <version>3.2.0</version> </dependency>
在application.yml組態檔中設定kafka
server: port: 8080 spring: kafka: bootstrap-servers: 雲伺服器外網ip地址:9092 producer: # 生產者 retries: 3 # 設定大於0的值,則使用者端會將傳送失敗的記錄重新傳送 batch-size: 16384 buffer-memory: 33554432 acks: 1 # 指定訊息key和訊息體的編解碼方式 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.apache.kafka.common.serialization.StringSerializer consumer: group-id: default-group enable-auto-commit: false auto-offset-reset: earliest key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.apache.kafka.common.serialization.StringDeserializer listener: # 當每一條記錄被消費者監聽器(ListenerConsumer)處理之後提交 # RECORD # 當每一批poll()的資料被消費者監聽器(ListenerConsumer)處理之後提交 # BATCH # 當每一批poll()的資料被消費者監聽器(ListenerConsumer)處理之後,距離上次提交時間大於TIME時提交 # TIME # 當每一批poll()的資料被消費者監聽器(ListenerConsumer)處理之後,被處理record數量大於等於COUNT時提交 # COUNT # TIME | COUNT 有一個條件滿足時提交 # COUNT_TIME # 當每一批poll()的資料被消費者監聽器(ListenerConsumer)處理之後, 手動呼叫Acknowledgment.acknowledge()後提交 # MANUAL # 手動呼叫Acknowledgment.acknowledge()後立即提交,一般使用這種 # MANUAL_IMMEDIATE ack-mode: manual_immediate
生產者
@RestController public class KafkaController { private final static String TOPIC_NAME = "test-topic"; @Autowired private KafkaTemplate<String, String> kafkaTemplate; @RequestMapping("/send") public String send(@RequestParam("msg") String msg) { kafkaTemplate.send(TOPIC_NAME, "key", msg); return String.format("訊息 %s 傳送成功!", msg); } }
消費者
@Component public class DemoConsumer { /** * @param record record * @KafkaListener(groupId = "testGroup", topicPartitions = { * @TopicPartition(topic = "topic1", partitions = {"0", "1"}), * @TopicPartition(topic = "topic2", partitions = "0", * partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100")) * },concurrency = "6") * //concurrency就是同組下的消費者個數,就是並行消費數,必須小於等於分割區總數 */ @KafkaListener(topics = "test-topic", groupId = "testGroup1") public void listentestGroup(ConsumerRecord<String, String> record, Acknowledgment ack) { String value = record.value(); System.out.println("testGroup1 message: " + value); System.out.println("testGroup1 record: " + record); //手動提交offset,一般是提交一個banch,冪等性防止重複訊息 // === 每條消費完確認效能不好! ack.acknowledge(); } //設定多個消費組 @KafkaListener(topics = "test--topic", groupId = "testGroup2") public void listentestGroup2(ConsumerRecord<String, String> record, Acknowledgment ack) { String value = record.value(); System.out.println("testGroup2 message: " + value); System.out.println("testGroup2 record: " + record); //手動提交offset ack.acknowledge(); } }
使用swagger測試傳送訊息
控制檯列印訊息
到此這篇關於雲伺服器(Linux)安裝部署Kafka的詳細過程的文章就介紹到這了,更多相關Linux安裝Kafka內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45