Hi,大家好,我是编程小6,很荣幸遇见你,我把这些年在开发过程中遇到的问题或想法写出来,今天说一说漫谈大数据 - Spark SQL详解,参数调优「终于解决」,希望能够帮助你!!!。
目录
Apache Spark
什么是SparkSQL?
SparkSQL的作用
SparkSQL运行原理
SparkSQL特点
SparkSQL发展历史
Spark SQL底层执行原理
Spark SQL参数调优
读取文件构建DataFrame
1、读取文本文件创建DataFrame
2、读取json文件创建DataFrame
3、读取parquet文件创建DataFrame
DataFrame与DataSet互转
构建DataSet
Apache Spark是用于大规模数据处理的统一分析引擎,基于内存计算,提高了在大数据环境下数据处理的实时性,同时保证了高容错性和高可伸缩性,允许用户将Spark部署在大量硬件之上,形成集群。
spark SQL是spark的一个模块,主要用于进行结构化数据的处理。它提供的最核心的编程抽象就是DataFrame。
提供一个编程抽象(DataFrame) 并且作为分布式 SQL 查询引擎
DataFrame:它可以根据很多源进行构建,包括:结构化的数据文件,hive中的表,外部的关系型数据库,以及RDD
将 Spark SQL 转化为 RDD, 然后提交到集群执行
(1)容易整合
(2)统一的数据访问方式
(3)兼容 Hive
(4)标准的数据连接
由于Hadoop在企业生产中的大量使用,HDFS上积累了大量数据,为了给熟悉RDBMS但又不理解MapReduce的技术人员提供快速上手的工具,Hive应运而生。Hive的原理是将SQL语句翻译成MapReduce计算。
但是,MapReduce计算过程中大量的中间磁盘落地过程消耗了大量的I/O,降低了运行效率,为了提供SQL-on-Hadoop的效率,Shark出现了。但随着随着Spark的发展,Shark对于Hive的太多依赖制约了Spark的One Stack rule them all的方针,制约了Spark各个组件的相互集成,同时Shark也无法利用Spark的特性进行深度优化,所以决定放弃Shark,提出了SparkSQL项目。
随着Shark的结束,两个新的项目应运而生:SparkSQL和Hive on Spark。其中SparkSQL作为Spark生态的一员继续发展,而不再受限于Hive,只是兼容Hive;而Hive on Spark是一个Hive的发展计划,该计划将Spark作为Hive的底层引擎之一,也就是说,Hive将不再受限于一个引擎,可以采用Map-Reduce、Tez、Spark等引擎。
SparkSQL无论在数据兼容、性能优化、组件扩展方面都得到了极大的方便,真可谓“退一步, 海阔天空”。
底层架构:
SQL语句,经过一个优化器(Catalyst),转化为RDD,交给集群执行。
SQL到 RDD中间经过了一个Catalyst,它就是Spark SQL的核心,是针对SparkSQL语句执行过程中的查询优化框架,基于Scala函数式编程结构。
要了解Spark SQL的执行流程,那么理解Catalyst的工作流程是非常有必要的:
一条SQL语句生成执行引擎可识别的程序,就离不开解析(Parser)、优化(Optimizer)、执行(Execution)这三大过程。而Catalyst优化器在执行计划生成和优化的工作时候,它离不开自己
内部的五大组件,如下所示:
1. Parser模块:将SparkSql字符串解析为一个抽象语法树/AST。
2. Analyzer模块:该模块会遍历整个AST,并对AST上的每个节点进行数据类型的绑定以及函
数绑定,然后根据元数据信息Catalog对数据表中的字段进行解析。
3. Optimizer模块:该模块是Catalyst的核心,主要分为RBO和CBO两种优化策略,其中RBO是
基于规则优化CBO是基于代价优化。
4.SparkPlanner模块:优化后的逻辑执行计划OptimizedLogicalPlan依然是逻辑的,并不能被
Spark 系统理解,此时需要将OptimizedlogicalPlan转换成physical plan(物理计划)
5. CostModel模块:主要根据过去的性能统计数据,选择最佳的物理执行计划。这个过程的优
化就是CBO(基于代价优化)。
//1.下列Hive参数对Spark同样起作用。
set hive.exec.dynamic.partition=true; // 是否允许动态生成分区
set hive.exec.dynamic.partition.mode=nonstrict; // 是否容忍指定分区全部动态生成
set hive.exec.max.dynamic.partitions = 100; // 动态生成的最多分区数
//2.运行行为
set spark.sql.autoBroadcastJoinThreshold; // 大表 JOIN 小表,小表做广播的阈值
set spark.dynamicAllocation.enabled; // 开启动态资源分配
set spark.dynamicAllocation.maxExecutors; //开启动态资源分配后,最多可分配的Executor数
set spark.dynamicAllocation.minExecutors; //开启动态资源分配后,最少可分配的Executor数
set spark.sql.shuffle.partitions; // 需要shuffle是mapper端写出的partition个数
set spark.sql.adaptive.enabled; // 是否开启调整partition功能,如果开启,spark.sql.shuffle.partitions设置的partition可能会被合并到一个reducer里运行
set spark.sql.adaptive.shuffle.targetPostShuffleInputSize; //开启spark.sql.adaptive.enabled后,两个partition的和低于该阈值会合并到一个reducer
set spark.sql.adaptive.minNumPostShufflePartitions; // 开启spark.sql.adaptive.enabled后,最小的分区数
set spark.hadoop.mapreduce.input.fileinputformat.split.maxsize; //当几个stripe的大小大于该值时,会合并到一个task中处理
//3.executor能力
set spark.executor.memory; // executor用于缓存数据、代码执行的堆内存以及JVM运行时需要的内存
set spark.yarn.executor.memoryOverhead; //Spark运行还需要一些堆外内存,直接向系统申请,如数据传输时的netty等。
set spark.sql.windowExec.buffer.spill.threshold; //当用户的SQL中包含窗口函数时,并不会把一个窗口中的所有数据全部读进内存,而是维护一个缓存池,当池中的数据条数大于该参数表示的阈值时,spark将数据写到磁盘
set spark.executor.cores; //单个executor上可以同时运行的task数
//加载数据
val rdd1=sc.textFile("file:path.txt").map(x=>x.split(" "))
//定义一个样例类
case class Person(id:String,name:String,age:Int)
//把rdd与样例类进行关联
val personRDD=rdd1.map(x=>Person(x(0),x(1),x(2).toInt))
//把rdd转换成DataFrame
val personDF=personRDD.toDF
//打印schema信息
personDF.printSchema
//展示数据
personDF.show
val peopleDF=spark.read.json("file:path.json")
//打印schema信息
peopleDF.printSchema
//展示数据
peopleDF.show
val usersDF=spark.read.parquet("file:path.parquet")
//打印schema信息
usersDF.printSchema
//展示数据
usersDF.show
1、把一个DataFrame转换成DataSetval
dataSet=dataFrame.as[强类型]
2、把一个DataSet转换成DataFrameval
dataFrame=dataSet.toDF
1、 通过sparkSession调用createDataset方法
val ds=spark.createDataset(1 to 10) //scala集合
val ds=spark.createDataset(sc.textFile("/person.txt")) //rdd
2、使用scala集合和rdd调用toDS方法
sc.textFile("/person.txt").toDS
List(1,2,3,4,5).toDS
3、把一个DataFrame转换成DataSet
val dataSet=dataFrame.as[强类型]
4、通过一个DataSet转换生成一个新的DataSet
List(1,2,3,4,5).toDS.map(x=>x*10)