成都 [ 更换 ]
热门城市
北京上海广州深圳成都杭州南京武汉天津西安重庆青岛沈阳长沙大连厦门无锡福州济南宁波昆明苏州郑州长春合肥南昌哈尔滨常州烟台南宁温州石家庄太原珠海南通扬州贵阳东莞徐州大庆佛山威海洛阳淮安呼和浩特镇江潍坊桂林中山临沂咸阳包头嘉兴惠州泉州三亚赣州九江金华泰安榆林许昌新乡舟山慈溪南阳聊城海口东营淄博漳州保定沧州丹东宜兴绍兴唐山湖州揭阳江阴营口衡阳郴州鄂尔多斯泰州义乌汕头宜昌大同鞍山湘潭盐城马鞍山襄樊长治日照常熟安庆吉林乌鲁木齐兰州秦皇岛肇庆西宁介休滨州台州廊坊邢台株洲德阳绵阳双流平顶山龙岩银川芜湖晋江连云港张家港锦州岳阳长沙县济宁邯郸江门齐齐哈尔昆山柳州绍兴县运城齐河衢州太仓张家口湛江眉山常德盘锦枣庄资阳宜宾赤峰余姚清远蚌埠宁德德州宝鸡牡丹江阜阳莆田诸暨黄石吉安延安拉萨海宁通辽黄山长乐安阳增城桐乡上虞辽阳遵义韶关泸州南平滁州温岭南充景德镇抚顺乌海荆门阳江曲靖邵阳宿迁荆州焦作丹阳丽水延吉茂名梅州渭南葫芦岛娄底滕州上饶富阳内江三明淮南孝感溧阳乐山临汾攀枝花阳泉长葛汉中四平六盘水安顺新余晋城自贡三门峡本溪防城港铁岭随州广安广元天水遂宁萍乡西双版纳绥化鹤壁湘西松原阜新酒泉张家界黔西南保山昭通河池来宾玉溪梧州鹰潭钦州云浮佳木斯克拉玛依呼伦贝尔贺州通化朝阳百色毕节贵港丽江安康德宏朔州伊犁文山楚雄嘉峪关凉山雅安西藏四川广东河北山西辽宁黑龙江江苏浙江安徽福建江西山东河南湖北湖南海南贵州云南陕西甘肃青海台湾内蒙古广西宁夏香港澳门
培训资讯网 - 为兴趣爱好者提供专业的职业培训资讯知识

Spark SQL|Spark,从入门到精通

Spark SQL 在 Hive 兼容层面仅依赖 HQL parser、Hive Metastore 和 Hive SerDe。也就是说,从 HQL 被解析成抽象语法树(AST)起,就全部由 Spark SQL 接管了。

发家史

熟悉 Spark SQL 的都知道,Spark SQL 是从 Shark 发展而来。Shark 为了实现 Hive 兼容,在 HQL 方面重用了 Hive 中 HQL 的解析、逻辑执行计划翻译、执行计划优化等逻辑,可以近似认为仅将物理执行计划从 MR 作业替换成了 Spark 作业(辅以内存列式存储等各种和 Hive 关系不大的优化);同时还依赖 Hive Metastore 和 Hive SerDe(用于兼容现有的各种 Hive 存储格式)。

Spark SQL 在 Hive 兼容层面仅依赖 HQL parser、Hive Metastore 和 Hive SerDe。也就是说,从 HQL 被解析成抽象语法树(AST)起,就全部由 Spark SQL 接管了。执行计划生成和优化都由 Catalyst 负责。借助 Scala 的模式匹配等函数式语言特性,利用 Catalyst 开发执行计划优化策略比 Hive 要简洁得多。成都加米谷大数据培训机构,大数据开发数据分析与挖掘,2019春节前报名学费特惠,详情见加米谷大数据官网。

Spark SQL|Spark,从入门到精通

Spark SQL

Spark SQL 提供了多种接口:

纯 Sql 文本;

dataset/dataframe api。

当然,相应的,也会有各种客户端:

sql 文本,可以用 thriftserver/spark-sql;

编码,Dataframe/dataset/sql。

Dataframe/Dataset API 简介

Dataframe/Dataset 也是分布式数据集,但与 RDD 不同的是其带有 schema 信息,类似一张表。

