Skip to content

Commit 6d6ef11

Browse files
committed
Fixed TCP Overload test
1 parent 7b3fd4d commit 6d6ef11

File tree

1 file changed

+85
-104
lines changed

1 file changed

+85
-104
lines changed

riemann-java-client/src/main/java/io/riemann/riemann/client/TcpTransport.java

Lines changed: 85 additions & 104 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,11 @@
33
import io.netty.bootstrap.Bootstrap;
44
import io.netty.channel.Channel;
55
import io.netty.channel.ChannelFuture;
6-
import io.netty.channel.ChannelFutureListener;
76
import io.netty.channel.ChannelInitializer;
87
import io.netty.channel.ChannelOption;
98
import io.netty.channel.ChannelPipeline;
109
import io.netty.channel.EventLoopGroup;
10+
import io.netty.channel.WriteBufferWaterMark;
1111
import io.netty.channel.group.ChannelGroup;
1212
import io.netty.channel.group.DefaultChannelGroup;
1313
import io.netty.channel.nio.NioEventLoopGroup;
@@ -35,53 +35,46 @@
3535
import org.slf4j.LoggerFactory;
3636

3737
public class TcpTransport implements AsynchronousTransport {
38-
// Logger
3938
public final Logger logger = LoggerFactory.getLogger(TcpTransport.class);
4039

41-
// Shared pipeline handlers
4240
public static final ProtobufDecoder pbDecoder =
43-
new ProtobufDecoder(Msg.getDefaultInstance());
44-
public static final ProtobufEncoder pbEncoder =
45-
new ProtobufEncoder();
46-
public static final LengthFieldPrepender frameEncoder =
47-
new LengthFieldPrepender(4);
41+
new ProtobufDecoder(Msg.getDefaultInstance());
42+
public static final ProtobufEncoder pbEncoder = new ProtobufEncoder();
43+
public static final LengthFieldPrepender frameEncoder = new LengthFieldPrepender(4);
4844

4945
public static final int DEFAULT_PORT = 5555;
5046

51-
// I AM A STATE MUSHEEN
5247
public enum State {
5348
DISCONNECTED,
54-
CONNECTING,
55-
CONNECTED,
56-
DISCONNECTING
49+
CONNECTING,
50+
CONNECTED,
51+
DISCONNECTING
5752
}
5853

59-
// STATE STATE STATE
6054
public volatile State state = State.DISCONNECTED;
6155
public final EventLoopGroup eventLoopGroup = new NioEventLoopGroup(1);
6256
public final ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
6357
public volatile Bootstrap bootstrap;
6458
public volatile Semaphore writeLimiter = new Semaphore(8192);
6559

66-
// Configuration
67-
public final AtomicBoolean autoFlush = new AtomicBoolean(true);
68-
public final AtomicInteger writeLimit = new AtomicInteger(8192);
69-
public final AtomicLong reconnectDelay = new AtomicLong(5000);
60+
public final AtomicBoolean autoFlush = new AtomicBoolean(true);
61+
public final AtomicInteger writeLimit = new AtomicInteger(8192);
62+
public final AtomicLong reconnectDelay = new AtomicLong(5000);
7063
public final AtomicInteger connectTimeout = new AtomicInteger(5000);
71-
public final AtomicInteger writeTimeout = new AtomicInteger(5000);
64+
public final AtomicInteger writeTimeout = new AtomicInteger(5000);
7265
public final AtomicInteger writeBufferHigh = new AtomicInteger(1024 * 64);
73-
public final AtomicInteger writeBufferLow = new AtomicInteger(1024 * 8);
66+
public final AtomicInteger writeBufferLow = new AtomicInteger(1024 * 8);
7467

7568
public final InetSocketAddress remoteAddress;
7669
public final InetSocketAddress localAddress;
77-
public final AtomicReference<SSLContext> sslContext =
78-
new AtomicReference<SSLContext>();
70+
public final AtomicReference<SSLContext> sslContext = new AtomicReference<SSLContext>();
7971

80-
public volatile ExceptionReporter exceptionReporter = new ExceptionReporter() {
81-
public void reportException(final Throwable t) {
82-
// By default, don't spam the logs.
83-
}
84-
};
72+
public volatile ExceptionReporter exceptionReporter =
73+
new ExceptionReporter() {
74+
public void reportException(final Throwable t) {
75+
// intentionally quiet by default
76+
}
77+
};
8578

8679
public void setExceptionReporter(final ExceptionReporter exceptionReporter) {
8780
this.exceptionReporter = exceptionReporter;
@@ -105,7 +98,8 @@ public TcpTransport(
10598
final String remoteHost, final int remotePort, final String localHost, final int localPort)
10699
throws IOException {
107100
this(
108-
InetSocketAddress.createUnresolved(remoteHost, remotePort), InetSocketAddress.createUnresolved(localHost, localPort));
101+
InetSocketAddress.createUnresolved(remoteHost, remotePort),
102+
InetSocketAddress.createUnresolved(localHost, localPort));
109103
}
110104

111105
public TcpTransport(final String remoteHost) throws IOException {
@@ -120,112 +114,94 @@ public TcpTransport(final int remotePort) throws IOException {
120114
this(InetAddress.getLocalHost().getHostAddress(), remotePort);
121115
}
122116

123-
// Set the number of outstanding writes allowed at any time.
124117
public synchronized TcpTransport setWriteBufferLimit(final int limit) {
125118
if (isConnected()) {
126-
throw new IllegalStateException("can't modify the write buffer limit of a connected transport; please set the limit before connecting");
119+
throw new IllegalStateException(
120+
"can't modify the write buffer limit of a connected transport; please set the limit before connecting");
127121
}
128-
129122
writeLimit.set(limit);
130123
writeLimiter = new Semaphore(limit);
131124
return this;
132125
}
133126

134127
@Override
135128
public boolean isConnected() {
136-
// Are we in state connected?
137129
if (state != State.CONNECTED) {
138130
return false;
139131
}
140-
141-
// Is at least one channel connected?
142132
for (Channel ch : channels) {
143133
if (ch.isOpen()) {
144134
return true;
145135
}
146136
}
147-
148137
return false;
149138
}
150139

151-
// Builds a new SSLHandler
152140
public SslHandler sslHandler() {
153141
final SSLContext context = sslContext.get();
154142
if (context == null) {
155143
return null;
156144
}
157-
158145
final SSLEngine engine = context.createSSLEngine();
159146
engine.setUseClientMode(true);
160-
161-
final SslHandler handler = new SslHandler(engine);
162-
163-
// to disable tls renegotiation see:
164-
// https://stackoverflow.com/questions/31418644/is-it-possible-to-disable-tls-renegotiation-in-netty-4
165-
166-
return handler;
147+
return new SslHandler(engine);
167148
}
168149

169150
@Override
170-
// Does nothing if not currently disconnected.
171151
public synchronized void connect() throws IOException {
172152
if (state != State.DISCONNECTED) {
173153
return;
174154
}
175155
state = State.CONNECTING;
176156

177-
// Create bootstrap
178-
bootstrap = new Bootstrap().group(eventLoopGroup)
179-
.localAddress(localAddress)
180-
.remoteAddress(remoteAddress)
181-
.channel(NioSocketChannel.class)
182-
.handler(
183-
new ChannelInitializer<SocketChannel>() {
184-
@Override
185-
protected void initChannel(SocketChannel channel) {
186-
ChannelPipeline p = channel.pipeline();
187-
// Reconnections
188-
p.addLast(
189-
"reconnect",
190-
new ReconnectHandler(bootstrap, channels, reconnectDelay, TimeUnit.MILLISECONDS));
191-
192-
// TLS
193-
final SslHandler sslHandler = sslHandler();
194-
if (sslHandler != null) {
195-
p.addLast("tls", sslHandler);
196-
}
157+
bootstrap =
158+
new Bootstrap()
159+
.group(eventLoopGroup)
160+
.localAddress(localAddress)
161+
.remoteAddress(remoteAddress)
162+
.channel(NioSocketChannel.class)
163+
.handler(
164+
new ChannelInitializer<SocketChannel>() {
165+
@Override
166+
protected void initChannel(SocketChannel channel) {
167+
ChannelPipeline p = channel.pipeline();
168+
p.addLast(
169+
"reconnect",
170+
new ReconnectHandler(
171+
bootstrap, channels, reconnectDelay, TimeUnit.MILLISECONDS));
172+
173+
final SslHandler sslHandler = sslHandler();
174+
if (sslHandler != null) {
175+
p.addLast("tls", sslHandler);
176+
}
177+
178+
p.addLast(
179+
"frame-decoder",
180+
new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
181+
p.addLast("frame-encoder", frameEncoder);
182+
p.addLast("protobuf-decoder", pbDecoder);
183+
p.addLast("protobuf-encoder", pbEncoder);
184+
p.addLast("handler", new TcpHandler(exceptionReporter));
185+
}
186+
});
197187

198-
// Normal codec
199-
p.addLast("frame-decoder", new LengthFieldBasedFrameDecoder(
200-
Integer.MAX_VALUE, 0, 4, 0, 4));
201-
p.addLast("frame-encoder", frameEncoder);
202-
p.addLast("protobuf-decoder", pbDecoder);
203-
p.addLast("protobuf-encoder", pbEncoder);
204-
p.addLast("handler", new TcpHandler(exceptionReporter));
205-
}});
206-
207-
// Set bootstrap options
208188
bootstrap.option(ChannelOption.TCP_NODELAY, true);
209189
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
210190
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, connectTimeout.get());
211-
bootstrap.option(ChannelOption.WRITE_BUFFER_LOW_WATER_MARK, writeBufferLow.get());
212-
bootstrap.option(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, writeBufferHigh.get());
191+
// modern watermark option replaces deprecated LOW/HIGH options
192+
bootstrap.option(
193+
ChannelOption.WRITE_BUFFER_WATER_MARK,
194+
new WriteBufferWaterMark(writeBufferLow.get(), writeBufferHigh.get()));
213195
bootstrap.localAddress(localAddress);
214196
bootstrap.remoteAddress(remoteAddress);
215197

216-
// Connect and wait for connection ready
217198
final ChannelFuture result = bootstrap.connect();
218199
channels.add(result.channel());
219200
result.awaitUninterruptibly();
220201

221-
// At this point we consider the client "connected"--even though the
222-
// connection may have failed. The channel will continue to initiate
223-
// reconnect attempts in the background.
224202
state = State.CONNECTED;
225203

226-
// We'll throw an exception so users can pretend this call is synchronous
227-
// (and log errors as appropriate) but the client might succeed later.
228-
if (! result.isSuccess()) {
204+
if (!result.isSuccess()) {
229205
throw new IOException("Connection failed", result.cause());
230206
}
231207
}
@@ -239,7 +215,6 @@ public synchronized void close(boolean force) {
239215
if (!(force || state == State.CONNECTED)) {
240216
return;
241217
}
242-
243218
try {
244219
channels.close().awaitUninterruptibly();
245220
eventLoopGroup.shutdownGracefully().awaitUninterruptibly();
@@ -260,51 +235,57 @@ public void flush() throws IOException {
260235
channels.flush();
261236
}
262237

263-
// Write a message to any handler and return a promise to be fulfilled by
264-
// the corresponding response Msg.
265238
@Override
266239
public IPromise<Msg> sendMessage(final Msg msg) {
267240
return sendMessage(msg, new Promise<Msg>());
268241
}
269242

270-
// Write a message to any available handler, fulfilling a specific promise.
243+
/**
244+
* Write a message to any available handler, fulfilling the provided promise. A slot is acquired
245+
* before enqueuing and released only when the response promise is delivered. This enforces an
246+
* upper bound on in-flight requests and allows the caller to observe overload conditions.
247+
*/
271248
public Promise<Msg> sendMessage(final Msg msg, final Promise<Msg> promise) {
272249
if (state != State.CONNECTED) {
273250
promise.deliver(new IOException("client not connected"));
274251
return promise;
275252
}
276253

277-
final Write write = new Write(msg, promise);
278254
final Semaphore limiter = writeLimiter;
279255

280-
// Reserve a slot in the queue
281256
if (limiter.tryAcquire()) {
257+
// Promise wrapper that releases the permit when the response is delivered.
258+
final Promise<Msg> releasing =
259+
new Promise<Msg>() {
260+
@Override
261+
public void deliver(final Object value) {
262+
try {
263+
// fulfill the caller's promise first
264+
promise.deliver(value);
265+
} finally {
266+
limiter.release();
267+
}
268+
}
269+
};
270+
282271
for (Channel channel : channels) {
283-
// When the write is flushed from our local buffer, release our
284-
// limiter permit.
285-
ChannelFuture f;
272+
final Write write = new Write(msg, releasing);
286273
if (autoFlush.get()) {
287-
f = channel.writeAndFlush(write);
274+
channel.writeAndFlush(write);
288275
} else {
289-
f = channel.write(write);
276+
channel.write(write);
290277
}
291-
f.addListener(
292-
new ChannelFutureListener() {
293-
@Override
294-
public void operationComplete(ChannelFuture f) {
295-
limiter.release();
296-
}
297-
});
278+
// we return the original promise that the caller passed in
298279
return promise;
299280
}
300281

301-
// No channels available, release the slot.
282+
// No channels available, release the slot and fail fast.
302283
limiter.release();
303284
promise.deliver(new IOException("no channels available"));
304285
return promise;
305286
}
306287

307-
// Buffer's full.
288+
// Buffer is full at the client.
308289
promise.deliver(
309290
new OverloadedException(
310291
"client write buffer is full: "
@@ -317,6 +298,6 @@ public void operationComplete(ChannelFuture f) {
317298

318299
@Override
319300
public Transport transport() {
320-
return null;
301+
return this;
321302
}
322303
}

0 commit comments

Comments
 (0)