Spark使用Jieba分词Hive数据生成新表(五)

前言

Spark集成Jieba分词处理Hive数据,根据处理后的结果数据生成新的Hive表数据集。Demo项目可以直接下载运行打包提交到Yarn集群运行验证测试

环境准备

Hive数据准备

数据准备,把需要分词的文件提交至hdfs

1
2
[root@hadoop100 apache-hive-1.2.2]# hadoop fs -mkdir /jiarong/input/seg_news_content
[root@hadoop100 apache-hive-1.2.2]# hadoop fs -put allfiles.txt /jiarong/input/seg_news_content/

hive创建外部表

1
2
3
create external table seg_news_content(sentence string)
row format delimited fields terminated by '\n'
stored as textfile location '/jiarong/input/seg_news_content'

由于我准备的源数据是已经分词好的,所以做了下二次处理,把空格替换。把lable提取成一个字段

1
2
create table news_seg as 
select split(regexp_replace(sentence,' ',''),'##@@##')[0] as sentence,split(regexp_replace(sentence,' ',''),'##@@##')[1] as label from seg_news_content;

Spark结巴分词Hive数据Demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package lishijia.spark.demo.hivejieba

import com.huaban.analysis.jieba.{JiebaSegmenter, SegToken}
import com.huaban.analysis.jieba.JiebaSegmenter.SegMode
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

object HiveJieba {
def main(args: Array[String]): Unit = {

val conf = new SparkConf() .registerKryoClasses(Array(classOf[JiebaSegmenter]))
.set("spark.rpc.message.maxSize", "800")

val spark = SparkSession
.builder()
.appName("Jieba udf")
.enableHiveSupport()
.config(conf)
.getOrCreate()

val df =spark.sql("select sentence,label from lishijia.news_seg limit 300")
// 调用分词方法
val df_seg = jieba_seg(df,"sentence")
df_seg.show()
/**
* DataFrame对象生成表
*/
df_seg.write.mode("overwrite").saveAsTable("lishijia.news_jieba")
}

/**
* 分词方法
* @param df(DataFrame,需要处理数据源)
* @param columnName(DataFrame数据源需要处理的列)
* @return
*/
def jieba_seg(df:DataFrame, columnName:String) : DataFrame = {
val segmenter = new JiebaSegmenter()
// 通过spark上下文把jieba分词对象发送到广播中(其他节点通过广播使用)
val seg = df.sparkSession.sparkContext.broadcast(segmenter)
// 定义udf函数
val jieba_udf = udf{ (sentence:String) =>
// 通过spark广播获取jieba分词对象(分布式)
val segV = seg.value
// jieba分词对象处理一个句子
segV.process(sentence.toString, SegMode.INDEX)
.toArray().map(_.asInstanceOf[SegToken].word)
.filter(_.length>1).mkString("/")
}
/**
* 调用udf分词函数
*/
df.withColumn("seg", jieba_udf(col(columnName)))
}

}

mvn assembly打包

通过assembly方式打包,把所依赖的jar包都打成一个jar

1
mvn assembly:assembly

执行脚本

1
2
3
4
5
6
cd /home/soft/spark-2.2.0-bin-hadoop2.6                 
./bin/spark-submit \
--class lishijia.spark.demo.hivejieba.HiveJieba \
--master yarn-cluster \
--files $HIVE_HOME/conf/hive-site.xml \
/home/dev/spark/hivespark/spark-scala-demo-maven-1.0-SNAPSHOT.jar

当中遇到的问题

在使用maven scala工程之前,我使用的是sbt的工程,因为maven之前有使用过而且也比较熟悉,所以换了一种方式玩下。哪知道打包之后,当中的jieba包没打进去(一直还不知道什么原因),后来在公司又试了一把,jieba所依赖的包打进去了,但是怎么提交都是ClassNotFound,打包出来的jar所有class都没有少,一直在排出问题,环境也没问题,因为Spark自带的SparkPI是可以提交到Yarn集群运行的(最终还是没找到问题所在,但是可以肯定是打的包有问题)。最终还是切换回了熟悉的方式Maven工程

有时间了,再研究研究这个sbt

Hadoop2.7.2集群安装

Scala & Spark2.0.2集群安装

Spark Scala WrodCount 入门(一)

Spark Scala 统计多个文件中的最大值与最小值 map自定义函数实现(二)

Spark Scala 多字段二次排序(三)

SparkSql操作Hive表数据(四)

代码:https://github.com/lishijia/scala-spark-demo-mvn/tree/master/src/main/scala/lishijia/spark/demo/hivejieba

参考

https://spark.apache.org/docs/2.0.2/sql-programming-guide.html

https://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.sql.Dataset

https://github.com/huaban/jieba-analysis

分享到 评论