spark算子大全glom_2小时入门Spark之RDD编程
公眾號后臺回復關鍵字:pyspark,獲取本項目github地址。
本節將介紹RDD數據結構的常用函數。包括如下內容:
創建RDD
常用Action操作
常用Transformation操作
常用PairRDD的轉換操作
緩存操作
共享變量
分區操作
這些函數中,我最常用的是如下15個函數,需要認真掌握其用法。
map
flatMap
mapPartitions
filter
count
reduce
take
saveAsTextFile
collect
join
union
persist
repartition
reduceByKey
aggregateByKey
import?findspark
#指定spark_home為剛才的解壓路徑,指定python路徑
spark_home?=?"/Users/liangyun/ProgramFiles/spark-3.0.1-bin-hadoop3.2"
python_path?=?"/Users/liangyun/anaconda3/bin/python"
findspark.init(spark_home,python_path)
import?pyspark
from?pyspark?import?SparkContext,?SparkConf
conf?=?SparkConf().setAppName("rdd_tutorial").setMaster("local[4]")
sc?=?SparkContext(conf=conf)
print(pyspark.__version__)
3.0.1
一,創建RDD
創建RDD主要有兩種方式,一個是textFile加載本地或者集群文件系統中的數據,
第二個是用parallelize方法將Driver中的數據結構并行化成RDD。
#從本地文件系統中加載數據
file?=?"./data/hello.txt"
rdd?=?sc.textFile(file,3)
rdd.collect()
['hello?world',
'hello?spark',
'spark?love?jupyter',
'spark?love?pandas',
'spark?love?sql']
#從集群文件系統中加載數據
#file?=?"hdfs://localhost:9000/user/hadoop/data.txt"
#也可以省去hdfs://localhost:9000
#rdd?=?sc.textFile(file,3)
#parallelize將Driver中的數據結構生成RDD,第二個參數指定分區數
rdd?=?sc.parallelize(range(1,11),2)
rdd.collect()
[1,?2,?3,?4,?5,?6,?7,?8,?9,?10]
二,常用Action操作
Action操作將觸發基于RDD依賴關系的計算。
collect
rdd?=?sc.parallelize(range(10),5)
#collect操作將數據匯集到Driver,數據過大時有超內存風險
all_data?=?rdd.collect()
all_data
[0,?1,?2,?3,?4,?5,?6,?7,?8,?9]
take
#take操作將前若干個數據匯集到Driver,相比collect安全
rdd?=?sc.parallelize(range(10),5)
part_data?=?rdd.take(4)
part_data
[0,?1,?2,?3]
takeSample
#takeSample可以隨機取若干個到Driver,第一個參數設置是否放回抽樣
rdd?=?sc.parallelize(range(10),5)
sample_data?=?rdd.takeSample(False,10,0)
sample_data
[7,?8,?1,?5,?3,?4,?2,?0,?9,?6]
first
#first取第一個數據
rdd?=?sc.parallelize(range(10),5)
first_data?=?rdd.first()
print(first_data)
0
count
#count查看RDD元素數量
rdd?=?sc.parallelize(range(10),5)
data_count?=?rdd.count()
print(data_count)
10
reduce
#reduce利用二元函數對數據進行規約
rdd?=?sc.parallelize(range(10),5)
rdd.reduce(lambda?x,y:x+y)
45
foreach
#foreach對每一個元素執行某種操作,不生成新的RDD
#累加器用法詳見共享變量
rdd?=?sc.parallelize(range(10),5)
accum?=?sc.accumulator(0)
rdd.foreach(lambda?x:accum.add(x))
print(accum.value)
45
countByKey
#countByKey對Pair?RDD按key統計數量
pairRdd?=?sc.parallelize([(1,1),(1,4),(3,9),(2,16)])
pairRdd.countByKey()
defaultdict(int,?{1:?2,?3:?1,?2:?1})
saveAsTextFile
#saveAsTextFile保存rdd成text文件到本地
text_file?=?"./data/rdd.txt"
rdd?=?sc.parallelize(range(5))
rdd.saveAsTextFile(text_file)
#重新讀入會被解析文本
rdd_loaded?=?sc.textFile(file)
rdd_loaded.collect()
['2',?'3',?'4',?'1',?'0']
三,常用Transformation操作
Transformation轉換操作具有懶惰執行的特性,它只指定新的RDD和其父RDD的依賴關系,只有當Action操作觸發到該依賴的時候,它才被計算。
map
#map操作對每個元素進行一個映射轉換
rdd?=?sc.parallelize(range(10),3)
rdd.collect()
[0,?1,?2,?3,?4,?5,?6,?7,?8,?9]
rdd.map(lambda?x:x**2).collect()
[0,?1,?4,?9,?16,?25,?36,?49,?64,?81]
filter
#filter應用過濾條件過濾掉一些數據
rdd?=?sc.parallelize(range(10),3)
rdd.filter(lambda?x:x>5).collect()
[6,?7,?8,?9]
flatMap
#flatMap操作執行將每個元素生成一個Array后壓平
rdd?=?sc.parallelize(["hello?world","hello?China"])
rdd.map(lambda?x:x.split("?")).collect()
[['hello',?'world'],?['hello',?'China']]
rdd.flatMap(lambda?x:x.split("?")).collect()
['hello',?'world',?'hello',?'China']
sample
#sample對原rdd在每個分區按照比例進行抽樣,第一個參數設置是否可以重復抽樣
rdd?=?sc.parallelize(range(10),1)
rdd.sample(False,0.5,0).collect()
[1,?4,?9]
distinct
#distinct去重
rdd?=?sc.parallelize([1,1,2,2,3,3,4,5])
rdd.distinct().collect()
[4,?1,?5,?2,?3]
subtract
#subtract找到屬于前一個rdd而不屬于后一個rdd的元素
a?=?sc.parallelize(range(10))
b?=?sc.parallelize(range(5,15))
a.subtract(b).collect()
[0,?1,?2,?3,?4]
union
#union合并數據
a?=?sc.parallelize(range(5))
b?=?sc.parallelize(range(3,8))
a.union(b).collect()
[0,?1,?2,?3,?4,?3,?4,?5,?6,?7]
intersection
#intersection求交集
a?=?sc.parallelize(range(1,6))
b?=?sc.parallelize(range(3,9))
a.intersection(b).collect()
[3,?4,?5]
cartesian
#cartesian笛卡爾積
boys?=?sc.parallelize(["LiLei","Tom"])
girls?=?sc.parallelize(["HanMeiMei","Lily"])
boys.cartesian(girls).collect()
[('LiLei',?'HanMeiMei'),
('LiLei',?'Lily'),
('Tom',?'HanMeiMei'),
('Tom',?'Lily')]
sortBy
#按照某種方式進行排序
#指定按照第3個元素大小進行排序
rdd?=?sc.parallelize([(1,2,3),(3,2,2),(4,1,1)])
rdd.sortBy(lambda?x:x[2]).collect()
[(4,?1,?1),?(3,?2,?2),?(1,?2,?3)]
zip
#按照拉鏈方式連接兩個RDD,效果類似python的zip函數
#需要兩個RDD具有相同的分區,每個分區元素數量相同
rdd_name?=?sc.parallelize(["LiLei","Hanmeimei","Lily"])
rdd_age?=?sc.parallelize([19,18,20])
rdd_zip?=?rdd_name.zip(rdd_age)
print(rdd_zip.collect())
[('LiLei',?19),?('Hanmeimei',?18),?('Lily',?20)]
zipWithIndex
#將RDD和一個從0開始的遞增序列按照拉鏈方式連接。
rdd_name?=??sc.parallelize(["LiLei","Hanmeimei","Lily","Lucy","Ann","Dachui","RuHua"])
rdd_index?=?rdd_name.zipWithIndex()
print(rdd_index.collect())
[('LiLei',?0),?('Hanmeimei',?1),?('Lily',?2),?('Lucy',?3),?('Ann',?4),?('Dachui',?5),?('RuHua',?6)]
四,常用PairRDD的轉換操作
PairRDD指的是數據為長度為2的tuple類似(k,v)結構的數據類型的RDD,其每個數據的第一個元素被當做key,第二個元素被當做value.
reduceByKey
#reduceByKey對相同的key對應的values應用二元歸并操作
rdd?=?sc.parallelize([("hello",1),("world",2),
("hello",3),("world",5)])
rdd.reduceByKey(lambda?x,y:x+y).collect()
[('hello',?4),?('world',?7)]
groupByKey
#groupByKey將相同的key對應的values收集成一個Iterator
rdd?=?sc.parallelize([("hello",1),("world",2),("hello",3),("world",5)])
rdd.groupByKey().collect()
[('hello',?),
('world',?)]
sortByKey
#sortByKey按照key排序,可以指定是否降序
rdd?=?sc.parallelize([("hello",1),("world",2),
("China",3),("Beijing",5)])
rdd.sortByKey(False).collect()
[('world',?2),?('hello',?1),?('China',?3),?('Beijing',?5)]
join
#join相當于根據key進行內連接
age?=?sc.parallelize([("LiLei",18),
("HanMeiMei",16),("Jim",20)])
gender?=?sc.parallelize([("LiLei","male"),
("HanMeiMei","female"),("Lucy","female")])
age.join(gender).collect()
[('LiLei',?(18,?'male')),?('HanMeiMei',?(16,?'female'))]
leftOuterJoin和rightOuterJoin
#leftOuterJoin相當于關系表的左連接
age?=?sc.parallelize([("LiLei",18),
("HanMeiMei",16)])
gender?=?sc.parallelize([("LiLei","male"),
("HanMeiMei","female"),("Lucy","female")])
age.leftOuterJoin(gender).collect()
[('LiLei',?(18,?'male')),?('HanMeiMei',?(16,?'female'))]
#rightOuterJoin相當于關系表的右連接
age?=?sc.parallelize([("LiLei",18),
("HanMeiMei",16),("Jim",20)])
gender?=?sc.parallelize([("LiLei","male"),
("HanMeiMei","female")])
age.rightOuterJoin(gender).collect()
[('LiLei',?(18,?'male')),?('HanMeiMei',?(16,?'female'))]
cogroup
#cogroup相當于對兩個輸入分別goupByKey然后再對結果進行groupByKey
x?=?sc.parallelize([("a",1),("b",2),("a",3)])
y?=?sc.parallelize([("a",2),("b",3),("b",5)])
result?=?x.cogroup(y).collect()
print(result)
print(list(result[0][1][0]))
[('a',?(,?)),?('b',?(,?))]
[1,?3]
subtractByKey
#subtractByKey去除x中那些key也在y中的元素
x?=?sc.parallelize([("a",1),("b",2),("c",3)])
y?=?sc.parallelize([("a",2),("b",(1,2))])
x.subtractByKey(y).collect()
[('c',?3)]
foldByKey
#foldByKey的操作和reduceByKey類似,但是要提供一個初始值
x?=?sc.parallelize([("a",1),("b",2),("a",3),("b",5)],1)
x.foldByKey(1,lambda?x,y:x*y).collect()
[('a',?3),?('b',?10)]
五,緩存操作
如果一個rdd被多個任務用作中間量,那么對其進行cache緩存到內存中對加快計算會非常有幫助。
聲明對一個rdd進行cache后,該rdd不會被立即緩存,而是等到它第一次被計算出來時才進行緩存。
可以使用persist明確指定存儲級別,常用的存儲級別是MEMORY_ONLY和EMORY_AND_DISK。
如果一個RDD后面不再用到,可以用unpersist釋放緩存,unpersist是立即執行的。
緩存數據不會切斷血緣依賴關系,這是因為緩存數據某些分區所在的節點有可能會有故障,例如內存溢出或者節點損壞。
這時候可以根據血緣關系重新計算這個分區的數據。
#cache緩存到內存中,使用存儲級別 MEMORY_ONLY。
#MEMORY_ONLY意味著如果內存存儲不下,放棄存儲其余部分,需要時重新計算。
a?=?sc.parallelize(range(10000),5)
a.cache()
sum_a?=?a.reduce(lambda?x,y:x+y)
cnt_a?=?a.count()
mean_a?=?sum_a/cnt_a
print(mean_a)
#persist緩存到內存或磁盤中,默認使用存儲級別MEMORY_AND_DISK
#MEMORY_AND_DISK意味著如果內存存儲不下,其余部分存儲到磁盤中。
#persist可以指定其它存儲級別,cache相當于persist(MEMORY_ONLY)
from??pyspark.storagelevel?import?StorageLevel
a?=?sc.parallelize(range(10000),5)
a.persist(StorageLevel.MEMORY_AND_DISK)
sum_a?=?a.reduce(lambda?x,y:x+y)
cnt_a?=?a.count()
mean_a?=?sum_a/cnt_a
a.unpersist()?#立即釋放緩存
print(mean_a)
六,共享變量
當spark集群在許多節點上運行一個函數時,默認情況下會把這個函數涉及到的對象在每個節點生成一個副本。
但是,有時候需要在不同節點或者節點和Driver之間共享變量。
Spark提供兩種類型的共享變量,廣播變量和累加器。
廣播變量是不可變變量,實現在不同節點不同任務之間共享數據。
廣播變量在每個機器上緩存一個只讀的變量,而不是為每個task生成一個副本,可以減少數據的傳輸。
累加器主要是不同節點和Driver之間共享變量,只能實現計數或者累加功能。
累加器的值只有在Driver上是可讀的,在節點上不可見。
#廣播變量?broadcast?不可變,在所有節點可讀
broads?=?sc.broadcast(100)
rdd?=?sc.parallelize(range(10))
print(rdd.map(lambda?x:x+broads.value).collect())
print(broads.value)
[100,?101,?102,?103,?104,?105,?106,?107,?108,?109]
100
#累加器?只能在Driver上可讀,在其它節點只能進行累加
total?=?sc.accumulator(0)
rdd?=?sc.parallelize(range(10),3)
rdd.foreach(lambda?x:total.add(x))
total.value
45
#?計算數據的平均值
rdd?=?sc.parallelize([1.1,2.1,3.1,4.1])
total?=?sc.accumulator(0.1)
count?=?sc.accumulator(0)
def?func(x):
total.add(x)
count.add(1)
rdd.foreach(func)
total.value/count.value
2.625
七,分區操作
分區操作包括改變分區操作,以及針對分區執行的一些轉換操作。
glom:將一個分區內的數據轉換為一個列表作為一行。
coalesce:shuffle可選,默認為False情況下窄依賴,不能增加分區。repartition和partitionBy調用它實現。
repartition:按隨機數進行shuffle,相同key不一定在同一個分區
partitionBy:按key進行shuffle,相同key放入同一個分區
HashPartitioner:默認分區器,根據key的hash值進行分區,相同的key進入同一分區,效率較高,key不可為Array.
RangePartitioner:只在排序相關函數中使用,除相同的key進入同一分區,相鄰的key也會進入同一分區,key必須可排序。
TaskContext: ?獲取當前分區id方法 TaskContext.get.partitionId
mapPartitions:每次處理分區內的一批數據,適合需要分批處理數據的情況,比如將數據插入某個表,每批數據只需要開啟一次數據庫連接,大大減少了連接開支
mapPartitionsWithIndex:類似mapPartitions,提供了分區索引,輸入參數為(i,Iterator)
foreachPartition:類似foreach,但每次提供一個Partition的一批數據
glom
#glom將一個分區內的數據轉換為一個列表作為一行。
a?=?sc.parallelize(range(10),2)
b?=?a.glom()
b.collect()
[[0,?1,?2,?3,?4],?[5,?6,?7,?8,?9]]
coalesce
#coalesce?默認shuffle為False,不能增加分區,只能減少分區
#如果要增加分區,要設置shuffle?=?true
#parallelize等許多操作可以指定分區數
a?=?sc.parallelize(range(10),3)
print(a.getNumPartitions())
print(a.glom().collect())
3
[[0,?1,?2],?[3,?4,?5],?[6,?7,?8,?9]]
b?=?a.coalesce(2)
print(b.glom().collect())
[[0,?1,?2],?[3,?4,?5,?6,?7,?8,?9]]
repartition
#repartition按隨機數進行shuffle,相同key不一定在一個分區,可以增加分區
#repartition實際上調用coalesce實現,設置了shuffle?=?True
a?=?sc.parallelize(range(10),3)
c?=?a.repartition(4)
print(c.glom().collect())
[[6,?7,?8,?9],?[3,?4,?5],?[],?[0,?1,?2]]
#repartition按隨機數進行shuffle,相同key不一定在一個分區
a?=?sc.parallelize([("a",1),("a",1),("a",2),("c",3)])
c?=?a.repartition(2)
print(c.glom().collect())
[[('a',?1),?('a',?2),?('c',?3)],?[('a',?1)]]
partitionBy
#partitionBy按key進行shuffle,相同key一定在一個分區
a?=?sc.parallelize([("a",1),("a",1),("a",2),("c",3)])
c?=?a.partitionBy(2)
print(c.glom().collect())
mapPartitions
#mapPartitions可以對每個分區分別執行操作
#每次處理分區內的一批數據,適合需要按批處理數據的情況
#例如將數據寫入數據庫時,可以極大的減少連接次數。
#mapPartitions的輸入分區內數據組成的Iterator,其輸出也需要是一個Iterator
#以下例子查看每個分區內的數據,相當于用mapPartitions實現了glom的功能。
a?=?sc.parallelize(range(10),2)
a.mapPartitions(lambda?it:iter([list(it)])).collect()
[[0,?1,?2,?3,?4],?[5,?6,?7,?8,?9]]
mapPartitionsWithIndex
#mapPartitionsWithIndex可以獲取兩個參數
#即分區id和每個分區內的數據組成的Iterator
a?=?sc.parallelize(range(11),2)
def?func(pid,it):
s?=?sum(it)
return(iter([str(pid)?+?"|"?+?str(s)]))
[str(pid)?+?"|"?+?str]
b?=?a.mapPartitionsWithIndex(func)
b.collect()
#利用TaskContext可以獲取當前每個元素的分區
from?pyspark.taskcontext?import?TaskContext
a?=?sc.parallelize(range(5),3)
c?=?a.map(lambda?x:(TaskContext.get().partitionId(),x))
c.collect()
[(0,?0),?(1,?1),?(1,?2),?(2,?3),?(2,?4)]
foreachPartitions
#foreachPartition對每個分區分別執行操作
#范例:求每個分區內最大值的和
total?=?sc.accumulator(0.0)
a?=?sc.parallelize(range(1,101),3)
def?func(it):
total.add(max(it))
a.foreachPartition(func)
total.value
199.0
aggregate
#aggregate是一個Action操作
#aggregate比較復雜,先對每個分區執行一個函數,再對每個分區結果執行一個合并函數。
#例子:求元素之和以及元素個數
#三個參數,第一個參數為初始值,第二個為分區執行函數,第三個為結果合并執行函數。
rdd?=?sc.parallelize(range(1,21),3)
def?inner_func(t,x):
return((t[0]+x,t[1]+1))
def?outer_func(p,q):
return((p[0]+q[0],p[1]+q[1]))
rdd.aggregate((0,0),inner_func,outer_func)
(210,?20)
aggregateByKey
#aggregateByKey的操作和aggregate類似,但是會對每個key分別進行操作
#第一個參數為初始值,第二個參數為分區內歸并函數,第三個參數為分區間歸并函數
a?=?sc.parallelize([("a",1),("b",1),("c",2),
("a",2),("b",3)],3)
b?=?a.aggregateByKey(0,lambda?x,y:max(x,y),
lambda?x,y:max(x,y))
b.collect()
[('b',?3),?('a',?2),?('c',?2)]
總結
以上是生活随笔為你收集整理的spark算子大全glom_2小时入门Spark之RDD编程的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 军官领花和士兵领花区别是什么?
- 下一篇: 乌钢镗刀杆可以焊接吗?