Skip to content

Conversation

@fengttt
Copy link
Contributor

@fengttt fengttt commented Dec 8, 2025

User description

What type of PR is this?

  • API-change
  • BUG
  • Improvement
  • Documentation
  • Feature
  • Test and CI
  • Code Refactoring

Which issue(s) this PR fixes:

issue #3433

What this PR does / why we need it:

Give it another try.


PR Type

Bug fix, Enhancement, Tests


Description

  • Refactor memory management: Removed AggMemoryManager interface abstraction and migrated all aggregation functions to use *mpool.MPool directly for simpler, more efficient memory management

  • Implement group operator spilling: Added new Group and MergeGroup operators with spill-to-disk support in pkg/sql/colexec/group/ for memory-constrained execution of aggregations

  • Add streaming serialization: Implemented SaveIntermediateResult(), SaveIntermediateResultOfChunk(), and UnmarshalFromReader() methods across all aggregation functions to support intermediate result persistence and spilling

  • Migrate hash tables to mpool: Updated int64_hash_map and string_hash_map to use *mpool.MPool instead of malloc.Allocator for consistent memory tracking

  • Refactor batch serialization: Replaced Aggs field with ExtraBuf1 and ExtraBuf2 for flexible extra data storage, and added UnmarshalFromReader() for streaming deserialization

  • Fix mpool allocation bugs: Corrected NoLock flag handling, refactored detail tracking with stack-based keys, increased growth threshold from 256 to 4096, and added FreeSlice() helper

  • Add file system operations: Implemented raw file operation methods (EnsureDir(), OpenFile(), CreateFile(), RemoveFile()) in ETL and subpath filesystems for spill support

  • Support distinct aggregations in spilling: Enhanced result handling with distinct value tracking and serialization via distinctFill() and distinctMerge() methods

  • Add streaming I/O utilities: Implemented reader/writer utility functions in pkg/container/types/encoding.go for type serialization/deserialization

  • Simplify allocator API: Removed NoHints and IgnoreMunmapError parameters from deallocate calls across malloc and morpc packages

  • Support MaxDop query parameter: Added logic to respect MaxDop in query execution planning

  • Update window operator: Refactored to use colexec.ExprEvalVector and separate batAggs field for aggregation handling


Diagram Walkthrough

flowchart LR
  A["Aggregation Functions"] -->|"Remove AggMemoryManager"| B["Direct mpool Usage"]
  B -->|"Add Serialization"| C["Intermediate Results"]
  C -->|"Support Spilling"| D["Group Operator"]
  D -->|"Merge Results"| E["MergeGroup Operator"]
  F["Hash Tables"] -->|"Migrate to mpool"| B
  G["Batch"] -->|"Use ExtraBuf"| C
  H["File System"] -->|"Add Operations"| D
Loading

File Walkthrough

Relevant files
Refactoring
4 files
jsonagg_test.go
Refactor memory manager usage to use mpool directly           

pkg/sql/colexec/aggexec/jsonagg_test.go

  • Replaced hackAggMemoryManager() calls with mpool.MustNewZero() to use
    memory pool directly
  • Updated all method calls from mg.Mp() to mg since memory pool is now
    passed directly
  • Changed unmarshal() calls to pass mg as first parameter instead of nil
  • Updated marshalToBytes() call to handle additional return value
+85/-85 
aggFrame_test.go
Remove AggMemoryManager abstraction, use mpool directly   

pkg/sql/colexec/aggexec/aggFrame_test.go

  • Removed hackManager wrapper struct and hackAggMemoryManager() function
  • Added MarshalBinary() method to avgDemoCtx struct for binary
    marshaling support
  • Changed function signatures to accept *mpool.MPool directly instead of
    AggMemoryManager interface
  • Updated all test cases to use mpool.MustNewZeroNoFixed() and pass
    memory pool directly
  • Added free() calls to distinctHash objects in tests
+99/-103
types.go
Remove AggMemoryManager interface, add serialization support

pkg/sql/colexec/aggexec/types.go

  • Removed AggMemoryManager interface and SimpleAggMemoryManager struct
  • Changed all function signatures to accept *mpool.MPool directly
    instead of AggMemoryManager
  • Added serialization methods to AggFuncExec interface:
    SaveIntermediateResult(), SaveIntermediateResultOfChunk(),
    UnmarshalFromReader()
  • Added helper functions for marshaling/unmarshaling aggregation results
    with binary encoding
  • Added MarshalToBuffer() and UnmarshalFromReader() methods to
    AggFuncExecExpression
+258/-49
managed_allocator_test.go
Update deallocate calls to remove NoHints parameter           

pkg/common/malloc/managed_allocator_test.go

  • Updated Deallocate() calls to remove NoHints parameter (now uses
    default)
  • Applied consistently across all test cases and benchmarks
+4/-4     
Enhancement
34 files
helper.go
Add group operator spill helper functions                               

pkg/sql/colexec/group/helper.go

  • New file containing helper functions for group operator spilling and
    loading
  • Implements ResHashRelated struct for hash table management with spill
    support
  • Provides functions for computing bucket indices, spilling data to
    disk, and loading spilled data
  • Includes helper functions for memory management and aggregation list
    creation
+585/-0 
exec2.go
Add group operator execution with spill support                   

