SparkSQL
Spark SQL是Spark的一个模块,主要用于进行结构化数据的处理。它提供的最核心的编程抽象就是DataFrame。
作用:提供一个编程抽象(DataFrame) 并且作为分布式 SQL 查询引擎
DataFrame:它可以根据很多源进行构建,包括:结构化的数据文件
,Hive中的表
,外部的关系型数据库
,以及RDD
原理:将
Spark SQL
转化为RDD
,然后提交到集群执行
特点:
- 容易整合
- 统一的数据访问方式
- 兼容 Hive
- 标准的数据连接
SparkSession
SparkSession
是Spark 2.0引如的新概念。SparkSession为用户提供了统一的切入点
,来让用户学习spark的各项功能。
在Spark的早期版本中,由于RDD是主要的API,SparkContext是spark的主要切入点,我们可以通过Sparkcontext来创建和操作RDD。但这样,对于每个其他的API,我们需要使用不同的context。 例如,对于Streming,我们需要使用StreamingContext;对于sql,使用sqlContext;对于Hive,使用HiveContext。
随着DataSet和DataFrame的API逐渐成为标准的API,就需要为他们建立接入点。所以在Spark2.0中,引入SparkSession作为DataSet和DataFrame API的切入点,
SparkSession封装了SparkConf、SparkContext和SQLContext,为了向后兼容,SQLContext和HiveContext也被保存下来。
SparkSession实质上是SQLContext和HiveContext的组合(未来可能还会加上StreamingContext),所以在SQLContext和HiveContext上可用的API在SparkSession上同样是可以使用的。 SparkSession内部封装了SparkContext,所以计算实际上是由sparkContext完成的。
特点:
- 为用户提供一个统一的切入点使用
Spark
各项功能- 允许用户通过它调用
DataFrame
和Dataset
相关API
来编写程序- 减少了用户需要了解的一些概念,可以很容易的与
Spark
进行交互- 与
Spark
交互之时不需要显示的创建SparkConf
,SparkContext
以及SQlContext
,这些对象已经封闭在SparkSession
中
DataFrames
在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格
。
DataFrame与RDD的主要区别在于,DataFrame
带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。这使得Spark SQL得以洞察更多的结构信息,
从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。
反观RDD
,由于无从得知所存数据元素的具体内部结构,Spark Core只能在stage层面进行简单、通用的流水线优化。
RDD转换成为DataFrame
数据文件
tom,12
sandy,34
alixe,23
marry,31
- 通过
case class
创建DataFrames(反射)
//定义case class,相当于表结构
case class People(var name:String,var age:Int)
object TestDataFrame {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("RDDToDataFrame").setMaster("local")
val sc = new SparkContext(conf)
val context = new SQLContext(sc)
// 将本地的数据读入 RDD, 并将 RDD 与 case class 关联
val peopleRDD = sc.textFile("E:\\test\\data\\people.txt").map(line => People(line.split(",")(0), line.split(",")(1).trim.toInt))
import context.implicits._
// 将RDD 转换成 DataFrames
val df = peopleRDD.toDF
//将DataFrames创建成一个临时的视图
df.createOrReplaceTempView("view_people")
//使用SQL语句进行查询
context.sql("select * from view_people").show()
}
}
- 通过
structType
创建DataFrames(编程接口)
object TestDataFrame {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("RDDToDataFrame").setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val fileRDD = sc.textFile("E:\\test\\data\\people.txt")
// 将 RDD 数据映射成 Row,需要 import org.apache.spark.sql.Row
val rowRDD: RDD[Row] = fileRDD.map(line => {
val fields = line.split(",")
Row(fields(0), fields(1).trim.toInt)
})
// 创建 StructType 来定义结构
val structType: StructType = StructType(
//字段名,字段类型,是否可以为空
StructField("name", StringType, true) ::
StructField("age", IntegerType, true) :: Nil
)
/**
* rows: java.util.List[Row],
* schema: StructType
* */
val df:DataFrame = sqlContext.createDataFrame(rowRDD,structType)
df.createOrReplaceTempView("view_people")
sqlContext.sql("select * from view_people").show()
}
}
- 通过
json
文件创建DataFrames
object TestDataFrame3 {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("RDDToDataFrame").setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val df: DataFrame = sqlContext.read.json("E:\\test\\data\\people.json")
df.createOrReplaceTempView("view_people")
sqlContext.sql("select * from view_people").show()
}
}
DataFrame文件读取与保存
- 读取
object TestRead {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("DataFrameRead").setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
//方式一(指定格式读)
val df1 = sqlContext.read.json("E:\\test\\data\\people.json")
val df2 = sqlContext.read.parquet("E:\\test\\data\\users.parquet")
//方式二(设置格式读)
val df3 = sqlContext.read.format("json").load("E:\\test\\data\\people.json")
val df4 = sqlContext.read.format("parquet").load("E:\\test\\data\\users.parquet")
//方式三,默认是parquet格式(默认格式读)
val df5 = sqlContext.load("E:\\test\\data\\users.parquet")
}
}
- 保存
object TestSave {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("DataFrameSave").setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val df1 = sqlContext.read.json("E:\\test\\data\\people.json")
//方式一(指定格式写)
df1.write.json("E:\\test\\data\\success\\111")
df1.write.parquet("E:\\test\\data\\success\\222")
//方式二(设置格式写)
df1.write.format("json").save("E:\\test\\data\\success\\333")
df1.write.format("parquet").save("E:\\test\\data\\success\\444")
//方式三(默认格式写)
df1.write.save("E:\\test\\data\\success\\555")
}
}
- 保存模式
即数据保存目录的
追加
、覆盖
、忽略
、存在抛异常
四种模式。
数据源
-
读取JSON数据文件计算
-
读取parquet数据文件计算
-
读取MySQL表数据计算
object TestMySQL {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("TestMySQL").setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val url = "jdbc:mysql://192.168.123.102:3306/hivedb"
val table = "tb_user"
val properties = new Properties()
properties.setProperty("user","root")
properties.setProperty("password","root")
//需要传入Mysql的URL、表明、properties(连接数据库的用户名密码)
val df = sqlContext.read.jdbc(url,table,properties)
df.createOrReplaceTempView("view_user")
sqlContext.sql("select * from view_user").show()
}
}
- 读取Hive表数据计算
开发前,需要集群整合SparkSQL和Hive,可用
引入Hive依赖
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.3.0</version>
</dependency>
拷贝集群
hive-site.xml
到工程resources
目录下
测试代码
object TestHive {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName(this.getClass.getSimpleName)
val sc = new SparkContext(conf)
val sqlContext = new HiveContext(sc)
sqlContext.sql("select * from myhive.student").show()
}
}