概述 什么是Spark Spark是UC Berkeley AMP lab所开源的类Hadoop MapReduce的通用的并行计算框架,Spark基于map reduce算法实现的分布式计算,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出和结果可以保存在内存中,从而不再需要读写H
map, filter, flatMap, sample, groupByKey, reduceByKey, union, join, cogroup, mapValues, sort,partionBy等多种操作类型,Spark把这些操作称为Transformations。同时还提供Count, collect, reduce, lookup, save等多种actions操作。 RDD的特点:
RDD的好处
RDD的存储与分区
RDD的内部表示
在RDD的内部实现中每个RDD都可以使用5个方面的特性来表示:
RDD的存储级别
RDD根据useDisk、useMemory、deserialized、replication四个参数的组合提供了11种存储级别:
val NONE = new StorageLevel(false, false, false) val DISK_ONLY = new StorageLevel(true, false, false) val DISK_ONLY_2 = new StorageLevel(true, false, false, 2) val MEMORY_ONLY = new StorageLevel(false, true, true) val MEMORY_ONLY_2 = new StorageLevel(false, true, true, 2) val MEMORY_ONLY_SER = new StorageLevel(false, true, false) val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, 2) val MEMORY_AND_DISK = new StorageLevel(true, true, true) val MEMORY_AND_DISK_2 = new StorageLevel(true, true, true, 2) val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false) val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, 2)
RDD定义了各种操作,不同类型的数据由不同的RDD类抽象表示,不同的操作也由RDD进行抽实现。
下面来看一从Hadoop文件系统生成RDD的方式,如:val file = spark.textFile("hdfs://..."),file变量就是RDD(实际是HadoopRDD实例),生成的它的核心代码如下:
// SparkContext根据文件/目录及可选的分片数创建RDD, 这里我们可以看到Spark与Hadoop MapReduce很像 
 // 需要InputFormat, Key、Value的类型,其实Spark使用的Hadoop的InputFormat, Writable类型。 
 def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = { 
 hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], 
 classOf[Text], minSplits) .map(pair => pair._2.toString) }
 // 根据Hadoop配置,及InputFormat等创建HadoopRDD 
 new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)对RDD进行计算时,RDD从HDFS读取数据时与Hadoop MapReduce几乎一样的:
// 根据hadoop配置和分片从InputFormat中获取RecordReader进行数据的读取。 
 reader = fmt.getRecordReader(split.inputSplit.value, conf, Reporter.NULL)
 val key: K = reader.createKey()
 val value: V = reader.createValue()
 //使用Hadoop MapReduce的RecordReader读取数据,每个Key、Value对以元组返回。
 override def getNext() = {
 try {
 finished = !reader.next(key, value)
 } catch {
 case eof: EOFException =>
 finished = true
 }
 (key, value)
 }下面使用一个例子来示例说明Transformations与Actions在Spark的使用。
