Merged
28 changes: 25 additions & 3 deletions headerfs/file.go
@@ -3,7 +3,7 @@ package headerfs
import (
"bytes"
"fmt"
"os"
"io"

"github.com/btcsuite/btcd/chaincfg/chainhash"
"github.com/btcsuite/btcd/wire"
@@ -17,10 +17,32 @@ type ErrHeaderNotFound struct {

// appendRaw appends a new raw header to the end of the flat file.
func (h *headerStore) appendRaw(header []byte) error {
if _, err := h.file.Write(header); err != nil {
// Get current file position before writing. We'll use this position to
// revert to if the write fails partially.
currentPos, err := h.file.Seek(0, io.SeekCurrent)
if err != nil {
return err
}

n, err := h.file.Write(header)
if err != nil {
// If we wrote some bytes but not all (partial write),
// truncate the file back to its original size to maintain
// consistency. This removes the partial/corrupt header.
if n > 0 {
truncErr := h.file.Truncate(currentPos)
if truncErr != nil {
return fmt.Errorf("failed to write header "+
"type %s: partial write (%d bytes), "+
"write error: %w, truncate error: %v",
h.indexType, n, err, truncErr)
}
}

return fmt.Errorf("failed to write header type %s: write "+
"error: %w", h.indexType, err)
}

return nil
}

@@ -166,7 +188,7 @@ func (f *FilterHeaderStore) readHeaderRange(startHeight uint32,

// readHeadersFromFile reads a chunk of headers, each of size headerSize, from
// the given file, from startHeight to endHeight.
func readHeadersFromFile(f *os.File, headerSize, startHeight,
func readHeadersFromFile(f File, headerSize, startHeight,
endHeight uint32) (*bytes.Reader, error) {

// Each header is headerSize bytes, so using this information, we'll
285 changes: 285 additions & 0 deletions headerfs/file_test.go
@@ -0,0 +1,285 @@
package headerfs

import (
"bytes"
"errors"
"fmt"
"io"
"os"
"strings"
"testing"
)

// TestAppendRow verifies that headerStore.appendRaw correctly appends data to
// the file, handles full and partial write errors, and properly recovers from
// failures.
func TestAppendRow(t *testing.T) {
tests := []struct {
name string
initialData []byte
headerToWrite []byte
writeFn func([]byte, File) (int, error)
truncFn func(int64, File) error
expected []byte
wantErr bool
errMsg string
}{
{
name: "ValidWrite AppendsData",
initialData: []byte{0x01, 0x02, 0x03},
headerToWrite: []byte{0x04, 0x05, 0x06},
expected: []byte{
0x01, 0x02, 0x03, 0x04, 0x05, 0x06,
},
},
{
name: "WriteError NoData Preserved",
initialData: []byte{0x01, 0x02, 0x03},
headerToWrite: []byte{0x04, 0x05, 0x06},
writeFn: func(p []byte, _ File) (int, error) {
return 0, errors.New("simulated write failure")
},
expected: []byte{0x01, 0x02, 0x03},
wantErr: true,
errMsg: "simulated write failure",
},
{
name: "PartialWrite MidwayError Rollback",
initialData: []byte{0x01, 0x02, 0x03},
headerToWrite: []byte{0x04, 0x05, 0x06},
writeFn: func(p []byte, file File) (int, error) {
// Mock a partial write - write the first two
// bytes.
n, err := file.Write(p[:2])
if err != nil {
return n, err
}

return n, errors.New("simulated partial " +
"write failure")
},
expected: []byte{0x01, 0x02, 0x03},
wantErr: true,
errMsg: "simulated partial write failure",
},
{
name: "TruncateError CompoundFail",
initialData: []byte{0x01, 0x02, 0x03},
headerToWrite: []byte{0x04, 0x05, 0x06},
writeFn: func(p []byte, file File) (int, error) {
// Mock a partial write - write just the first
// byte.
n, err := file.Write(p[:1])
if err != nil {
return n, err
}

return n, errors.New("simulated partial " +
"write failure")
},
truncFn: func(size int64, _ File) error {
return errors.New("simulated truncate failure")
},
expected: []byte{0x01, 0x02, 0x03, 0x04},
wantErr: true,
errMsg: fmt.Sprintf("failed to write header type %s: "+
"partial write (1 bytes), write error: "+
"simulated partial write failure, truncate "+
"error: simulated truncate failure", Block),
},
{
name: "PartialWrite TruncateFail Unrecovered",
initialData: []byte{0x01, 0x02, 0x03},
headerToWrite: []byte{0x04, 0x05, 0x06},
writeFn: func(p []byte, file File) (int, error) {
// Mock a partial write - write the first two
// bytes.
n, err := file.Write(p[:2])
if err != nil {
return n, err
}

return n, errors.New("simulated partial " +
"write failure")
},
truncFn: func(size int64, file File) error {
// Simulate an incomplete truncation: shrink the
// file by just one byte, leaving part of the
// partially written data in place; in other
// words, not fully removing the partially
// written header from the end of the file.
err := file.Truncate(4)
if err != nil {
return err
}

return errors.New("simulated truncate failure")
},
expected: []byte{0x01, 0x02, 0x03, 0x04},
wantErr: true,
errMsg: fmt.Sprintf("failed to write header type "+
"%s: partial write (2 bytes), write error: "+
"simulated partial write failure, truncate "+
"error: simulated truncate failure", Block),
},
{
name: "NormalWrite ValidHeader DataAppended",
initialData: []byte{},
headerToWrite: []byte{0x01, 0x02, 0x03},
expected: []byte{0x01, 0x02, 0x03},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
// Create a temporary file for testing.
tmpFile, cleanup := createFile(t, "header_store_test")
defer cleanup()

// Write initial data.
_, err := tmpFile.Write(test.initialData)
if err != nil {
t.Fatalf("Failed to write initial "+
"data: %v", err)
}

// Reset the file position to the end of initial data.
_, err = tmpFile.Seek(
int64(len(test.initialData)), io.SeekStart,
)
if err != nil {
t.Fatalf("Failed to seek: %v", err)
}

// Create a mock file that wraps the real file.
mockFile := &mockFile{
File: tmpFile,
writeFn: test.writeFn,
truncFn: test.truncFn,
}

// Create a header store with our mock file.
h := &headerStore{
file: mockFile,
headerIndex: &headerIndex{indexType: Block},
}

// Call the function being tested.
err = h.appendRaw(test.headerToWrite)
if err == nil && test.wantErr {
t.Fatal("expected an error, but got none")
}
if err != nil && !test.wantErr {
t.Fatalf("unexpected error: %v", err)
}
if err != nil && test.wantErr &&
!strings.Contains(err.Error(), test.errMsg) {

t.Errorf("expected error message %q to be "+
"in %q", test.errMsg, err.Error())
}

// Reset file position to start for reading.
if _, err := tmpFile.Seek(0, io.SeekStart); err != nil {
t.Fatalf("Failed to seek to start: %v", err)
}

// Read the file contents.
actualData, err := io.ReadAll(tmpFile)
if err != nil {
t.Fatalf("Failed to read file: %v", err)
}

Member

Related to my other suggestion: can we also test that if we go down in a partially truncated state, then once we come back up we're able to recover? This may warrant an entirely distinct change.

Contributor Author

@mohamedawnallah mohamedawnallah May 13, 2025

My current understanding is:

  • A Truncate failure is a fatal error.
  • If a Truncate error occurs, it likely indicates a serious issue.
  • Both a partial write failure and a truncate failure occurring together is unlikely and would typically require manual intervention.

Possible mitigation:

  • Peer Recovery of Invalid/Incomplete Tail Headers:
    • Request the missing or invalid/incomplete tail header from peers upon detection during reads and index it in the store.
  • In-Place Recovery During Reads:
    • If the data read is not a complete header (32 bytes for a filter header, 80 bytes for a block header), remove it from the binary file (see the sketch after this comment).
    • The idea is to handle incomplete or invalid entries immediately (i.e., perform a delete operation during the read), so the same issue doesn't recur on the next startup; this effectively adds a delete operation to the read path.
    • This serves as a recovery mechanism if the data is recoverable, depending on the mix of operations.
    • However, if the delete operation itself fails, it's unclear how recovery would be handled in that scenario.
  • If we simply choose to ignore the incomplete or invalid entry, we risk leaving the binary file in an inconsistent state: the invalid data would remain in the headers binary file, yet we might still be able to read it successfully.
  • This approach assumes the partially written entry was not fully truncated, since a truncate failure would already have been reported as an error.
  • My concern is that those methods might mask the root cause of the issue rather than addressing it directly.

What do you think?
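A minimal sketch of the in-place read-time recovery idea from the comment above, expressed against the File interface introduced in store.go in this PR. The helper name recoverPartialTail, its call site, and the assumption that fmt is imported are illustrative additions, not part of this change; headerSize would be 80 for the block header store and 32 for the filter header store:

// recoverPartialTail drops any dangling bytes that an interrupted write may
// have left at the end of the flat file, so that the file size is always a
// whole multiple of headerSize. Hypothetical sketch only: the name, the error
// wrapping, and the call site (e.g. on store startup, before serving reads)
// are illustrative and not part of this PR.
func recoverPartialTail(f File, headerSize int64) error {
    info, err := f.Stat()
    if err != nil {
        return err
    }

    // A well-formed file contains only complete headers.
    remainder := info.Size() % headerSize
    if remainder == 0 {
        return nil
    }

    // Truncate away the incomplete tail header left behind by a partial
    // write.
    newSize := info.Size() - remainder
    if err := f.Truncate(newSize); err != nil {
        return fmt.Errorf("unable to drop partial tail header: %w", err)
    }

    return nil
}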

Member

Request the missing or invalid/incomplete tail header from peers upon detection during reads and index it in the store.

I don't think this makes sense. At this point, we have already fetched the headers.

Both a partial write failure and a truncate failure occurring together is unlikely and would typically require manual intervention.

This is precisely what we're attempting to solve. Consider that a user running w/o this patch might have a partial write. Today we require them to go in and delete the file manually. On a mobile platform, this isn't feasible for end users.

If we can make sure both that we write properly and that we can recover from botched writes (right now we fail on read), then we're able to cover all bases.

Contributor Author

@mohamedawnallah mohamedawnallah May 13, 2025

Consider that a user running w/o this patch might have a partial write. Today we require them to go in and delete the file manually. On a mobile platform, this isn't feasible for end users. If we can make sure both that we write properly and that we can recover from botched writes (right now we fail on read), then we're able to cover all bases.

Thanks, that makes sense. I have opened issue #315 so we can track recovery on read for partially written headers. That said, if you want me to approach that recovery on read in a specific way, I'd be happy to hear it.

// Compare expected vs. actual file contents.
if !bytes.Equal(actualData, test.expected) {
t.Fatalf("Expected file data: %v, "+
"got: %v", test.expected, actualData)
}
})
}
}

// BenchmarkHeaderStoreAppendRaw measures performance of headerStore.appendRaw
// by writing 80-byte headers to a file and resetting position between writes
// to isolate raw append performance from file size effects.
Contributor

Are we confident in the results of the benchmark? Do we lose some performance now with the enhanced partial-write checking?

Contributor Author

@mohamedawnallah mohamedawnallah May 26, 2025

The performance impact depends on how many headers we receive from getheaders or getcfheaders. The maximum number of headers for both block headers and filter headers is configured as 2000 in btcsuite/btcd, which is also the limit Neutrino uses.

When I benchmarked appendRaw with 1000 rows before and after this patch, the performance was nearly the same. This is because the cost and frequency of the seek operation are significantly reduced.

However, in the worst case—when receiving only a single header via getheaders or getcfheaders—the performance cost is similar to the benchmark above, and we lose roughly 7% performance. This depends on various factors.

Overall, I think this is okay because we tradeoff a bit of optimal write performance for improved data integrity during writes.

Batched headers from btcsuite/btcd used in Neutrino:

// MsgHeaders implements the Message interface and represents a bitcoin headers
// message. It delivers block header information in response to a getheaders message (MsgGetHeaders).
// The maximum number of block headers per message is currently 2000. See MsgGetHeaders for more details.
type MsgHeaders struct {
    Headers []*BlockHeader
}
// MsgCFHeaders implements the Message interface and represents a bitcoin cfheaders message.
// It delivers committed filter header information in response to a getcfheaders message (MsgGetCFHeaders).
// The maximum number of committed filter headers per message is currently 2000. See MsgGetCFHeaders for details.
type MsgCFHeaders struct {
    FilterType       FilterType
    StopHash         chainhash.Hash
    PrevFilterHeader chainhash.Hash
    FilterHashes     []*chainhash.Hash
}

Contributor

Do you know what the defaults for bitcoin-core are?

Contributor Author

@mohamedawnallah mohamedawnallah Jun 3, 2025

Do you know what the defaults for bitcoin-core are?

It uses the following:

/** Maximum number of compact filters that may be requested with one getcfilters. See BIP 157. */
static constexpr uint32_t MAX_GETCFILTERS_SIZE = 1000;
/** Maximum number of cf hashes that may be requested with one getcfheaders. See BIP 157. */
static constexpr uint32_t MAX_GETCFHEADERS_SIZE = 2000;

func BenchmarkHeaderStoreAppendRaw(b *testing.B) {
// Setup temporary file and headerStore.
tmpFile, cleanup := createFile(b, "header_benchmark")
defer cleanup()

store := &headerStore{
file: tmpFile,
headerIndex: &headerIndex{indexType: Block},
}

// Sample header data.
header := make([]byte, 80)

// Reset timer to exclude setup time.
b.ResetTimer()

// Run benchmark.
for i := 0; i < b.N; i++ {
if err := store.appendRaw(header); err != nil {
b.Fatal(err)
}

// Reset file position to beginning to maintain constant file
// size. This isolates the appendRaw performance overhead
// without measuring effects of increasing file size.
if _, err := tmpFile.Seek(0, io.SeekStart); err != nil {
b.Fatal(err)
}
}
}

// mockFile wraps a real file but allows us to override the Write, Sync, and
// Truncate methods.
type mockFile struct {
*os.File
writeFn func([]byte, File) (int, error)
syncFn func() error
truncFn func(int64, File) error
}

// Write implements the Write method of the File interface.
func (m *mockFile) Write(p []byte) (int, error) {
if m.writeFn != nil {
return m.writeFn(p, m.File)
}
return m.File.Write(p)
}

// Sync implements the Sync method of the File interface.
func (m *mockFile) Sync() error {
if m.syncFn != nil {
return m.syncFn()
}
return m.File.Sync()
}

// Truncate implements the Truncate method of the File interface.
func (m *mockFile) Truncate(size int64) error {
if m.truncFn != nil {
return m.truncFn(size, m.File)
}
return m.File.Truncate(size)
}

// Ensure mockFile implements necessary interfaces.
var _ io.Writer = &mockFile{}

// createFile creates a temporary file for testing.
func createFile(t testing.TB, filename string) (*os.File, func()) {
tmpFile, err := os.CreateTemp(t.TempDir(), filename)
if err != nil {
t.Fatalf("Failed to create temp file: %v", err)
}

cleanup := func() {
tmpFile.Close()
os.Remove(tmpFile.Name())
}

return tmpFile, cleanup
}
12 changes: 12 additions & 0 deletions headerfs/index.go
@@ -74,6 +74,18 @@
numSubBucketBytes = 2
)

// String returns the string representation of the HeaderType.
func (h HeaderType) String() string {
switch h {
case Block:
return "Block"
case RegularFilter:
return "RegularFilter"
default:
return fmt.Sprintf("UnknownHeaderType(%d)", h)
}
}

// headerIndex is an index stored within the database that allows for random
// access into the on-disk header file. This, in conjunction with a flat file
// of headers consists of header database. The keys have been specifically
20 changes: 19 additions & 1 deletion headerfs/store.go
@@ -3,6 +3,7 @@ package headerfs
import (
"bytes"
"fmt"
"io"
"os"
"path/filepath"
"sync"
@@ -84,6 +85,23 @@ var headerBufPool = sync.Pool{
New: func() interface{} { return new(bytes.Buffer) },
}

// File defines the minimum file operations needed by headerStore.
type File interface {
// Basic I/O operations.
io.Reader
io.Writer
io.Closer

// Extended I/O positioning.
io.Seeker
io.ReaderAt

// File-specific operations.
Stat() (os.FileInfo, error)
Sync() error
Truncate(size int64) error
}

// headerStore combines a on-disk set of headers within a flat file in addition
// to a database which indexes that flat file. Together, these two abstractions
// can be used in order to build an indexed header store for any type of
@@ -96,7 +114,7 @@ type headerStore struct {

fileName string

file *os.File
file File

*headerIndex
}
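A closing note on the File interface added in store.go above: narrowing headerStore.file from *os.File to this interface is what allows the mockFile in file_test.go to inject write and truncate failures while production code continues to pass a real *os.File. A minimal compile-time sketch of that relationship (illustrative only, not part of this diff):

package headerfs

import "os"

// Compile-time assertion that *os.File keeps satisfying the File interface
// used by headerStore. A matching assertion for the test double, such as
// var _ File = (*mockFile)(nil), would live in file_test.go next to mockFile.
var _ File = (*os.File)(nil)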