Spark开发指南(0.8.1中文版)

翻译自Spark官方文档 Spark Programming Guide

Spark开发指南

简介

总的来说,每一个Spark的应用,都是由一个驱动程序( driver program )构成,它运行用户的 main 函数,在一个集群上执行各种各样的并行操作。Spark提出的最主要抽象概念是弹性分布式数据集 ( resilient distributed dataset ,RDD),它是一个元素集合,划分到集群的各个节点上,可以被并行操作。RDDs的创建可以从HDFS(或者任意其他支持Hadoop文件系统) 上的一个文件开始,或者通过转换驱动程序( driver program )中已存在的Scala集合而来。用户也可以让Spark 保留 一个RDD在内存中,使其能在并行操作中被有效的重复使用。最后,RDD能自动从节点故障中恢复。

Spark的第二个抽象概念是共享变量( shared variables ),可以在并行操作中使用。在默认情况下,Spark通过不同节点上的一系列任务来运行一个函数,它将每一个函数中用到的变量的拷贝传递到每一个任务中。有时候,一个变量需要在任务之间,或任务与驱动程序之间被共享。Spark 支持两种类型的共享变量: 广播变量 ,可以在内存的所有的结点上缓存变量; 累加器 :只能用于做加法的变量,例如计数或求和。

本指南将展示这些特性,并给出一些例子。读者最好比较熟悉Scala,尤其是 闭包 的语法。请留意,你也可以通过s park-shell 脚本,来交互式地运行Spark。我们建议你在接下来的步骤中这样做。

接入Spark

Spark 0.8.1 需要搭配使用 Scala 2.9.3. 如果你用Scala 来编写应用,你需要使用相同版本的Scala,更新的大版本很可能不兼容。

要写一个Spark 应用,你需要给它加上Spark的依赖。如果你使用SBT或者Maven,Spark可以通过Maven中心库来获得:

1
2
3

groupId = org . apache . spark

artifactId = spark - core_2 . 9.3

version = 0.8.1 - incubating

另外,如果你想访问一个HDFS集群,你需要根据你的HDFS版本,添加一个 hadoop-client 的依赖:

1
2
3

groupId = org . apache . hadoop

artifactId = hadoop - client

version = < your - hdfs - version >

对于其他编译系统,你可以通过运行 sbt/sbt assembly 来把Spark及其依赖打包到一个JAR( assembly/target/scala-2.9.3/spark-assembly-0.8.1-incubating-hadoop*.jar )中,然后将其加入到你的CLASSPATH中。并按照 这里 的描述设置HDFS版本。

最后,你需要将一些Spark的类和隐式转换导入到你的程序中。通过如下语句:

1
2

import org . apache . spark . SparkContext

import org . apache . spark . SparkContext . _

Spark初始化

Spark程序需要做的第一件事情,就是创建一个 SparkContext 对象,它将告诉Spark如何访问一个集群。这个通常是通过下面的构造器来实现的:

1

new SparkContext ( master , appName , [ sparkHome ] , [ jars ] )

master 参数,是一个用于指定所连接的 Spark or Mesos 集群URL 的字符串,也可以是一个如下面所描述的用于在local模式运行的特殊字符串“local”。 appName 是你的应用的名称,将会在集群的Web监控UI中显示。最后,如果部署到集群,在分布式模式下运行,最后两个参数是必须的。后面会有具体的描述。

在Spark shell中,一个特殊的解释器感知的SparkContext已经为你创建好了,变量名是 sc 。创建你自己的SparkContext是不会生效的。你可以用 MASTER 环境变量来设置SparkContext连接到的master。也可以用 ADD_JARS 变量来将JARs加入到你的classpath。例如,如果在四核CPU上运行 spark-shell ,使用:

1

$ MASTER = local [ 4 ] . / spark - shell

或者,同时在classpath中加入 code.jar ,使用:

1

$ MASTER = local [ 4 ] ADD_JARS = code . jar . / spark - shell

Master URLs

传递给Spark的master URL可以是以下任一种形式:

Master URL 含义
local 使用一个Worker线程本地化运行SPARK(完全不并行)
local[K] 使用K个Worker线程本地化运行Spark(理想情况下,K应该根据运行机器的CPU核数设定)
spark://HOST:PORT 连接到指定的Spark单机版集群( Spark standalone cluster )master。必须使用master所配置的接口,默认接口是7077.
mesos://HOST:PORT 连接到指定的 Mesos 集群。host参数是Moses master的hostname。必须使用master所配置的接口,默认接口是5050.

