项目地址:
https://github.com/sado0823/go-map-reduce
what?
MapReduce是Google提出的一个软件架构,用于大规模数据集(大于1TB)的并行运算。概念“Map(映射)”和“Reduce(归纳)”,及他们的主要思想,都是从函数式编程语言借鉴的
有一些术语要介绍一下:
- Job。整个MapReduce计算称为Job。
- Task。每一次MapReduce调用称为Task。
- Resource。生成的需要处理的数据
- Map。数据处理函数, 输入中间数据到Reduce
- Reduce。聚合map中间数据, 输出最终结果
所以,对于一个完整的MapReduce Job,它由一些Map Task和一些Reduce Task组成。所以这是一个单词计数器的例子,它解释了MapReduce的基本工作方式。
why?
在微服务的架构下, 可能某一份数据需要call几个service聚合而成, 如果用串行
的方式去做, 接口的耗时肯定会高, 参考Ma pReduce
的思想, 我们可以使用并行的方式去做, 也就是说, 接口的最高耗时肯定是延迟最高的service call
+ 自身处理时间
在go
中, 实现这种思想的方式(或者说并发控制
), 有waitGroup
和errorGroup
可以使用:
waitGroup
: 需要手动写 add
和wait
, 对于新手不友善
errorGroup
: 官方包有context
问题和panic
没有recover
的问题, 一般会进行二次封装使用
how?
基本思想
今天的MapReduce
的方式主要采用以下方式实现:
-
一个原始数据生成方法 GenarateFunc
-
并发多goroutine(可指定
)执行的 MapFunc
-
单goroutine执行的ReduceFunc
-
resource, map, reduce之间通过channel
进行通信
源码分析
工具库里封装了几个helper方法, 其根源都是来源于一个方法MapReduceWithSource
函数签名MapReduceWithSource
func MapReduceWithSource(source <-chan interface{}, mapFunc MapFuncCancel, reduceFunc ReduceFunc, opts ...WithOption) (interface{}, error) {
}
类型定义MapFuncCancel
MapFuncCancel func(item interface{}, iterator Iterator, cancel func(err error))
类型定义ReduceFunc
ReduceFunc func(pipe <-chan interface{}, iterator Iterator, cancel func(err error))
类型定义WithOption
这是一个经典的函数选项设计模式
的应用, 通过函数来指定并发数
WithOption func(opts *options)
方法实现
func MapReduceWithSource(source <-chan interface{}, mapFunc MapFuncCancel, reduceFunc ReduceFunc, opts ...WithOption) (interface{}, error) {
op := buildOptions(opts...)
output := make(chan interface{})
defer func() {
for range output {
panic("reduce output should be one element")
}
}()
var (
collector = make(chan interface{}, op.workers)
dc = syncx.NewDoneChan()
iterator = newDefaultIterator(output, dc.Done())
closeOnce sync.Once
retErr errrorx.AtomicError
finish = func() {
closeOnce.Do(func() {
dc.Close()
close(output)
})
}
cancel = once(func(err error) {
if err != nil {
retErr.Store(err)
} else {
retErr.Store(ErrCancelWithNil)
}
drain(source)
finish()
})
)
go func() {
defer func() {
drain(collector)
if r := recover(); r != nil {
fmt.Printf("got cancel---: %v \n", r)
cancel(fmt.Errorf("%v", r))
} else {
finish()
}
}()
reduceFunc(collector, iterator, cancel)
}()
go executeMapFunc(func(item interface{}, iterator Iterator) {
mapFunc(item, iterator, cancel)
}, source, collector, dc.Done(), op.workers)
v, ok := <-output
if err := retErr.Load(); err != nil {
return nil, err
} else if ok {
return v, nil
} else {
return nil, ErrReduceNoOutput
}
}
example
并发调用获取数据
var (
foo struct {
Name string
Age int
Bar bool
}
)
err := Finish(func() error {
foo.Name = callNameSerice()
return nil
}, func() error {
foo.Age = callAgeService()
return nil
}, func() error {
foo.Bar = bar
return nil
})
批数据处理
func checkAdult(ages []int64) ([]int64, error) {
r, err := mr.MapReduce(func(source chan<- interface{}) {
for _, aeg := range ages {
source <- age
}
}, func(item interface{}, writer mr.Writer, cancel func(error)) {
age := item.(int64)
if age < 18{
cancel(fmt.Errorf("发现未成年, 年龄为: %d",age))
}
if ok {
writer.Write(age)
}
}, func(pipe <-chan interface{}, writer mr.Writer, cancel func(error)) {
var ages []int64
for p := range pipe {
ages = append(ages, p.(int64))
}
writer.Write(ages)
})
if err != nil {
log.Printf("check error: %v", err)
return nil, err
}
return r.([]int64), nil
}
references
1.https://github.com/tal-tech/go-zero
2.https://pdos.csail.mit.edu/6.824/papers/mapreduce.pdf