Skip to content

Conversation

@eric-wang-1990
Copy link
Owner

Summary

Implements RecyclableMemoryStream for LZ4 decompression to reduce memory pressure and fix stream corruption issues.

Changes

  • Modified Lz4Utilities.DecompressLz4Async to return MemoryStream instead of (buffer, length) tuple
  • Stream is now disposed by the caller (Arrow) after reading, not immediately in the utility method
  • Uses RecyclableMemoryStreamManager for pooled memory allocation
  • Decompression uses CopyAsync with 80KB chunks for efficient streaming
  • Fixed bug where buffer was recycled before Arrow finished reading it

Problem Fixed

Previously, the RecyclableMemoryStream was disposed immediately after decompression, returning its buffer to the pool. This caused "Unexpectedly reached end of stream" errors when Arrow tried to read from the buffer later, as it could have been overwritten by other operations.

Solution

The new API returns the stream itself without disposing it. Arrow reads from the stream and then disposes it when done, ensuring the buffer is only returned to the pool after reading completes.

Benefits

  • ✅ Reduces GC pressure by reusing memory streams from pool
  • ✅ Fixes "Unexpectedly reached end of stream" error
  • ✅ Memory is only returned to pool after Arrow completes reading
  • ✅ More efficient memory usage for high-volume CloudFetch operations
  • ✅ Uses CopyAsync for chunk-by-chunk decompression (80KB chunks)

Testing

  • Code compiles successfully on Mac for all target frameworks (netstandard2.0, net472, net8.0)
  • Needs testing on Windows with benchmark suite
  • Verify CloudFetch E2E benchmark completes without stream errors

🤖 Generated with Claude Code

…uce memory pressure

Changes:
- Modified Lz4Utilities.DecompressLz4Async to return MemoryStream instead of (buffer, length) tuple
- Stream is now disposed by the caller (Arrow) after reading, not immediately
- Uses RecyclableMemoryStream for pooled memory allocation
- Decompression uses CopyAsync with 80KB chunks for efficient streaming
- Fixed bug where buffer was recycled before Arrow finished reading

Benefits:
- Reduces GC pressure by reusing memory streams from pool
- Fixes "Unexpectedly reached end of stream" error
- Memory is only returned to pool after Arrow completes reading
- More efficient memory usage for high-volume CloudFetch operations

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <[email protected]>
@eric-wang-1990
Copy link
Owner Author

Closing - will create PR against apache/arrow-adbc instead

eric-wang-1990 pushed a commit that referenced this pull request Jan 15, 2026
…che#3870)

## Fix Critical Deadlocks and Race Conditions in Snowflake Record Reader

This PR addresses multiple critical concurrency issues in the Snowflake
driver's `recordReader` that could cause complete application hangs
under normal racing conditions.

### Issues Fixed

*1. Critical Deadlock: `Release()` Blocking Forever*

*Problem*: When `Release()` was called while producer goroutines were
blocked on channel sends, a permanent deadlock occurred:

* `Release()` cancels context and attempts to drain channels
* Producer goroutines blocked on `ch <- rec` cannot see the cancellation
* Channels never close because producers never exit
* `Release()` blocks forever on `for rec := range ch`

*Fix:* Added a `done` channel that signals when all producer goroutines
have completed. `Release()` now waits for this signal before attempting
to drain channels.

*2. Severe Deadlock: Non-Context-Aware Channel Sends*

*Problem:* Channel send operations at lines 694 and 732 checked context
before the send but not during:

```go
for rr.Next() && ctx.Err() == nil {  // Context checked here
    // ... 
    ch <- rec  // But send blocks here without checking context
}
```

*Fix:* Wrapped all channel sends in `select` statements with context
awareness:

```go
select {
case chs[0] <- rec:
    // Successfully sent
case <-ctx.Done():
    rec.Release()
    return ctx.Err()
}
```

*3. Critical Race Condition: Nil Channel Reads*

*Problem:* Channels were created asynchronously in goroutines after
`newRecordReader` returned. If `Next()` was called quickly after
creation, it could read from uninitialized (nil) channels, causing
infinite blocking.

*Fix:* Initialize all channels upfront before starting any goroutines:

```go
chs := make([]chan arrow.RecordBatch, len(batches))
for i := range chs {
    chs[i] = make(chan arrow.RecordBatch, bufferSize)
}
```

*4. Goroutine Leaks on Initialization Errors*

*Problem:* Error paths only cleaned up the first channel, potentially
leaking goroutines if initialization failed after starting concurrent
operations.

*Fix:* Moved all error-prone initialization (GetStream, NewReader)
before goroutine creation, and added proper cleanup on errors.

----------------------

#### Changes

* Added `done` channel to `reader` struct to signal goroutine completion
* Initialize all channels upfront to eliminate race conditions
* Use context-aware sends with `select` statements for all channel
operations
* Update `Release()` to wait on `done` channel before draining
* Reorganize initialization to handle errors before starting goroutines
* Signal completion by closing `done` channel after all producers finish

#### Reproduction Scenarios Prevented

*Deadlock #1:*

1. bufferSize = 1, producer generates 2 records quickly
2. Channel becomes full after first record
3. Producer blocks on send
4. Consumer calls Release() before Next()
5. Without fix: permanent deadlock
6. With fix: producer responds to cancellation, Release() completes

*Race Condition:*

1. Query returns 3 batches
2. First batch processes quickly
3. Next() advances to second channel
4. Without fix: reads from nil channel, blocks forever
5. With fix: channel already initialized, works correctly

See apache#3730
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants