Skip to content

Commit cc944d6

Browse files
grpcproxy: add support for WatchRequest_ProgressRequest
Signed-off-by: alexandre.vilain <[email protected]>
1 parent d6416b8 commit cc944d6

File tree

2 files changed

+16
-12
lines changed

2 files changed

+16
-12
lines changed

server/proxy/grpcproxy/watch.go

+16
Original file line numberDiff line numberDiff line change
@@ -271,6 +271,22 @@ func (wps *watchProxyStream) recvLoop() error {
271271
case *pb.WatchRequest_CancelRequest:
272272
wps.delete(uv.CancelRequest.WatchId)
273273
wps.lg.Debug("cancel watcher", zap.Int64("watcherId", uv.CancelRequest.WatchId))
274+
case *pb.WatchRequest_ProgressRequest:
275+
wps.mu.Lock()
276+
for _, w := range wps.watchers {
277+
// Enable the progress notify on the watcher, otherwise the send()
278+
// method may drop progress events.
279+
w.progress = true
280+
wps.lg.Debug("enabled process notify on watcher", zap.Int64("watcherId", w.id))
281+
}
282+
if err := wps.ranges.wp.cw.RequestProgress(wps.stream.Context()); err != nil {
283+
wps.lg.Error("failed to request progress", zap.Error(err))
284+
// At this point, the error returned by the client isn't
285+
// due to something wrong on the cluster but to closed conns.
286+
// So we don't have to send a watch response.
287+
return err
288+
}
289+
wps.mu.Unlock()
274290
default:
275291
// Panic or Fatalf would allow to network clients to crash the serve remotely.
276292
wps.lg.Error("not supported request type by gRPC proxy", zap.Stringer("request", req))

tests/integration/v3_watch_test.go

-12
Original file line numberDiff line numberDiff line change
@@ -1424,12 +1424,6 @@ func TestV3WatchCloseCancelRace(t *testing.T) {
14241424
// TestV3WatchProgressWaitsForSync checks that progress notifications
14251425
// don't get sent until the watcher is synchronised
14261426
func TestV3WatchProgressWaitsForSync(t *testing.T) {
1427-
// Disable for gRPC proxy, as it does not support requesting
1428-
// progress notifications
1429-
if integration.ThroughProxy {
1430-
t.Skip("grpc proxy currently does not support requesting progress notifications")
1431-
}
1432-
14331427
integration.BeforeTest(t)
14341428

14351429
clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 1})
@@ -1490,9 +1484,6 @@ func TestV3WatchProgressWaitsForSync(t *testing.T) {
14901484
}
14911485

14921486
func TestV3WatchProgressWaitsForSyncNoEvents(t *testing.T) {
1493-
if integration.ThroughProxy {
1494-
t.Skip("grpc proxy currently does not support requesting progress notifications")
1495-
}
14961487
integration.BeforeTest(t)
14971488

14981489
clus := integration.NewCluster(t, &integration.ClusterConfig{Size: 1})
@@ -1536,9 +1527,6 @@ func TestV3WatchProgressWaitsForSyncNoEvents(t *testing.T) {
15361527
// TestV3NoEventsLostOnCompact verifies that slow watchers exit with compacted watch response
15371528
// if its next revision of events are compacted and no lost events sent to client.
15381529
func TestV3NoEventsLostOnCompact(t *testing.T) {
1539-
if integration.ThroughProxy {
1540-
t.Skip("grpc proxy currently does not support requesting progress notifications")
1541-
}
15421530
integration.BeforeTest(t)
15431531
if len(gofail.List()) == 0 {
15441532
t.Skip("please run 'make gofail-enable' before running the test")

0 commit comments

Comments
 (0)