Mastering Spark 2.x: An In-Depth Look at the CacheManager Source Code

I. Overview

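CacheManager comes into play whenever an operator runs over an RDD's data. As covered earlier, when a ShuffleWriter writes data it calls the RDD's iterator() method to obtain that RDD's partition data. In the Spark 2.x code analyzed below, this caching logic lives in RDD.getOrCompute() and BlockManager.getOrElseUpdate(). CacheManager does three main things:

a. It manages Spark's cache, which can be kept in memory or on disk;

b. Under the hood, all reads and writes go through the BlockManager;

c. It is invoked when a running Task calls the RDD's iterator() method to compute data.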
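To make point a. concrete, here is a minimal Scala sketch of how a storage level decides where a cached partition ends up. It assumes a hypothetical two-flag Level class (Spark's real StorageLevel also tracks off-heap use, deserialization and replication) and a fitsInMemory flag standing in for the memory manager's decision; it is a simplified model, not Spark's API.

```scala
object StorageSketch {
  // Hypothetical two-flag stand-in for Spark's StorageLevel.
  final case class Level(useDisk: Boolean, useMemory: Boolean)

  val NONE            = Level(useDisk = false, useMemory = false)
  val MEMORY_ONLY     = Level(useDisk = false, useMemory = true)
  val MEMORY_AND_DISK = Level(useDisk = true, useMemory = true)
  val DISK_ONLY       = Level(useDisk = true, useMemory = false)

  // Where a partition's block ends up. fitsInMemory is an assumed input
  // standing in for the memory manager's answer.
  def place(level: Level, fitsInMemory: Boolean): String =
    if (level == NONE) "not cached"
    else if (level.useMemory && fitsInMemory) "memory"
    else if (level.useDisk) "disk"
    else "dropped, recompute on next access"
}
```

The last branch is the case the article returns to later: a memory-only block that does not fit is simply lost and has to be recomputed.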

II. CacheManager Source Code Analysis

1. When we covered ShuffleMapTask.runTask() earlier, we saw that the ShuffleWriter is handed the partition data computed by rdd.iterator(). The code is:

var writer: ShuffleWriter[Any, Any] = null
try {
  val manager = SparkEnv.get.shuffleManager
  writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
  // This is the call inside ShuffleMapTask.runTask(): the partition data
  // computed by rdd.iterator() is passed straight to the writer
  writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
  writer.stop(success = true).get
} catch {
  ...
}
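The point to notice in runTask() is that rdd.iterator() typically returns a lazy iterator: the partition is only computed as the writer pulls records from it. The Scala sketch below models that with a hypothetical toy writer that buckets (key, value) records by key hash; ShuffleWriteSketch, partitionIterator and write are illustrative names, not Spark classes.

```scala
import scala.collection.mutable

object ShuffleWriteSketch {
  // How many records the "RDD" has actually produced so far.
  var computed = 0

  // Stand-in for rdd.iterator(partition, context): elements are produced
  // only when the consumer asks for them.
  def partitionIterator(n: Int): Iterator[(String, Int)] =
    Iterator.range(0, n).map { i => computed += 1; (s"key$i", i) }

  // Hypothetical toy writer: assigns each record to a reduce partition by key hash.
  def write(records: Iterator[(String, Int)], numReducers: Int): Map[Int, Seq[(String, Int)]] = {
    val buckets = mutable.Map.empty[Int, mutable.ArrayBuffer[(String, Int)]]
    records.foreach { case rec @ (k, _) =>
      val p = Math.floorMod(k.hashCode, numReducers)
      buckets.getOrElseUpdate(p, mutable.ArrayBuffer.empty) += rec
    }
    buckets.map { case (p, buf) => p -> buf.toSeq }.toMap
  }
}
```

Creating the iterator computes nothing; only write() drives the computation, which is why the caching decision has to live inside iterator().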

2. Next, look at the iterator() method in the RDD class:

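final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  // If storageLevel is not NONE, the RDD has been marked for persistence
  // (persist()/cache()), so getOrCompute() first tries to fetch the
  // previously persisted data through the BlockManager
  if (storageLevel != StorageLevel.NONE) {
    getOrCompute(split, context)
  } else {
    // Not persisted: the data has to be produced by this RDD's own compute()
    // (which pulls from its parent RDDs). If the RDD has been checkpointed,
    // the checkpoint is read instead; it is recomputed only when there is no checkpoint
    computeOrReadCheckpoint(split, context)
  }
}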
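The branch in iterator() can be modelled in a few lines. In this sketch the two paths only record which one was taken; ToyRDD, its Level values and lastPath are hypothetical stand-ins for the real RDD, StorageLevel and the two methods.

```scala
object IteratorDispatchSketch {
  sealed trait Level
  case object NONE extends Level
  case object MEMORY_ONLY extends Level

  final class ToyRDD(val storageLevel: Level, data: Seq[Int]) {
    var lastPath: String = ""

    // Hypothetical placeholders for the two real code paths.
    private def getOrCompute(split: Int): Iterator[Int] = {
      lastPath = "getOrCompute"
      data.iterator
    }
    private def computeOrReadCheckpoint(split: Int): Iterator[Int] = {
      lastPath = "computeOrReadCheckpoint"
      data.iterator
    }

    // Mirrors RDD.iterator(): any level other than NONE takes the cache-aware path.
    final def iterator(split: Int): Iterator[Int] =
      if (storageLevel != NONE) getOrCompute(split)
      else computeOrReadCheckpoint(split)
  }
}
```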


3. Now step into the handling for persisted RDDs, the getOrCompute() function:

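private[spark] def getOrCompute(partition: Partition, context: TaskContext): Iterator[T] = {
  val blockId = RDDBlockId(id, partition.index)
  var readCachedBlock = true
  // Fetch the persisted data through the BlockManager; on success it is returned
  // directly, otherwise computeOrReadCheckpoint() produces it.
  // Why would cached data go missing? When memory runs short, blocks are evicted:
  // if the storage level allows disk they are dropped to disk, otherwise they are
  // simply lost and must be recomputed.
  SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, elementClassTag, () => {
    // This closure (the makeIterator argument) runs only on a cache miss,
    // so the flag records that the data was not read from the cache
    readCachedBlock = false
    // Recompute the partition. Reaching this point means the RDD is persisted, so
    // getOrElseUpdate() stores the result again at this RDD's storage level
    computeOrReadCheckpoint(partition, context)
  }) match {
    // Left: a cached block, either found directly or just stored by the put
    case Left(blockResult) =>
      if (readCachedBlock) {
        // Genuine cache hit: count the bytes and records as input metrics
        val existingMetrics = context.taskMetrics().inputMetrics
        existingMetrics.incBytesRead(blockResult.bytes)
        new InterruptibleIterator[T](context, blockResult.data.asInstanceOf[Iterator[T]]) {
          override def next(): T = {
            existingMetrics.incRecordsRead(1)
            delegate.next()
          }
        }
      } else {
        new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
      }
    // Right: the put failed (e.g. too large for memory with no disk fallback),
    // so the freshly computed iterator is used without caching
    case Right(iter) =>
      new InterruptibleIterator(context, iter.asInstanceOf[Iterator[T]])
  }
}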
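The interplay between readCachedBlock and the Either result is easier to see in a toy version. This sketch assumes an in-memory map as the block store, a canStore flag in place of the real memory check, and a recordsRead counter in place of inputMetrics; all names are hypothetical. As in the real code, input records are counted only on a genuine cache hit.

```scala
import scala.collection.mutable

object GetOrComputeSketch {
  // Hypothetical block store keyed by (rddId, partitionIndex), like RDDBlockId.
  val store = mutable.Map.empty[(Int, Int), Vector[Int]]
  var recordsRead = 0L // stands in for inputMetrics.incRecordsRead

  // Simplified getOrElseUpdate: Left on a hit or a successful put, Right if the put fails.
  def getOrElseUpdate(id: (Int, Int), canStore: Boolean,
                      make: () => Iterator[Int]): Either[Vector[Int], Iterator[Int]] =
    store.get(id) match {
      case Some(block) => Left(block)
      case None =>
        val values = make()
        if (canStore) { val v = values.toVector; store(id) = v; Left(v) }
        else Right(values)
    }

  def getOrCompute(rddId: Int, partition: Int, canStore: Boolean,
                   compute: () => Iterator[Int]): Iterator[Int] = {
    var readCachedBlock = true
    getOrElseUpdate((rddId, partition), canStore, () => {
      readCachedBlock = false // the closure only runs on a cache miss
      compute()
    }) match {
      case Left(block) if readCachedBlock =>
        block.iterator.map { x => recordsRead += 1; x } // count real cache reads
      case Left(block) => block.iterator
      case Right(iter) => iter
    }
  }
}
```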

4. Continue into getOrElseUpdate(), which fetches the persisted data:

// get() is tried first to fetch the block locally or remotely; if found, it is returned directly.
// Otherwise the data must be recomputed. Since execution reached this point the RDD is persisted,
// so the data produced by the makeIterator closure is stored again at the given storage level.
def getOrElseUpdate[T](
    blockId: BlockId,
    level: StorageLevel,
    classTag: ClassTag[T],
    makeIterator: () => Iterator[T]): Either[BlockResult, Iterator[T]] = {
  // Attempt to read the block from local or remote storage. If it's present, then we don't need
  // to go through the local-get-or-put path.
  get[T](blockId)(classTag) match {
    case Some(block) =>
      return Left(block)
    case _ =>
      // Need to compute the block.
  }
  // The block was found neither locally nor remotely. doPutIterator() runs makeIterator,
  // i.e. the closure passed in by getOrCompute() in step 3 that calls
  // computeOrReadCheckpoint(), and persists the result at the given storage level,
  // because the persisted copy was lost (or has not been written yet on first access).
  doPutIterator(blockId, makeIterator, level, classTag, keepReadLock = true) match {
    case None =>
      // doPut() didn't hand work back to us, so the block already existed or was successfully
      // stored. Therefore, we now hold a read lock on the block.
      val blockResult = getLocalValues(blockId).getOrElse {
        // Since we held a read lock between the doPut() and get() calls, the block should not
        // have been evicted, so get() not returning the block indicates some internal error.
        releaseLock(blockId)
        throw new SparkException(s"get() failed for block $blockId even though we held a lock")
      }
      // We already hold a read lock on the block from the doPut() call and getLocalValues()
      // acquires the lock again, so we need to call releaseLock() here so that the net number
      // of lock acquisitions is 1 (since the caller will only call release() once).
      releaseLock(blockId)
      Left(blockResult)
    case Some(iter) =>
      // The put failed, likely because the data was too large to fit in memory and could not be
      // dropped to disk. Therefore, we need to pass the input iterator back to the caller so
      // that they can decide what to do with the values (e.g. process them without caching).
      Right(iter)
  }
}
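The sketch below mirrors the control flow of getOrElseUpdate() with a hypothetical memory-only ToyBlockStore whose capacity is counted in records. It assumes a much simpler unroll step than Spark's MemoryStore: if the iterator does not fit, the values unrolled so far are chained with the untouched remainder and returned as Right, loosely like the partially unrolled iterator Spark hands back. It also counts read locks to show why the net lock count after a successful put is 1.

```scala
import scala.collection.mutable

object GetOrElseUpdateSketch {
  // Hypothetical memory-only block store; capacity is counted in records.
  final class ToyBlockStore(capacity: Int) {
    private val blocks = mutable.Map.empty[String, Vector[Int]]
    private val readLocks = mutable.Map.empty[String, Int].withDefaultValue(0)

    def lockCount(id: String): Int = readLocks(id)
    def releaseLock(id: String): Unit = readLocks(id) -= 1

    // Local lookup; a hit takes one read lock.
    private def getLocalValues(id: String): Option[Vector[Int]] =
      blocks.get(id).map { v => readLocks(id) += 1; v }

    // Unroll up to `capacity` records. If everything fits, store the block, keep one
    // read lock (like keepReadLock = true) and return None. Otherwise hand back the
    // unrolled values chained with the untouched remainder.
    private def doPutIterator(id: String, make: () => Iterator[Int]): Option[Iterator[Int]] = {
      val it = make()
      val unrolled = mutable.ArrayBuffer.empty[Int]
      while (it.hasNext && unrolled.size < capacity) unrolled += it.next()
      if (it.hasNext) Some(unrolled.iterator ++ it)
      else {
        blocks(id) = unrolled.toVector
        readLocks(id) += 1
        None
      }
    }

    def getOrElseUpdate(id: String, make: () => Iterator[Int]): Either[Vector[Int], Iterator[Int]] = {
      getLocalValues(id) match {
        case Some(block) => return Left(block)
        case None        => // need to compute the block
      }
      doPutIterator(id, make) match {
        case None =>
          // Lock count is now 2 (put + get); drop one so the caller holds exactly 1.
          val block = getLocalValues(id).getOrElse(sys.error(s"get() failed for block $id"))
          releaseLock(id)
          Left(block)
        case Some(iter) =>
          Right(iter) // put failed: the caller consumes the values without caching
      }
    }
  }
}
```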

5. Now back to computeOrReadCheckpoint(), which is where the data actually gets computed:

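private[spark] def computeOrReadCheckpoint(split: Partition, context: TaskContext): Iterator[T] = {
  // If the RDD has been checkpointed, read the data from the checkpoint. Once a checkpoint is
  // materialized the RDD's lineage is replaced by a CheckpointRDD, so firstParent is that RDD.
  // Checkpointing gets its own article later, so it is not covered in detail here.
  if (isCheckpointedAndMaterialized) {
    firstParent[T].iterator(split, context)
  } else {
    // No checkpoint: the only option is to recompute the partition
    // with this RDD's own compute(), i.e. the operator that defined it
    compute(split, context)
  }
}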
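A minimal sketch of computeOrReadCheckpoint() under simplifying assumptions: ToyMappedRDD is a hypothetical RDD that applies f to its parent's data, and a materialized checkpoint is modelled as a saved Vector rather than a CheckpointRDD. The computeCalls counter shows that once a checkpoint exists, reading the partition no longer recomputes it.

```scala
object CheckpointSketch {
  // Hypothetical RDD that applies f to its parent's data (think rdd.map(f)).
  final class ToyMappedRDD(parent: Vector[Int], f: Int => Int) {
    var computeCalls = 0
    // Stand-in for a materialized checkpoint; Spark uses a CheckpointRDD instead.
    private var checkpointData: Option[Vector[Int]] = None

    // The RDD's own compute(): run the operator over the parent's data.
    def compute(split: Int): Iterator[Int] = {
      computeCalls += 1
      parent.iterator.map(f)
    }

    def checkpoint(split: Int): Unit = checkpointData = Some(compute(split).toVector)
    def isCheckpointedAndMaterialized: Boolean = checkpointData.isDefined

    def computeOrReadCheckpoint(split: Int): Iterator[Int] =
      if (isCheckpointedAndMaterialized) checkpointData.get.iterator // read the checkpoint
      else compute(split)                                            // recompute from lineage
  }
}
```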

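6. To wrap up, the overall flow of data computation through CacheManager:

1). If the RDD is persisted, the data is fetched from the BlockManager, locally or remotely, according to its storage level. If the data cannot be found, either because this is the first access or because it was lost (for example, evicted from memory), the partition is recomputed and then persisted again at that storage level.

2). If the RDD is not persisted, the data has to be computed. If the RDD has been checkpointed, the checkpointed data is read first; otherwise the partition is recomputed by running the RDD's operator over its parent RDD's data.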
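The two cases above can be traced end to end with a toy model. CacheFlowSketch.ToyRDD, the shared cache map and the trace log are hypothetical; clearing the map stands in for the block being evicted, and the checkpoint field stands in for checkpointed data.

```scala
import scala.collection.mutable

object CacheFlowSketch {
  final class ToyRDD(id: Int, persisted: Boolean, data: Vector[Int]) {
    val trace = mutable.ArrayBuffer.empty[String]
    var checkpoint: Option[Vector[Int]] = None

    // Checkpoint first, otherwise compute from "lineage" (here: the data vector).
    private def computeOrReadCheckpoint(): Iterator[Int] = checkpoint match {
      case Some(saved) =>
        trace += "read checkpoint"
        saved.iterator
      case None =>
        trace += "compute"
        data.iterator
    }

    def iterator(split: Int, cache: mutable.Map[(Int, Int), Vector[Int]]): Iterator[Int] =
      if (!persisted) {
        computeOrReadCheckpoint() // case 2)
      } else {
        cache.get((id, split)) match {
          case Some(block) =>
            trace += "cache hit"
            block.iterator
          case None =>
            // case 1): missing (first access or evicted), so compute and persist again
            val block = computeOrReadCheckpoint().toVector
            cache((id, split)) = block
            trace += "cached"
            block.iterator
        }
      }
  }
}
```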
