第17课:Spark Streaming资源动态申请和动态控制消费速率原理剖析

2017-09-11 06:48 出处:csdn 人气: 评论(0

本节课主要谈Spark Streaming两个比较高级的特性:Spark Streaming资源动态申请和动态控制消费速率原理剖析。动态消费速率背后有一套非常复杂的理论,这套理论有一篇非常好的论文, 而动态资源分配背后也有一套理论。因此首先跟大家讲清楚理论是怎么回事情,然后再进行深入的探讨就会容易很多。

我们首先思考一个问题,无论是资源动态分配,还是消费速率动态控制,为什么要动态?Spark在默认情况下是先分配好资源,然后进行计算,是粗粒度的,粗粒度有一个好处:因为资源是提前分配好的,在执行任务的时候,使用分配好的资源就行了。粗粒度不好的地方:从Spark Streaming的角度讲,有高峰值和低峰值,高峰和低峰需要的资源是不一样的,如果对资源的分配是从高峰值的角度去考虑的,那么低峰值的时候会有大量的资源的浪费。

Heron是Twitter公司新开发的实时流式计算框架,Heron可以秒杀storm,在spark以外如再想研究一项技术,那就推荐研究Heron。有几点原因:

1,Heron和Storm都是Twitter公司开发的,Twitter当年开发Storm的时候是为了实时的在线多媒体的交互计算,Twitter是一家社交网络公司,需处理大量的并发消息个性化,对每个用户的消息进行最瞬间的送达以及最瞬间的广播推荐等。为什么推出Heron?因为Twitter的消息量暴增,它的业务量暴增,一家同时开发过Heron和Storm,世界上最深度用过Storm的公司,Twitter现在都不用Storm而转向使用Heron,因此,这是Heron技术值得关注的第一个原因。Heron对新特性的支持,包括资源的动态分配,对硬件更小的消耗及更小的延时。悲剧的一点是Storm以后不需要再用了,Heron可以兼容Storm,Storm原来的代码一行也不用改变,重新编译一下,Heron就可以使用Storm的代码。因为Twitter公司之前那么多业务使用Storm,不能新搞一个框架让业务代码全部重写一遍。理论上讲,所有使用Storm的公司一定会转向Heron。可以想象一下,代码一行不用改,重新编译一下就变成了Heron的代码,硬件消耗降低3倍以上,延时提高了10倍左右,Heron具有Storm的所有功能。所以Storm已经成为历史了,不用再考虑Storm。

2,现在是一个流处理的时代,Spark 2.x对自己的速度,包括对自己的编译器有革命性的变化,当年Spark Streaming推出的时候参考了Storm的想法,在Storm的基础上推出了Spark Streaming。Spark 2.x内核引擎有革命性的变化,Spark Streaming最大的优势在于跟Spark的兄弟框架联手。如果需要完全实时性的,或更高的实时性,Heron是一项值得关注的技术,Heron的开发语言是C++、Java、Python,Heron的配置用了Python的代码,Heron的API级别是Java级别的,我们通常写代码及调试程序的时候基于Java,这也有利于学习Heron。

为什么谈到Heron,因为峰值的问题,很多流处理系统不能很好的进行处理。现在我们考虑Spark Streaming,在资源分配的时候如果按照高峰的峰值进行分配,是粗粒度的,在预分配的时候造成资源的浪费,在低峰值的时候导致大量的浪费。在另外一方面,随着Spark Streaming本身不断的运行,对资源的消耗管理也是我们要考虑的因素,这里我们谈Spark Streaming资源动态申请和动态控制消费速率是高级别的特性,特性的实现对Spark Streaming的运行非常重要的。Spark Streaming本身是基于Spark Core,Spark Core的核心是Spark Context。Spark现在支持资源的动态分配。这里有个配置参数spark.dynamicAllocation.enabled,是否需要开启资源的动态分配,在程序运行的时候进行设置。如果支持动态分配,使用ExecutorAllocationManager,传入参数有ExecutorAllocationClient、listenerBus、_conf 

Spark现在支持资源的动态分配。这里有个配置参数spark.dynamicAllocation.enabled,是否需要开启资源的动态分配,在程序运行的时候进行设置。如果支持动态分配,使用ExecutorAllocationManager,传入参数有ExecutorAllocationClient、listenerBus、_conf。

SparkContext.scala的源代码:

1.            val dynamicAllocationEnabled =Utils.isDynamicAllocationEnabled(_conf)

2.             _executorAllocationManager =

3.               if(dynamicAllocationEnabled) {

4.                 schedulerBackend match {

5.                   case b:ExecutorAllocationClient =>

6.                     Some(newExecutorAllocationManager(

7.                       schedulerBackend.asInstanceOf[ExecutorAllocationClient],listenerBus, _conf))

8.                   case _ =>

9.                     None

10.              }

11.            } else {

12.              None

13.            } 

14.       _executorAllocationManager.foreach(_.start())

15.      ……

16.      Utils.scala的源代码:

17.        def isDynamicAllocationEnabled(conf:SparkConf): Boolean = {

18.          val dynamicAllocationEnabled =conf.getBoolean("spark.dynamicAllocation.enabled", false)

19.          dynamicAllocationEnabled &&

20.            (!isLocalMaster(conf) ||conf.getBoolean("spark.dynamicAllocation.testing", false))

21.        }

ExecutorAllocationManager是根据工作负载动态分配和删除executors 的代理。ExecutorAllocationManager保持目标数量的executors,周期性的同步到集群管理。以配置的初始值开始,根据挂起和正在运行的任务的数量进行变化。当当前目标超过当前处理负载的需要时,减少executors 执行器的目标数量,executors 执行器的目标数目减少,可以立即运行所有当前正在运行和正在等待的任务。如果积压任务等待调度响应,那么增加executors的目标数。如果调度队列在N秒内没有耗尽,则添加新的执行器executors。如果队列持续了M秒钟,需添加更多的executors等。每一轮增加的数量从上一轮的指数增长,直到达到上限。上限基于配置的属性和如上所描述的当前的运行和待处理的任务。指数增长的原因有两方面:

(1)在开始的情况下Executors 应缓慢增加,需要额外的Executors的数量变小。否则,

我们可以添加更多的Executors ,而不是需要稍后删除它们。

(2)Executors 应迅速增加。随着时间的推移,Executors 的最大数量非常高。否则,它将采取长时间的负载下进行繁重的工作。

删除策略比较简单:如果executor 空闲时间为k秒,则意味着它没有计划运行任何任务,然后删除它。在这两种情况下都没有重试逻辑,因为我们假设集群管理器最终将完成它异步接收的所有请求。

相关Spark的属性如下:

l  spark.dynamicAllocation.enabled是否启用此功能

l  spark.dynamicAllocation.minExecutors  executors最小的数量

l  spark.dynamicAllocation.maxExecutors  executors最大的数量

l  spark.dynamicAllocation.initialExecutorsexecutors初始化的数量

l  spark.dynamicAllocation.schedulerBacklogTimeout(M) 如果有积压的任务持续时间,增加新的executors

l  spark.dynamicAllocation.sustainedSchedulerBacklogTimeout(N) 如果积压时间持续,增加更多的executors,仅在初始积压超时后才使用此选项

l  spark.dynamicAllocation.executorIdleTimeout(K)  如果executors在此期间处于空闲状态,删除它。

ExecutorAllocationManager.scala的源代码:

1.          private[spark] classExecutorAllocationManager(

2.             client:ExecutorAllocationClient,

3.             listenerBus: LiveListenerBus,

4.             conf: SparkConf)

5.           extends Logging {

6.         …..

7.           // Clock used to schedule when executorsshould be added and removed

8.           private var clock: Clock = new SystemClock()

有个定时器,定时器不断的去扫描executor的情况:正在运行的Stage,Stage运行在不同的executor中,所谓动态就是指要么增加要么减少executor。例如,减少executor的情况, 判断一个时间如60秒中executor没有一个任务在运行,就把这个executor删掉。这是去掉executor的情况,因为当前的应用程序中运行的所有executor,在Driver中有数据结构对它进行保持引用,每次任务调度的时候循环遍历一下executor可用列表,看一下executor的可用资源,由于有个时钟Clock,有时钟就可以不断的循环,循环检查是否满足增加executor或者删除executor的条件,如果满足条件,就会触发executor的增加和删除。executor的增加和删除非常简单,因为Driver中的ExecutorBackend有对executor的管理关系,例如超时,可以设置一个add的时间,或者评估一下当前的作业资源,如果不够的话申请更多的资源。之所以动态起来,类似于有一个时钟,在固定的周期里检查,如果想删除,就发一个killExecutor的信息,如果想添加,就在具体的Work上启动Executor。

本文标签:

相关文章

网站内容来源于互联网,仅供用于技术学习,请遵循相关法律 规,如有侵权,请联系管理员删除

Copyright © 2002-2017 JISHUX. 技术栈 版权所有

京ICP备15061484号-3