@aunjgr aunjgr commented Dec 1, 2025

User description

What type of PR is this?

  • API-change
  • BUG
  • Improvement
  • Documentation
  • Feature
  • Test and CI
  • Code Refactoring

Which issue(s) this PR fixes:

issue #22508

What this PR does / why we need it:

  • Implement filtering by distance in the Reader for IVFFlat
  • Replace the per-block heaps with a single universal heap to reduce copying
  • Refactor some interfaces
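
To illustrate the universal-heap idea, here is a minimal sketch, not the code from this PR. It assumes a single max-heap of distances shared by every block that is scanned, so a later block is filtered against the threshold established by earlier blocks. The names distHeap and scanBlock are hypothetical.

```go
package main

import (
	"container/heap"
	"fmt"
)

// distHeap is a max-heap of distances (largest value at index 0).
type distHeap []float64

func (h distHeap) Len() int           { return len(h) }
func (h distHeap) Less(i, j int) bool { return h[i] > h[j] }
func (h distHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *distHeap) Push(x any)        { *h = append(*h, x.(float64)) }
func (h *distHeap) Pop() any {
	old := *h
	x := old[len(old)-1]
	*h = old[:len(old)-1]
	return x
}

// scanBlock offers one block's distances to the shared heap, then returns the
// row offsets in this block whose distance is still within the shared threshold.
// Assumption: a non-positive limit means nothing is selected.
func scanBlock(shared *distHeap, limit int, dists []float64) []int {
	if limit <= 0 {
		return nil
	}
	for _, d := range dists {
		if shared.Len() < limit {
			heap.Push(shared, d)
		} else if d < (*shared)[0] {
			(*shared)[0] = d // replace the current worst distance
			heap.Fix(shared, 0)
		}
	}
	full := shared.Len() >= limit
	kept := make([]int, 0, len(dists))
	for row, d := range dists {
		// Until the heap is full there is no meaningful threshold yet.
		if !full || d <= (*shared)[0] {
			kept = append(kept, row)
		}
	}
	return kept
}

func main() {
	shared := make(distHeap, 0, 2)
	fmt.Println(scanBlock(&shared, 2, []float64{0.9, 0.2, 0.7})) // [1 2]
	fmt.Println(scanBlock(&shared, 2, []float64{0.8, 0.1, 0.5})) // [1]
}
```

In the second call, 0.5 is dropped even though it would rank second within its own block, because the shared heap already holds 0.2 from the first block.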

PR Type

Enhancement


Description

  • Implement universal heap in Reader for IVFFlat index operations

  • Reduce code duplication by consolidating heap logic

  • Simplify distance calculation and filtering logic

  • Replace sort with slices for null filtering


Diagram Walkthrough

flowchart LR
  A["BlockReadTopOp"] -->|"adds DistHeap field"| B["Float64Heap"]
  B -->|"implements heap.Interface"| C["heap operations"]
  D["HandleOrderByLimitOnIVFFlatIndex"] -->|"refactored to use"| B
  D -->|"replaces SearchResultMaxHeap"| E["simplified logic"]
  F["SetBlockTop"] -->|"initializes"| B

File Walkthrough

Relevant files
Enhancement
types.go
Add Float64Heap type and DistHeap field                                   

pkg/objectio/types.go

  • Added Float64Heap type implementing heap.Interface for max-heap
    operations
  • Implemented Len(), Less(), Swap(), Push(), and Pop() methods
  • Extended BlockReadTopOp struct with DistHeap field of type Float64Heap
+23/-0   
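
For readers unfamiliar with container/heap, a max-heap over float64 can be sketched as follows. This is an illustration of the shape described above, not the contents of pkg/objectio/types.go; the helper drain and the limit value are assumptions made for the example.

```go
package main

import (
	"container/heap"
	"fmt"
)

// Float64Heap is a max-heap of float64 values: the largest sits at index 0.
type Float64Heap []float64

func (h Float64Heap) Len() int           { return len(h) }
func (h Float64Heap) Less(i, j int) bool { return h[i] > h[j] } // reversed comparison yields a max-heap
func (h Float64Heap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *Float64Heap) Push(x any)        { *h = append(*h, x.(float64)) }
func (h *Float64Heap) Pop() any {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[:n-1]
	return x
}

// drain pops every element; with a max-heap the values come out largest first.
func drain(h *Float64Heap) []float64 {
	out := make([]float64, 0, h.Len())
	for h.Len() > 0 {
		out = append(out, heap.Pop(h).(float64))
	}
	return out
}

func main() {
	limit := 3 // hypothetical query limit
	h := make(Float64Heap, 0, limit) // empty heap with capacity equal to the limit
	for _, d := range []float64{0.3, 0.9, 0.1} {
		heap.Push(&h, d)
	}
	fmt.Println(h[0])      // 0.9
	fmt.Println(drain(&h)) // [0.9 0.3 0.1]
}
```

Because the largest distance is always at index 0, it can serve as the cut-off when only the nearest limit rows are wanted.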
reader.go
Initialize DistHeap in SetBlockTop method                               

pkg/vm/engine/readutil/reader.go

  • Initialize DistHeap field in SetBlockTop() method
  • Create empty Float64Heap with capacity equal to limit
  • Commented out LowerBound and UpperBound fields for future use
+5/-0     
read.go
Refactor to use universal Float64Heap for distances           

pkg/vm/engine/tae/blockio/read.go

  • Replaced sort import with slices for null filtering
  • Refactored HandleOrderByLimitOnIVFFlatIndex() to use universal
    Float64Heap
  • Consolidated duplicate code for float32 and float64 distance
    calculations
  • Simplified heap management by storing only distances instead of
    SearchResult objects
  • Removed conditional branching for limit vs non-limit cases
  • Use slices.DeleteFunc() for null filtering and distance threshold
    filtering
+56/-114
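
The two slices.DeleteFunc uses mentioned above can be sketched like this. The sketch assumes Go 1.21 or later; dropNullRows, keepWithin, and the isNull callback are hypothetical stand-ins for the nulls-bitmap lookup and the heap threshold, not functions from read.go.

```go
package main

import (
	"fmt"
	"slices"
)

// dropNullRows removes every row for which isNull reports true.
// isNull is a hypothetical stand-in for a nulls-bitmap lookup.
func dropNullRows(rows []int64, isNull func(row int64) bool) []int64 {
	return slices.DeleteFunc(rows, isNull)
}

// keepWithin removes every distance strictly greater than threshold.
func keepWithin(dists []float64, threshold float64) []float64 {
	return slices.DeleteFunc(dists, func(d float64) bool { return d > threshold })
}

func main() {
	nulls := map[int64]bool{1: true, 3: true}
	rows := dropNullRows([]int64{0, 1, 2, 3, 4}, func(r int64) bool { return nulls[r] })
	fmt.Println(rows) // [0 2 4]

	fmt.Println(keepWithin([]float64{0.5, 0.1, 0.9, 0.3}, 0.5)) // [0.5 0.1 0.3]
}
```

Note that slices.DeleteFunc compacts the slice in place and returns the shortened slice, so the input should not be reused afterwards.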
Bug fix
operator.go
Update table function to use BlockLimit                                   

pkg/sql/compile/operator.go

  • Changed arg.Limit = n.Limit to arg.Limit = n.BlockLimit, so the table
    function argument uses BlockLimit instead of Limit
+1/-1     
apply_indices_ivfflat.go
Replace Limit with BlockLimit in IVFFlat                                 

pkg/sql/plan/apply_indices_ivfflat.go

  • Replaced three instances of tableFuncNode.Limit with
    tableFuncNode.BlockLimit
  • Affects both constant and non-constant limit assignments in IVFFlat
    index application
+3/-3     

@qodo-code-review

qodo-code-review bot commented Dec 1, 2025

PR Compliance Guide 🔍

Below is a summary of compliance checks for this PR:

Security Compliance
🟢 No security concerns identified
No security vulnerabilities detected by AI analysis. Human verification advised for critical code.
Ticket Compliance
🎫 No ticket provided
  • Create ticket/issue
Codebase Duplication Compliance
Codebase context is not defined

Follow the guide to enable codebase context checks.

Custom Compliance
🟢
Generic: Meaningful Naming and Self-Documenting Code

Objective: Ensure all identifiers clearly express their purpose and intent, making code
self-documenting

Status: Passed

Learn more about managing compliance generic rules or creating your own custom rules

Generic: Secure Error Handling

Objective: To prevent the leakage of sensitive system information through error messages while
providing sufficient detail for internal debugging.

Status: Passed


Generic: Secure Logging Practices

Objective: To ensure logs are useful for debugging and auditing without exposing sensitive
information like PII, PHI, or cardholder data.

Status: Passed


Generic: Comprehensive Audit Trails

Objective: To create a detailed and reliable record of critical system actions for security analysis
and compliance.

Status:
No auditing: The new search/heap operations for IVFFlat top-N selection do not add or reference any
audit logging for critical actions, but it is unclear whether auditing is handled
elsewhere in the system.

Referred Code
func HandleOrderByLimitOnIVFFlatIndex(
	ctx context.Context,
	selectRows []int64,
	vecCol *vector.Vector,
	orderByLimit *objectio.BlockReadTopOp,
) ([]int64, []float64, error) {
	if selectRows == nil {
		selectRows = make([]int64, vecCol.Length())
		for i := range selectRows {
			selectRows[i] = int64(i)
		}
	}

	nullsBm := vecCol.GetNulls()
	selectRows = slices.DeleteFunc(selectRows, func(row int64) bool {
		return nullsBm.Contains(uint64(row))
	})

	searchResults := make([]vectorindex.SearchResult, 0, len(selectRows))

	switch orderByLimit.Typ {


 ... (clipped 80 lines)


Generic: Robust Error Handling and Edge Case Management

Objective: Ensure comprehensive error handling that provides meaningful context and graceful
degradation

Status:
Heap boundary edge-case: The code reads orderByLimit.DistHeap[0] after building the heap without explicitly
ensuring it is non-empty in all paths, relying on earlier logic; this may be safe but
merits verification for zero-limit or empty-selectRows cases.

Referred Code
searchResults = slices.DeleteFunc(searchResults, func(res vectorindex.SearchResult) bool {
	return res.Distance > orderByLimit.DistHeap[0]
})
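
One way to make the guard explicit is sketched below. It is an assumption about how the check could look, not code from the PR: the heap is modelled as a plain slice with the largest distance at index 0, and the helper name threshold is hypothetical.

```go
package main

import "fmt"

// threshold returns the cut-off distance and whether it may be used.
// distHeap is assumed to be a max-heap stored in a slice (largest at index 0).
// The cut-off is only meaningful once the heap holds limit entries, which also
// rules out reading index 0 of an empty heap.
func threshold(distHeap []float64, limit int) (float64, bool) {
	if limit <= 0 || len(distHeap) < limit {
		return 0, false
	}
	return distHeap[0], true
}

func main() {
	fmt.Println(threshold(nil, 3))                      // 0 false
	fmt.Println(threshold([]float64{0.7, 0.2}, 3))      // 0 false
	fmt.Println(threshold([]float64{0.7, 0.2, 0.4}, 3)) // 0.7 true
}
```

A caller would run the slices.DeleteFunc filter only when the second return value is true.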


Generic: Security-First Input Validation and Data Handling

Objective: Ensure all data inputs are validated, sanitized, and handled securely to prevent
vulnerabilities

Status:
Type validation: The function restricts allowed types to float32/float64 and returns an internal error for
others, but it does not validate that orderByLimit.Limit is > 0 and may rely on
external guarantees for safe heap usage.

Referred Code
default:
	return nil, nil, moerr.NewInternalError(ctx, fmt.Sprintf("only support float32/float64 type for topn: %s", orderByLimit.Typ))
}

searchResults = slices.DeleteFunc(searchResults, func(res vectorindex.SearchResult) bool {
	return res.Distance > orderByLimit.DistHeap[0]
})


Compliance status legend:
🟢 - Fully Compliant
🟡 - Partially Compliant
🔴 - Not Compliant
⚪ - Requires Further Human Verification
🏷️ - Compliance label

@qodo-code-review

qodo-code-review bot commented Dec 1, 2025

PR Code Suggestions ✨

Explore these optional code suggestions:

Category | Suggestion | Impact
High-level
Refactor heap logic to avoid excessive memory allocation

The searchResults slice in HandleOrderByLimitOnIVFFlatIndex allocates memory for
all candidate rows before filtering, which is inefficient. Instead, the heap
should directly manage SearchResult objects to keep memory usage proportional to
the query limit.

Examples:

pkg/vm/engine/tae/blockio/read.go [398-477]
	searchResults := make([]vectorindex.SearchResult, 0, len(selectRows))

	switch orderByLimit.Typ {
	case types.T_array_float32:
		distFunc, err := metric.ResolveDistanceFn[float32](orderByLimit.Metric)
		if err != nil {
			return nil, nil, err
		}

		rhs := types.BytesToArray[float32](orderByLimit.NumVec)

 ... (clipped 70 lines)

Solution Walkthrough:

Before:

func HandleOrderByLimitOnIVFFlatIndex(...) {
    // searchResults can grow to the size of all rows in the block
    searchResults := make([]vectorindex.SearchResult, 0, len(selectRows))

    for _, row := range selectRows {
        dist64, _ := calculate_distance(row)

        // Heap of distances is managed correctly
        if len(orderByLimit.DistHeap) < limit {
            heap.Push(&orderByLimit.DistHeap, dist64)
        } else if dist64 < orderByLimit.DistHeap[0] {
            // ... update heap
        }

        // But all results are appended, causing large memory allocation
        searchResults = append(searchResults, vectorindex.SearchResult{Id: row, Distance: dist64})
    }

    // Filter the large slice at the end
    searchResults = slices.DeleteFunc(searchResults, ...)
    // ...
}

After:

func HandleOrderByLimitOnIVFFlatIndex(...) {
    // Use a heap of SearchResult objects, bounded by the limit
    hp := make(vectorindex.SearchResultMaxHeap, 0, orderByLimit.Limit)

    for _, row := range selectRows {
        dist64, _ := calculate_distance(row)

        if len(hp) < int(orderByLimit.Limit) {
            heap.Push(&hp, &vectorindex.SearchResult{Id: row, Distance: dist64})
        } else if dist64 < hp[0].GetDistance() {
            // Only update the heap if the new item is in the top-K
            hp[0] = &vectorindex.SearchResult{Id: row, Distance: dist64}
            heap.Fix(&hp, 0)
        }
    }

    // The heap now contains only the top-K results, avoiding large allocations.
    // ... convert heap to slice and return
}
Suggestion importance[1-10]: 9


Why: The suggestion correctly identifies a critical flaw where searchResults is allocated for all rows, causing excessive memory usage and negating the benefit of using a heap for top-K selection.

High
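
As a self-contained illustration of the bounded approach in this suggestion, the following sketch keeps at most k results in memory. It does not use vectorindex.SearchResultMaxHeap; the result and resultMaxHeap types and the topK function are hypothetical stand-ins written for the example.

```go
package main

import (
	"container/heap"
	"fmt"
	"sort"
)

// result pairs a row id with its distance (hypothetical stand-in type).
type result struct {
	ID       int64
	Distance float64
}

// resultMaxHeap keeps the farthest result at index 0.
type resultMaxHeap []result

func (h resultMaxHeap) Len() int           { return len(h) }
func (h resultMaxHeap) Less(i, j int) bool { return h[i].Distance > h[j].Distance }
func (h resultMaxHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
func (h *resultMaxHeap) Push(x any)        { *h = append(*h, x.(result)) }
func (h *resultMaxHeap) Pop() any {
	old := *h
	x := old[len(old)-1]
	*h = old[:len(old)-1]
	return x
}

// topK returns the k nearest rows, nearest first, holding at most k results at a time.
func topK(dists []float64, k int) []result {
	if k <= 0 {
		return nil
	}
	hp := make(resultMaxHeap, 0, k)
	for row, d := range dists {
		if hp.Len() < k {
			heap.Push(&hp, result{ID: int64(row), Distance: d})
		} else if d < hp[0].Distance {
			// The new row beats the current worst: overwrite the root and restore heap order.
			hp[0] = result{ID: int64(row), Distance: d}
			heap.Fix(&hp, 0)
		}
	}
	out := []result(hp)
	sort.Slice(out, func(i, j int) bool { return out[i].Distance < out[j].Distance })
	return out
}

func main() {
	fmt.Println(topK([]float64{0.9, 0.2, 0.7, 0.4, 0.1}, 2)) // [{4 0.1} {1 0.2}]
}
```

The trade-off against the distance-only heap is that each heap entry carries a row id, so entries are larger, but no slice proportional to the block size is built.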
Possible issue
Fix incorrect top-K filtering logic

Add a condition to only filter searchResults when the distance heap has reached
its capacity (limit) to prevent incorrect filtering of valid results.

pkg/vm/engine/tae/blockio/read.go [468-470]

-searchResults = slices.DeleteFunc(searchResults, func(res vectorindex.SearchResult) bool {
-	return res.Distance > orderByLimit.DistHeap[0]
-})
+if len(orderByLimit.DistHeap) >= int(orderByLimit.Limit) {
+	searchResults = slices.DeleteFunc(searchResults, func(res vectorindex.SearchResult) bool {
+		return res.Distance > orderByLimit.DistHeap[0]
+	})
+}
Suggestion importance[1-10]: 8


Why: This suggestion correctly identifies a bug where results could be incorrectly filtered out if the number of candidates is less than the limit, as DistHeap[0] would not be the correct threshold. Applying the filter only when the heap is full fixes this critical logic error.

Medium
General
Optimize memory usage during top-K selection

To optimize memory usage, append to searchResults only when a candidate is added
to the distance heap, instead of appending all candidates and filtering later.

pkg/vm/engine/tae/blockio/read.go [398-470]

-searchResults := make([]vectorindex.SearchResult, 0, len(selectRows))
+searchResults := make([]vectorindex.SearchResult, 0, orderByLimit.Limit)
 
 switch orderByLimit.Typ {
 case types.T_array_float32:
 ...
-			searchResults = append(searchResults, vectorindex.SearchResult{
-				Id:       row,
-				Distance: dist64,
-			})
+			if len(orderByLimit.DistHeap) >= int(orderByLimit.Limit) {
+				if dist64 < orderByLimit.DistHeap[0] {
+					orderByLimit.DistHeap[0] = dist64
+					heap.Fix(&orderByLimit.DistHeap, 0)
+					searchResults = append(searchResults, vectorindex.SearchResult{
+						Id:       row,
+						Distance: dist64,
+					})
+				}
+			} else {
+				heap.Push(&orderByLimit.DistHeap, dist64)
+				searchResults = append(searchResults, vectorindex.SearchResult{
+					Id:       row,
+					Distance: dist64,
+				})
+			}
 		}
 
 	case types.T_array_float64:
 ...
-			searchResults = append(searchResults, vectorindex.SearchResult{
-				Id:       row,
-				Distance: dist64,
-			})
+			if len(orderByLimit.DistHeap) >= int(orderByLimit.Limit) {
+				if dist64 < orderByLimit.DistHeap[0] {
+					orderByLimit.DistHeap[0] = dist64
+					heap.Fix(&orderByLimit.DistHeap, 0)
+					searchResults = append(searchResults, vectorindex.SearchResult{
+						Id:       row,
+						Distance: dist64,
+					})
+				}
+			} else {
+				heap.Push(&orderByLimit.DistHeap, dist64)
+				searchResults = append(searchResults, vectorindex.SearchResult{
+					Id:       row,
+					Distance: dist64,
+				})
+			}
 		}
 
 	default:
 ...
 	}
 
-	searchResults = slices.DeleteFunc(searchResults, func(res vectorindex.SearchResult) bool {
-		return res.Distance > orderByLimit.DistHeap[0]
-	})
+	if len(orderByLimit.DistHeap) >= int(orderByLimit.Limit) {
+		searchResults = slices.DeleteFunc(searchResults, func(res vectorindex.SearchResult) bool {
+			return res.Distance > orderByLimit.DistHeap[0]
+		})
+	}

[To ensure code accuracy, apply this suggestion manually]

Suggestion importance[1-10]: 7


Why: The suggestion proposes a valid memory optimization by conditionally appending to searchResults instead of storing all candidates. This reduces peak memory usage, which is a good performance improvement, though the current logic is not incorrect, just inefficient.

Medium

@matrix-meow matrix-meow added the size/M Denotes a PR that changes [100,499] lines label Dec 1, 2025