首頁 > 科技

用Spark,Kafka和k8s構建下一代資料管道

2021-08-08 03:10:33

資料整合,通常在企業的資訊架構中扮演著重要的角色。具體而言,企業的分析流程在很大程度上會依賴於此類整合模式,以便從交易系統中,提取方便分析與載入的資料格式。

過去,在傳統的架構正規化中,由於系統之間缺乏互連,事務和分析經常出現延遲,我們只能依賴Batch以實現整合。在Batch模式中,大檔案(即資料的dump檔案)通常是由作業系統生成,並且通過驗證、清理、標準化、以及轉換等處理,進而輸出檔案以供系統分析。由於此類大檔案的讀取會佔用大量的記憶體,因此資料架構師通常會依賴一些暫存類型的資料庫,來持續儲存已完成處理的資料輸出。

近年來,隨著以Hadoop為代表的分散式計算的廣泛發展與應用,MapReduce通過在商用硬體上的水平擴展,以分佈處理的方式,解決了高記憶體的使用需求。如今,隨著計算技術的進一步發展,我們已可以在記憶體中運行MapReduce,並使之成為了處理大型資料檔案的標準。

就在Batch方式進行演變的過程中,非批(non-batch)處理方式也得到了重大進展。多年來,面向使用者的物聯網裝置已逐漸成為資料系統中的重要一環。大量資料來源於物聯網裝置的採集,而事件驅動型架構也成為了基於微服務的雲原生開發方法的流行選擇。由於資料處理頻率的成倍增加,資料流的處理能力成為了資料整合工作的主要非功能性需求。因此,曾經是大檔案資料整合問題,已演變成為了流處理需求。這就需要我們提供一個具有足夠緩衝區的資料管道,通過永續性來避免資料包的丟失。

在那些以雲服務為主體的平臺上,各種元件的水平擴展能力,相對於資料流和使用者而言,要比垂直擴展更加重要。因此,對流的水平可伸縮性以及流的使用者有明確的關注。這也是諸如Kafka之類的資料流解決方案、以及Kubernetes叢集需要向使用者(consumer)提供的。目前,Lambda架構的Speed層、以及Kappa架構的構建也都在向此方法發展。

採用Spark、Kafka和k8s構建下一代資料管道的目的,本文將討論相關架構模式,以及對應的示例程式碼,您可以跟著一步一步在自己的環境中搭建與實現。

Lambda架構

Lambda架構主要兩個層次:Batch和Stream。Batch能夠按照預定的批次轉換資料,而Stream負責近乎實時地處理資料。Batch層通常被使用的場景是:在源系統中批量傳送的資料,需要訪問整個資料集,以進行所需的資料處理,不過因為資料集太大,無法執行流式處理。相反,那些帶有小塊資料包的高速資料需要在Speed層被處理。這些資料包要麼相互獨立,要麼按照速度相近的方式形成了對應的上下文。顯然,這兩種類型的資料處理方式,都屬於計算密集型,儘管Batch層的記憶體需求要高於Speed層。與之對應的架構方案需要具備可擴展性、容錯性、效能優勢、成本效益、靈活性、以及分散式。

圖 1:Lambda架構

由上圖可知,由於Lambda需要兩個單獨的元件,來進行Batch和Speed層面的資料處理,因此其架構較為複雜。如果我們能夠用某個單一的技術元件,來同時滿足這兩個目的,則會大幅降低複雜性。而這正是Apache Spark大顯身手之處。

分散式計算的最新選擇

憑藉著包括SparkSQL和SparkStreaming在內的一系列庫,Apache Spark作為一種有效的方案,可通過記憶體計算,來實現分散式Lambda架構。其中,SparkSQL能夠支援各種Batch操作,例如:通過分散式架構載入、驗證、轉換、聚合、以及對映資料,進而減少對於單臺機器的記憶體需求。同樣,基於SparkStreaming的作業任務,可以近乎實時地處理來自Kafka等來源的資料流,並將分析結果提供給諸如:資料倉庫或資料湖等更為持久的元件。

