Skip to content

Commit bbda86f

Browse files
authored
[deliveryorderer] Process headers-only blocks only in the headers stream (#633)
#### Type of change - Bug fix #### Description - Verify that all block fields exists as expected, for all delivery methods - Process blocks without data in the headers-only stream, regardless of their data source. #### Related issues - resolves #632 Signed-off-by: Liran Funaro <liran.funaro@gmail.com>
1 parent a97e78c commit bbda86f

6 files changed

Lines changed: 58 additions & 13 deletions

File tree

service/sidecar/sidecar.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -573,7 +573,8 @@ func (s *Service) deliverBlocks(
573573
if err != nil {
574574
return common.Status_INTERNAL_SERVER_ERROR, errors.Wrap(err, "error sending response")
575575
}
576-
logger.Infof("Successfully sent block %d:%d to client.", block.Header.Number, len(block.Data.Data))
576+
logger.Infof("Successfully sent block [%d] (size: %d) to client.",
577+
block.Header.Number, len(block.Data.Data))
577578

578579
if stopNum == block.Header.Number {
579580
break

utils/deliver/delivery.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -120,8 +120,8 @@ func toQueueWithoutReconnect(ctx context.Context, p *Parameters) error {
120120
// We make minimal verifications to ensure we receive blocks in order.
121121
// This allows us to restart the connection from the next expected block.
122122
// We restart the connection upon failure.
123-
if block == nil || block.Header == nil {
124-
return errors.Join(backoff, errors.New("received nil block or with nil header"))
123+
if block == nil || block.Header == nil || block.Metadata == nil || (!p.HeaderOnly && block.Data == nil) {
124+
return errors.Join(backoff, errors.New("received nil block, header, metadata, or data"))
125125
}
126126
if block.Header.Number != p.NextBlockNum {
127127
return errors.Join(backoff, errors.Errorf("received block number %d != %d (expected)",

utils/deliver/delivery_test.go

Lines changed: 46 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,11 @@ func TestDeliverToChannel(t *testing.T) {
3838
// 10 times for the pulled blocks, 10 times for the blocks in the buffer, and 1 that blocks because
3939
// the buffer is full.
4040
s.EXPECT().RecvBlockOrStatus().MaxTimes(21).DoAndReturn(func() (*common.Block, *common.Status, error) {
41-
b := &common.Block{Header: &common.BlockHeader{Number: nextBlockNum}}
41+
b := &common.Block{
42+
Header: &common.BlockHeader{Number: nextBlockNum},
43+
Metadata: &common.BlockMetadata{},
44+
Data: &common.BlockData{},
45+
}
4246
nextBlockNum++
4347
return b, nil, nil
4448
})
@@ -48,7 +52,11 @@ func TestDeliverToChannel(t *testing.T) {
4852
outputBlock := channel.NewReader(t.Context(), e.outputBlock)
4953
outputBlockWithSourceID := channel.NewReader(t.Context(), e.outputBlockWithSourceID)
5054
for i := range uint64(10) {
51-
expectedBlock := &common.Block{Header: &common.BlockHeader{Number: i}}
55+
expectedBlock := &common.Block{
56+
Header: &common.BlockHeader{Number: i},
57+
Metadata: &common.BlockMetadata{},
58+
Data: &common.BlockData{},
59+
}
5260

5361
readBlock, ok := outputBlock.ReadWithTimeout(3 * time.Second)
5462
require.True(t, ok)
@@ -109,23 +117,56 @@ func TestDeliverToChannelFailedStreamer(t *testing.T) {
109117
t.Log("Receive bad block (nil header)")
110118
s = NewMockStreamer(e.ctrl)
111119
s.EXPECT().Send(gomock.Not(gomock.Nil())).Times(1).Return(nil)
112-
s.EXPECT().RecvBlockOrStatus().Times(1).Return(&common.Block{}, nil, nil)
120+
s.EXPECT().RecvBlockOrStatus().Times(1).Return(&common.Block{
121+
Metadata: &common.BlockMetadata{},
122+
Data: &common.BlockData{},
123+
}, nil, nil)
124+
streamerQueue.Write(s)
125+
_, ok = outputBlock.ReadWithTimeout(time.Second)
126+
require.False(t, ok)
127+
128+
t.Log("Receive bad block (nil metadata)")
129+
s = NewMockStreamer(e.ctrl)
130+
s.EXPECT().Send(gomock.Not(gomock.Nil())).Times(1).Return(nil)
131+
s.EXPECT().RecvBlockOrStatus().Times(1).Return(&common.Block{
132+
Header: &common.BlockHeader{Number: 1},
133+
Data: &common.BlockData{},
134+
}, nil, nil)
135+
streamerQueue.Write(s)
136+
_, ok = outputBlock.ReadWithTimeout(time.Second)
137+
require.False(t, ok)
138+
139+
t.Log("Receive bad block (nil data)")
140+
s = NewMockStreamer(e.ctrl)
141+
s.EXPECT().Send(gomock.Not(gomock.Nil())).Times(1).Return(nil)
142+
s.EXPECT().RecvBlockOrStatus().Times(1).Return(&common.Block{
143+
Header: &common.BlockHeader{Number: 1},
144+
Metadata: &common.BlockMetadata{},
145+
}, nil, nil)
113146
streamerQueue.Write(s)
114147
_, ok = outputBlock.ReadWithTimeout(time.Second)
115148
require.False(t, ok)
116149

117150
t.Log("Receive bad block (wrong number 1 != 0)")
118151
s = NewMockStreamer(e.ctrl)
119152
s.EXPECT().Send(gomock.Not(gomock.Nil())).Times(1).Return(nil)
120-
s.EXPECT().RecvBlockOrStatus().Times(1).Return(&common.Block{Header: &common.BlockHeader{Number: 1}}, nil, nil)
153+
s.EXPECT().RecvBlockOrStatus().Times(1).Return(&common.Block{
154+
Header: &common.BlockHeader{Number: 1},
155+
Metadata: &common.BlockMetadata{},
156+
Data: &common.BlockData{},
157+
}, nil, nil)
121158
streamerQueue.Write(s)
122159
_, ok = outputBlock.ReadWithTimeout(time.Second)
123160
require.False(t, ok)
124161

125162
t.Log("Correct block, then bad block (wrong number 0 != 1)")
126163
s = NewMockStreamer(e.ctrl)
127164
s.EXPECT().Send(gomock.Not(gomock.Nil())).Times(1).Return(nil)
128-
correctBlock := &common.Block{Header: &common.BlockHeader{Number: 0}}
165+
correctBlock := &common.Block{
166+
Header: &common.BlockHeader{Number: 0},
167+
Metadata: &common.BlockMetadata{},
168+
Data: &common.BlockData{},
169+
}
129170
s.EXPECT().RecvBlockOrStatus().Times(2).Return(correctBlock, nil, nil)
130171
streamerQueue.Write(s)
131172
rb, ok := outputBlock.ReadWithTimeout(3 * time.Second)

utils/deliverorderer/orderer.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -280,12 +280,10 @@ func (d *ftDelivery) readNextBlock(ctx context.Context) *deliver.BlockWithSource
280280
}
281281

282282
func (d *ftDelivery) getProcessingState(block *deliver.BlockWithSourceID) *blockVerificationStateMachine {
283-
switch block.SourceID {
284-
case d.curDataBlockSourceID:
283+
if block.Block.Data != nil && block.SourceID == d.curDataBlockSourceID {
285284
return &d.dataStream
286-
default:
287-
return &d.headerOnlyStream
288285
}
286+
return &d.headerOnlyStream
289287
}
290288

291289
func (d *ftDelivery) checkNextStreamAction(state *blockVerificationStateMachine) error {

utils/deliverorderer/orderer_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -442,7 +442,9 @@ func TestOrdererDeliverBFT(t *testing.T) {
442442
p1.HoldFromBlock.Store(expectedBlockNum)
443443
p2.HoldFromBlock.Store(expectedBlockNum + 1)
444444
p0.ReplaceBlock.Store(expectedBlockNum, &common.Block{
445-
Header: &common.BlockHeader{Number: expectedBlockNum},
445+
Header: &common.BlockHeader{Number: expectedBlockNum},
446+
Data: &common.BlockData{},
447+
Metadata: &common.BlockMetadata{},
446448
})
447449
b = e.submit(t)
448450
require.EqualValues(t, 2, b.SourceID)

utils/deliverorderer/verify.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,9 @@ func (s *blockVerificationStateMachine) verificationStepAndUpdateState(blk *deli
150150
// verifyBlockForm checks whether the block is well-formed.
151151
// It is only used internally.
152152
func (s *blockVerificationStateMachine) verifyBlockForm(block *common.Block) bool {
153+
// The delivery already filters nil blocks, headers, metadata, or data.
154+
// We only need to verify that the metadata is with valid form.
155+
// We verify everything anyway to ensure correctness.
153156
if block == nil || block.Header == nil || block.Metadata == nil {
154157
return false
155158
}

0 commit comments

Comments
 (0)