首頁 > 軟體

PySpark中RDD的資料輸出問題詳解

2023-01-16 14:01:40

RDD概念

RDD(resilient distributed dataset ,彈性分散式資料集),是 Spark 中最基礎的抽象。它表示了一個可以並行操作的、不可變得、被分割區了的元素集合。使用者不需要關心底層複雜的抽象處理,直接使用方便的運算元處理和計算就可以了。

RDD的特點

1) . 分散式 RDD是一個抽象的概念,RDD在spark driver中,通過RDD來參照資料,資料真正儲存在節點機的partition上。

2). 唯讀 在Spark中RDD一旦生成了,就不能修改。 那麼為什麼要設定為唯讀,設定為唯讀的話,因為不存在修改,並行的吞吐量就上來了。

3). 血緣關係 我們需要對RDD進行一系列的操作,因為RDD是唯讀的,我們只能不斷的生產新的RDD,這樣,新的RDD與原來的RDD就會存在一些血緣關係。

Spark會記錄這些血緣關係,在後期的容錯上會有很大的益處。

4). 快取 當一個 RDD 需要被重複使用時,或者當任務失敗重新計算的時候,這時如果將 RDD 快取起來,就可以避免重新計算,保證程式執行的效能。

一. 回顧

資料輸入:

  • sc.parallelize
  • sc.textFile

資料計算:

  • rdd.map
  • rdd.flatMap
  • rdd.reduceByKey
  • .…

二.輸出為python物件

資料輸出可用的方法是很多的,這裡簡單介紹常會用到的4個

  • collect:將RDD內容轉換為list
  • reduce:對RDD內容進行自定義聚合
  • take:取出RDD的前N個元素組成list
  • count:統計RDD元素個數

collect運算元

功能:將RDD各個分割區內的資料,統一收集到Driver中,形成一個List物件
用法:
rdd.collect()
返回值是一個list

演示

from pyspark import SparkContext,SparkConf
import os
os.environ["PYSPARK_PYTHON"]="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
 
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=conf)
 
#準備一個RDD
rdd=sc.parallelize([1,2,3,4,5])
#collect運算元,輸出RDD為list物件
print("rdd是:",rdd)
print("rdd.collect是:",rdd.collect())

結果是

 單獨輸出rdd,輸出的是rdd的類名而非內容

reduce運算元

功能:對RDD資料集按照你傳入的邏輯進行聚合

語法:

程式碼

 返回值等於計算函數的返回值

 演示

from pyspark import SparkContext,SparkConf
import os
os.environ["PYSPARK_PYTHON"]="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
 
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=conf)
 
#準備一個RDD
rdd=sc.parallelize([1,2,3,4,5])
#collect運算元,輸出RDD為list物件
print("rdd是:",rdd)
print("rdd.collect是:",rdd.collect())
print("rdd.collect的型別是:",type(rdd.collect()))
#reduce運算元,對RDD進行兩兩聚合
num=rdd.reduce(lambda x,y:x+y)
print(num)

結果是

 take運算元

功能:取RDD的前N個元素,組合成list返回給你
用法:

 演示

from pyspark import SparkContext,SparkConf
import os
os.environ["PYSPARK_PYTHON"]="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
 
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=conf)
 
#準備一個RDD
rdd=sc.parallelize([1,2,3,4,5])
#collect運算元,輸出RDD為list物件
print("rdd是:",rdd)
print("rdd.collect是:",rdd.collect())
print("rdd.collect的型別是:",type(rdd.collect()))
#reduce運算元,對RDD進行兩兩聚合
num=rdd.reduce(lambda x,y:x+y)
print(num)
#take運算元,取出RDD前n個元素,組成list返回
take_list=rdd.take(3)
print(take_list)

結果是

 count運算元

功能:計算RDD有多少條資料,返回值是一個數位
用法:

 演示

from pyspark import SparkContext,SparkConf
import os
os.environ["PYSPARK_PYTHON"]="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
 
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=conf)
 
#準備一個RDD
rdd=sc.parallelize([1,2,3,4,5])
#collect運算元,輸出RDD為list物件
print("rdd是:",rdd)
print("rdd.collect是:",rdd.collect())
print("rdd.collect的型別是:",type(rdd.collect()))
#reduce運算元,對RDD進行兩兩聚合
num=rdd.reduce(lambda x,y:x+y)
print(num)
#take運算元,取出RDD前n個元素,組成list返回
take_list=rdd.take(3)
print(take_list)
#count運算元,統計rdd中有多少條資料,返回值為數位
num_count=rdd.count()
print(num_count)
#關閉連結
sc.stop()

結果是

小結

1.Spark的程式設計流程就是:

  • 將資料載入為RDD(資料輸入)對RDD進行計算(資料計算)
  • 將RDD轉換為Python物件(資料輸出)

 2.資料輸出的方法

  • collect:將RDD內容轉換為list
  • reduce:對RDD內容進行自定義聚合
  • take:取出RDD的前N個元素組成list
  • count:統計RDD元素個數

資料輸出可用的方法是很多的,這裡只是簡單介紹4個

三.輸出到檔案中

savaAsTextFile運算元

功能:將RDD的資料寫入文字檔案中支援本地寫出, hdfs等檔案系統.
程式碼:

 演示

 這是因為這個方法本質上依賴巨量資料的Hadoop框架,需要設定Hadoop 依賴.

設定Hadoop依賴

呼叫儲存檔案的運算元,需要設定Hadoop依賴。

  • 下載Hadoop安裝包解壓到電腦任意位置
  • 在Python程式碼中使用os模組設定: os.environ['HADOOP_HOME']='HADOOP解壓資料夾路徑′。
  • 下載winutils.exe,並放入Hadoop解壓資料夾的bin目錄內
  • 下載hadoop.dll,並放入:C:/Windows/System32資料夾內

