Skip to content

Commit f394c03

Browse files
Merge branch 'master' into illia-malachyn/747-subscribe-account-statutes-endpoint
2 parents bbc5240 + 8b37d47 commit f394c03

5 files changed

Lines changed: 356 additions & 0 deletions

File tree

access/grpc/client.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,29 @@ func (c *Client) SubscribeEventsByBlockHeight(
329329
return c.grpc.SubscribeEventsByBlockHeight(ctx, startHeight, filter, WithHeartbeatInterval(conf.heartbeatInterval))
330330
}
331331

332+
func (c *Client) SubscribeBlockDigestsFromStartBlockID(
333+
ctx context.Context,
334+
startBlockID flow.Identifier,
335+
blockStatus flow.BlockStatus,
336+
) (<-chan flow.BlockDigest, <-chan error, error) {
337+
return c.grpc.SubscribeBlockDigestsFromStartBlockID(ctx, startBlockID, blockStatus)
338+
}
339+
340+
func (c *Client) SubscribeBlockDigestsFromStartHeight(
341+
ctx context.Context,
342+
startHeight uint64,
343+
blockStatus flow.BlockStatus,
344+
) (<-chan flow.BlockDigest, <-chan error, error) {
345+
return c.grpc.SubscribeBlockDigestsFromStartHeight(ctx, startHeight, blockStatus)
346+
}
347+
348+
func (c *Client) SubscribeBlockDigestsFromLatest(
349+
ctx context.Context,
350+
blockStatus flow.BlockStatus,
351+
) (<-chan flow.BlockDigest, <-chan error, error) {
352+
return c.grpc.SubscribeBlockDigestsFromLatest(ctx, blockStatus)
353+
}
354+
332355
func (c *Client) SubscribeBlocksFromStartBlockID(
333356
ctx context.Context,
334357
startBlockID flow.Identifier,

access/grpc/convert/convert.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -245,6 +245,22 @@ func MessageToBlockHeader(m *entities.BlockHeader) (flow.BlockHeader, error) {
245245
}, nil
246246
}
247247

248+
func MessageToBlockDigest(m *access.SubscribeBlockDigestsResponse) flow.BlockDigest {
249+
return flow.BlockDigest{
250+
BlockID: flow.BytesToID(m.GetBlockId()),
251+
Height: m.GetBlockHeight(),
252+
Timestamp: m.GetBlockTimestamp().AsTime(),
253+
}
254+
}
255+
256+
func BlockDigestToMessage(blockDigest flow.BlockDigest) *access.SubscribeBlockDigestsResponse {
257+
return &access.SubscribeBlockDigestsResponse{
258+
BlockId: IdentifierToMessage(blockDigest.BlockID),
259+
BlockHeight: blockDigest.Height,
260+
BlockTimestamp: timestamppb.New(blockDigest.Timestamp),
261+
}
262+
}
263+
248264
func BlockStatusToEntity(blockStatus flow.BlockStatus) entities.BlockStatus {
249265
switch blockStatus {
250266
case flow.BlockStatusFinalized:

access/grpc/grpc.go

Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1658,3 +1658,138 @@ func receiveAccountStatusesFromStream[Stream interface {
16581658
}
16591659
}
16601660
}
1661+
1662+
func (c *BaseClient) SubscribeBlockDigestsFromStartBlockID(
1663+
ctx context.Context,
1664+
startBlockID flow.Identifier,
1665+
blockStatus flow.BlockStatus,
1666+
opts ...grpc.CallOption,
1667+
) (<-chan flow.BlockDigest, <-chan error, error) {
1668+
status := convert.BlockStatusToEntity(blockStatus)
1669+
if status == entities.BlockStatus_BLOCK_UNKNOWN {
1670+
return nil, nil, newRPCError(errors.New("unknown block status"))
1671+
}
1672+
1673+
request := &access.SubscribeBlockDigestsFromStartBlockIDRequest{
1674+
StartBlockId: startBlockID.Bytes(),
1675+
BlockStatus: status,
1676+
}
1677+
1678+
subscribeClient, err := c.rpcClient.SubscribeBlockDigestsFromStartBlockID(ctx, request, opts...)
1679+
if err != nil {
1680+
return nil, nil, newRPCError(err)
1681+
}
1682+
1683+
blocksChan := make(chan flow.BlockDigest)
1684+
errChan := make(chan error)
1685+
1686+
go func() {
1687+
defer close(blocksChan)
1688+
defer close(errChan)
1689+
receiveBlockDigestFromClient(ctx, subscribeClient, blocksChan, errChan)
1690+
}()
1691+
1692+
return blocksChan, errChan, nil
1693+
}
1694+
1695+
func (c *BaseClient) SubscribeBlockDigestsFromStartHeight(
1696+
ctx context.Context,
1697+
startHeight uint64,
1698+
blockStatus flow.BlockStatus,
1699+
opts ...grpc.CallOption,
1700+
) (<-chan flow.BlockDigest, <-chan error, error) {
1701+
status := convert.BlockStatusToEntity(blockStatus)
1702+
if status == entities.BlockStatus_BLOCK_UNKNOWN {
1703+
return nil, nil, newRPCError(errors.New("unknown block status"))
1704+
}
1705+
1706+
request := &access.SubscribeBlockDigestsFromStartHeightRequest{
1707+
StartBlockHeight: startHeight,
1708+
BlockStatus: status,
1709+
}
1710+
1711+
subscribeClient, err := c.rpcClient.SubscribeBlockDigestsFromStartHeight(ctx, request, opts...)
1712+
if err != nil {
1713+
return nil, nil, newRPCError(err)
1714+
}
1715+
1716+
blocksChan := make(chan flow.BlockDigest)
1717+
errChan := make(chan error)
1718+
1719+
go func() {
1720+
defer close(blocksChan)
1721+
defer close(errChan)
1722+
receiveBlockDigestFromClient(ctx, subscribeClient, blocksChan, errChan)
1723+
}()
1724+
1725+
return blocksChan, errChan, nil
1726+
}
1727+
1728+
func (c *BaseClient) SubscribeBlockDigestsFromLatest(
1729+
ctx context.Context,
1730+
blockStatus flow.BlockStatus,
1731+
opts ...grpc.CallOption,
1732+
) (<-chan flow.BlockDigest, <-chan error, error) {
1733+
status := convert.BlockStatusToEntity(blockStatus)
1734+
if status == entities.BlockStatus_BLOCK_UNKNOWN {
1735+
return nil, nil, newRPCError(errors.New("unknown block status"))
1736+
}
1737+
1738+
request := &access.SubscribeBlockDigestsFromLatestRequest{
1739+
BlockStatus: status,
1740+
}
1741+
1742+
subscribeClient, err := c.rpcClient.SubscribeBlockDigestsFromLatest(ctx, request, opts...)
1743+
if err != nil {
1744+
return nil, nil, newRPCError(err)
1745+
}
1746+
1747+
blocksChan := make(chan flow.BlockDigest)
1748+
errChan := make(chan error)
1749+
1750+
go func() {
1751+
defer close(blocksChan)
1752+
defer close(errChan)
1753+
receiveBlockDigestFromClient(ctx, subscribeClient, blocksChan, errChan)
1754+
}()
1755+
1756+
return blocksChan, errChan, nil
1757+
}
1758+
1759+
func receiveBlockDigestFromClient[Client interface {
1760+
Recv() (*access.SubscribeBlockDigestsResponse, error)
1761+
}](
1762+
ctx context.Context,
1763+
client Client,
1764+
blockDigestsChan chan<- flow.BlockDigest,
1765+
errChan chan<- error,
1766+
) {
1767+
sendErr := func(err error) {
1768+
select {
1769+
case <-ctx.Done():
1770+
case errChan <- err:
1771+
}
1772+
}
1773+
1774+
for {
1775+
// Receive the next blockDigest response
1776+
blockDigestResponse, err := client.Recv()
1777+
if err != nil {
1778+
if err == io.EOF {
1779+
// End of stream, return gracefully
1780+
return
1781+
}
1782+
1783+
sendErr(fmt.Errorf("error receiving blockDigest: %w", err))
1784+
return
1785+
}
1786+
1787+
blockDigest := convert.MessageToBlockDigest(blockDigestResponse)
1788+
1789+
select {
1790+
case <-ctx.Done():
1791+
return
1792+
case blockDigestsChan <- blockDigest:
1793+
}
1794+
}
1795+
}

access/grpc/grpc_test.go

Lines changed: 175 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2958,6 +2958,181 @@ func assertNoBlockHeaders[BlockHeader any](t *testing.T, blockHeadersChan <-chan
29582958
}
29592959
}
29602960

