首頁 > 軟體

Spark中的資料讀取儲存和累加器範例詳解

2022-11-03 14:02:12

資料讀取與儲存

Text檔案

對於 Text檔案的讀取和儲存 ,其語法和實現是最簡單的,因此我只是簡單敘述一下這部分相關知識點,大家可以結合demo具體分析記憶。

1)基本語法

(1)資料讀取:textFile(String)

(2)資料儲存:saveAsTextFile(String)

2)實現程式碼demo如下:

object Operate_Text {
    def main(args: Array[String]): Unit = {
        //1.建立SparkConf並設定App名稱
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]")
        //2.建立SparkContext,該物件是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)
        //3.1 讀取輸入檔案
        val inputRDD: RDD[String] = sc.textFile("input/demo.txt")
        //3.2 儲存資料
        inputRDD.saveAsTextFile("textFile")
        //4.關閉連線
        sc.stop()
    }
}

Sequence檔案

SequenceFile檔案 是Hadoop中用來儲存二進位制形式的 key-value對 的一種平面檔案(Flat File)。在SparkContext中,可以通過呼叫 sequenceFile[ keyClass,valueClass ] (path) 來呼叫。

1)基本語法

  • (1)資料讀取:sequenceFile[ keyClass, valueClass ] (path)
  • (2)資料儲存:saveAsSequenceFile(String)

2)實現程式碼demo如下:

object Operate_Sequence {
    def main(args: Array[String]): Unit = {
        //1.建立SparkConf並設定App名稱
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]")
        //2.建立SparkContext,該物件是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)
        //3.1 建立rdd
        val dataRDD: RDD[(Int, Int)] = sc.makeRDD(Array((1,2,3),(4,5,6),(7,8,9)))
        //3.2 儲存資料為SequenceFile
        dataRDD.saveAsSequenceFile("seqFile")
        //3.3 讀取SequenceFile檔案
        sc.sequenceFile[Int,Int]("seqFile").collect().foreach(println)
        //4.關閉連線
        sc.stop()
    }
}

Object物件檔案

物件檔案是將物件序列化後儲存的檔案,採用Hadoop的序列化機制。可以通過 objectFile[ k , v ] (path) 函數接收一個路徑,讀取物件檔案,返回對應的RDD,也可以通過呼叫 saveAsObjectFile() 實現對物件檔案的輸出。因為要序列化所以要指定型別。

1)基本語法

  • (1)資料讀取:objectFile[ k , v ] (path)
  • (2)資料儲存:saveAsObjectFile(String)

2)實現程式碼demo如下:

object Operate_Object {
    def main(args: Array[String]): Unit = {
        //1.建立SparkConf並設定App名稱
        val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]")
        //2.建立SparkContext,該物件是提交Spark App的入口
        val sc: SparkContext = new SparkContext(conf)
        //3.1 建立RDD
        val dataRDD: RDD[Int] = sc.makeRDD(Array(1,2,3,4,5,6),2)
        //3.2 儲存資料
        dataRDD.saveAsObjectFile("objFile")
        //3.3 讀取資料
        sc.objectFile[Int]("objFile").collect().foreach(println)
        //4.關閉連線
        sc.stop()
    }
}

累加器

累加器概念

累加器,是一種變數---分散式共用只寫變數。僅支援“add”,支援並行,但Executor和Executor之間不能讀資料,可實現所有分片處理時更新共用變數的功能。

累加器用來把Executor端變數資訊聚合到Driver端。在Driver中定義的一個變數,在Executor端的每個task都會得到這個變數的一份新的副本,每個task更新這些副本的值後,傳回Driver端進行合併計算。

系統累加器

1)累加器定義(SparkContext.accumulator(initialValue)方法)

val sum: LongAccumulator = sc.longAccumulator("sum")

2)累加器新增資料(累加器.add方法)

sum.add(count)

3)累加器獲取資料(累加器.value)

sum.value

注意:Executor端的任務不能讀取累加器的值(例如:在Executor端呼叫sum.value,獲取的值不是累加器最終的值)。因此我們說,累加器是一個分散式共用只寫變數。

4)累加器要放在行動運算元中

因為轉換運算元執行的次數取決於job的數量,如果一個 spark應用 有多個行動運算元,那麼轉換運算元中的累加器可能會發生不止一次更新,導致結果錯誤。所以,如果想要一個無論在失敗還是重複計算時都絕對可靠的累加器,必須把它放在foreach()這樣的行動運算元中。

5) 程式碼實現:

object accumulator_system {
package com.atguigu.cache
import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext}
object accumulator_system {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val dataRDD: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("a", 4)))
    //需求:統計a出現的所有次數 ("a",10)
    //普通運算元實現 reduceByKey 程式碼會走shuffle 效率低
    val rdd: RDD[(String, Int)] = dataRDD.reduceByKey(_ + _)
    //累加器實現
    //1 宣告累加器
    val accSum: LongAccumulator = sc.longAccumulator("sum")
    dataRDD.foreach{
      case (a,count) => {
        //2 使用累加器累加  累加器.add()
        accSum.add(count)
        // 4 不在executor端獲取累加器的值,因為得到的值不準確,所以累加器叫分散式共用只寫變數
        //println("sum = " + accSum.value)
      }
    }
    //3 獲取累加器的值 累加器.value
    println(("a",accSum.value))
    sc.stop()
  }
}

以上就是Spark中的資料讀取儲存和累加器範例詳解的詳細內容,更多關於Spark資料讀取儲存累加器的資料請關注it145.com其它相關文章!


IT145.com E-mail:sddin#qq.com