設定完成之後,執行下面的程式碼

from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
os.environ['HADOOP_HOME']='D:/heima_hadoop/hadoop-3.0.0'
 
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
sc=SparkContext(conf=conf)
 
#準備rdd
rdd1=sc.parallelize([1,2,3,4,5])
rdd2=sc.parallelize([("asdf",3),("w3er_!2",5),("hello",3)])
rdd3=sc.parallelize([[1,2,3],[3,2,4],[4,3,5]])
#輸出到檔案中
rdd1.saveAsTextFile("D:/output1")
rdd2.saveAsTextFile("D:/output2")
rdd3.saveAsTextFile("D:/output3")

結果是

 輸出的資料夾中有這麼8檔案,是因為RDD被預設為分成8個分割區
SaveAsTextFile運算元輸出檔案的個數是根據RDD的分割區來決定的,有多少分割區就會輸出多少個檔案,RDD在本電腦中預設是8(該電腦CPU核心數是8核)

 開啟裝置管理器就可以檢視處理器個數,這裡是有8個邏輯CPU
或者開啟工作管理員就可以看到是4核8個邏輯CPU

 修改rdd分割區為1個

方式1, SparkConf物件設定屬性全域性並行度為1:

 方式2,建立RDD的時候設定( parallelize方法傳入numSlices引數為1)

from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
os.environ['HADOOP_HOME']='D:/heima_hadoop/hadoop-3.0.0'
 
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
#rdd分割區設定為1
conf.set("spark.default.parallelism","1")
sc=SparkContext(conf=conf)
 
#準備rdd
rdd1=sc.parallelize([1,2,3,4,5])
rdd2=sc.parallelize([("asdf",3),("w3er_!2",5),("hello",3)])
rdd3=sc.parallelize([[1,2,3],[3,2,4],[4,3,5]])
#輸出到檔案中
rdd1.saveAsTextFile("D:/output1")
rdd2.saveAsTextFile("D:/output2")
rdd3.saveAsTextFile("D:/output3")

結果是

 小結

1.RDD輸出到檔案的方法

  • rdd.saveAsTextFile(路徑)
  • 輸出的結果是一個資料夾
  • 有幾個分割區就輸出多少個結果檔案

2.如何修改RDD分割區

  • SparkConf物件設定conf.set("spark.default.parallelism", "7")
  • 建立RDD的時候,sc.parallelize方法傳入numSlices引數為1

四.練習案例

需求: 

讀取檔案轉換成RDD,並完成:

  • 列印輸出:熱門搜尋時間段(小時精度)Top3
  • 列印輸出:熱門搜尋詞Top3
  • 列印輸出:統計黑馬程式設計師關鍵字在哪個時段被搜尋最多
  • 將資料轉換為JSON格式,寫出為檔案

程式碼

from pyspark import SparkConf,SparkContext
import os
os.environ['PYSPARK_PYTHON']="C:/Users/hawa/AppData/Local/Programs/Python/Python39/python.exe"
os.environ['HADOOP_HOME']='D:/heima_hadoop/hadoop-3.0.0'
 
conf=SparkConf().setMaster("local[*]").setAppName("test_spark")
#rdd分割區設定為1
conf.set("spark.default.parallelism","1")
sc=SparkContext(conf=conf)
 
rdd=sc.textFile("D:/search_log.txt")
#需求1 列印輸出:熱門搜尋時間段(小時精度)Top3
# 取出全部的時間並轉換為小時
# 轉換為(小時,1)的二元元組
# Key分組聚合Value
# 排序(降序)
# 取前3
result1=rdd.map(lambda x:x.split("t")).
    map(lambda x:x[0][:2]).
    map(lambda x:(x,1)).
    reduceByKey(lambda x,y:x+y).
    sortBy(lambda x:x[1],ascending=False,numPartitions=1).
    take(3)#上面用的‘/'是換行的意思,當一行程式碼太長時就可以這樣用
print(result1)
#需求2 列印輸出:熱門搜尋詞Top3
# 取出全部的搜尋詞
# (詞,1)二元元組
# 分組聚合
# 排序
# Top3
result2=rdd.map(lambda x:x.split("t")).
    map(lambda x:x[2])
    .map(lambda x:(x,1)).
    reduceByKey(lambda x,y:x+y).
    sortBy(lambda x:x[1],ascending=False,numPartitions=1).
    take(3)
print(result2)
#需求3 列印輸出:統計黑馬程式設計師關鍵字在哪個時段被搜尋最多
result3=rdd.map(lambda x:x.split("t")).
    filter((lambda x:x[2]=="黑馬程式設計師")).
    map(lambda x:(x[0][:2],1)).
    reduceByKey(lambda x,y:x+y).
    sortBy(lambda x:x[1],ascending=False,numPartitions=1).
    take(3)
print(result3)
#需求4 將資料轉換為JSON格式,寫出為檔案
rdd.map(lambda x:x.split("t")).
    map(lambda x:{"time":x[0],"id":x[1],"key":x[2],"num1":x[3],"num2":x[4],"url":x[5]})
    .saveAsTextFile("D:/out_json")

結果是

到此這篇關於PySpark中RDD的資料輸出詳解的文章就介紹到這了,更多相關PySpark RDD資料輸出內容請搜尋it145.com以前的文章或繼續瀏覽下面的相關文章希望大家以後多多支援it145.com!


IT145.com E-mail:sddin#qq.com