Conversation
Greptile Summary

This PR backports a suite of F3 reliability and performance improvements from the 2.7 branch to main.

Key observations:
Confidence Score: 4/5
Important Files Changed
Sequence Diagram

```mermaid
sequenceDiagram
    participant Sender as TxTask (ByteStreamer)
    participant SC as SocketConnection
    participant HB as HeartbeatMonitor
    participant Receiver as RxTask (ByteReceiver)
    participant DS as DownloadService

    Note over Sender,SC: Send with timeout (select-based)
    Sender->>SC: send_frame(buffer)
    SC->>SC: _send_with_timeout(frame, 30s)
    SC-->>Sender: CommError(TIMEOUT) if stalled > 30s

    Note over HB,SC: Stall detection
    loop every 5s tick
        HB->>SC: get_send_stall_seconds()
        alt stall_sec > sfm_send_stall_timeout (45s)
            HB->>HB: stall_counts[conn]++
            opt close_stalled_connection && count >= stall_consecutive_checks
                HB->>SC: conn.close()
            end
        else no stall
            HB->>HB: stall_counts[conn] = 0
            HB->>Receiver: send_heartbeat(PING)
        end
    end

    Note over Sender,Receiver: ACK progress tracking
    Sender->>Receiver: stream chunk (fire_and_forget)
    Receiver-->>Sender: ACK (offset)
    Sender->>Sender: last_ack_progress_ts = now
    alt no ACK progress for ack_progress_timeout (60s)
        Sender->>Sender: stop(StreamError)
    else total wait > ack_wait (300s)
        Sender->>Sender: stop(StreamError)
    end

    Note over Sender,DS: Object download with retry
    Sender->>DS: download_object(ref_id, state)
    DS-->>Sender: ReturnCode.TIMEOUT
    loop retry (max 3, backoff 2/4/8s)
        Sender->>DS: download_object(ref_id, same_state)
        DS-->>Sender: ReturnCode.OK + chunk
    end
    Sender->>Sender: consumer.consume(chunk)
    DS-->>Sender: ProduceRC.EOF
    Sender->>Sender: log elapsed + total_bytes
```
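The retry leg of the diagram can be sketched as a generic backoff loop. This is a simplified illustration; `download_fn`, `TimeoutResponse`, and the backoff base are assumptions for the sketch, not the F3 API:

```python
import time


class TimeoutResponse(Exception):
    """Stand-in for a ReturnCode.TIMEOUT reply (hypothetical name)."""


def download_with_retry(download_fn, max_retries=3, base_backoff=2.0):
    """Call download_fn, retrying on timeout with exponential backoff (2s, 4s, 8s)."""
    attempt = 0
    while True:
        try:
            return download_fn()
        except TimeoutResponse:
            attempt += 1
            if attempt > max_retries:
                raise  # budget exhausted: surface the timeout to the caller
            # attempt 1 -> 2s, attempt 2 -> 4s, attempt 3 -> 8s
            time.sleep(base_backoff * (2 ** (attempt - 1)))
```

Because the client keeps the same state on retry, the server re-produces the same chunk, which is what lets a stalled transfer resume without restarting the whole stream.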
Last reviewed commit: 0b49ead
```python
while window > self.window_size:
    now = time.monotonic()
    if now - self.last_ack_progress_ts >= self.ack_progress_timeout:
        self.stop(StreamError(f"{self} ACK made no progress for {self.ack_progress_timeout} seconds"))
        return
    # ...
    window = self.offset - self.offset_ack
    elapsed = now - wait_start
    if elapsed >= self.ack_wait:
        self.stop(StreamError(f"{self} ACK timeouts after {self.ack_wait} seconds"))
        return
    # ...
    self.ack_waiter.clear()
    wait_timeout = min(self.ack_progress_check_interval, self.ack_wait - elapsed)
    self.ack_waiter.wait(timeout=wait_timeout)
    window = self.offset - self.offset_ack
```
ack_wait now acts as a total window-clear budget, not a per-ACK wait
The original code waited up to ack_wait seconds for each individual ACK signal. With the nested loop, wait_start is set once when the outer while window > self.window_size: is first entered, so the elapsed >= self.ack_wait guard is now a total budget for the entire window-clearing period.
If multiple ACKs are needed to drain a large window (e.g., STREAM_WINDOW_SIZE = 16 MB with 1 MB ACKs), the old code permitted up to N * ack_wait aggregate wait. The new code caps the entire clearing sequence at one ack_wait (default 300 s). In practice the ack_progress_timeout (60 s) will usually fire first, but if slow but steady ACKs keep arriving — resetting last_ack_progress_ts repeatedly — only the ack_wait guard prevents an indefinite wait, and it now fires 16× sooner than before for that scenario.
Additionally, the outer while window > self.window_size: loop is functionally equivalent to if window > self.window_size: here: the inner loop always exits with window ≤ window_size (the condition that breaks it), so the outer while re-evaluates to False immediately. Consider replacing the outer while with if and documenting the intent, or resetting wait_start correctly inside the outer loop if the intent is to allow multiple stall periods.
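A standalone sketch of the per-ACK budget described above (resetting the `ack_wait` deadline whenever the ACK offset advances) might look like this; the `tx` attributes mirror `TxTask`, but the function itself is hypothetical, not the PR's code:

```python
import time


def wait_for_window(tx, ack_wait=300.0, ack_progress_timeout=60.0, check_interval=5.0):
    """Block until the unacked window drains.

    ack_wait budgets each wait-for-the-next-ACK rather than the whole
    drain, restoring the original per-ACK semantics.
    """
    wait_start = time.monotonic()
    last_progress = wait_start
    last_ack = tx.offset_ack
    while tx.offset - tx.offset_ack > tx.window_size:
        now = time.monotonic()
        if tx.offset_ack > last_ack:  # an ACK advanced: start a fresh budget
            last_ack = tx.offset_ack
            last_progress = now
            wait_start = now
        if now - last_progress >= ack_progress_timeout:
            raise TimeoutError("ACK made no progress")
        if now - wait_start >= ack_wait:
            raise TimeoutError("no ACK within ack_wait budget")
        tx.ack_waiter.clear()
        tx.ack_waiter.wait(timeout=min(check_interval, ack_wait - (now - wait_start)))
```

Under this shape a large window can take up to N * ack_wait to drain (N ACKs), while a single stuck ACK is still bounded by one ack_wait.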
```python
if data is not None:
    total_bytes += sum(len(c) for c in data) if isinstance(data, list) else len(data)
```
total_bytes is inflated when retries re-produce the same chunk
On a TIMEOUT retry, current_state is kept the same, so the server re-invokes produce() for that state and returns the identical chunk. Both the client (line ~706) and the server (line ~496 in _handle_download) add the chunk length to total_bytes on every successful response, including retried ones.
This means a chunk that required 2 attempts to deliver is counted twice. The elapsed-time / byte-count summary logged at EOF will overstate the actual bytes transferred.
```python
# Only accumulate bytes for new (non-retry) responses to keep total_bytes accurate.
if consecutive_timeouts == 0 and data is not None:
    total_bytes += sum(len(c) for c in data) if isinstance(data, list) else len(data)
```

The same correction should be applied server-side in `_handle_download` — though the server has no visibility into client-side retry state, so an exact fix there may require a retry flag in the request payload.
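If such a retry flag were added to the request payload, the server-side accounting could be guarded the same way. A minimal sketch, where every name (`build_download_request`, `handle_download`, the dict layout) is hypothetical:

```python
def build_download_request(ref_id, state, is_retry):
    """Client side: mark requests that re-send an already-produced state."""
    return {"ref_id": ref_id, "state": state, "retry": is_retry}


def handle_download(request, produce, totals):
    """Server side: skip byte accounting for retried requests so the
    EOF summary reflects unique bytes produced, not bytes re-sent."""
    data = produce(request["state"])
    if data is not None and not request["retry"]:
        totals["bytes"] += sum(len(c) for c in data) if isinstance(data, list) else len(data)
    return data
```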
```python
def _run_blob_cb(self, future: StreamFuture, stream: Stream, args: tuple, kwargs: dict):
    """Run blob_cb on the callback pool; preserve exception handling (log + task.stop) as in ByteReceiver."""
    try:
        self.blob_cb(future, *args, **kwargs)
    except Exception as ex:
        log.error(f"blob_cb threw: {ex}\n{secure_format_traceback()}")
        if hasattr(stream, "task"):
            stream.task.stop(StreamError(f"blob_cb threw: {ex}"))
```
Stream may not be stopped if blob_cb raises and stream.task is absent
stream.task is not a guaranteed attribute on the Stream base type; hasattr(stream, "task") will be False for any stream that was not enriched with this attribute, causing the task.stop() call to be silently skipped.
When blob_cb raises and the task is not stopped, _read_stream continues to run on stream_thread_pool — pulling chunks from the stream and writing them to blob_task.buffer — even though no consumer is active. The future is eventually resolved via blob_task.future.set_result(result), but any caller await-ing that future will receive the data with no indication that the callback failed.
Consider propagating the error to the future directly so callers can observe it:
```python
def _run_blob_cb(self, future: StreamFuture, stream: Stream, args: tuple, kwargs: dict):
    try:
        self.blob_cb(future, *args, **kwargs)
    except Exception as ex:
        log.error(f"blob_cb threw: {ex}\n{secure_format_traceback()}")
        future.set_exception(StreamError(f"blob_cb threw: {ex}"))
        if hasattr(stream, "task"):
            stream.task.stop(StreamError(f"blob_cb threw: {ex}"))
```

```python
self.ack_waiter.clear()
wait_timeout = min(self.ack_progress_check_interval, self.ack_wait - elapsed)
self.ack_waiter.wait(timeout=wait_timeout)
```
ack_waiter.clear() may discard an already-arrived ACK signal
An ACK that was received after the previous ack_waiter.wait() returned (either by timeout or by signal) will have called ack_waiter.set(). The next iteration of the inner loop then calls ack_waiter.clear(), discarding that signal before reaching ack_waiter.wait(). The result is an unnecessary extra wait of up to ack_progress_check_interval (default 5 s) before the updated window is re-evaluated.
The standard remedy is to clear the event before entering the wait phase, but after sampling offset_ack, so any ACK that arrives after the sample but before the wait is not lost:
```python
self.ack_waiter.clear()
window = self.offset - self.offset_ack  # re-sample after clear, before wait
if window <= self.window_size:
    break
self.ack_waiter.wait(timeout=wait_timeout)
window = self.offset - self.offset_ack
```

This is a minor latency issue rather than a correctness bug, since offset_ack is updated regardless of the event state.
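The clear-then-resample ordering generalizes to any `threading.Event` wait loop; a minimal standalone illustration (not F3 code):

```python
import threading


def wait_for_condition(event, predicate, check_interval=0.05):
    """Wait until predicate() is true without losing a racing event.set().

    Clearing first and re-sampling afterwards guarantees that a set() which
    races with the check is never discarded: either the re-sample already
    sees the update, or the set() lands after clear() and wait() returns
    promptly instead of sleeping the full interval.
    """
    while not predicate():
        event.clear()
        if predicate():  # re-sample after clear, before wait
            break
        event.wait(timeout=check_interval)
```

Clearing after the predicate check, by contrast, can swallow a set() that arrived in between, costing up to one full check_interval of extra latency per lost signal.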
Summary
This PR merges a set of F3 (fuel/f3) reliability and performance improvements from the 2.7 branch into main:
- `SocketConnection` now uses a `select`-based send loop with a configurable timeout (default 30s).
- `HeartbeatMonitor` detects stalled connections and optionally closes them after consecutive stall checks (`sfm_send_stall_timeout`, `sfm_close_stalled_connection`, `sfm_send_stall_consecutive_checks` config vars).
- `TxTask` tracks the last time the ACK offset advanced and aborts if no progress is made within a configurable window (`streaming_ack_progress_timeout`, default 60s), preventing indefinite hangs from a stuck receiver.
- `RxTask` now keys its task map on `(origin, sid)` instead of just `sid`, preventing stream collisions when multiple origins use the same stream ID.
- Blob callbacks are dispatched to a dedicated `callback_thread_pool` (64 threads) to avoid starving the main stream thread pool.
- `download_object` now retries on `TIMEOUT` responses with exponential backoff (up to `max_retries=3` by default), with recovery/failure logging.
- `CacheableObject.release()` nulls `base_obj` after `transaction_done_cb` fires, allowing GC to immediately reclaim large objects (e.g. numpy dicts).
- `PASS_THROUGH` header support: new `MessageHeaderKey.PASS_THROUGH` and `FOBSContextKey.PASS_THROUGH` keys enable lazy tensor downloading at forwarding nodes (foundation for pass-through architecture).
- `ConnManager`: `stopped=True` is set before executor shutdown; `frame_mgr_executor.submit()` calls are guarded against `RuntimeError` after shutdown.
- `_Transaction` and `download_object` now log elapsed time and total bytes on completion.

Test plan
```shell
python3 -m pytest tests/unit_test/fuel/f3/ -v
```

🤖 Generated with Claude Code