首頁 > 軟體

雲伺服器(Linux)安裝部署Kafka的詳細過程

2022-11-15 14:02:17

雲伺服器(Linux)安裝部署Kafka

前期準備

kafka的安裝需要依賴於jdk,需要在伺服器上提前安裝好該環境,這裡使用用jdk1.8。

下載安裝包

官網地址:

較新的版本已自帶Zookeeper,無需額外下載。這裡使用3.2.0做演示。

注意要下載Binary downloads標籤下的tgz包,Source download標籤下的包為原始碼。無法直接執行,需要編譯。

上載安裝包到雲伺服器

使用ssh連線工具將kafka_2.12-3.2.0.tgz這個包上傳到雲伺服器上的一個目錄。

開啟命令列,進入到放有壓縮包的目錄,執行

tar -zxvf kafka_2.12-3.2.0.tgz

設定kafka

然後使用cd命令進入到/kafka_2.12-3.2.0/config/下,使用

vi server.properties

編輯組態檔。

刪除listeners和advertised前方的#號,改成如下設定:

listeners=PLAINTEXT://雲伺服器內網ip:9092(本地存取用本地ip)
# 如果要提供外網存取則必須設定此項
advertised.listeners=PLAINTEXT://雲伺服器公網ip:9092(若要遠端存取需設定此項為雲伺服器的公網ip)
# zookeeper連線地址,叢集設定格式為ip:port,ip:port,ip:port
zookeeper.connect=雲伺服器公網ip:2181

開放雲伺服器埠

在雲伺服器控制檯內進入安全組頁面,新增兩條新的入站規則,tcp/9092和tcp/2181

開放linux防火牆埠

先檢視使用的防火牆型別iptables/firewalld

iptables操作命令

1.開啟/關閉/重啟防火牆

開啟防火牆(重啟後永久生效):chkconfig iptables on

關閉防火牆(重啟後永久生效):chkconfig iptables off

開啟防火牆(即時生效,重啟後失效):service iptables start

關閉防火牆(即時生效,重啟後失效):service iptables stop

重啟防火牆:service iptables restartd

2.檢視開啟的埠

/etc/init.d/iptables status
3.開啟埠

iptables -A INPUT -p tcp --dport 8080 -j ACCEPT
4.儲存並重啟防火牆
/etc/rc.d/init.d/iptables save
/etc/init.d/iptables restart

Centos7預設安裝了firewalld,如果沒有安裝的話,可以使用 yum install firewalld firewalld-config進行安裝。

操作指令如下:

1.啟動防火牆

systemctl start firewalld
2.禁用防火牆

systemctl stop firewalld
3.設定開機啟動

systemctl enable firewalld
4.停止並禁用開機啟動

sytemctl disable firewalld
5.重啟防火牆

firewall-cmd --reload

6.檢視狀態

systemctl status firewalld或者 firewall-cmd --state
7.在指定區域開啟埠(記得重啟防火牆)

firewall-cmd --zone=public --add-port=80/tcp(永久生效再加上 --permanent)

開啟tcp/9092和tcp/2181這兩個埠後,重啟防火牆,並檢視開放的埠確實生效。

啟動kafka服務

cd命令進入kafka_2.12-3.2.0目錄下,執行

bin/zookeeper-server-start.sh config/zookeeper.properties

啟動zookeeper,不加-daemon方便排除啟動錯誤,新建一個shell視窗,進入該目錄再執行

bin/kafka-server-start.sh config/server.properties

啟動kafka,若列印紀錄檔未報錯,若未出現error紀錄檔,說明啟動成功。

測試單機連通性

查詢kafka下所有的topic
bin/kafka-topics.sh --list --zookeeper ip:port
因為kafka使用zookeeper作為設定中心,一些topic資訊需要查詢該kafka對應的zookeeper
建立topic
bin/kafka-topics.sh --create --zookeeper ip:port --replication-factor 1 --partitions 1 --topic test
開啟生產者
bin/kafka-console-producer.sh --broker-list cos100:9092 --topic test
開啟消費者
bin/kafka-console-consumer.sh --bootstrap-server cos100:9092 --topic test

Springboot連線kafak

在pom.xml檔案中引入kafka依賴

<dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.9.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.2.0</version>
        </dependency>

在application.yml組態檔中設定kafka

server:
  port: 8080

spring:
  kafka:
    bootstrap-servers: 雲伺服器外網ip地址:9092
    producer: # 生產者
      retries: 3 # 設定大於0的值,則使用者端會將傳送失敗的記錄重新傳送
      batch-size: 16384
      buffer-memory: 33554432
      acks: 1
      # 指定訊息key和訊息體的編解碼方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      group-id: default-group
      enable-auto-commit: false
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # 當每一條記錄被消費者監聽器(ListenerConsumer)處理之後提交
      # RECORD
      # 當每一批poll()的資料被消費者監聽器(ListenerConsumer)處理之後提交
      # BATCH
      # 當每一批poll()的資料被消費者監聽器(ListenerConsumer)處理之後,距離上次提交時間大於TIME時提交
      # TIME
      # 當每一批poll()的資料被消費者監聽器(ListenerConsumer)處理之後,被處理record數量大於等於COUNT時提交
      # COUNT
      # TIME | COUNT 有一個條件滿足時提交
      # COUNT_TIME
      # 當每一批poll()的資料被消費者監聽器(ListenerConsumer)處理之後, 手動呼叫Acknowledgment.acknowledge()後提交
      # MANUAL
      # 手動呼叫Acknowledgment.acknowledge()後立即提交,一般使用這種
      # MANUAL_IMMEDIATE
      ack-mode: manual_immediate

生產者

@RestController
public class KafkaController {
    private final static String TOPIC_NAME = "test-topic";

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @RequestMapping("/send")
    public String send(@RequestParam("msg") String msg) {
        kafkaTemplate.send(TOPIC_NAME, "key", msg);
        return String.format("訊息 %s 傳送成功!", msg);
    }
}

消費者

@Component
public class DemoConsumer {
    /**
     * @param record record
     * @KafkaListener(groupId = "testGroup", topicPartitions = {
     * @TopicPartition(topic = "topic1", partitions = {"0", "1"}),
     * @TopicPartition(topic = "topic2", partitions = "0",
     * partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "100"))
     * },concurrency = "6")
     * //concurrency就是同組下的消費者個數,就是並行消費數,必須小於等於分割區總數
     */
    @KafkaListener(topics = "test-topic", groupId = "testGroup1")
    public void listentestGroup(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String value = record.value();
        System.out.println("testGroup1 message: " + value);
        System.out.println("testGroup1 record: " + record);
        //手動提交offset,一般是提交一個banch,冪等性防止重複訊息
        // === 每條消費完確認效能不好!
        ack.acknowledge();
    }

    //設定多個消費組
    @KafkaListener(topics = "test--topic", groupId = "testGroup2")
    public void listentestGroup2(ConsumerRecord<String, String> record, Acknowledgment ack) {
        String value = record.value();
        System.out.println("testGroup2 message: " + value);
        System.out.println("testGroup2 record: " + record);
        //手動提交offset
        ack.acknowledge();
    }
}

使用swagger測試傳送訊息

控制檯列印訊息

到此這篇關於雲伺服器(Linux)安裝部署Kafka的詳細過程的文章就介紹到這了,更多相關Linux安裝Kafka內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com