Skip to content

Commit 18b4684

Browse files
committed
feat: bizflow框架开发
1 parent cafbc84 commit 18b4684

File tree

7 files changed

+372
-0
lines changed

7 files changed

+372
-0
lines changed

common/bizflow/README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# 一个基于DAG的业务流程框架
2+
* 详情参考:https://hardcore-tech.feishu.cn/wiki/Ji9BwqCOUiSxXckiixTcvKCKnXb
3+
* 该框架的工作负载大概在千万量级,后续会在第三期做大幅度的性能优化,使其可在亿级流量场景可用。

common/bizflow/bizflow_test.go

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
package bizflow
2+
3+
import (
4+
"context"
5+
"testing"
6+
)
7+
8+
type testFlow struct {
9+
}
10+
11+
func (f *testFlow) Name() FlowName {
12+
return "test_flow"
13+
}
14+
func (f *testFlow) BuildGraph(e *Engine) *Graph {
15+
g := e.InitGraph()
16+
tt := &testNode{}
17+
ttB := &testNodeB{}
18+
ttC := &testNodeC{}
19+
ttD := &testNodeD{}
20+
g.AddNode(tt).AddNode(ttB).AddNode(ttC).AddNode(ttD)
21+
return g
22+
}
23+
24+
type testNode struct {
25+
in string
26+
}
27+
28+
func (t *testNode) Name() NodeName {
29+
return "testA"
30+
}
31+
func (t *testNode) Deps() []NodeName {
32+
return nil
33+
}
34+
func (t *testNode) Meta() *Meta {
35+
return &Meta{}
36+
}
37+
func (t *testNode) Run(f *Graph) error {
38+
return nil
39+
}
40+
41+
type testNodeB struct {
42+
}
43+
44+
func (t *testNodeB) Name() NodeName {
45+
return "testB"
46+
}
47+
func (t *testNodeB) Deps() []NodeName {
48+
return []NodeName{"testA"}
49+
}
50+
func (t *testNodeB) Meta() *Meta {
51+
return &Meta{}
52+
}
53+
func (t *testNodeB) Run(f *Graph) error {
54+
return nil
55+
}
56+
57+
type testNodeC struct {
58+
}
59+
60+
func (t *testNodeC) Name() NodeName {
61+
return "testC"
62+
}
63+
func (t *testNodeC) Deps() []NodeName {
64+
return []NodeName{"testA"}
65+
}
66+
func (t *testNodeC) Meta() *Meta {
67+
return &Meta{}
68+
}
69+
func (t *testNodeC) Run(f *Graph) error {
70+
return nil
71+
}
72+
73+
type testNodeD struct {
74+
out string
75+
}
76+
77+
func (t *testNodeD) Name() NodeName {
78+
return "testD"
79+
}
80+
func (t *testNodeD) Deps() []NodeName {
81+
return []NodeName{"testB", "testC"}
82+
}
83+
func (t *testNodeD) Meta() *Meta {
84+
return &Meta{}
85+
}
86+
func (t *testNodeD) Run(g *Graph) error {
87+
if node := g.GetNode("testA"); node != nil {
88+
testA, _ := node.(*testNode)
89+
t.out = testA.in
90+
}
91+
return nil
92+
}
93+
func TestBizflow(t *testing.T) {
94+
// 进程初始化
95+
eng := NewEngine(64)
96+
eng.AddFlow(&testFlow{})
97+
98+
// 请求到来时
99+
g := eng.CreateDAG("test_flow")
100+
testA := g.Input().(*testNode)
101+
testA.in = "hello bizflow"
102+
g.Run(context.TODO())
103+
testD := g.Output().(*testNodeD)
104+
t.Logf("TestBizflow %+v", testD.out)
105+
}

