Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,42 +24,87 @@
import java.nio.channels.ClosedChannelException;
import java.nio.channels.DatagramChannel;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.locks.ReentrantLock;

/** Writer that outputs data to UDP socket. */
final class UdpWriter extends SidecarWriter {

private static final Logger LOGGER = LoggerFactory.getLogger(UdpWriter.class);

private final SocketAddress address;
private DatagramChannel channel;
private final ReentrantLock lock;
private volatile DatagramChannel channel;

/** Create a new instance. */
UdpWriter(String location, SocketAddress address) throws IOException {
super(location);
this.address = address;
this.lock = new ReentrantLock();
connect();
}

private void connect() throws IOException {
channel = DatagramChannel.open();
channel.connect(address);
DatagramChannel newChannel = DatagramChannel.open();
try {
newChannel.connect(address);
channel = newChannel;
} catch (IOException e) {
try {
newChannel.close();
} catch (IOException ignored) {
// Suppress close exception during error handling
}
throw e;
}
}

@Override public void writeImpl(String line) throws IOException {
ByteBuffer buffer = ByteBuffer.wrap(line.getBytes(StandardCharsets.UTF_8));
DatagramChannel ch = channel;
try {
channel.write(buffer);
ch.write(buffer);
} catch (ClosedChannelException e) {
lock.lock();
try {
connect();
} catch (IOException ex) {
LOGGER.warn("channel closed, failed to reconnect", ex);
// Double-check: another thread may have already reconnected
if (channel == ch) {
try {
connect();
// After successful reconnection, retry the write once
buffer.rewind();
channel.write(buffer);
// Write succeeded after reconnection
} catch (IOException ex) {
LOGGER.warn("channel closed, failed to reconnect", ex);
ex.initCause(e);
throw ex;
}
} else {
// Another thread reconnected, retry the write once with new channel
try {
buffer.rewind();
channel.write(buffer);
// Write succeeded with reconnected channel
} catch (IOException ex) {
LOGGER.warn("failed to write after reconnection by another thread", ex);
ex.initCause(e);
throw ex;
}
}
} finally {
lock.unlock();
}
throw e;
}
}

@Override public void close() throws IOException {
channel.close();
lock.lock();
try {
if (channel != null) {
channel.close();
}
} finally {
lock.unlock();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,11 @@ public void udpReconnectIfClosed() throws IOException {
try (UdpServer server = new UdpServer()) {
try (SidecarWriter w = SidecarWriter.create(server.address())) {
// Used to simulate close from something like an interrupt. The next write
// will fail and it should try to reconnect.
// will trigger reconnection and retry the write.
w.close();
w.write("1");
// After reconnection, the write should succeed (retry logic)
Assertions.assertEquals("1", server.read());

w.write("2");
Assertions.assertEquals("2", server.read());
Expand All @@ -56,6 +58,69 @@ public void udpReconnectIfClosed() throws IOException {
}
}

@Test
public void concurrentReconnect() throws Exception {
try (UdpServer server = new UdpServer()) {
try (SidecarWriter w = SidecarWriter.create(server.address())) {
// Close the channel to force reconnection on next write
w.close();

// Multiple threads attempting to write simultaneously should trigger
// concurrent reconnection attempts. The key test is that we don't get
// IllegalStateException from "Connect already invoked"
Thread[] threads = new Thread[10];
List<Exception> exceptions = Collections.synchronizedList(new ArrayList<>());
for (int i = 0; i < threads.length; ++i) {
final int n = i;
Runnable task = () -> {
try {
// All threads will hit ClosedChannelException and try to reconnect
w.write("thread-" + n);
} catch (Exception e) {
exceptions.add(e);
}
};
threads[i] = new Thread(task);
}

// Start all threads at once to maximize race condition probability
for (Thread t : threads) {
t.start();
}
for (Thread t : threads) {
t.join();
}

// The key assertion: should not get IllegalStateException from concurrent connect()
for (Exception e : exceptions) {
Assertions.assertFalse(
e instanceof IllegalStateException,
"Should not throw IllegalStateException: " + e.getMessage()
);
}

// Give time for any pending UDP packets to arrive, then drain them
Thread.sleep(100);

// Verify the writer still works after concurrent reconnection attempts
// by writing a unique marker and reading until we find it
String marker = "final-test-marker";
w.write(marker);

// Read messages until we find our marker (with timeout protection)
int attempts = 0;
String received = null;
while (attempts++ < 20) {
received = server.read();
if (marker.equals(received)) {
break;
}
}
Assertions.assertEquals(marker, received);
}
}
}

// Disabled because it can have issues on CI
@Test
@Disabled
Expand Down
Loading