pkg/sql/colexec/group/exec2.go

  • New file implementing group operator execution with spill support
  • Implements Prepare() and Call() methods for group operator state
    machine
  • Provides batch building, hash table construction, and result output
    functions
  • Supports both final and intermediate result output modes
+532/-0 
result.go
Refactor aggregation result handling with distinct support

pkg/sql/colexec/aggexec/result.go

  • Replaced AggMemoryManager interface with direct *mpool.MPool parameter
    throughout initialization functions
  • Added hasDistinct parameter to result initialization and tracking
    distinct aggregations
  • Implemented new marshaling/unmarshaling methods supporting distinct
    data serialization via marshalToBytes(), unmarshalFromBytes(), and
    reader-based I/O
  • Added distinctFill() and distinctMerge() methods to handle distinct
    value tracking in aggregation results
  • Introduced setupT() method for type-safe vector setup and
    getNthChunkSize() for chunk size queries
+226/-29
string_hash_map.go
Migrate string hash map to mpool allocator                             

pkg/container/hashtable/string_hash_map.go

  • Replaced malloc.Allocator with *mpool.MPool for memory management
  • Removed rawData and rawDataDeallocators fields, using
    mpool.MakeSlice() for allocation
  • Simplified allocate() to work with slice count instead of byte size
  • Updated Free(), ResizeOnDemand(), and Size() methods to use mpool API
  • Added AllGroupHash() method to retrieve hash values for all groups
+42/-50 
int64_hash_map.go
Migrate int64 hash map to mpool allocator                               

pkg/container/hashtable/int64_hash_map.go

  • Replaced malloc.Allocator with *mpool.MPool for memory management
  • Removed rawData and rawDataDeallocators fields, using
    mpool.MakeSlice() for allocation
  • Simplified allocate() to work with slice count instead of byte size
  • Updated Free(), ResizeOnDemand(), and Size() methods to use mpool API
  • Added AllGroupHash() method to retrieve hash values for all groups
+43/-47 
batch.go
Refactor batch serialization to use extra buffers               

pkg/container/batch/batch.go

  • Removed aggregation function serialization from MarshalBinary() and
    UnmarshalBinaryWithAnyMp()
  • Replaced Aggs field handling with ExtraBuf1 and ExtraBuf2 for flexible
    extra data storage
  • Added MarshalBinaryWithBuffer() with optional buffer reset parameter
  • Implemented UnmarshalFromReader() for streaming deserialization from
    io.Reader
  • Updated Clean() and IsEmpty() to work with new buffer fields instead
    of aggregations
+88/-93 
jsonagg.go
Add streaming serialization to JSON aggregation                   

pkg/sql/colexec/aggexec/jsonagg.go

  • Updated marshal() to handle three-part return from marshalToBytes()
    including distinct data
  • Added SaveIntermediateResult() and SaveIntermediateResultOfChunk()
    methods for buffer-based serialization
  • Implemented UnmarshalFromReader() for streaming deserialization with
    distinct support
  • Updated unmarshal() to pass nil for distinct data parameter and use mp
    parameter
  • Changed constructor to accept *mpool.MPool directly and initialize
    distinctHash with memory pool
+115/-12
concat.go
Refactor group concat with distinct in result                       

pkg/sql/colexec/aggexec/concat.go

  • Removed embedded distinctHash field from groupConcatExec struct
  • Updated marshal() to handle three-part return from marshalToBytes()
    with distinct data
  • Added SaveIntermediateResult(), SaveIntermediateResultOfChunk(), and
    UnmarshalFromReader() methods
  • Updated unmarshal() to pass distinct data from groups parameter
  • Modified Fill() to use distinctFill() method from result object
    instead of embedded hash
  • Changed merge() to use distinctMerge() from result object
  • Updated constructor to accept *mpool.MPool and pass hasDistinct to
    result initialization
+56/-35 
fromFixedRetBytes.go
Add streaming serialization to fixed-to-bytes aggregation

pkg/sql/colexec/aggexec/fromFixedRetBytes.go

  • Replaced AggMemoryManager parameter with *mpool.MPool in constructor
    and initialization
  • Added SaveIntermediateResult(), SaveIntermediateResultOfChunk(), and
    UnmarshalFromReader() methods for streaming serialization
  • Updated marshal() to validate distinct data is nil
  • Modified unmarshal() to pass nil for distinct data parameter
  • Updated init() to create distinctHash with memory pool when distinct
    is enabled
  • Changed result initialization to pass hasDistinct parameter
+42/-12 
count.go
Refactor aggregation serialization and memory management 

pkg/sql/colexec/aggexec/count.go

  • Added imports for bytes, io, and moerr packages
  • Modified marshal() to handle distinct hash data returned from
    marshalToBytes()
  • Added three new methods: SaveIntermediateResult(),
    SaveIntermediateResultOfChunk(), and UnmarshalFromReader() for
    intermediate result serialization
  • Changed function signatures to accept *mpool.MPool instead of
    AggMemoryManager
  • Updated unmarshal() to pass groups parameter to unmarshalFromBytes()
  • Modified initialization to set distinctHash.mp field
+62/-28 
approx_count.go
Add intermediate result serialization for approx count     

