一、数据本地化
1、背景
数据本地化对于Spark Job性能有着巨大的影响。如果数据以及要计算它的代码是在一起的,那么性能当然会非常高。但是,如果数据和计算它的代码是分开的,那么其中之一必须到另外一方的机器上。通常来说,移动代码到其他节点,会比移动数据到代码所在的节点上去,速度要快得多,因为代码比较小。Spark也正是基于这个数据本地化的原则来构建task调度算法的。数据本地化,指的是,数据离计算它的代码有多近。基于数据距离代码的距离,有几种数据本地化级别:1、PROCESS_LOCAL:数据和计算它的代码在同一个JVM进程中。2、NODE_LOCAL:数据和计算它的代码在一个节点上,但是不在一个进程中,比如在不同的executor进程中,或者是数据在HDFS文件的block中。3、NO_PREF:数据从哪里过来,性能都是一样的。4、RACK_LOCAL:数据和计算它的代码在一个机架上。5、ANY:数据可能在任意地方,比如其他网络环境内,或者其他机架上。
2、图解
Task要处理的partition的数据,在某一个Executor中,TaskScheduler首先会尽量用最好的本地化级别去启动task,也就是说,会尽量在哪个包含了要处理的partition的executor中,去启动task;此时,Executor已经再执行好几个task了,没有空闲资源来执行这个task;默认情况下,spark会等待一会,等待Executor什么时候可以空闲出一个cpu core,从而来启动这个task,让它实现最好的本地化级别;但是如果等待了一会(时间是可以调优的,通过参数设置),发现始终没有等到Executor的core释放,那么会放大一个级别,去尝试启动这个task;如果这个rdd之前持久化过,task会去调用RDD的iterator()方法,然后通过executor关联的BlockManager,来尝试获取数据,BlockManager底层,首先尝试从getLocal()在本地找数据,如果没有找到的话,那么用getRemote(),通过BlockTransferService,链接到有数据的BlockManager,来获取数据;如果没有持久化过,那么就computerOrReadCheckpoint();如果还是不能启动,继续放大级别;
3、数据本地化优化
Spark倾向于使用最好的本地化级别来调度task,但是这是不可能的。如果没有任何未处理的数据在空闲的executor上,那么Spark就会放低本地化级别。这时有两个选择:第一,等待,直到executor上的cpu释放出来,那么就分配task过去;第二,立即在任意一个executor上启动一个task。Spark默认会等待一会儿,来期望task要处理的数据所在的节点上的executor空闲出一个cpu,从而将task分配过去。只要超过了时间,那么Spark就会将task分配到其他任意一个空闲的executor上。可以设置参数,spark.locality系列参数,来调节Spark等待task可以进行数据本地化的时间。spark.locality.wait(3000毫秒)、spark.locality.wait.node、spark.locality.wait.process、spark.locality.wait.rack。