漫谈大数据 - Spark SQL详解,参数调优「终于解决」

大数据 (148) 2023-03-27 15:47

Hi,大家好,我是编程小6,很荣幸遇见你,我把这些年在开发过程中遇到的问题或想法写出来,今天说一说漫谈大数据 - Spark SQL详解,参数调优「终于解决」,希望能够帮助你!!!。
漫谈大数据 - Spark SQL详解,参数调优「终于解决」_https://bianchenghao6.com/blog_大数据_第1张

目录

Apache Spark

什么是SparkSQL?

SparkSQL的作用

SparkSQL运行原理

SparkSQL特点

SparkSQL发展历史

shark

Shark -> SparkSQL

Spark SQL底层执行原理

Spark SQL参数调优

读取文件构建DataFrame

1、读取文本文件创建DataFrame

2、读取json文件创建DataFrame

3、读取parquet文件创建DataFrame

DataFrame与DataSet互转

构建DataSet


Apache Spark 

漫谈大数据 - Spark SQL详解,参数调优「终于解决」_https://bianchenghao6.com/blog_大数据_第2张

        Apache Spark是用于大规模数据处理的统一分析引擎,基于内存计算,提高了在大数据环境下数据处理的实时性,同时保证了高容错性和高可伸缩性,允许用户将Spark部署在大量硬件之上,形成集群。

什么是SparkSQL?

spark SQL是spark的一个模块,主要用于进行结构化数据的处理。它提供的最核心的编程抽象就是DataFrame。

SparkSQL的作用

提供一个编程抽象(DataFrame) 并且作为分布式 SQL 查询引擎

DataFrame:它可以根据很多源进行构建,包括:结构化的数据文件,hive中的表,外部的关系型数据库,以及RDD

SparkSQL运行原理

将 Spark SQL 转化为 RDD, 然后提交到集群执行

SparkSQL特点

(1)容易整合

(2)统一的数据访问方式

(3)兼容 Hive

(4)标准的数据连接

SparkSQL发展历史

        由于Hadoop在企业生产中的大量使用,HDFS上积累了大量数据,为了给熟悉RDBMS但又不理解MapReduce的技术人员提供快速上手的工具,Hive应运而生。Hive的原理是将SQL语句翻译成MapReduce计算。

shark

        但是,MapReduce计算过程中大量的中间磁盘落地过程消耗了大量的I/O,降低了运行效率,为了提供SQL-on-Hadoop的效率,Shark出现了。但随着随着Spark的发展,Shark对于Hive的太多依赖制约了Spark的One Stack rule them all的方针,制约了Spark各个组件的相互集成,同时Shark也无法利用Spark的特性进行深度优化,所以决定放弃Shark,提出了SparkSQL项目。

        随着Shark的结束,两个新的项目应运而生:SparkSQL和Hive on Spark。其中SparkSQL作为Spark生态的一员继续发展,而不再受限于Hive,只是兼容Hive;而Hive on Spark是一个Hive的发展计划,该计划将Spark作为Hive的底层引擎之一,也就是说,Hive将不再受限于一个引擎,可以采用Map-Reduce、Tez、Spark等引擎。

Shark -> SparkSQL

SparkSQL无论在数据兼容、性能优化、组件扩展方面都得到了极大的方便,真可谓“退一步, 海阔天空”。

  1. 数据兼容方面 不但兼容hive,还可以从RDD、parquet文件、JSON文件中获取数据,未来版本甚至支持获取RDBMS数据以及cassandra等NOSQL数据
  2. 性能优化方面 除了采取In-Memory Columnar Storage、byte-code generation等优化技术外、将会引进Cost Model对查询进行动态评估、获取最佳物理计划等等
  3. 组件扩展方面 无论是SQL的语法解析器、分析器还是优化器都可以重新定义,进行扩展

Spark SQL底层执行原理

底层架构:

漫谈大数据 - Spark SQL详解,参数调优「终于解决」_https://bianchenghao6.com/blog_大数据_第3张

        SQL语句,经过一个优化器(Catalyst),转化为RDD,交给集群执行。

        SQL到 RDD中间经过了一个Catalyst,它就是Spark SQL的核心,是针对SparkSQL语句执行过程中的查询优化框架,基于Scala函数式编程结构。

        要了解Spark SQL的执行流程,那么理解Catalyst的工作流程是非常有必要的:

一条SQL语句生成执行引擎可识别的程序,就离不开解析(Parser)、优化(Optimizer)、执行(Execution)这三大过程。而Catalyst优化器在执行计划生成和优化的工作时候,它离不开自己
内部的五大组件,如下所示:

        1. Parser模块:将SparkSql字符串解析为一个抽象语法树/AST。

        2. Analyzer模块:该模块会遍历整个AST,并对AST上的每个节点进行数据类型的绑定以及函

