Skip to content

Commit 7e3a9b4

Browse files
Merge pull request #610 from OffchainLabs/das-rpc
Switch DAS server over to rpc
2 parents f88b6fa + 1b826b7 commit 7e3a9b4

File tree

6 files changed

+161
-63
lines changed

6 files changed

+161
-63
lines changed

cmd/daserver/daserver.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,6 @@ func startup() error {
128128
return err
129129
}
130130
<-sigint
131-
server.Stop()
132131

133-
return nil
132+
return server.Shutdown(ctx)
134133
}

das/dasrpc/dasRpcClient.go

Lines changed: 28 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -6,75 +6,75 @@ package dasrpc
66
import (
77
"context"
88
"fmt"
9+
10+
"github.com/ethereum/go-ethereum/common/hexutil"
11+
12+
"github.com/ethereum/go-ethereum/rpc"
913
"github.com/offchainlabs/nitro/das"
1014

1115
"github.com/offchainlabs/nitro/arbstate"
1216
"github.com/offchainlabs/nitro/blsSignatures"
13-
"google.golang.org/grpc"
14-
"google.golang.org/grpc/credentials/insecure"
1517
)
1618

1719
type DASRPCClient struct { // implements DataAvailabilityService
18-
clnt DASServiceImplClient
20+
clnt *rpc.Client
1921
}
2022

2123
func NewDASRPCClient(target string) (*DASRPCClient, error) {
22-
// TODO revisit insecure setting
23-
conn, err := grpc.Dial(target, grpc.WithTransportCredentials(insecure.NewCredentials()))
24+
clnt, err := rpc.Dial(target)
2425
if err != nil {
2526
return nil, err
2627
}
27-
clnt := NewDASServiceImplClient(conn)
2828
return &DASRPCClient{clnt: clnt}, nil
2929
}
3030