如果没有指定的msater URL, spark shell 的默认值是“local”。

如果在YARN上运行,Spark会在YARN上,启动一个standalone部署的集群实例,查看 running on YARN 获得更多详情。

在集群上部署代码

如果你要在集群上运行应用,你需要给 SparkContext 指定两个可选参数,使其能找到你的代码:

  • sparkHome :你的集群机器上Spark的安装路径(所有机器上路径必须一致)
  • jars : 在本地机器上的JAR文件列表,其中包括你应用的代码以及任何的依赖,Spark将会把他们部署到所有的集群结点上。你需要使用你的编译系统将你的应用打包成一系列JAR文件。例如,如果你使用SBT,用 sbt-assembly 插件将你的代码和所有依赖变成一个JAR文件是一个好的办法。

如果你在一个集群上运行 spark-shell , 在启动之前你可以通过指定 ADD_JAR 环境变量将JAR文件们加载在集群上,这个变量需要包括一个用逗号分隔的JAR文件列表。例如, ADD_JARS=a.jar,b.jar ./spark-shell 将启动一个在classpath中带有 a.jarb.jar 的shell。另外,在shell中定义的任何新类,都会被自动分发出去。

弹性分布式数据集

Spark围绕的概念是弹性分布式数据集(RDD),这是一个有容错机制并可以被并行操作的元素集合。目前有两种类型的RDD:并行集合( Parallelized Collections) :接收一个已经存在的Scala集合,然后进行各种并行计算。 Hadoop数据集( Hadoop Datasets) :在一个文件的每条记录上运行函数。只要文件系统是HDFS,或者hadoop支持的任意存储系统即可。 这两种类型的RDD都可以通过相同的方式进行操作。

并行集合(Parallelized Collections)

并行集合是通过调用 SparkContext的parallelize 方法,在一个已经存在的Scala集合上创建的(一个 Seq 对象)。集合的对象将会被拷贝,创建出一个可以被并行操作的分布式数据集。例如,下面的解释器输出,演示了如何从一个数组创建一个并行集合:

1
2
3
4
5

scala > val data = Array ( 1 , 2 , 3 , 4 , 5 )

data : Array [ Int ] = Array ( 1 , 2 , 3 , 4 , 5 )

scala > val distData = sc . parallelize ( data )

distData : spark . RDD [ Int ] = spark . ParallelCollection @ 10d13e3e

一旦分布式数据集( distData )被创建好,它们将可以被并行操作。例如,我们可以调用 distData.reduce(_ +_) 来将数组的元素相加。我们会在后续的分布式数据集运算中进一步描述。

并行集合的一个重要参数是slices,表示数据集切分的份数。Spark将会在集群上为每一份数据起一个任务。典型地,你可以在集群的每个CPU上分布2-4个slices. 一般来说,Spark会尝试根据集群的状况,来自动设定slices的数目。然而,你也可以通过传递给 parallelize 的第二个参数来进行手动设置。(例如: sc.parallelize(data, 10) ).

Hadoop数据集(Hadoop Datasets)

Spark可以从存储在HDFS,或者Hadoop支持的其它文件系统(包括本地文件, Amazon S3 , Hypertable, HBase等等)上的文件创建分布式数据集。Spark可以支持TextFile, SequenceFiles 以及其它任何Hadoop输入格式。(Python接口目前还不支持SequenceFile,很快会支持吧)