pkg/sql/colexec/aggexec/approx_count.go

  • Added imports for bytes, io, and moerr packages
  • Added three new serialization methods: SaveIntermediateResult(),
    SaveIntermediateResultOfChunk(), and UnmarshalFromReader()
  • Modified marshal() to validate that distinct data is nil
  • Changed function signatures to use *mpool.MPool instead of
    AggMemoryManager
  • Updated unmarshal() calls to pass nil for distinct parameter
+62/-8   
median.go
Implement serialization for median aggregation function   

pkg/sql/colexec/aggexec/median.go

  • Added imports for bytes and io packages
  • Added three new serialization methods with custom group handling
  • Modified marshal() to validate distinct is nil
  • Changed function signatures to accept *mpool.MPool instead of
    AggMemoryManager
  • Updated initialization to set distinctHash.mp and pass distinct flag
    to result initialization
+59/-8   
fromFixedRetFixed.go
Add serialization support for fixed-to-fixed aggregations

pkg/sql/colexec/aggexec/fromFixedRetFixed.go

  • Added imports for bytes, io, and moerr packages
  • Added three new serialization methods with group context encoding
  • Modified marshal() to validate distinct is nil
  • Changed function signatures to use *mpool.MPool instead of
    AggMemoryManager
  • Updated newDistinctHash() call to pass memory pool parameter
+40/-8   
encoding.go
Add streaming I/O utility functions for types                       

pkg/container/types/encoding.go

  • Added new I/O utility functions for reading/writing various types
    from/to readers and writers
  • Implemented WriteSizeBytes(), ReadInt64(), ReadUint64(), WriteInt64(),
    WriteUint64(), ReadBool(), ReadInt32(), WriteInt32(),
    ReadInt32AsInt(), ReadByte(), ReadByteAsInt(), ReadType(),
    ReadSizeBytes(), ReadSizeBytesMp()
  • These functions support streaming serialization/deserialization with
    memory pool integration
+129/-0 
window.go
Refactor window operator aggregation handling                       

pkg/sql/colexec/window/window.go

  • Changed import from group.ExprEvalVector to colexec.ExprEvalVector
  • Renamed ctr.bat.Aggs to ctr.batAggs field
  • Updated MakeAgg() call to use proc.Mp() instead of full process object
  • Removed unused import of mergegroup package
+12/-13 
window.go
Implement serialization for window functions                         

pkg/sql/colexec/aggexec/window.go

  • Added imports for bytes and io packages
  • Created i64Slice type with MarshalBinary() method
  • Added three new serialization methods for window functions
  • Changed function signature to use *mpool.MPool instead of
    AggMemoryManager
  • Updated unmarshal() to pass nil for distinct parameter
+57/-7   
fromBytesRetFixed.go
Add serialization for bytes-to-fixed aggregations               

pkg/sql/colexec/aggexec/fromBytesRetFixed.go

  • Added imports for bytes, io, and moerr packages
  • Added three new serialization methods with group context encoding
  • Modified marshal() to validate distinct is nil
  • Changed function signatures to use *mpool.MPool instead of
    AggMemoryManager
  • Updated newDistinctHash() call to pass memory pool parameter
+38/-6   
fromBytesRetBytes.go
Add serialization for bytes-to-bytes aggregations               

pkg/sql/colexec/aggexec/fromBytesRetBytes.go

  • Added imports for bytes, io, and moerr packages
  • Added three new serialization methods with group context encoding
  • Modified marshal() to validate distinct is nil
  • Changed function signature to use *mpool.MPool instead of
    AggMemoryManager
  • Updated newDistinctHash() call to pass memory pool parameter
+42/-6   
remoterun.go
Migrate MergeGroup to group package                                           

pkg/sql/compile/remoterun.go

  • Removed import of mergegroup package
  • Removed PreAllocSize field from Group pipeline instruction
  • Changed MergeGroup type from mergegroup.MergeGroup to group.MergeGroup
  • Updated DecodeMergeGroup() function signature to use group.MergeGroup
  • Added SpillMem field assignment for MergeGroup
+12/-9   
strhashmap.go
Update string hashmap to use memory pool                                 

pkg/common/hashmap/strhashmap.go

  • Changed NewStrHashMap() to accept *mpool.MPool parameter instead of
    using nil
  • Updated UnmarshalBinary() and UnmarshalFrom() to use *mpool.MPool
    instead of malloc.Allocator
  • Added AllGroupHash() method to return all group hash codes
+11/-7   
distinct.go
Refactor distinct hash to use memory pool                               

pkg/sql/colexec/aggexec/distinct.go

  • Added mp field to distinctHash struct to store memory pool reference
  • Updated newDistinctHash() to accept and store *mpool.MPool parameter
  • Modified grows() to pass memory pool to NewStrHashMap()
  • Added marshalToBuffers() method for selective marshaling based on
    flags
  • Updated unmarshal() and added unmarshalFromReader() for reader-based
    deserialization
  • Changed allocator parameter from hashtable.DefaultAllocator() to
    memory pool
+34/-5   
inthashmap.go
Update integer hashmap to use memory pool                               

pkg/common/hashmap/inthashmap.go

  • Changed NewIntHashMap() to accept *mpool.MPool parameter instead of
    using nil
  • Updated UnmarshalBinary() and UnmarshalFrom() to use *mpool.MPool
    instead of malloc.Allocator
  • Added AllGroupHash() method to return all group hash codes