可以用下面一张图详细对比 Dataset/dataframe 和 RDD 的区别:

Spark SQL|Spark,从入门到精通

Dataset 是在 spark1.6 引入的,目的是提供像 RDD 一样的强类型、使用强大的 lambda 函数,同时使用 Spark SQL 的优化执行引擎。到 spark2.0 以后,DataFrame 变成类型为 Row 的 Dataset,即为:

type DataFrame = Dataset[Row]

Spark SQL|Spark,从入门到精通

所以,很多移植 spark1.6 及之前的代码到 spark2+的都会报错误,找不到 dataframe 类。

基本操作

val df = spark.read.json(“file:///opt/meitu/bigdata/src/main/data/people.json”)

df.show()

import spark.implicits._

df.printSchema()

df.select("name").show()

df.select($"name", $"age" + 1).show()

df.filter($"age" > 21).show()

df.groupBy("age").count().show()

spark.stop()

分区分桶 排序

分桶排序保存hive表

df.write.bucketBy(42,“name”).sortBy(“age”).saveAsTable(“people_bucketed”)

分区以parquet输出到指定目录

df.write.partitionBy("favorite_color").format("parquet").save("namesPartByColor.parquet")

分区分桶保存到hive表

df.write .partitionBy("favorite_color").bucketBy(42,"name").saveAsTable("users_partitioned_bucketed")

cube rullup pivot

cube

sales.cube("city", "year”).agg(sum("amount")as "amount”) .show()

rull up

sales.rollup("city", "year”).agg(sum("amount")as "amount”).show()

pivot 只能跟在groupby之后

sales.groupBy("year").pivot("city",Seq("Warsaw","Boston","Toronto")).agg(sum("amount")as "amount”).show()

SQL 编程

Spark SQL 允许用户提交 SQL 文本,支持以下三种手段编写 SQL 文本:

1. spark 代码

2. spark-sql的shell

3. thriftserver

支持 Spark SQL 自身的语法,同时也兼容 HSQL。

1. 编码

要先声明构建 SQLContext 或者 SparkSession,这个是 SparkSQL 的编码入口。早起的版本使用的是 SQLContext 或者 HiveContext,spark2 以后,建议使用的是 SparkSession。

SQLContext

new SQLContext(SparkContext)

HiveContext

new HiveContext(spark.sparkContext)

SparkSession

不使用 hive 元数据:

val spark = SparkSession.builder()

.config(sparkConf) .getOrCreate()

使用 hive 元数据:

val spark = SparkSession.builder()

.config(sparkConf) .enableHiveSupport().getOrCreate()

使用

val df =spark.read.json("examples/src/main/resources/people.json")

df.createOrReplaceTempView("people")

spark.sql("SELECT * FROM people").show()

2. spark-sql 脚本

spark-sql 启动的时候类似于 spark-submit 可以设置部署模式资源等,可以使用

bin/spark-sql –help 查看配置参数。

需要将 hive-site.xml 放到 ${SPARK_HOME}/conf/ 目录下,然后就可以测试

show tables;

select count(*) from student;

3. thriftserver

thriftserver jdbc/odbc 的实现类似于 hive1.2.1 的 hiveserver2,可以使用 spark 的 beeline 命令来测试 jdbc server。

安装部署

/1 开启 hive 的 metastore

bin/hive --service metastore

/2 将配置文件复制到spark/conf/目录下

/3 thriftserver

sbin/start-thriftserver.sh --masteryarn --deploy-mode client

对于 yarn 只支持 client 模式。

/4 启动 bin/beeline

/5 连接到 thriftserver

!connect jdbc:hive2://localhost:10001

Spark SQL|Spark,从入门到精通

用户自定义函数

1. UDF

定义一个 udf 很简单,例如我们自定义一个求字符串长度的 udf:

val len = udf{(str:String) => str.length}

spark.udf.register("len",len)

val ds =spark.read.json("file:///opt/meitu/bigdata/src/main/data/employees.json")

ds.createOrReplaceTempView("employees")

ds.show()

spark.sql("select len(name) from employees").show()

2. UserDefinedAggregateFunction

定义一个 UDAF

import org.apache.spark.sql.{Row, SparkSession}

import org.apache.spark.sql.expressions.MutableAggregationBuffer

import org.apache.spark.sql.expressions.UserDefinedAggregateFunction

import org.apache.spark.sql.types._

object MyAverageUDAF extends UserDefinedAggregateFunction {

//Data types of input arguments of this aggregate function

definputSchema:StructType = StructType(StructField("inputColumn", LongType) :: Nil)

//Data types of values in the aggregation buffer

defbufferSchema:StructType = {

StructType(StructField("sum", LongType):: StructField("count", LongType) :: Nil)

}

//The data type of the returned value

defdataType:DataType = DoubleType

//Whether this function always returns the same output on the identical input

defdeterministic: Boolean = true

//Initializes the given aggregation buffer. The buffer itself is a `Row` that inaddition to

// standard methods like retrieving avalue at an index (e.g., get(), getBoolean()), provides

// the opportunity to update itsvalues. Note that arrays and maps inside the buffer are still

// immutable.

definitialize(buffer:MutableAggregationBuffer): Unit = {

buffer(0) = 0L

buffer(1) = 0L

}

//Updates the given aggregation buffer `buffer` with new input data from `input`

defupdate(buffer:MutableAggregationBuffer, input: Row): Unit ={

if(!input.isNullAt(0)) {

buffer(0) = buffer.getLong(0)+ input.getLong(0)

buffer(1) = buffer.getLong(1)+ 1

}

}

// Mergestwo aggregation buffers and stores the updated buffer values back to `buffer1`

defmerge(buffer1:MutableAggregationBuffer, buffer2: Row): Unit ={

buffer1(0) = buffer1.getLong(0)+ buffer2.getLong(0)

buffer1(1) = buffer1.getLong(1)+ buffer2.getLong(1)

}

//Calculates the final result

defevaluate(buffer:Row): Double =buffer.getLong(0).toDouble /buffer.getLong(1)

}

使用 UDAF

val ds = spark.read.json("file:///opt/meitu/bigdata/src/main/data/employees.json")

ds.createOrReplaceTempView("employees")

ds.show()

spark.udf.register("myAverage", MyAverageUDAF)

val result = spark.sql("SELECT myAverage(salary) as average_salary FROM employees")

result.show()

3. Aggregator

定义一个 Aggregator

import org.apache.spark.sql.{Encoder, Encoders, SparkSession}

import org.apache.spark.sql.expressions.Aggregator

case class Employee(name: String, salary: Long)

case class Average(var sum: Long, var count: Long)

object MyAverageAggregator extends Aggregator[Employee, Average, Double] {

// A zero value for this aggregation. Should satisfy the property that any b + zero = b

def zero: Average = Average(0L, 0L)

// Combine two values to produce a new value. For performance, the function may modify `buffer`

// and return it instead of constructing a new object

def reduce(buffer: Average, employee: Employee): Average = {

buffer.sum += employee.salary

buffer.count += 1

buffer

}

// Merge two intermediate values

def merge(b1: Average, b2: Average): Average = {

b1.sum += b2.sum

b1.count += b2.count

b1

}

// Transform the output of the reduction

def finish(reduction: Average): Double = reduction.sum.toDouble / reduction.count

// Specifies the Encoder for the intermediate value type

def bufferEncoder: Encoder[Average] = Encoders.product

// Specifies the Encoder for the final output value type

def outputEncoder: Encoder[Double] = Encoders.scalaDouble

}

使用

spark.udf.register("myAverage2", MyAverageAggregator)

import spark.implicits._

val ds = spark.read.json("file:///opt/meitu/bigdata/src/main/data/employees.json").as[Employee]

ds.show()

val averageSalary = MyAverageAggregator.toColumn.name("average_salary")

val result = ds.select(averageSalary)

result.show()

Spark SQL|Spark,从入门到精通

总体执行流程如下:从提供的输入 API(SQL,Dataset, dataframe)开始,依次经过 unresolved 逻辑计划,解析的逻辑计划,优化的逻辑计划,物理计划,然后根据 cost based 优化,选取一条物理计划进行执行。

简单化成四个部分:

/1 analysis

Spark 2.0 以后语法树生成使用的是 antlr4,之前是 scalaparse。

/2 logical optimization

常量合并,谓词下推,列裁剪,boolean 表达式简化,和其它的规则。

/3 physical planning

eg:SortExec 。

/4 Codegen

codegen 技术是用 scala 的字符串插值特性生成源码,然后使用 Janino 编译成 java字节码,Eg: SortExec。

2. 自定义优化器

/1 实现

继承 Rule[LogicalPlan]

object MultiplyOptimizationRule extends Rule[LogicalPlan] {

def apply(plan: LogicalPlan): LogicalPlan = plan transformAllExpressions {

case Multiply(left,right) if

相关内容

丘陵山地轻简农业生产装备作业技术国际培训会在成都金堂隆重举行

来源:【红星新闻网】红星新闻网(记者刘杰)10月24日报道10月24日上午,丘陵山地轻简农业生产装备作业技术国际培训会在成都金堂隆重举行。来自泰国、越南、缅甸等6个东南亚国家专家代表特邀出席本次培训,国内农业专家、科研代表及相关部门领导也应···

成都市民办教育协会发布倡议书:严守法规,向校外培训乱象坚决说“不”

封面新闻记者 何方迪近日,成都市民办教育协会发布《校外培训行业自律倡议》,号召全市校外培训机构及其从业人员自觉做到“七要”“七不”。内容如下:2023年10月15日,教育部颁布的《校外培训行政处罚暂行办法》正式实施。我们认为,这是校外培训监···

成都农交所德阳所组织业务骨干参加“2023年全省土地管理培训会”

10月9日-11日,全省农村土地承包管理培训班在德阳市举办,成都农交所德阳所组织业务骨干参加此次培训班。培训班组织参训学员实地参观了绵竹市剑南粮油现代农业园区、什邡市雍城街道城东村两个现场点,学习解决农村土地细碎化问题试点和第二轮土地承包到···

让违法者付出代价,合规者受到保护!成都举行校外培训执法培训会

记者丨张瑾2023年10月15日,教育部颁布的《校外培训行政处罚暂行办法》(以下简称《办法》)开始正式实施。《办法》实施的第一周,对于部分还不太清楚政策风向的网友和家长来说,最为关心的莫过于校外培训今后还能不能上?这个政策一出是不是培训机构···

同心话融合,携手共成长 这个骨干教师培训班参访成都成华特校

10月13日,记者从成都市成华区特殊教育学校(以下简称成华特校)获悉,近日,青海省大通县随班就读和送教上门的骨干教师培训班一行50余人,在该县特教资源中心执行主任、特教学校校长何惠兴的带领下,走进成华特校进行访问交流。据悉,此次活动旨在助推···

成都市公共卫生临床医疗中心举办艾滋病合并内分泌代谢疾病诊治进展培训班

为推广医院在艾滋病合并内分泌代谢性疾病诊治过程中积累的宝贵经验、加强经验交流,成都市公共卫生临床医疗中心于10月8日在医院航天院区学术厅举办了艾滋病合并内分泌代谢疾病诊治进展培训班。成都市8家医疗机构共50余人全程参加培训。会议开始,成都市···

成都青羊实验中学附小举办家庭教育培训,助孩子以积极心态交往

百年大计,教育为本,教育是人类传承文明和知识、培养年轻一代、创造美好生活的根本途径。为进一步提升家长的家庭教育理念,引导孩子以积极心态交往,促进孩子健康成长和全面发展,近日,由中国关工委儿童发展研究中心专家团成员李萍教授主讲的“如何引导孩子···

成都高考冲刺培训机构排行榜来了!

基础薄弱、跟不上学习进度、抓不住复习重点……高三学生在备考文化课的过程中面临许多困难。时间紧迫,而进度缓慢让学生们倍感压力。对于基础较差或学习提升有瓶颈的学生,最重要的问题就是如何在有限的时间内学习有针对性地突破。成都的培训机构很多,那么有···

大运倒计时丨大运会志愿者代表培训班开班

来源:【人民网】成都大运会将于7月28日正式开幕,随着赛事日益临近,大运会也进入了最后的筹备冲刺阶段。人民网推出《大运倒计时》栏目,关注大运筹备动态,聚焦大运赛事信息。《我要说大运》解说员全国招募活动决赛收官5月8日,《我要说大运》解说员全···

成都市锦江区召开2023年腾讯99公益日格桑花开专项基金“唐卡非遗文化进社区”项目培训会

9月5日,成都市锦江区召开2023年腾讯99公益日格桑花开专项基金“唐卡非遗文化进社区”项目培训会,成都市锦江区社会组织发展基金会(简称:锦基金)向锦江区11个街道介绍了“唐卡非遗文化进社区”项目的内容、目的及意义,以及结合腾讯99公益日开···

落实“锦城教育学” 三维一体培训体系凸显锦城特色!

落实“锦城教育学” 三维一体培训体系凸显锦城特色——成都锦城学院2023年第二期(总第26期)新进教职员工入职暨“锦城教育学”教育理论研讨培训圆满举行为了锦城的高质量教学,让新进教职工更好更快地融入锦城,投入锦城工作,由人事处、教师教育发展···

闪亮明眸好视力 共护孩子眼健康——成都市菱窠路小学举行2023年秋季学期教师、家长眼健康培训

近视、散光等问题在当今社会尤为普遍,尤其在中小学生群体中,这样的现象日益严重。为进一步普及科学用眼知识,提高青少年眼健康水平,帮助家长引导孩子了解用眼的卫生与保健,9月19日下午,成都市菱窠路小学邀请成都市视力保护和健康促进学会成员、小艾眼···

成都农交所德阳所为旌阳区百余名新型农业经营主体开展专题培训

近日,成都农交所德阳所旌阳服务中心负责人在“旌阳区2023年高素质农民培育粮经复合种植技术培训班”上为辖区内100余名新型农业经营主体开展农村产权流转交易专题培训。培训会上,成都农交所德阳所旌阳服务中心负责人有针对性地从政策法规、平台建设、···

2023年度发电企业信用评价培训班在成都举办

中新网四川新闻9月28日电(黄进喜) 26日,由四川省电力行业协会、四川电力交易中心联合主办的2023年度发电企业信用评价培训班在成都举办。四川省经济和信息化厅电力处处长马健民、北京电力交易中心市场交易七部主任周全、四川电力交易中心总经理熊···

“双减”如何走好下一步?50位名师名校长来了一场成都和西安的“双城”对话

“双减”政策落地两年以来,对教师们提出了哪些具体的新要求?学校又该如何适应新要求来提升管理水平?教育部门如何引导需求,完善相关配套政策?4月18—21日,由成都市教育科学研究院、成都市教育学会指导,华商报联合成都商报·红星新闻·红星教育共同···

新经济观察丨关注成都骑手职业福利及发展 闪送宣布第四个骑士之家落地成都丨封面天天见

封面新闻记者 张越熙数字经济的发展,诞生了骑手这一社会职业群体。据国家信息中心发布的《中国共享经济发展报告》数据显示,我国依赖互联网的零工经济人群超过800万,其中将近十分之一为外卖骑士。近年来,伴随着越来越多人成为骑手,该职业作为就业“蓄···

猛将云集,越战越勇!三原的开学季,从高质量的培训活动开始!

带着秋草的味道,结束盛夏的桎梏,2023年8月16日-8月30日,为促进教师专业提升,有序推进新学年工作,成都市三原外国语学校(以下简称:三原)吹响“集结号”,以“深化管理 增效提质”为主题,组织了2023年骨干教师培训会、全员培训会、新教···

成都:会议、会展、培训可正常举办,餐厅、酒吧、KTV等全面恢复经营

转自:健康成都官微面对当前疫情形势,12月7日,国务院疫情联防联控机制综合组发布“新十条”。针对新政策,成都迅速调整落地优化疫情防控相关措施。今日,针对市民关心的会议会展举办、居家管理和医疗救治等问题,市疫情防控指挥部继续作答。问:阳性人员···

成都武侯区微网格治理引入食品安全培训

中新网四川新闻9月4日电 (记者 刘忠俊)近日,成都市武侯区食品安全办在簇桥街道锦城社区开展微网格实格治理网格员培训,社区网格长、一般网格员和微网格员80余人参加,通过培训切实提升食品安全基层治理能力水平。食品安全培训现场。成都市市场监管局···

简阳教育发布丨简阳市2023年中小学校(园)财务人员基础能力培训班顺利举办

为加强财务管理与监督,规范财务行为,提高财务人员业务水平和职业素养,简阳市教育局于2023年9月17日至19日在简阳市射洪坝水东小学成功举办简阳市教育局2023年教师培训。本次培训由成都蜀贤书院教育咨询有限公司承办,共计培训学员78人。在历···