forked from sccn/liblsl
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtcp_server.cpp
More file actions
577 lines (526 loc) · 22.5 KB
/
tcp_server.cpp
File metadata and controls
577 lines (526 loc) · 22.5 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
#include "tcp_server.h"
#include "api_config.h"
#include "cast.h"
#include "consumer_queue.h"
#include "sample.h"
#include "send_buffer.h"
#include "socket_utils.h"
#include "stream_info_impl.h"
#include <boost/asio/io_context.hpp>
#include <boost/asio/ip/host_name.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/asio/read_until.hpp>
#include <boost/asio/write.hpp>
#include <boost/uuid/random_generator.hpp>
#include <boost/uuid/uuid_io.hpp>
#include <memory>
#include <thread>
// a convention that applies when including portable_oarchive.h in multiple .cpp files.
// otherwise, the templates are instantiated in this file and sample.cpp which leads
// to errors like "multiple definition of `typeinfo name"
#define NO_EXPLICIT_TEMPLATE_INSTANTIATION
#include "portable_archive/portable_oarchive.hpp"
using namespace lslboost::asio;
using err_t = const lslboost::system::error_code &;
namespace lsl {
/**
* Active session with a TCP client.
*
* A note on memory ownership:
* - Generally, the stream_outlet maintains shared ownership of the `tcp_server`s, `io_context`s,
* and stream_info.
* - At any point in time there are likely multiple request/handler chains in flight somewhere
* between the operating system, asio, and the various handlers below.
* The handlers are set up such that any memory that may be referred to by them in the future is
* owned (shared) by the handler/callback function objects (this is what is encapsulated by the
* client_session instance).
* Their lifetime is managed by asio and ends when the handler chain ends (e.g., is aborted).
* Since the TCP server is referred to (occasionally) by handler code, the tcp_server is owned also
* by the client_sessions, and therefore kept alive for as long as there is at least one request
* chain running.
* - There is a per-session transfer thread (transfer_samples_thread) that owns the respective
* client_session and therefore the TCP server, as well (since it may refer to it); it goes out of
* scope once the server is being shut down.
* - The TCP server and client session also have shared ownership of the io_context (since in
* some cases some transfer threads can outlive the stream outlet, and so the io_context is still
* kept around until all sockets have been properly released).
* - So memory is generally owned by the code (functors and stack frames) that needs to refer to
* it for the duration of the execution.
*/
class client_session : public std::enable_shared_from_this<client_session> {
typedef std::shared_ptr<
lslboost::asio::executor_work_guard<lslboost::asio::io_context::executor_type>>
work_p;
public:
/// Instantiate a new session & its socket.
client_session(const tcp_server_p &serv)
: io_(serv->io_), serv_(serv), sock_(std::make_shared<tcp::socket>(*serv->io_)),
requeststream_(&requestbuf_), data_protocol_version_(100) {}
/// Destructor. Unregisters the session from the server.
~client_session();
/// Get the socket of this session.
tcp_socket_p socket() { return sock_; }
/// Begin processing this session (i.e., data transmission over the socket).
void begin_processing();
private:
/// Handler that gets called when the reading of the 1st line (command line) of the inbound
/// message finished.
void handle_read_command_outcome(error_code err);
/// Handler that gets called after finishing reading of the query line.
void handle_read_query_outcome(error_code err);
/// Helper function to send a status message to the connected party.
void send_status_message(const std::string &msg);
/// Handler that gets called after finishing the reading of feedparameters.
void handle_read_feedparams(
int request_protocol_version, std::string request_uid, error_code err);
/// Handler that gets called sending the feedheader has completed.
void handle_send_feedheader_outcome(error_code err, std::size_t n);
/// Transfers samples from the server's send buffer into the async send queues of IO threads
void transfer_samples_thread(std::shared_ptr<client_session> sess);
/// Handler that gets called when a sample transfer has been completed.
void handle_chunk_transfer_outcome(error_code err, std::size_t len);
/// whether we have registered ourselves at the server as active (so we need to unregister
/// ourselves at destruction)
bool registered_{false};
/// shared pointer to IO service; ensures that the IO is still around by the time the serv_ and
/// sock_ need to be destroyed
io_context_p io_;
/// the server that is associated with this connection
tcp_server_p serv_;
/// connection socket
tcp_socket_p sock_;
/// a RAII class indicating to the owning io_context that there is work to do even if no
/// outstanding handler is present
work_p work_;
// data used by the transfer thread (and some other handlers)
/// this buffer holds the data feed generated by us
lslboost::asio::streambuf feedbuf_;
/// this buffer holds the request as received from the client (incrementally filled)
lslboost::asio::streambuf requestbuf_;
/// output archive (wrapped around the feed buffer)
std::unique_ptr<class eos::portable_oarchive> outarch_;
/// this is a stream on top of the request buffer for convenient parsing
std::istream requeststream_;
/// scratchpad memory (e.g., for endianness conversion)
char *scratch_{nullptr};
/// protocol version to use for transmission
int data_protocol_version_;
/// byte order to use (0=portable, 1234=little endian, 4321=big endian, 2134=PDP endian,
/// unsupported)
int use_byte_order_{0};
/// our chunk granularity
int chunk_granularity_;
/// maximum number of samples buffered
int max_buffered_;
// data exchanged between the transfer completion handler and the transfer thread
/// whether the current transfer has finished (possibly with an error)
bool transfer_completed_;
/// the outcome of the last chunk transfer
error_code transfer_error_;
/// the amount of bytes transferred
std::size_t transfer_amount_;
/// a mutex that protects the completion data
std::mutex completion_mut_;
/// a condition variable that signals completion
std::condition_variable completion_cond_;
};
tcp_server::tcp_server(const stream_info_impl_p &info, const io_context_p &io,
const send_buffer_p &sendbuf, const factory_p &factory, tcp protocol, int chunk_size)
: chunk_size_(chunk_size), shutdown_(false), info_(info), io_(io), factory_(factory),
send_buffer_(sendbuf), acceptor_(std::make_shared<tcp::acceptor>(*io)) {
// open the server connection
acceptor_->open(protocol);
// bind to and listen on a free port
uint16_t port = bind_and_listen_to_port_in_range(*acceptor_, protocol, 10);
// and assign connection-dependent fields
// (note: this may be assigned multiple times by multiple TCPs during setup but does not matter)
info_->session_id(api_config::get_instance()->session_id());
info_->uid(lslboost::uuids::to_string(lslboost::uuids::random_generator()()));
info_->created_at(lsl_clock());
info_->hostname(ip::host_name());
if (protocol == tcp::v4())
info_->v4data_port(port);
else
info_->v6data_port(port);
LOG_F(2, "Created TCP server for outlet %s on IPv%d port %d", info_->name().c_str(),
protocol == tcp::v4() ? 4 : 6, port);
}
// === externally issued asynchronous commands ===
void tcp_server::begin_serving() {
// pre-generate the info's messages
shortinfo_msg_ = info_->to_shortinfo_message();
fullinfo_msg_ = info_->to_fullinfo_message();
// start accepting connections
accept_next_connection();
}
void tcp_server::end_serving() {
// the shutdown flag informs the transfer thread that we're shutting down
shutdown_ = true;
// issue closure of the server socket; this will result in a cancellation of the associated IO
// operations
post(*io_, [shared_acceptor = acceptor_]() { shared_acceptor->close(); });
// issue closure of all active client session sockets; cancels the related outstanding IO jobs
close_inflight_sockets();
// also notify any transfer threads that are blocked waiting for a sample by sending them one (=
// a ping)
send_buffer_->push_sample(factory_->new_sample(lsl_clock(), true));
}
// === accept loop ===
void tcp_server::accept_next_connection() {
try {
// make a new session
std::shared_ptr<client_session> newsession{
std::make_shared<client_session>(shared_from_this())};
// accept a connection on the session's socket
acceptor_->async_accept(
*newsession->socket(), [shared_this = shared_from_this(), newsession, this](err_t err) {
shared_this->handle_accept_outcome(newsession, err);
});
} catch (std::exception &e) {
LOG_F(ERROR, "Error during tcp_server::accept_next_connection: %s", e.what());
}
}
void tcp_server::handle_accept_outcome(std::shared_ptr<client_session> newsession, error_code err) {
if (err == error::operation_aborted || err == error::shut_down || shutdown_) return;
// no error: start processing the new connection
if (!err) newsession->begin_processing();
// and move on to the next connection
accept_next_connection();
}
// === graceful cancellation of in-flight sockets ===
void tcp_server::register_inflight_socket(const tcp_socket_p &sock) {
std::lock_guard<std::recursive_mutex> lock(inflight_mut_);
inflight_.insert(sock);
}
void tcp_server::unregister_inflight_socket(const tcp_socket_p &sock) {
std::lock_guard<std::recursive_mutex> lock(inflight_mut_);
inflight_.erase(sock);
}
void tcp_server::close_inflight_sockets() {
std::lock_guard<std::recursive_mutex> lock(inflight_mut_);
for (auto sock : inflight_)
post(*io_, [sock]() {
try {
if (sock->is_open()) {
try {
// (in some cases shutdown may fail)
sock->shutdown(sock->shutdown_both);
} catch (...) {}
sock->close();
}
} catch (std::exception &e) {
LOG_F(WARNING, "Error during shutdown_and_close: %s", e.what());
}
});
}
// === implementation of the client_session class ===
client_session::~client_session() {
try {
if (registered_) serv_->unregister_inflight_socket(sock_);
} catch (std::exception &e) {
LOG_F(WARNING, "Unexpected error in client_session destructor: %s", e.what());
} catch (...) { LOG_F(ERROR, "Severe error during client session shutdown."); }
if (scratch_) delete[] scratch_;
}
void client_session::begin_processing() {
try {
sock_->set_option(lslboost::asio::ip::tcp::no_delay(true));
// register this socket as "in-flight" with the server (so that any subsequent ops on it can
// be aborted if necessary)
serv_->register_inflight_socket(sock_);
registered_ = true;
// read the request line
async_read_until(
*sock_, requestbuf_, "\r\n", [shared_this = shared_from_this()](err_t err, size_t) {
shared_this->handle_read_command_outcome(err);
});
} catch (std::exception &e) {
LOG_F(ERROR, "Error during client_session::begin_processing: %s", e.what());
}
}
void client_session::handle_read_command_outcome(error_code read_err) {
try {
if (read_err) return;
// parse request method
std::string method;
getline(requeststream_, method);
method = trim(method);
if (method == "LSL:shortinfo")
// shortinfo request: read the content query string
async_read_until(*sock_, requestbuf_, "\r\n",
[shared_this = shared_from_this()](
err_t err, std::size_t) { shared_this->handle_read_query_outcome(err); });
else if (method == "LSL:fullinfo")
// fullinfo request: reply right away
async_write(*sock_, lslboost::asio::buffer(serv_->fullinfo_msg_),
[shared_this = shared_from_this()](err_t, std::size_t) {});
else if (method == "LSL:streamfeed")
// streamfeed request (1.00): read feed parameters
async_read_until(*sock_, requestbuf_, "\r\n",
[shared_this = shared_from_this()](
err_t err, std::size_t) { shared_this->handle_read_feedparams(100, "", err); });
else if (method.compare(0, 15, "LSL:streamfeed/") == 0) {
// streamfeed request with version: read feed parameters
std::vector<std::string> parts = splitandtrim(method, ' ', true);
async_read_until(*sock_, requestbuf_, "\r\n\r\n",
[shared_this = shared_from_this(),
request_protocol_version = std::stoi(parts[0].substr(15)),
request_uid = (parts.size() > 1) ? parts[1] : ""](err_t err, std::size_t) {
shared_this->handle_read_feedparams(request_protocol_version, request_uid, err);
});
}
} catch (std::exception &e) {
LOG_F(WARNING, "Unexpected error while parsing a client command: %s", e.what());
}
}
void client_session::handle_read_query_outcome(error_code err) {
try {
if (err) return;
// read the query line
std::string query;
getline(requeststream_, query);
query = trim(query);
if (serv_->info_->matches_query(query)) {
// matches: reply (otherwise just close the stream)
async_write(*sock_, lslboost::asio::buffer(serv_->shortinfo_msg_),
[shared_this = shared_from_this()](err_t, std::size_t) {
/* keep the client_session alive until the shortinfo is sent completely*/
});
} else
DLOG_F(INFO, "%p got a shortinfo query response for the wrong query", this);
} catch (std::exception &e) {
LOG_F(WARNING, "Unexpected error while parsing a client request: %s", e.what());
}
}
void client_session::send_status_message(const std::string &str) {
auto msg(std::make_shared<std::string>(str));
async_write(*sock_, lslboost::asio::buffer(*msg),
[msg, shared_this = shared_from_this()](
err_t, std::size_t) { /* keep objects alive until the message is sent */ });
}
void client_session::handle_read_feedparams(
int request_protocol_version, std::string request_uid, error_code err) {
try {
if (err) return;
DLOG_F(2, "%p got a streamfeed request", this);
// --- protocol negotiation ---
// check request validity
if (request_protocol_version / 100 >
api_config::get_instance()->use_protocol_version() / 100) {
send_status_message("LSL/" +
std::to_string(api_config::get_instance()->use_protocol_version()) +
" 505 Version not supported");
DLOG_F(WARNING, "%p Got a request for a too new protocol version", this);
return;
}
if (!request_uid.empty() && request_uid != serv_->info_->uid()) {
send_status_message("LSL/" +
to_string(api_config::get_instance()->use_protocol_version()) +
" 404 Not found");
return;
}
if (request_protocol_version >= 110) {
int client_byte_order = 1234; // assume little endian
double client_endian_performance = 0; // the other party's endian conversion performance
bool client_has_ieee754_floats =
true; // the client has IEEE-754 compliant floating point formats
bool client_supports_subnormals = true; // the client supports subnormal numbers
int client_protocol_version =
request_protocol_version; // assume that the client wants to use the same
// version for data transmission
int client_value_size =
serv_->info_->channel_bytes(); // assume that the client has a standard size for
// the relevant data type
lsl_channel_format_t format = serv_->info_->channel_format();
// read feed parameters
char buf[16384] = {0};
while (requeststream_.getline(buf, sizeof(buf)) && (buf[0] != '\r')) {
std::string hdrline(buf);
std::size_t colon = hdrline.find_first_of(':');
if (colon != std::string::npos) {
// strip off comments
auto semicolon = hdrline.find_first_of(';');
if (semicolon != std::string::npos) hdrline.erase(semicolon);
// convert to lowercase
for (auto &c : hdrline) c = ::tolower(c);
// extract key & value
std::string type = trim(hdrline.substr(0, colon)),
rest = trim(hdrline.substr(colon + 1));
// get the header information
if (type == "native-byte-order") client_byte_order = std::stoi(rest);
if (type == "endian-performance")
client_endian_performance = std::stod(rest);
if (type == "has-ieee754-floats")
client_has_ieee754_floats = from_string<bool>(rest);
if (type == "supports-subnormals")
client_supports_subnormals = from_string<bool>(rest);
if (type == "value-size") client_value_size = std::stoi(rest);
if (type == "max-buffer-length") max_buffered_ = std::stoi(rest);
if (type == "max-chunk-length") chunk_granularity_ = std::stoi(rest);
if (type == "protocol-version") client_protocol_version = std::stoi(rest);
} else {
DLOG_F(WARNING, "%p Request line '%s' contained no key-value pair", this,
hdrline.c_str());
}
}
// determine the parameters for data transmission
bool client_suppress_subnormals = false;
// use least common denominator data protocol version
data_protocol_version_ = std::min(
api_config::get_instance()->use_protocol_version(), client_protocol_version);
// downgrade to 1.00 (portable binary format) if an unsupported binary conversion is
// involved
if (serv_->info_->channel_bytes() != client_value_size) data_protocol_version_ = 100;
if (!format_ieee754[cft_double64] ||
(format == cft_float32 && !format_ieee754[cft_float32]) ||
!client_has_ieee754_floats)
data_protocol_version_ = 100;
if (data_protocol_version_ >= 110) {
// decide on the byte order if conflicting
if (BOOST_BYTE_ORDER != client_byte_order) {
if (client_byte_order == 2134 && client_value_size >= 8) {
// since we have no implementation for this byte order conversion let
// the client do it
use_byte_order_ = BOOST_BYTE_ORDER;
} else {
// let the faster party perform the endian conversion
use_byte_order_ = (client_value_size <= 1 || (measure_endian_performance() >
client_endian_performance))
? client_byte_order
: BOOST_BYTE_ORDER;
}
} else
use_byte_order_ = BOOST_BYTE_ORDER;
// determine if subnormal suppression needs to be enabled
client_suppress_subnormals =
(format_subnormal[format] && !client_supports_subnormals);
}
// send the response
std::ostream response_stream(&feedbuf_);
response_stream << "LSL/" << api_config::get_instance()->use_protocol_version()
<< " 200 OK\r\n";
response_stream << "UID: " << serv_->info_->uid() << "\r\n";
response_stream << "Byte-Order: " << use_byte_order_ << "\r\n";
response_stream << "Suppress-Subnormals: " << client_suppress_subnormals << "\r\n";
response_stream << "Data-Protocol-Version: " << data_protocol_version_ << "\r\n";
response_stream << "\r\n" << std::flush;
} else {
// read feed parameters
requeststream_ >> max_buffered_ >> chunk_granularity_;
}
// --- validation ---
if (data_protocol_version_ == 100) {
// create a portable output archive to write to
outarch_.reset(new eos::portable_oarchive(feedbuf_));
// serialize the shortinfo message into an archive
*outarch_ << serv_->shortinfo_msg_;
} else {
// allocate scratchpad memory for endian conversion, etc.
scratch_ = new char[format_sizes[serv_->info_->channel_format()] *
serv_->info_->channel_count()];
}
// send test pattern samples
std::unique_ptr<sample> temp(factory::new_sample_unmanaged(
serv_->info_->channel_format(), serv_->info_->channel_count(), 0.0, false));
temp->assign_test_pattern(4);
if (data_protocol_version_ >= 110)
temp->save_streambuf(feedbuf_, data_protocol_version_, use_byte_order_, scratch_);
else
*outarch_ << *temp;
temp->assign_test_pattern(2);
if (data_protocol_version_ >= 110)
temp->save_streambuf(feedbuf_, data_protocol_version_, use_byte_order_, scratch_);
else
*outarch_ << *temp;
// send off the newly created feedheader
async_write(
*sock_, feedbuf_.data(), [shared_this = shared_from_this()](err_t err, size_t len) {
shared_this->handle_send_feedheader_outcome(err, len);
});
DLOG_F(2, "%p sent test pattern samples", this);
} catch (std::exception &e) {
LOG_F(WARNING, "Unexpected error while serializing the feed header: %s", e.what());
}
}
void client_session::handle_send_feedheader_outcome(error_code err, std::size_t n) {
try {
if (err) return;
feedbuf_.consume(n);
// register outstanding work at the server (will be unregistered at session destruction)
work_ = std::make_shared<work_p::element_type>(serv_->io_->get_executor());
// spawn a sample transfer thread
std::thread(&client_session::transfer_samples_thread, this, shared_from_this()).detach();
} catch (std::exception &e) {
LOG_F(WARNING, "Unexpected error while handling the feedheader send outcome: %s", e.what());
}
}
void client_session::transfer_samples_thread(std::shared_ptr<client_session>) {
if (max_buffered_ <= 0) return;
try {
// make a new consumer queue
auto queue = serv_->send_buffer_->new_consumer(max_buffered_);
// the sequence # is merely used to determine chunk boundaries (no need for int64)
uint32_t seqn = 0;
while (!serv_->shutdown_) {
try {
// get next sample from the sample queue (blocking)
sample_p samp(queue->pop_sample());
if (serv_->shutdown_) break;
// ignore blank samples (they are basically wakeup notifiers from someone's
// end_serving())
if (!samp) continue;
// optionally override the pushthrough flag by the chunk size of the receiver (if
// set) or of the sender (if set)
if (chunk_granularity_)
samp->pushthrough = (((++seqn) % (uint32_t)chunk_granularity_) == 0);
else if (serv_->chunk_size_)
samp->pushthrough = (((++seqn) % (uint32_t)serv_->chunk_size_) == 0);
// serialize the sample into the stream
if (data_protocol_version_ >= 110)
samp->save_streambuf(
feedbuf_, data_protocol_version_, use_byte_order_, scratch_);
else
*outarch_ << *samp;
// if the sample shall be pushed though...
if (samp->pushthrough) {
// send off the chunk that we aggregated so far
std::unique_lock<std::mutex> lock(completion_mut_);
transfer_completed_ = false;
async_write(*sock_, feedbuf_.data(),
[shared_this = shared_from_this()](err_t err, size_t len) {
shared_this->handle_chunk_transfer_outcome(err, len);
});
// wait for the completion condition
completion_cond_.wait(lock, [this]() { return transfer_completed_; });
// handle transfer outcome
if (!transfer_error_) {
feedbuf_.consume(transfer_amount_);
} else
break;
}
} catch (std::exception &e) {
LOG_F(WARNING, "Unexpected glitch in transfer_samples_thread: %s", e.what());
}
}
} catch (std::exception &e) {
LOG_F(ERROR, "Unexpected error in transfer_samples_thread: %s, exiting...", e.what());
}
}
void client_session::handle_chunk_transfer_outcome(error_code err, std::size_t len) {
try {
{
std::lock_guard<std::mutex> lock(completion_mut_);
// assign the transfer outcome
transfer_error_ = err;
transfer_amount_ = len;
transfer_completed_ = true;
}
// notify the server thread
completion_cond_.notify_all();
} catch (std::exception &e) {
LOG_F(WARNING,
"Catastrophic error in handling the chunk transfer outcome (in tcp_server): %s",
e.what());
}
}
} // namespace lsl