圖 2:Batch和Speed層的上下文

Kubernetes是一種雲平臺叢集管理器,其最新版本的Spark,可以運行在由 Kubernetes管理的叢集上。可以說,基於Kubernetes的Spark是在雲端實現Lambda架構的絕佳組合。

雖然我們可以單獨地使用Kubernetes進行分散式計算,但是在這種情況下我們仍需要依賴定製的解決方案。例如,在Batch層中,Spring Batch框架可以與Kubernetes叢集結合使用,進而將工作任務分發到多個叢集節點處。類似地,Kubernetes也可以將流資料分發到多個針對Speed層,而並行運行的Pod。Pod可以通過在其中生成容器,以實現輕鬆地水平擴展,進而能夠根據資料的體量和速率去調整叢集。

Spark,針對Lambda架構的一站式解決方案

針對Batch和Speed層的非功能性需求,Apache Spark具有如下特性:

  • 可擴展性:Spark叢集可以按需進行擴、縮容。由於它由一個主節點和一組工作節點組成,因此這些工作節點會隨著工作負載的增加,而提高水平擴展的能力。

  • 容錯性:Spark框架能夠處理由於工作節點的崩潰,而導致的叢集故障。由於每個資料幀都會被邏輯分區,而每個分區的資料處理都會發生在某個節點上。那麼,在處理資料時,如果某個節點發生了故障,那麼叢集管理器會按照有向無環圖(Directed Acyclic Graph,DAG)的邏輯,分配另一個節點來執行資料幀的相同分區,進而確保絕對的零資料丟失。

  • 效率高:由於Spark支援記憶體計算,因此在執行期間,資料可以根據Hadoop的需要,被儲存在RAM中,而非磁碟上。其效率顯然要高得多。

  • 靈活的負載分配:由於Spark支援分散式計算,能夠橫跨多個節點共享任務的元件,並作為一個整合單元生成輸出。Spark可以運行在 Kubernetes 管理的叢集上,這使得它在雲環境中更加合適。

  • 成本:Spark是開源的,本身不包含任何成本。當然,如果選擇託管服務,則需要付出一定的代價。

現在讓我們深入瞭解Spark以瞭解它如何幫助Batch和蒸汽處理。Spark由兩個主要元件組成:Spark核心 API 和Spark庫。核心 API 層提供對四種語言的支援:R、Python、Scala 和 Java。在核心 API 層之上,我們有以下Spark庫,每個庫都針對不同的目的。

  • SparkSQL:處理(半)結構化資料,執行基本轉換功能並在資料集上執行 SQL 查詢SparkStreaming:能夠處理流資料;支援近實時資料處理

  • SparkMLib:用於機器學習;根據需要用於資料處理

  • SparkGraphX:用於圖形處理;這裡討論的範圍很少使用

