@@ -32,14 +32,35 @@ namespace oxen::quic
3232 class TCPHandle ;
3333
3434 inline const auto LOCALHOST = " 127.0.0.1" s;
35- inline const auto TUNNEL_SEED = oxenc::from_hex(" 0000000000000000000000000000000000000000000000000000000000000000" );
36- inline const auto TUNNEL_PUBKEY = oxenc::from_hex(" 3b6a27bcceb6a42d62a3a8d02a6f0d73653215771de243a63ac048a18b59da29" );
35+ inline constexpr auto TUNNEL_SEED = " 0000000000000000000000000000000000000000000000000000000000000000" _hex;
36+ inline constexpr auto TUNNEL_PUBKEY = " 3b6a27bcceb6a42d62a3a8d02a6f0d73653215771de243a63ac048a18b59da29" _hex;
37+
38+ inline constexpr size_t HIGH_WATERMARK {4_Mi};
39+ inline constexpr size_t LOW_WATERMARK {HIGH_WATERMARK / 4 };
40+
41+ inline std::vector<std::byte> serialize_payload (bstring_view data, uint16_t port = 0 )
42+ {
43+ std::vector<std::byte> ret (data.size () + sizeof (port));
44+ oxenc::write_host_as_big (port, ret.data ());
45+ std::memcpy (&ret[2 ], data.data (), data.size ());
46+ return ret;
47+ }
48+
49+ inline std::tuple<uint16_t , bstring> deserialize_payload (bstring data)
50+ {
51+ uint16_t p = oxenc::load_big_to_host<uint16_t >(data.data ());
52+
53+ return {p, data.substr (2 )};
54+ }
3755
3856 struct TCPQUIC
3957 {
4058 std::shared_ptr<connection_interface> _ci;
4159
60+ std::unordered_set<std::shared_ptr<TCPConnection>> t;
61+
4262 // keyed against backend tcp address
63+ std::unordered_map<Address, std::unordered_set<std::shared_ptr<TCPConnection>>> _tcp_conns2;
4364 std::unordered_map<Address, std::shared_ptr<TCPConnection>> _tcp_conns;
4465 };
4566
@@ -58,17 +79,62 @@ namespace oxen::quic
5879 evconnlistener_free (e);
5980 };
6081
82+ void tcp_drained_write_cb (struct bufferevent * bev, void * user_arg);
83+
6184 void tcp_read_cb (struct bufferevent * bev, void * user_arg);
85+
6286 void tcp_event_cb (struct bufferevent * bev, short what, void * user_arg);
87+
6388 void tcp_listen_cb (
6489 struct evconnlistener * listener, evutil_socket_t fd, struct sockaddr * src, int socklen, void * user_arg);
90+
6591 void tcp_err_cb (struct evconnlistener * listener, void * user_arg);
6692
6793 struct TCPConnection
6894 {
6995 TCPConnection (struct bufferevent * _bev, evutil_socket_t _fd, std::shared_ptr<Stream> _s) :
7096 bev{_bev}, fd{_fd}, stream{std::move (_s)}
71- {}
97+ {
98+ stream->set_stream_data_cb ([this ](oxen::quic::Stream& s, bstring_view data) {
99+ auto rv = bev ? bufferevent_write (bev, data.data (), data.size ()) : -1 ;
100+ log::info (
101+ test_cat,
102+ " Stream (id: {}) {} {}B to TCP buffer" ,
103+ s.stream_id (),
104+ rv < 0 ? " failed to write" : " successfully wrote" ,
105+ data.size ());
106+
107+ // we get the output buffer (it sounds backwards but it isn't)
108+ if (evbuffer_get_length (bufferevent_get_output (bev)) >= HIGH_WATERMARK )
109+ {
110+ log::info (
111+ test_cat, " TCP input buffer over high-water threshold ({}); pausing stream..." , HIGH_WATERMARK );
112+ s.pause ();
113+
114+ bufferevent_setcb (bev, tcp_read_cb, tcp_drained_write_cb, tcp_event_cb, this );
115+ bufferevent_setwatermark (bev, EV_WRITE , LOW_WATERMARK , HIGH_WATERMARK );
116+ }
117+ });
118+
119+ stream->set_stream_close_cb ([this ](Stream&, uint64_t ) {
120+ log::info (
121+ test_cat,
122+ " Stream closed cb fired, {}..." ,
123+ bev ? " freeing bufferevent" : " bufferevent already freed" );
124+ if (bev)
125+ bufferevent_free (bev);
126+ });
127+
128+ stream->set_remote_reset_hooks (opt::remote_stream_reset{
129+ [](Stream& s, uint64_t ) {
130+ log::info (test_cat, " Remote stream signalled reading termination; halting local stream write!" );
131+ s.stop_writing ();
132+ },
133+ [](Stream& s, uint64_t ) {
134+ log::info (test_cat, " Remote stream signalled writing termination; halting local stream read!" );
135+ s.stop_reading ();
136+ }});
137+ }
72138
73139 TCPConnection () = delete ;
74140
@@ -163,37 +229,18 @@ namespace oxen::quic
163229 // returns the socket address of the TCP connection
164230 std::optional<Address> connect () const { return _connect; }
165231
166- std::shared_ptr<TCPConnection> connect_to_backend (std::shared_ptr<Stream> s , Address addr)
232+ std::shared_ptr<TCPConnection> connect_to_backend (std::shared_ptr<Stream> stream , Address addr)
167233 {
168234 if (addr.port () == 0 )
169235 throw std::runtime_error{" TCP backend must have valid port on localhost!" };
170236
171- log::critical (test_cat, " Attempting TCP connection to backend at: {}" , addr);
237+ log::info (test_cat, " Attempting TCP connection to backend at: {}" , addr);
172238 sockaddr_in _addr = addr.in4 ();
173239
174240 struct bufferevent * _bev =
175241 bufferevent_socket_new (_ev->loop ().get (), -1 , BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE );
176242
177- s->set_stream_data_cb ([_bev](oxen::quic::Stream& s, bstring_view data) {
178- auto rv = _bev ? bufferevent_write (_bev, data.data (), data.size ()) : -1 ;
179- log::info (
180- test_cat,
181- " Stream (id: {}) {} {}B to TCP buffer" ,
182- s.stream_id (),
183- rv < 0 ? " failed to write" : " successfully wrote" ,
184- data.size ());
185- });
186-
187- s->set_stream_close_cb ([_bev](Stream&, uint64_t ) {
188- log::critical (
189- test_cat,
190- " Stream closed cb fired, {}..." ,
191- _bev ? " freeing bufferevent" : " bufferevent already freed" );
192- if (_bev)
193- bufferevent_free (_bev);
194- });
195-
196- auto tcp_conn = std::make_shared<TCPConnection>(_bev, -1 , std::move (s));
243+ auto tcp_conn = std::make_shared<TCPConnection>(_bev, -1 , std::move (stream));
197244
198245 bufferevent_setcb (_bev, tcp_read_cb, nullptr , tcp_event_cb, tcp_conn.get ());
199246 bufferevent_enable (_bev, EV_READ | EV_WRITE );
@@ -260,13 +307,25 @@ namespace oxen::quic
260307
261308 evconnlistener_set_error_cb (_tcp_listener.get (), tcp_err_cb);
262309
263- log::critical (test_cat, " TCPHandle set up listener on: {}" , *_bound);
310+ log::info (test_cat, " TCPHandle set up listener on: {}" , *_bound);
264311 }
265312 };
266313
314+ inline void tcp_drained_write_cb (struct bufferevent * bev, void * user_arg)
315+ {
316+ bufferevent_setcb (bev, tcp_read_cb, nullptr , tcp_event_cb, user_arg);
317+ bufferevent_setwatermark (bev, EV_WRITE , 0 , 0 );
318+
319+ auto * conn = reinterpret_cast <TCPConnection*>(user_arg);
320+ assert (conn);
321+
322+ log::info (test_cat, " TCP input buffer below low-water threshold ({}); resuming stream!" , LOW_WATERMARK );
323+ conn->stream ->resume ();
324+ }
325+
267326 inline void tcp_read_cb (struct bufferevent * bev, void * user_arg)
268327 {
269- std::array<uint8_t , 2048 > buf{};
328+ std::array<uint8_t , 4096 > buf{};
270329
271330 // Load data from input buffer to local buffer
272331 auto nwrite = bufferevent_read (bev, buf.data (), buf.size ());
@@ -277,92 +336,92 @@ namespace oxen::quic
277336 {
278337 auto * conn = reinterpret_cast <TCPConnection*>(user_arg);
279338 assert (conn);
339+ auto & stream = conn->stream ;
340+ assert (stream);
341+
342+ stream->send (ustring{buf.data (), nwrite});
280343
281- conn->stream ->send (ustring{(buf.data ()), nwrite});
344+ if (stream->unsent () >= HIGH_WATERMARK )
345+ {
346+ stream->set_watermark (
347+ LOW_WATERMARK ,
348+ HIGH_WATERMARK ,
349+ opt::watermark{
350+ [bev](Stream&) {
351+ log::info (test_cat, " Stream buffer below low-water threshold; enabling TCP read!" );
352+ bufferevent_enable (bev, EV_READ );
353+ },
354+ false },
355+ opt::watermark{
356+ [bev](Stream&) {
357+ log::info (test_cat, " Stream buffer above high-water threshold; disabling TCP read!" );
358+ bufferevent_disable (bev, EV_READ );
359+ },
360+ false });
361+ }
282362 }
283363 }
284364
285365 inline void tcp_event_cb ([[maybe_unused]] struct bufferevent * bev, short what, void * user_arg)
286366 {
287- // this is where the InboundSession confirms it established a TCP connection to the backend app
288367 if (what & BEV_EVENT_CONNECTED )
289368 {
290369 log::info (test_cat, " TCP connect operation succeeded!" );
291370 }
292- if (what & BEV_EVENT_EOF )
293- {
294- log::critical (test_cat, " TCP Connection EOF!" );
295- }
296371 if (what & BEV_EVENT_ERROR )
297372 {
298373 log::critical (
299374 test_cat,
300375 " TCP Connection encountered bufferevent error (msg: {})!" ,
301376 evutil_socket_error_to_string (EVUTIL_SOCKET_ERROR ()));
302377 }
303- if (what & (BEV_EVENT_ERROR | BEV_EVENT_EOF ))
304- {
305- // if (bev)
306- // {
307- // log::critical(test_cat, "Freeing bufferevent socket...");
308- // bufferevent_free(bev);
309- // }
310378
311- auto * conn = reinterpret_cast <TCPConnection*>(user_arg);
312- assert (conn);
379+ auto * conn = reinterpret_cast <TCPConnection*>(user_arg);
380+ assert (conn);
381+ auto & stream = conn->stream ;
313382
383+ if (what & BEV_EVENT_EOF )
384+ {
385+ if (what & BEV_EVENT_WRITING )
386+ {
387+ // remote shut down reading
388+ log::info (test_cat, " Remote TCP stopped reading! Halting stream write..." );
389+ stream->stop_writing ();
390+ }
391+ else if (what & BEV_EVENT_READING )
392+ {
393+ // remote shut down writing
394+ log::info (test_cat, " Error encountered while reading! Halting stream read..." );
395+ stream->stop_reading ();
396+ }
397+ else
398+ {
399+ // remote closed connection
400+ log::info (test_cat, " TCP Connection EOF!" );
401+ }
402+ }
403+ if (what & (BEV_EVENT_ERROR | BEV_EVENT_EOF ) and not (what & BEV_EVENT_READING ) and not (what & BEV_EVENT_WRITING ))
404+ {
314405 log::critical (test_cat, " Closing stream..." );
315- conn->stream ->close ();
316- // auto& str = conn->stream;
317- // if (str and not str->is_closing())
318- // {
319- // return str->close();
320- // }
321- // log::critical(test_cat, "Stream for tcp connection already destroyed...");
406+ stream->close ();
322407 }
323408 }
324409
325410 inline void tcp_listen_cb (
326411 struct evconnlistener * listener, evutil_socket_t fd, struct sockaddr * src, int socklen, void * user_arg)
327412 {
328413 oxen::quic::Address source{src, static_cast <socklen_t >(socklen)};
329- log::critical (test_cat, " TCP CONNECTION ESTABLISHED -- SRC: {}" , source);
414+ log::info (test_cat, " TCP CONNECTION ESTABLISHED -- SRC: {}" , source);
330415
331416 auto * b = evconnlistener_get_base (listener);
332417 auto * _bev = bufferevent_socket_new (b, fd, BEV_OPT_CLOSE_ON_FREE | BEV_OPT_THREADSAFE );
333418
334- // int yes{1};
335- // if (setsockopt(fd, SOL_SOCKET, SO_KEEPALIVE, &yes, sizeof(int)) < 0)
336- // {
337- // log::critical(
338- // test_cat,
339- // "Failed to set keepalive on accepted TCP connection socket: {}",
340- // evutil_socket_error_to_string(EVUTIL_SOCKET_ERROR()));
341- // return bufferevent_free(bevent);
342- // }
343-
344419 auto * handle = reinterpret_cast <TCPHandle*>(user_arg);
345420 assert (handle);
346421
347422 // make TCPConnection here!
348423 auto * conn = handle->_conn_maker (_bev, fd, std::move (source));
349-
350- conn->stream ->set_stream_data_cb ([_bev](Stream& s, bstring_view data) {
351- auto rv = _bev ? bufferevent_write (_bev, data.data (), data.size ()) : -1 ;
352- log::info (
353- test_cat,
354- " Stream (id: {}) {} {}B to TCP buffer" ,
355- s.stream_id (),
356- rv < 0 ? " failed to write" : " successfully wrote" ,
357- data.size ());
358- });
359-
360- conn->stream ->set_stream_close_cb ([_bev](Stream&, uint64_t ) {
361- log::critical (
362- test_cat, " Stream closed cb fired, {}..." , _bev ? " freeing bufferevent" : " bufferevent already freed" );
363- if (_bev)
364- bufferevent_free (_bev);
365- });
424+ auto stream = conn->stream ;
366425
367426 bufferevent_setcb (_bev, tcp_read_cb, nullptr , tcp_event_cb, conn);
368427 bufferevent_enable (_bev, EV_READ | EV_WRITE );
0 commit comments