Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
333 changes: 331 additions & 2 deletions external_segment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,10 @@ func TestExternalSegmentReceiverWithDuplicates(t *testing.T) {
t.Fatalf("Expected 2 initial documents, got %d", initialCount)
}

// Create a fresh copy of segment data for this test
segmentBytesCopy := make([]byte, len(segmentBytes))
copy(segmentBytesCopy, segmentBytes)

// Now stream the duplicate segment
receiver3, err := writer3.EnableExternalSegments()
if err != nil {
Expand All @@ -410,7 +414,7 @@ func TestExternalSegmentReceiverWithDuplicates(t *testing.T) {
t.Fatal(err)
}

err = receiver3.WriteChunk(segmentBytes)
err = receiver3.WriteChunk(segmentBytesCopy)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -445,6 +449,30 @@ func TestExternalSegmentReceiverWithDuplicates(t *testing.T) {
if receiver3.Status() != index.StreamIntroduced {
t.Errorf("Expected segment to be introduced when deduplication is disabled, got status: %v", receiver3.Status())
}

// Final verification: query for the specific docIDs to ensure they exist
docIDs := []string{"duplicate_doc_1", "duplicate_doc_2"}
for _, docID := range docIDs {
idQuery := NewTermQuery(docID)
idQuery.SetField("_id")

req := NewTopNSearch(1, idQuery)
dmi, err := reader3Final.Search(context.Background(), req)
if err != nil {
t.Fatalf("Error searching for docID %s: %v", docID, err)
}

next, err := dmi.Next()
if err != nil {
t.Fatalf("Error getting next result for docID %s: %v", docID, err)
}

if next == nil {
t.Errorf("Expected to find document with ID %s, but search returned no results", docID)
} else {
t.Logf("Successfully verified document with ID %s exists", docID)
}
}
})

// Step 3b: Test with deduplication ENABLED (should reject duplicates)
Expand Down Expand Up @@ -495,6 +523,10 @@ func TestExternalSegmentReceiverWithDuplicates(t *testing.T) {
t.Fatalf("Expected 2 initial documents, got %d", initialCount)
}

// Create a fresh copy of segment data for this test
segmentBytesCopy := make([]byte, len(segmentBytes))
copy(segmentBytesCopy, segmentBytes)

// Now stream the duplicate segment
receiver4, err := writer4.EnableExternalSegments()
if err != nil {
Expand All @@ -506,7 +538,7 @@ func TestExternalSegmentReceiverWithDuplicates(t *testing.T) {
t.Fatal(err)
}

err = receiver4.WriteChunk(segmentBytes)
err = receiver4.WriteChunk(segmentBytesCopy)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -551,6 +583,303 @@ func TestExternalSegmentReceiverWithDuplicates(t *testing.T) {
}

t.Logf("Deduplication test passed - document count unchanged indicating duplicates were filtered out")

// Final verification: query for the specific docIDs to ensure they exist
// When deduplication is enabled, we should still find the original documents
docIDs := []string{"duplicate_doc_1", "duplicate_doc_2"}
for _, docID := range docIDs {
idQuery := NewTermQuery(docID)
idQuery.SetField("_id")

req := NewTopNSearch(1, idQuery)
dmi, err := reader4Final.Search(context.Background(), req)
if err != nil {
t.Fatalf("Error searching for docID %s: %v", docID, err)
}

next, err := dmi.Next()
if err != nil {
t.Fatalf("Error getting next result for docID %s: %v", docID, err)
}

if next == nil {
t.Errorf("Expected to find document with ID %s, but search returned no results", docID)
} else {
t.Logf("Successfully verified document with ID %s exists", docID)
}
}
})