common/bizflow/engine.go

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
package bizflow
2+
3+
import (
4+
"github.com/panjf2000/ants"
5+
)
6+
7+
type Engine struct {
8+
flows map[FlowName]Flow
9+
flowParallelNum int
10+
workPool *ants.Pool
11+
}
12+
13+
func NewEngine(workSize int) *Engine {
14+
e := &Engine{flows: map[FlowName]Flow{}}
15+
if wPool, err := ants.NewPool(workSize); err != nil {
16+
panic(err)
17+
} else {
18+
e.workPool = wPool
19+
}
20+
return e
21+
}
22+
23+
func (e *Engine) AddFlow(f Flow) *Engine {
24+
e.flows[f.Name()] = f
25+
// 这里构建一次图是为了在进程初始化时就检查DAG是否合法
26+
f.BuildGraph(e)
27+
return e
28+
}
29+
30+
func (e *Engine) FlowParallelNum(num int) *Engine {
31+
e.flowParallelNum = num
32+
return e
33+
}
34+
func (e *Engine) CreateDAG(name FlowName) *Graph {
35+
if f := e.getFlow(name); f != nil {
36+
g := f.BuildGraph(e)
37+
return g
38+
}
39+
return nil
40+
}
41+
42+
func (e *Engine) InitGraph() *Graph {
43+
return &Graph{e: e, nodes: map[NodeName]*item{}, eventChan: make(chan *item, e.flowParallelNum)}
44+
}
45+
46+
func (e *Engine) getFlow(name FlowName) Flow {
47+
if flow, ok := e.flows[name]; ok {
48+
return flow
49+
}
50+
return nil
51+
}

common/bizflow/flow.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
package bizflow
2+
3+
type FlowName string
4+
type Flow interface {
5+
Name() FlowName
6+
BuildGraph(e *Engine) *Graph
7+
}

