Skip to content

Commit cf82fff

Browse files
SNOW-569611 Close channel once + make future idempotent
1 parent aebd3d1 commit cf82fff

File tree

3 files changed

+84
-42
lines changed

3 files changed

+84
-42
lines changed

src/main/java/net/snowflake/ingest/streaming/internal/ChannelCache.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,9 @@ void addChannel(SnowflakeStreamingIngestChannelInternal channel) {
5252
void closeAllChannels() {
5353
this.cache
5454
.values()
55-
.forEach(channels -> channels.values().forEach(channel -> channel.markClosed()));
55+
.forEach(
56+
channels ->
57+
channels.values().forEach(SnowflakeStreamingIngestChannelInternal::markClosed));
5658
}
5759

5860
/** Remove a channel in the channel cache if the channel sequencer matches */
@@ -65,7 +67,7 @@ void removeChannelIfSequencersMatch(SnowflakeStreamingIngestChannelInternal chan
6567
// We need to compare the channel sequencer in case the old channel was already been
6668
// removed
6769
return channelInCache != null
68-
&& channelInCache.getChannelSequencer() == channel.getChannelSequencer()
70+
&& channelInCache.getChannelSequencer() == channel.getChannelSequencer() // TODO: 4/11/22 this is wrong because the long is boxed so == checks refs and not the value
6971
&& v.remove(channel.getName()) != null
7072
&& v.isEmpty()
7173
? null

src/main/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelInternal.java

Lines changed: 69 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212
import java.util.Map;
1313
import java.util.concurrent.CompletableFuture;
1414
import java.util.concurrent.atomic.AtomicLong;
15+
import java.util.concurrent.atomic.AtomicReference;
1516
import java.util.stream.Collectors;
1617
import net.snowflake.ingest.streaming.InsertValidationResponse;
1718
import net.snowflake.ingest.streaming.OpenChannelRequest;
@@ -45,7 +46,7 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingInges
4546
private volatile boolean isValid;
4647

4748
// Indicates whether the channel is closed
48-
private volatile boolean isClosed;
49+
private final AtomicReference<Boolean> isClosed;
4950

5051
// Reference to the client that owns this channel
5152
private final SnowflakeStreamingIngestClientInternal owningClient;
@@ -65,6 +66,8 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingInges
6566
// ON_ERROR option for this channel
6667
private final OpenChannelRequest.OnErrorOption onErrorOption;
6768

69+
private final CompletableFuture<Void> terminationFuture;
70+
6871
/**
6972
* Constructor for TESTING ONLY which allows us to set the test mode
7073
*
@@ -99,7 +102,7 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingInges
99102
this.channelSequencer = channelSequencer;
100103
this.rowSequencer = new AtomicLong(rowSequencer);
101104
this.isValid = true;
102-
this.isClosed = false;
105+
this.isClosed = new AtomicReference<>(false);
103106
this.owningClient = client;
104107
this.isTestMode = isTestMode;
105108
this.allocator =
@@ -112,6 +115,8 @@ class SnowflakeStreamingIngestChannelInternal implements SnowflakeStreamingInges
112115
this.encryptionKey = encryptionKey;
113116
this.encryptionKeyId = encryptionKeyId;
114117
this.onErrorOption = onErrorOption;
118+
this.terminationFuture = new CompletableFuture<>();
119+
115120
logger.logDebug("Channel={} created for table={}", this.channelName, this.tableName);
116121
}
117122

@@ -255,20 +260,31 @@ void invalidate() {
255260
rowSequencer);
256261
}
257262

258-
/** @return a boolean to indicate whether the channel is closed or not */
263+
/**
264+
* @return {@code true} if the channel was closed, {@code false} otherwise. A return value of
265+
* {@code true} does not mean that the data is committed. For this, you should call {@link
266+
* #close()} and wait on the returned future.
267+
*/
259268
@Override
260269
public boolean isClosed() {
261-
return this.isClosed;
270+
return this.isClosed.get();
262271
}
263272

264-
/** Mark the channel as closed */
265-
void markClosed() {
266-
this.isClosed = true;
267-
logger.logDebug(
268-
"Channel is closed, name={}, channel sequencer={}, row sequencer={}",
269-
getFullyQualifiedName(),
270-
channelSequencer,
271-
rowSequencer);
273+
/**
274+
* Mark the channel as closed atomically.
275+
*
276+
* @return {@code true} if the channel was already closed, {@code false} otherwise.
277+
*/
278+
boolean markClosed() {
279+
final boolean wasAlreadyClosed = this.isClosed.getAndSet(true);
280+
if (!wasAlreadyClosed) {
281+
logger.logDebug(
282+
"Channel is closed, name={}, channel sequencer={}, row sequencer={}",
283+
getFullyQualifiedName(),
284+
channelSequencer,
285+
rowSequencer);
286+
}
287+
return wasAlreadyClosed;
272288
}
273289

274290
/**
@@ -281,7 +297,9 @@ CompletableFuture<Void> flush(boolean closing) {
281297
// Skip this check for closing because we need to set the channel to closed first and then flush
282298
// in case there is any leftover rows
283299
if (isClosed() && !closing) {
284-
throw new SFException(ErrorCode.CLOSED_CHANNEL);
300+
final CompletableFuture<Void> res = new CompletableFuture<>();
301+
res.completeExceptionally(new SFException(ErrorCode.CLOSED_CHANNEL));
302+
return res;
285303
}
286304

287305
// Simply return if there is no data in the channel, this might not work if we support public
@@ -294,38 +312,51 @@ CompletableFuture<Void> flush(boolean closing) {
294312
}
295313

296314
/**
297-
* Close the channel (this will flush in-flight buffered data)
315+
* Close the channel and flush in-flight buffered data.
298316
*
299-
* @return future which will be complete when the channel is closed
317+
* @return a {@link CompletableFuture} which will be completed when the channel is closed and the
318+
* data is successfully committed or has failed to be committed for some reason.
300319
*/
301320
@Override
302321
public CompletableFuture<Void> close() {
303322
checkValidation();
304323

305-
if (isClosed()) {
306-
return CompletableFuture.completedFuture(null);
307-
}
324+
if (!markClosed()) {
325+
this.owningClient.removeChannelIfSequencersMatch(this);
308326

309-
markClosed();
310-
this.owningClient.removeChannelIfSequencersMatch(this);
311-
return flush(true)
312-
.thenRunAsync(
313-
() -> {
314-
List<SnowflakeStreamingIngestChannelInternal> uncommittedChannels =
315-
this.owningClient.verifyChannelsAreFullyCommitted(
316-
Collections.singletonList(this));
317-
318-
this.arrowBuffer.close();
319-
320-
// Throw an exception if the channel has any uncommitted rows
321-
if (!uncommittedChannels.isEmpty()) {
322-
throw new SFException(
323-
ErrorCode.CHANNEL_WITH_UNCOMMITTED_ROWS,
324-
uncommittedChannels.stream()
325-
.map(SnowflakeStreamingIngestChannelInternal::getFullyQualifiedName)
326-
.collect(Collectors.toList()));
327-
}
328-
});
327+
// here we flush even if removal fails because, for some reason,
328+
// the client sequencers did not match.
329+
// This does not seem like the desirable behavior.
330+
331+
flush(true)
332+
.thenRunAsync(
333+
() -> {
334+
List<SnowflakeStreamingIngestChannelInternal> uncommittedChannels =
335+
this.owningClient.verifyChannelsAreFullyCommitted(
336+
Collections.singletonList(this));
337+
338+
this.arrowBuffer.close();
339+
340+
// Throw an exception if the channel has any uncommitted rows
341+
if (!uncommittedChannels.isEmpty()) {
342+
throw new SFException(
343+
ErrorCode.CHANNEL_WITH_UNCOMMITTED_ROWS,
344+
uncommittedChannels.stream()
345+
.map(SnowflakeStreamingIngestChannelInternal::getFullyQualifiedName)
346+
.collect(Collectors.toList()));
347+
}
348+
})
349+
.handle(
350+
(ignored, t) -> {
351+
if (t != null) {
352+
terminationFuture.completeExceptionally(t);
353+
} else {
354+
terminationFuture.complete(null);
355+
}
356+
return null;
357+
});
358+
}
359+
return terminationFuture;
329360
}
330361

331362
/**

src/test/java/net/snowflake/ingest/streaming/internal/SnowflakeStreamingIngestChannelTest.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -583,8 +583,17 @@ public void testFlush() throws Exception {
583583
channel.close().get();
584584
try {
585585
channel.flush(false).get();
586-
} catch (SFException e) {
587-
Assert.assertEquals(ErrorCode.CLOSED_CHANNEL.getMessageCode(), e.getVendorCode());
586+
} catch (Exception e) {
587+
Throwable t = e;
588+
while (t != null) {
589+
t = e.getCause();
590+
if (t instanceof SFException) {
591+
Assert.assertEquals(
592+
ErrorCode.CLOSED_CHANNEL.getMessageCode(), ((SFException) t).getVendorCode());
593+
return;
594+
}
595+
}
596+
Assert.fail("Wrong exception: " + e.getMessage());
588597
}
589598
}
590599

0 commit comments

Comments
 (0)