+11/-7   
sub_path.go
Add raw file operation methods to subpath filesystem         

pkg/fileservice/sub_path.go

  • Added import for os package
  • Implemented EnsureDir() method to create directories
  • Implemented OpenFile(), CreateFile(), RemoveFile(), and
    CreateAndRemoveFile() methods for raw file operations
+69/-0   
compile.go
Support MaxDop query parameter and migrate MergeGroup       

pkg/sql/compile/compile.go

  • Removed import of mergegroup package
  • Added logic to respect MaxDop query parameter when calculating DOP
  • Changed MergeGroup type reference from mergegroup.MergeGroup to
    group.MergeGroup
  • Updated constructMergeGroup() call to pass plan node parameter
+10/-6   
operator.go
Remove PreAllocSize and migrate MergeGroup type                   

pkg/sql/compile/operator.go

  • Removed import of mergegroup package
  • Removed PreAllocSize field from Group operator duplication
  • Removed pre-allocation size calculation logic from constructGroup()
  • Changed MergeGroup type from mergegroup.MergeGroup to group.MergeGroup
  • Updated constructMergeGroup() to accept plan node and set SpillMem
+9/-14   
local_etl_fs.go
Add raw file operation methods to ETL filesystem                 

pkg/fileservice/local_etl_fs.go

  • Implemented EnsureDir() method for directory creation
  • Implemented OpenFile() for opening files in read-write mode
  • Implemented CreateFile() for creating or truncating files
  • Implemented RemoveFile() for file deletion
  • Implemented CreateAndRemoveFile() for temporary file operations
+71/-0   
vector.go
Add streaming deserialization for vectors                               

pkg/container/vector/vector.go

  • Added import for io package
  • Implemented UnmarshalWithReader() method for streaming deserialization
    from reader
  • Supports reading vector class, type, length, data, area, and null
    space from reader
+56/-0   
sendfunc.go
Update dispatch send functions for batch marshaling           

pkg/sql/colexec/dispatch/sendfunc.go

  • Updated all MarshalBinaryWithBuffer() calls to pass true parameter
  • Removed unnecessary blank lines
+4/-6     
var_pop.go
Add MarshalBinary methods to var_pop contexts                       

pkg/sql/plan/function/agg/var_pop.go

  • Reordered imports to place math before other imports
  • Added MarshalBinary() methods to three context types for binary
    marshaling support
+5/-1     
buffer.go
Update spool buffer to use extra buffers instead of aggs 

pkg/container/pSpool/buffer.go

  • Reordered imports to place sync before other imports
  • Replaced bat.Aggs cleanup with ExtraBuf1 and ExtraBuf2 cleanup
+4/-7     
types.go
Refactor window container aggregation handling                     

pkg/sql/colexec/window/types.go

  • Changed import from group.ExprEvalVector to colexec.ExprEvalVector
  • Added batAggs field to container struct
  • Updated freeAggFun() to use batAggs instead of bat.Aggs
+7/-6     
copy.go
Update spool copy to use extra buffers                                     

pkg/container/pSpool/copy.go

  • Reordered imports to place math before other imports
  • Replaced bat.Aggs with ExtraBuf1 and ExtraBuf2 in batch copying
+6/-3     
var_sample.go
Add MarshalBinary methods to var_sample contexts                 

pkg/sql/plan/function/agg/var_sample.go

  • Added MarshalBinary() methods to three context types for binary
    marshaling support
+3/-0     
sample.go
Update sample operator for memory pool integration             

pkg/sql/colexec/sample/sample.go

  • Updated hashAndSample() method signature to accept *process.Process
    parameter
  • Modified NewIntHashMap() and NewStrHashMap() calls to pass proc.Mp()
    parameter
+4/-4     
Bug fix
5 files
mpool.go
Refactor mpool detail tracking and fix allocation bugs     

pkg/common/mpool/mpool.go

  • Refactored recordAlloc() and recordFree() to accept detail key string
    parameter instead of computing it internally
  • Added getDetailK() method to compute stack-based detail key for memory
    tracking
  • Fixed bug: changed mp.noLock = (flag & NoFixed) to mp.noLock = (flag &
    NoLock)
  • Refactored Alloc(), Free(), Grow() methods to use detail key parameter
    variants
  • Added allocWithDetailK(), freeWithDetailK(), freePtr(),
    growWithDetailK(), reAllocWithDetailK() internal methods
  • Increased growth threshold from 256 to 4096 for better performance
  • Added FreeSlice() helper function for freeing slices allocated from
    mpool
  • Removed IgnoreMunmapError flag from deallocate call
+94/-57 
checked_allocator.go
Simplify allocator deallocation call                                         

pkg/common/malloc/checked_allocator.go

  • Removed DoNotReuse hint flag from deallocator call
  • Simplified Deallocate() call to use no parameters
+1/-2     
codec.go
Remove NoHints parameter from deallocate calls                     

pkg/common/morpc/codec.go

  • Removed malloc.NoHints parameter from all Deallocate() calls
  • Updated 7 instances where deallocators are called to use parameterless
    method
+7/-7     
checked_allocator_test.go
Remove NoHints parameter from deallocate calls                     

pkg/common/malloc/checked_allocator_test.go

  • Removed NoHints parameter from all Deallocate() calls (3 instances)
