大纲
MapReduce设计方案
- 整体架构与执行流程
- master设计
- 如何容错
- worker故障
- master失败
- 存储,调度与带宽
- 如何切分任务
- 预测执行
MapReduce特性和扩展功能
- 分区函数
- reduce顺序处理
- combiner函数
- 输入输出类型支持
- 自动忽略坏数据
- local引擎支持
- job状态监控
- counter
MapReduce设计方案
整体架构
关键角色
- master:分配map和reduce任务;收集worker上报的数据;向worker下发信息和指令。
- worker:执行map和reduce任务;向master汇报状态,上报数据;
- 执行map任务的称为map worker
- 执行reduce任务的称为reduce worker
执行流程
- 将输入文件切分为M个数据分片,每个分片大小一般从16MB~64MB,用户可配置
- Master将创建M个map任务,R个reduce任务,并将这M+R个任务,分配给空闲worker执行,每个空闲worker一次执行一个任务
- map worker从输入数据分片中读入数据,解析成key-value pair(其中key是文件偏移量,value是一条文本行数据),并将key-value pair传递给用户注册的map函数。map函数对输出key-value pair处理完后,要求产出中间key-value pair
- 每个map worker产出的中间key-value pair会被缓存在内存中,并通过分区函数分到R个分区中,缓存一定量后写入map worker执行时的本地磁盘,并记录下存储路径,回传给master。每个map任务的输出都会被分到R个分区中,整个集群总共产出M*R个中间输出文件
- master将中间输出文件所在的机器+路径发送给reduce worker,所有map worker产出的所有R[i]输出文件将发送给R[i] reduce worker
- reduce worker通过RPC从对应机器上读取中间输出文件,读取完所有数据后,按key进行排序,再将所有相同key的数据聚合在一起,形成\<key, iterator(value)>的结构。若内存不足以完成排序,则借助外部存储。
- reudce worker将\<key, iterator(value)>传递到用户注册的reduce函数。reduce函数处理结果追加到所属分区的输出文件。一个reduce worker产出一个最终输出文件,保存在全局分布式文件系统上。
master设计
论文中没有给出master的设计方案,更没有给出工程实现细节,只是针对master的功能,简述其需要维护的数据结构
- 保存每个map和reduce任务的状态(空闲,执行中,完成)
- 保存worker机器的标识
- 保存worker进程上传的中间输出文件的位置
如何容错
由于map-reduce程序运行在大规模分布式集群中,发生错误是常态,需要考虑如何容错。论文中以work故障和master失败两方面介绍
worker故障
如何判断worker的运行状态?
master周期性ping每个worker,并期望在一个约定时间内收到work返回的信息。若没有如期收到Worker回复,将相应的worker标记为失效
worker失效后怎么办?
所有由该worker完成的和正在执行中的map任务,都会被重新置为空闲状态,等待master重新调度到其他worker重新执行。worker失败后,导致了map任务产出的中间输出文件无法访问,所以即使是已完成的map任务也需要重新执行;而已完成的reduce任务,最终输出文件是保存在全局分布式文件系统上,因此不需要重新执行。
master失败
论文中提到的解决方案
提出了以checkpoint的方案来解决master失败的问题,周期性将master维护的数据结构和状态写入磁盘。master失败后,从最后一个checkpoint开始启动新的master进程,恢复master状态
实际工程实现
但其工程实现上只有一个master进程,master失败后会终止整个MapReduce作业。
存储,调度与带宽
输入文件保存在GFS中,读取时会涉及到网络传输,而带宽资源匮乏,在调度任务时要着重考虑带宽问题。GFS把每个文件按64MB一个Block分隔,每个Block一般以3副本的形式保存在集群不同机器中。
如何调度map任务,策略是什么?
- 原则:副本文件离map任务越近越好
- map任务从GFS读取文件,master会尽量将一个map任务调度到包含相关副本的机器上执行,否则就调度到附加的机器上执行。
如何切分任务
整个MapReduce作业由M个map和R个reduce任务组成,如何决定M和R的值,是影响整个作业性能的关键点。论文中从集群的动态负载能力,故障恢复速度,资源占用等方面进行阐述。
- M和R的值应该比worker的机器数量多得多,使得每个work执行大量不同的任务,以此提高集群的动态负载能力。
- 任务被切分得更细,使得一个任务被重试的代价更小。
- 任务被切分得更细,当一个worker故障,其处理完及正在处理的map任务,能分布到更多的worker中重新执行,加快了故障恢复的速度
- master需要维护M+R个任务的状态,每个任务状态占1 byte即可(1 byte = 8 bits,可以表达256种状态),因为内存占用并不可观。
设置M和R值的建议
- R值一般是用户根据业务需求进行设置。建议设置为worker机器数量的较小的倍数
- 设置M值时,应考虑总输入数据量,使得每个map任务能读入16M到64M的数据为佳
- 一个常用的设置比例:M=200000,R=5000,2000台worker机器
预测执行
另一个影响MapReduce作业总执行时间的关键因素就是长尾任务,指那些最后阶段,要花很长时间才能完成的map和reduce任务。导致长尾任务出现的原因有很多,例如:
- 机器故障,导致从分布式文件系统读取文件时速度下降
- 调度不均导致多个任务被调度到一台机器上执行,CPU,内存,本地磁盘,网络带宽等计算资源被抢占
- 代码bug
- ……
论文中就怎么解决长尾任务的问题,提出了一种解决方案:
- 当整个MapReduce作业接近完成时,master会针对正在执行中的任务(in-process tasks),启动备用任务(backup executions)。备用任务可看作是执行中任务的副本。
- 原任务或备用任务中的任意一个执行完成,都认为该任务已经完成。
- 该解决方案只会多占用少于10%的计算资源。
MapReduce的特性和扩展功能
具备基本功能后,MapReduce框架还提供了不少扩展功能。论文中介绍了这些扩展功能的原理以及是如何提升MapReduce框架性能的。
分区函数
分区函数作用在map和reduce之间,即map任务产出的中间key-value pair如何分配到R个reduce任务中。MapReduce框架提供的默认分区函数为(hash(key) mod R),同时提供了编程接口,允许用户注册自定义的分区函数
reduce顺序处理
reduce任务在处理map任务产出的中间key-value pair时,保证按照key值增量顺序处理,这样也保证了每个reduce任务产出的最终输出文件也是按key值增量保存
combiner函数
当reduce函数满足结合律和交换律时,可以在map任务中对输出数据先进行一次预聚合,以减少map任务产出的中间输出。
- combiner函数和reduce函数一般是相同的
- MapReduce框架负责将combiner函数的输出写入中间结果文件,将reduce函数的输出写入最后输出文件
输入输出类型支持
论文提及了MapReduce框架支持多种数据输入输出格式,框架自身提供了一些预定义类型,同时允许用户注册自定义类型。本节中对一些预定义类型进行了介绍:
- 文本模式下,输入文件的每一行作为一个key-value pair,key是文件偏移量,value是该行内容
- 对key排序后的key-value pair序列
同时提供了扩展思路,读取数据不一定从文本文件中读入,可以实现自定义的reader,从数据库或内存数据结构中读取。
跳过损坏数据
脏数据或者用户程序bug可能导致map或reduce函数在处理某些key-value pair时crash。MapReduce框架提供了一种机制,在确定了某些记录多次引起crash后,允许跳过这些损坏记录,在论文给出实现方案:
- worker设置了信号处理函数捕获segmentation violation和bus error
- 在worker处理记录导致crash时,在信号处理函数中通过UDP包向master汇报引起crash的记录序号
- 当记录失败次数达到规定的阈值时,master标记该记录需要被跳过,在下次调度任务时,通知worker忽略该记录。
local引擎支持
为了方便用户调试程序,提供了本地执行版本
job状态信息
master内嵌HTTP服务器,用于向用户展示作业执行状态,如执行进度,输入字节数,中间产出字节数,输出字节数,计算资源占用率等
counter
counter用于统计一个作业中不同事件发生的次数。counter的值,附加在worker回复给master的ping包中,周期性汇报给master,master将执行成功的map和reduce任务的counter值进行累加。
master启动的备用任务,worker故障后被重新调度的map任务,都会向master汇报counter值。master在设计上要考虑避免重复累加这些counter