Skip to content

Commit 7a9ff9d

Browse files
committed
Add heatbeat for register
1 parent 437a763 commit 7a9ff9d

6 files changed

Lines changed: 441 additions & 5 deletions

File tree

api.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,15 +32,21 @@ func Register(zone, app string, instance *Instance) error {
3232

3333
// status: http.StatusNoContent
3434
result := requests.Post(u).Json(info).Send().Status2xx()
35+
if result.Err == nil {
36+
instance.Beater.AddBeatInfo(app, instance)
37+
}
3538
return result.Err
3639
}
3740

3841
// UnRegister 删除实例
3942
// DELETE /eureka/v2/apps/appID/instanceID
40-
func UnRegister(zone, app, instanceID string) error {
41-
u := zone + "apps/" + app + "/" + instanceID
43+
func UnRegister(zone, app string, instance *Instance) error {
44+
u := zone + "apps/" + app + "/" + instance.InstanceID
4245
// status: http.StatusNoContent
4346
result := requests.Delete(u).Send().StatusOk()
47+
if result.Err == nil && instance.Beater != nil {
48+
instance.Beater.RemoveBeatInfo(app, instance.InstanceID)
49+
}
4450
return result.Err
4551
}
4652

beat.go

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
package eureka_client
2+
3+
import (
4+
"context"
5+
"log"
6+
"sync"
7+
"time"
8+
9+
"golang.org/x/sync/semaphore"
10+
)
11+
12+
type BeatReactor struct {
13+
config *Config
14+
beatMap ConcurrentMap
15+
clientBeatInterval int64
16+
beatThreadCount int
17+
beatThreadSemaphore *semaphore.Weighted
18+
beatRecordMap ConcurrentMap
19+
mux *sync.Mutex
20+
log Logger
21+
Period time.Duration
22+
}
23+
24+
const DefaultBeatThreadNum = 20
25+
26+
var ctx = context.Background()
27+
28+
func NewBeatReactor(config *Config, clientBeatInterval int64) BeatReactor {
29+
br := BeatReactor{
30+
config: config,
31+
}
32+
if clientBeatInterval <= 0 {
33+
clientBeatInterval = 5 * 1000
34+
}
35+
br.beatMap = NewConcurrentMap()
36+
br.clientBeatInterval = clientBeatInterval
37+
br.beatThreadCount = DefaultBeatThreadNum
38+
br.beatRecordMap = NewConcurrentMap()
39+
br.beatThreadSemaphore = semaphore.NewWeighted(int64(br.beatThreadCount))
40+
br.mux = new(sync.Mutex)
41+
br.log = NewLogger()
42+
br.Period = time.Duration(time.Millisecond.Milliseconds() * clientBeatInterval)
43+
return br
44+
}
45+
46+
func (br *BeatReactor) AddBeatInfo(serviceName string, beatInfo *Instance) {
47+
k := beatInfo.InstanceID
48+
defer br.mux.Unlock()
49+
br.mux.Lock()
50+
if data, ok := br.beatMap.Get(k); ok {
51+
beatInfo = data.(*Instance)
52+
beatInfo.Status = "UP"
53+
br.beatMap.Remove(k)
54+
}
55+
br.beatMap.Set(k, beatInfo)
56+
go br.sendInstanceBeat(k, beatInfo)
57+
}
58+
59+
func (br *BeatReactor) RemoveBeatInfo(serviceName string, instanceId string) {
60+
log.Printf("remove beat: %s@%s from beat map", serviceName, instanceId)
61+
k := instanceId
62+
defer br.mux.Unlock()
63+
br.mux.Lock()
64+
data, exist := br.beatMap.Get(k)
65+
if exist {
66+
beatInfo := data.(*Instance)
67+
beatInfo.Status = "UP"
68+
}
69+
br.beatMap.Remove(k)
70+
}
71+
72+
func (br *BeatReactor) sendInstanceBeat(k string, beatInfo *Instance) {
73+
for {
74+
err := br.beatThreadSemaphore.Acquire(ctx, 1)
75+
if err != nil {
76+
log.Printf("sendInstanceBeat failed to acquire semaphore: %v", err)
77+
return
78+
}
79+
//如果当前实例注销,则进行停止心跳
80+
if beatInfo.Status != "UP" {
81+
log.Printf("instance[%s] stop heartBeating", k)
82+
br.beatThreadSemaphore.Release(1)
83+
return
84+
}
85+
86+
//进行心跳通信
87+
// /eureka/apps/ORDER-SERVICE/localhost:order-service:8886?status=UP
88+
err = Heartbeat(br.config.DefaultZone, beatInfo.App, beatInfo.InstanceID)
89+
//u := br.config.DefaultZone + "apps/" + beatInfo.App + "/" + beatInfo.InstanceID + "?status=UP"
90+
//result := requests.Put(u).Send().Status2xx()
91+
92+
if err != nil {
93+
log.Printf("beat to server return error:%+v", err)
94+
br.beatThreadSemaphore.Release(1)
95+
t := time.NewTimer(br.Period)
96+
<-t.C
97+
continue
98+
}
99+
100+
br.beatRecordMap.Set(k, time.Now().UnixNano()/1e6)
101+
br.beatThreadSemaphore.Release(1)
102+
103+
t := time.NewTimer(br.Period)
104+
<-t.C
105+
}
106+
}

client.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ func (c *Client) Start() {
5050
// refresh 刷新服务列表
5151
func (c *Client) refresh() {
5252
timer := time.NewTimer(0)
53-
interval := time.Duration(c.Config.RenewalIntervalInSecs) * time.Second
53+
interval := time.Duration(c.Config.RegistryFetchIntervalSeconds) * time.Second
5454
for c.running {
5555
select {
5656
case <-timer.C:
@@ -70,7 +70,7 @@ func (c *Client) refresh() {
7070
// heartbeat 心跳
7171
func (c *Client) heartbeat() {
7272
timer := time.NewTimer(0)
73-
interval := time.Duration(c.Config.RegistryFetchIntervalSeconds) * time.Second
73+
interval := time.Duration(c.Config.RenewalIntervalInSecs) * time.Second
7474
for c.running {
7575
select {
7676
case <-timer.C:

0 commit comments

Comments
 (0)