Skip to content

Commit d8e93b3

Browse files
authored
Merge pull request #11 from hatlabs/tcp_client
TCP Client
2 parents 2a410a1 + 9afd006 commit d8e93b3

16 files changed

Lines changed: 551 additions & 58 deletions

src/buffered_tcp_client.h

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
#ifndef SH_WG_FIRMWARE_BUFFERED_TCP_CLIENT_H_
2+
#define SH_WG_FIRMWARE_BUFFERED_TCP_CLIENT_H_
3+
4+
#include <Arduino.h>
5+
#include <WiFi.h>
6+
7+
#include "origin_string.h"
8+
#include "shwg.h"
9+
10+
using namespace sensesp;
11+
12+
using WiFiClientPtr = std::shared_ptr<WiFiClient>;
13+
14+
constexpr size_t kRXBufferSize = 512;
15+
16+
/**
17+
* @brief TCP client connection container with RX buffer.
18+
*/
19+
class BufferedTCPClient {
20+
public:
21+
BufferedTCPClient(WiFiClientPtr client) : client_{client} {}
22+
23+
WiFiClientPtr client_;
24+
25+
int available() { return client_->available(); }
26+
27+
void clear_buf() {
28+
rx_pos_ = 0;
29+
}
30+
31+
int read_line(String& line) {
32+
while (client_->available()) {
33+
char c = client_->read();
34+
rx_buf_[rx_pos_++] = c;
35+
if (rx_pos_ == kRXBufferSize - 1) {
36+
debugW("RX buffer overflow");
37+
rx_pos_ = 0;
38+
} else if (c == '\n') {
39+
// received a full line
40+
rx_buf_[rx_pos_] = '\0';
41+
int received = rx_pos_;
42+
rx_pos_ = 0;
43+
line = rx_buf_;
44+
return received;
45+
}
46+
}
47+
return 0;
48+
}
49+
50+
protected:
51+
char rx_buf_[kRXBufferSize];
52+
int rx_pos_ = 0;
53+
};
54+
55+
#endif // SH_WG_FIRMWARE_BUFFERED_TCP_CLIENT_H_

src/main.cpp

Lines changed: 47 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
#include "shwg.h"
3838
#include "shwg_button.h"
3939
#include "shwg_factory_test.h"
40+
#include "streaming_tcp_client.h"
4041
#include "streaming_tcp_server.h"
4142
#include "streaming_udp_server.h"
4243
#include "stringtokenizer_transform.h"
@@ -74,6 +75,9 @@ StreamingTCPServer *ydwg_raw_tcp_server;
7475
StreamingUDPServer *nmea0183_udp_server;
7576
StreamingUDPServer *ydwg_raw_udp_server;
7677

78+
StreamingTCPClient *ydwg_raw_tcp_client;
79+
StreamingTCPClient *nmea0183_tcp_client;
80+
7781
// time elapsed since last system time update
7882
elapsedMillis elapsed_since_last_system_time_update = kTimeUpdatePeriodMs;
7983

@@ -85,10 +89,12 @@ Networking *networking;
8589

8690
CheckboxConfig *checkbox_config_enable_firmware_updates;
8791
BiDiPortConfig *port_config_ydwg_raw_tcp;
92+
HostPortConfig *port_config_ydwg_raw_tcp_client;
8893
BiDiPortConfig *port_config_ydwg_raw_udp;
8994
CheckboxConfig *checkbox_config_translate_to_seasmart;
9095
CheckboxConfig *checkbox_config_translate_to_nmea0183;
9196
PortConfig *port_config_nmea0183_tcp_tx;
97+
HostPortConfig *port_config_nmea0183_tcp_client;
9298
PortConfig *port_config_nmea0183_udp_tx;
9399

94100
UIOutput<String> ui_output_firmware_name("Firmware name", kFirmwareName,
@@ -280,7 +286,7 @@ static void SetupYellowLEDBlinker(
280286
}));
281287
}
282288

