<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
對於 Text檔案的讀取和儲存 ,其語法和實現是最簡單的,因此我只是簡單敘述一下這部分相關知識點,大家可以結合demo具體分析記憶。
1)基本語法
(1)資料讀取:textFile(String)
(2)資料儲存:saveAsTextFile(String)
2)實現程式碼demo如下:
object Operate_Text { def main(args: Array[String]): Unit = { //1.建立SparkConf並設定App名稱 val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]") //2.建立SparkContext,該物件是提交Spark App的入口 val sc: SparkContext = new SparkContext(conf) //3.1 讀取輸入檔案 val inputRDD: RDD[String] = sc.textFile("input/demo.txt") //3.2 儲存資料 inputRDD.saveAsTextFile("textFile") //4.關閉連線 sc.stop() } }
SequenceFile檔案 是Hadoop中用來儲存二進位制形式的 key-value對 的一種平面檔案(Flat File)。在SparkContext中,可以通過呼叫 sequenceFile[ keyClass,valueClass ] (path) 來呼叫。
1)基本語法
2)實現程式碼demo如下:
object Operate_Sequence { def main(args: Array[String]): Unit = { //1.建立SparkConf並設定App名稱 val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]") //2.建立SparkContext,該物件是提交Spark App的入口 val sc: SparkContext = new SparkContext(conf) //3.1 建立rdd val dataRDD: RDD[(Int, Int)] = sc.makeRDD(Array((1,2,3),(4,5,6),(7,8,9))) //3.2 儲存資料為SequenceFile dataRDD.saveAsSequenceFile("seqFile") //3.3 讀取SequenceFile檔案 sc.sequenceFile[Int,Int]("seqFile").collect().foreach(println) //4.關閉連線 sc.stop() } }
物件檔案是將物件序列化後儲存的檔案,採用Hadoop的序列化機制。可以通過 objectFile[ k , v ] (path) 函數接收一個路徑,讀取物件檔案,返回對應的RDD,也可以通過呼叫 saveAsObjectFile() 實現對物件檔案的輸出。因為要序列化所以要指定型別。
1)基本語法
2)實現程式碼demo如下:
object Operate_Object { def main(args: Array[String]): Unit = { //1.建立SparkConf並設定App名稱 val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[1]") //2.建立SparkContext,該物件是提交Spark App的入口 val sc: SparkContext = new SparkContext(conf) //3.1 建立RDD val dataRDD: RDD[Int] = sc.makeRDD(Array(1,2,3,4,5,6),2) //3.2 儲存資料 dataRDD.saveAsObjectFile("objFile") //3.3 讀取資料 sc.objectFile[Int]("objFile").collect().foreach(println) //4.關閉連線 sc.stop() } }
累加器,是一種變數---分散式共用只寫變數。僅支援“add”,支援並行,但Executor和Executor之間不能讀資料,可實現所有分片處理時更新共用變數的功能。
累加器用來把Executor端變數資訊聚合到Driver端。在Driver中定義的一個變數,在Executor端的每個task都會得到這個變數的一份新的副本,每個task更新這些副本的值後,傳回Driver端進行合併計算。
1)累加器定義(SparkContext.accumulator(initialValue)方法)
val sum: LongAccumulator = sc.longAccumulator("sum")
2)累加器新增資料(累加器.add方法)
sum.add(count)
3)累加器獲取資料(累加器.value)
sum.value
注意:Executor端的任務不能讀取累加器的值(例如:在Executor端呼叫sum.value,獲取的值不是累加器最終的值)。因此我們說,累加器是一個分散式共用只寫變數。
4)累加器要放在行動運算元中
因為轉換運算元執行的次數取決於job的數量,如果一個 spark應用 有多個行動運算元,那麼轉換運算元中的累加器可能會發生不止一次更新,導致結果錯誤。所以,如果想要一個無論在失敗還是重複計算時都絕對可靠的累加器,必須把它放在foreach()這樣的行動運算元中。
5) 程式碼實現:
object accumulator_system { package com.atguigu.cache import org.apache.spark.rdd.RDD import org.apache.spark.util.LongAccumulator import org.apache.spark.{SparkConf, SparkContext} object accumulator_system { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]") val sc = new SparkContext(conf) val dataRDD: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("a", 4))) //需求:統計a出現的所有次數 ("a",10) //普通運算元實現 reduceByKey 程式碼會走shuffle 效率低 val rdd: RDD[(String, Int)] = dataRDD.reduceByKey(_ + _) //累加器實現 //1 宣告累加器 val accSum: LongAccumulator = sc.longAccumulator("sum") dataRDD.foreach{ case (a,count) => { //2 使用累加器累加 累加器.add() accSum.add(count) // 4 不在executor端獲取累加器的值,因為得到的值不準確,所以累加器叫分散式共用只寫變數 //println("sum = " + accSum.value) } } //3 獲取累加器的值 累加器.value println(("a",accSum.value)) sc.stop() } }
以上就是Spark中的資料讀取儲存和累加器範例詳解的詳細內容,更多關於Spark資料讀取儲存累加器的資料請關注it145.com其它相關文章!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45