31-
func (clnt *DASRPCClient) Retrieve(ctx context.Context, cert *arbstate.DataAvailabilityCertificate) ([]byte, error) {
31+
func (c *DASRPCClient) Retrieve(ctx context.Context, cert *arbstate.DataAvailabilityCertificate) ([]byte, error) {
3232
certBytes := das.Serialize(cert)
33-
response, err := clnt.clnt.Retrieve(ctx, &RetrieveRequest{CertBytes: certBytes})
34-
if err != nil {
33+
var ret hexutil.Bytes
34+
if err := c.clnt.CallContext(ctx, &ret, "das_retrieve", hexutil.Bytes(certBytes)); err != nil {
3535
return nil, err
3636
}
37-
return response.Result, nil
37+
return ret, nil
3838
}
3939

40-
func (clnt *DASRPCClient) Store(ctx context.Context, message []byte, timeout uint64, reqSig []byte) (*arbstate.DataAvailabilityCertificate, error) {
41-
response, err := clnt.clnt.Store(ctx, &StoreRequest{Message: message, Timeout: timeout, Sig: reqSig})
42-
if err != nil {
40+
func (c *DASRPCClient) Store(ctx context.Context, message []byte, timeout uint64, reqSig []byte) (*arbstate.DataAvailabilityCertificate, error) {
41+
var ret StoreResult
42+
if err := c.clnt.CallContext(ctx, &ret, "das_store", hexutil.Bytes(message), hexutil.Uint64(timeout), hexutil.Bytes(reqSig)); err != nil {
4343
return nil, err
4444
}
4545
var keysetHash [32]byte
46-
copy(keysetHash[:], response.KeysetHash)
46+
copy(keysetHash[:], ret.KeysetHash)
4747
var dataHash [32]byte
48-
copy(dataHash[:], response.DataHash)
49-
respSig, err := blsSignatures.SignatureFromBytes(response.Sig)
48+
copy(dataHash[:], ret.DataHash)
49+
respSig, err := blsSignatures.SignatureFromBytes(ret.Sig)
5050
if err != nil {
5151
return nil, err
5252
}
5353
return &arbstate.DataAvailabilityCertificate{
5454
DataHash: dataHash,
55-
Timeout: response.Timeout,
56-
SignersMask: response.SignersMask,
55+
Timeout: uint64(ret.Timeout),
56+
SignersMask: uint64(ret.SignersMask),
5757
Sig: respSig,
5858
KeysetHash: keysetHash,
5959
}, nil
6060
}
6161

62-
func (clnt *DASRPCClient) KeysetFromHash(ctx context.Context, ksHash []byte) ([]byte, error) {
63-
response, err := clnt.clnt.KeysetFromHash(ctx, &KeysetFromHashRequest{KsHash: ksHash})
64-
if err != nil {
62+
func (c *DASRPCClient) KeysetFromHash(ctx context.Context, ksHash []byte) ([]byte, error) {
63+
var ret hexutil.Bytes
64+
if err := c.clnt.CallContext(ctx, &ret, "das_keysetFromHash", hexutil.Bytes(ksHash)); err != nil {
6565
return nil, err
6666
}
67-
return response.Result, nil
67+
return ret, nil
6868
}
6969

70-
func (clnt *DASRPCClient) CurrentKeysetBytes(ctx context.Context) ([]byte, error) {
71-
response, err := clnt.clnt.CurrentKeysetBytes(ctx, &CurrentKeysetBytesRequest{})
72-
if err != nil {
70+
func (c *DASRPCClient) CurrentKeysetBytes(ctx context.Context) ([]byte, error) {
71+
var ret hexutil.Bytes
72+
if err := c.clnt.CallContext(ctx, &ret, "das_currentKeysetBytes"); err != nil {
7373
return nil, err
7474
}
75-
return response.Result, nil
75+
return ret, nil
7676
}
7777

78-
func (clnt *DASRPCClient) String() string {
79-
return fmt.Sprintf("DASRPCClient{clnt:%v}", clnt.clnt)
78+
func (c *DASRPCClient) String() string {
79+
return fmt.Sprintf("DASRPCClient{c:%v}", c.clnt)
8080
}

das/dasrpc/dasRpcServer.go

Lines changed: 40 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -8,86 +8,99 @@ import (
88
"context"
99
"fmt"
1010
"net"
11+
"net/http"
12+
13+
"github.com/ethereum/go-ethereum/common/hexutil"
14+
15+
"github.com/ethereum/go-ethereum/rpc"
1116

1217
"github.com/offchainlabs/nitro/arbstate"
1318
"github.com/offchainlabs/nitro/blsSignatures"
1419
"github.com/offchainlabs/nitro/das"
15-
"google.golang.org/grpc"
1620
)
1721

1822
type DASRPCServer struct {
19-
UnimplementedDASServiceImplServer // this allows grpc to verify its version invariant
20-
grpcServer *grpc.Server
21-
localDAS das.DataAvailabilityService
23+
localDAS das.DataAvailabilityService
2224
}
2325

24-
func StartDASRPCServer(ctx context.Context, addr string, portNum uint64, localDAS das.DataAvailabilityService) (*DASRPCServer, error) {
26+
func StartDASRPCServer(ctx context.Context, addr string, portNum uint64, localDAS das.DataAvailabilityService) (*http.Server, error) {
2527
listener, err := net.Listen("tcp", fmt.Sprintf("%s:%d", addr, portNum))
2628
if err != nil {
2729
return nil, err
2830
}
2931
return StartDASRPCServerOnListener(ctx, listener, localDAS)
3032
}
3133

32-
func StartDASRPCServerOnListener(ctx context.Context, listener net.Listener, localDAS das.DataAvailabilityService) (*DASRPCServer, error) {
33-
grpcServer := grpc.NewServer()
34-
dasServer := &DASRPCServer{grpcServer: grpcServer, localDAS: localDAS}
35-
RegisterDASServiceImplServer(grpcServer, dasServer)
34+
func StartDASRPCServerOnListener(ctx context.Context, listener net.Listener, localDAS das.DataAvailabilityService) (*http.Server, error) {
35+
rpcServer := rpc.NewServer()
36+
err := rpcServer.RegisterName("das", &DASRPCServer{localDAS: localDAS})
37+
if err != nil {
38+
return nil, err
39+
}
40+
41+
srv := &http.Server{
42+
Handler: rpcServer,
43+
}
44+
3645
go func() {
37-
err := grpcServer.Serve(listener)
46+
err := srv.Serve(listener)
3847
if err != nil {
3948
return
4049
}
4150
}()
4251
go func() {
4352
<-ctx.Done()
44-
grpcServer.GracefulStop()
53+
_ = srv.Shutdown(context.Background())
4554
}()
46-
return dasServer, nil
55+
return srv, nil
4756
}
4857

49-
func (serv *DASRPCServer) Stop() {
50-
serv.grpcServer.GracefulStop()
58+
type StoreResult struct {
59+
DataHash hexutil.Bytes `json:"dataHash,omitempty"`
60+
Timeout hexutil.Uint64 `json:"timeout,omitempty"`
61+
SignersMask hexutil.Uint64 `json:"signersMask,omitempty"`
62+
KeysetHash hexutil.Bytes `json:"keysetHash,omitempty"`
63+
Sig hexutil.Bytes `json:"sig,omitempty"`
5164
}
5265

53-
func (serv *DASRPCServer) Store(ctx context.Context, req *StoreRequest) (*StoreResponse, error) {
54-
cert, err := serv.localDAS.Store(ctx, req.Message, req.Timeout, req.Sig)
66+
func (serv *DASRPCServer) Store(ctx context.Context, message hexutil.Bytes, timeout hexutil.Uint64, sig hexutil.Bytes) (*StoreResult, error) {
67+
cert, err := serv.localDAS.Store(ctx, message, uint64(timeout), sig)
5568
if err != nil {
5669
return nil, err
5770
}
58-
return &StoreResponse{
71+
return &StoreResult{
5972
KeysetHash: cert.KeysetHash[:],
6073
DataHash: cert.DataHash[:],
61-
Timeout: cert.Timeout,
62-
SignersMask: cert.SignersMask,
74+
Timeout: hexutil.Uint64(cert.Timeout),
75+
SignersMask: hexutil.Uint64(cert.SignersMask),
6376
Sig: blsSignatures.SignatureToBytes(cert.Sig),
6477
}, nil
6578
}
6679

67-
func (serv *DASRPCServer) Retrieve(ctx context.Context, req *RetrieveRequest) (*RetrieveResponse, error) {
68-
cert, err := arbstate.DeserializeDASCertFrom(bytes.NewReader(req.CertBytes))
80+
func (serv *DASRPCServer) Retrieve(ctx context.Context, certBytes hexutil.Bytes) (hexutil.Bytes, error) {
81+
cert, err := arbstate.DeserializeDASCertFrom(bytes.NewReader(certBytes))
6982
if err != nil {
7083
return nil, err
7184
}
7285
result, err := serv.localDAS.Retrieve(ctx, cert)
7386
if err != nil {
7487
return nil, err
7588
}
76-
return &RetrieveResponse{Result: result}, nil
89+
return result, nil
7790
}
7891

79-
func (serv *DASRPCServer) KeysetFromHash(ctx context.Context, req *KeysetFromHashRequest) (*KeysetFromHashResponse, error) {
80-
resp, err := serv.localDAS.KeysetFromHash(ctx, req.KsHash)
92+
func (serv *DASRPCServer) KeysetFromHash(ctx context.Context, ksHash hexutil.Bytes) (hexutil.Bytes, error) {
93+
resp, err := serv.localDAS.KeysetFromHash(ctx, ksHash)
8194
if err != nil {
8295
return nil, err
8396
}
84-
return &KeysetFromHashResponse{Result: resp}, nil
97+
return resp, nil
8598
}
8699

87-
func (serv *DASRPCServer) CurrentKeysetBytes(ctx context.Context, req *CurrentKeysetBytesRequest) (*CurrentKeysetBytesResponse, error) {
100+
func (serv *DASRPCServer) CurrentKeysetBytes(ctx context.Context) (hexutil.Bytes, error) {
88101
resp, err := serv.localDAS.CurrentKeysetBytes(ctx)
89102
if err != nil {
90103
return nil, err
91104
}
92-
return &CurrentKeysetBytesResponse{Result: resp}, nil
105+
return resp, nil
93106
}

das/dasrpc/rpc_aggregator.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,9 @@ package dasrpc
55

66
import (
77
"encoding/json"
8+
9+
"github.com/offchainlabs/nitro/solgen/go/bridgegen"
10+
811
"github.com/ethereum/go-ethereum/common"
912
"github.com/offchainlabs/nitro/arbutil"
1013

@@ -33,6 +36,14 @@ func NewRPCAggregatorWithL1Info(config das.AggregatorConfig, l1client arbutil.L1
3336
return das.NewAggregatorWithL1Info(config, services, l1client, seqInboxAddress)
3437
}
3538

39+
func NewRPCAggregatorWithSeqInboxCaller(config das.AggregatorConfig, seqInboxCaller *bridgegen.SequencerInboxCaller) (*das.Aggregator, error) {
40+
services, err := setUpServices(config)
41+
if err != nil {
42+
return nil, err
43+
}
44+
return das.NewAggregatorWithSeqInboxCaller(config, services, seqInboxCaller)
45+
}
46+
3647
func setUpServices(config das.AggregatorConfig) ([]das.ServiceDetails, error) {
3748
var cs []BackendConfig
3849
err := json.Unmarshal([]byte(config.Backends), &cs)

das/dasrpc/rpc_test.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
package dasrpc
2+
3+
import (
4+
"bytes"
5+
"context"
6+
"encoding/base64"
7+
"encoding/json"
8+
"net"
9+
"testing"
10+
11+
"github.com/offchainlabs/nitro/blsSignatures"
12+
"github.com/offchainlabs/nitro/das"
13+
"github.com/offchainlabs/nitro/util/testhelpers"
14+
)
15+
16+
func blsPubToBase64(pubkey *blsSignatures.PublicKey) string {
17+
pubkeyBytes := blsSignatures.PublicKeyToBytes(*pubkey)
18+
encodedPubkey := make([]byte, base64.StdEncoding.EncodedLen(len(pubkeyBytes)))
19+
base64.StdEncoding.Encode(encodedPubkey, pubkeyBytes)
20+
return string(encodedPubkey)
21+
}
22+
23+
func TestRPC(t *testing.T) {
24+
ctx := context.Background()
25+
lis, err := net.Listen("tcp", "localhost:0")
26+
testhelpers.RequireImpl(t, err)
27+
keyDir := t.TempDir()
28+
dataDir := t.TempDir()
29+
pubkey, _, err := das.GenerateAndStoreKeys(keyDir)
30+
testhelpers.RequireImpl(t, err)
31+
dasConfig := das.LocalDiskDASConfig{
32+
KeyDir: keyDir,
33+
DataDir: dataDir,
34+
}
35+
localDas, err := das.NewLocalDiskDASWithSeqInboxCaller(dasConfig, nil)
36+
testhelpers.RequireImpl(t, err)
37+
dasServer, err := StartDASRPCServerOnListener(ctx, lis, localDas)
38+
defer func() {
39+
if err := dasServer.Shutdown(ctx); err != nil {
40+
panic(err)
41+
}
42+
}()
43+
testhelpers.RequireImpl(t, err)
44+
config := BackendConfig{
45+
URL: "http://" + lis.Addr().String(),
46+
PubKeyBase64Encoded: blsPubToBase64(pubkey),
47+
SignerMask: 1,
48+
}
49+
50+
backendsJsonByte, err := json.Marshal([]BackendConfig{config})
51+
testhelpers.RequireImpl(t, err)
52+
aggConf := das.AggregatorConfig{
53+
AssumedHonest: 1,
54+
Backends: string(backendsJsonByte),
55+
}
56+
rpcAgg, err := NewRPCAggregatorWithSeqInboxCaller(aggConf, nil)
57+
testhelpers.RequireImpl(t, err)
58+
59+
msg := testhelpers.RandomizeSlice(make([]byte, 100))
60+
cert, err := rpcAgg.Store(ctx, msg, 0, nil)
61+
testhelpers.RequireImpl(t, err)
62+
63+
retrievedMessage, err := rpcAgg.Retrieve(ctx, cert)
64+
testhelpers.RequireImpl(t, err)
65+
66+
if !bytes.Equal(msg, retrievedMessage) {
67+
testhelpers.FailImpl(t, "failed to retrieve correct message")
68+
}
69+
}

system_tests/das_test.go

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,13 +4,15 @@ import (
44
"context"
55
"encoding/base64"
66
"encoding/json"
7-
"github.com/ethereum/go-ethereum/common"
8-
"github.com/offchainlabs/nitro/arbutil"
97
"math/big"
108
"net"
9+
"net/http"
1110
"testing"
1211
"time"
1312

13+
"github.com/ethereum/go-ethereum/common"
14+
"github.com/offchainlabs/nitro/arbutil"
15+
1416
"github.com/ethereum/go-ethereum/ethclient"
1517

1618
"github.com/offchainlabs/nitro/blsSignatures"
@@ -30,7 +32,7 @@ func startLocalDASServer(
3032
dataDir string,
3133
l1client arbutil.L1Interface,
3234
seqInboxAddress common.Address,
33-
) (*dasrpc.DASRPCServer, *blsSignatures.PublicKey, dasrpc.BackendConfig) {
35+
) (*http.Server, *blsSignatures.PublicKey, dasrpc.BackendConfig) {
3436
lis, err := net.Listen("tcp", "localhost:0")
3537
Require(t, err)
3638
keyDir := t.TempDir()
@@ -45,7 +47,7 @@ func startLocalDASServer(
4547
dasServer, err := dasrpc.StartDASRPCServerOnListener(ctx, lis, localDas)
4648
Require(t, err)
4749
config := dasrpc.BackendConfig{
48-
URL: lis.Addr().String(),
50+
URL: "http://" + lis.Addr().String(),
4951
PubKeyBase64Encoded: blsPubToBase64(pubkey),
5052
SignerMask: 1,
5153
}
@@ -109,9 +111,13 @@ func TestDASRekey(t *testing.T) {
109111
nodeA.StopAndWait()
110112
nodeB.StopAndWait()
111113

112-
dasServerA.Stop()
114+
err = dasServerA.Shutdown(ctx)
115+
Require(t, err)
113116
dasServerB, pubkeyB, backendConfigB := startLocalDASServer(t, ctx, dasDataDir, l1client, addresses.SequencerInbox)
114-
defer dasServerB.Stop()
117+
defer func() {
118+
err = dasServerB.Shutdown(ctx)
119+
Require(t, err)
120+
}()
115121
authorizeDASKeyset(t, ctx, pubkeyB, l1info, l1client)
116122

117123
// Restart the node on the new keyset against the new DAS server running on the same disk as the first with new keys

0 commit comments

Comments
 (0)