Skip to content

Commit f065575

Browse files
Merge pull request #520 from OffchainLabs/add-timeout-to-das-store
Add timeout parameter to DAS Store interface
2 parents 4751e9f + f379509 commit f065575

File tree

6 files changed

+27
-13
lines changed

6 files changed

+27
-13
lines changed

arbnode/batch_poster.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ type BatchPosterConfig struct {
4747
BatchPollDelay time.Duration `koanf:"poll-delay"`
4848
PostingErrorDelay time.Duration `koanf:"error-delay"`
4949
CompressionLevel int `koanf:"compression-level"`
50+
DASRetentionPeriod time.Duration `koanf:"das-retention-period"`
5051
}
5152

5253
func BatchPosterConfigAddOptions(prefix string, f *flag.FlagSet) {
@@ -56,6 +57,7 @@ func BatchPosterConfigAddOptions(prefix string, f *flag.FlagSet) {
5657
f.Duration(prefix+".poll-delay", DefaultBatchPosterConfig.BatchPollDelay, "how long to delay after successfully posting batch")
5758
f.Duration(prefix+".error-delay", DefaultBatchPosterConfig.PostingErrorDelay, "how long to delay after error posting batch")
5859
f.Int(prefix+".compression-level", DefaultBatchPosterConfig.CompressionLevel, "batch compression level")
60+
f.Duration(prefix+".das-retention-period", DefaultBatchPosterConfig.DASRetentionPeriod, "In AnyTrust mode, the period which DASes are requested to retain the stored batches.")
5961
}
6062

6163
var DefaultBatchPosterConfig = BatchPosterConfig{
@@ -65,6 +67,7 @@ var DefaultBatchPosterConfig = BatchPosterConfig{
6567
PostingErrorDelay: time.Second * 10,
6668
MaxBatchPostInterval: time.Hour,
6769
CompressionLevel: brotli.DefaultCompression,
70+
DASRetentionPeriod: time.Hour * 24 * 15,
6871
}
6972

7073
var TestBatchPosterConfig = BatchPosterConfig{
@@ -74,6 +77,7 @@ var TestBatchPosterConfig = BatchPosterConfig{
7477
PostingErrorDelay: time.Millisecond * 10,
7578
MaxBatchPostInterval: 0,
7679
CompressionLevel: 2,
80+
DASRetentionPeriod: time.Hour * 24 * 15,
7781
}
7882

7983
func NewBatchPoster(l1Reader *L1Reader, inbox *InboxTracker, streamer *TransactionStreamer, config *BatchPosterConfig, contractAddress common.Address, refunder common.Address, transactOpts *bind.TransactOpts, das das.DataAvailabilityService) (*BatchPoster, error) {
@@ -369,7 +373,7 @@ func (b *BatchPoster) maybePostSequencerBatch(ctx context.Context, timeSinceBatc
369373
}
370374

371375
if b.das != nil {
372-
cert, err := b.das.Store(ctx, sequencerMsg)
376+
cert, err := b.das.Store(ctx, sequencerMsg, uint64(time.Now().Add(b.config.DASRetentionPeriod).Unix()))
373377
if err != nil {
374378
log.Warn("Unable to batch to DAS, falling back to storing data on chain", "err", err)
375379
} else {

das/das.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,8 @@ import (
1515
)
1616

1717
type DataAvailabilityServiceWriter interface {
18-
Store(ctx context.Context, message []byte) (*arbstate.DataAvailabilityCertificate, error)
18+
// Requests that the message be stored until timeout (UTC time in unix epoch seconds).
19+
Store(ctx context.Context, message []byte, timeout uint64) (*arbstate.DataAvailabilityCertificate, error)
1920
}
2021

2122
type DataAvailabilityService interface {

das/das_test.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@ package das
66
import (
77
"bytes"
88
"context"
9+
"fmt"
910
"io/ioutil"
1011
"os"
1112
"testing"
13+
"time"
1214

1315
"github.com/offchainlabs/nitro/util/testhelpers"
1416
)
@@ -23,9 +25,13 @@ func TestDASStoreRetrieveMultipleInstances(t *testing.T) {
2325

2426
ctx := context.Background()
2527

28+
timeout := uint64(time.Now().Add(time.Hour * 24).Unix())
2629
messageSaved := []byte("hello world")
27-
cert, err := das.Store(ctx, messageSaved)
30+
cert, err := das.Store(ctx, messageSaved, timeout)
2831
Require(t, err, "Error storing message")
32+
if cert.Timeout != timeout {
33+
Fail(t, fmt.Sprintf("Expected timeout of %d in cert, was %d", timeout, cert.Timeout))
34+
}
2935

3036
certBytes := Serialize(*cert)
3137

@@ -57,8 +63,12 @@ func TestDASMissingMessage(t *testing.T) {
5763
ctx := context.Background()
5864

5965
messageSaved := []byte("hello world")
60-
cert, err := das.Store(ctx, messageSaved)
66+
timeout := uint64(time.Now().Add(time.Hour * 24).Unix())
67+
cert, err := das.Store(ctx, messageSaved, timeout)
6168
Require(t, err, "Error storing message")
69+
if cert.Timeout != timeout {
70+
Fail(t, fmt.Sprintf("Expected timeout of %d in cert, was %d", timeout, cert.Timeout))
71+
}
6272

6373
// Change the hash to look up
6474
cert.DataHash[0] += 1

das/dasrpc/dasRpcServer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func (serv *DASRPCServer) Stop() {
4444
}
4545

4646
func (serv *DASRPCServer) Store(ctx context.Context, req *StoreRequest) (*StoreResponse, error) {
47-
cert, err := serv.localDAS.Store(ctx, req.Message)
47+
cert, err := serv.localDAS.Store(ctx, req.Message, req.Timeout)
4848
if err != nil {
4949
return nil, err
5050
}

das/local_disk_das.go

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@ import (
99
"errors"
1010
"os"
1111
"sync"
12-
"time"
1312

1413
"github.com/ethereum/go-ethereum/crypto"
1514
"github.com/ethereum/go-ethereum/log"
@@ -20,11 +19,10 @@ import (
2019
var dasMutex sync.Mutex
2120

2221
type LocalDiskDataAvailabilityService struct {
23-
dbPath string
24-
pubKey *blsSignatures.PublicKey
25-
privKey blsSignatures.PrivateKey
26-
retentionPeriod time.Duration
27-
signerMask uint64
22+
dbPath string
23+
pubKey *blsSignatures.PublicKey
24+
privKey blsSignatures.PrivateKey
25+
signerMask uint64
2826
}
2927

3028
func readKeysFromFile(dbPath string) (*blsSignatures.PublicKey, blsSignatures.PrivateKey, error) {
@@ -89,14 +87,14 @@ func NewLocalDiskDataAvailabilityService(dbPath string) (*LocalDiskDataAvailabil
8987
}, nil
9088
}
9189

92-
func (das *LocalDiskDataAvailabilityService) Store(ctx context.Context, message []byte) (c *arbstate.DataAvailabilityCertificate, err error) {
90+
func (das *LocalDiskDataAvailabilityService) Store(ctx context.Context, message []byte, timeout uint64) (c *arbstate.DataAvailabilityCertificate, err error) {
9391
dasMutex.Lock()
9492
defer dasMutex.Unlock()
9593

9694
c = &arbstate.DataAvailabilityCertificate{}
9795
copy(c.DataHash[:], crypto.Keccak256(message))
9896

99-
c.Timeout = uint64(time.Now().Add(das.retentionPeriod).Unix())
97+
c.Timeout = timeout
10098
c.SignersMask = das.signerMask
10199

102100
fields := serializeSignableFields(*c)

das/wireFormat.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ service DASServiceImpl {
1313

1414
message StoreRequest {
1515
bytes message = 1;
16+
uint64 timeout = 2;
1617
}
1718

1819
message StoreResponse {

0 commit comments

Comments
 (0)