Spark 源码系列(八)Spark Streaming 实例分析

val ssc = new StreamingContext(sparkConf, Seconds(1));
// 获得一个DStream负责连接 监听端口:地址
val lines = ssc.socketTextStream(serverIP, serverPort);
// 对每一行数据执行Split操作
val words = lines.flatMap(_.split(" "));
// 统计word的数量
val pairs = words.map(word => (word, 1));
val wordCounts = pairs.reduceByKey(_ + _);
// 输出结果
wordCounts.print();
ssc.start();             // 开始
ssc.awaitTermination();  // 计算完毕退出
复制代码

1、首先实例化一个 StreamingContext

2、调用 StreamingContext 的 socketTextStream

3、对获得的 DStream 进行处理

4、调用 StreamingContext 是 start 方法,然后等待

我们看 StreamingContext 的 socketTextStream 方法吧。

def socketTextStream(
      hostname: String,
      port: Int,
      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
    ): ReceiverInputDStream[String] = {
    socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
  }
复制代码

1、StoageLevel 是 StorageLevel.MEMORY_AND_DISK_SER_2

2、使用 SocketReceiver 的 bytesToLines 把输入流转换成可遍历的数据

继续看 socketStream 方法,它直接 new 了一个

new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
复制代码

继续深入挖掘 SocketInputDStream,追述一下它的继承关系,SocketInputDStream>>ReceiverInputDStream>>InputDStream>>DStream。

具体实现 ReceiverInputDStream 的类有好几个,基本上都是从网络端来数据的。

它实现了 ReceiverInputDStream 的 getReceiver 方法,实例化了一个 SocketReceiver 来接收数据。

SocketReceiver 的 onStart 方法里面调用了 receive 方法,处理代码如下:

socket = new Socket(host, port)
      val iterator = bytesToObjects(socket.getInputStream())
      while(!isStopped && iterator.hasNext) {
        store(iterator.next)
      }
复制代码

1、new 了一个 Socket 来接收数据,用 bytesToLines 方法把 InputStream 转换成一行一行的字符串。

2、把每一行数据用 store 方法保存起来,store 方法是从 SocketReceiver 的父类 Receiver 继承而来,内部实现是:

def store(dataItem: T) {
    executor.pushSingle(dataItem)
  }
复制代码

executor 是 ReceiverSupervisor 类型,Receiver 的操作都是由它来处理。这里先不深纠,后面我们再说这个 pushSingle 的实现。

到这里我们知道 lines 的类型是 SocketInputDStream,然后对它是一顿的转换,flatMap、map、reduceByKey、print,这些方法都不是 RDD 的那种方法,而是 DStream 独有的。

讲到上面这几个方法,我们开始转入 DStream 了,flatMap、map、reduceByKey、print 方法都涉及到 DStream 的转换,这和 RDD 的转换是类似的。我们讲一下 reduceByKey 和 print。

reduceByKey 方法和 RDD 一样,调用的 combineByKey 方法实现的,不一样的是它直接 new 了一个 ShuffledDStream 了,我们接着看一下它的实现吧。

override def compute(validTime: Time): Option[RDD[(K,C)]] = {
    parent.getOrCompute(validTime) match {
      case Some(rdd) => Some(rdd.combineByKey[C](createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine))
      case None => None
    }
  }
复制代码

在 compute 阶段,对通过 Time 获得的 rdd 进行 reduceByKey 操作。接下来的 print 方法也是一个转换:

new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
复制代码

打印前十个,超过 10 个打印 "..."。需要注意 register 方法。

ssc.graph.addOutputStream(this)
复制代码

它会把代码插入到当前的 DStream 添加到 outputStreams 里面,后面输出的时候如果没有 outputStream 就不会有输出,这个需要记住哦!

启动过程分析

前戏结束之后,ssc.start() 高潮开始了。 start 方法很小,最核心的一句是 JobScheduler 的 start 方法。我们得转到 JobScheduler 方法上面去。

下面是 start 方法的代码:

def start(): Unit = synchronized {
  // 接受到JobSchedulerEvent就处理事件
    eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
      def receive = {
        case event: JobSchedulerEvent => processEvent(event)
      }
    }), "JobScheduler")

    listenerBus.start()
    receiverTracker = new ReceiverTracker(ssc)
    receiverTracker.start()
    jobGenerator.start()
  }
复制代码

1、启动了一个 Actor 来处理 JobScheduler 的 JobStarted、JobCompleted、ErrorReported 事件。

