Skip to content

Commit 26c34e3

Browse files
grpcproxy: add support for WatchRequest_ProgressRequest
Signed-off-by: alexandre.vilain <[email protected]>
1 parent d6416b8 commit 26c34e3

File tree

3 files changed

+43
-17
lines changed

3 files changed

+43
-17
lines changed

server/proxy/grpcproxy/watch.go

+25
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,11 @@ func (wps *watchProxyStream) recvLoop() error {
271271
case *pb.WatchRequest_CancelRequest:
272272
wps.delete(uv.CancelRequest.WatchId)
273273
wps.lg.Debug("cancel watcher", zap.Int64("watcherId", uv.CancelRequest.WatchId))
274+
case *pb.WatchRequest_ProgressRequest:
275+
err := wps.requestProgressAll()
276+
if err != nil {
277+
return err
278+
}
274279
default:
275280
// Panic or Fatalf would allow to network clients to crash the serve remotely.
276281
wps.lg.Error("not supported request type by gRPC proxy", zap.Stringer("request", req))
@@ -311,3 +316,23 @@ func (wps *watchProxyStream) delete(id int64) {
311316
}
312317
wps.watchCh <- resp
313318
}
319+
320+
func (wps *watchProxyStream) requestProgressAll() error {
321+
wps.mu.Lock()
322+
defer wps.mu.Unlock()
323+
324+
for _, w := range wps.watchers {
325+
// request only the next progress notification on the watcher.
326+
w.requestNextProgress()
327+
}
328+
329+
if err := wps.ranges.wp.cw.RequestProgress(wps.stream.Context()); err != nil {
330+
wps.lg.Error("failed to request progress", zap.Error(err))
331+
// At this point, the error returned by the client isn't
332+
// due to something wrong on the cluster but to closed conns.
333+
// So we don't have to send a watch response.
334+
return err
335+
}
336+
337+
return nil
338+
}

server/proxy/grpcproxy/watcher.go

+18-5
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
package grpcproxy
1616

1717
import (
18+
"sync/atomic"
1819
"time"
1920

2021
pb "go.etcd.io/etcd/api/v3/etcdserverpb"
@@ -34,10 +35,12 @@ func (wr *watchRange) valid() bool {
3435
type watcher struct {
3536
// user configuration
3637

37-
wr watchRange
38-
filters []mvcc.FilterFunc
39-
progress bool
40-
prevKV bool
38+
wr watchRange
39+
filters []mvcc.FilterFunc
40+
progress bool
41+
nextProgress atomic.Bool
42+
43+
prevKV bool
4144

4245
// id is the id returned to the client on its watch stream.
4346
id int64
@@ -50,10 +53,20 @@ type watcher struct {
5053
wps *watchProxyStream
5154
}
5255

56+
// requestNextProgress sets the nextProgress to true to ensure next progress notification
57+
// is forwarded to the client.
58+
func (w *watcher) requestNextProgress() {
59+
if w.progress {
60+
return
61+
}
62+
63+
w.nextProgress.Store(true)
64+
}
65+
5366
// send filters out repeated events by discarding revisions older
5467
// than the last one sent over the watch channel.
5568
func (w *watcher) send(wr clientv3.WatchResponse) {
56-
if wr.IsProgressNotify() && !w.progress {
69+
if wr.IsProgressNotify() && !w.progress && !w.nextProgress.CompareAndSwap(true, false) {
5770
return
5871
}
5972
if w.nextrev > wr.Header.Revision && len(wr.Events) > 0 {

tests/integration/v3_watch_test.go

-12
Original file line numberDiff line numberDiff line change
@@ -1424,12 +1424,6 @@ func TestV3WatchCloseCancelRace(t *testing.T) {
14241424
// TestV3WatchProgressWaitsForSync checks that progress notifications
14251425
// don't get sent until the watcher is synchronised
14261426
func TestV3WatchProgressWaitsForSync(t *testing.T) {
1427-
// Disable for gRPC proxy, as it does not support requesting
1428-
// progress notifications
1429-
if integration.ThroughProxy {
1430-
t.Skip("grpc proxy currently does not support requesting progress notifications")
1431-
}
1432-
14331427
integration.BeforeTest(t)
14341428

14351429
clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 1})
@@ -1490,9 +1484,6 @@ func TestV3WatchProgressWaitsForSync(t *testing.T) {
14901484
}
14911485

14921486
func TestV3WatchProgressWaitsForSyncNoEvents(t *testing.T) {
1493-
if integration.ThroughProxy {
1494-
t.Skip("grpc proxy currently does not support requesting progress notifications")
1495-
}
14961487
integration.BeforeTest(t)
14971488

14981489
clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 1})
@@ -1536,9 +1527,6 @@ func TestV3WatchProgressWaitsForSyncNoEvents(t *testing.T) {
15361527
// TestV3NoEventsLostOnCompact verifies that slow watchers exit with compacted watch response
15371528
// if its next revision of events are compacted and no lost events sent to client.
15381529
func TestV3NoEventsLostOnCompact(t *testing.T) {
1539-
if integration.ThroughProxy {
1540-
t.Skip("grpc proxy currently does not support requesting progress notifications")
1541-
}
15421530
integration.BeforeTest(t)
15431531
if len(gofail.List()) == 0 {
15441532
t.Skip("please run 'make gofail-enable' before running the test")

0 commit comments

Comments
 (0)