// Step 3c: Test with deduplication ENABLED but NO duplicates (should accept unique documents)
t.Run("DeduplicationEnabledNoDuplicates", func(t *testing.T) {
// Destination index gets its own scratch directory; clear any residue
// from a prior run and clean up when the subtest finishes.
tempDir5 := filepath.Join(os.TempDir(), "bluge_test_dup_dest_enabled_no_dups")
tempSegDir5 := filepath.Join(tempDir5, "temp_segments")
os.RemoveAll(tempDir5)
defer os.RemoveAll(tempDir5)

// Second argument true = deduplication enabled on the destination writer.
config5 := DefaultConfig(tempDir5).WithExternalSegments(tempSegDir5, true) // Deduplication enabled
writer5, err := OpenWriter(config5)
if err != nil {
t.Fatal(err)
}
defer writer5.Close()

// Seed the destination with two documents whose IDs do NOT collide with
// anything in the segment we will stream later (no duplicates by design).
existingDoc1 := NewDocument("existing_doc_1").
AddField(NewKeywordField("type", "existing").StoreValue()).
AddField(NewTextField("content", "existing content 1"))

existingDoc2 := NewDocument("existing_doc_2").
AddField(NewKeywordField("type", "existing").StoreValue()).
AddField(NewTextField("content", "existing content 2"))

// Insert the seed documents in a single batch.
batch := index.NewBatch()
batch.Insert(existingDoc1)
batch.Insert(existingDoc2)
err = writer5.Batch(batch)
if err != nil {
t.Fatal(err)
}

// Baseline: the destination must hold exactly the 2 seed documents
// before any external segment is introduced.
reader5, err := writer5.Reader()
if err != nil {
t.Fatal(err)
}

initialCount, err := reader5.Count()
if err != nil {
t.Fatal(err)
}
reader5.Close()

if initialCount != 2 {
t.Fatalf("Expected 2 initial documents, got %d", initialCount)
}

// Build a brand-new source index in its own directory so this subtest
// does not share (and cannot corrupt) segment bytes used by the sibling
// subtests above.
tempDirSource := filepath.Join(os.TempDir(), "bluge_test_fresh_source")
tempSegDirSource := filepath.Join(tempDirSource, "temp_segments")
os.RemoveAll(tempDirSource)
defer os.RemoveAll(tempDirSource)

configSource := DefaultConfig(tempDirSource).WithExternalSegments(tempSegDirSource, true)
writerSource, err := OpenWriter(configSource)
if err != nil {
t.Fatal(err)
}

// Source documents use IDs distinct from both the destination's seed
// docs and the duplicate docs used elsewhere in this test.
freshDoc1 := NewDocument("unique_doc_1").
AddField(NewKeywordField("type", "fresh").StoreValue()).
AddField(NewTextField("content", "fresh content 1"))

freshDoc2 := NewDocument("unique_doc_2").
AddField(NewKeywordField("type", "fresh").StoreValue()).
AddField(NewTextField("content", "fresh content 2"))

// Insert the source documents in a single batch.
freshBatch := index.NewBatch()
freshBatch.Insert(freshDoc1)
freshBatch.Insert(freshDoc2)
err = writerSource.Batch(freshBatch)
if err != nil {
t.Fatal(err)
}

// Close the source writer so its segment is fully persisted to disk
// before we read the raw bytes back out.
err = writerSource.Close()
if err != nil {
t.Fatal(err)
}

// Re-open the source directory directly to extract raw segment bytes.
// NOTE(review): dirSource is never explicitly torn down after Setup;
// presumably the read-only open needs no Close — confirm against the
// Directory implementation.
dirSource := configSource.indexConfig.DirectoryFunc()
err = dirSource.Setup(true) // read-only
if err != nil {
t.Fatal(err)
}

freshSegmentFiles, err := dirSource.List(index.ItemKindSegment)
if err != nil {
t.Fatal(err)
}

if len(freshSegmentFiles) == 0 {
t.Fatal("No segment files found in fresh source directory")
}

// Use the last-listed segment file as the "most recent" one; assumes
// List returns IDs in ascending (oldest-first) order — TODO confirm.
freshSegmentID := freshSegmentFiles[len(freshSegmentFiles)-1]
t.Logf("Found fresh segment file with ID: %d", freshSegmentID)

freshSegmentData, freshCloser, err := dirSource.Load(index.ItemKindSegment, freshSegmentID)
if err != nil {
t.Fatal(err)
}
// Release the loaded segment when the subtest ends; Load may return a
// nil closer, so guard before calling Close.
defer func() {
if freshCloser != nil {
freshCloser.Close()
}
}()

// Read the entire segment into memory so it can be streamed in chunks.
freshSegmentBytes, err := freshSegmentData.Read(0, freshSegmentData.Len())
if err != nil {
t.Fatal(err)
}

if len(freshSegmentBytes) == 0 {
t.Fatal("Fresh segment data is empty")
}

t.Logf("Read fresh segment data of %d bytes", len(freshSegmentBytes))

// Stream the fresh segment into the dedup-enabled destination:
// Start -> WriteChunk (single chunk here) -> Complete.
receiver5, err := writer5.EnableExternalSegments()
if err != nil {
t.Fatal(err)
}

err = receiver5.StartSegment()
if err != nil {
t.Fatal(err)
}

err = receiver5.WriteChunk(freshSegmentBytes)
if err != nil {
t.Fatal(err)
}

err = receiver5.CompleteSegment()
if err != nil {
t.Fatal(err)
}

// Fail fast if the segment was rejected; the deeper assertions below
// would be meaningless otherwise.
if receiver5.Status() != index.StreamIntroduced {
t.Fatal("Segment not introduced:", receiver5.Status())
}

// Verify the segment was accepted (document count should increase)
reader5Final, err := writer5.Reader()
if err != nil {
t.Fatal(err)
}
defer reader5Final.Close()

finalCount, err := reader5Final.Count()
if err != nil {
t.Fatal(err)
}

t.Logf("Document count after transfer: %d (was %d)", finalCount, initialCount)

// With dedup enabled but zero overlapping IDs, nothing should be
// filtered: the count must grow by exactly the 2 streamed documents.
expectedCount := initialCount + 2 // 2 unique documents from the segment
if finalCount != expectedCount {
t.Errorf("Expected document count to be %d when deduplication is enabled but no duplicates exist, got %d (was %d)", expectedCount, finalCount, initialCount)
}

// Redundant with the t.Fatal fast-fail above (status cannot have
// changed in between); kept as a labeled assertion next to the count
// check.
if receiver5.Status() != index.StreamIntroduced {
t.Errorf("Expected segment to be introduced when deduplication is enabled but no duplicates exist, got status: %v", receiver5.Status())
}

// Every document — the 2 seeded and the 2 streamed — must be
// retrievable by an exact _id term query.
expectedDocIDs := []string{"existing_doc_1", "existing_doc_2", "unique_doc_1", "unique_doc_2"}
foundDocIDs := make(map[string]bool)

// Search for each expected document ID
for _, docID := range expectedDocIDs {
query := NewTermQuery(docID).SetField("_id")
searchRequest := NewTopNSearch(10, query)
dmi, err := reader5Final.Search(context.Background(), searchRequest)
if err != nil {
t.Fatalf("Error searching for document %s: %v", docID, err)
}

next, err := dmi.Next()
if err != nil {
t.Fatalf("Error getting next result for document %s: %v", docID, err)
}

if next != nil {
foundDocIDs[docID] = true
t.Logf("Found document with ID: %s", docID)
} else {
t.Errorf("Expected document with ID %s to be present, but it was not found", docID)
}
}

// Verify all expected documents were found
if len(foundDocIDs) != len(expectedDocIDs) {
t.Errorf("Expected to find %d documents, but only found %d", len(expectedDocIDs), len(foundDocIDs))
}

// Beyond existence, confirm the streamed documents' stored fields
// survived the transfer intact (content + type for unique_doc_1).
query1 := NewTermQuery("unique_doc_1").SetField("_id")
searchRequest1 := NewTopNSearch(1, query1)
dmi1, err := reader5Final.Search(context.Background(), searchRequest1)
if err != nil {
t.Fatalf("Error searching for unique_doc_1: %v", err)
}

next1, err := dmi1.Next()
if err != nil {
t.Fatalf("Error getting next result for unique_doc_1: %v", err)
}

if next1 != nil {
// Walk the stored fields; returning true continues the visit.
err = next1.VisitStoredFields(func(field string, value []byte) bool {
if field == "content" && string(value) != "fresh content 1" {
t.Errorf("Expected content 'fresh content 1' for unique_doc_1, got '%s'", string(value))
}
if field == "type" && string(value) != "fresh" {
t.Errorf("Expected type 'fresh' for unique_doc_1, got '%s'", string(value))
}
return true
})
if err != nil {
t.Errorf("Error accessing stored fields for unique_doc_1: %v", err)
}
} else {
t.Error("Expected unique_doc_1 to be found")
}

// Same stored-field verification for unique_doc_2.
query2 := NewTermQuery("unique_doc_2").SetField("_id")
searchRequest2 := NewTopNSearch(1, query2)
dmi2, err := reader5Final.Search(context.Background(), searchRequest2)
if err != nil {
t.Fatalf("Error searching for unique_doc_2: %v", err)
}

next2, err := dmi2.Next()
if err != nil {
t.Fatalf("Error getting next result for unique_doc_2: %v", err)
}

if next2 != nil {
// Walk the stored fields; returning true continues the visit.
err = next2.VisitStoredFields(func(field string, value []byte) bool {
if field == "content" && string(value) != "fresh content 2" {
t.Errorf("Expected content 'fresh content 2' for unique_doc_2, got '%s'", string(value))
}
if field == "type" && string(value) != "fresh" {
t.Errorf("Expected type 'fresh' for unique_doc_2, got '%s'", string(value))
}
return true
})
if err != nil {
t.Errorf("Error accessing stored fields for unique_doc_2: %v", err)
}
} else {
t.Error("Expected unique_doc_2 to be found")
}

t.Logf("Test completed successfully - document count and docid verification passed")
t.Logf("Expected %d documents, found %d documents", expectedCount, finalCount)
t.Logf("All expected document IDs verified: %v", expectedDocIDs)
t.Logf("Deduplication enabled with no duplicates - all unique documents were accepted")
})

t.Log("Duplicate document handling test completed successfully")
Expand Down
Loading
Loading