2、启动 StreamingListenerBus 作为监听器。

3、启动 ReceiverTracker。

4、启动 JobGenerator。

我们接下来看看 ReceiverTracker 的 start 方法。

def start() = synchronized {if (!receiverInputStreams.isEmpty) {
      actor = ssc.env.actorSystem.actorOf(Props(new ReceiverTrackerActor), "ReceiverTracker")
      receiverExecutor.start()
    }
  }
复制代码

1、首先判断了一下 receiverInputStreams 不能为空,那 receiverInputStreams 是怎么时候写入值的呢?答案在 SocketInputDStream 的父类 InputDStream 当中,当实例化 InputDStream 的时候会在 DStreamGraph 里面添加 InputStream。

abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext) extends DStream[T](ssc_) {
  ssc.graph.addInputStream(this)
  //....
}
复制代码

2、实例化 ReceiverTrackerActor,它负责 RegisterReceiver(注册 Receiver)、AddBlock、ReportError(报告错误)、DeregisterReceiver(注销 Receiver)等事件的处理。

3、启动 receiverExecutor(实际类是 ReceiverLauncher,这名字起得。。),它主要负责启动 Receiver,start 方法里面调用了 startReceivers 方法吧。

private def startReceivers() {
     // 对应着上面的那个例子,getReceiver方法获得是SocketReceiver
      val receivers = receiverInputStreams.map(nis => {
        val rcvr = nis.getReceiver()
        rcvr.setReceiverId(nis.id)
        rcvr
      })

      // 查看是否所有的receivers都有优先选择机器,这个需要重写Receiver的preferredLocation方法,目前只有FlumeReceiver重写了
      val hasLocationPreferences = receivers.map(_.preferredLocation.isDefined).reduce(_ && _)

      // 创建一个并行receiver集合的RDD, 把它们分散到各个worker节点上
      val tempRDD =
        if (hasLocationPreferences) {
          val receiversWithPreferences = receivers.map(r => (r, Seq(r.preferredLocation.get)))
          ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences)
        } else {
          ssc.sc.makeRDD(receivers, receivers.size)
        }

      // 在worker节点上启动Receiver的方法,遍历所有Receiver,然后启动
      val startReceiver = (iterator: Iterator[Receiver[_]]) => {
        if (!iterator.hasNext) {
          throw new SparkException("Could not start receiver as object not found.")
        }
        val receiver = iterator.next()
        val executor = new ReceiverSupervisorImpl(receiver, SparkEnv.get)
        executor.start()
        executor.awaitTermination()
      }
      // 运行这个重复的作业来确保所有的slave都已经注册了,避免所有的receivers都到一个节点上
      if (!ssc.sparkContext.isLocal) {
        ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
      }

      // 把receivers分发出去,启动
      ssc.sparkContext.runJob(tempRDD, startReceiver)
    }
复制代码

1、遍历 receiverInputStreams 获取所有的 Receiver。

2、查看这些 Receiver 是否全都有优先选择机器。

3、把 SparkContext 的 makeRDD 方法把所有 Receiver 包装到 ParallelCollectionRDD 里面,并行度是 Receiver 的数量。

4、发个小任务给确保所有的 slave 节点都已经注册了(这个小任务有点儿莫名其妙,感觉怪怪的)。

5、提交作业,启动所有 Receiver。

Spark 写得实在是太巧妙了,居然可以把 Receiver 包装在 RDD 里面,当做是数据来处理!

启动 Receiver 的时候,new 了一个 ReceiverSupervisorImpl,然后调的 start 方法,主要干了这么三件事情,代码就不贴了。

1、启动 BlockGenerator。

2、调用 Receiver 的 OnStart 方法,开始接受数据,并把数据写入到 ReceiverSupervisor。

3、调用 onReceiverStart 方法,发送 RegisterReceiver 消息给 driver 报告自己启动了。

保存接收到的数据

ok,到了这里,重点落到了 BlockGenerator。前面说到 SocketReceiver 把接受到的数据调用 ReceiverSupervisor 的 pushSingle 方法保存。

// 这是ReceiverSupervisorImpl的方法
  def pushSingle(data: Any) {
    blockGenerator += (data)
  }
  // 这是BlockGenerator的方法
   def += (data: Any): Unit = synchronized {
    currentBuffer += data
  }
复制代码

我们看一下它的 start 方法吧。

