|
| 1 | +package fibre_test |
| 2 | + |
| 3 | +import ( |
| 4 | + "context" |
| 5 | + "crypto/rand" |
| 6 | + "net" |
| 7 | + "path/filepath" |
| 8 | + "testing" |
| 9 | + "time" |
| 10 | + |
| 11 | + sdkmath "cosmossdk.io/math" |
| 12 | + "github.com/celestiaorg/celestia-app-fibre/v6/app" |
| 13 | + "github.com/celestiaorg/celestia-app-fibre/v6/app/encoding" |
| 14 | + "github.com/celestiaorg/celestia-app-fibre/v6/fibre" |
| 15 | + grpcfibre "github.com/celestiaorg/celestia-app-fibre/v6/fibre/grpc" |
| 16 | + "github.com/celestiaorg/celestia-app-fibre/v6/pkg/appconsts" |
| 17 | + "github.com/celestiaorg/celestia-app-fibre/v6/pkg/user" |
| 18 | + "github.com/celestiaorg/celestia-app-fibre/v6/test/util/testnode" |
| 19 | + fibretypes "github.com/celestiaorg/celestia-app-fibre/v6/x/fibre/types" |
| 20 | + valtypes "github.com/celestiaorg/celestia-app-fibre/v6/x/valaddr/types" |
| 21 | + "github.com/celestiaorg/go-square/v4/share" |
| 22 | + "github.com/cometbft/cometbft/privval" |
| 23 | + coregrpc "github.com/cometbft/cometbft/rpc/grpc" |
| 24 | + core "github.com/cometbft/cometbft/types" |
| 25 | + "github.com/cosmos/cosmos-sdk/client/grpc/cmtservice" |
| 26 | + sdk "github.com/cosmos/cosmos-sdk/types" |
| 27 | + stakingtypes "github.com/cosmos/cosmos-sdk/x/staking/types" |
| 28 | + "github.com/stretchr/testify/require" |
| 29 | + "github.com/stretchr/testify/suite" |
| 30 | + "google.golang.org/grpc" |
| 31 | + "google.golang.org/grpc/credentials/insecure" |
| 32 | +) |
| 33 | + |
| 34 | +func TestFibreE2ETestSuite(t *testing.T) { |
| 35 | + if testing.Short() { |
| 36 | + t.Skip("skipping fibre e2e test in short mode") |
| 37 | + } |
| 38 | + suite.Run(t, new(FibreE2ETestSuite)) |
| 39 | +} |
| 40 | + |
| 41 | +type FibreE2ETestSuite struct { |
| 42 | + suite.Suite |
| 43 | + |
| 44 | + ecfg encoding.Config |
| 45 | + cctx testnode.Context |
| 46 | + |
| 47 | + fibreServer *fibre.Server |
| 48 | + grpcServer *grpc.Server |
| 49 | + grpcAddr string |
| 50 | + |
| 51 | + valSetGetter *grpcfibre.SetGetter |
| 52 | + hostRegistry *grpcfibre.HostRegistry |
| 53 | + |
| 54 | + fibreClient *fibre.Client |
| 55 | +} |
| 56 | + |
| 57 | +func (s *FibreE2ETestSuite) SetupSuite() { |
| 58 | + t := s.T() |
| 59 | + |
| 60 | + s.ecfg = encoding.MakeConfig(app.ModuleEncodingRegisters...) |
| 61 | + |
| 62 | + cfg := testnode.DefaultConfig(). |
| 63 | + WithFundedAccounts(fibre.DefaultKeyName). |
| 64 | + WithDelayedPrecommitTimeout(500 * time.Millisecond) |
| 65 | + |
| 66 | + cctx, _, _ := testnode.NewNetwork(t, cfg) |
| 67 | + s.cctx = cctx |
| 68 | + |
| 69 | + _, err := s.cctx.WaitForHeight(1) |
| 70 | + require.NoError(t, err, "failed to wait for first block") |
| 71 | + |
| 72 | + s.setupFibreServer(t) |
| 73 | + s.setupFibreClient(t) |
| 74 | +} |
| 75 | + |
| 76 | +func (s *FibreE2ETestSuite) setupFibreServer(t *testing.T) { |
| 77 | + t.Helper() |
| 78 | + |
| 79 | + listener, err := net.Listen("tcp", "127.0.0.1:0") |
| 80 | + require.NoError(t, err) |
| 81 | + s.grpcAddr = listener.Addr().String() |
| 82 | + |
| 83 | + // Load the validator's private key from testnode config files. |
| 84 | + pvKeyFile := filepath.Join(s.cctx.HomeDir, "config", "priv_validator_key.json") |
| 85 | + pvStateFile := filepath.Join(s.cctx.HomeDir, "data", "priv_validator_state.json") |
| 86 | + filePV := privval.LoadFilePV(pvKeyFile, pvStateFile) |
| 87 | + |
| 88 | + fibreQueryClient := fibretypes.NewQueryClient(s.cctx.GRPCClient) |
| 89 | + s.valSetGetter = grpcfibre.NewSetGetter(coregrpc.NewBlockAPIClient(s.cctx.GRPCClient)) |
| 90 | + |
| 91 | + serverCfg := fibre.DefaultServerConfig() |
| 92 | + serverCfg.ChainID = s.cctx.ChainID |
| 93 | + |
| 94 | + s.fibreServer, err = fibre.NewInMemoryServer(filePV, fibreQueryClient, s.valSetGetter, serverCfg) |
| 95 | + require.NoError(t, err) |
| 96 | + s.fibreServer.Start() |
| 97 | + |
| 98 | + maxMsgSize := serverCfg.MaxMessageSize |
| 99 | + s.grpcServer = grpc.NewServer( |
| 100 | + grpc.MaxRecvMsgSize(maxMsgSize), |
| 101 | + grpc.MaxSendMsgSize(maxMsgSize), |
| 102 | + ) |
| 103 | + fibretypes.RegisterFibreServer(s.grpcServer, s.fibreServer) |
| 104 | + |
| 105 | + go func() { _ = s.grpcServer.Serve(listener) }() |
| 106 | +} |
| 107 | + |
| 108 | +func (s *FibreE2ETestSuite) setupFibreClient(t *testing.T) { |
| 109 | + t.Helper() |
| 110 | + |
| 111 | + txClient, err := user.SetupTxClient( |
| 112 | + s.cctx.GoContext(), |
| 113 | + s.cctx.Keyring, |
| 114 | + s.cctx.GRPCClient, |
| 115 | + s.ecfg, |
| 116 | + user.WithDefaultAccount(fibre.DefaultKeyName), |
| 117 | + ) |
| 118 | + require.NoError(t, err) |
| 119 | + |
| 120 | + s.hostRegistry = grpcfibre.NewHostRegistry(valtypes.NewQueryClient(s.cctx.GRPCClient)) |
| 121 | + |
| 122 | + clientCfg := fibre.DefaultClientConfig() |
| 123 | + clientCfg.ChainID = s.cctx.ChainID |
| 124 | + clientCfg.NewClientFn = newTestClientFn(s.grpcAddr, clientCfg.MaxMessageSize) |
| 125 | + |
| 126 | + s.fibreClient, err = fibre.NewClient(txClient, s.cctx.Keyring, s.valSetGetter, s.hostRegistry, clientCfg) |
| 127 | + require.NoError(t, err) |
| 128 | +} |
| 129 | + |
| 130 | +func (s *FibreE2ETestSuite) TearDownSuite() { |
| 131 | + if s.grpcServer != nil { |
| 132 | + s.grpcServer.Stop() |
| 133 | + } |
| 134 | + if s.fibreServer != nil { |
| 135 | + _ = s.fibreServer.Stop() |
| 136 | + } |
| 137 | + if s.fibreClient != nil { |
| 138 | + _ = s.fibreClient.Close() |
| 139 | + } |
| 140 | +} |
| 141 | + |
| 142 | +func (s *FibreE2ETestSuite) Test01RegisterValidator() { |
| 143 | + t := s.T() |
| 144 | + ctx := s.cctx.GoContext() |
| 145 | + |
| 146 | + // Get the validator's operator address from the staking module. |
| 147 | + stakingClient := stakingtypes.NewQueryClient(s.cctx.GRPCClient) |
| 148 | + validatorsResp, err := stakingClient.Validators(ctx, &stakingtypes.QueryValidatorsRequest{}) |
| 149 | + require.NoError(t, err) |
| 150 | + require.Len(t, validatorsResp.Validators, 1) |
| 151 | + |
| 152 | + valOperatorAddr := validatorsResp.Validators[0].OperatorAddress |
| 153 | + |
| 154 | + // Submit MsgSetFibreProviderInfo to register the fibre server's gRPC address. |
| 155 | + txClient, err := testnode.NewTxClientFromContext(s.cctx) |
| 156 | + require.NoError(t, err) |
| 157 | + |
| 158 | + msg := &valtypes.MsgSetFibreProviderInfo{ |
| 159 | + Signer: valOperatorAddr, |
| 160 | + Host: s.grpcAddr, |
| 161 | + } |
| 162 | + |
| 163 | + txResp, err := txClient.SubmitTx(ctx, []sdk.Msg{msg}, user.SetGasLimit(200_000), user.SetFee(5_000)) |
| 164 | + require.NoError(t, err) |
| 165 | + require.Equal(t, uint32(0), txResp.Code) |
| 166 | + t.Logf("RegisterValidator tx included at height %d, hash: %s", txResp.Height, txResp.TxHash) |
| 167 | + |
| 168 | + // Verify the host is now registered. |
| 169 | + valAddrClient := valtypes.NewQueryClient(s.cctx.GRPCClient) |
| 170 | + |
| 171 | + // Derive the validator consensus address via the cometbft service. |
| 172 | + tmServiceClient := cmtservice.NewServiceClient(s.cctx.GRPCClient) |
| 173 | + valSetResp, err := tmServiceClient.GetLatestValidatorSet(ctx, &cmtservice.GetLatestValidatorSetRequest{}) |
| 174 | + require.NoError(t, err) |
| 175 | + require.Len(t, valSetResp.Validators, 1) |
| 176 | + consAddr, err := sdk.ConsAddressFromBech32(valSetResp.Validators[0].Address) |
| 177 | + require.NoError(t, err) |
| 178 | + |
| 179 | + resp, err := valAddrClient.FibreProviderInfo(ctx, &valtypes.QueryFibreProviderInfoRequest{ |
| 180 | + ValidatorConsensusAddress: consAddr.String(), |
| 181 | + }) |
| 182 | + require.NoError(t, err) |
| 183 | + require.True(t, resp.Found) |
| 184 | + require.Equal(t, s.grpcAddr, resp.Info.Host) |
| 185 | + |
| 186 | + // Refresh the host registry so the client can find the validator. |
| 187 | + err = s.hostRegistry.Start(ctx) |
| 188 | + require.NoError(t, err) |
| 189 | +} |
| 190 | + |
| 191 | +func (s *FibreE2ETestSuite) Test02FundEscrowAccount() { |
| 192 | + t := s.T() |
| 193 | + ctx := s.cctx.GoContext() |
| 194 | + |
| 195 | + // Get client address from the keyring. |
| 196 | + keyInfo, err := s.cctx.Keyring.Key(fibre.DefaultKeyName) |
| 197 | + require.NoError(t, err) |
| 198 | + addr, err := keyInfo.GetAddress() |
| 199 | + require.NoError(t, err) |
| 200 | + |
| 201 | + fibreQueryClient := fibretypes.NewQueryClient(s.cctx.GRPCClient) |
| 202 | + |
| 203 | + // Verify escrow account doesn't exist yet. |
| 204 | + escrowResp, err := fibreQueryClient.EscrowAccount(ctx, &fibretypes.QueryEscrowAccountRequest{ |
| 205 | + Signer: addr.String(), |
| 206 | + }) |
| 207 | + require.NoError(t, err) |
| 208 | + require.False(t, escrowResp.Found) |
| 209 | + |
| 210 | + txClient, err := user.SetupTxClient( |
| 211 | + ctx, s.cctx.Keyring, s.cctx.GRPCClient, s.ecfg, |
| 212 | + user.WithDefaultAccount(fibre.DefaultKeyName), |
| 213 | + ) |
| 214 | + require.NoError(t, err) |
| 215 | + |
| 216 | + // First deposit: 50 TIA (50_000_000 utia). |
| 217 | + depositAmount := sdk.NewCoin(appconsts.BondDenom, sdkmath.NewInt(50_000_000)) |
| 218 | + msg := &fibretypes.MsgDepositToEscrow{ |
| 219 | + Signer: addr.String(), |
| 220 | + Amount: depositAmount, |
| 221 | + } |
| 222 | + txResp, err := txClient.SubmitTx(ctx, []sdk.Msg{msg}, user.SetGasLimit(200_000), user.SetFee(5_000)) |
| 223 | + require.NoError(t, err) |
| 224 | + require.Equal(t, uint32(0), txResp.Code) |
| 225 | + t.Logf("First deposit tx at height %d", txResp.Height) |
| 226 | + |
| 227 | + // Verify escrow balance matches deposit. |
| 228 | + escrowResp, err = fibreQueryClient.EscrowAccount(ctx, &fibretypes.QueryEscrowAccountRequest{ |
| 229 | + Signer: addr.String(), |
| 230 | + }) |
| 231 | + require.NoError(t, err) |
| 232 | + require.True(t, escrowResp.Found) |
| 233 | + require.Equal(t, depositAmount, escrowResp.EscrowAccount.Balance) |
| 234 | + |
| 235 | + // Second deposit: 25 TIA (25_000_000 utia). |
| 236 | + depositAmount2 := sdk.NewCoin(appconsts.BondDenom, sdkmath.NewInt(25_000_000)) |
| 237 | + msg2 := &fibretypes.MsgDepositToEscrow{ |
| 238 | + Signer: addr.String(), |
| 239 | + Amount: depositAmount2, |
| 240 | + } |
| 241 | + txResp, err = txClient.SubmitTx(ctx, []sdk.Msg{msg2}, user.SetGasLimit(200_000), user.SetFee(5_000)) |
| 242 | + require.NoError(t, err) |
| 243 | + require.Equal(t, uint32(0), txResp.Code) |
| 244 | + t.Logf("Second deposit tx at height %d", txResp.Height) |
| 245 | + |
| 246 | + // Verify cumulative balance is 75 TIA. |
| 247 | + escrowResp, err = fibreQueryClient.EscrowAccount(ctx, &fibretypes.QueryEscrowAccountRequest{ |
| 248 | + Signer: addr.String(), |
| 249 | + }) |
| 250 | + require.NoError(t, err) |
| 251 | + require.True(t, escrowResp.Found) |
| 252 | + expectedBalance := sdk.NewCoin(appconsts.BondDenom, sdkmath.NewInt(75_000_000)) |
| 253 | + require.Equal(t, expectedBalance, escrowResp.EscrowAccount.Balance) |
| 254 | +} |
| 255 | + |
| 256 | +func (s *FibreE2ETestSuite) Test03Put() { |
| 257 | + t := s.T() |
| 258 | + ctx := s.cctx.GoContext() |
| 259 | + |
| 260 | + // Wait for a fresh block to avoid clock skew with payment promise. |
| 261 | + err := s.cctx.WaitForNextBlock() |
| 262 | + require.NoError(t, err) |
| 263 | + |
| 264 | + // Generate 4 KiB of random test data. |
| 265 | + testData := make([]byte, 4*1024) |
| 266 | + _, err = rand.Read(testData) |
| 267 | + require.NoError(t, err) |
| 268 | + |
| 269 | + ns := share.MustNewV0Namespace([]byte{0xDE, 0xAD}) |
| 270 | + |
| 271 | + result, err := s.fibreClient.Put(ctx, ns, testData) |
| 272 | + require.NoError(t, err) |
| 273 | + |
| 274 | + // Verify Put result. |
| 275 | + require.NotEmpty(t, result.Commitment.String(), "commitment should not be empty") |
| 276 | + require.NotEmpty(t, result.ValidatorSignatures, "should have validator signatures") |
| 277 | + require.NotEmpty(t, result.TxHash, "tx hash should not be empty") |
| 278 | + require.Greater(t, result.Height, uint64(0), "height should be positive") |
| 279 | + t.Logf("Put result: commitment=%s, txHash=%s, height=%d", result.Commitment.String(), result.TxHash, result.Height) |
| 280 | + |
| 281 | + // Verify data was stored in server's store. |
| 282 | + shard, err := s.fibreServer.Store().Get(ctx, result.Commitment) |
| 283 | + require.NoError(t, err) |
| 284 | + require.NotNil(t, shard) |
| 285 | + require.NotEmpty(t, shard.Rows, "stored shard should have rows") |
| 286 | + require.NotNil(t, shard.GetRoot(), "stored shard should have an RLC root") |
| 287 | + require.Len(t, shard.GetRoot(), 32, "RLC root should be 32 bytes") |
| 288 | + |
| 289 | + // Verify the PayForFibre tx was included on chain by waiting for the block. |
| 290 | + _, err = s.cctx.WaitForTx(result.TxHash, 5) |
| 291 | + require.NoError(t, err) |
| 292 | +} |
| 293 | + |
| 294 | +// newTestClientFn returns a [grpcfibre.NewClientFn] that always connects to a fixed |
| 295 | +// address, bypassing the host registry lookup. This is needed because the |
| 296 | +// fibre server in this test is running on a separate gRPC listener from the |
| 297 | +// testnode's gRPC server. |
| 298 | +func newTestClientFn(addr string, maxMsgSize int) grpcfibre.NewClientFn { |
| 299 | + return func(_ context.Context, _ *core.Validator) (grpcfibre.Client, error) { |
| 300 | + conn, err := grpc.NewClient(addr, |
| 301 | + grpc.WithTransportCredentials(insecure.NewCredentials()), |
| 302 | + grpc.WithDefaultCallOptions( |
| 303 | + grpc.MaxCallRecvMsgSize(maxMsgSize), |
| 304 | + grpc.MaxCallSendMsgSize(maxMsgSize), |
| 305 | + ), |
| 306 | + ) |
| 307 | + if err != nil { |
| 308 | + return nil, err |
| 309 | + } |
| 310 | + return &fibreClientCloser{ |
| 311 | + FibreClient: fibretypes.NewFibreClient(conn), |
| 312 | + conn: conn, |
| 313 | + }, nil |
| 314 | + } |
| 315 | +} |
| 316 | + |
| 317 | +// fibreClientCloser wraps a [fibretypes.FibreClient] and [grpc.ClientConn] to implement [grpcfibre.Client]. |
| 318 | +type fibreClientCloser struct { |
| 319 | + fibretypes.FibreClient |
| 320 | + conn *grpc.ClientConn |
| 321 | +} |
| 322 | + |
| 323 | +func (f *fibreClientCloser) Close() error { |
| 324 | + return f.conn.Close() |
| 325 | +} |
0 commit comments