Skip to content

Commit 2c920ee

Browse files
committed
net: tcp: change unsent queue from packets to temporary_buffer:s
The unsent queue is currently a queue of packets. This is awkward because the packets queued don't correspond to packets to be sent; they have to be cut and pasted to fit the send window or maximum packet size, whichever is greater. The recent change to data_sink to work in terms of temporary_buffer:s rather than packets provides an additional incentive to fix this. The fix is simple: change the queue type to hold temporary buffers. The compatibility code to construct a packet out of the data_sink temporary_buffer:s is no longer needed. The code to reshape the unsent packets to a packet to be sent is simplified: we append buffers to the packet until a complete buffer will overflow the maximum send size, then split the last buffer (if available and needed) to complete the packet. Tested with httpd on a tap device, seems to work.
1 parent 99e07a1 commit 2c920ee

File tree

3 files changed

+22
-33
lines changed

3 files changed

+22
-33
lines changed

demos/tcp_demo.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,8 @@ struct tcp_test {
4545
return;
4646
}
4747
fmt::print("read {:d} bytes\n", p.len());
48-
(void)tcp_conn.send(std::move(p));
48+
auto v = std::move(p).release();
49+
(void)tcp_conn.send(std::span(v));
4950
run();
5051
});
5152
}

include/seastar/net/tcp.hh

Lines changed: 14 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
#include <deque>
2929
#include <chrono>
3030
#include <random>
31+
#include <span>
3132
#include <stdexcept>
3233
#include <system_error>
3334
#include <gnutls/crypto.h>
@@ -339,7 +340,7 @@ private:
339340
tcp_seq wl2;
340341
tcp_seq initial;
341342
std::deque<unacked_segment> data;
342-
std::deque<packet> unsent;
343+
std::deque<temporary_buffer<char>> unsent;
343344
uint32_t unsent_len = 0;
344345
bool closed = false;
345346
promise<> _window_opened;
@@ -436,7 +437,7 @@ private:
436437
void abort_reader() noexcept;
437438
future<> wait_for_all_data_acked();
438439
future<> wait_send_available();
439-
future<> send(packet p);
440+
future<> send(std::span<temporary_buffer<char>> data);
440441
void connect();
441442
packet read();
442443
void close() noexcept;
@@ -689,8 +690,8 @@ public:
689690
future<> connected() {
690691
return _tcb->connect_done();
691692
}
692-
future<> send(packet p) {
693-
return _tcb->send(std::move(p));
693+
future<> send(std::span<temporary_buffer<char>> data) {
694+
return _tcb->send(data);
694695
}
695696
future<> wait_for_data() {
696697
return _tcb->wait_for_data();
@@ -1590,27 +1591,9 @@ packet tcp<InetTraits>::tcb::get_transmit_packet() {
15901591
len = std::min(uint16_t(_tcp.hw_features().mtu - net::tcp_hdr_len_min - InetTraits::ip_hdr_len_min), _snd.mss);
15911592
}
15921593
can_send = std::min(can_send, len);
1593-
// easy case: one small packet
1594-
if (_snd.unsent.size() == 1 && _snd.unsent.front().len() <= can_send) {
1595-
auto p = std::move(_snd.unsent.front());
1596-
_snd.unsent.pop_front();
1597-
_snd.unsent_len -= p.len();
1598-
return p;
1599-
}
1600-
// moderate case: need to split one packet
1601-
if (_snd.unsent.front().len() > can_send) {
1602-
auto p = _snd.unsent.front().share(0, can_send);
1603-
_snd.unsent.front().trim_front(can_send);
1604-
_snd.unsent_len -= p.len();
1605-
return p;
1606-
}
1607-
// hard case: merge some packets, possibly split last
1608-
auto p = std::move(_snd.unsent.front());
1609-
_snd.unsent.pop_front();
1610-
can_send -= p.len();
1611-
while (!_snd.unsent.empty()
1612-
&& _snd.unsent.front().len() <= can_send) {
1613-
can_send -= _snd.unsent.front().len();
1594+
auto p = packet();
1595+
while (!_snd.unsent.empty() && _snd.unsent.front().size() <= can_send) {
1596+
can_send -= _snd.unsent.front().size();
16141597
p.append(std::move(_snd.unsent.front()));
16151598
_snd.unsent.pop_front();
16161599
}
@@ -1813,16 +1796,19 @@ future<> tcp<InetTraits>::tcb::wait_send_available() {
18131796
}
18141797

18151798
template <typename InetTraits>
1816-
future<> tcp<InetTraits>::tcb::send(packet p) {
1799+
future<> tcp<InetTraits>::tcb::send(std::span<temporary_buffer<char>> data) {
18171800
// We can not send after the connection is closed
18181801
if (_snd.closed || in_state(CLOSED)) {
18191802
return make_exception_future<>(tcp_reset_error());
18201803
}
18211804

1822-
auto len = p.len();
1805+
auto sizes = data | std::views::transform(&temporary_buffer<char>::size);
1806+
auto len = std::accumulate(sizes.begin(), sizes.end(), size_t(0));
1807+
18231808
_snd.current_queue_space += len;
18241809
_snd.unsent_len += len;
1825-
_snd.unsent.push_back(std::move(p));
1810+
// FIXME: use append_range with C++23
1811+
std::ranges::move(data, std::back_inserter(_snd.unsent));
18261812

18271813
if (can_send() > 0) {
18281814
output();

src/net/native-stack-impl.hh

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -207,17 +207,19 @@ public:
207207
: _conn(std::move(conn)) {}
208208
#if SEASTAR_API_LEVEL >= 9
209209
future<> put(std::span<temporary_buffer<char>> bufs) override {
210-
net::packet p = net::packet(bufs);
211210
auto sg_id = internal::scheduling_group_index(current_scheduling_group());
212-
internal::native_stack_net_stats::bytes_sent[sg_id] += p.len();
213-
return _conn->send(std::move(p));
211+
auto sizes = bufs | std::views::transform(&temporary_buffer<char>::size);
212+
auto len = std::accumulate(sizes.begin(), sizes.end(), size_t(0));
213+
internal::native_stack_net_stats::bytes_sent[sg_id] += len;
214+
return _conn->send(bufs);
214215
}
215216
#else
216217
using data_sink_impl::put;
217218
virtual future<> put(packet p) override {
218219
auto sg_id = internal::scheduling_group_index(current_scheduling_group());
219220
internal::native_stack_net_stats::bytes_sent[sg_id] += p.len();
220-
return _conn->send(std::move(p));
221+
auto v = std::move(p).release();
222+
return _conn->send(v);
221223
}
222224
#endif
223225
virtual future<> close() override {

0 commit comments

Comments
 (0)