Skip to content

Commit add9f2b

Browse files
grpcproxy: add support for WatchRequest_ProgressRequest
Signed-off-by: alexandre.vilain <[email protected]>
1 parent d6416b8 commit add9f2b

File tree

2 files changed

+20
-12
lines changed

2 files changed

+20
-12
lines changed

server/proxy/grpcproxy/watch.go

+20
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,11 @@ func (wps *watchProxyStream) recvLoop() error {
271271
case *pb.WatchRequest_CancelRequest:
272272
wps.delete(uv.CancelRequest.WatchId)
273273
wps.lg.Debug("cancel watcher", zap.Int64("watcherId", uv.CancelRequest.WatchId))
274+
case *pb.WatchRequest_ProgressRequest:
275+
err := wps.requestProgressAll()
276+
if err != nil {
277+
return err
278+
}
274279
default:
275280
// Panic or Fatalf would allow to network clients to crash the serve remotely.
276281
wps.lg.Error("not supported request type by gRPC proxy", zap.Stringer("request", req))
@@ -311,3 +316,18 @@ func (wps *watchProxyStream) delete(id int64) {
311316
}
312317
wps.watchCh <- resp
313318
}
319+
320+
func (wps *watchProxyStream) requestProgressAll() error {
321+
wps.mu.Lock()
322+
defer wps.mu.Unlock()
323+
324+
if err := wps.ranges.wp.cw.RequestProgress(wps.stream.Context()); err != nil {
325+
wps.lg.Error("failed to request progress", zap.Error(err))
326+
// At this point, the error returned by the client isn't
327+
// due to something wrong on the cluster but to closed conns.
328+
// So we don't have to send a watch response.
329+
return err
330+
}
331+
332+
return nil
333+
}

tests/integration/v3_watch_test.go

-12
Original file line numberDiff line numberDiff line change
@@ -1424,12 +1424,6 @@ func TestV3WatchCloseCancelRace(t *testing.T) {
14241424
// TestV3WatchProgressWaitsForSync checks that progress notifications
14251425
// don't get sent until the watcher is synchronised
14261426
func TestV3WatchProgressWaitsForSync(t *testing.T) {
1427-
// Disable for gRPC proxy, as it does not support requesting
1428-
// progress notifications
1429-
if integration.ThroughProxy {
1430-
t.Skip("grpc proxy currently does not support requesting progress notifications")
1431-
}
1432-
14331427
integration.BeforeTest(t)
14341428

14351429
clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 1})
@@ -1490,9 +1484,6 @@ func TestV3WatchProgressWaitsForSync(t *testing.T) {
14901484
}
14911485

14921486
func TestV3WatchProgressWaitsForSyncNoEvents(t *testing.T) {
1493-
if integration.ThroughProxy {
1494-
t.Skip("grpc proxy currently does not support requesting progress notifications")
1495-
}
14961487
integration.BeforeTest(t)
14971488

14981489
clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 1})
@@ -1536,9 +1527,6 @@ func TestV3WatchProgressWaitsForSyncNoEvents(t *testing.T) {
15361527
// TestV3NoEventsLostOnCompact verifies that slow watchers exit with compacted watch response
15371528
// if its next revision of events are compacted and no lost events sent to client.
15381529
func TestV3NoEventsLostOnCompact(t *testing.T) {
1539-
if integration.ThroughProxy {
1540-
t.Skip("grpc proxy currently does not support requesting progress notifications")
1541-
}
15421530
integration.BeforeTest(t)
15431531
if len(gofail.List()) == 0 {
15441532
t.Skip("please run 'make gofail-enable' before running the test")

0 commit comments

Comments
 (0)