Skip to content

Commit c30714d

Browse files
authored
Merge pull request #18 from aws/tcp-autopause
tcp datajet: add close on error and pause on buffer full
2 parents 32d23ca + 4b7f13a commit c30714d

File tree

1 file changed

+39
-4
lines changed

1 file changed

+39
-4
lines changed

datajets/tcp-datajet.ts

Lines changed: 39 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,28 +15,39 @@ interface IDatajetConfig {
1515
host: string,
1616
port: number,
1717
maxRetries: number,
18+
tcpBufferLimit: number
1819
}
1920

2021
const defaultConfig: IDatajetConfig = {
2122
host: "127.0.0.1",
2223
port: 5170,
2324
maxRetries: 2,
25+
tcpBufferLimit: 100_000_000 /* 100 Megabytes */
2426
}
2527

2628
const tcpDatajet: IDatajet = {
2729
name: "tcp",
2830
defaultConfig: defaultConfig,
29-
createConfiguredDatajet: function (config: IDatajetConfig) {
31+
createConfiguredDatajet: function (config: IDatajetConfig, {
32+
logger
33+
}) {
3034

3135
/* lazy client creation */
3236
let client: net.Socket = null;
37+
let isPaused: boolean = false;
38+
let isClosed: boolean = false;
3339
const makeClient = () => {
3440
if (client) {
3541
client.destroy();
3642
}
3743
client = new net.Socket();
3844
client.connect(config.port, config.host, function() {
39-
console.log(`Connected tcp ${config.host}:${config.port}`);
45+
logger.info(`Connected tcp ${config.host}:${config.port}`);
46+
});
47+
48+
client.on('close', function() {
49+
logger.info(`Connection closed ${config.host}:${config.port}`);
50+
isClosed = true;
4051
});
4152
}
4253
return {
@@ -46,18 +57,42 @@ const tcpDatajet: IDatajet = {
4657
if (!client) {
4758
makeClient();
4859
}
60+
61+
/* client paused */
62+
if (isPaused) {
63+
/* still paused */
64+
if (client.writableLength > config.tcpBufferLimit * 3/4) {
65+
continue
66+
}
67+
68+
/* sent enough data to resume */
69+
logger.info(`Resuming tcp datajet ${config.host}:${config.port}`);
70+
isPaused = false;
71+
}
72+
73+
/* check if client is closed */
74+
if (isClosed) {
75+
return false;
76+
}
77+
4978
for (let r = 0; r < config.maxRetries + 1; ++r) {
5079
try {
5180
client.write(JSON.stringify(log));
81+
82+
/* check if client needs to be paused */
83+
if (client.writableLength > config.tcpBufferLimit) {
84+
logger.info(`Pausing tcp datajet ${config.host}:${config.port}`);
85+
isPaused = true;
86+
}
5287
break;
5388
}
5489
catch (e) {
5590
if (r < config.maxRetries) {
56-
console.log(`Failed to write to tcp connection. Re-establishing connection. Attempt ${r+1}.`);
91+
logger.info(`Failed to write to tcp connection. Re-establishing connection. Attempt ${r+1}.`);
5792
makeClient();
5893
}
5994
else {
60-
console.log(`TCP connection failure to ${config.host}:${config.port}.`);
95+
logger.info(`TCP connection failure to ${config.host}:${config.port}.`);
6196
return false;
6297
}
6398
}

0 commit comments

Comments
 (0)