
Commit f559507

Merge branch 'pipiaha-dev' into dev
2 parents: d005636 + 3eead38

4 files changed: +266, -12 lines
Lines changed: 39 additions & 0 deletions

@@ -0,0 +1,39 @@
+package etcd
+
+import (
+	clientv3 "go.etcd.io/etcd/client/v3"
+)
+
+type RoleChangedListener interface {
+	OnRoleChanged(RoleType)
+}
+
+type Option func(*config)
+
+func WithBaseKey(baseKey string) Option {
+	return func(o *config) {
+		o.BaseKey = baseKey
+	}
+}
+
+func WithEtcdConfig(cfg clientv3.Config) Option {
+	return func(o *config) {
+		o.cfg = cfg
+	}
+}
+
+func WithRoleChangedListener(l RoleChangedListener) Option {
+	return func(o *config) {
+		o.RoleChanged = l
+	}
+}
+
+type config struct {
+	BaseKey     string
+	cfg         clientv3.Config
+	RoleChanged RoleChangedListener
+}
+
+func defaultConfig() *config {
+	return &config{}
+}
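
This new file gives the provider a standard functional-options API. As the NewWithConfig diff below shows, the positional baseKey and cfg arguments are applied first and the variadic options after, so an explicit option wins over the positional arguments. A minimal usage sketch; the logListener type and the endpoint are assumptions for illustration, not part of this commit:

	package main

	import (
		"fmt"

		"github.com/asynkron/protoactor-go/cluster/clusterproviders/etcd"
		clientv3 "go.etcd.io/etcd/client/v3"
	)

	// logListener is a hypothetical RoleChangedListener used only for this sketch.
	type logListener struct{}

	func (logListener) OnRoleChanged(rt etcd.RoleType) { fmt.Println("role is now", rt) }

	func main() {
		// Options run after the positional arguments have been applied.
		provider, err := etcd.NewWithConfig("/protoactor", clientv3.Config{
			Endpoints: []string{"127.0.0.1:2379"},
		}, etcd.WithRoleChangedListener(logListener{}))
		if err != nil {
			panic(err)
		}
		_ = provider
	}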

cluster/clusterproviders/etcd/etcd_provider.go

Lines changed: 139 additions & 12 deletions
@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"log/slog"
 	"net"
+	"runtime"
 	"strconv"
 	"strings"
 	"sync/atomic"
@@ -31,6 +32,10 @@ type Provider struct {
 	retryInterval time.Duration
 	revision      uint64
 	// deregisterCritical time.Duration
+	schedulers          []*SingletonScheduler
+	role                RoleType
+	roleChangedChan     chan RoleType
+	roleChangedListener RoleChangedListener
 }

 func New() (*Provider, error) {
@@ -40,18 +45,27 @@ func New() (*Provider, error) {
 	})
 }

-func NewWithConfig(baseKey string, cfg clientv3.Config) (*Provider, error) {
-	client, err := clientv3.New(cfg)
+func NewWithConfig(baseKey string, cfg clientv3.Config, opts ...Option) (*Provider, error) {
+	c := defaultConfig()
+	WithBaseKey(baseKey)(c)
+	WithEtcdConfig(cfg)(c)
+	for _, opt := range opts {
+		opt(c)
+	}
+	client, err := clientv3.New(c.cfg)
 	if err != nil {
 		return nil, err
 	}
 	p := &Provider{
-		client:        client,
-		keepAliveTTL:  3 * time.Second,
-		retryInterval: 1 * time.Second,
-		baseKey:       baseKey,
-		members:       map[string]*Node{},
-		cancelWatchCh: make(chan bool),
+		client:              client,
+		keepAliveTTL:        3 * time.Second,
+		retryInterval:       1 * time.Second,
+		baseKey:             c.BaseKey,
+		members:             map[string]*Node{},
+		cancelWatchCh:       make(chan bool),
+		role:                Follower,
+		roleChangedChan:     make(chan RoleType, 1),
+		roleChangedListener: c.RoleChanged,
 	}
 	return p, nil
 }
@@ -78,7 +92,12 @@ func (p *Provider) StartMember(c *cluster.Cluster) error {
 	if err := p.init(c); err != nil {
 		return err
 	}
+	// register self
+	if err := p.registerService(); err != nil {
+		return err
+	}

+	p.startRoleChangedNotifyLoop()
 	// fetch memberlist
 	nodes, err := p.fetchNodes()
 	if err != nil {
@@ -89,12 +108,9 @@ func (p *Provider) StartMember(c *cluster.Cluster) error {
 	p.publishClusterTopologyEvent()
 	p.startWatching()

-	// register self
-	if err := p.registerService(); err != nil {
-		return err
-	}
 	ctx := context.TODO()
 	p.startKeepAlive(ctx)
+	p.updateLeadership(nodes)
 	return nil
 }

@@ -116,6 +132,7 @@ func (p *Provider) StartClient(c *cluster.Cluster) error {
 func (p *Provider) Shutdown(graceful bool) error {
 	p.shutdown = true
 	if !p.deregistered {
+		p.updateLeadership(nil)
 		err := p.deregisterService()
 		if err != nil {
 			p.cluster.Logger().Error("deregisterMember", slog.Any("error", err))
@@ -254,6 +271,8 @@ func (p *Provider) handleWatchResponse(resp clientv3.WatchResponse) map[string]*Node {
 			p.cluster.Logger().Debug("New member.", slog.String("key", key))
 		}
 		changes[nodeId] = node
+		node.SetMeta(metaKeyID, nodeId)
+		node.SetMeta(metaKeySeq, fmt.Sprintf("%d", ev.Kv.Lease))
 	case clientv3.EventTypeDelete:
 		node, ok := p.members[nodeId]
 		if !ok {
@@ -267,6 +286,8 @@ func (p *Provider) handleWatchResponse(resp clientv3.WatchResponse) map[string]*Node {
 		cloned := *node
 		cloned.SetAlive(false)
 		changes[nodeId] = &cloned
+		node.SetMeta(metaKeyID, nodeId)
+		node.SetMeta(metaKeySeq, fmt.Sprintf("%d", ev.Kv.Lease))
 	default:
 		p.cluster.Logger().Error("Invalid etcd event.type.", slog.String("key", key),
 			slog.String("type", ev.Type.String()))
@@ -294,6 +315,11 @@ func (p *Provider) _keepWatching(stream clientv3.WatchChan) error {
 		}
 		nodesChanges := p.handleWatchResponse(resp)
 		p.updateNodesWithChanges(nodesChanges)
+		l := make([]*Node, 0)
+		for _, node := range nodesChanges {
+			l = append(l, node)
+		}
+		p.updateLeadership(l)
 		p.publishClusterTopologyEvent()
 	}
 	return nil
@@ -304,6 +330,17 @@ func (p *Provider) startWatching() {
 	ctx, cancel := context.WithCancel(ctx)
 	p.cancelWatch = cancel
 	go func() {
+		// recover
+		defer func() {
+			if r := recover(); r != nil {
+				p.cluster.Logger().Error("Recovered from panic in keepWatching.", slog.Any("error", r))
+				p.clusterError = fmt.Errorf("keepWatching panic: %v", r)
+			}
+			if p.cancelWatchCh != nil {
+				close(p.cancelWatchCh)
+			}
+			p.cancelWatch = nil
+		}()
 		for !p.shutdown {
 			if err := p.keepWatching(ctx); err != nil {
 				p.cluster.Logger().Error("Failed to keepWatching.", slog.Any("error", err))
@@ -339,6 +376,10 @@ func (p *Provider) fetchNodes() ([]*Node, error) {
 			return nil, err
 		}
 		nodes = append(nodes, &n)
+		if v.Lease > 0 {
+			n.SetMeta(metaKeySeq, fmt.Sprintf("%d", v.Lease))
+			n.SetMeta(metaKeyID, n.ID)
+		}
 	}
 	p.revision = uint64(resp.Header.GetRevision())
 	// plog.Debug("fetch nodes",
@@ -393,6 +434,7 @@ func (p *Provider) getLeaseID() clientv3.LeaseID {

 func (p *Provider) setLeaseID(leaseID clientv3.LeaseID) {
 	atomic.StoreInt64((*int64)(&p.leaseID), (int64)(leaseID))
+	p.self.SetMeta(metaKeySeq, fmt.Sprintf("%d", leaseID))
 }

 func (p *Provider) newLeaseID() (clientv3.LeaseID, error) {
@@ -417,3 +459,88 @@ func splitHostPort(addr string) (host string, port int, err error) {
 	}
 	return
 }
+
+func (p *Provider) RegisterSingletonScheduler(scheduler *SingletonScheduler) {
+	p.schedulers = append(p.schedulers, scheduler)
+}
+
+// reworked role-change handling logic
+func (p *Provider) updateLeadership(ns []*Node) {
+	role := Follower
+	ns, err := p.fetchNodes()
+	if err != nil {
+		p.cluster.Logger().Error("Failed to fetch nodes in updateLeadership.", slog.Any("error", err))
+	}
+
+	if p.isLeaderOf(ns) {
+		role = Leader
+	}
+	if role != p.role {
+		p.cluster.Logger().Info("Role changed.", slog.String("from", p.role.String()), slog.String("to", role.String()))
+		p.role = role
+		p.roleChangedChan <- role
+
+		// notify every registered SingletonScheduler
+		for _, scheduler := range p.schedulers {
+			safeRun(p.cluster.Logger(), func() {
+				scheduler.OnRoleChanged(role)
+			})
+		}
+	}
+}
+
+func safeRun(logger *slog.Logger, fn func()) {
+	defer func() {
+		if r := recover(); r != nil {
+			logger.Warn("OnRoleChanged.", slog.Any("error", fmt.Errorf("%v\n%s", r, string(getRunTimeStack()))))
+		}
+	}()
+	fn()
+}
+
+func getRunTimeStack() []byte {
+	const size = 64 << 10
+	buf := make([]byte, size)
+	return buf[:runtime.Stack(buf, false)]
+}
+
+func (p *Provider) isLeaderOf(ns []*Node) bool {
+	if ns == nil {
+		return false
+	}
+	if len(ns) == 1 && p.self != nil && ns[0].ID == p.self.ID {
+		return true
+	}
+	var minSeq int
+	for _, node := range ns {
+		if !node.IsAlive() {
+			continue
+		}
+		if seq := node.GetSeq(); (seq > 0 && seq < minSeq) || minSeq == 0 {
+			minSeq = seq
+		}
+	}
+	//for _, node := range ns {
+	//	if p.self != nil && node.ID == p.self.ID {
+	//		return minSeq > 0 && minSeq == p.self.GetSeq()
+	//	}
+	//}
+	if minSeq <= 0 { // no node is alive
+		return true
+	}
+	if p.self != nil && p.self.GetSeq() <= minSeq {
+		return true
+	}
+	return false
+}
+
+func (p *Provider) startRoleChangedNotifyLoop() {
+	go func() {
+		for !p.shutdown {
+			role := <-p.roleChangedChan
+			if lis := p.roleChangedListener; lis != nil {
+				safeRun(p.cluster.Logger(), func() { lis.OnRoleChanged(role) })
+			}
+		}
+	}()
+}
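
Leader election here rides on the etcd lease: every member stores its lease ID under the "seq" meta key, and updateLeadership / isLeaderOf treat the alive node with the smallest positive seq as leader, with ties going to self. A hypothetical same-package helper that makes the rule explicit (not part of the commit):

	// electedLeader returns the node that wins the election isLeaderOf
	// implements, or nil when no alive node has a recorded lease seq.
	func electedLeader(ns []*Node) *Node {
		var leader *Node
		for _, n := range ns {
			if !n.IsAlive() || n.GetSeq() <= 0 {
				continue // only alive nodes with a lease seq are candidates
			}
			if leader == nil || n.GetSeq() < leader.GetSeq() {
				leader = n
			}
		}
		return leader
	}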

cluster/clusterproviders/etcd/node.go

Lines changed: 17 additions & 0 deletions
@@ -2,10 +2,16 @@ package etcd

 import (
 	"encoding/json"
+	"strconv"

 	"github.com/asynkron/protoactor-go/cluster"
 )

+const (
+	metaKeyID  = "id"
+	metaKeySeq = "seq"
+)
+
 type Node struct {
 	ID   string `json:"id"`
 	Name string `json:"name"`
@@ -64,6 +70,17 @@ func (n *Node) GetMeta(name string) (string, bool) {
 	val, ok := n.Meta[name]
 	return val, ok
 }
+func (n *Node) GetSeq() int {
+	if seqStr, ok := n.GetMeta(metaKeySeq); ok {
+		return strToInt(seqStr)
+	}
+	return 0
+}
+
+func strToInt(s string) int {
+	i, _ := strconv.ParseInt(s, 10, 64)
+	return int(i)
+}

 func (n *Node) MemberStatus() *cluster.Member {
 	host, port := n.GetAddress()
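
GetSeq is the read side of the meta round trip: setLeaseID and the watch handlers write the lease ID as a decimal string, and strToInt parses it back, silently returning 0 on a bad value, which the election code treats as "no seq". A same-package sketch of that round trip, illustrative only:

	// a lease ID survives the string round trip intact
	seq := fmt.Sprintf("%d", clientv3.LeaseID(42))
	fmt.Println(strToInt(seq)) // 42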
Lines changed: 71 additions & 0 deletions
@@ -0,0 +1,71 @@
+package etcd
+
+import (
+	"sync"
+
+	"github.com/asynkron/protoactor-go/actor"
+)
+
+// RoleType represents a node's role in the cluster
+type RoleType int
+
+const (
+	Follower RoleType = iota
+	Leader
+)
+
+func (r RoleType) String() string {
+	switch r {
+	case Leader:
+		return "Leader"
+	default:
+		return "Follower"
+	}
+}
+
+// SingletonScheduler manages singleton actors that run on the leader node
+type SingletonScheduler struct {
+	sync.Mutex
+	root  *actor.RootContext
+	props []*actor.Props
+	pids  []*actor.PID
+}
+
+func NewSingletonScheduler(rc *actor.RootContext) *SingletonScheduler {
+	return &SingletonScheduler{root: rc}
+}
+
+func (s *SingletonScheduler) FromFunc(f actor.ReceiveFunc) *SingletonScheduler {
+	s.Lock()
+	defer s.Unlock()
+	s.props = append(s.props, actor.PropsFromFunc(f))
+	return s
+}
+
+func (s *SingletonScheduler) FromProducer(f actor.Producer) *SingletonScheduler {
+	s.Lock()
+	defer s.Unlock()
+	s.props = append(s.props, actor.PropsFromProducer(f))
+	return s
+}
+
+func (s *SingletonScheduler) OnRoleChanged(rt RoleType) {
+	s.Lock()
+	defer s.Unlock()
+	if rt == Follower {
+		if len(s.pids) > 0 {
+			s.root.Logger().Info("I am follower, poison singleton actors")
+			for _, pid := range s.pids {
+				s.root.Poison(pid)
+			}
+			s.pids = nil
+		}
+	} else if rt == Leader {
+		if len(s.props) > 0 {
+			s.root.Logger().Info("I am leader now, start singleton actors")
+			s.pids = make([]*actor.PID, len(s.props))
+			for i, p := range s.props {
+				s.pids[i] = s.root.Spawn(p)
+			}
+		}
+	}
+}
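
Wiring it together: a scheduler registered with the provider gets OnRoleChanged on every transition, so its props are spawned on the node that wins the election and poisoned when leadership is lost. A minimal sketch, assuming `system` is an initialized *actor.ActorSystem and `provider` came from NewWithConfig as above:

	sched := NewSingletonScheduler(system.Root).
		FromFunc(func(ctx actor.Context) {
			switch ctx.Message().(type) {
			case *actor.Started:
				fmt.Println("singleton started on the leader")
			}
		})
	provider.RegisterSingletonScheduler(sched)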