+4/-4     
managed_allocator.go
Remove hints parameter from deallocate method                       

pkg/common/malloc/managed_allocator.go

  • Removed hints parameter from Deallocate() method signature
  • Updated internal deallocate() call to not pass hints parameter
+4/-4     
Tests
9 files
timewin_test.go
Simplify test memory management by removing wrapper           

pkg/sql/colexec/timewin/timewin_test.go

  • Removed testAggMemoryManager wrapper struct and
    newTestAggMemoryManager() factory function
  • Updated all test functions to use mpool.MustNewZeroNoFixed() directly
    instead of memory manager wrapper
  • Simplified memory pool access by calling methods directly on
    *mpool.MPool instead of through interface
+39/-50 
hash_test.go
Update hash table tests for mpool integration                       

pkg/container/hashtable/hash_test.go

  • Updated hash map initialization to use *mpool.MPool instead of
    DefaultAllocator()
  • Changed Init() and UnmarshalBinary() calls to pass memory pool
    directly
  • Removed dependency on DefaultAllocator() function in test cases
+28/-18 
result_test.go
Update aggregation result tests for new API                           

pkg/sql/colexec/aggexec/result_test.go

  • Replaced SimpleAggMemoryManager wrapper with direct *mpool.MPool usage
  • Updated init() calls to pass hasDistinct parameter (set to false for
    tests)
  • Updated marshalToBytes() and unmarshalFromBytes() calls to handle
    three-part return with distinct data
  • Simplified memory pool access throughout test functions
+39/-31 
inthashmap_test.go
Update int hash map tests for mpool parameter                       

pkg/common/hashmap/inthashmap_test.go

  • Updated NewIntHashMap() calls to pass *mpool.MPool parameter
  • Changed UnmarshalBinary() calls to use memory pool instead of
    hashtable.DefaultAllocator()
  • Removed import of hashtable package
+12/-13 
strhashmap_test.go
Update string hash map tests for mpool parameter                 

pkg/common/hashmap/strhashmap_test.go

  • Updated NewStrHashMap() calls to pass *mpool.MPool parameter
  • Changed UnmarshalBinary() calls to use memory pool instead of
    hashtable.DefaultAllocator()
  • Removed import of hashtable package
+10/-11 
median_test.go
Simplify median test to use memory pool directly                 

pkg/sql/colexec/aggexec/median_test.go

  • Replaced hackAggMemoryManager() with direct mpool.MustNewZeroNoFixed()
    calls
  • Updated all memory pool references to use mp variable directly
  • Simplified test setup by removing memory manager wrapper
+14/-13 
batch_test.go
Update batch tests for extra buffer fields                             

pkg/container/batch/batch_test.go

  • Added assertions for ExtraBuf1 and ExtraBuf2 fields in batch
    marshaling tests
  • Updated MarshalBinaryWithBuffer() call to pass true parameter
  • Added test for UnmarshalFromReader() method
  • Removed aggregation initialization from test batch setup
+16/-3   
mpool_test.go
Update mpool tests for realloc method signature                   

pkg/common/mpool/mpool_test.go

  • Updated reAlloc() calls to use reAllocWithDetailK() with detail key
    parameter
+3/-3     
remoterun_test.go
Update remoterun tests for MergeGroup migration                   

pkg/sql/compile/remoterun_test.go

  • Removed import of mergegroup package
  • Changed MergeGroup type from mergegroup.MergeGroup to group.MergeGroup
+1/-18   
Feature
2 files
types2.go
Add new group operator with spill support                               

pkg/sql/colexec/group/types2.go

  • New file implementing Group and MergeGroup operators for aggregation
    with spill-to-disk support
  • Defines container structure managing hash tables, aggregation lists,
    and spill buckets for memory-constrained execution
  • Implements group-by evaluation, aggregation, and distinct handling
    with configurable spill memory thresholds
  • Provides operator lifecycle methods: Prepare(), Call(), Free(),
    Reset() for pipeline integration
+415/-0 
mergeGroup.go
Add merge group operator for result aggregation                   

pkg/sql/colexec/group/mergeGroup.go

  • New file implementing MergeGroup operator for merging partial
    aggregation results
  • Handles deserialization of aggregation expressions and partial results
    from lower operators
  • Implements hash-based grouping and aggregation merging with spill
    support
  • Provides Prepare() and Call() methods for pipeline execution