def start() {
    blockIntervalTimer.start()
    blockPushingThread.start()
  }
复制代码

它启动了一个定时器 RecurringTimer 和一个线程执行 keepPushingBlocks 方法。

先看 RecurringTimer 的实现:

while (!stopped) {
        clock.waitTillTime(nextTime)
        callback(nextTime)
        prevTime = nextTime
        nextTime += period
      }
复制代码

每隔一段时间就执行 callback 函数,callback 函数是 new 的时候传进来的,是 BlockGenerator 的 updateCurrentBuffer 方法。

private def updateCurrentBuffer(time: Long): Unit = synchronized {
    try {
      val newBlockBuffer = currentBuffer
      currentBuffer = new ArrayBuffer[Any]
      if (newBlockBuffer.size > 0) {
        val blockId = StreamBlockId(receiverId, time - blockInterval)
        val newBlock = new Block(blockId, newBlockBuffer)
        blocksForPushing.put(newBlock) 
      }
    } catch {case t: Throwable =>
        reportError("Error in block updating thread", t)
    }
  }
复制代码

它 new 了一个 Block 出来,然后添加到 blocksForPushing 这个 ArrayBlockingQueue 队列当中。

提到这里,有两个参数需要大家注意的:

spark.streaming.blockInterval   默认值是200
spark.streaming.blockQueueSize  默认值是10
复制代码

这是前面提到的间隔时间和队列的长度,间隔时间默认是 200 毫秒,队列是最多能容纳 10 个 Block,多了就要阻塞了。

我们接下来看一下 BlockGenerator 另外启动的那个线程执行的 keepPushingBlocks 方法到底在干什么?

private def keepPushingBlocks() {
    while(!stopped) {
        Option(blocksForPushing.poll(100, TimeUnit.MILLISECONDS)) match {
          case Some(block) => pushBlock(block)
          case None =>
        }
      }
   // ...退出之前把剩下的也输出去了
  }
复制代码

它在把 blocksForPushing 中的 block 不停的拿出来,调用 pushBlock 方法,这个方法属于在实例化 BlockGenerator 的时候,从 ReceiverSupervisorImpl 传进来的 BlockGeneratorListener 的。

private val blockGenerator = new BlockGenerator(new BlockGeneratorListener {
    def onError(message: String, throwable: Throwable) {
      reportError(message, throwable)
    }

    def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]) {
      pushArrayBuffer(arrayBuffer, None, Some(blockId))
    }
  }, streamId, env.conf)
复制代码

1、reportError,通过 actor 向 driver 发送错误报告消息 ReportError。

2、调用 pushArrayBuffer 保存数据。

下面是 pushArrayBuffer 方法:

def pushArrayBuffer(arrayBuffer: ArrayBuffer[_], optionalMetadata: Option[Any], optionalBlockId: Option[StreamBlockId]
    ) {
    val blockId = optionalBlockId.getOrElse(nextBlockId)
    val time = System.currentTimeMillis
    blockManager.put(blockId, arrayBuffer.asInstanceOf[ArrayBuffer[Any]], storageLevel, tellMaster = true)
    reportPushedBlock(blockId, arrayBuffer.size, optionalMetadata)
  }
复制代码

1、把 Block 保存到 BlockManager 当中,序列化方式为之前提到的 StorageLevel.MEMORY_AND_DISK_SER_2(内存不够就写入到硬盘,并且在 2 个节点上保存的方式)。

2、调用 reportPushedBlock 给 driver 发送 AddBlock 消息,报告新添加的 Block,ReceiverTracker 收到消息之后更新内部的 receivedBlockInfo 映射关系。

处理接收到的数据

前面只讲了数据的接收和保存,那数据是怎么处理的呢?

之前一直讲 ReceiverTracker,而忽略了之前的 JobScheduler 的 start 方法里面最后启动的 JobGenerator。

def start(): Unit = synchronized {
    eventActor = ssc.env.actorSystem.actorOf(Props(new Actor {
      def receive = {
        case event: JobGeneratorEvent =>  processEvent(event)
      }
    }), "JobGenerator")
    if (ssc.isCheckpointPresent) {
      restart()
    } else {
      startFirstTime()
    }
  }
复制代码

1、启动一个 actor 处理 JobGeneratorEvent 事件。

2、如果是已经有 CheckPoint 了,就接着上次的记录进行处理,否则就是第一次启动。

我们先看 startFirstTime 吧,CheckPoint 以后再说吧,有点儿小复杂。