数绑定,然后根据元数据信息Catalog对数据表中的字段进行解析。

        3. Optimizer模块:该模块是Catalyst的核心,主要分为RBO和CBO两种优化策略,其中RBO是

基于规则优化CBO是基于代价优化。

        4.SparkPlanner模块:优化后的逻辑执行计划OptimizedLogicalPlan依然是逻辑的,并不能被

Spark 系统理解,此时需要将OptimizedlogicalPlan转换成physical plan(物理计划)

        5. CostModel模块:主要根据过去的性能统计数据,选择最佳的物理执行计划。这个过程的优

化就是CBO(基于代价优化)。

Spark SQL参数调优

//1.下列Hive参数对Spark同样起作用。
set hive.exec.dynamic.partition=true; // 是否允许动态生成分区
set hive.exec.dynamic.partition.mode=nonstrict; // 是否容忍指定分区全部动态生成
set hive.exec.max.dynamic.partitions = 100; // 动态生成的最多分区数

//2.运行行为
set spark.sql.autoBroadcastJoinThreshold; // 大表 JOIN 小表,小表做广播的阈值
set spark.dynamicAllocation.enabled; // 开启动态资源分配
set spark.dynamicAllocation.maxExecutors; //开启动态资源分配后,最多可分配的Executor数
set spark.dynamicAllocation.minExecutors; //开启动态资源分配后,最少可分配的Executor数
set spark.sql.shuffle.partitions; // 需要shuffle是mapper端写出的partition个数
set spark.sql.adaptive.enabled; // 是否开启调整partition功能,如果开启,spark.sql.shuffle.partitions设置的partition可能会被合并到一个reducer里运行
set spark.sql.adaptive.shuffle.targetPostShuffleInputSize; //开启spark.sql.adaptive.enabled后,两个partition的和低于该阈值会合并到一个reducer
set spark.sql.adaptive.minNumPostShufflePartitions; // 开启spark.sql.adaptive.enabled后,最小的分区数
set spark.hadoop.mapreduce.input.fileinputformat.split.maxsize; //当几个stripe的大小大于该值时,会合并到一个task中处理

//3.executor能力
set spark.executor.memory; // executor用于缓存数据、代码执行的堆内存以及JVM运行时需要的内存
set spark.yarn.executor.memoryOverhead; //Spark运行还需要一些堆外内存,直接向系统申请,如数据传输时的netty等。
set spark.sql.windowExec.buffer.spill.threshold; //当用户的SQL中包含窗口函数时,并不会把一个窗口中的所有数据全部读进内存,而是维护一个缓存池,当池中的数据条数大于该参数表示的阈值时,spark将数据写到磁盘
set spark.executor.cores; //单个executor上可以同时运行的task数

读取文件构建DataFrame

1、读取文本文件创建DataFrame

//加载数据
val rdd1=sc.textFile("file:path.txt").map(x=>x.split(" "))
//定义一个样例类
case class Person(id:String,name:String,age:Int)
//把rdd与样例类进行关联
val personRDD=rdd1.map(x=>Person(x(0),x(1),x(2).toInt))
//把rdd转换成DataFrame
val personDF=personRDD.toDF

//打印schema信息
personDF.printSchema
//展示数据
personDF.show

2、读取json文件创建DataFrame

val peopleDF=spark.read.json("file:path.json")
//打印schema信息
peopleDF.printSchema

//展示数据
peopleDF.show

3、读取parquet文件创建DataFrame

val usersDF=spark.read.parquet("file:path.parquet")
//打印schema信息
usersDF.printSchema

//展示数据
usersDF.show

DataFrame与DataSet互转

1、把一个DataFrame转换成DataSetval

dataSet=dataFrame.as[强类型]

2、把一个DataSet转换成DataFrameval

dataFrame=dataSet.toDF

构建DataSet

1、 通过sparkSession调用createDataset方法

val ds=spark.createDataset(1 to 10) //scala集合
val ds=spark.createDataset(sc.textFile("/person.txt"))  //rdd

2、使用scala集合和rdd调用toDS方法

sc.textFile("/person.txt").toDS
List(1,2,3,4,5).toDS

3、把一个DataFrame转换成DataSet

val dataSet=dataFrame.as[强类型]

4、通过一个DataSet转换生成一个新的DataSet

List(1,2,3,4,5).toDS.map(x=>x*10)

发表回复