Spark Scala WordCount Tutorial (Part 1)

Preface

By implementing a simple WordCount in Scala on Spark, we get a first look at RDD transformations and actions in Spark.

Running WordCount in the spark-shell

Pay attention to the data structure of the RDD produced by each transformation below; every step lays the groundwork for the next one.
  • textFile: load the source HDFS file by calling textFile on the built-in sc (SparkContext) object
  • flatMap: apply flatMap to the file's lines (flatten, i.e. turn each array of words into individual records), so that every word becomes one record
  • map: turn each word into a (word, 1) pair
  • reduceByKey: for each key, reduceByKey feeds successive values into the (x, y) => x + y function and sums them, giving the count for that word
  • map: swap each pair to (count, word) to prepare for sorting and taking the top 10
  • sortByKey: pass false for a descending sort, then call take to get the top 10
scala> val lines = sc.textFile("hdfs://master201:9000/lishijia/input/the_man_of_property.txt")
lines: org.apache.spark.rdd.RDD[String] = hdfs://master201:9000/lishijia/input/the_man_of_property.txt MapPartitionsRDD[52] at textFile at <console>:24

scala> lines.flatMap(x=>x.split(" ")).map(x=>(x,1)).reduceByKey((x,y)=>x+y).map(x=>(x._2,x._1)).sortByKey(false).take(10)
res16: Array[(Int, String)] = Array((5144,the), (3407,of), (2782,to), (2573,and), (2543,a), (2139,he), (1912,his), (1702,was), (1694,in), (1526,had))
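
An equivalent way to write the last three steps is to sort on the count field directly with sortBy, which avoids swapping the tuple twice. This is only an alternative sketch against the same lines RDD, not what the session above ran:

// Alternative sketch (not from the original session): sort the (word, count) pairs by count directly
lines.flatMap(x => x.split(" "))
  .map(x => (x, 1))
  .reduceByKey((x, y) => x + y)
  .sortBy(x => x._2, ascending = false)   // descending by count, no tuple swap needed
  .take(10)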

The key steps, broken down

scala> val lines = sc.textFile("hdfs://master201:9000/lishijia/input/the_man_of_property.txt")
lines: org.apache.spark.rdd.RDD[String] = hdfs://master201:9000/lishijia/input/the_man_of_property.txt MapPartitionsRDD[52] at textFile at <console>:24

scala> lines.flatMap(x=>x.split(" "))
res17: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[60] at flatMap at <console>:27

scala> lines.flatMap(x=>x.split(" ")).map(x=>(x,1))
res18: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[62] at map at <console>:27

scala> lines.flatMap(x=>x.split(" ")).map(x=>(x,1)).reduceByKey((x,y)=>x+y)
res19: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[65] at reduceByKey at <console>:27

scala> lines.flatMap(x=>x.split(" ")).map(x=>(x,1)).reduceByKey((x,y)=>x+y).map(x=>(x._2,x._1))
res20: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[69] at map at <console>:27
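
To see what these transformations compute without a cluster at all, here is a minimal analogy on plain Scala collections. It is only an illustration: the toy input is made up, and ordinary collections have no reduceByKey, so groupBy plus a size count stands in for it:

// Local-collections analogy (illustration only; toy input, no Spark involved)
val sample = Seq("the man of property", "the man")
sample
  .flatMap(line => line.split(" "))            // one element per word
  .groupBy(word => word)                       // Map(word -> all occurrences)
  .mapValues(occurrences => occurrences.size)  // Map(word -> count), like map + reduceByKey
  .toSeq
  .sortBy(pair => -pair._2)                    // descending by count
  .take(10)
  .foreach(println)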

Packaging with sbt and running WordCount on a YARN cluster

Create a demo project in IDEA with sbt, package it, submit the job to a Hadoop YARN cluster, and check the result.

Compared with running in the spark-shell, the code here needs a couple of extra steps:

  • initialize the SparkContext object yourself
  • print the top 10 to standard output so the result is easy to find in the logs
package lishijia.spark.demo.wordcount

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Spark WordCount")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("hdfs://master201:9000/lishijia/input/the_man_of_property.txt")
    lines.flatMap(x => x.split(" "))   // one record per word
      .map(x => (x, 1))                // (word, 1)
      .reduceByKey((x, y) => x + y)    // (word, count)
      .map(x => (x._2, x._1))          // (count, word) for sorting
      .sortByKey(false)                // descending by count
      .take(10)                        // top 10; this action triggers the job
      .foreach(println)
  }

}
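
For quick local testing before submitting to the cluster, a hypothetical variant of the same program could take the input path from args and run with a local master. The object name, default path, and master setting are assumptions, not part of the original project:

package lishijia.spark.demo.wordcount

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical local-test variant; the default path and local[*] master are assumptions.
object WordCountLocal {

  def main(args: Array[String]): Unit = {
    val inputPath = if (args.nonEmpty) args(0) else "data/the_man_of_property.txt"
    val conf = new SparkConf().setAppName("Spark WordCount Local").setMaster("local[*]")
    val sc = new SparkContext(conf)
    sc.textFile(inputPath)
      .flatMap(x => x.split(" "))
      .map(x => (x, 1))
      .reduceByKey((x, y) => x + y)
      .map(x => (x._2, x._1))
      .sortByKey(false)
      .take(10)
      .foreach(println)
    sc.stop()
  }

}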
// The sbt build file declares dependencies, much like a Maven pom
name := "spark-scala-demo"
version := "0.1"
scalaVersion := "2.11.8"

// https://mvnrepository.com/artifact/org.apache.spark/spark-core
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0"
libraryDependencies += "org.apache.hbase" % "hbase-client" % "1.1.12"
libraryDependencies += "org.apache.hbase" % "hbase-common" % "1.1.12"
libraryDependencies += "org.apache.hbase" % "hbase-server" % "1.1.12"
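
A common adjustment when the jar runs on a cluster that already ships Spark is to mark spark-core as a provided dependency, so Spark classes are not bundled into the application jar. This is a suggestion, not part of the original build file:

// Optional (not in the original build): let the cluster supply Spark classes at runtime
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0" % "provided"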

Package the project into a jar with IDEA/sbt, upload it to the master server, and then run the following command:

[root@master201 bin]# ./spark-submit --class lishijia.spark.demo.wordcount.WordCount --master yarn-cluster ../demo/spark-scala-demo.jar 1
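
On Spark 2.x the yarn-cluster master string still works but is deprecated; the documented equivalent separates the master from the deploy mode. Same class, jar, and trailing argument as above:

./spark-submit --class lishijia.spark.demo.wordcount.WordCount --master yarn --deploy-mode cluster ../demo/spark-scala-demo.jar 1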

Some problems came up during the run, possibly because the clocks were not synchronized, or possibly because of the firewall; after fixing those two things (the program itself was not changed), the job ran. The diagnostic in the log, "Max number of executor failures (3) reached", means the executors kept failing, which is consistent with a node-to-node networking or clock issue. The error output was:
18/10/21 20:47:22 INFO ui.SparkUI: Stopped Spark web UI at http://192.168.152.203:35460
18/10/21 20:47:23 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, slave203, partition 0, NODE_LOCAL, 5418 bytes)
18/10/21 20:47:23 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, slave203, partition 1, NODE_LOCAL, 5418 bytes)
18/10/21 20:47:23 ERROR scheduler.LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerTaskStart(0,0,org.apache.spark.scheduler.TaskInfo@5afb2e48)
18/10/21 20:47:23 INFO scheduler.DAGScheduler: ShuffleMapStage 0 (map at WordCount.scala:12) failed in 2.602 s
18/10/21 20:47:23 ERROR scheduler.LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerStageCompleted(org.apache.spark.scheduler.StageInfo@2968ae8d)
18/10/21 20:47:23 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Launching task 0 on executor id: 2 hostname: slave203.
18/10/21 20:47:23 ERROR scheduler.LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerJobEnd(0,1540126043173,JobFailed(org.apache.spark.SparkException: Job 0 cancelled because SparkContext was shut down))
18/10/21 20:47:23 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Launching task 1 on executor id: 4 hostname: slave203.
18/10/21 20:47:23 INFO yarn.YarnAllocator: Driver requested a total number of 0 executor(s).
18/10/21 20:47:23 INFO cluster.YarnClusterSchedulerBackend: Shutting down all executors
18/10/21 20:47:23 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Asking each executor to shut down
18/10/21 20:47:23 INFO cluster.SchedulerExtensionServices: Stopping SchedulerExtensionServices
(serviceOption=None,
services=List(),
started=false)
18/10/21 20:47:23 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
18/10/21 20:47:23 INFO memory.MemoryStore: MemoryStore cleared
18/10/21 20:47:23 INFO storage.BlockManager: BlockManager stopped
18/10/21 20:47:24 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
18/10/21 20:47:24 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
18/10/21 20:47:24 INFO spark.SparkContext: Successfully stopped SparkContext
18/10/21 20:47:24 INFO yarn.ApplicationMaster: Unregistering ApplicationMaster with FAILED (diag message: Max number of executor failures (3) reached)
18/10/21 20:47:25 INFO impl.AMRMClientImpl: Waiting for application to be successfully unregistered.
18/10/21 20:47:25 INFO yarn.ApplicationMaster: Deleting staging directory hdfs://master201:9000/user/root/.sparkStaging/application_1540122054916_0002
18/10/21 20:47:25 INFO util.ShutdownHookManager: Shutdown hook called
18/10/21 20:47:25 INFO util.ShutdownHookManager: Deleting directory /home/lishijia/Soft/hadoop-2.7.2/tmp/nm-local-dir/usercache/root/appcache/application_1540122054916_0002/spark-5434fc83-4c7e-4975-856e-73a4d797c25a

Summary

Through the spark-shell we walked through some basic Spark RDD operations, and then packaged our own Scala program, uploaded it, and submitted it to YARN to run.

Code: https://github.com/lishijia/spark-scala-demo/tree/master/src/main/scala/lishijia/spark/demo

CentOS virtual machine installation & NAT network configuration

Hadoop 2.7.2 cluster installation

Scala & Spark 2.0.2 cluster installation
