Preface
Spark integrates Jieba word segmentation to process Hive data and writes the segmented results out as a new Hive table. The demo project can be downloaded, packaged, and submitted to a YARN cluster for verification and testing.
Environment preparation
Hive data preparation
Upload the file that needs to be segmented to HDFS:
```bash
[root@hadoop100 apache-hive-1.2.2]# hadoop fs -mkdir /jiarong/input/seg_news_content
[root@hadoop100 apache-hive-1.2.2]# hadoop fs -put allfiles.txt /jiarong/input/seg_news_content/
```
Create an external table in Hive:
```sql
create external table seg_news_content(sentence string)
row format delimited fields terminated by '\n'
stored as textfile
location '/jiarong/input/seg_news_content';
```
Since the source data I prepared was already segmented, I did a second pass of processing: strip the spaces back out and extract the label (delimited by '##@@##') into its own field.
```sql
create table news_seg as
select split(regexp_replace(sentence, ' ', ''), '##@@##')[0] as sentence,
       split(regexp_replace(sentence, ' ', ''), '##@@##')[1] as label
from seg_news_content;
```
Spark Jieba segmentation of Hive data (demo)
```scala
package lishijia.spark.demo.hivejieba

import com.huaban.analysis.jieba.{JiebaSegmenter, SegToken}
import com.huaban.analysis.jieba.JiebaSegmenter.SegMode
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._

object HiveJieba {

  def main(args: Array[String]): Unit = {
    // Register JiebaSegmenter with Kryo so the broadcast below can serialize it
    val conf = new SparkConf()
      .registerKryoClasses(Array(classOf[JiebaSegmenter]))
      .set("spark.rpc.message.maxSize", "800")

    val spark = SparkSession.builder()
      .appName("Jieba udf")
      .enableHiveSupport()
      .config(conf)
      .getOrCreate()

    val df = spark.sql("select sentence, label from lishijia.news_seg limit 300")
    val df_seg = jieba_seg(df, "sentence")
    df_seg.show()
    df_seg.write.mode("overwrite").saveAsTable("lishijia.news_jieba")
  }

  def jieba_seg(df: DataFrame, columnName: String): DataFrame = {
    // Broadcast a single segmenter instance so executors reuse it
    val segmenter = new JiebaSegmenter()
    val seg = df.sparkSession.sparkContext.broadcast(segmenter)

    // UDF: segment the sentence, keep tokens longer than one character, join with "/"
    val jieba_udf = udf { (sentence: String) =>
      val segV = seg.value
      segV.process(sentence.toString, SegMode.INDEX)
        .toArray().map(_.asInstanceOf[SegToken].word)
        .filter(_.length > 1).mkString("/")
    }
    df.withColumn("seg", jieba_udf(col(columnName)))
  }
}
```
Packaging with mvn assembly
The project is packaged with the Maven assembly plugin so that all dependency jars are bundled into a single fat jar.
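The post does not include the pom.xml itself, so the following is only a minimal sketch of a typical maven-assembly-plugin configuration that produces such a fat jar; the execution binding and descriptor shown here are common defaults, not necessarily the author's actual settings:

```xml
<!-- Sketch only: the actual pom.xml is not shown in the post. -->
<build>
  <plugins>
    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-assembly-plugin</artifactId>
      <configuration>
        <descriptorRefs>
          <!-- Bundle all compile-scope dependencies (e.g. jieba-analysis) into one jar -->
          <descriptorRef>jar-with-dependencies</descriptorRef>
        </descriptorRefs>
      </configuration>
      <executions>
        <execution>
          <id>make-assembly</id>
          <phase>package</phase>
          <goals>
            <goal>single</goal>
          </goals>
        </execution>
      </executions>
    </plugin>
  </plugins>
</build>
```

With this setup, `mvn clean package` emits an assembly jar alongside the normal jar (by default with a `-jar-with-dependencies` suffix unless `appendAssemblyId` is set to false). Marking the Spark dependencies as `provided` while leaving jieba-analysis at compile scope keeps the fat jar small and still ships the segmenter to the cluster.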
Submit script
```bash
cd /home/soft/spark-2.2.0-bin-hadoop2.6
./bin/spark-submit \
  --class lishijia.spark.demo.hivejieba.HiveJieba \
  --master yarn-cluster \
  --files $HIVE_HOME/conf/hive-site.xml \
  /home/dev/spark/hivespark/spark-scala-demo-maven-1.0-SNAPSHOT.jar
```
Problems encountered along the way
Before this Maven Scala project I had been using an sbt project: I had already used Maven and was fairly familiar with it, so I wanted to try a different approach. To my surprise, after packaging, the jieba dependency did not end up in the jar (I still do not know why). Later I tried again at work, and this time the jieba dependency was packaged in, but every submission failed with ClassNotFound, even though no classes were missing from the packaged jar. I kept troubleshooting; the environment itself was fine, since the SparkPi example that ships with Spark could be submitted to the YARN cluster and run. In the end I never found the root cause, but the packaged jar was clearly the problem, so I switched back to the approach I know well: a Maven project.
When I have time, I will dig into sbt again.
Hadoop 2.7.2 cluster installation
Scala & Spark 2.0.2 cluster installation
Spark Scala WordCount getting started (1)
Spark Scala finding the max and min across multiple files with a custom map function (2)
Spark Scala multi-column secondary sort (3)
SparkSQL operating on Hive table data (4)
Code: https://github.com/lishijia/scala-spark-demo-mvn/tree/master/src/main/scala/lishijia/spark/demo/hivejieba
References
https://spark.apache.org/docs/2.0.2/sql-programming-guide.html
https://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.sql.Dataset
https://github.com/huaban/jieba-analysis