+239/-0 
Additional files
70 files
go.mod +0/-1     
go.sum +0/-2     
allocator.go +1/-1     
allocator_bench_test.go +2/-2     
allocator_test.go +1/-1     
chain_deallocator.go +2/-2     
closure_deallocator.go +2/-2     
fixed_size_make_allocator.go +1/-1     
func_deallocator.go +3/-5     
fuzz_test.go +1/-1     
hints.go +0/-1     
inuse_tracking_allocator.go +1/-1     
leaks_tracking_allocator_test.go +1/-1     
read_only_allocator_test.go +1/-1     
size_bounded_allocator_test.go +1/-1     
alloc.go +0/-47   
malloc.go +5/-13   
spool_test.go +1/-1     
types.go +3/-4     
malloc_test.go +0/-51   
encoding_test.go +1/-1     
packer.go +2/-2     
bytes.go +2/-2     
io_entry.go +2/-2     
local_fs.go +5/-1     
mutable_file_service.go +11/-1   
branch_hashmap.go +1/-1     
branch_hashmap_test.go +1/-1     
types.go +1/-1     
variables.go +8/-0     
plan.pb.go +827/-783
aggContext.go +30/-0   
aggMethod.go +1/-0     
serialize.go +4/-9     
evalExpression.go +40/-0   
exec.go +0/-503 
exec_test.go +0/-600 
execctx.go +0/-419 
execctx_test.go +0/-96   
types.go +0/-240 
hashmap_util.go +2/-2     
hashmap_util_test.go +6/-4     
intersect.go +1/-1     
intersectall.go +1/-1     
exec.go +0/-221 
exec_test.go +0/-219 
types.go +0/-101 
minus.go +1/-1     
s3writer_delegate.go +1/-1     
join.go +2/-2     
timewin.go +3/-2     
top.go +4/-1     
scope.go +2/-3     
explain_node.go +7/-0     
avg.go +5/-3     
avg_tw_cache.go +3/-0     
avg_tw_result.go +2/-1     
bitmap.go +1/-0     
sum.go +3/-2     
query_builder.go +9/-0     
shuffle.go +10/-0   
stats.go +1/-1     
index_util.go +3/-1     
operator_analyzer.go +9/-0     
process.go +12/-1   
process2.go +8/-1     
types.go +9/-1     
plan.proto +3/-0     
group.result +96/-0   
group.sql +66/-0   

Unscrew.  Expect tons of bugs ...
Now we have something that compiles, but there will be
tons of bugs ...
yeah, I know ...
Get rid of totally unnecessary agg mem manager, let mpool do its job.
This has to go ...
Speechless.  WTF are we doing -- a damn alloc goes through
about 6 or 7 level of abstractions.

MPool.Alloc -> ManagedAllocator -> MetricsAllocator
->  ShardedAllolcator 0> ClassAllocator -> fixedSizeMmapAlloc
@mergify
Copy link
Contributor

mergify bot commented Dec 8, 2025

⚠️ The sha of the head commit of this PR conflicts with #22686. Mergify cannot evaluate rules on this PR. ⚠️

@qodo-code-review
Copy link

qodo-code-review bot commented Dec 8, 2025

PR Compliance Guide 🔍

Below is a summary of compliance checks for this PR:

Security Compliance
Unsafe memory handling

Description: Off-heap allocations via malloc allocator are returned to Go as byte slices constructed
around raw memory without explicit bounds checks, and manual header/guard management in
alloc/free paths increases risk of memory corruption or double-free if guard logic fails
or pointers are misused.
mpool.go [557-699]

Referred Code
var CapLimit = math.MaxInt32 // 2GB - 1

func (mp *MPool) Alloc(sz int, offHeap bool) ([]byte, error) {
	detailk := mp.getDetailK()
	return mp.allocWithDetailK(detailk, sz, offHeap)
}

func (mp *MPool) allocWithDetailK(detailk string, sz int, offHeap bool) ([]byte, error) {
	// reject unexpected alloc size.
	if sz < 0 || sz > CapLimit {
		logutil.Errorf("mpool memory allocation exceed limit with requested size %d: %s", sz, string(debug.Stack()))
		return nil, moerr.NewInternalErrorNoCtxf("mpool memory allocation exceed limit with requested size %d", sz)
	}

	if sz == 0 {
		return nil, nil
	}

	if atomic.LoadInt32(&mp.available) == Unavailable {
		return nil, moerr.NewInternalErrorNoCtxf("mpool %s unavailable for alloc", mp.tag)
	}


 ... (clipped 122 lines)
Information exposure

Description: Detail tracking key includes function and name via stack inspection which may leak
function identifiers or stack traces into logs/metrics; if exposed, this can reveal
sensitive internal topology; verify exposure and sanitize before external reporting.
mpool.go [368-379]

Referred Code
	mp.details = nil
}

func (mp *MPool) getDetailK() string {
	if mp == nil || mp.details == nil {
		return ""
	}
	f := stack.Caller(2)
	k := fmt.Sprintf("%v:%n", f, f)
	return k
}
Ticket Compliance
🟡
🎫 #3433
🟢 Implement necessary serialization/deserialization to support spilling intermediate results
Integrate memory tracking with the project's memory pool (mpool) for accurate accounting
Hash join and hash aggregation must track memory usage during execution
When memory consumption exceeds a configured budget, execution must spill to disk
Codebase Duplication Compliance
Codebase context is not defined

Follow the guide to enable codebase context checks.

Custom Compliance
🟢
Generic: Meaningful Naming and Self-Documenting Code

Objective: Ensure all identifiers clearly express their purpose and intent, making code
self-documenting

Status: Passed

Learn more about managing compliance generic rules or creating your own custom rules

Generic: Secure Error Handling

Objective: To prevent the leakage of sensitive system information through error messages while
providing sufficient detail for internal debugging.

Status: Passed

Learn more about managing compliance generic rules or creating your own custom rules

Generic: Secure Logging Practices

Objective: To ensure logs are useful for debugging and auditing without exposing sensitive
information like PII, PHI, or cardholder data.

Status: Passed

Learn more about managing compliance generic rules or creating your own custom rules

Generic: Comprehensive Audit Trails

Objective: To create a detailed and reliable record of critical system actions for security analysis
and compliance.

