Skip to content

Commit 3a9b9ce

Browse files
zhoushuguangzhoushuguang
and
zhoushuguang
authored
add (#54)
Co-authored-by: zhoushuguang <[email protected]>
1 parent 3128d63 commit 3a9b9ce

File tree

3 files changed

+123
-1
lines changed

3 files changed

+123
-1
lines changed

doc/images/mr_time.png

104 KB
Loading

doc/mapreduce.md

+9-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,14 @@
11
# 通过MapReduce降低服务响应时间
22

3-
go-zero微服务框架中提供了许多开箱即用的工具,好的工具不仅能提升服务的性能而且还能提升代码的鲁棒性避免出错,实现代码风格的统一方便他人阅读等等,本系列文章将分别介绍go-zero框架中工具的使用及其实现原理
3+
在微服务中开发中,api网关扮演对外提供restful api的角色,而api的数据往往会依赖其他服务,复杂的api更是会依赖多个甚至数十个服务。虽然单个被依赖服务的耗时一般都比较低,但如果多个服务串行依赖的话那么整个api的耗时将会大大增加。
4+
5+
那么通过什么手段来优化呢?我们首先想到的是通过并发来的方式来处理依赖,这样就能降低整个依赖的耗时,Go基础库中为我们提供了 [WaitGroup](https://golang.org/pkg/sync/#WaitGroup) 工具用来进行并发控制,但实际业务场景中多个依赖如果有一个出错我们期望能立即返回而不是等所有依赖都执行完再返回结果,而且WaitGroup中对变量的赋值往往需要加锁,每个依赖函数都需要添加Add和Done对于新手来说比较容易出错
6+
7+
基于以上的背景,go-zero框架中为我们提供了并发处理工具[MapReduce](https://github.com/tal-tech/go-zero/blob/master/core/mr/mapreduce.go),该工具开箱即用,不需要做什么初始化,我们通过下图看下使用MapReduce和没使用的耗时对比:
8+
9+
![依赖耗时对比](./images/mr_time.png)
10+
11+
相同的依赖,串行处理的话需要200ms,使用MapReduce后的耗时等于所有依赖中最大的耗时为100ms,可见MapReduce可以大大降低服务耗时,而且随着依赖的增加效果就会越明显,减少处理耗时的同时并不会增加服务器压力
412

513
## 并发处理工具[MapReduce](https://github.com/tal-tech/go-zero/tree/master/core/mr)
614

example/mapreduce/mr/mr.go

+114
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
package main
2+
3+
import (
4+
"log"
5+
"time"
6+
7+
"github.com/tal-tech/go-zero/core/mr"
8+
"github.com/tal-tech/go-zero/core/timex"
9+
)
10+
11+
type user struct{}
12+
13+
func (u *user) User(uid int64) (interface{}, error) {
14+
time.Sleep(time.Millisecond * 30)
15+
return nil, nil
16+
}
17+
18+
type store struct{}
19+
20+
func (s *store) Store(pid int64) (interface{}, error) {
21+
time.Sleep(time.Millisecond * 50)
22+
return nil, nil
23+
}
24+
25+
type order struct{}
26+
27+
func (o *order) Order(pid int64) (interface{}, error) {
28+
time.Sleep(time.Millisecond * 40)
29+
return nil, nil
30+
}
31+
32+
var (
33+
userRpc user
34+
storeRpc store
35+
orderRpc order
36+
)
37+
38+
func main() {
39+
start := timex.Now()
40+
_, err := productDetail(123, 345)
41+
if err != nil {
42+
log.Printf("product detail error: %v", err)
43+
return
44+
}
45+
log.Printf("productDetail time: %v", timex.Since(start))
46+
47+
// the data processing
48+
res, err := checkLegal([]int64{1, 2, 3})
49+
if err != nil {
50+
log.Printf("check error: %v", err)
51+
return
52+
}
53+
log.Printf("check res: %v", res)
54+
}
55+
56+
type ProductDetail struct {
57+
User interface{}
58+
Store interface{}
59+
Order interface{}
60+
}
61+
62+
func productDetail(uid, pid int64) (*ProductDetail, error) {
63+
var pd ProductDetail
64+
err := mr.Finish(func() (err error) {
65+
pd.User, err = userRpc.User(uid)
66+
return
67+
}, func() (err error) {
68+
pd.Store, err = storeRpc.Store(pid)
69+
return
70+
}, func() (err error) {
71+
pd.Order, err = orderRpc.Order(pid)
72+
return
73+
})
74+
75+
if err != nil {
76+
return nil, err
77+
}
78+
79+
return &pd, nil
80+
}
81+
82+
func checkLegal(uids []int64) ([]int64, error) {
83+
r, err := mr.MapReduce(func(source chan<- interface{}) {
84+
for _, uid := range uids {
85+
source <- uid
86+
}
87+
}, func(item interface{}, writer mr.Writer, cancel func(error)) {
88+
uid := item.(int64)
89+
ok, err := check(uid)
90+
if err != nil {
91+
cancel(err)
92+
}
93+
if ok {
94+
writer.Write(uid)
95+
}
96+
}, func(pipe <-chan interface{}, writer mr.Writer, cancel func(error)) {
97+
var uids []int64
98+
for p := range pipe {
99+
uids = append(uids, p.(int64))
100+
}
101+
writer.Write(uids)
102+
})
103+
if err != nil {
104+
return nil, err
105+
}
106+
107+
return r.([]int64), nil
108+
}
109+
110+
func check(uid int64) (bool, error) {
111+
// do something check user legal
112+
time.Sleep(time.Millisecond * 20)
113+
return true, nil
114+
}

0 commit comments

Comments
 (0)