圖 3:Spark堆棧(來源:LearningSpark,O'Reilly Media, Inc.)

Batch層的Spark

在Lambda架構轉換的Batch層,對(半)結構化資料的計算、聚合操作由SparkSQL 庫處理。讓我們進一步討論SparkSQL 架構。

圖 4:SparkSQL 架構

從上圖中可以明顯看出,SparkSQL 具有三個主要架構層,如下所述:

  • 資料來源 API:處理不同的資料格式,如 CSV、JSON、AVRO 和 PARQUET。它還有助於連線不同的資料來源,如 HDFS、HIVE、MYSQL、CASSANDRA 等。 用於載入不同格式資料的通用 API:

Dataframe.read.load(「ParquetFile | JsonFile | TextFile | CSVFile | AVROFile」)

  • 資料幀API:Spark2.0 以後,Spark資料幀被大量使用。它有助於儲存大型關係資料並公開多個轉換函數以對資料集進行切片和切塊。通過 Dataframe API 公開的此類轉換函數的示例是:

withColumn, select, withColumnRenamed, groupBy, filter, sort, orderBy etc.

  • SQL 服務:SparkSQL 服務是幫助我們創建資料框和儲存關係資料以進行進一步轉換的主要元素。這是我們使用SparkSQL 時Batch層轉換的入口點。在轉換過程中,可以使用python、R、Scala或Java中的不同API,也可以直接執行SQL來轉換資料。

下面是一些Batch的程式碼示例:

假設有兩個表:一個是 PRODUCT,另一個是 TRANSACTION。PRODUCT 表包含商店特定產品的所有資訊,Transaction 表包含針對每個產品的所有交易。我們可以通過轉換和聚合得到以下資訊。

  • 產品明智的總銷售量

  • 分部明智的總收入

通過在Spark資料幀上編寫純 SQL 或使用聚合函數可以獲得相同的結果。

Python from pyspark.sql importSparkSession from pyspark.sql.functions import * Spark=SparkSession.builder.master("local").appName("Superstore").getOrCreate() df1 =Spark.read.csv("Product.csv") df2 =Spark.read.csv("Transaction.csv") df3 = df1.filter(df1.Segment != 'Electric') df4 = df2.withColumn("OrderDate",df2.OrderDate[7:10]) result_df1 = df3.join(df4, on= ['ProductCode'], how='inner') result_df2 = result_df1.groupBy('ProductName').sum('Quantity') result_df2.show() # Display segment wise revenue generated result_df3 = result_df1.groupBy('Segment').sum('Price') result_df3.show() Python from pyspark.sql importSparkSession from pyspark.sql.functions import * Spark=SparkSession.builder.master("local").appName("Superstore").getOrCreate() df1 =Spark.read.csv("Product.csv") df2 =Spark.read.csv("Transaction.csv") df3 = df1.filter(df1.Segment != 'Electric') df4 = df2.withColumn("OrderDate",df2.OrderDate[7:10]) result_df1 = df3.join(df4, on= ['ProductCode'], how='inner') result_df1.createOrReplaceTempView("SuperStore") # Display product wise quantity sold result_df2 =Spark.sql("select ProductName , Sum(Quantity) from Superstore group by ProductName") result_df2.show() # Display segment wise revenue earned result_df3 =Spark.sql("select Segment , Sum(Price) from Superstore group by Segment") result_df2.show()

在這兩種情況下,第一個資料是從兩個不同的來源載入的,並且產品資料針對所有非電氣產品進行過濾。交易資料根據訂單日期的某種格式進行更改。然後,將兩個資料幀連線起來,並生成該超市中細分收入和產品銷售數量的結果。

當然,這是載入、驗證、轉換和聚合的簡單示例。使用SparkSQL 可以進行更復雜的操作。要了解有關SparkSQL 服務的更多資訊,請參閱此處的文件。

Sparkfor Speed 層

SparkStreaming 是一個庫,用於核心Spark框架之上。它確保實時資料流處理的可擴展性、高吞吐量和容錯性。

圖 5:SparkStreaming 架構

如上圖所示,Spark將輸入資料流轉換為批量輸入資料。這種離散Batch有兩種實現方式:a) Dstreams 或離散化流和 b) 結構化流。前者非常受歡迎,直到後者作為更高階的版本出現。但是,Dstream 還沒有完全過時,為了完整起見,將其保留在本文中。

· Discretized Streams:這提供了對火花流庫的抽象。它是 RDD 的集合,代表一個連續的資料流。它將資料離散成小批量並運行小作業來處理這些小批量。任務根據資料的位置分配給工作節點。因此,通過 Dstream 的這個概念,Spark可以並行讀取資料,執行小批量處理流並確保流處理的有效節點分配。

· 結構化流:這是使用Spark引擎的最先進和現代的流處理方法。它與SparkDataframe API(在上面的Batch部分中討論)很好地整合在一起,用於對流資料的各種操作。結構化流可以增量和連續地處理資料。基於特定視窗和水印的近實時聚合也是可能的。