Status:
No audit logs: New memory allocation/free pathways (e.g., allocWithDetailK, freeWithDetailK, FreeSlice)
introduce critical resource actions without any auditing or logging to reconstruct events.

Referred Code
var CapLimit = math.MaxInt32 // 2GB - 1

func (mp *MPool) Alloc(sz int, offHeap bool) ([]byte, error) {
	detailk := mp.getDetailK()
	return mp.allocWithDetailK(detailk, sz, offHeap)
}

func (mp *MPool) allocWithDetailK(detailk string, sz int, offHeap bool) ([]byte, error) {
	// reject unexpected alloc size.
	if sz < 0 || sz > CapLimit {
		logutil.Errorf("mpool memory allocation exceed limit with requested size %d: %s", sz, string(debug.Stack()))
		return nil, moerr.NewInternalErrorNoCtxf("mpool memory allocation exceed limit with requested size %d", sz)
	}

	if sz == 0 {
		return nil, nil
	}

	if atomic.LoadInt32(&mp.available) == Unavailable {
		return nil, moerr.NewInternalErrorNoCtxf("mpool %s unavailable for alloc", mp.tag)
	}


 ... (clipped 268 lines)

Learn more about managing compliance generic rules or creating your own custom rules

Generic: Robust Error Handling and Edge Case Management

Objective: Ensure comprehensive error handling that provides meaningful context and graceful
degradation

Status:
Nil manager param: Several unmarshal calls pass nil memory manager while others pass a non-nil mpool, which
may indicate inconsistent edge case handling for required dependencies.

Referred Code
	exec2 := newJsonArrayAggExec(mg, info)
	require.NoError(t, exec2.unmarshal(nil, encoded.Result, encoded.Empties, encoded.Groups))
	exec2.Free()
	vec.Free(mg)
	exec1.Free()
}

