FlumeJava论文分享
大纲
- 提出FlumeJava的背景
- FlumeJava是什么?解决什么问题?已有哪些应用?
- FlumeJava特性介绍:
- 核心数据集抽象
- 编程接口
- 优化器
- FlumeJava中未实现的优化策略
提出FlumeJava的背景
- MapReduce的出现大大降低了大规模数据并行处理的门槛,但实际应用场景中需要的不止是单一的map+reduce,而是MapReduce Pipeline
- 开发,调试,管理一个MapReduce Pipeline很困难:
- 需要编写多个map和reduce程序
- 需要编写额外代码将这些map,reduce组织起来
- 自行管理这些map,reduce产出的中间数据
- 新加入的开发者接手代码后,只有分散的map,reduce程序,而不是一个整体的计算处理逻辑,开发难度大
- 计算逻辑发生变化后,要修改多个map,reduce程序,维护成本大
- 需要一套数据并行处理pipeline,屏蔽维护pipeline的细节,使用户专注在计算逻辑上
FlumeJava是什么?解决什么问题?已有哪些应用?
- 在开发者看来,FlumeJava是一个Java库。定义了一系列的不可变的并行数据集抽象;同时为这些数据集抽象定义了适量的函数方法,可对这些数据集进行并行处理
- FlumeJava为数据并行处理提供了统一抽象,屏蔽了如何描述不同类型的并行数据集和处理流程中的细节
- 将计算逻辑转化为多个MapReduce组成的pipeline,清除中间数据产出,重试计算,cache等工作,都由FlumeJava管理。
- 使用FlumeJava开发的应用,性能接近于有经验的工程师经过手动优化后的MapReduce程序
- 截止到2010年3月,在Google内部拥有175个开发者。相当数量的产品使用FlumeJava开发(无具体数据)
FlumeJava特性概述
- 核心数据集抽象
- PCollection
- PTable
- PObject
- PCollection
- 编程接口
- 底层接口:parallelDo & DoFn,groupByKey,combineValues,flatten
- 高级衍生接口:count,join,top ……
- 优化器
核心数据集抽象
PCollection<T>
- 用于描述不可变的数据集
- 支持有序(sequence)和无序(collection)数据集合
- 可由内存中的Java Collection对象构造得到,也可以从外部存储中读取数据进行构造
- 支持序列化:recordsOf();开发者可从外部存储中读取任意格式(文本or二进制)的数据,然后通过recordsOf接口将数据转化为Java对象
- 不支持PCollection<PCollection<T>>
PTable<K,V>
- PTable<K,V>从实现上是PCollection<Pair<K,V>>的子类,用于描述无序的不可变的Key-Value Pair数据集,Key允许重复
- PTable是一种特殊的PCollection的,应用在PCollection<T>上的方法也可以应用在PTable<K,V>上
PObject<T>
- 用于承载单个Java对象
- 在pipeline run结束后,可通过getValue()方法,将PObject内装载的Java对象取出
- 可通过PColleciton上的asSequentialCollection()方法将一个PCollection<T>转化为PObject<Collection<T>>,再通过getValue()将Collection<T>取出。
1 | PTable<String,Integer> wordCounts = ...; |
- 可通过PCollection上的combine方法,将PCollection<T>聚合为PObject<T>
1 | PCollection<Boolean> haveConverged = ...; |
编程接口
底层接口
FlumeJava定义了几个底层接口,用来操作它所定义的数据集抽象(PCollection,PTable,PObject)。
由这几个底层接口,能完成所有对数据集抽象的操作。
这些底层接口在真正进行数据处理时,都是并行执行的。
另外,还支持一些高级衍生接口,其本质是对底层接口的封装,通过组合不同的底层接口来完成功能。
parallelDo & DoFn<T,S>
输入一个PCollection<T>,将其中的每个T类型对象转化为S类型对象,输出一个PCollection<S>
- 开发者向parallelDo注册回调函数DoFn<T,S>,在DoFn<T,S>中实现业务逻辑
- parallelDo遍历PCollection<T>中的每个元素,对每个元素执行一遍DoFn<T,S>
- DoFn<T,S>是一个仿函数对象,将一个T类型Java对象转化为S类型Java对象
- 可在parallelDo中,决定产出的PCollection的组织结构:
- collectionOf():产出的PCollection内的元素是无序集合
- sequenceOf():产出的PCollection内的元素是有序集合
- tableOf():产出的PCollection内的元素要求是Key-Value pair,产出一个PTable
- 一个parallelDo操作最后会转化为一个map或一个reduce节点
1 | PCollection<String> words = lines.parallelDo(new DoFn<String,String>() { |
groupByKey
将multi-key的PTable<K,V>转化为uni-key的PTable<K,Collection<V>>
- groupByKey会被FlumeJava转化为MapReduce的Shuffle阶段
1 | PTable<URL,DocInfo> backlinks = ...; |
combineValues
将PTable<K,Collection<V>>转化为PTable<K,V>
- 开发者向combineValues注册combining function,该回调函数将Collection<T>聚合为单个T类型对象
1 | PTable<String,Collection<Integer>> groupedWordsWithOnes = ...; |
flatten
将一系列的PCollection<T>数据全部展开,最后组成一个PCollection<T>
- 在实现上,不会将所有PCollection<T>内的元素复制出来,只是创建了一个新的PCollection<T>,并记录下flatten操作,在pipeline run时才触发计算
- 论文中没有给出flatten()的代码示例,可能的接口风格如:
1 | Collection<PCollection<T>> pcs = ...; |
通过单词计数的例子,使用所有底层接口(flatten除外)
1 | // 从外部存储中读取文本文件,并解析为一个包含单词的PCollection |
高级衍生接口
高级接口内部是通过调用底层接口来实现功能的,开发者也可以自己基于底层接口进行封装。FlumeJava提供了一些常用的高级接口。
count
输入PCollection<T>,返回PTable<T,Integer>
- parallelDo + groupByKey + combineValues
- parallelDo:PCollection<T> —> PTable<T,Integer>
- groupByKey:PTable<T,Integer> —> PTable<T,Collection<Integer>>
- combineValues: PTable<T,Collection<Integer>> —> PTable<T,Integer>
join
输入PTable<K,V1>, PTable<K,V2>,输出PTable<K,Tuple2<Collection<V1>, Collection<V2>>>
- parallelDo + flatten + groupByKey + parallelDo
- parallelDo: PTable<K, Vi> —> Collection<PTable<K, TaggedUnion2<V1,V2>>> (存疑)
- flatten: Collection<PTable<K, TaggedUnion2<V1,V2>>> —> PTable<K, TaggedUnion2<V1,V2>>
- groupByKey: PTable<K, TaggedUnion2<V1,V2>> —> PTable<K,Collection<TaggedUnion2<V1,V2>>>
- parallelDo(): PTable<K,Collection<TaggedUnion2<V1,V2>>> —> PTable<K,Tuple2<Collection<V1>, Collection<V2>>>
优化器
ParallelDo producer-consumer fusion
若parallelDo f的输入是parallelDo g的输入,则f和g可以合并为一个parallelDo,避免产出中间结果文件
ParallelDo sibling fusion
若多个parallelDo,接受同一个PCollection作为输入,那么这些parallelDo可以合并为一个parallelDo,合并后的parallelDo是一个多输出的算子(原本的parallelDo各自有自己的输出)
ParallelDo producer-consumer fusion + ParallelDo sibling fusion
因此这两种优化,可以统一为一种优化。若多个parallelDo有一个共同的上游parallelDo,则这个上游parallelDo以及所有下游parallelDo可以合并到一起
The MapShuffleCombineReduce (MSCR) Operation
FlumeJava中定义的核心操作,可将开发者的计算逻辑,划分为MapReduce任务
- 有M个input channels,每个代表 一个map
- 有R个output channels,每个代表 一次shuffle+combine+一个reduce
- input channels:
- 每个input channels,接受一个PCollection<T>作为输入,并产出R路输出,每路输出的类型均为PTable<K,V>
- input channels可以任意emit那R路输出
- output channels:
- 由于每个input channels都产出R路输出,所有每个output channels都会接受M个输入
- output channels使用flatten将M个PTable<K,V>展开为一个PTable<K,V>
- 随后可能对展开后的PTable执行两种不同的操作:
- grouping: 执行groupByKey+combineValues,产出R路输出,分别传递给R个reduce
- pass-through: 直接将数据写出到外部存储
- pass-through操作,用于实现,将MapReduce中map的产出,直接写到存储并作为最终计算的结果
下面是一张典型的MSCR操作示意图,其中第1,2路执行了grouping操作,第3路执行了pass-through操作:
MSCR Fusion
- 一组相关联的groupByKey调用,将产生一个MSCR操作
- 怎么算是相关联的groupByKey的调用?
- 如果多个groupByKey操作,接受了同一个parallelDo的输出作为输入,则这些groupByKey操作是相关联的
- 关联性传染,若GBK1和GBK2相关联,GBK2和GBK3相关联,则认为GBK1,GBK2,GBK3相关联
- 这些相关联的groupByKey将合并到MSCR操作中
- 作为共同上游的parallelDo也将合并到MSCR操作中,作为一个input channel
- MSCR中的groupByKey的上游parallelDo,也将合并到MSCR中,作为一个input channel
- 每个groupByKey之后若调用了combineValue,该combineValue也合并到MSCR中
- groupByKey或者groupByKey+combineValue的输出被作为某些parallelDo的输入,则这些parallelDo也将合并到MSCR中
下面这张图展示了MSCR操作是怎么被构造出来的
Sink Flattens
flatten操作下沉(消除),创造ParallelDo Fusion的机会
- 若flatten后面紧接一个下游parallelDo,可以将该flatten下沉到parallelDo之后
- 下沉后,可以将flatten的上游parallelDo和下游parallelDo合并
- flatten有多个上游parallelDo,要对应地复制下游parallelDo
Lift CombineValues operations
若combineValues操作后面紧跟一个groupByKey操作,则该combineValues可视为一个普通的parallelDo,可参与parallelDo Fusion
Insert fusion blocks
当两个groupByKey操作之间,有一条parallelDo调用链,这两个groupByKey最终会被划分到两个MSCR Fusion中。
两个GBK之间的所有parallelDo,将面临着如何划分的问题:是应该划分到上游的MSCR中作为output channel,还是划分到下游的MSCR中作为input channel?
FlumeJava的做法是:
- 在paralleDo链中,找到输出数据量最小的那个parallelDo,以此为边界
- 上游的parallelDo划分到上游MSCR中作为output channel,下游parallelDo划分到下游MSCR中作为input channel
FlumeJava中未实现的优化策略
- 为DoFn添加hint参数,用于预测DoFn内部的行为:
- 例如,开发者可以预估DoFn的输出数据量,FlumeJava根据预估的输出量,决定该算子在本地还是MapReduce集群上执行。
- 分析开发者编写的业务函数,优化后重新生成新的代码,并与FlumeJava代码内联到一起进行整体优化
- 去除重复和无用的算子
- 结合底层MapReduce计算引擎,实现动态监控,在一定的时间间隔内汇报输出数据量,实时调整并行处理的MapReduce worker数量和占用的CPU,IO资源