Spark Streaming 接Socket WrodCount(一)

前言

通过一个WordCount来简单了解Spark Streaming执行过程。

Spark Streaming示例之WordCount

以下方式可以直接在本地运行,在运行前先在Master201机器上通过nc -lk 9999的方式开启一个socket服务,用于发送数据

1
2
3
4
5
6
7
8
9
10
11
12
[root@master201 ~]# nc -lk 9999
12
11
22
11
11
22
33
44
1`1
11
111

然后直接通过以下代码在本地运行即可,咱们在正常开发的时候也可以使用这种方式调试好自己的代码,然后再提交带集群当中去执行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package lishijia.spark.demo.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
* Streaming 读取 socket 数据
*/
object SocketToStreaming {

def main(args: Array[String]): Unit = {

val sc = new SparkConf().setAppName("SocketToStreaming").setMaster("local[2]")

// 批次执行间隔
val ssc = new StreamingContext(sc, Seconds(2))

// 接收来自socket的输入源数据
val lines = ssc.socketTextStream("master201", 9999)

val words = lines.flatMap(_.split(" "))
val wordCounts = words.map( x => (x, 1)).reduceByKey(_+_)

// 打印每个批次单词出现的次数(此处打印单词出现的次数不包含上一个批次单词出现的此处,即不会累加上一个批次同一个单词出现的次数)
// 如果需要累加上一个批次同一个单词出现的次数需要另做处理
wordCounts.print()

ssc.start()

ssc.awaitTermination()

}

}

Spark Streaming 工作原理

  • 在Spark Streaming中会有一个组件Receiver作为一个长期运行的Task执行在Executor上

  • 每个Receiver都会负责一个Input DStream,这个Input可以来源于文件流、套接字流、或者Kafka等

  • Spark Streaming通过Input DStream与外部数据进行连接,读取相关数据。

    DStream可以理解为是一个数据集(数组或者List),它里面存储的每一个元素是RDD结构;即DStream是由一串RDD所组成的数据集

Spark Streaming程序基本执行步骤

编写Spark Streaming程序的基本步骤如下:

  • 通过创建输入DStream来定义输入源
  • 通过DStream应用转换操作和输出操作来定义流计算
  • 用streamContext.start()来开始接收数据和处理流程
  • 通过streamContext.awaitTermination()方法来等待处理结束(手动结束或因错误结束)
  • 通过streamContext.stop()来手动结算计算进程或者kill方式

Spark Streaming提交Yarn运行

提交到Yarn任务调度中心执行最好把setMaster删除,之前写的是local[0]执行方式

1
val sc = new SparkConf().setAppName("SocketToStreaming")
1
2
# maven打包,通过assembly打全量包
mvn assembly:assembly

创建提交任务执行命令脚本:run_wordcount.sh

1
2
3
4
5
cd /home/lishijia/Soft/spark-2.0.2-bin-hadoop2.7              
./bin/spark-submit \
--class lishijia.spark.demo.streaming.SocketToStreaming \
--master yarn-cluster \
/home/lishijia/Soft/spark-2.0.2-bin-hadoop2.7/demo/scala-spark-demo-mvn-1.0-SNAPSHOT-jar-with-dependencies.jar

通过Hadoop的Yarn管理控制台可以看到任务执行情况

执行结果如下,也可以看执行的间隔时间为2s

通过如下命令可以关闭任务执行

1
2
3
4
[root@master201 spark-streaming]# yarn application -kill application_1543841766630_0001
18/12/03 21:22:40 INFO client.RMProxy: Connecting to ResourceManager at master201/192.168.152.201:8032
Killing application application_1543841766630_0001
18/12/03 21:22:40 INFO impl.YarnClientImpl: Killed application application_1543841766630_0001

总结

通过以上方式可以初步了解到一个Spark Streaming程序的执行基本过程。

代码:https://github.com/lishijia/scala-spark-demo-mvn/tree/master/src/main/scala/lishijia/spark/demo/streaming

Hadoop2.7.2集群安装

Scala & Spark2.0.2集群安装

分享到 评论