Description
The reconnect logic added in #1180 (commit 75ee7aea) introduced a thread-safety bug. When multiple threads concurrently trigger reconnection after a ClosedChannelException, a race condition causes IllegalStateException: Connect already invoked.
Stack Trace
java.lang.IllegalStateException: Connect already invoked
at sun.nio.ch.DatagramChannelImpl.connect(DatagramChannelImpl.java:728)
at com.netflix.spectator.sidecar.UdpWriter.connect(UdpWriter.java:45)
at com.netflix.spectator.sidecar.UdpWriter.writeImpl(UdpWriter.java:54)
Root Cause
The connect() method performs two non-atomic operations without synchronization:
private void connect() throws IOException {
  channel = DatagramChannel.open(); // Step 1
  channel.connect(address);         // Step 2
}
When two threads enter connect() concurrently:
- Thread A: channel = DatagramChannel.open() (creates X)
- Thread B: channel = DatagramChannel.open() (creates Y, overwrites field)
- Thread A: channel.connect(address) (connects Y)
- Thread B: channel.connect(address) on Y → FAILS: already connected
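The interleaving above can be replayed on a single thread, which makes the failure deterministic. The following is only a sketch: the class name DoubleConnectDemo, the method secondConnectFails, and port 1234 are made up for illustration and are not part of Spectator. It catches IllegalStateException because, as far as I know, newer JDKs report the second connect as AlreadyConnectedException, which is a subclass of IllegalStateException.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.DatagramChannel;

// Hypothetical demo class, not part of Spectator.
class DoubleConnectDemo {

  // Replays the four steps of the race in order and reports whether the
  // second connect() on the same channel is rejected.
  static boolean secondConnectFails(InetSocketAddress address) {
    try (DatagramChannel x = DatagramChannel.open();   // Thread A, step 1 (creates X)
         DatagramChannel y = DatagramChannel.open()) { // Thread B, step 1 (creates Y)
      DatagramChannel channel = y; // the shared field now points at Y; X is orphaned
      channel.connect(address);    // Thread A, step 2: connects Y
      try {
        channel.connect(address);  // Thread B, step 2: Y is already connected
        return false;
      } catch (IllegalStateException e) {
        return true;
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  public static void main(String[] args) {
    // Assumed address: UDP connect() does not need a listener on the port.
    InetSocketAddress address =
        new InetSocketAddress(InetAddress.getLoopbackAddress(), 1234);
    System.out.println("second connect fails: " + secondConnectFails(address));
  }
}
```

Note that channel X is never connected or closed in the real race, so the bug also leaks a channel each time it occurs.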
Affected Versions
v1.8.10, v1.8.11, v1.8.12, v1.8.13, v1.8.14, v1.9.0, v1.9.1, v1.9.2, main
Impact
This affects BDP Spark 3.3, which upgraded Spectator from 1.8.0 to 1.9.2. High-throughput jobs using Cryptex UDFs (which record metrics via Spectator) fail under high concurrency.
Suggested Fix
Add synchronization to connect() and use a double-checked locking pattern in writeImpl():
private final Object lock = new Object();
private volatile DatagramChannel channel;

private void connect() throws IOException {
  synchronized (lock) {
    DatagramChannel newChannel = DatagramChannel.open();
    newChannel.connect(address);
    channel = newChannel;
  }
}

@Override public void writeImpl(String line) throws IOException {
  ByteBuffer buffer = ByteBuffer.wrap(line.getBytes(StandardCharsets.UTF_8));
  DatagramChannel ch = channel; // Local copy for thread safety
  try {
    ch.write(buffer);
  } catch (ClosedChannelException e) {
    synchronized (lock) {
      // Double-check: another thread may have already reconnected
      if (channel == ch) {
        try {
          connect();
        } catch (IOException ex) {
          LOGGER.warn("channel closed, failed to reconnect", ex);
        }
      }
    }
    throw e;
  }
}
Example Failure
Genie job: prod_ads.lb_etl.ad_idg_profile_edge.wf_110_1_write_ad_idg_profile_hem_stg-1
https://genie.prod.netflix.net/jobs?id=prod_ads.lb_etl.ad_idg_profile_edge.wf_110_1_write_ad_idg_profile_hem_stg-1&rowId=prod_ads.lb_etl.ad_idg_profile_edge.wf_110_1_write_ad_idg_profile_hem_stg-1
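For checking the suggested fix, a small stress test along these lines might be useful. This is a sketch under assumptions: SafeUdpWriter is a hypothetical stand-in that copies the locking from the suggested fix, not the real UdpWriter, and countIllegalState, closeChannel, and the loopback receiver are invented for the test. It closes the channel, lets several threads hit ClosedChannelException at once, and counts how many of them see an IllegalStateException.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.DatagramChannel;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for UdpWriter using the locking from the suggested fix.
class SafeUdpWriter {
  private final Object lock = new Object();
  private final InetSocketAddress address;
  private volatile DatagramChannel channel;

  SafeUdpWriter(InetSocketAddress address) throws IOException {
    this.address = address;
    connect();
  }

  private void connect() throws IOException {
    synchronized (lock) {
      // Connect the new channel before publishing it to other threads.
      DatagramChannel newChannel = DatagramChannel.open();
      newChannel.connect(address);
      channel = newChannel;
    }
  }

  void write(String line) throws IOException {
    DatagramChannel ch = channel; // local copy of the shared field
    try {
      ch.write(ByteBuffer.wrap(line.getBytes(StandardCharsets.UTF_8)));
    } catch (ClosedChannelException e) {
      synchronized (lock) {
        // Reconnect only if no other thread has replaced the channel yet.
        if (channel == ch) {
          connect();
        }
      }
      throw e;
    }
  }

  // Test helper: closes whichever channel is current.
  void closeChannel() throws IOException {
    channel.close();
  }

  // Closes the channel, then lets `threads` writers race through the
  // reconnect path. Returns how many saw an IllegalStateException.
  static int countIllegalState(int threads) {
    try (DatagramChannel receiver = DatagramChannel.open()) {
      // Bind a local receiver so writes have a live destination port.
      receiver.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
      SafeUdpWriter writer =
          new SafeUdpWriter((InetSocketAddress) receiver.getLocalAddress());
      writer.closeChannel();

      AtomicInteger illegalState = new AtomicInteger();
      CountDownLatch start = new CountDownLatch(1);
      Thread[] workers = new Thread[threads];
      for (int i = 0; i < threads; i++) {
        workers[i] = new Thread(() -> {
          try {
            start.await();
            writer.write("test-line");
          } catch (IllegalStateException e) {
            illegalState.incrementAndGet();
          } catch (IOException | InterruptedException e) {
            // ClosedChannelException is expected for the first writers.
          }
        });
        workers[i].start();
      }
      start.countDown();
      for (Thread t : workers) {
        t.join();
      }
      writer.closeChannel();
      return illegalState.get();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new RuntimeException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println("IllegalStateException count: " + countIllegalState(8));
  }
}
```

Because each new channel is connected before it is assigned to the field, no thread can observe a channel that is open but not yet connected, and no channel is ever connected twice. A passing run does not prove the absence of races, so repeating it with more threads is worthwhile.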