Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions quic/QuicConstants.h
Original file line number Diff line number Diff line change
Expand Up @@ -805,6 +805,7 @@ enum class NoWriteReason {
NO_FRAME,
NO_BODY,
SOCKET_FAILURE,
WRITER_BACKPRESSURE, // async writer SPSC queue full
};

enum class NoReadReason {
Expand Down
9 changes: 9 additions & 0 deletions quic/api/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,14 @@

# Auto-generated by quic/facebook/generate_cmake.py - DO NOT EDIT MANUALLY

mvfst_add_library(mvfst_api_quic_packet_writer
EXPORTED_DEPS
mvfst_exception
mvfst_constants
Folly::folly_io_iobuf
Folly::folly_network_address
)

mvfst_add_library(mvfst_api_quic_batch_writer
SRCS
QuicBatchWriter.cpp
Expand Down Expand Up @@ -139,6 +147,7 @@ mvfst_add_library(mvfst_api_transport_helpers
EXPORTED_DEPS
mvfst_api_ack_scheduler
mvfst_api_quic_batch_writer
mvfst_api_quic_packet_writer
mvfst_client_state_and_handshake
mvfst_codec
mvfst_codec_pktbuilder
Expand Down
6 changes: 1 addition & 5 deletions quic/api/IoBufQuicBatch.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,12 @@
#pragma once
#include <quic/QuicException.h>
#include <quic/api/QuicBatchWriter.h>
#include <quic/api/QuicPacketWriter.h>
#include <quic/client/state/ClientStateMachine.h>
#include <quic/state/QuicTransportStatsCallback.h>

namespace quic {

struct BufQuicBatchResult {
uint64_t packetsSent{0};
uint64_t bytesSent{0};
};

class IOBufQuicBatch {
public:
IOBufQuicBatch(
Expand Down
58 changes: 58 additions & 0 deletions quic/api/QuicPacketWriter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright (c) Meta Platforms, Inc. and affiliates.
*
* This source code is licensed under the MIT license found in the
* LICENSE file in the root directory of this source tree.
*/

#pragma once

#include <cstdint>

#include <folly/SocketAddress.h>
#include <folly/io/IOBuf.h>

#include <quic/QuicException.h>
#include <quic/common/Expected.h>

namespace quic {

// Defined here (not in IoBufQuicBatch.h) so StateData.h can include this
// header without pulling in the IoBufQuicBatch → QuicBatchWriter → StateData
// include cycle.
struct BufQuicBatchResult {
uint64_t packetsSent{0};
uint64_t bytesSent{0};
};

/**
* Abstract interface for sending fully-built, encrypted QUIC packets.
*
* The default path (conn.packetWriter == nullptr) calls IOBufQuicBatch
* directly. When conn.packetWriter is set (ChainedMemory data path only),
* writeConnectionDataToSocket dispatches through this interface instead.
*/
class QuicPacketWriter {
public:
virtual ~QuicPacketWriter() = default;

// Called on the EventBase thread. Returns false → stop write loop
// (backpressure, not an error). Returns unexpected → close connection.
[[nodiscard]] virtual quic::Expected<bool, QuicError> write(
BufPtr&& buf,
size_t encodedSize,
const folly::SocketAddress& peerAddr) = 0;

[[nodiscard]] virtual quic::Expected<bool, QuicError> flush() = 0;

// packetsSent counts packets handed to this writer (enqueued or sent inline).
virtual BufQuicBatchResult getResult() const = 0;

// Last retriable errno (EAGAIN/ENOBUFS) seen. Always 0 for async writers —
// errno tracking is internal to their drain thread.
virtual int getLastRetryableErrno() const {
return 0;
}
};

} // namespace quic
149 changes: 103 additions & 46 deletions quic/api/QuicTransportFunctions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <quic/QuicConstants.h>
#include <quic/QuicException.h>
#include <quic/api/QuicBatchWriterFactory.h>
#include <quic/api/QuicPacketWriter.h>
#include <quic/api/QuicTransportFunctions.h>
#include <quic/client/state/ClientStateMachine.h>
#include <quic/codec/QuicPacketBuilder.h>
Expand Down Expand Up @@ -235,10 +236,7 @@ quic::Expected<WriteQuicDataResult, QuicError> writeQuicDataToSocketImpl(
return result;
}

void updateErrnoCount(
QuicConnectionStateBase& connection,
IOBufQuicBatch& ioBufBatch) {
int lastErrno = ioBufBatch.getLastRetryableErrno();
void updateErrnoCount(QuicConnectionStateBase& connection, int lastErrno) {
if (lastErrno == EAGAIN || lastErrno == EWOULDBLOCK) {
connection.eagainOrEwouldblockCount++;
} else if (lastErrno == ENOBUFS) {
Expand Down Expand Up @@ -400,7 +398,7 @@ continuousMemoryBuildScheduleEncrypt(
if (!flushResult.has_value()) {
return quic::make_unexpected(flushResult.error());
}
updateErrnoCount(connection, ioBufBatch);
updateErrnoCount(connection, ioBufBatch.getLastRetryableErrno());
if (connection.loopDetectorCallback) {
connection.writeDebugState.noWriteReason = NoWriteReason::NO_FRAME;
}
Expand All @@ -413,7 +411,7 @@ continuousMemoryBuildScheduleEncrypt(
if (!flushResult.has_value()) {
return quic::make_unexpected(flushResult.error());
}
updateErrnoCount(connection, ioBufBatch);
updateErrnoCount(connection, ioBufBatch.getLastRetryableErrno());
if (connection.loopDetectorCallback) {
connection.writeDebugState.noWriteReason = NoWriteReason::NO_BODY;
}
Expand Down Expand Up @@ -494,7 +492,7 @@ continuousMemoryBuildScheduleEncrypt(
if (!writeResult.has_value()) {
return quic::make_unexpected(writeResult.error());
}
updateErrnoCount(connection, ioBufBatch);
updateErrnoCount(connection, ioBufBatch.getLastRetryableErrno());
return DataPathResult::makeWriteResult(
writeResult.value(),
std::move(result.value()),
Expand All @@ -514,7 +512,9 @@ iobufChainBasedBuildScheduleEncrypt(
IOBufQuicBatch& ioBufBatch,
const Aead& aead,
const PacketNumberCipher& headerCipher,
TimePoint sendTime) {
TimePoint sendTime,
const folly::SocketAddress& peerAddr,
QuicPacketWriter* packetWriter) {
// SCONE: Pre-build SCONE packet and adjust max packet size to avoid overflow
std::unique_ptr<Buf> preBuildSconePacket;
uint64_t adjustedMaxPacketSize = connection.udpSendPacketLen;
Expand Down Expand Up @@ -556,23 +556,29 @@ iobufChainBasedBuildScheduleEncrypt(
}
auto& packet = result->packet;
if (!packet || packet->packet.frames.empty()) {
auto flushResult = ioBufBatch.flush();
auto flushResult = packetWriter ? packetWriter->flush() : ioBufBatch.flush();
if (!flushResult.has_value()) {
return quic::make_unexpected(flushResult.error());
}
updateErrnoCount(connection, ioBufBatch);
updateErrnoCount(
connection,
packetWriter ? packetWriter->getLastRetryableErrno()
: ioBufBatch.getLastRetryableErrno());
if (connection.loopDetectorCallback) {
connection.writeDebugState.noWriteReason = NoWriteReason::NO_FRAME;
}
return DataPathResult::makeBuildFailure();
}
if (packet->body.empty()) {
// No more space remaining.
auto flushResult = ioBufBatch.flush();
auto flushResult = packetWriter ? packetWriter->flush() : ioBufBatch.flush();
if (!flushResult.has_value()) {
return quic::make_unexpected(flushResult.error());
}
updateErrnoCount(connection, ioBufBatch);
updateErrnoCount(
connection,
packetWriter ? packetWriter->getLastRetryableErrno()
: ioBufBatch.getLastRetryableErrno());
if (connection.loopDetectorCallback) {
connection.writeDebugState.noWriteReason = NoWriteReason::NO_BODY;
}
Expand Down Expand Up @@ -660,11 +666,16 @@ iobufChainBasedBuildScheduleEncrypt(
return DataPathResult::makeWriteResult(
true, std::move(result.value()), encodedSize, encodedBodySize);
}
auto writeResult = ioBufBatch.write(std::move(packetBuf), encodedSize);
auto writeResult = packetWriter
? packetWriter->write(std::move(packetBuf), encodedSize, peerAddr)
: ioBufBatch.write(std::move(packetBuf), encodedSize);
if (!writeResult.has_value()) {
return quic::make_unexpected(writeResult.error());
}
updateErrnoCount(connection, ioBufBatch);
updateErrnoCount(
connection,
packetWriter ? packetWriter->getLastRetryableErrno()
: ioBufBatch.getLastRetryableErrno());
return DataPathResult::makeWriteResult(
writeResult.value(),
std::move(result.value()),
Expand Down Expand Up @@ -1891,8 +1902,34 @@ quic::Expected<WriteQuicDataResult, QuicError> writeConnectionDataToSocket(
uint64_t bytesWritten = 0;
uint64_t shortHeaderPadding = 0;
[[maybe_unused]] uint64_t shortHeaderPaddingCount = 0;
// Capture baseline so pktSentFn() counts packets sent *this call*, not
// lifetime total. ConnectionPacketWriter::result_.packetsSent accumulates
// across calls; ioBufBatch is fresh each call so needs no adjustment.
const uint64_t pktSentBaseline = connection.packetWriter
? connection.packetWriter->getResult().packetsSent
: 0;
auto pktSentFn = [&]() -> uint64_t {
return connection.packetWriter
? connection.packetWriter->getResult().packetsSent - pktSentBaseline
: static_cast<uint64_t>(ioBufBatch.getPktSent());
};
MVDCHECK(
!connection.packetWriter ||
connection.transportSettings.dataPathType ==
DataPathType::ChainedMemory,
"packetWriter requires ChainedMemory data path");
auto flushFn = [&]() {
return connection.packetWriter ? connection.packetWriter->flush()
: ioBufBatch.flush();
};
auto errnoFn = [&]() {
return connection.packetWriter
? connection.packetWriter->getLastRetryableErrno()
: ioBufBatch.getLastRetryableErrno();
};

SCOPE_EXIT {
auto nSent = ioBufBatch.getPktSent();
auto nSent = pktSentFn();
if (nSent > 0) {
QUIC_STATS(connection.statsCallback, onPacketsSent, nSent);
QUIC_STATS(connection.statsCallback, onWrite, bytesWritten);
Expand All @@ -1908,8 +1945,8 @@ quic::Expected<WriteQuicDataResult, QuicError> writeConnectionDataToSocket(

quic::TimePoint sentTime = Clock::now();

while (scheduler.hasData() && ioBufBatch.getPktSent() < packetLimit &&
((ioBufBatch.getPktSent() < batchSize) ||
while (scheduler.hasData() && pktSentFn() < packetLimit &&
((pktSentFn() < batchSize) ||
writeLoopTimeLimit(writeLoopBeginTime, connection))) {
auto packetNum = getNextPacketNum(connection, pnSpace);
auto header = builder(srcConnId, dstConnId, packetNum, version, token);
Expand All @@ -1931,22 +1968,38 @@ quic::Expected<WriteQuicDataResult, QuicError> writeConnectionDataToSocket(

bool useChainedMemory = connection.transportSettings.dataPathType ==
DataPathType::ChainedMemory;
const auto& dataPlaneFunc = useChainedMemory
? iobufChainBasedBuildScheduleEncrypt
: continuousMemoryBuildScheduleEncrypt;

auto ret = dataPlaneFunc(
connection,
std::move(header),
pnSpace,
packetNum,
cipherOverhead,
scheduler,
writableBytes,
ioBufBatch,
aead,
headerCipher,
sentTime);
auto ret = [&]() {
if (useChainedMemory) {
return iobufChainBasedBuildScheduleEncrypt(
connection,
std::move(header),
pnSpace,
packetNum,
cipherOverhead,
scheduler,
writableBytes,
ioBufBatch,
aead,
headerCipher,
sentTime,
peerAddress,
connection.packetWriter.get());
} else {
return continuousMemoryBuildScheduleEncrypt(
connection,
std::move(header),
pnSpace,
packetNum,
cipherOverhead,
scheduler,
writableBytes,
ioBufBatch,
aead,
headerCipher,
sentTime);
}
}();

// This is a fatal error vs. a build error.
if (!ret.has_value()) {
Expand All @@ -1955,12 +2008,12 @@ quic::Expected<WriteQuicDataResult, QuicError> writeConnectionDataToSocket(
if (!ret->buildSuccess) {
// If we're returning because we couldn't schedule more packets,
// make sure we flush the buffer in this function.
auto flushResult = ioBufBatch.flush();
auto flushResult = flushFn();
if (!flushResult.has_value()) {
return quic::make_unexpected(flushResult.error());
}
updateErrnoCount(connection, ioBufBatch);
return WriteQuicDataResult{ioBufBatch.getPktSent(), 0, bytesWritten};
updateErrnoCount(connection, errnoFn());
return WriteQuicDataResult{pktSentFn(), 0, bytesWritten};
}
// If we build a packet, we updateConnection(), even if write might have
// been failed. Because if it builds, a lot of states need to be updated no
Expand Down Expand Up @@ -1993,14 +2046,15 @@ quic::Expected<WriteQuicDataResult, QuicError> writeConnectionDataToSocket(
connection.streamManager->writeQueue().commitTransaction(
std::move(writeQueueTransaction));

// if ioBufBatch.write returns false
// it is because a flush() call failed
// writeSuccess == false means flush() failed (inline) or SPSC queue full
// (async writer backpressure).
if (!ret->writeSuccess) {
if (connection.loopDetectorCallback) {
connection.writeDebugState.noWriteReason =
NoWriteReason::SOCKET_FAILURE;
connection.writeDebugState.noWriteReason = connection.packetWriter
? NoWriteReason::WRITER_BACKPRESSURE
: NoWriteReason::SOCKET_FAILURE;
}
return WriteQuicDataResult{ioBufBatch.getPktSent(), 0, bytesWritten};
return WriteQuicDataResult{pktSentFn(), 0, bytesWritten};
}

if ((connection.transportSettings.batchingMode ==
Expand All @@ -2009,21 +2063,24 @@ quic::Expected<WriteQuicDataResult, QuicError> writeConnectionDataToSocket(
connection.transportSettings.maxBatchSize,
connection.transportSettings.dataPathType)) {
// With SinglePacketInplaceBatchWriter we always write one packet, and so
// ioBufBatch needs a flush.
// ioBufBatch needs a flush. This path requires ContinuousMemory, so
// connection.packetWriter is always null here.
auto flushResult = ioBufBatch.flush();
if (!flushResult.has_value()) {
return quic::make_unexpected(flushResult.error());
}
updateErrnoCount(connection, ioBufBatch);
updateErrnoCount(connection, ioBufBatch.getLastRetryableErrno());
}
}

// Ensure that the buffer is flushed before returning
auto flushResult = ioBufBatch.flush();
// Ensure that the buffer is flushed before returning.
// On the async path this flush() writes the eventfd to wake the drain thread
// — it must not be skipped.
auto flushResult = flushFn();
if (!flushResult.has_value()) {
return quic::make_unexpected(flushResult.error());
}
updateErrnoCount(connection, ioBufBatch);
updateErrnoCount(connection, errnoFn());

if (connection.transportSettings.dataPathType ==
DataPathType::ContinuousMemory) {
Expand All @@ -2032,7 +2089,7 @@ quic::Expected<WriteQuicDataResult, QuicError> writeConnectionDataToSocket(
connection.bufAccessor->length() == 0 &&
connection.bufAccessor->headroom() == 0);
}
return WriteQuicDataResult{ioBufBatch.getPktSent(), 0, bytesWritten};
return WriteQuicDataResult{pktSentFn(), 0, bytesWritten};
}

quic::Expected<WriteQuicDataResult, QuicError> writeProbingDataToSocket(
Expand Down
Loading
Loading