<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
RDD(resilient distributed dataset ,彈性分散式資料集),是 Spark 中最基礎的抽象。它表示了一個可以並行操作的、不可變得、被分割區了的元素集合。使用者不需要關心底層複雜的抽象處理,直接使用方便的運算元處理和計算就可以了。
1) . 分散式 RDD是一個抽象的概念,RDD在spark driver中,通過RDD來參照資料,資料真正儲存在節點機的partition上。
2). 唯讀 在Spark中RDD一旦生成了,就不能修改。 那麼為什麼要設定為唯讀,設定為唯讀的話,因為不存在修改,並行的吞吐量就上來了。
3). 血緣關係 我們需要對RDD進行一系列的操作,因為RDD是唯讀的,我們只能不斷的生產新的RDD,這樣,新的RDD與原來的RDD就會存在一些血緣關係。
Spark會記錄這些血緣關係,在後期的容錯上會有很大的益處。
4). 快取 當一個 RDD 需要被重複使用時,或者當任務失敗重新計算的時候,這時如果將 RDD 快取起來,就可以避免重新計算,保證程式執行的效能。
資料輸入:
資料計算:
資料輸出可用的方法是很多的,這裡簡單介紹常會用到的4個
功能:將RDD各個分割區內的資料,統一收集到Driver中,形成一個List物件
用法:
rdd.collect()
返回值是一個list
from pyspark import SparkContext,SparkConf import os os.environ["PYSPARK_PYTHON"]="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe" conf=SparkConf().setMaster("local[*]").setAppName("test_spark") sc=SparkContext(conf=conf) #準備一個RDD rdd=sc.parallelize([1,2,3,4,5]) #collect運算元,輸出RDD為list物件 print("rdd是:",rdd) print("rdd.collect是:",rdd.collect())
結果是
單獨輸出rdd,輸出的是rdd的類名而非內容
功能:對RDD資料集按照你傳入的邏輯進行聚合
語法:
程式碼
返回值等於計算函數的返回值
from pyspark import SparkContext,SparkConf import os os.environ["PYSPARK_PYTHON"]="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe" conf=SparkConf().setMaster("local[*]").setAppName("test_spark") sc=SparkContext(conf=conf) #準備一個RDD rdd=sc.parallelize([1,2,3,4,5]) #collect運算元,輸出RDD為list物件 print("rdd是:",rdd) print("rdd.collect是:",rdd.collect()) print("rdd.collect的型別是:",type(rdd.collect())) #reduce運算元,對RDD進行兩兩聚合 num=rdd.reduce(lambda x,y:x+y) print(num)
結果是
功能:取RDD的前N個元素,組合成list返回給你
用法:
演示
from pyspark import SparkContext,SparkConf import os os.environ["PYSPARK_PYTHON"]="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe" conf=SparkConf().setMaster("local[*]").setAppName("test_spark") sc=SparkContext(conf=conf) #準備一個RDD rdd=sc.parallelize([1,2,3,4,5]) #collect運算元,輸出RDD為list物件 print("rdd是:",rdd) print("rdd.collect是:",rdd.collect()) print("rdd.collect的型別是:",type(rdd.collect())) #reduce運算元,對RDD進行兩兩聚合 num=rdd.reduce(lambda x,y:x+y) print(num) #take運算元,取出RDD前n個元素,組成list返回 take_list=rdd.take(3) print(take_list)
結果是
功能:計算RDD有多少條資料,返回值是一個數位
用法:
演示
from pyspark import SparkContext,SparkConf import os os.environ["PYSPARK_PYTHON"]="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe" conf=SparkConf().setMaster("local[*]").setAppName("test_spark") sc=SparkContext(conf=conf) #準備一個RDD rdd=sc.parallelize([1,2,3,4,5]) #collect運算元,輸出RDD為list物件 print("rdd是:",rdd) print("rdd.collect是:",rdd.collect()) print("rdd.collect的型別是:",type(rdd.collect())) #reduce運算元,對RDD進行兩兩聚合 num=rdd.reduce(lambda x,y:x+y) print(num) #take運算元,取出RDD前n個元素,組成list返回 take_list=rdd.take(3) print(take_list) #count運算元,統計rdd中有多少條資料,返回值為數位 num_count=rdd.count() print(num_count) #關閉連結 sc.stop()
結果是
1.Spark的程式設計流程就是:
2.資料輸出的方法
資料輸出可用的方法是很多的,這裡只是簡單介紹4個
功能:將RDD的資料寫入文字檔案中支援本地寫出, hdfs等檔案系統.
程式碼:
演示
這是因為這個方法本質上依賴巨量資料的Hadoop框架,需要設定Hadoop 依賴.
呼叫儲存檔案的運算元,需要設定Hadoop依賴。
設定完成之後,執行下面的程式碼
from pyspark import SparkConf,SparkContext import os os.environ['PYSPARK_PYTHON']="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe" os.environ['HADOOP_HOME']='D:/heima_hadoop/hadoop-3.0.0' conf=SparkConf().setMaster("local[*]").setAppName("test_spark") sc=SparkContext(conf=conf) #準備rdd rdd1=sc.parallelize([1,2,3,4,5]) rdd2=sc.parallelize([("asdf",3),("w3er_!2",5),("hello",3)]) rdd3=sc.parallelize([[1,2,3],[3,2,4],[4,3,5]]) #輸出到檔案中 rdd1.saveAsTextFile("D:/output1") rdd2.saveAsTextFile("D:/output2") rdd3.saveAsTextFile("D:/output3")
結果是
輸出的資料夾中有這麼8檔案,是因為RDD被預設為分成8個分割區
SaveAsTextFile運算元輸出檔案的個數是根據RDD的分割區來決定的,有多少分割區就會輸出多少個檔案,RDD在本電腦中預設是8(該電腦CPU核心數是8核)
開啟裝置管理器就可以檢視處理器個數,這裡是有8個邏輯CPU
或者開啟工作管理員就可以看到是4核8個邏輯CPU
方式1, SparkConf物件設定屬性全域性並行度為1:
方式2,建立RDD的時候設定( parallelize方法傳入numSlices引數為1)
from pyspark import SparkConf,SparkContext import os os.environ['PYSPARK_PYTHON']="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe" os.environ['HADOOP_HOME']='D:/heima_hadoop/hadoop-3.0.0' conf=SparkConf().setMaster("local[*]").setAppName("test_spark") #rdd分割區設定為1 conf.set("spark.default.parallelism","1") sc=SparkContext(conf=conf) #準備rdd rdd1=sc.parallelize([1,2,3,4,5]) rdd2=sc.parallelize([("asdf",3),("w3er_!2",5),("hello",3)]) rdd3=sc.parallelize([[1,2,3],[3,2,4],[4,3,5]]) #輸出到檔案中 rdd1.saveAsTextFile("D:/output1") rdd2.saveAsTextFile("D:/output2") rdd3.saveAsTextFile("D:/output3")
結果是
1.RDD輸出到檔案的方法
2.如何修改RDD分割區
讀取檔案轉換成RDD,並完成:
from pyspark import SparkConf,SparkContext import os os.environ['PYSPARK_PYTHON']="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe" os.environ['HADOOP_HOME']='D:/heima_hadoop/hadoop-3.0.0' conf=SparkConf().setMaster("local[*]").setAppName("test_spark") #rdd分割區設定為1 conf.set("spark.default.parallelism","1") sc=SparkContext(conf=conf) rdd=sc.textFile("D:/search_log.txt") #需求1 列印輸出:熱門搜尋時間段(小時精度)Top3 # 取出全部的時間並轉換為小時 # 轉換為(小時,1)的二元元組 # Key分組聚合Value # 排序(降序) # 取前3 result1=rdd.map(lambda x:x.split("t")). map(lambda x:x[0][:2]). map(lambda x:(x,1)). reduceByKey(lambda x,y:x+y). sortBy(lambda x:x[1],ascending=False,numPartitions=1). take(3)#上面用的‘/'是換行的意思,當一行程式碼太長時就可以這樣用 print(result1) #需求2 列印輸出:熱門搜尋詞Top3 # 取出全部的搜尋詞 # (詞,1)二元元組 # 分組聚合 # 排序 # Top3 result2=rdd.map(lambda x:x.split("t")). map(lambda x:x[2]) .map(lambda x:(x,1)). reduceByKey(lambda x,y:x+y). sortBy(lambda x:x[1],ascending=False,numPartitions=1). take(3) print(result2) #需求3 列印輸出:統計黑馬程式設計師關鍵字在哪個時段被搜尋最多 result3=rdd.map(lambda x:x.split("t")). filter((lambda x:x[2]=="黑馬程式設計師")). map(lambda x:(x[0][:2],1)). reduceByKey(lambda x,y:x+y). sortBy(lambda x:x[1],ascending=False,numPartitions=1). take(3) print(result3) #需求4 將資料轉換為JSON格式,寫出為檔案 rdd.map(lambda x:x.split("t")). map(lambda x:{"time":x[0],"id":x[1],"key":x[2],"num1":x[3],"num2":x[4],"url":x[5]}) .saveAsTextFile("D:/out_json")
結果是
到此這篇關於PySpark中RDD的資料輸出詳解的文章就介紹到這了,更多相關PySpark RDD資料輸出內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!
相關文章
<em>Mac</em>Book项目 2009年学校开始实施<em>Mac</em>Book项目,所有师生配备一本<em>Mac</em>Book,并同步更新了校园无线网络。学校每周进行电脑技术更新,每月发送技术支持资料,极大改变了教学及学习方式。因此2011
2021-06-01 09:32:01
综合看Anker超能充系列的性价比很高,并且与不仅和iPhone12/苹果<em>Mac</em>Book很配,而且适合多设备充电需求的日常使用或差旅场景,不管是安卓还是Switch同样也能用得上它,希望这次分享能给准备购入充电器的小伙伴们有所
2021-06-01 09:31:42
除了L4WUDU与吴亦凡已经多次共事,成为了明面上的厂牌成员,吴亦凡还曾带领20XXCLUB全队参加2020年的一场音乐节,这也是20XXCLUB首次全员合照,王嗣尧Turbo、陈彦希Regi、<em>Mac</em> Ova Seas、林渝植等人全部出场。然而让
2021-06-01 09:31:34
目前应用IPFS的机构:1 谷歌<em>浏览器</em>支持IPFS分布式协议 2 万维网 (历史档案博物馆)数据库 3 火狐<em>浏览器</em>支持 IPFS分布式协议 4 EOS 等数字货币数据存储 5 美国国会图书馆,历史资料永久保存在 IPFS 6 加
2021-06-01 09:31:24
开拓者的车机是兼容苹果和<em>安卓</em>,虽然我不怎么用,但确实兼顾了我家人的很多需求:副驾的门板还配有解锁开关,有的时候老婆开车,下车的时候偶尔会忘记解锁,我在副驾驶可以自己开门:第二排设计很好,不仅配置了一个很大的
2021-06-01 09:30:48
不仅是<em>安卓</em>手机,苹果手机的降价力度也是前所未有了,iPhone12也“跳水价”了,发布价是6799元,如今已经跌至5308元,降价幅度超过1400元,最新定价确认了。iPhone12是苹果首款5G手机,同时也是全球首款5nm芯片的智能机,它
2021-06-01 09:30:45