@@ -4,12 +4,14 @@ import (
4
4
"context"
5
5
"crypto/tls"
6
6
"crypto/x509"
7
+ "errors"
7
8
"fmt"
8
9
"log"
9
10
"os"
10
11
"time"
11
12
12
13
"github.com/cybertec-postgresql/vip-manager/vipconfig"
14
+ v3rpc "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
13
15
clientv3 "go.etcd.io/etcd/client/v3"
14
16
)
15
17
@@ -69,8 +71,8 @@ func getTransport(conf *vipconfig.Config) (*tls.Config, error) {
69
71
return tlsClientConfig , nil
70
72
}
71
73
72
- // init gets the current leader from etcd
73
- func (elc * EtcdLeaderChecker ) init (ctx context.Context , out chan <- bool ) {
74
+ // get gets the current leader from etcd
75
+ func (elc * EtcdLeaderChecker ) get (ctx context.Context , out chan <- bool ) {
74
76
resp , err := elc .Get (ctx , elc .Key )
75
77
if err != nil {
76
78
log .Printf ("etcd error: %s" , err )
@@ -93,8 +95,12 @@ func (elc *EtcdLeaderChecker) watch(ctx context.Context, out chan<- bool) error
93
95
return ctx .Err ()
94
96
case watchResp := <- watchChan :
95
97
if err := watchResp .Err (); err != nil {
96
- log .Printf ("etcd watcher returned error: %s" , err )
97
- out <- false
98
+ if errors .Is (err , v3rpc .ErrCompacted ) { // revision is compacted, try direct get key
99
+ elc .get (ctx , out )
100
+ } else {
101
+ log .Printf ("etcd watcher returned error: %s" , err )
102
+ out <- false
103
+ }
98
104
continue
99
105
}
100
106
for _ , event := range watchResp .Events {
@@ -108,7 +114,7 @@ func (elc *EtcdLeaderChecker) watch(ctx context.Context, out chan<- bool) error
108
114
// GetChangeNotificationStream monitors the leader in etcd
109
115
func (elc * EtcdLeaderChecker ) GetChangeNotificationStream (ctx context.Context , out chan <- bool ) error {
110
116
defer elc .Close ()
111
- go elc .init (ctx , out )
117
+ go elc .get (ctx , out )
112
118
wctx , cancel := context .WithCancel (ctx )
113
119
defer cancel ()
114
120
return elc .watch (wctx , out )
0 commit comments