val sc = new SparkContext(master, "Example", System.getenv("SPARK_HOME"), 
 Seq(System.getenv("SPARK_TEST_JAR")))
 val rdd_A = sc.textFile(hdfs://.....)
 val rdd_B = rdd_A.flatMap((line => line.split("\\s+"))).map(word => (word, 1))
 val rdd_C = sc.textFile(hdfs://.....)
 val rdd_D = rdd_C.map(line => (line.substring(10), 1))
 val rdd_E = rdd_D.reduceByKey((a, b) => a + b)
 val rdd_F = rdd_B.jion(rdd_E)
 rdd_F.saveAsSequenceFile(hdfs://....)Spark对于资源管理与作业调度可以使用Standalone(模式),Apache Mesos及Hadoop YARN来实现。 Spark on Yarn在Spark0.6时引用,但真正可用是在现在的branch-0.8版本。Spark on Yarn遵循YARN的官方规范实现,得益于Spark天生支持多种Scheduler和Executor的良好设计,对YARN的支持也就非常容易,Spark on Yarn的大致框架图。 
让Spark运行于YARN上与Hadoop共用集群资源可以提高资源利用率。
Spark使用Scala开发,默认使用Scala作为编程语言。编写Spark程序比编写Hadoop MapReduce程序要简单的多,SparK提供了Spark-Shell,可以在Spark-Shell测试程序。写SparK程序的一般步骤就是创建或使用(SparkContext)实例,使用SparkContext创建RDD,然后就是对RDD进行操作。如:
val sc = new SparkContext(master, appName, [sparkHome], [jars]) 
 val textFile = sc.textFile("hdfs://.....") 
 textFile.map(....).filter(.....).....Spark支持Java编程,但对于使用Java就没有了Spark-Shell这样方便的工具,其它与Scala编程是一样的,因为都是JVM上的语言,Scala与Java可以互操作,Java编程接口其实就是对Scala的封装。如:
JavaSparkContext sc = new JavaSparkContext(...); 
 JavaRDD lines = ctx.textFile("hdfs://..."); 
 JavaRDD words = lines.flatMap( 
 new FlatMapFunction() { 
 public Iterable call(String s) { 
 return Arrays.asList(s.split(" ")); 
 } 
 } 
 );现在Spark也提供了Python编程接口,Spark使用py4j来实现python与java的互操作,从而实现使用python编写Spark程序。Spark也同样提供了pyspark,一个Spark的python shell,可以以交互式的方式使用Python编写Spark程序。 如:
from pyspark import SparkContext 
 sc = SparkContext("local", "Job Name", pyFiles=['MyFile.py', 'lib.zip', 'app.egg']) 
 words = sc.textFile("/usr/share/dict/words") 
 words.filter(lambda w: w.startswith("spar")).take(5)以Standalone模式运行Spark集群
http://spark-project.org/download/spark-0.7.3-prebuilt-cdh4.tgz)修改配置(conf/*) slaves: 配置工作节点的主机名 spark-env.sh:配置环境变量。
SCALA_HOME=/home/spark/scala-2.9.3 JAVA_HOME=/home/spark/jdk1.6.0_45 SPARK_MASTER_IP=spark1 SPARK_MASTER_PORT=30111 SPARK_MASTER_WEBUI_PORT=30118 SPARK_WORKER_CORES=2 SPARK_WORKER_MEMORY=4g SPARK_WORKER_PORT=30333 SPARK_WORKER_WEBUI_PORT=30119 SPARK_WORKER_INSTANCES=1
把Hadoop配置copy到conf目录下
在master主机上对其它机器做ssh无密码登录
把配置好的Spark程序使用scp copy到其它机器
在master启动集群
$SPARK_HOME/start-all.sh
以Yarn模式运行Spark
下载Spark代码.
git clone git://github.com/mesos/spark
切换到branch-0.8
cd spark git checkout -b yarn --track origin/yarn
使用sbt编译Spark并
$SPARK_HOME/sbt/sbt > package > assembly
把Hadoop yarn配置copy到conf目录下
运行测试
SPARK_JAR=./core/target/scala-2.9.3/spark-core-assembly-0.8.0-SNAPSHOT.jar \ ./run spark.deploy.yarn.Client --jar examples/target/scala-2.9.3/ \ --class spark.examples.SparkPi --args yarn-standalone
$SPARK_HOME/spark-shell进入shell即可,在Spark-shell中SparkContext已经创建好了,实例名为sc可以直接使用,还有一个需要注意的是,在Standalone模式下,Spark默认使用的调度器的FIFO调度器而不是公平调度,而Spark-shell作为一个Spark程序一直运行在Spark上,其它的Spark程序就只能排队等待,也就是说同一时间只能有一个Spark-shell在运行。在Spark-shell上写程序非常简单,就像在Scala Shell上写程序一样。
scala> val textFile = sc.textFile("hdfs://hadoop1:2323/user/data") 
 textFile: spark.RDD[String] = spark.MappedRDD@2ee9b6e3
 scala> textFile.count() // Number of items in this RDD
 res0: Long = 21374
 scala> textFile.first() // First item in this RDD
 res1: String = # Spark在Spark中Spark程序称为Driver程序,编写Driver程序很简单几乎与在Spark-shell上写程序是一样的,不同的地方就是SparkContext需要自己创建。如WorkCount程序如下:
import spark.SparkContext
import SparkContext._
object WordCount {
 def main(args: Array[String]) {
 if (args.length ==0 ){
 println("usage is org.test.WordCount ")
 }
 println("the args: ")
 args.foreach(println)
 val hdfsPath = "hdfs://hadoop1:8020"
 // create the SparkContext, args(0)由yarn传入appMaster地址
 val sc = new SparkContext(args(0), "WrodCount",
 System.getenv("SPARK_HOME"), Seq(System.getenv("SPARK_TEST_JAR")))
 val textFile = sc.textFile(hdfsPath + args(1))
 val result = textFile.flatMap(line => line.split("\\s+"))
 .map(word => (word, 1)).reduceByKey(_ + _)
 result.saveAsTextFile(hdfsPath + args(2))
 }
}原文地址:Spark:一个高效的分布式计算系统, 感谢原作者分享。
下载本文