func TestJsonObjectAggUnmarshalWithEmptyGroups(t *testing.T) {
	mg := mpool.MustNewZero()
	info := multiAggInfo{
		aggID:     44,
		distinct:  false,



 ... (clipped 9 lines)

Learn more about managing compliance generic rules or creating your own custom rules

Generic: Security-First Input Validation and Data Handling

Objective: Ensure all data inputs are validated, sanitized, and handled securely to prevent
vulnerabilities

Status:
Binary handling: New tests assert errors for binary JSON inputs but core changes add streaming and file
ops; without seeing all added files, validation and authorization around spill-to-disk
paths cannot be confirmed.

Referred Code
	exec := newJsonArrayAggExec(mg, info)
	require.NoError(t, exec.GroupGrow(1))

	vec := fromValueListToVector(mg, types.T_binary.ToType(), []string{"abc"}, nil)
	err := exec.Fill(0, 0, []*vector.Vector{vec})
	require.Error(t, err)
	require.Contains(t, err.Error(), "binary data not supported")

	vec.Free(mg)
	exec.Free()
}

func TestJsonObjectAggKeyMustBeString(t *testing.T) {
	mg := mpool.MustNewZero()
	info := multiAggInfo{

Learn more about managing compliance generic rules or creating your own custom rules

  • Update
Compliance status legend 🟢 - Fully Compliant
🟡 - Partial Compliant
🔴 - Not Compliant
⚪ - Requires Further Human Verification
🏷️ - Compliance label

@qodo-code-review
Copy link

qodo-code-review bot commented Dec 8, 2025

PR Code Suggestions ✨

Explore these optional code suggestions:

CategorySuggestion                                                                                                                                    Impact
Possible issue
Prevent potential index out of bounds

Correct the loop boundary for marshaling distinct data to iterate over
distinctData instead of r.distinct to prevent a potential index-out-of-bounds
panic.

pkg/sql/colexec/aggexec/result.go [274-302]

 func (r *optSplitResult) marshalToBytes() ([][]byte, [][]byte, [][]byte, error) {
 	var err error
 
 	// WTF?   min(r.nowIdx1+1, len...)
 	resultData := make([][]byte, min(r.nowIdx1+1, len(r.resultList)))
 	emptyData := make([][]byte, min(r.nowIdx1+1, len(r.emptyList)))
 
 	for i := range resultData {
 ...
 	}
 	for i := range emptyData {
 ...
 	}
 
 	if len(r.distinct) > 0 {
 		distinctData := make([][]byte, min(r.nowIdx1+1, len(r.distinct)))
-		for i := range r.distinct {
+		for i := range distinctData {
 			if distinctData[i], err = r.distinct[i].marshal(); err != nil {
 				return nil, nil, nil, err
 			}
 		}
 		return resultData, emptyData, distinctData, nil
 	}
 	return resultData, emptyData, nil, nil
 }

[To ensure code accuracy, apply this suggestion manually]

Suggestion importance[1-10]: 8

__

Why: The suggestion correctly identifies a potential index-out-of-bounds panic when len(r.distinct) is greater than min(r.nowIdx1+1, len(r.distinct)), and provides a valid fix.

Medium
Prevent panic on distinct data merge

Add a check in distinctMerge to verify that the other optSplitResult has
distinct data before attempting a merge, preventing a potential panic.

pkg/sql/colexec/aggexec/result.go [804-811]

 func (r *optSplitResult) distinctMerge(x1 int, other *optSplitResult, x2 int) error {
 	if !r.optInformation.hasDistinct {
 		// this is fine.
+		return nil
+	}
+	if !other.optInformation.hasDistinct {
+		// other has no distinct, this is also fine.
 		return nil
 	}
 	// thank god, the following will bomb out if they collide.
 	return r.distinct[x1].merge(&other.distinct[x2])
 }

[To ensure code accuracy, apply this suggestion manually]

Suggestion importance[1-10]: 8

__

Why: The suggestion correctly identifies a potential panic when merging distinct data if the other result does not have distinct data, and the proposed check prevents this crash.

Medium
Pass memory pool to unmarshal

Pass the memory pool mg to the unmarshal function instead of nil to align with
the refactoring and prevent potential panics.

pkg/sql/colexec/aggexec/jsonagg_test.go [277]

-err = exec.unmarshal(nil, result, empties, nil)
+err = exec.unmarshal(mg, result, empties, nil)

[To ensure code accuracy, apply this suggestion manually]

Suggestion importance[1-10]: 6

__

Why: The suggestion correctly identifies an inconsistent call to unmarshal with nil for the memory pool, while other calls were updated in the PR to pass the mg memory pool, making the code more robust and consistent.

Low
High-level
Refactor batch serialization for aggregations

The generic ExtraBuf1 and ExtraBuf2 fields in batch.Batch should be replaced
with a more structured, typed mechanism for carrying intermediate aggregation
data. This would improve type safety and maintainability.

Examples:

pkg/container/batch/batch.go [153-154]
	types.WriteSizeBytes(bat.ExtraBuf1, w)
	types.WriteSizeBytes(bat.ExtraBuf2, w)
pkg/sql/colexec/group/exec2.go [471-494]

Solution Walkthrough:

Before:

type Batch struct {
    // ... other fields
    Aggs []aggexec.AggFuncExec
}

func (bat *Batch) MarshalBinary() ([]byte, error) {
    // ...
    // Serialize Aggs
    aggInfos := make([][]byte, len(bat.Aggs))
    for i, exec := range bat.Aggs {
        data, err := aggexec.MarshalAggFuncExec(exec)
        // ...
        aggInfos[i] = data
    }
    // ... write aggInfos to buffer
    return w.Bytes(), nil
}

After:

type Batch struct {
    // ... other fields
    ExtraBuf1 []byte
    ExtraBuf2 []byte
}

// In group operator
func (group *Group) getNextIntermediateResult(proc *process.Process) (vm.CallResult, bool, error) {
    // ...
    var buf1, buf2 bytes.Buffer
    // Manually serialize metadata to buf1
    buf1.Write(types.EncodeInt32(&group.ctr.mtyp))
    // ...
    batch.ExtraBuf1 = buf1.Bytes()

    // Manually serialize aggregation state to buf2
    for _, ag := range group.ctr.aggList {
        ag.SaveIntermediateResultOfChunk(curr, &buf2)
    }
    batch.ExtraBuf2 = buf2.Bytes()
    // ...
}
Suggestion importance[1-10]: 7

__

Why: The suggestion correctly identifies a design trade-off in batch.Batch where type safety is sacrificed for flexibility and package decoupling. While the current implementation is functional and enables spilling, the suggestion raises a valid, significant concern about long-term maintainability and the increased complexity of manual serialization/deserialization in operators.

Medium
General
Avoid memory allocation in loop

Optimize computeBucketIndex by reusing a buffer for types.EncodeUint64 to avoid
repeated memory allocations within the loop.

pkg/sql/colexec/group/helper.go [177-182]

 func (ctr *container) computeBucketIndex(hashCodes []uint64, myLv uint64) {
+	buf := make([]byte, 8)
 	for i := range hashCodes {
 		x := hashCodes[i] + myLv
-		hashCodes[i] = xxhash.Sum64(types.EncodeUint64(&x)) & (spillNumBuckets - 1)
+		types.EncodeUint64To(x, buf)
+		hashCodes[i] = xxhash.Sum64(buf) & (spillNumBuckets - 1)
 	}
 }

[To ensure code accuracy, apply this suggestion manually]

Suggestion importance[1-10]: 7

__

Why: The suggestion provides a valid performance optimization by avoiding memory allocation inside a loop, which reduces garbage collector pressure and improves efficiency.

Medium
Use a wrapper for field presence

Wrap the MaxDop field in google.protobuf.Int64Value to distinguish between an
unset field and a field explicitly set to its zero value.

pkg/pb/plan/plan.pb.go [8135-8136]

 // run time config that can change query behaviors
-MaxDop               int64    `protobuf:"varint,10,opt,name=maxDop,proto3" json:"maxDop,omitempty"`
+MaxDop               *wrappers.Int64Value `protobuf:"bytes,10,opt,name=maxDop,proto3" json:"maxDop,omitempty"`
  • Apply / Chat
Suggestion importance[1-10]: 6

__

Why: The suggestion correctly identifies the ambiguity of using a scalar int64 in proto3, where an unset field is indistinguishable from one set to 0, and proposes the standard wrapper solution to allow for explicit presence checking.

Low
  • Update

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

do-not-merge/wip kind/bug Something isn't working kind/feature Review effort 4/5 size/M Denotes a PR that changes [100,499] lines

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants