Skip to content

Commit 125a665

Browse files
authored
[FLINK-37628] Fix reference counting in ForSt file cache (#26421)
1 parent 8605484 commit 125a665

File tree

3 files changed

+141
-68
lines changed

3 files changed

+141
-68
lines changed

Diff for: flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/cache/CachedDataInputStream.java

+115-63
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,8 @@ public CachedDataInputStream(
9393
* Retrieves the appropriate input stream for reading data. This method attempts to use the
9494
* cached stream if it is available and valid. If the cached stream is not available, it falls
9595
* back to the original stream. The method also handles the transition between cached and
96-
* original streams based on the current status of the stream.
96+
* original streams based on the current status of the stream. The invoker must ensure to
97+
* release the cache stream after use.
9798
*
9899
* @return the input stream to be used for reading data
99100
* @throws IOException if an I/O error occurs while accessing the stream
@@ -102,15 +103,18 @@ private FSDataInputStream getStream() throws IOException {
102103
if (isFlinkThread()) {
103104
cacheEntry.touch();
104105
}
105-
FSDataInputStream stream = tryGetCacheStream();
106-
if (stream != null) {
107-
fileBasedCache.incHitCounter();
108-
return stream;
109-
}
110-
111-
if (streamStatus == StreamStatus.CACHED_CLOSED
112-
|| streamStatus == StreamStatus.CACHED_CLOSING) {
106+
int round = 0;
107+
// Repeat at most 3 times. If fails, we will get the original stream for read.
108+
while (round++ < 3) {
109+
// Firstly, we try to get cache stream
110+
FSDataInputStream stream = tryGetCacheStream();
111+
if (stream != null) {
112+
fileBasedCache.incHitCounter();
113+
return stream;
114+
}
115+
// No cache stream
113116
if (streamStatus == StreamStatus.CACHED_CLOSING) {
117+
// if closing, update the position
114118
try {
115119
semaphore.acquire(1);
116120
} catch (InterruptedException e) {
@@ -119,62 +123,78 @@ private FSDataInputStream getStream() throws IOException {
119123
originalStream.seek(position);
120124
position = -1;
121125
LOG.trace(
122-
"Stream {} status from {} to {}",
126+
"Cached Stream {} status from {} to {}",
123127
cacheEntry.cachePath,
124128
streamStatus,
125129
StreamStatus.CACHED_CLOSED);
126130
streamStatus = StreamStatus.CACHED_CLOSED;
127131
}
128-
// try reopen
129-
tryReopen();
130-
stream = tryGetCacheStream();
131-
if (stream != null) {
132-
fileBasedCache.incHitCounter();
133-
return stream;
134-
}
135-
fileBasedCache.incMissCounter();
136-
return originalStream;
137-
} else if (streamStatus == StreamStatus.ORIGINAL) {
138-
fileBasedCache.incMissCounter();
139-
return originalStream;
140-
} else {
141-
if (streamStatus == StreamStatus.CACHED_OPEN) {
142-
stream = tryGetCacheStream();
132+
// if it is CACHED_CLOSED, we try to reopen it
133+
if (streamStatus == StreamStatus.CACHED_CLOSED) {
134+
stream = tryReopenCachedStream();
143135
if (stream != null) {
144136
fileBasedCache.incHitCounter();
145137
return stream;
146138
}
139+
fileBasedCache.incMissCounter();
140+
return originalStream;
141+
} else if (streamStatus == StreamStatus.ORIGINAL) {
142+
fileBasedCache.incMissCounter();
143+
return originalStream;
144+
} else {
145+
// The stream is not closed, but we cannot get the cache stream.
146+
// Meaning that it is in the process of closing, but the status has not been
147+
// updated. Thus, we'd better retry here until it reach a stable state (CLOSING).
148+
Thread.yield();
147149
}
148-
fileBasedCache.incMissCounter();
149-
return originalStream;
150150
}
151+
return originalStream;
151152
}
152153

154+
/**
155+
* Attempts to retrieve the cached stream if it is open and the reference count is greater than
156+
* zero. If successful, it retains the reference count and returns the cached stream. The
157+
* invoker must ensure to release the stream after use.
158+
*
159+
* @return the cached stream if available, or null if not
160+
*/
153161
private FSDataInputStream tryGetCacheStream() {
154162
if (streamStatus == StreamStatus.CACHED_OPEN && cacheEntry.tryRetain() > 0) {
155-
return fsdis;
163+
// Double-check the status as it may change after retain.
164+
if (streamStatus == StreamStatus.CACHED_OPEN) {
165+
return fsdis;
166+
}
156167
}
157168
return null;
158169
}
159170

160-
private void tryReopen() {
171+
/**
172+
* Attempts to reopen the cached stream if it is closed and the current thread is a Flink
173+
* thread. If successful, it updates the stream status and seeks to the original stream's
174+
* position. Reference counting is retained, the invoked thread must dereference the stream
175+
* after use.
176+
*
177+
* @return the reopened cached stream, or null if reopening fails
178+
*/
179+
private FSDataInputStream tryReopenCachedStream() {
161180
if (streamStatus == StreamStatus.CACHED_CLOSED && isFlinkThread()) {
162181
try {
163182
fsdis = cacheEntry.getCacheStream();
164183
if (fsdis != null) {
165184
LOG.trace(
166-
"Stream {} status from {} to {}",
185+
"Cached Stream {} status from {} to {}",
167186
cacheEntry.cachePath,
168187
streamStatus,
169188
StreamStatus.CACHED_OPEN);
170189
fsdis.seek(originalStream.getPos());
171190
streamStatus = StreamStatus.CACHED_OPEN;
172-
cacheEntry.release();
191+
return fsdis;
173192
}
174193
} catch (IOException e) {
175194
LOG.warn("Reopen stream error.", e);
176195
}
177196
}
197+
return null;
178198
}
179199

180200
/**
@@ -196,72 +216,87 @@ synchronized void closeCachedStream() throws IOException {
196216
}
197217
}
198218

199-
private void finish() {
200-
if (streamStatus == StreamStatus.CACHED_OPEN) {
201-
cacheEntry.release();
202-
}
203-
}
204-
205219
@Override
206220
public void seek(long desired) throws IOException {
221+
FSDataInputStream stream = getStream();
207222
try {
208-
getStream().seek(desired);
223+
stream.seek(desired);
209224
} finally {
210-
finish();
225+
if (stream != originalStream) {
226+
cacheEntry.release();
227+
}
211228
}
212229
}
213230

214231
@Override
215232
public long getPos() throws IOException {
233+
FSDataInputStream stream = getStream();
216234
try {
217-
return getStream().getPos();
235+
return stream.getPos();
218236
} finally {
219-
finish();
237+
if (stream != originalStream) {
238+
cacheEntry.release();
239+
}
220240
}
221241
}
222242

223243
@Override
224244
public int read() throws IOException {
245+
FSDataInputStream stream = getStream();
225246
try {
226-
return getStream().read();
247+
return stream.read();
227248
} finally {
228-
finish();
249+
if (stream != originalStream) {
250+
cacheEntry.release();
251+
}
229252
}
230253
}
231254

232255
@Override
233256
public int read(byte[] b) throws IOException {
257+
FSDataInputStream stream = getStream();
234258
try {
235-
return getStream().read(b);
259+
return stream.read(b);
236260
} finally {
237-
finish();
261+
if (stream != originalStream) {
262+
cacheEntry.release();
263+
}
238264
}
239265
}
240266

241267
@Override
242268
public int read(byte[] b, int off, int len) throws IOException {
269+
FSDataInputStream stream = getStream();
243270
try {
244-
return getStream().read(b, off, len);
271+
return stream.read(b, off, len);
245272
} finally {
246-
finish();
273+
if (stream != originalStream) {
274+
cacheEntry.release();
275+
}
247276
}
248277
}
249278

250279
@Override
251280
public long skip(long n) throws IOException {
281+
FSDataInputStream stream = getStream();
252282
try {
253-
return getStream().skip(n);
283+
return stream.skip(n);
254284
} finally {
255-
finish();
285+
if (stream != originalStream) {
286+
cacheEntry.release();
287+
}
256288
}
257289
}
258290

259291
@Override
260292
public int available() throws IOException {
293+
FSDataInputStream stream = getStream();
261294
try {
262-
return getStream().available();
295+
return stream.available();
263296
} finally {
264-
finish();
297+
if (stream != originalStream) {
298+
cacheEntry.release();
299+
}
265300
}
266301
}
267302

@@ -281,32 +316,45 @@ public boolean isClosed() {
281316
@Override
282317
public void mark(int readlimit) {
283318
try {
284-
getStream().mark(readlimit);
319+
FSDataInputStream stream = getStream();
320+
try {
321+
stream.mark(readlimit);
322+
} finally {
323+
if (stream != originalStream) {
324+
cacheEntry.release();
325+
}
326+
}
285327
} catch (Exception e) {
286328
LOG.warn("Mark error.", e);
287-
} finally {
288-
finish();
289329
}
290330
}
291331

292332
@Override
293333
public void reset() throws IOException {
334+
FSDataInputStream stream = getStream();
294335
try {
295-
getStream().reset();
336+
stream.reset();
296337
} finally {
297-
finish();
338+
if (stream != originalStream) {
339+
cacheEntry.release();
340+
}
298341
}
299342
}
300343

301344
@Override
302345
public boolean markSupported() {
303346
try {
304-
return getStream().markSupported();
347+
FSDataInputStream stream = getStream();
348+
try {
349+
return stream.markSupported();
350+
} finally {
351+
if (stream != originalStream) {
352+
cacheEntry.release();
353+
}
354+
}
305355
} catch (IOException e) {
306356
LOG.warn("MarkSupported error.", e);
307357
return false;
308-
} finally {
309-
finish();
310358
}
311359
}
312360

@@ -317,28 +365,32 @@ public int read(ByteBuffer bb) throws IOException {
317365
} else if (bb.remaining() == 0) {
318366
return 0;
319367
}
368+
FSDataInputStream stream = getStream();
320369
try {
321-
FSDataInputStream stream = getStream();
322370
return stream instanceof ByteBufferReadable
323371
? ((ByteBufferReadable) stream).read(bb)
324372
: readFullyFromFSDataInputStream(stream, bb);
325373
} finally {
326-
finish();
374+
if (stream != originalStream) {
375+
cacheEntry.release();
376+
}
327377
}
328378
}
329379

330380
@Override
331381
public int read(long position, ByteBuffer bb) throws IOException {
382+
FSDataInputStream stream = getStream();
332383
try {
333-
FSDataInputStream stream = getStream();
334384
if (stream instanceof ByteBufferReadable) {
335385
return ((ByteBufferReadable) stream).read(position, bb);
336386
} else {
337387
stream.seek(position);
338388
return readFullyFromFSDataInputStream(stream, bb);
339389
}
340390
} finally {
341-
finish();
391+
if (stream != originalStream) {
392+
cacheEntry.release();
393+
}
342394
}
343395
}
344396

Diff for: flink-state-backends/flink-statebackend-forst/src/main/java/org/apache/flink/state/forst/fs/cache/FileBasedCache.java

+6
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.flink.state.forst.fs.cache;
2020

21+
import org.apache.flink.annotation.VisibleForTesting;
2122
import org.apache.flink.api.java.tuple.Tuple2;
2223
import org.apache.flink.configuration.ReadableConfig;
2324
import org.apache.flink.core.fs.FSDataInputStream;
@@ -146,6 +147,11 @@ public static void setFlinkThread() {
146147
isFlinkThread.set(true);
147148
}
148149

150+
@VisibleForTesting
151+
public static void unsetFlinkThread() {
152+
isFlinkThread.set(false);
153+
}
154+
149155
/**
150156
* Checks if the current thread is a Flink thread. This method returns a boolean indicating
151157
* whether the current thread has been marked as a Flink thread using the {@link

0 commit comments

Comments
 (0)