283-
static void SetupTransmitters() {
289+
static void SetupConnections() {
284290
can_frame_clearinghouse = new LambdaTransform<CANFrame, CANFrame>(
285291
[](const CANFrame &frame) { return frame; });
286292

@@ -325,11 +331,14 @@ static void SetupTransmitters() {
325331
auto concatenate_ydwg_strings = new ConcatenateStrings(100, 1000);
326332
auto concatenate_n0183_strings = new ConcatenateStrings(100, 1000);
327333

334+
auto string_tokenizer = new StringTokenizer("\r\n");
335+
328336
auto n2k_to_0183_transform = new N2KTo0183Transform(nmea2000);
329337
auto n2k_to_seasmart_transform = new SeasmartTransform(nmea2000);
330338
auto ydwg_raw_to_can_transform = new YDWGRawToCANFrameTransform();
331339

332340
can_to_ydwg_transform->connect_to(concatenate_ydwg_strings);
341+
string_tokenizer->connect_to(ydwg_raw_to_can_transform);
333342

334343
//////
335344
// N2K message routing
@@ -390,7 +399,6 @@ static void SetupTransmitters() {
390399
int nmea0183_udp_port = port_config_nmea0183_udp_tx->get_port();
391400
nmea0183_udp_server = new StreamingUDPServer(nmea0183_udp_port, networking);
392401
nmea0183_udp_server->set_enabled(port_config_nmea0183_udp_tx->get_enabled());
393-
concatenate_n0183_strings->connect_to(nmea0183_udp_server);
394402

395403
// send the generated NMEA 0183 message
396404
if (checkbox_config_translate_to_nmea0183->get_value()) {
@@ -407,6 +415,27 @@ static void SetupTransmitters() {
407415
n2k_to_seasmart_transform->connect_to(concatenate_n0183_strings);
408416
}
409417

418+
// set up a YDWG RAW TCP client
419+
if (port_config_ydwg_raw_tcp_client->get_enabled()) {
420+
String ydwg_raw_tcp_client_host =
421+
port_config_ydwg_raw_tcp_client->get_host();
422+
int ydwg_raw_tcp_client_port = port_config_ydwg_raw_tcp_client->get_port();
423+
ydwg_raw_tcp_client = new StreamingTCPClient(
424+
ydwg_raw_tcp_client_host, ydwg_raw_tcp_client_port, networking);
425+
can_to_ydwg_transform->connect_to(ydwg_raw_tcp_client);
426+
ydwg_raw_tcp_client->connect_to(string_tokenizer);
427+
}
428+
429+
// set up an NMEA 0183 TCP client
430+
if (port_config_nmea0183_tcp_client->get_enabled()) {
431+
String nmea0183_tcp_client_host =
432+
port_config_nmea0183_tcp_client->get_host();
433+
int nmea0183_tcp_client_port = port_config_nmea0183_tcp_client->get_port();
434+
nmea0183_tcp_client = new StreamingTCPClient(
435+
nmea0183_tcp_client_host, nmea0183_tcp_client_port, networking);
436+
n2k_to_0183_transform->connect_to(nmea0183_tcp_client);
437+
}
438+
410439
// connect the CAN frame input to the YDWG raw transform
411440
debugD("Connecting CAN input to YDWG raw transform");
412441
can_frame_clearinghouse->connect_to(can_to_ydwg_transform);
@@ -430,9 +459,7 @@ static void SetupTransmitters() {
430459

431460
if (port_config_ydwg_raw_udp->get_rx_enabled()) {
432461
debugD("Connecting UDP RX to YDWG RAW");
433-
ydwg_raw_udp_server->connect_to(new StringTokenizer("\r\n"))
434-
->connect_to(ydwg_raw_to_can_transform);
435-
462+
ydwg_raw_udp_server->connect_to(string_tokenizer);
436463
}
437464
}
438465

@@ -474,6 +501,13 @@ void SetupUIComponents() {
474501
"Enable TCP server for transmitting and/or receiving YDWG RAW data.",
475502
1300);
476503

504+
port_config_ydwg_raw_tcp_client = new HostPortConfig(
505+
false, "", kDefaultYdwgRawTCPServerPort, "Enabled", "Server hostname",
506+
"Server port", "/Network/YDWG RAW TCP Client",
507+
"Connect to another TCP server for transmitting and receiving YDWG RAW "
508+
"data.",
509+
1350);
510+
477511
port_config_ydwg_raw_udp = new BiDiPortConfig(
478512
true, false, "Transmit to WiFi", "Receive from WiFi",
479513
kDefaultYdwgRawUDPServerPort, "/Network/YDWG RAW over UDP",
@@ -498,6 +532,13 @@ void SetupUIComponents() {
498532
"Enable a TCP server for transmitting NMEA 0183 and SeaSmart.Net data.",
499533
1800);
500534

535+
port_config_nmea0183_tcp_client = new HostPortConfig(
536+
false, "", kDefaultNMEA0183TCPServerPort, "Enabled", "Server hostname",
537+
"Server port", "/Network/NMEA 0183 TCP Client",
538+
"Connect to another TCP server for transmitting NMEA 0183 and "
539+
"SeaSmart.Net data.",
540+
1850);
541+
501542
port_config_nmea0183_udp_tx = new PortConfig(
502543
true, kDefaultNMEA0183UDPServerPort, "/Network/NMEA 0183 over UDP",
503544
"Broadcast NMEA 0183 and SeaSmart.Net data over UDP.", 1900);
@@ -593,7 +634,7 @@ void setup() {
593634
n2k_msg_input.connect_to(new LambdaConsumer<tN2kMsg>(
594635
[](const tN2kMsg &n2k_msg) { SetSystemTime(n2k_msg); }));
595636

596-
SetupTransmitters();
637+
SetupConnections();
597638

598639
app.onRepeat(1000, []() {
599640
debugD("Uptime: %lu, CAN RX: %d CAN TX: %d", millis() / 1000,

src/origin_string.h

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
#ifndef SH_WG_FIRMWARE_ORIGIN_STRING_H_
2+
#define SH_WG_FIRMWARE_ORIGIN_STRING_H_
3+
4+
#include <Arduino.h>
5+
6+
#include <cstdint>
7+
8+
#include "sensesp.h"
9+
10+
using namespace sensesp;
11+
12+
/**
13+
* @brief Container for Origin aware strings.
14+
*/
15+
struct OriginString {
16+
uint32_t origin_id; // string origin identifier
17+
String data; // data
18+
};
19+
20+
#endif // SH_WG_FIRMWARE_ORIGIN_STRING_H_

src/streaming_tcp_client.cpp

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
#include "streaming_tcp_client.h"
2+
3+
using namespace sensesp;
4+
5+
void ExecuteTCPClientTask(void* this_ptr) {
6+
// cast this_ptr into a pointer to a StreamingTCPClient
7+
StreamingTCPClient* this_ = (StreamingTCPClient*)this_ptr;
8+
9+
this_->execute_client_task();
10+
}
11+
12+
void StreamingTCPClient::start() {
13+
if (enabled_) {
14+
xTaskCreate(ExecuteTCPClientTask, "tcp_client_task", 4096, this, 1, NULL);
15+
16+
// emit received OriginStrings in the main task
17+
rx_queue_producer_->connect_to(
18+
new LambdaConsumer<OriginString*>([this](OriginString* origin_str) {
19+
this->emit(*origin_str);
20+
delete origin_str;
21+
}));
22+
}
23+
}

src/streaming_tcp_client.h

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
#ifndef SH_WG_FIRMWARE_STREAMING_TCP_CLIENT_H_
2+
#define SH_WG_FIRMWARE_STREAMING_TCP_CLIENT_H_
3+
4+
#include <Arduino.h>
5+
#include <WiFi.h>
6+
7+
#include "buffered_tcp_client.h"
8+
#include "origin_string.h"
9+
#include "sensesp/net/networking.h"
10+
#include "sensesp/system/lambda_consumer.h"
11+
#include "sensesp/system/task_queue_producer.h"
12+
#include "shwg.h"
13+
14+
using namespace sensesp;
15+
16+
/**
17+
* @brief TCP client that is able to receive and transmit continuous data
18+
* streams.
19+
*/
20+
class StreamingTCPClient : public ValueProducer<OriginString>,
21+
public ValueConsumer<OriginString>,
22+
public Startable {
23+
public:
24+
StreamingTCPClient(const String& host, const uint16_t port,
25+
Networking* networking)
26+
: Startable(50), networking_{networking}, host_{host}, port_{port} {
27+
task_app_ = new ReactESP(false);
28+
client_ = new BufferedTCPClient(WiFiClientPtr(new WiFiClient()));
29+
tx_queue_producer_ =
30+
new TaskQueueProducer<OriginString*>(NULL, task_app_, 200, 491);
31+
rx_queue_producer_ =
32+
new TaskQueueProducer<OriginString*>(NULL, ReactESP::app, 200, 492);
33+
}
34+
35+
void set_input(OriginString new_value, uint8_t input_channel = 0) override {
36+
OriginString* value_ptr = new OriginString(new_value);
37+
bool retval = tx_queue_producer_->set(value_ptr);
38+
if (retval == false) {
39+
debugW("StreamingTCPClient: tx_queue_producer_ full, dropping value");
40+
delete value_ptr;
41+
}
42+
}
43+
44+
void set_enabled(bool enabled) { enabled_ = enabled; }
45+
46+
protected:
47+
Networking* networking_;
48+
const String host_;
49+
const uint16_t port_;
50+
51+
BufferedTCPClient* client_;
52+
53+
TaskQueueProducer<OriginString*>* tx_queue_producer_;
54+
TaskQueueProducer<OriginString*>* rx_queue_producer_;
55+
56+
ObservableValue<OriginString> tx_string_;
57+
58+
ReactESP* task_app_ = nullptr;
59+
60+
bool enabled_ = true;
61+
62+
void start() override;
63+
64+
void execute_client_task() {
65+
// Receive strings to be transmitted in the tcp client task.
66+
// We don't want consumers to connect to the task queue directly, because
67+
// we're responsible for deleting the received string objects.
68+
this->tx_queue_producer_->connect_to(
69+
new LambdaConsumer<OriginString*>([this](OriginString* origin_str) {
70+
this->tx_string_ = *origin_str;
71+
delete origin_str;
72+
}));
73+
74+
auto send_data =
75+
new LambdaConsumer<OriginString>([this](OriginString origin_str) {
76+
if (client_->client_->connected() &&
77+
origin_str.origin_id != origin_id(&client_->client_)) {
78+
client_->client_->write(origin_str.data.c_str());
79+
}
80+
});
81+
82+
task_app_->onRepeat(2000, [this]() {
83+
if (client_->client_->connected()) {
84+
// Send an empty line as a keepalive message. Without this,
85+
// disconnection detection takes just about forever.
86+
client_->client_->write("\r\n");
87+
}
88+
});
89+
90+
task_app_->onRepeat(100, [this]() {
91+
// flush the TCP client TX buffer
92+
if (client_->client_->connected()) {
93+
client_->client_->flush();
94+
}
95+
});
96+
97+
tx_string_.connect_to(send_data);
98+
99+
// receive any data sent to the client
100+
task_app_->onRepeat(1, [this]() {
101+
if (client_->available() || client_->client_->connected()) {
102+
String line;
103+
int retval;
104+
while (this->client_->read_line(line)) {
105+
OriginString* value =
106+
new OriginString{origin_id(&client_->client_), line};
107+
retval = this->rx_queue_producer_->set(value);
108+
if (retval == false) {
109+
debugW(
110+
"StreamingTCPClient: rx_queue_producer_ full, dropping value");
111+
delete value;
112+
}
113+
}
114+
}
115+
});
116+
117+
// try to establish a connection to the server
118+
task_app_->onRepeat(1000, [this]() {
119+
if (!client_->client_->connected()) {
120+
client_->client_->stop();
121+
client_->clear_buf();
122+
debugD("Connecting to %s:%d...", host_.c_str(), port_);
123+
client_->client_->connect(host_.c_str(), port_);
124+
debugD("Connected");
125+
}
126+
});
127+
128+
while (true) {
129+
task_app_->tick();
130+
131+
// A small delay required to prevent the task watchdog from triggering.
132+
// This also limits the maximum packet rate but greatly reduces
133+
// idle CPU load.
134+
delay(1);
135+
}
136+
}
137+
138+
// a new task entry point is always a plain function; use this friend
139+
// to route the execution back to this class
140+
friend void ExecuteTCPClientTask(void* task_args);
141+
};
142+
143+
#endif // SH_WG_FIRMWARE_STREAMING_TCP_CLIENT_H_

0 commit comments

Comments
 (0)