Text file的RDDs可以通过SparkContext’s textFile的方式创建,该方法接受一个文件的URI地址(或者机器上的一个本地路径,或者一个hdfs://, sdn://,kfs://,其它URI). 下面是一个调用例子:

1
2

scala > val distFile = sc . textFile ( "data.txt" )

distFile : spark . RDD [ String ] = spark . HadoopRDD @ 1d4cee08

一旦创建完成, distFile 可以被进行数据集操作。例如,我们可以通过使用如下的map和reduce操作: distFile.map(_.size).reduce(_ + _ ) 将所有数据行的长度相加。

textFile 方法也可以通过输入一个可选的第二参数,来控制文件的分片数目。默认情况下,Spark为每一块文件创建一个分片(HDFS默认的块大小为64MB),但是你也可以通过传入一个更大的值,来指定一个更高的片值。注意,你不能指定一个比块数更小的片值(和Map数不能小于Block数一样,但是可以比它多)

对于 SequenceFiles ,可以使用SparkContext的 sequenceFile[K, V] 方法创建,其中K和V是文件中的key和values的类型。像 IntWritableText 一样,它们必须是Hadoop的 Writable interface的子类。另外,对于几种通用Writable类型,Spark允许你指定原生类型来替代。例如: sequencFile[Int, String] 将会自动读取IntWritable和Texts。

最后,对于其他类型的Hadoop输入格式,你可以使用 SparkContext.hadoopRDD 方法,它可以接收任意类型的 JobConf 和输入格式类,键类型和值类型。按照像Hadoop作业一样的方法,来设置输入源就可以了。

RDD 的操作

RDD支持两种操作: 转换( transformation 从现有的数据集创建一个新的数据集;而 动作(actions) 在数据集上运行计算后,返回一个值给驱动程序。 例如, map 就是一种转换,它将数据集每一个元素都传递给函数,并返回一个新的分布数据集表示结果。另一方面, reduce 是一种动作,通过一些函数将所有的元素叠加起来,并将最终结果返回给Driver程序。(不过还有一个并行的 reduceByKey ,能返回一个分布式数据集)

Spark中的所有转换都是惰性的,也就是说,他们并不会直接计算结果。相反的,它们只是记住应用到基础数据集(例如一个文件)上的这些转换动作。只有当发生一个要求返回结果给Driver的动作时,这些转换才会真正运行。这个设计让Spark更加有效率的运行。例如,我们可以实现:通过 map 创建的一个新数据集,并在 reduce 中使用,最终只返回 reduce 的结果给driver,而不是整个大的新数据集。

默认情况下,每一个转换过的RDD都会在你在它之上执行一个动作时被重新计算。不过,你也可以使用 persist (或者 cache )方法,持久化一个RDD在内存中。在这种情况下,Spark将会在集群中,保存相关元素,下次你查询这个RDD时,它将能更快速访问。在磁盘上持久化数据集,或在集群间复制数据集也是支持的,这些选项将在本文档的下一节进行描述。

下面的表格列出了目前所支持的转换和动作(详情请参见 RDD API doc ):

转换( transformation

 转换 含义
map ( func ) 返回一个新分布式数据集,由每一个输入元素经过 func 函数转换后组成
filter ( func ) 返回一个新数据集,由经过 func 函数计算后返回值为true的输入元素组成
flatMap ( func ) 类似于map,但是每一个输入元素可以被映射为0或多个输出元素(因此 func 应该返回一个序列,而不是单一元素)
mapPartitions ( func ) 类似于map,但独立地在RDD的每一个分块上运行,因此在类型为T的RDD上运行时, func 的函数类型必须是Iterator[T] => Iterator[U]
mapPartitionsWithSplit ( func ) 类似于mapPartitions, 但 func带有 一个整数参数表示分块的索引值。因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Iterator[T]) => Iterator[U]
sample ( withReplacement , fractionseed ) 根据 fraction 指定的比例,对数据进行采样,可以选择是否用随机数进行替换,seed用于指定随机数生成器种子
union ( otherDataset ) 返回一个新的数据集,新数据集是由源数据集和参数数据集联合而成
distinct ([ numTasks ])) 返回一个包含源数据集中所有不重复元素的新数据集
groupByKey ([ numTasks ]) 在一个(K,V)对的数据集上调用,返回一个(K,Seq[V])对的数据集
注意: 默认情况下,只有8个并行任务来做操作,但是你可以传入一个可选的 numTasks 参数来改变它
reduceByKey ( func , [ numTasks ]) 在一个(K,V)对的数据集上调用时,返回一个(K,V)对的数据集,使用指定的 reduce 函数,将相同key的值聚合到一起。类似 groupByKeyreduce 任务个数是可以通过第二个可选参数来配置的
sortByKey ([ ascending ], [ numTasks ]) 在一个(K,V)对的数据集上调用,K必须实现Ordered接口,返回一个按照Key进行排序的(K,V)对数据集。升序或降序由 ascending 布尔参数决定
join ( otherDataset , [ numTasks ]) 在类型为(K,V)和(K,W)类型的数据集上调用时,返回一个相同key对应的所有元素对在一起的(K, (V, W))数据集
cogroup ( otherDataset , [ numTasks ]) 在类型为(K,V)和(K,W)的数据集上调用,返回一个 (K, Seq[V], Seq[W])元组的数据集。这个操作也可以称之为 groupwith
cartesian ( otherDataset ) 笛卡尔积,在类型为 T 和 U 类型的数据集上调用时,返回一个 (T, U)对数据集(两两的元素对)

完整的转换列表可以在 RDD API doc 中获得。

动作(actions)

 动作 含义
reduce ( func ) 通过函数 func (接受两个参数,返回一个参数)聚集数据集中的所有元素。这个功能必须可交换且可关联的,从而可以正确的被并行执行。
collect () 在驱动程序中,以数组的形式,返回数据集的所有元素。这通常会在使用filter或者其它操作并返回一个足够小的数据子集后再使用会比较有用。
count () 返回数据集的元素的个数。
first () 返回数据集的第一个元素(类似于take(1))
take ( n ) 返回一个由数据集的前 n 个元素组成的数组。注意,这个操作目前并非并行执行,而是由驱动程序计算所有的元素
takeSample ( withReplacement , numseed ) 返回一个数组,在数据集中随机采样 num 个元素组成,可以选择是否用随机数替换不足的部分,Seed用于指定的随机数生成器种子
saveAsTextFile ( path ) 将数据集的元素,以textfile的形式,保存到本地文件系统,HDFS或者任何其它hadoop支持的文件系统。对于每个元素,Spark将会调用 toString 方法,将它转换为文件中的文本行
saveAsSequenceFile ( path ) 将数据集的元素,以Hadoop sequencefile的格式,保存到指定的目录下,本地系统,HDFS或者任何其它hadoop支持的文件系统。这个只限于由key-value对组成,并实现了Hadoop的Writable接口,或者隐式的可以转换为Writable的RDD。(Spark包括了基本类型的转换,例如Int,Double,String,等等)
countByKey () 对(K,V)类型的RDD有效,返回一个(K,Int)对的Map,表示每一个key对应的元素个数
foreach ( func ) 在数据集的每一个元素上,运行函数 func 进行更新。这通常用于边缘效果,例如更新一个累加器,或者和外部存储系统进行交互,例如HBase

完整的转换列表可以在 RDD API doc 中获得。

RDD 的持久化

Spark最重要的一个功能,就是在不同操作间, 持久化 (或缓存)一个数据集在内存中。当你持久化一个RDD,每一个结点都将把它的计算分块结果保存在内存中,并在对此数据集(或者衍生出的数据集)进行的其它动作中重用。这将使得后续的动作(Actions)变得更加迅速(通常快10倍)。缓存是用Spark构建迭代算法的关键。

你可以用 persist()cache() 方法来标记一个要被持久化的RDD,然后一旦首次被一个动作(Action)触发计算,它将会被保留在计算结点的内存中并重用。Cache有容错机制,如果RDD的任一分区丢失了,通过使用原先创建它的转换操作,它将会被自动重算(不需要全部重算,只计算丢失的部分)。

此外,每一个RDD都可以用不同的保存级别进行保存,从而允许你持久化数据集在硬盘,或者在内存作为序列化的Java对象(节省空间),甚至于跨结点复制。这些等级选择,是通过将一个 org.apache.spark.storage.StorageLevel 对象传递给 persist() 方法进行确定。 cache() 方法是使用默认存储级别的快捷方法,也就是 StorageLevel.MEMORY_ONLY (将反序列化的对象存入内存)。

完整的可选存储级别如下:

存储级别  意义
MEMORY_ONLY 将RDD作为反序列化的的对象存储JVM中。如果RDD不能被内存装下,一些分区将不会被缓存,并且在需要的时候被重新计算。这是是默认的级别
MEMORY_AND_DISK 将RDD作为反序列化的的对象存储在JVM中。如果RDD不能被与内存装下,超出的分区将被保存在硬盘上,并且在需要时被读取
MEMORY_ONLY_SER 将RDD作为序列化的的对象进行存储(每一分区占用一个字节数组)。通常来说,这比将对象反序列化的空间利用率更高,尤其当使用 fast serializer ,但在读取时会比较占用CPU
MEMORY_AND_DISK_SER MEMORY_ONLY_SER 相似,但是把超出内存的分区将存储在硬盘上而不是在每次需要的时候重新计算
DISK_ONLY 只将RDD分区存储在硬盘上
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. 与上述的存储级别一样,但是将每一个分区都复制到两个集群结点上

存储级别的选择

Spark的不同存储级别,旨在满足内存使用和CPU效率权衡上的不同需求。我们建议通过以下的步骤来进行选择:

  • 如果你的RDDs可以很好的与默认的存储级别( MEMORY_ONLY )契合,就不需要做任何修改了。这已经是CPU使用效率最高的选项,它使得RDDs的操作尽可能的快。
  • 如果不行,试着使用 MEMORY_ONLY_SER 并且选择一个 快速序列化的库 使得对象在有比较高的空间使用率的情况下,依然可以较快被访问。
  • 尽可能不要存储到硬盘上,除非计算数据集的函数,计算量特别大,或者它们过滤了大量的数据。否则,重新计算一个分区的速度,和与从硬盘中读取基本差不多快。
  • 如果你想有快速故障恢复能力,使用复制存储级别(例如:用Spark来响应web应用的请求)。所有的存储级别都有通过重新计算丢失数据恢复错误的容错机制,但是复制存储级别可以让你在RDD上持续的运行任务,而不需要等待丢失的分区被重新计算。

如果你想要定义你自己的存储级别(比如复制因子为3而不是2),可以使用 StorageLevel 单例对象的 apply() 方法。

共享变量

一般来说,当一个函数被传递给Spark操作(例如 mapreduce ),在一个远程集群上运行,它实际上操作的是这个函数用到的所有变量的独立拷贝。这些变量会被拷贝到每一台机器,在远程机器上对变量的所有更新都不会被传播回驱动程序。通常看来,在任务之间中,读写共享变量显然不够高效。然而,Spark还是为两种常见的使用模式,提供了两种有限的共享变量:广播变量和累加器。

广播变量

广播变量允许程序员保留一个只读的变量,缓存在每一台机器上,而非每个任务保存一份拷贝。他们可以这样被使用,例如,以一种高效的方式给每个结点一个大的输入数据集。Spark会尝试使用一种高效的广播算法来传播广播变量,从而减少通信的代价。

广播变量是通过调用 SparkContext.broadcast(v) 方法从变量v创建的。广播变量是一个v的封装器,它的值可以通过调用 value 方法获得。如下模块展示了这个:

1
2
3
4
5

scala > val broadcastVar = sc . broadcast ( Array ( 1 , 2 , 3 ) )

broadcastVar : spark . Broadcast [ Array [ Int ] ] = spark . Broadcast ( b5c40191 - a864 - 4c7d - b9bf - d87e1a4e787c )

scala > broadcastVar . value

res0 : Array [ Int ] = Array ( 1 , 2 , 3 )

在广播变量被创建后,它应该在集群运行的任何函数中,代替 v 值被调用,从而v值不需要被再次传递到这些结点上。另外,对象 v 不能在广播后修改,这样可以保证所有结点的收到的都是一模一样的广播值。

累加器

累加器是一种只能通过关联操作进行“加”操作的变量,因此可以高效被并行支持。它们可以用来实现计数器(如MapReduce中)和求和器。Spark原生就支持 IntDouble 类型的累加器,开发者可以自己添加新的支持类型。

一个累加器可以通过调用 SparkContext.accumulator(v) 方法从一个初始值v中创建。运行在集群上的任务,可以通过使用 += 来给它加值。然而,他们不能读取这个值。只有驱动程序可以使用 value 的方法来读取累加器的值。

如下的解释器模块,展示了如何利用累加器,将一个数组里面的所有元素相加:

1
2
3
4
5
6
7
8
9

scala > val accum = sc . accumulator ( 0 )

accum : spark . Accumulator [ Int ] = 0

scala > sc . parallelize ( Array ( 1 , 2 , 3 , 4 ) ) . foreach ( x = > accum + = x )

. . .

10 / 09 / 29 18 : 41 : 08 INFO SparkContext : Tasks finished in 0.317106 s

scala > accum . value

res2 : Int = 10

更多信息

你可以在Spark的网站上看到 spark程序的样例 。Spark还在 examples/src/main/scala 上收入了一些例子,其中一些既有Spark版本,又有本地(非并行)版本。这些案例让你看到要让程序以集群化的方式跑起来的话,需要做什么修改。你可以通过将类名传递给spark中的 run-example 脚本来运行它们,例如:

1

. / run - example org . apache . spark . examples . SparkPi

任何样例程序在运行时如果没有提供任何参数,都会打印使用帮助。

当需要优化程序的帮助, configurationtuning 指导提供了最佳实践信息。它们对于确保你的数据以高效的格式存储在内存中,至关重要。

我来评几句
登录后评论

已发表评论数()

相关站点

+订阅
热门文章