前言
前面关于Spark的一些操作都是基于RDD操作,以下我们将通过SparkSql DataFrame操作Hive的数据库表,以及与RDD的操作做一些简单对比。
环境准备
前置环境
前提把hive,spark环境准备好,如果还没准备好的,可以根据以下连接尝试安装
spark操作hive环境
1
2[root@master201 hive]# cp /home/lishijia/Soft/apache-hive-1.2.2-bin/conf/hive-site.xml /home/lishijia/Soft/spark-2.0.2-bin-hadoop2.7/conf/
[root@hadoop100 data]# cp /home/lishijia/Soft/apache-hive-1.2.2/lib/mysql-connector-java-5.1.43.jar /home/lishijia/Soft/spark-2.0.2-bin-hadoop2.7/jars/
SparkSql操作Hive表原理
- 解析sql语法树获取需要读书的数据库,表等信息。以及后续的数据解析转换等
- 首先借助于hive的元数据信息获取到表结构,以及对应表数据存储hdfs位置信息(包括分区分桶等)
- 加载hdfs数据,根据sql语法树的解析规则生成执行计划
- 根据sql加载出来的数据未Dataframe
Spark-shell操作hive表
通过以下方式进入到spark-shell本地模式,然后通过Spark Session即内置对象spark操作
如下:通过内置对象spark.sql函数执行sql语句,返回一个df(Dataframe),然后通过show的方式显示出来
1 | [root2] 100 bin]# ./spark-shell --master local[ |
以下通过Dataframe的函数进行分组,然后count显示,它相对于RDD的操作来说更加的简单
1 | scala> df.select("order_id","user_name").groupBy("user_name").count.show |
可以看到,以下是通过模拟rdd的方式得出以上的结果。先通过Dataframe转换为rdd模型,然后把数据转换为(k,v)方式进行groupByKey,返回一个新的(K, Iterable)形式的数据集,最终再通过map转换为(key,count)最终的结果数据集,然后输出
1 | scala> df.select("order_id","user_name").rdd.map(x=>(x(1),x(0))).groupByKey().map(x=>(x._1,x._2.size)).foreach(println) |
sum函数使用,以下的使用方式如果想给他重命名,发现不太好操作,另外如果有多列需要sum的话也没办法操作,一般可以通过第二种方式来操作聚合函数
1 | scala> val df = spark.sql("select * from default.lsj_order") |
1 | scala> df.groupBy("user_name").agg(sum("amount") as "totalAmount").show |
通过以下的方式进行排序操作
1 | scala> df.groupBy("user_name").agg(sum("amount") as "totalAmount").sort("totalAmount").show |
Spark-shell实现简单udf
引入udf的函数包,然后通过udf的方式定义一个函数,传入列明,然后给传入的列增加一个test udf字符。通过这种方式实现简单的udf自定义函数
1 | scala> import org.apache.spark.sql.functions._ |
总结
即通过Spark-shell的方式操作Hive的表数据,以及对Spark DataFrame的数据模型做个简单初步了解,另外对比RDD的数据操作进行简单比较,从示例也可以看出来,DataFrame数据模型在做数据转换的时候会更加的方便,而且Spark3.0以后RDD将不在维护,逐渐全部转换为DataFrame数据集模型。
Spark Scala 统计多个文件中的最大值与最小值 map自定义函数实现(二)
参考
https://spark.apache.org/docs/2.0.2/sql-programming-guide.html
https://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.sql.Dataset