Spark結構化流可以處理不同的流處理用例,如下面的示例所示:

簡單的結構化流媒體

簡單的結構化流只會轉換和載入來自流的資料,並且不包括特定時間範圍內的任何聚合。例如,系統從 Apache Kafka 獲取資料,並通過Spark流和SparkSQL 近乎實時地對其進行轉換(請參閱下面的程式碼片段)。

Python from pyspark.sql importSparkSession from pyspark.streaming import StreamingContext import pyspark.sql.functions as sf spark=SparkSession.builder.master('local').appName('StructuredStreamingApp').getOrCreate() df =Spark.readStream.format("kafka").option("kafka.bootstrap.servers,"localhost:9092")                                              .option("subscribe", "test_topic").load() df1 = df.selectExpr("CAST(value AS STRING)") df2 = df1.selectExpr("split(value, ',')[0] as Dept","split(value, ',')[1] as Age") df2.show()

SparkSession 物件的ReadStream函數用於連線特定的 Kafka 主題。正如上面選項中的程式碼片段一樣,我們需要提供 Kafka 叢集代理的 IP 和 Kafka 主題名稱。此程式碼的輸出是一個表,有兩列:Dept 和 Age。

結構化流媒體聚合

可以通過 Structured Streaming 對流資料進行聚合,它能夠在新事件到達的基礎上計算滾動聚合結果。這是對整個資料流的運行聚合。請參考下面的程式碼片段,它在整個資料流上推匯出部門明智的平均年齡。

Python from pyspark.sql importSparkSession from pyspark.streaming import StreamingContext import pyspark.sql.functions as sf spark=SparkSession.builder.master('local').appName('StructuredStreamingApp').getOrCreate() df =Spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092")                                      .option("subscribe", "test_topic").load() df1 = df.selectExpr("CAST(value AS STRING)") df2 = df1.selectExpr("split(value, ',')[0] as Dept","split(value, ',')[1] as Age") df3 = df2.groupBy("Dept").avg("Age") df3.show()

視窗聚合

有時我們需要在某個時間視窗內進行聚合,而不是運行聚合。SparkStructured Streaming 也提供了這樣的功能。假設我們要計算過去 5 分鐘內的事件數。這個帶聚合的視窗函數將幫助我們。

Python from pyspark.sql importSparkSession from pyspark.streaming import StreamingContext import pyspark.sql.functions as sf import datetime import time spark=SparkSession.builder.master('local').appName('StructuredStreamingApp').getOrCreate() df =Spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092")                                      .option("subscribe", "test_topic").load() df1 = df.selectExpr("CAST(value AS STRING)") df2 = df1.selectExpr("split(value, ',')[0] as Dept","split(value, ',')[1] as Age") df3 = df2.withColumn("Age", df2.Age.cast('int')) df4 = df3.withColumn("eventTime",sf.current_timestamp()) df_final = df4.groupBy(sf.window("eventTime", "5 minute")).count() df_final.show()

重疊視窗上的聚合

在上面的例子中,每個視窗都是一個完成聚合的組。還提供了通過提及視窗長度和滑動間隔來定義重疊視窗的規定。它在視窗聚合中的後期資料處理中非常有用。下面的程式碼基於 5 分鐘視窗計算事件數,滑動間隔為 10 分鐘。

Python from pyspark.sql importSparkSession from pyspark.streaming import StreamingContext import pyspark.sql.functions as sf import datetime import time spark=SparkSession.builder.master('local').appName('StructuredStreamingApp').getOrCreate() df =Spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092")                                      .option("subscribe", "test_topic").load() df1 = df.selectExpr("CAST(value AS STRING)") df2 = df1.selectExpr("split(value, ',')[0] as Dept","split(value, ',')[1] as Age") df3 = df2.withColumn("Age", df2.Age.cast('int')) df4 = df3.withColumn("eventTime",sf.current_timestamp()) df_final = df4.groupBy("Dept",sf.window("eventTime","10 minutes", "5 minute")).count() df_final.show()

帶水印和重疊視窗的聚合

資料遲到會在近實時系統的聚合中產生問題。我們可以使用重疊視窗來解決這個錯誤。但問題是:系統等待遲到的資料需要多長時間?這可以通過水印解決。通過這種方法,我們在重疊視窗之上定義了一個特定的時間段。之後,系統丟棄該事件。

Python from pyspark.sql importSparkSession from pyspark.streaming import StreamingContext import pyspark.sql.functions as sf import datetime import time spark=SparkSession.builder.master('local').appName('StructuredStreamingApp').getOrCreate() df =Spark.readStream.format("kafka").option("kafka.bootstrap.servers", "localhost:9092")                                      .option("subscribe", "test_topic").load() df1 = df.selectExpr("CAST(value AS STRING)") df2 = df1.selectExpr("split(value, ',')[0] as Dept","split(value, ',')[1] as Age") df3 = df2.withColumn("Age", df2.Age.cast('int')) df4 = df3.withColumn("eventTime",sf.current_timestamp()) df_final = df4.withWatermark("eventTime","10 Minutes").groupBy("Dept",sf.window("eventTime","10 minutes", "5 minute")).count() df_final.show()

上面的程式碼表示對於延遲事件,10 分鐘後,舊視窗結果將不會更新。

Kafka + k8s - Speed層的另一種解決方案

託管在 Kubernetes 叢集上的 Pod 形成了 Kafka 流的消費者組,是另一種近乎實時資料處理的方法。通過使用這種組合,我們可以輕鬆獲得分散式計算的優勢。

圖 6:通過 Kafka + Kubernetes 實現的Speed層示例

在上面例子中的事件驅動系統中,資料正在從 Kafka 主題載入到基於 Python 的處理單元中。如果 Kafka 叢集中的分區數量與 Pod 的複製因子匹配,則 Pod 一起組成一個消費者組,訊息被無縫消費。

這是構建分散式資料處理系統的經典示例,僅使用Kafka+k8s組合即可確保並行處理。

使用 Python 創建 Kafka 消費者的兩個非常流行的庫是:

  • Python_Kafka 庫

  • Confluent_Kafka 庫

Python_Kafka Python from kafka import KafkaConsumer consumer = KafkaConsumer(TopicName,                          bootstrap_servers= <broker-list>,                          group_id=<GroupName>,                          enable_auto_commit=True,                          auto_offset_reset='earliest') consumer.poll() Confluent_Kafka  Python from confluent_kafka import Consumer consumer = Consumer({'bootstrap.servers': <broker-list>,                      'group.id': <GroupName>,                      'enable.auto.commit': True,                      'auto.offset.reset': 'earliest'                      }) consumer.subscribe([TopicName])

K8.yml 檔案的示例結構如下:

YAML metadata:   name: <app name>   namespace: <deployment namespace>   labels:     app: <app name> spec:   replicas: <replication-factor>   spec:       containers:       - name: <container name>

如果按照上述方式開發基本元件,系統將獲得分散式計算的幫助,而無需進行記憶體計算。一切都取決於系統的體積和所需速度。對於低/中等資料量,可以通過實現這種基於 python-k8 的架構來確保良好的速度。

這兩種方法都可以託管在具有各種服務的雲中。例如,我們在 AWS 中有 EMR 和 Glue,可以在 GCP 中通過 Dataproc 創建Spark叢集,或者我們可以在 Azure 中使用 Databricks。另一方面,kafka-python-k8的方式可以很容易地在雲端實現,這保證了更好的可管理性。例如在 AWS 中,我們可以將 MSK 或 Kinesis 和 EKS 的組合用於這種方法。在下一個版本中,我們將討論所有云供應商中Batch和Speed層的實現,並根據不同的需求提供比較研究。


IT145.com E-mail:sddin#qq.com