Skip to content

Commit f521a6a

Browse files
committed
polishes logging and refactor Client/ServerRSocketSession. adds tests. Improves KeepAliveSupport
Signed-off-by: Oleh Dokuka <[email protected]>
1 parent de60762 commit f521a6a

File tree

8 files changed

+900
-200
lines changed

8 files changed

+900
-200
lines changed

Diff for: rsocket-core/src/main/java/io/rsocket/core/ServerSetup.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -114,10 +114,10 @@ public Mono<Void> acceptRSocketSetup(
114114
final ServerRSocketSession serverRSocketSession =
115115
new ServerRSocketSession(
116116
resumeToken,
117-
duplexConnection,
118117
resumableDuplexConnection,
119-
resumeSessionDuration,
118+
duplexConnection,
120119
resumableFramesStore,
120+
resumeSessionDuration,
121121
cleanupStoreOnKeepAlive);
122122

123123
sessionManager.save(serverRSocketSession, resumeToken);

Diff for: rsocket-core/src/main/java/io/rsocket/keepalive/KeepAliveSupport.java

+47-22
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
import io.rsocket.resume.ResumeStateHolder;
2424
import java.time.Duration;
2525
import java.util.concurrent.TimeUnit;
26-
import java.util.concurrent.atomic.AtomicBoolean;
26+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
2727
import java.util.function.Consumer;
2828
import reactor.core.Disposable;
2929
import reactor.core.publisher.Flux;
@@ -38,11 +38,19 @@ public abstract class KeepAliveSupport implements KeepAliveFramesAcceptor {
3838
final Duration keepAliveTimeout;
3939
final long keepAliveTimeoutMillis;
4040

41-
final AtomicBoolean started = new AtomicBoolean();
41+
volatile int state;
42+
static final AtomicIntegerFieldUpdater<KeepAliveSupport> STATE =
43+
AtomicIntegerFieldUpdater.newUpdater(KeepAliveSupport.class, "state");
44+
45+
static final int STOPPED_STATE = 0;
46+
static final int STARTING_STATE = 1;
47+
static final int STARTED_STATE = 2;
48+
static final int DISPOSED_STATE = -1;
4249

4350
volatile Consumer<KeepAlive> onTimeout;
4451
volatile Consumer<ByteBuf> onFrameSent;
45-
volatile Disposable ticksDisposable;
52+
53+
Disposable ticksDisposable;
4654

4755
volatile ResumeStateHolder resumeStateHolder;
4856
volatile long lastReceivedMillis;
@@ -57,25 +65,30 @@ private KeepAliveSupport(
5765
}
5866

5967
public KeepAliveSupport start() {
60-
this.lastReceivedMillis = scheduler.now(TimeUnit.MILLISECONDS);
61-
if (started.compareAndSet(false, true)) {
62-
ticksDisposable =
68+
if (this.state == STOPPED_STATE && STATE.compareAndSet(this, STOPPED_STATE, STARTING_STATE)) {
69+
this.lastReceivedMillis = scheduler.now(TimeUnit.MILLISECONDS);
70+
71+
final Disposable disposable =
6372
Flux.interval(keepAliveInterval, scheduler).subscribe(v -> onIntervalTick());
73+
this.ticksDisposable = disposable;
74+
75+
if (this.state != STARTING_STATE
76+
|| !STATE.compareAndSet(this, STARTING_STATE, STARTED_STATE)) {
77+
disposable.dispose();
78+
}
6479
}
6580
return this;
6681
}
6782

6883
public void stop() {
69-
if (started.compareAndSet(true, false)) {
70-
ticksDisposable.dispose();
71-
}
84+
terminate(STOPPED_STATE);
7285
}
7386

7487
@Override
7588
public void receive(ByteBuf keepAliveFrame) {
7689
this.lastReceivedMillis = scheduler.now(TimeUnit.MILLISECONDS);
7790
if (resumeStateHolder != null) {
78-
long remoteLastReceivedPos = remoteLastReceivedPosition(keepAliveFrame);
91+
final long remoteLastReceivedPos = KeepAliveFrameCodec.lastPosition(keepAliveFrame);
7992
resumeStateHolder.onImpliedPosition(remoteLastReceivedPos);
8093
}
8194
if (KeepAliveFrameCodec.respondFlag(keepAliveFrame)) {
@@ -104,6 +117,16 @@ public KeepAliveSupport onTimeout(Consumer<KeepAlive> onTimeout) {
104117
return this;
105118
}
106119

120+
@Override
121+
public void dispose() {
122+
terminate(DISPOSED_STATE);
123+
}
124+
125+
@Override
126+
public boolean isDisposed() {
127+
return ticksDisposable.isDisposed();
128+
}
129+
107130
abstract void onIntervalTick();
108131

109132
void send(ByteBuf frame) {
@@ -122,22 +145,24 @@ void tryTimeout() {
122145
}
123146
}
124147

125-
long localLastReceivedPosition() {
126-
return resumeStateHolder != null ? resumeStateHolder.impliedPosition() : 0;
127-
}
148+
void terminate(int terminationState) {
149+
for (; ; ) {
150+
final int state = this.state;
128151

129-
long remoteLastReceivedPosition(ByteBuf keepAliveFrame) {
130-
return KeepAliveFrameCodec.lastPosition(keepAliveFrame);
131-
}
152+
if (state == STOPPED_STATE || state == DISPOSED_STATE) {
153+
return;
154+
}
132155

133-
@Override
134-
public void dispose() {
135-
stop();
156+
final Disposable disposable = this.ticksDisposable;
157+
if (STATE.compareAndSet(this, state, terminationState)) {
158+
disposable.dispose();
159+
return;
160+
}
161+
}
136162
}
137163

138-
@Override
139-
public boolean isDisposed() {
140-
return ticksDisposable.isDisposed();
164+
long localLastReceivedPosition() {
165+
return resumeStateHolder != null ? resumeStateHolder.impliedPosition() : 0;
141166
}
142167

143168
public static final class ClientKeepAliveSupport extends KeepAliveSupport {

0 commit comments

Comments
 (0)