2961+
func TestClient_SubscribeBlockDigest(t *testing.T) {
2962+
blockHeaders := test.BlockHeaderGenerator()
2963+
2964+
generateBlockDigestResponses := func(count uint64) []*access.SubscribeBlockDigestsResponse {
2965+
var resBlockDigests []*access.SubscribeBlockDigestsResponse
2966+
2967+
for i := uint64(0); i < count; i++ {
2968+
blockHeader := blockHeaders.New()
2969+
2970+
digest := flow.BlockDigest{
2971+
BlockID: blockHeader.ID,
2972+
Height: blockHeader.Height,
2973+
Timestamp: blockHeader.Timestamp,
2974+
}
2975+
2976+
resBlockDigests = append(resBlockDigests, convert.BlockDigestToMessage(digest))
2977+
}
2978+
2979+
return resBlockDigests
2980+
}
2981+
2982+
t.Run("Happy Path - from start height", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) {
2983+
startHeight := uint64(1)
2984+
responseCount := uint64(100)
2985+
2986+
ctx, cancel := context.WithCancel(ctx)
2987+
stream := &mockBlockDigestClientStream[access.SubscribeBlockDigestsResponse]{
2988+
ctx: ctx,
2989+
responses: generateBlockDigestResponses(responseCount),
2990+
}
2991+
2992+
rpc.
2993+
On("SubscribeBlockDigestsFromStartHeight", ctx, mock.Anything).
2994+
Return(stream, nil)
2995+
2996+
blockDigestsCh, errCh, err := c.SubscribeBlockDigestsFromStartHeight(ctx, startHeight, flow.BlockStatusSealed)
2997+
require.NoError(t, err)
2998+
2999+
wg := sync.WaitGroup{}
3000+
wg.Add(1)
3001+
go assertNoErrors(t, errCh, wg.Done)
3002+
3003+
for i := uint64(0); i < responseCount; i++ {
3004+
actualDigest := <-blockDigestsCh
3005+
expectedDigest := convert.MessageToBlockDigest(stream.responses[i])
3006+
require.Equal(t, expectedDigest, actualDigest)
3007+
}
3008+
cancel()
3009+
3010+
wg.Wait()
3011+
}))
3012+
3013+
t.Run("Happy Path - from start block id", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) {
3014+
responseCount := uint64(100)
3015+
3016+
ctx, cancel := context.WithCancel(ctx)
3017+
stream := &mockBlockDigestClientStream[access.SubscribeBlockDigestsResponse]{
3018+
ctx: ctx,
3019+
responses: generateBlockDigestResponses(responseCount),
3020+
}
3021+
3022+
rpc.
3023+
On("SubscribeBlockDigestsFromStartBlockID", ctx, mock.Anything).
3024+
Return(stream, nil)
3025+
3026+
startBlockID := convert.MessageToIdentifier(stream.responses[0].BlockId)
3027+
blockDigestsCh, errCh, err := c.SubscribeBlockDigestsFromStartBlockID(ctx, startBlockID, flow.BlockStatusSealed)
3028+
require.NoError(t, err)
3029+
3030+
wg := sync.WaitGroup{}
3031+
wg.Add(1)
3032+
go assertNoErrors(t, errCh, wg.Done)
3033+
3034+
for i := uint64(0); i < responseCount; i++ {
3035+
actualDigest := <-blockDigestsCh
3036+
expectedDigest := convert.MessageToBlockDigest(stream.responses[i])
3037+
require.Equal(t, expectedDigest, actualDigest)
3038+
}
3039+
cancel()
3040+
3041+
wg.Wait()
3042+
}))
3043+
3044+
t.Run("Happy Path - from latest", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) {
3045+
responseCount := uint64(100)
3046+
3047+
ctx, cancel := context.WithCancel(ctx)
3048+
stream := &mockBlockDigestClientStream[access.SubscribeBlockDigestsResponse]{
3049+
ctx: ctx,
3050+
responses: generateBlockDigestResponses(responseCount),
3051+
}
3052+
3053+
rpc.
3054+
On("SubscribeBlockDigestsFromLatest", ctx, mock.Anything).
3055+
Return(stream, nil)
3056+
3057+
blockDigestsCh, errCh, err := c.SubscribeBlockDigestsFromLatest(ctx, flow.BlockStatusSealed)
3058+
require.NoError(t, err)
3059+
3060+
wg := sync.WaitGroup{}
3061+
wg.Add(1)
3062+
go assertNoErrors(t, errCh, wg.Done)
3063+
3064+
for i := uint64(0); i < responseCount; i++ {
3065+
actualDigest := <-blockDigestsCh
3066+
expectedDigest := convert.MessageToBlockDigest(stream.responses[i])
3067+
require.Equal(t, expectedDigest, actualDigest)
3068+
}
3069+
cancel()
3070+
3071+
wg.Wait()
3072+
}))
3073+
3074+
t.Run("Stream returns error", clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) {
3075+
ctx, cancel := context.WithCancel(ctx)
3076+
stream := &mockBlockDigestClientStream[access.SubscribeBlockDigestsResponse]{
3077+
ctx: ctx,
3078+
err: status.Error(codes.Internal, "internal error"),
3079+
}
3080+
3081+
rpc.
3082+
On("SubscribeBlockDigestsFromLatest", ctx, mock.Anything).
3083+
Return(stream, nil)
3084+
3085+
blockDigestsCh, errCh, err := c.SubscribeBlockDigestsFromLatest(ctx, flow.BlockStatusSealed)
3086+
require.NoError(t, err)
3087+
3088+
wg := sync.WaitGroup{}
3089+
wg.Add(1)
3090+
go assertNoBlockDigests(t, blockDigestsCh, wg.Done)
3091+
3092+
errorCount := 0
3093+
for e := range errCh {
3094+
require.Error(t, e)
3095+
require.ErrorIs(t, e, stream.err)
3096+
errorCount += 1
3097+
}
3098+
cancel()
3099+
3100+
require.Equalf(t, 1, errorCount, "only 1 error is expected")
3101+
3102+
wg.Wait()
3103+
}))
3104+
}
3105+
3106+
type mockBlockDigestClientStream[SubscribeBlockDigestsResponse any] struct {
3107+
grpc.ClientStream
3108+
3109+
ctx context.Context
3110+
err error
3111+
offset int
3112+
responses []*SubscribeBlockDigestsResponse
3113+
}
3114+
3115+
func (s *mockBlockDigestClientStream[SubscribeBlockDigestsResponse]) Recv() (*SubscribeBlockDigestsResponse, error) {
3116+
if s.err != nil {
3117+
return nil, s.err
3118+
}
3119+
3120+
if s.offset >= len(s.responses) {
3121+
<-s.ctx.Done()
3122+
return nil, io.EOF
3123+
}
3124+
defer func() { s.offset++ }()
3125+
3126+
return s.responses[s.offset], nil
3127+
}
3128+
3129+
func assertNoBlockDigests[BlockDigest any](t *testing.T, blockDigestsChan <-chan BlockDigest, done func()) {
3130+
defer done()
3131+
for range blockDigestsChan {
3132+
require.FailNow(t, "should not receive block digests")
3133+
}
3134+
}
3135+
29613136
type mockAccountStatutesClientStream struct {
29623137
grpc.ClientStream
29633138

block.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,3 +76,10 @@ type BlockSeal struct {
7676
// block produces the same receipt among all verifying nodes
7777
ExecutionReceiptID Identifier
7878
}
79+
80+
// BlockDigest holds lightweight block information which includes only block id, block height and block timestamp
81+
type BlockDigest struct {
82+
BlockID Identifier
83+
Height uint64
84+
Timestamp time.Time
85+
}

0 commit comments

Comments
 (0)