
Commit 62c0aa7

Remove redundant DataStreamer retry mechanism (#3957)
* Remove internal daprovider server for AnyTrust

This change is a backport of 0195abd. There were significant changes in this area that prevent it from being applied cleanly, but the idea is the same.

The internal daprovider server was originally added in PR #2533 to unify the code paths between internal AnyTrust and external DA providers by making both use RPC clients. After working with it for a while, it's clear that the drawbacks outweigh any benefit: it added an unnecessary HTTP/RPC layer for in-process communication, introduced timeout configuration complexity, and made retry behavior difficult to reason about.

The proper abstraction point is the daprovider.Writer and Reader interfaces, not the transport layer. External DA providers need RPC because they're remote, but internal AnyTrust components can communicate directly. This change removes the internal server entirely and connects the Nitro node directly to the AnyTrust aggregator writer and REST aggregator reader.

* Remove redundant DataStreamer retry mechanism

The DataStreamer had its own retry logic (5 attempts with exponential backoff) on top of the underlying RPC client's retry mechanism (4 attempts with configurable retry patterns). This created a nested retry system where:

- the RPC client retried transient errors (timeouts, connection failures), and
- the DataStreamer retried ALL errors indiscriminately, including permanent ones like "method does not exist".

This redundancy caused problems in production:

1. Blocked fallback logic: when a DAS backend doesn't support chunked streaming, it returns "method does not exist". The DataStreamer would retry this error 5 times over 30 seconds instead of failing fast, preventing the intended fallback to the legacy store from executing before the overall request timeout.

2. Excessive retry attempts: each RPC method (start/chunk/finalize) could be retried up to 20 times (5 DataStreamer attempts × 4 RPC client attempts), wasting time and resources on non-retryable errors.

3. Error masking complexity: to properly handle permanent errors while keeping DataStreamer retries, we would need error filtering at TWO levels:
   - the RPC client level (which already distinguishes transient from permanent errors), and
   - the DataStreamer level (which would need to duplicate that logic).
   This duplication adds complexity with no benefit, since the RPC client already provides appropriate retry behavior.

The RPC client's retry mechanism is sufficient (see the sketch below):

- it retries on context.DeadlineExceeded,
- it retries on connection errors (configurable regex pattern),
- it fails immediately on application errors (e.g., "method does not exist"), and
- its timeout and retry count are configurable per deployment.

Removed CLI flags:

- --*.data-stream.base-retry-delay
- --*.data-stream.max-retry-delay
- --*.data-stream.max-retry-attempts
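For illustration, here is a minimal Go sketch of the single-layer, error-aware retry behavior described above: retry transient errors, fail fast on permanent ones so fallback logic can run before the request timeout. This is not the repository's rpcclient implementation; the helper names (callWithRetry, isTransient), the fixed delay, the attempt count, and the example regex are assumptions made for this sketch.

package main

import (
	"context"
	"errors"
	"fmt"
	"regexp"
	"time"
)

// isTransient reports whether an error looks retryable: context deadline
// expirations and connection-level failures are transient; anything else
// (e.g. an application error such as "method does not exist") is permanent.
func isTransient(err error, connErrPattern *regexp.Regexp) bool {
	if errors.Is(err, context.DeadlineExceeded) {
		return true
	}
	return connErrPattern.MatchString(err.Error())
}

// callWithRetry retries only transient errors and returns permanent errors
// immediately, so callers (e.g. a fallback to the legacy store path) are not
// blocked behind pointless retries.
func callWithRetry(ctx context.Context, maxAttempts int, delay time.Duration,
	connErrPattern *regexp.Regexp, call func(context.Context) error) error {
	var err error
	for attempt := 0; attempt < maxAttempts; attempt++ {
		if err = call(ctx); err == nil {
			return nil
		}
		if !isTransient(err, connErrPattern) {
			return err // permanent error: no point in retrying
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(delay):
		}
	}
	return err
}

func main() {
	connErrPattern := regexp.MustCompile("connection refused|service unavailable")
	err := callWithRetry(context.Background(), 4, 100*time.Millisecond, connErrPattern,
		func(ctx context.Context) error { return errors.New("method does not exist") })
	fmt.Println(err) // returned after the first attempt; no retries are wasted
}

With a permanent error like "method does not exist", the sketch returns on the first attempt, which is exactly the fast-fail behavior the removed DataStreamer retry loop was preventing.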
1 parent 64f6515 commit 62c0aa7

File tree

2 files changed: +9 -83 lines

- daprovider/data_streaming/protocol_test.go
- daprovider/data_streaming/sender.go


daprovider/data_streaming/protocol_test.go

Lines changed: 0 additions & 18 deletions
@@ -5,7 +5,6 @@ package data_streaming
 
 import (
 	"context"
-	"errors"
 	"math/rand"
 	"net"
 	"net/http"
@@ -169,23 +168,6 @@ func TestDataStreaming_ProtocolSucceedsEvenWithDelays(t *testing.T) {
 	require.Equal(t, message, ([]byte)(result.Message), "protocol resulted in an incorrect message")
 }
 
-func TestDataStreaming_ClientRetriesWhenThereAreConnectionProblems(t *testing.T) {
-	// Server 'goes offline' for a moment just before reading the second chunk
-	var alreadyWentOffline = false
-	ctx, streamer := prepareTestEnv(t, func(i uint64) error {
-		if i == 1 && !alreadyWentOffline {
-			alreadyWentOffline = true
-			return errors.New("service unavailable")
-		}
-		return nil
-
-	})
-	message, _ := getLongRandomMessage(streamer.chunkSize)
-	result, err := streamer.StreamData(ctx, message, timeout)
-	testhelpers.RequireImpl(t, err)
-	require.Equal(t, message, ([]byte)(result.Message), "protocol resulted in an incorrect message")
-}
-
 func TestDataStreaming_ServerDeniesTooOldAndFutureRequests(t *testing.T) {
 	ctx, streamer := prepareTestEnv(t, nil)
 	message, _ := getLongRandomMessage(streamer.chunkSize)

daprovider/data_streaming/sender.go

Lines changed: 9 additions & 65 deletions
@@ -19,60 +19,42 @@ import (
 )
 
 const (
-	DefaultHttpBodyLimit    = 5 * 1024 * 1024 // Taken from go-ethereum http.defaultBodyLimit
-	TestHttpBodyLimit       = 1024
-	DefaultBaseRetryDelay   = 2 * time.Second
-	DefaultMaxRetryDelay    = 1 * time.Minute
-	DefaultMaxRetryAttempts = 5
+	DefaultHttpBodyLimit = 5 * 1024 * 1024 // Taken from go-ethereum http.defaultBodyLimit
+	TestHttpBodyLimit    = 1024
 )
 
 // lint:require-exhaustive-initialization
 type DataStreamerConfig struct {
 	MaxStoreChunkBodySize int                     `koanf:"max-store-chunk-body-size"`
 	RpcMethods            DataStreamingRPCMethods `koanf:"rpc-methods"`
-
-	// Retry policy for RPC calls
-	BaseRetryDelay   time.Duration `koanf:"base-retry-delay"`
-	MaxRetryDelay    time.Duration `koanf:"max-retry-delay"`
-	MaxRetryAttempts int           `koanf:"max-retry-attempts"`
 }
 
 func DefaultDataStreamerConfig(rpcMethods DataStreamingRPCMethods) DataStreamerConfig {
 	return DataStreamerConfig{
 		MaxStoreChunkBodySize: DefaultHttpBodyLimit,
 		RpcMethods:            rpcMethods,
-		BaseRetryDelay:        DefaultBaseRetryDelay,
-		MaxRetryDelay:         DefaultMaxRetryDelay,
-		MaxRetryAttempts:      DefaultMaxRetryAttempts,
 	}
 }
 
 func TestDataStreamerConfig(rpcMethods DataStreamingRPCMethods) DataStreamerConfig {
 	return DataStreamerConfig{
 		MaxStoreChunkBodySize: TestHttpBodyLimit,
 		RpcMethods:            rpcMethods,
-		BaseRetryDelay:        100 * time.Millisecond,
-		MaxRetryDelay:         100 * time.Millisecond,
-		MaxRetryAttempts:      3,
 	}
 }
 
 func DataStreamerConfigAddOptions(prefix string, f *pflag.FlagSet, defaultRpcMethods DataStreamingRPCMethods) {
 	f.Int(prefix+".max-store-chunk-body-size", DefaultHttpBodyLimit, "maximum HTTP body size for chunked store requests")
-	f.Duration(prefix+".base-retry-delay", DefaultBaseRetryDelay, "base delay for retrying failed RPC calls")
-	f.Duration(prefix+".max-retry-delay", DefaultMaxRetryDelay, "maximum delay for retrying failed RPC calls")
-	f.Int(prefix+".max-retry-attempts", DefaultMaxRetryAttempts, "maximum number of attempts for retrying failed RPC calls")
 	DataStreamingRPCMethodsAddOptions(prefix+".rpc-methods", f, defaultRpcMethods)
 }
 
 // DataStreamer allows sending arbitrarily big payloads with JSON RPC. It follows a simple chunk-based protocol.
 // lint:require-exhaustive-initialization
 type DataStreamer[Result any] struct {
-	rpcClient        *rpcclient.RpcClient
-	chunkSize        uint64
-	dataSigner       *PayloadSigner
-	rpcMethods       DataStreamingRPCMethods
-	retryDelayPolicy *expDelayPolicy
+	rpcClient  *rpcclient.RpcClient
+	chunkSize  uint64
+	dataSigner *PayloadSigner
+	rpcMethods DataStreamingRPCMethods
 }
 
 // DataStreamingRPCMethods configuration specifies names of the protocol's RPC methods on the server side.
@@ -104,11 +86,6 @@ func NewDataStreamer[T any](config DataStreamerConfig, dataSigner *PayloadSigner
 		chunkSize:  chunkSize,
 		dataSigner: dataSigner,
 		rpcMethods: config.RpcMethods,
-		retryDelayPolicy: &expDelayPolicy{
-			baseDelay:   config.BaseRetryDelay,
-			maxDelay:    config.MaxRetryDelay,
-			maxAttempts: config.MaxRetryAttempts,
-		},
 	}, nil
 }
 
@@ -144,7 +121,7 @@ func (ds *DataStreamer[Result]) startStream(ctx context.Context, params streamPa
 	}
 
 	var result StartStreamingResult
-	err = ds.call(
+	err = ds.rpcClient.CallContext(
 		ctx,
 		&result,
 		ds.rpcMethods.StartStream,
@@ -172,51 +149,18 @@ func (ds *DataStreamer[Result]) sendChunk(ctx context.Context, messageId Message
 	if err != nil {
 		return err
 	}
-	return ds.call(ctx, nil, ds.rpcMethods.StreamChunk, hexutil.Uint64(messageId), hexutil.Uint64(chunkId), hexutil.Bytes(chunkData), hexutil.Bytes(payloadSignature))
+	return ds.rpcClient.CallContext(ctx, nil, ds.rpcMethods.StreamChunk, hexutil.Uint64(messageId), hexutil.Uint64(chunkId), hexutil.Bytes(chunkData), hexutil.Bytes(payloadSignature))
 }
 
 func (ds *DataStreamer[Result]) finalizeStream(ctx context.Context, messageId MessageId) (result *Result, err error) {
 	payloadSignature, err := ds.sign(nil, uint64(messageId))
 	if err != nil {
 		return nil, err
 	}
-	err = ds.call(ctx, &result, ds.rpcMethods.FinalizeStream, hexutil.Uint64(messageId), hexutil.Bytes(payloadSignature))
+	err = ds.rpcClient.CallContext(ctx, &result, ds.rpcMethods.FinalizeStream, hexutil.Uint64(messageId), hexutil.Bytes(payloadSignature))
 	return
 }
 
-type expDelayPolicy struct {
-	baseDelay, maxDelay time.Duration
-	maxAttempts         int
-}
-
-func (e *expDelayPolicy) NextDelay(attempt int) (time.Duration, bool) {
-	if attempt >= e.maxAttempts {
-		return 0, false
-	}
-	if attempt <= 0 {
-		return time.Duration(0), true
-	}
-
-	delay := e.baseDelay * time.Duration(1<<uint(attempt-1)) // nolint:gosec
-	if delay > e.maxDelay {
-		delay = e.maxDelay
-	}
-	return delay, true
-}
-
-func (ds *DataStreamer[Result]) call(ctx context.Context, result interface{}, method string, args ...interface{}) (err error) {
-	for attempt := 1; ; attempt++ {
-		if err = ds.rpcClient.CallContext(ctx, result, method, args...); err == nil {
-			return nil
-		}
-		delay, proceed := ds.retryDelayPolicy.NextDelay(attempt)
-		if !proceed {
-			return fmt.Errorf("failed after %d attempts: %w", attempt, err)
-		}
-		time.Sleep(delay)
-	}
-}
-
 func (ds *DataStreamer[Result]) sign(bytes []byte, extras ...uint64) ([]byte, error) {
 	return ds.dataSigner.signPayload(bytes, extras...)
 }
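The diff above touches the three RPC calls of the DataStreamer's chunk-based protocol (start/chunk/finalize). For orientation, a minimal sketch of that flow; this is not the actual StreamData implementation (per-chunk signing and the RPC transport are omitted), and the callback signatures, names, and the demo values are assumptions made for the sketch.

package main

import (
	"context"
	"fmt"
)

// streamInChunks illustrates the start/chunk/finalize shape of the protocol:
// one call to open a stream, one call per chunk, and one call to finalize.
func streamInChunks(
	ctx context.Context,
	payload []byte,
	chunkSize int,
	start func(ctx context.Context, totalSize int) (messageId uint64, err error),
	sendChunk func(ctx context.Context, messageId, chunkId uint64, chunk []byte) error,
	finalize func(ctx context.Context, messageId uint64) error,
) error {
	messageId, err := start(ctx, len(payload))
	if err != nil {
		return fmt.Errorf("start stream: %w", err)
	}
	for chunkId := uint64(0); len(payload) > 0; chunkId++ {
		n := chunkSize
		if n > len(payload) {
			n = len(payload)
		}
		if err := sendChunk(ctx, messageId, chunkId, payload[:n]); err != nil {
			return fmt.Errorf("send chunk %d: %w", chunkId, err)
		}
		payload = payload[n:]
	}
	return finalize(ctx, messageId)
}

func main() {
	var received []byte
	err := streamInChunks(context.Background(), []byte("hello chunked world"), 5,
		func(ctx context.Context, totalSize int) (uint64, error) { return 42, nil },
		func(ctx context.Context, messageId, chunkId uint64, chunk []byte) error {
			received = append(received, chunk...)
			return nil
		},
		func(ctx context.Context, messageId uint64) error { return nil },
	)
	fmt.Println(string(received), err)
}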