common/bizflow/graph.go

Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
package bizflow
2+
3+
import (
4+
"context"
5+
"errors"
6+
"fmt"
7+
"sync"
8+
9+
"github.com/bytedance/gopkg/util/logger"
10+
)
11+
12+
type Graph struct {
13+
nodes map[NodeName]*item
14+
lastNodeName NodeName
15+
firstNodeName NodeName
16+
eventChan chan *item
17+
e *Engine
18+
}
19+
20+
func (g *Graph) AddNode(node Node) *Graph {
21+
it := &item{node: node, nexts: map[NodeName]*item{}}
22+
nodeName := node.Name()
23+
deps := node.Deps()
24+
meta := node.Meta()
25+
g.nodes[nodeName] = it
26+
if len(deps) == 0 {
27+
if len(g.firstNodeName) == 0 {
28+
g.firstNodeName = nodeName
29+
} else {
30+
panic("There cannot be any independent stages except for the first node.")
31+
}
32+
}
33+
g.lastNodeName = nodeName
34+
it.depsNum = len(deps)
35+
it.retryNum = meta.RetryNum
36+
for _, parent := range deps {
37+
if parentItem := g.getItem(parent); parentItem != nil {
38+
if parent == nodeName {
39+
panic("The node has a dependency on itself.")
40+
}
41+
parentItem.nexts[nodeName] = it
42+
} else {
43+
panic(fmt.Sprintf("The dependent %s is not initialized. There is a cyclic dependency:[%s<-%s] "+
44+
"Please ensure that the order of adding nodes during buildgraph follows the precedence order.", parent, nodeName, parent))
45+
}
46+
}
47+
return g
48+
}
49+
50+
func (g *Graph) GetNode(name NodeName) Node {
51+
if it := g.getItem(name); it != nil {
52+
event := it.getEvent()
53+
if event == execOK {
54+
return it.node
55+
} else {
56+
panic("The node you obtained has not been completed yet. Please follow the declared DAG dependency rules.")
57+
}
58+
}
59+
return nil
60+
}
61+
62+
func (g *Graph) Input() Node {
63+
return g.getItem(g.firstNodeName).node
64+
}
65+
66+
func (g *Graph) Output() Node {
67+
return g.GetNode(g.lastNodeName)
68+
}
69+
70+
func (g *Graph) getItem(name NodeName) *item {
71+
if it, ok := g.nodes[name]; ok {
72+
return it
73+
}
74+
return nil
75+
}
76+
77+
func (g *Graph) Run(ctx context.Context) error {
78+
if len(g.firstNodeName) == 0 {
79+
return errors.New("Graph has not been created")
80+
}
81+
firstIt := g.getItem(g.firstNodeName)
82+
if g.firstNodeName == g.lastNodeName {
83+
return firstIt.node.Run(g)
84+
}
85+
g.e.workPool.Submit(func() { g.work(firstIt) })
86+
for it := range g.eventChan {
87+
switch it.event {
88+
case execOK:
89+
g.execNext(it)
90+
g.tryStop(it)
91+
case RetryableError:
92+
if it.retryNum > 0 {
93+
g.e.workPool.Submit(func() { g.work(it) })
94+
}
95+
case NonRetryable:
96+
logger.CtxErrorf(ctx, "bizflow.masterRun NonRetryable.err=%+v", it.err)
97+
g.tryStop(it)
98+
return it.err
99+
case AbortErr:
100+
close(g.eventChan)
101+
return nil
102+
}
103+
}
104+
return nil
105+
}
106+
107+
func (g *Graph) tryStop(it *item) {
108+
if g.lastNodeName == it.node.Name() {
109+
close(g.eventChan)
110+
}
111+
}
112+
113+
func (g *Graph) execNext(it *item) {
114+
for _, itt := range it.nexts {
115+
itt.depsNum -= 1
116+
if itt.depsNum <= 0 {
117+
g.e.workPool.Submit(func() { g.work(itt) })
118+
}
119+
}
120+
}
121+
func (g *Graph) masterChanClosed() bool {
122+
select {
123+
case _, ok := <-g.eventChan:
124+
return !ok
125+
default:
126+
return false
127+
}
128+
}
129+
func (g *Graph) work(it *item) {
130+
// 先检查一下
131+
if g.masterChanClosed() {
132+
return
133+
}
134+
defer func() {
135+
if !g.masterChanClosed() {
136+
g.eventChan <- it
137+
}
138+
}()
139+
err := it.node.Run(g)
140+
if err == nil {
141+
it.setEvent(execOK)
142+
return
143+
}
144+
it.err = err
145+
meta := it.node.Meta()
146+
if meta.IsRetryErr[err] {
147+
it.setEvent(RetryableError)
148+
return
149+
}
150+
if meta.IsNonRetryErr[err] {
151+
it.setEvent(NonRetryable)
152+
return
153+
}
154+
if meta.AbortErr[err] {
155+
it.setEvent(AbortErr)
156+
return
157+
}
158+
}
159+
160+
type eventName string
161+
162+
const (
163+
execOK eventName = "execOK"
164+
RetryableError eventName = "Retryable error"
165+
NonRetryable eventName = "NonRetryable error"
166+
AbortErr eventName = "abort error"
167+
)
168+
169+
type item struct {
170+
node Node
171+
event eventName
172+
nexts map[NodeName]*item
173+
err error
174+
depsNum int
175+
retryNum int
176+
sync.RWMutex
177+
}
178+
179+
func (it *item) getEvent() eventName {
180+
it.RLock()
181+
defer it.RUnlock()
182+
return it.event
183+
}
184+
185+
func (it *item) setEvent(e eventName) {
186+
it.Lock()
187+
defer it.Unlock()
188+
it.event = e
189+
}

common/bizflow/meta.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
package bizflow
2+
3+
type Meta struct {
4+
RetryNum int
5+
IsRetryErr map[error]bool
6+
IsNonRetryErr map[error]bool
7+
AbortErr map[error]bool
8+
}

common/bizflow/node.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
package bizflow
2+
3+
type NodeName string
4+
type Node interface {
5+
Name() NodeName
6+
Deps() []NodeName
7+
Meta() *Meta
8+
Run(g *Graph) error
9+
}

0 commit comments

Comments
 (0)