视频1 视频21 视频41 视频61 视频文章1 视频文章21 视频文章41 视频文章61 推荐1 推荐3 推荐5 推荐7 推荐9 推荐11 推荐13 推荐15 推荐17 推荐19 推荐21 推荐23 推荐25 推荐27 推荐29 推荐31 推荐33 推荐35 推荐37 推荐39 推荐41 推荐43 推荐45 推荐47 推荐49 关键词1 关键词101 关键词201 关键词301 关键词401 关键词501 关键词601 关键词701 关键词801 关键词901 关键词1001 关键词1101 关键词1201 关键词1301 关键词1401 关键词1501 关键词1601 关键词1701 关键词1801 关键词1901 视频扩展1 视频扩展6 视频扩展11 视频扩展16 文章1 文章201 文章401 文章601 文章801 文章1001 资讯1 资讯501 资讯1001 资讯1501 标签1 标签501 标签1001 关键词1 关键词501 关键词1001 关键词1501 专题2001
SparkSQL和DataFrame的学习总结
2020-11-09 16:26:39 责编:小采
文档

使用SQLContext,应用可以从已存在的RDD、hive表或者数据源DataSources中创建DataFrame
示例:从本地 json文件创建

val df = sqlContext.jsonFile(“file:///home/hdfs/people.json”)
df.show()
age name
null Michael
30 Andy
19 Justin
df.printSchema()
|– age: long (nullable = true)
|– name: string (nullable = true)

(2)DataFrame的操作
DataFrame支持RDD的系列操作,可以对表进行过滤和进行多表关联

df.select(“name”).show()
name
Michael
Andy
Justin
df.select(df(“name”),df(“age”)+1).show()
name (age + 1)
Michael null
Andy 31
Justin 20
df.filter(df(“age”)>21).select(“name”).show()
name
Andy
df.groupBy(“age”).count().show()
age count
null 1
19 1
30 1
表之间的连接,3个等号
df.join(df2,df(“name”) === df2(“name”),”left”).show()

df.filter(“age > 30”)
.join(department, df(“deptId”) === department(“id”))
.groupBy(department(“name”), “gender”)
.agg(avg(df(“salary”)), max(df(“age”)))

2、SparkSQL中的数据源

Spark SQL支持通过SchemaRDD接口操作各种数据源。一个SchemaRDD能够作为一个一般的RDD被操作,也可以被注册为一个临时的表。注册一个SchemaRDD为一个表就可以允许你在其数据上运行SQL查询。
加载数据为SchemaRDD的多种数据源,包括RDDs、parquent文件(列式存储)、JSON数据集、Hive表,以下主要介绍将RDDs转换为schemaRDD的两种方法
(1)利用反射推断模式
使用反射来推断包含特定对象类型的RDD的模式(schema)。适用于写spark程序的同时,已经知道了模式,使用反射可以使得代码简洁。结合样本的名字,通过反射读取,作为列的名字。这个RDD可以隐式转化为一个SchemaRDD,然后注册为一个表。表可以在后续的sql语句中使用。

val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
case class Person(name:String,age:Int)
val people = sc.textFile("file:///home/hdfs/people.txt").map(_.split(",")).map(p => Person(p(0),p(1).trim.toInt)).toDF()
people.registerTempTable("people")

val teenagers = sqlContext.sql("SELECT name,age FROM people WHERE age>= 19 AND age <=30")
teenagers.map(t => "Name:"+t(0)).collect().foreach(println)

teenagers.map(t => "Name:" + t.getAs[String]("name")).collect().foreach(println)
teenagers.map(_.getValueMap[Any](List("name","age"))).collect().foreach(println)

(2)编程指定模式
通过一个编程接口构造模式来实现,然后可在存在的RDDs上使用它。适用于当前样本模式未知
一个SchemaRDD可以通过三步来创建。

从原来的RDD创建一个行的RDD
创建由一个StructType表示的模式与第一步创建的RDD的行结构相匹配
在行RDD上通过applySchema方法应用模式

val people = sc.textFile("file:///home/hdfs/people.txt")
val schemaString = "name age"

import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.{StructType,StructField,StringType};

val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName,StringType,true)))

val rowRDD = people.map(_.split(",")).map(p => Row(p(0),p(1).trim))

val peopleSchemaRDD = sqlContext.applySchema(rowRDD,schema)
peopleSchemaRDD.registerTempTable("people")

val results = sqlContext.sql("SELECT name FROM people") //DataFrame and support all the normal RDD operations
results.map(t => "Name:"+t(0)).collect().foreach(println)

结果输出

Name:Andy
Name:Justin
Name:JohnSmith
Name:Bob

3、性能调优
主要通过在内存中缓存数据或者设置实验选项来提高性能,降低工作负载
(1)在内存中缓存数据
Spark SQL可以通过调用sqlContext.cacheTable(“tableName”)方法来缓存使用柱状格式的表。然后,Spark将会仅仅浏览需要的列并且自动地压缩数据以减少内存的使用以及垃圾回收的压力。
也可以在SQLContext上使用setConf方法或者在用SQL时运行SET key=value命令来配置内存缓存。
(2)配置选项
可以通过spark.sql.shuffle.partitions、spark.sql.codegen等选项来调整查询执行的性能。

4、其他
Spark SQL也支持直接运行SQL查询的接口,不用写任何代码。在Spark目录运行下面的命令可以启动Spark SQL CLI。

./bin/spark-sql

下载本文
显示全文
专题