【Development Tools】Using the Spark Shell
Preface
In the previous chapter we covered installing Spark in Standalone mode. This chapter introduces the basics of working with the Spark Shell:
- Basic startup and usage
- Basic operators
Basic Startup and Usage
- Local startup
Go to the ./bin directory and run spark-shell. If no master is specified, it does not connect to a cluster and simply runs in local mode in a single process. The web UI can then be accessed at http://localhost:4040.
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_102)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
- Local startup, connecting to a cluster / specifying configuration: spark-shell --master spark://localhost:7077 --total-executor-cores 1 --executor-memory 1g
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_102)
Type in expressions to have them evaluated.
Type :help for more information.
scala> sc
res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@66d2885c
scala>
After a successful start, the spark-shell application can be seen on the master web UI at http://localhost:8080.
We can then use Scala inside spark-shell to carry out our work.
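As a quick sanity check, for instance (a minimal sketch; the numbers are arbitrary and not from the original article):
scala> val nums = sc.parallelize(1 to 10)   // distribute a local collection as an RDD
scala> nums.map(_ * 2).reduce(_ + _)        // transform and aggregate on the executors; returns 110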
Spark submit
For production deployment and releases, jobs are usually submitted with the spark-submit script (also under ./bin). The typical workflow is to package the program with Maven and then submit the resulting JAR via spark-submit.
(Note: building the full JAR with Maven is not covered here; see the separate article "Maven 打包實(shí)戰(zhàn)" for details.)
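As a minimal sketch, an application along the following lines (the object name, package, and paths are illustrative assumptions, not from the original article) would be packaged into a JAR and submitted with something like ./bin/spark-submit --class example.WordCount --master spark://localhost:7077 wordcount.jar:

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    // The master URL is normally supplied by spark-submit rather than hard-coded here.
    val conf = new SparkConf().setAppName("WordCount")
    val sc = new SparkContext(conf)
    sc.textFile("hdfs://localhost:9000/wordcount/input")
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .saveAsTextFile("hdfs://localhost:9000/wordcount/submit-out")
    sc.stop()
  }
}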
Q & A
Problem: spark-shell cannot connect to the standalone master, and the SparkContext fails to initialize:
localhost:bin Sean$ spark-shell --master spark://192.168.31.80:7077
Picked up JAVA_TOOL_OPTIONS: -Dfile.encoding=UTF-8
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
19/03/29 18:11:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19/03/29 18:11:38 WARN Utils: Your hostname, localhost resolves to a loopback address: 127.0.0.1; using 192.168.31.80 instead (on interface en0)
19/03/29 18:11:38 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
19/03/29 18:11:39 WARN StandaloneAppClient$ClientEndpoint: Failed to connect to master 192.168.31.80:7077
org.apache.spark.SparkException: Exception thrown in awaitResult:
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:205)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
    at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:100)
    at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:108)
    at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint$$anonfun$tryRegisterAllMasters$1$$anon$1.run(StandaloneAppClient.scala:106)
    ...
Caused by: java.io.IOException: Failed to connect to /192.168.31.80:7077
    at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:232)
    ...
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: /192.168.31.80:7077
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    ...
(the same "Failed to connect to master" stack trace is logged again at 18:11:59 and 18:12:19 as the client retries)
19/03/29 18:12:39 ERROR StandaloneSchedulerBackend: Application has been killed. Reason: All masters are unresponsive! Giving up.
19/03/29 18:12:39 WARN StandaloneSchedulerBackend: Application ID is not initialized yet.
19/03/29 18:12:39 WARN StandaloneAppClient$ClientEndpoint: Drop UnregisterApplication(null) because has not yet connected to master
19/03/29 18:12:40 ERROR SparkContext: Error initializing SparkContext.
java.lang.IllegalArgumentException: requirement failed: Can only call getServletHandlers on a running MetricsSystem
    at scala.Predef$.require(Predef.scala:224)
    at org.apache.spark.metrics.MetricsSystem.getServletHandlers(MetricsSystem.scala:91)
    at org.apache.spark.SparkContext.<init>(SparkContext.scala:524)
    at org.apache.spark.SparkContext$.getOrCreate(SparkContext.scala:2516)
    ... 47 elided
<console>:14: error: not found: value spark
       import spark.implicits._
              ^
<console>:14: error: not found: value spark
       import spark.sql
              ^
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.2.1
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_102)
Type in expressions to have them evaluated.
Type :help for more information.
Fix: the main causes of this problem are the following:
- Check the firewall settings and make sure the Spark master's address and port are reachable;
- Check that the local Scala version matches the Scala version of the remote Spark cluster (see conf/spark-env.sh, /etc/profile, or the version printed when spark-shell starts);
- An incorrect /etc/hosts configuration on the local machine;
- When starting a pseudo-cluster locally, spark-env.sh and the slaves file need to be configured accordingly (see the sketch after this list).
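A minimal sketch of such a configuration for a single-node pseudo-cluster (the host name and resource values are illustrative assumptions, not the original article's settings):

# conf/spark-env.sh
export SPARK_MASTER_HOST=localhost    # bind the standalone master to this host
export SPARK_MASTER_PORT=7077         # default master port
export SPARK_WORKER_CORES=1           # illustrative worker resources
export SPARK_WORKER_MEMORY=1g

# conf/slaves
localhost                             # run a single worker on this machine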
[1]. 關(guān)于Spark報(bào)錯(cuò)不能連接到Server的解決辦法(Failed to connect to master master_hostname:7077)
[2]. Unable to connect to Spark master
[3]. Spark報(bào)錯(cuò)——AnnotatedConnectException拒絕連接
[4]. 單機(jī)spark綁定端口
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_102)
Type in expressions to have them evaluated.
Type :help for more information.
scala>
localhost:bin Sean$
- Insufficient local disk, memory, or CPU cores can also cause the problem above.
A spark-shell session may start but its jobs can still fail when the cluster does not have enough resources; the cores and memory it requests can be reduced with options such as --total-executor-cores 1 and --executor-memory 1g.
Basic Operators
Because of the limits of the local machine, we run the following operator examples directly in spark-shell.
Prerequisites
- Spark
- Hadoop
Basic Operations
- Start bin/spark-shell
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_102)
Type in expressions to have them evaluated.
Type :help for more information.
- Put the input file onto Hadoop (HDFS), e.g. as sketched below
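A minimal sketch of uploading a local text file to the input path used below (the local file name words.txt is an illustrative assumption):
hdfs dfs -mkdir -p hdfs://localhost:9000/wordcount/input
hdfs dfs -put words.txt hdfs://localhost:9000/wordcount/input
hdfs dfs -ls hdfs://localhost:9000/wordcount/input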
- Read the data from Hadoop
Compute the WordCount:
scala> sc.textFile("hdfs://localhost:9000/wordcount/input").flatMap(_.split("")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false).collect
res0: Array[(String, Int)] = Array((t,8), (l,3), (a,3), (i,3), (y,3), (p,2), (e,2), (c,2), (0,1), (b,1), (h,1), (2,1), (" ",1), (k,1), (o,1), (9,1), (1,1))
(Note: splitting on the empty string breaks each line into individual characters, which is why the result above counts characters; splitting on " " as in the later examples counts whole words.)
Save to HDFS as multiple output files:
scala> sc.textFile("hdfs://localhost:9000/wordcount/input").flatMap(_.split("")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false).saveAsTextFile("hdfs://localhost:9000/wordcount/20190525/out")
Save to HDFS as a single output file (reduceByKey(_+_, 1)):
scala> sc.textFile("hdfs://localhost:9000/wordcount/input").flatMap(_.split("")).map((_,1)).reduceByKey(_+_,1).sortBy(_._2,false).saveAsTextFile("hdfs://localhost:9000/wordcount/20190525-2/out")
- Output (single file)
- Output (multiple files)
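The saved results can be inspected with HDFS commands, for example (paths as used above; the part-file names depend on the number of partitions):
hdfs dfs -ls hdfs://localhost:9000/wordcount/20190525-2/out
hdfs dfs -cat hdfs://localhost:9000/wordcount/20190525-2/out/part-00000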
WordCount Details
- textFile()
Reads data from the given location and returns it as an RDD.
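For example (a minimal sketch using the HDFS path from above):
scala> val lines = sc.textFile("hdfs://localhost:9000/wordcount/input")   // RDD[String], one element per line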
- map()
Applies a function to every element.
- split()
Splits a string into pieces (here, a line into tokens).
- collect
Gathers the results back to the driver node.
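A small combined example (a sketch; lines is the RDD read above):
scala> lines.map(_.split(" ").length).collect   // number of tokens on each line, collected to the driver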
- flatMap() & map().flatten
map(...).flatten is the Scala-collections way of writing this; Spark RDDs do not appear to offer it, so flatMap is used instead.
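A sketch of the difference (the first line uses a plain Scala collection, the second an RDD built from illustrative data):
scala> List("a b", "c d").map(_.split(" ")).flatten                      // List(a, b, c, d)
scala> sc.parallelize(Seq("a b", "c d")).flatMap(_.split(" ")).collect   // Array(a, b, c, d)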
- map((_,1))
Pairs every element with an initial count of 1.
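For example (illustrative data):
scala> sc.parallelize(Seq("cat", "cat", "dog")).map((_, 1)).collect   // Array((cat,1), (cat,1), (dog,1))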
- reduceByKey()
Aggregates the values for each key. reduceByKey is an RDD-specific operator; plain Scala collections do not have it.
What does the 1 in reduceByKey(_+_, 1) mean? It specifies the number of partitions (in the output examples above, it determines whether the result is written to one file or to three).
scala> sc.textFile("hdfs://localhost:9000/wordcount/input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect
res21: Array[(String, Int)] = Array((hello,1), (pitty,2), (able,1), (2019,1), (cat,2), (kitty,1))
scala> sc.textFile("hdfs://localhost:9000/wordcount/input").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_,1).collect
res22: Array[(String, Int)] = Array((hello,1), (pitty,2), (able,1), (2019,1), (kitty,1), (cat,2))
- What is the difference between reduceByKey and groupByKey?
reduceByKey first aggregates within each partition and then merges the partial results, whereas groupByKey shuffles all the values to one place before aggregating, as sketched below. See 深入理解groupByKey、reduceByKey區(qū)別——本質(zhì)就是一個(gè)local machine的reduce操作 and reduceByKey應(yīng)用舉例 in the references.
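A sketch of the two on a toy RDD (both yield the same counts; reduceByKey simply shuffles less data):
scala> val pairs = sc.parallelize(Seq(("cat", 1), ("cat", 1), ("dog", 1)))
scala> pairs.reduceByKey(_ + _).collect                 // combines within each partition first
scala> pairs.groupByKey().mapValues(_.sum).collect      // ships all values, then sums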
- sortBy(_._2, false)
In Spark, sortBy(f, ascending) takes a second parameter that controls ascending versus descending order; false here sorts the pairs by their count in descending order.
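For example (continuing the sketch above):
scala> pairs.reduceByKey(_ + _).sortBy(_._2, false).collect   // Array((cat,2), (dog,1))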
- saveAsTextFile()
Writes the processed result out to storage (here, back to HDFS).
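For example (an illustrative output path; the target directory must not already exist):
scala> pairs.reduceByKey(_ + _).saveAsTextFile("hdfs://localhost:9000/wordcount/demo-out")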
Reference
[1]. 深入理解groupByKey、reduceByKey區(qū)別——本質(zhì)就是一個(gè)local machine的reduce操作
[2]. reduceByKey應(yīng)用舉例
Summary
This chapter walked through starting spark-shell locally and against a Standalone cluster, troubleshooting failures to connect to the master, and running a basic WordCount on HDFS with the core RDD operators.