fix(transport,pool): resolve concurrency issues and add timeout policy#62
Conversation
There was a problem hiding this comment.
Pull request overview
This PR addresses concurrency issues in MultiplexedTransport and fixes timeout handling in BatchCall to prevent hangs when context deadlines expire. It also introduces a structured TimeoutError type with timeout source classification.
Key Changes:
- Fixed race conditions in
MultiplexedTransportby adding write serialization, safe cleanup patterns, and proper channel close guards - Made
BatchCallreturn immediately on context cancellation withTimeoutErrorfor incomplete items - Added
TimeoutErrortype withKindclassification andUnwrapsupport forerrors.Iscompatibility
Reviewed changes
Copilot reviewed 8 out of 9 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| pkg/pyproc/transport_multiplexed.go | Added writeMu for frame write serialization, cleanupOnce for safe pending cleanup, and comprehensive timeout handling with effectiveDeadline |
| pkg/pyproc/transport_multiplexed_unit_test.go | New unit tests covering concurrent requests, context cancellation, transport timeouts, and error handling paths |
| pkg/pyproc/transport_multiplexed_test.go | Enhanced concurrent request test with synchronization barriers and retry helper for transport creation |
| pkg/pyproc/timeout.go | New timeout infrastructure with TimeoutError type, effectiveDeadline for timeout priority selection, and helper functions |
| pkg/pyproc/timeout_test.go | Comprehensive tests for timeout error formatting, deadline selection, and context timeout classification |
| pkg/pyproc/pool_generic.go | Updated BatchCall to handle context cancellation and populate incomplete items with TimeoutError |
| pkg/pyproc/pool_generic_test.go | Added test verifying BatchCall returns on timeout with proper TimeoutError classification |
| pkg/pyproc/pool_generic_unit_test.go | New unit tests for typed pool lifecycle, batch operations, and error wrapping |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| tmpDir := filepath.Join("/tmp", "pyproc") | ||
| _ = os.MkdirAll(tmpDir, 0o755) |
There was a problem hiding this comment.
The error from os.MkdirAll is being ignored. If directory creation fails, subsequent socket operations will fail with unclear errors. Consider using t.TempDir() instead for automatic cleanup and error handling, or at minimum check and handle the error.
| tmpDir := filepath.Join("/tmp", "pyproc") | |
| _ = os.MkdirAll(tmpDir, 0o755) | |
| tmpDir := t.TempDir() |
| opts := PoolOptions{ | ||
| Config: PoolConfig{Workers: 1, MaxInFlight: 2}, | ||
| WorkerConfig: WorkerConfig{ | ||
| SocketPath: "/tmp/test-typed-batch-timeout.sock", |
There was a problem hiding this comment.
Hard-coded socket paths in /tmp can cause conflicts in parallel test execution and leave orphaned sockets. Use t.TempDir() combined with filepath.Join to generate unique socket paths, or reuse the newShortSocketPath helper from the unit test file.
| return nil, fmt.Errorf("request timeout after %v", timeout) | ||
| case <-ctx.Done(): | ||
| if ctx.Err() == context.DeadlineExceeded { | ||
| return nil, newTimeoutError(TimeoutKindContext, timeoutDuration(start, deadline), context.DeadlineExceeded) |
There was a problem hiding this comment.
The deadline variable may be uninitialized if effectiveDeadline returned hasDeadline=false. This would pass a zero time to timeoutDuration, returning 0. Either use the context deadline directly from ctx.Deadline(), or ensure the deadline variable is always set when ctx.Err() == context.DeadlineExceeded.
| return nil, newTimeoutError(TimeoutKindContext, timeoutDuration(start, deadline), context.DeadlineExceeded) | |
| // Prefer the context's own deadline when computing the timeout duration | |
| ctxDeadline, ok := ctx.Deadline() | |
| var timeout time.Duration | |
| if ok { | |
| timeout = timeoutDuration(start, ctxDeadline) | |
| } else if hasDeadline { | |
| timeout = timeoutDuration(start, deadline) | |
| } else { | |
| timeout = time.Since(start) | |
| } | |
| return nil, newTimeoutError(TimeoutKindContext, timeout, context.DeadlineExceeded) |
| if ctx.Err() != nil { | ||
| t.Fatalf("Failed to create transport: %v", err) | ||
| } | ||
| _ = sleepWithCtx(ctx, 10*time.Millisecond) |
There was a problem hiding this comment.
The retry loop ignores the error from sleepWithCtx. If the context is cancelled during sleep, the loop continues and checks ctx.Err() on the next iteration, but it would be clearer to check the sleep error directly and return immediately.
| _ = sleepWithCtx(ctx, 10*time.Millisecond) | |
| if err := sleepWithCtx(ctx, 10*time.Millisecond); err != nil { | |
| t.Fatalf("Failed to wait for transport creation: %v", err) | |
| } |
| transport := &MultiplexedTransport{ | ||
| config: TransportConfig{ | ||
| Address: "/tmp/nonexistent-transport.sock", |
There was a problem hiding this comment.
Hard-coded path in /tmp may cause issues on systems where /tmp is mounted with noexec or has restricted permissions. Consider using t.TempDir() to generate a guaranteed-writable temporary directory path for more robust testing across different environments.
| transport := &MultiplexedTransport{ | |
| config: TransportConfig{ | |
| Address: "/tmp/nonexistent-transport.sock", | |
| tmpDir := t.TempDir() | |
| socketPath := filepath.Join(tmpDir, "nonexistent-transport.sock") | |
| transport := &MultiplexedTransport{ | |
| config: TransportConfig{ | |
| Address: socketPath, |
This change addresses Issues #34 and #35 by fixing concurrency bugs in MultiplexedTransport and ensuring BatchCall always returns on context cancellation. ## MultiplexedTransport Concurrency Fixes (#35) - Add writeMu to serialize frame writes and prevent frame corruption - Introduce cleanupOnce for safe pending request cleanup - Fix request ID handling to prefer payload ID over frame header - Prevent double-close of closeCh in handleReadError and Close - Use RLock instead of Lock when iterating pending requests ## BatchCall Timeout Handling (#34) - BatchCall now returns immediately when ctx.Done() fires - Incomplete items receive TimeoutError in their error slots - No more hangs when context deadline expires ## Timeout Policy (timeout-policy spec) - Add TimeoutError type with Kind (Context/PerCall/Transport) - TimeoutError.Unwrap returns underlying cause for errors.Is compat - effectiveDeadline selects min(ctx, perCall, transportDefault) - All timeout errors now use consistent classification ## Test Coverage - Add concurrent request tests with unique payloads - Add BatchCall timeout tests ensuring no hangs - Add comprehensive TimeoutError unit tests - Add effectiveDeadline priority tests Closes #34, Closes #35 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
67789ab to
f4c424a
Compare
To satisfy per-function 100% coverage for the A1 scope, add a marshal hook for deterministic error tests and remove an unreachable branch in BatchCall to exercise all paths.
This change addresses Issues #34 and #35 by fixing concurrency bugs in MultiplexedTransport and ensuring BatchCall always returns on context cancellation.
MultiplexedTransport Concurrency Fixes (#35)
BatchCall Timeout Handling (#34)
Timeout Policy (timeout-policy spec)
Test Coverage
Closes #34, Closes #35
🤖 Generated with Claude Code
Description
Type of Change
Related Issues
Closes #
Checklist
go test ./...andcd worker/python && uv run pytest)Testing
Go Tests
# Run: go test -v ./...Python Tests
# Run: cd worker/python && uv run pytest -vPerformance Impact
Benchmark Results (if applicable)
# Run: cd bench && go test -bench=. -benchmemAdditional Context
Screenshots/Examples