private def startFirstTime() {
    val startTime = new Time(timer.getStartTime())
    graph.start(startTime - graph.batchDuration)
    timer.start(startTime.milliseconds)
  }
复制代码

1、timer.getStartTime 计算出来下一个周期的到期时间,计算公式:(math.floor(clock.currentTime.toDouble / period) + 1).toLong * period,以当前的时间 / 除以间隔时间,再用 math.floor 求出它的上一个整数(即上一个周期的到期时间点),加上 1,再乘以周期就等于下一个周期的到期时间。

2、启动 DStreamGraph,启动时间 = startTime - graph.batchDuration。

3、启动 Timer,我们看看它的定义:

private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
    longTime => eventActor ! GenerateJobs(new Time(longTime)), "JobGenerator")
复制代码

到这里就清楚了,DStreamGraph 的间隔时间就是 timer 的间隔时间,启动时间要设置成比 Timer 早一个时间间隔,原因再慢慢探究。

可以看出来每隔一段时间,Timer 给 eventActor 发送 GenerateJobs 消息,我们直接去看它的处理方法 generateJobs 吧,中间忽略了一步,大家自己看。

private def processEvent(event: JobGeneratorEvent) {
    event match {
      case GenerateJobs(time) => generateJobs(time)
      case ClearMetadata(time) => clearMetadata(time)
      case DoCheckpoint(time) => doCheckpoint(time)
      case ClearCheckpointData(time) => clearCheckpointData(time)
    }
  }
复制代码

下面是 generateJobs 方法。

private def generateJobs(time: Time) {
    SparkEnv.set(ssc.env)
    Try(graph.generateJobs(time)) match {
      case Success(jobs) =>
        val receivedBlockInfo = graph.getReceiverInputStreams.map { stream =>
          val streamId = stream.id
          val receivedBlockInfo = stream.getReceivedBlockInfo(time)
          (streamId, receivedBlockInfo)
        }.toMap
        jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfo))
      case Failure(e) =>
        jobScheduler.reportError("Error generating jobs for time " + time, e)
    }
    eventActor ! DoCheckpoint(time)
  }
复制代码

1、DStreamGraph 生成 jobs。

2、从 stream 那里获取接收到的 Block 信息。

3、调用 submitJobSet 方法提交作业。

4、提交完作业之后,做一个 CheckPoint。

先看 DStreamGraph 是怎么生成的 jobs。

def generateJobs(time: Time): Seq[Job] = {
    val jobs = this.synchronized {
      outputStreams.flatMap(outputStream => outputStream.generateJob(time))
    }
    jobs
  }
复制代码

outputStreams 在这个例子里面是 print 这个方法里面添加的,这个在前面说了,我们继续看 DStream 的 generateJob。

private[streaming] def generateJob(time: Time): Option[Job] = {
    getOrCompute(time) match {
      case Some(rdd) => {
        val jobFunc = () => {
          val emptyFunc = { (iterator: Iterator[T]) => {} }
          context.sparkContext.runJob(rdd, emptyFunc)
        }
        Some(new Job(time, jobFunc))
      }
      case None => None
    }
  }
复制代码

1、调用 getOrCompute 方法获得 RDD

2、new 了一个方法去提交这个作业,缺什么都不做

为什么呢?这是直接跳转的错误,呵呵,因为这个 outputStream 是 print 方法返回的,它应该是 ForEachDStream,所以我们应该看的是它里面的 generateJob 方法。

override def generateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => {
          foreachFunc(rdd, time)
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
  }
复制代码

这里请大家千万要注意,不要在这块被卡住了。

我们看看它这个 RDD 是怎么出来的吧。

private[streaming] def getOrCompute(time: Time): Option[RDD[T]] = {
    // If this DStream was not initialized (i.e., zeroTime not set), then do it
    // If RDD was already generated, then retrieve it from HashMap
    generatedRDDs.get(time) match {

      // 这个RDD已经被生成过了,直接用就是了
      case Some(oldRDD) => Some(oldRDD)

      // 还没生成过,就调用compte函数生成一个
      case None => {
        if (isTimeValid(time)) {
          compute(time) match {
            case Some(newRDD) =>
         // 设置保存的级别
              if (storageLevel != StorageLevel.NONE) {
                newRDD.persist(storageLevel)
              }
         // 如果现在需要,就做CheckPoint
              if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
                newRDD.checkpoint()
              }
         // 添加到generatedRDDs里面去,可以再次利用
              generatedRDDs.put(time, newRDD)
              Some(newRDD)
            case None =>
              None
          }
        } else {
          None
        }
      }
    }
  }
复制代码

从上面的方法可以看出来它是通过每个 DStream 自己实现的 compute 函数得出来的 RDD。我们找到 SocketInputDStream,没有 compute 函数,在父类 ReceiverInputDStream 里面找到了。

override def compute(validTime: Time): Option[RDD[T]] = {
    // 如果出现了时间比startTime早的话,就返回一个空的RDD,因为这个很可能是master挂了之后的错误恢复
    if (validTime >= graph.startTime) {
      val blockInfo = ssc.scheduler.receiverTracker.getReceivedBlockInfo(id)
      receivedBlockInfo(validTime) = blockInfo
      val blockIds = blockInfo.map(_.blockId.asInstanceOf[BlockId])
      Some(new BlockRDD[T](ssc.sc, blockIds))
    } else {
      Some(new BlockRDD[T](ssc.sc, Array[BlockId]()))
    }
  }
复制代码

通过 DStream 的 id 把 receiverTracker 当中把接收到的 block 信息全部拿出来,记录到 ReceiverInputDStream 自身的receivedBlockInfo 这个 HashMap 里面,就把 RDD 返回了,RDD 里面实际包含的是 Block 的 id 的集合。

现在我们就可以回到之前 JobGenerator 的 generateJobs 方法,我们就清楚它这句是提交的什么了。

jobScheduler.submitJobSet(JobSet(time, jobs, receivedBlockInfo))
复制代码

JobSet 是记录 Job 的完成情况的,直接看 submitJobSet 方法吧。

def submitJobSet(jobSet: JobSet) {
    if (jobSet.jobs.isEmpty) {
    } else {
      jobSets.put(jobSet.time, jobSet)
      jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    }
  }
复制代码

遍历 jobSet 里面的所有 jobs,通过 jobExecutor 这个线程池提交。我们看一下 JobHandler 就知道了。

private class JobHandler(job: Job) extends Runnable {
    def run() {
      eventActor ! JobStarted(job)
      job.run()
      eventActor ! JobCompleted(job)
    }
  }
复制代码

1、通知 eventActor 处理 JobStarted 事件。

2、运行 job。

3、通知 eventActor 处理 JobCompleted 事件。

这里的重点是 job.run,事件处理只是更新相关的 job 信息。

def run() {
    result = Try(func())
  }
复制代码

在遍历 BlockRDD 的时候,在 compute 函数获取该 Block(详细请看 BlockRDD),然后对这个 RDD 的结果进行打印。

到这里就算结束了,最后来个总结吧,图例在下一章补上,这一章只是过程分析:

1、可以有多个输入,我们可以通过 StreamingContext 定义多个输入,比如我们监听多个(host,ip),可以给它们定义各自的处理逻辑和输出,输出方式不仅限于 print 方法,还可以有别的方法,saveAsTextFiles 和 saveAsObjectFiles。这块的设计是支持共享 StreamingContext 的。

2、StreamingContext 启动了 JobScheduler,JobScheduler 启动 ReceiverTracker 和 JobGenerator。

3、ReceiverTracker 是通过把 Receiver 包装成 RDD 的方式,发送到 Executor 端运行起来的,Receiver 起来之后向 ReceiverTracker 发送 RegisterReceiver 消息。

3、Receiver 把接收到的数据,通过 ReceiverSupervisor 保存。

4、ReceiverSupervisorImpl 把数据写入到 BlockGenerator 的一个 ArrayBuffer 当中。

5、BlockGenerator 内部每个一段时间(默认是 200 毫秒)就把这个 ArrayBuffer 构造成 Block 添加到 blocksForPushing 当中。

6、BlockGenerator 的另外一条线程则不断的把加入到 blocksForPushing 当中的 Block 写入到 BlockManager 当中,并向 ReceiverTracker 发送 AddBlock 消息。

7、JobGenerator 内部有个定时器,定期生成 Job,通过 DStream 的 id,把 ReceiverTracker 接收到的 Block 信息从 BlockManager 上抓取下来进行处理,这个间隔时间是我们在实例化 StreamingContext 的时候传进去的那个时间,在这个例子里面是 Seconds(1)。

我来评几句
登录后评论

已发表评论数()

相关站点

+订阅
热门文章