SparkSql操作Hive表数据(四)

前言

前面关于Spark的一些操作都是基于RDD操作,以下我们将通过SparkSql DataFrame操作Hive的数据库表,以及与RDD的操作做一些简单对比。

环境准备

  • 前置环境

    前提把hive,spark环境准备好,如果还没准备好的,可以根据以下连接尝试安装

    Hadoop2.7.2集群安装

    Scala & Spark2.0.2集群安装

    Hive 安装配置 & 简单使用

  • spark操作hive环境

    1
    2
    [root@master201 hive]# cp /home/lishijia/Soft/apache-hive-1.2.2-bin/conf/hive-site.xml /home/lishijia/Soft/spark-2.0.2-bin-hadoop2.7/conf/
    [root@hadoop100 data]# cp /home/lishijia/Soft/apache-hive-1.2.2/lib/mysql-connector-java-5.1.43.jar /home/lishijia/Soft/spark-2.0.2-bin-hadoop2.7/jars/

SparkSql操作Hive表原理

  • 解析sql语法树获取需要读书的数据库,表等信息。以及后续的数据解析转换等
  • 首先借助于hive的元数据信息获取到表结构,以及对应表数据存储hdfs位置信息(包括分区分桶等)
  • 加载hdfs数据,根据sql语法树的解析规则生成执行计划
  • 根据sql加载出来的数据未Dataframe

Spark-shell操作hive表

通过以下方式进入到spark-shell本地模式,然后通过Spark Session即内置对象spark操作

如下:通过内置对象spark.sql函数执行sql语句,返回一个df(Dataframe),然后通过show的方式显示出来

1
2
3
4
5
6
7
8
9
10
11
12
13
[root@hadoop100 bin]# ./spark-shell --master local[2]
scala> val df = spark.sql("select * from default.lsj_order")
df: org.apache.spark.sql.DataFrame = [order_id: string, amount: string ... 1 more field]

scala> df.show
+--------+------+---------+
|order_id|amount|user_name|
+--------+------+---------+
| 1| 100| lishijia|
| 2| 200| lishijia|
| 3| 300| john|
| 4| 400| eva|
+--------+------+---------+

以下通过Dataframe的函数进行分组,然后count显示,它相对于RDD的操作来说更加的简单

1
2
3
4
5
6
7
8
scala> df.select("order_id","user_name").groupBy("user_name").count.show
+---------+-----+
|user_name|count|
+---------+-----+
| lishijia| 2|
| john| 1|
| eva| 1|
+---------+-----+

可以看到,以下是通过模拟rdd的方式得出以上的结果。先通过Dataframe转换为rdd模型,然后把数据转换为(k,v)方式进行groupByKey,返回一个新的(K, Iterable)形式的数据集,最终再通过map转换为(key,count)最终的结果数据集,然后输出

1
2
3
4
scala> df.select("order_id","user_name").rdd.map(x=>(x(1),x(0))).groupByKey().map(x=>(x._1,x._2.size)).foreach(println)
(lishijia,2)
(eva,1)
(john,1)

sum函数使用,以下的使用方式如果想给他重命名,发现不太好操作,另外如果有多列需要sum的话也没办法操作,一般可以通过第二种方式来操作聚合函数

1
2
3
4
5
6
7
8
9
10
11
scala> val df = spark.sql("select * from default.lsj_order")
df: org.apache.spark.sql.DataFrame = [order_id: string, amount: float ... 1 more field]

scala> df.select("user_name","amount").groupBy("user_name").sum().show
+---------+-----------+
|user_name|sum(amount)|
+---------+-----------+
| lishijia| 300.0|
| john| 300.0|
| eva| 400.0|
+---------+-----------+
1
2
3
4
5
6
7
8
scala> df.groupBy("user_name").agg(sum("amount") as "totalAmount").show
+---------+-----------+
|user_name|totalAmount|
+---------+-----------+
| lishijia| 300.0|
| john| 300.0|
| eva| 400.0|
+---------+-----------+

通过以下的方式进行排序操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
scala> df.groupBy("user_name").agg(sum("amount") as "totalAmount").sort("totalAmount").show
+---------+-----------+
|user_name|totalAmount|
+---------+-----------+
| lishijia| 300.0|
| john| 300.0|
| eva| 400.0|
+---------+-----------+
scala> df.groupBy("user_name").agg(sum("amount") as "totalAmount").sort($"totalAmount".desc).show
+---------+-----------+
|user_name|totalAmount|
+---------+-----------+
| eva| 400.0|
| john| 300.0|
| lishijia| 300.0|
+---------+-----------+

Spark-shell实现简单udf

引入udf的函数包,然后通过udf的方式定义一个函数,传入列明,然后给传入的列增加一个test udf字符。通过这种方式实现简单的udf自定义函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
scala> import org.apache.spark.sql.functions._
import org.apache.spark.sql.functions._
scala> val plusUDF = udf((st:String)=> st + " test udf")
plusUDF: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,StringType,Some(List(StringType)))

scala> df.withColumn("plus_column",plusUDF(col("user_name"))).show
+--------+------+---------+-----------------+
|order_id|amount|user_name| plus_column|
+--------+------+---------+-----------------+
| 1| 100| lishijia|lishijia test udf|
| 2| 200| lishijia|lishijia test udf|
| 3| 300| john| john test udf|
| 4| 400| eva| eva test udf|
+--------+------+---------+-----------------+

总结

即通过Spark-shell的方式操作Hive的表数据,以及对Spark DataFrame的数据模型做个简单初步了解,另外对比RDD的数据操作进行简单比较,从示例也可以看出来,DataFrame数据模型在做数据转换的时候会更加的方便,而且Spark3.0以后RDD将不在维护,逐渐全部转换为DataFrame数据集模型。

Hadoop2.7.2集群安装

Scala & Spark2.0.2集群安装

Spark Scala WrodCount 入门(一)

Spark Scala 统计多个文件中的最大值与最小值 map自定义函数实现(二)

Spark Scala 多字段二次排序(三)

参考

https://spark.apache.org/docs/2.0.2/sql-programming-guide.html

https://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.sql.Dataset

分享到 评论