Skip to content

Commit 11a5c67

Browse files
authored
[server] force unmap mmap file on linux when call AbstractIndex.resize() (apache#1382)
1 parent cf799e2 commit 11a5c67

File tree

4 files changed

+181
-85
lines changed

4 files changed

+181
-85
lines changed

fluss-server/src/main/java/com/alibaba/fluss/server/log/AbstractIndex.java

Lines changed: 58 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,6 @@
2121
import com.alibaba.fluss.server.exception.CorruptIndexException;
2222
import com.alibaba.fluss.utils.FileUtils;
2323
import com.alibaba.fluss.utils.IOUtils;
24-
import com.alibaba.fluss.utils.OperatingSystem;
2524
import com.alibaba.fluss.utils.log.ByteBufferUnmapper;
2625

2726
import org.slf4j.Logger;
@@ -37,10 +36,11 @@
3736
import java.nio.file.Files;
3837
import java.util.Objects;
3938
import java.util.OptionalInt;
40-
import java.util.concurrent.locks.Lock;
4139
import java.util.concurrent.locks.ReentrantLock;
40+
import java.util.concurrent.locks.ReentrantReadWriteLock;
4241

4342
import static com.alibaba.fluss.utils.concurrent.LockUtils.inLock;
43+
import static com.alibaba.fluss.utils.concurrent.LockUtils.inWriteLock;
4444

4545
/* This file is based on source code of Apache Kafka Project (https://kafka.apache.org/), licensed by the Apache
4646
* Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
@@ -50,7 +50,19 @@
5050
public abstract class AbstractIndex implements Closeable {
5151
private static final Logger LOG = LoggerFactory.getLogger(AbstractIndex.class);
5252

53-
protected final Lock lock = new ReentrantLock();
53+
// Serializes all index operations that mutate internal state.
54+
// Readers do not need to acquire this lock because:
55+
// 1) MappedByteBuffer provides direct access to the OS-level buffer cache,
56+
// which allows concurrent reads in practice.
57+
// 2) Clients only read committed data and are not affected by concurrent appends/truncates.
58+
// In the rare case when the data is truncated, the follower could read inconsistent data.
59+
// The follower has the logic to ignore the inconsistent data through crc and leader epoch.
60+
// 3) Read and remap operations are coordinated via remapLock to ensure visibility of the
61+
// underlying mmap.
62+
protected final ReentrantLock lock = new ReentrantLock();
63+
// Allows concurrent read operations while ensuring exclusive access if the underlying mmap is
64+
// changed
65+
protected final ReentrantReadWriteLock remapLock = new ReentrantReadWriteLock();
5466

5567
/** The base offset of the segment that this index is corresponding to. */
5668
private final long baseOffset;
@@ -189,43 +201,48 @@ public void updateParentDir(File parentDir) {
189201
public boolean resize(int newSize) throws IOException {
190202
return inLock(
191203
lock,
192-
() -> {
193-
int roundedNewSize = roundDownToExactMultiple(newSize, entrySize());
194-
195-
if (length == roundedNewSize) {
196-
LOG.debug(
197-
"Index {} was not resized because it already has size {}",
198-
file.getAbsolutePath(),
199-
roundedNewSize);
200-
return false;
201-
} else {
202-
RandomAccessFile raf = new RandomAccessFile(file, "rw");
203-
try {
204-
int position = mmap.position();
205-
206-
/* Windows won't let us modify the file length while the file is mmapped :-( */
207-
if (OperatingSystem.isWindows()) {
208-
safeForceUnmap();
209-
}
210-
raf.setLength(roundedNewSize);
211-
this.length = roundedNewSize;
212-
mmap =
213-
raf.getChannel()
214-
.map(FileChannel.MapMode.READ_WRITE, 0, roundedNewSize);
215-
this.maxEntries = mmap.limit() / entrySize();
216-
mmap.position(position);
217-
LOG.debug(
218-
"Resized {} to {}, position is {} and limit is {}",
219-
file.getAbsolutePath(),
220-
roundedNewSize,
221-
mmap.position(),
222-
mmap.limit());
223-
return true;
224-
} finally {
225-
IOUtils.closeQuietly(raf, "index file " + file.getName());
226-
}
227-
}
228-
});
204+
() ->
205+
inWriteLock(
206+
remapLock,
207+
() -> {
208+
int roundedNewSize =
209+
roundDownToExactMultiple(newSize, entrySize());
210+
211+
if (length == roundedNewSize) {
212+
LOG.debug(
213+
"Index {} was not resized because it already has size {}",
214+
file.getAbsolutePath(),
215+
roundedNewSize);
216+
return false;
217+
} else {
218+
RandomAccessFile raf = new RandomAccessFile(file, "rw");
219+
try {
220+
int position = mmap.position();
221+
222+
safeForceUnmap();
223+
raf.setLength(roundedNewSize);
224+
this.length = roundedNewSize;
225+
mmap =
226+
raf.getChannel()
227+
.map(
228+
FileChannel.MapMode.READ_WRITE,
229+
0,
230+
roundedNewSize);
231+
this.maxEntries = mmap.limit() / entrySize();
232+
mmap.position(position);
233+
LOG.debug(
234+
"Resized {} to {}, position is {} and limit is {}",
235+
file.getAbsolutePath(),
236+
roundedNewSize,
237+
mmap.position(),
238+
mmap.limit());
239+
return true;
240+
} finally {
241+
IOUtils.closeQuietly(
242+
raf, "index file " + file.getName());
243+
}
244+
}
245+
}));
229246
}
230247

231248
/**
@@ -283,7 +300,7 @@ public void closeHandler() {
283300
// metadata from a physical disk.
284301
// To prevent this, we forcefully cleanup memory mapping within proper execution which never
285302
// affects API responsiveness.
286-
inLock(lock, this::safeForceUnmap);
303+
inLock(lock, () -> inWriteLock(remapLock, this::safeForceUnmap));
287304
}
288305

289306
/** Remove all the entries from the index and resize the index to the max index size. */
@@ -411,25 +428,6 @@ protected void truncateToEntries0(int entries) {
411428
mmap.position(entries * entrySize());
412429
}
413430

414-
/**
415-
* Execute the given function in a lock only if we are running on windows. We do this because
416-
* Windows won't let us resize a file while it is mmapped. As a result we have to force unmap it
417-
* and this requires synchronizing reads.
418-
*/
419-
protected final <T, E extends Exception> T maybeLock(Lock lock, StorageAction<T, E> action)
420-
throws E {
421-
if (OperatingSystem.isWindows()) {
422-
lock.lock();
423-
}
424-
try {
425-
return action.execute();
426-
} finally {
427-
if (OperatingSystem.isWindows()) {
428-
lock.unlock();
429-
}
430-
}
431-
}
432-
433431
/**
434432
* Find the slot in which the largest entry less than or equal to the given target key or value
435433
* is stored. The comparison is made using the `IndexEntry.compareTo()` method.

fluss-server/src/main/java/com/alibaba/fluss/server/log/OffsetIndex.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import java.util.Optional;
3131

3232
import static com.alibaba.fluss.utils.concurrent.LockUtils.inLock;
33+
import static com.alibaba.fluss.utils.concurrent.LockUtils.inReadLock;
3334

3435
/* This file is based on source code of Apache Kafka Project (https://kafka.apache.org/), licensed by the Apache
3536
* Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
@@ -70,7 +71,7 @@ public final class OffsetIndex extends AbstractIndex {
7071
private static final int ENTRY_SIZE = 8;
7172

7273
/* the last offset in the index */
73-
private long lastOffset;
74+
private volatile long lastOffset;
7475

7576
public OffsetIndex(File file, long baseOffset) throws IOException {
7677
this(file, baseOffset, -1);
@@ -130,8 +131,8 @@ public void sanityCheck() {
130131
* (baseOffset, 0) is returned.
131132
*/
132133
public OffsetPosition lookup(long targetOffset) {
133-
return maybeLock(
134-
lock,
134+
return inReadLock(
135+
remapLock,
135136
() -> {
136137
ByteBuffer idx = mmap().duplicate();
137138
int slot = largestLowerBoundSlotFor(idx, targetOffset, IndexSearchType.KEY);
@@ -150,8 +151,8 @@ public OffsetPosition lookup(long targetOffset) {
150151
* @return The offset/position pair at that entry
151152
*/
152153
public OffsetPosition entry(int n) {
153-
return maybeLock(
154-
lock,
154+
return inReadLock(
155+
remapLock,
155156
() -> {
156157
if (n >= entries()) {
157158
throw new IllegalArgumentException(
@@ -173,8 +174,8 @@ public OffsetPosition entry(int n) {
173174
*/
174175
public Optional<OffsetPosition> fetchUpperBoundOffset(
175176
OffsetPosition fetchOffset, int fetchSize) {
176-
return maybeLock(
177-
lock,
177+
return inReadLock(
178+
remapLock,
178179
() -> {
179180
ByteBuffer idx = mmap().duplicate();
180181
int slot =
@@ -309,8 +310,8 @@ private void truncateToEntries(int entries) {
309310

310311
/** The last entry in the index. */
311312
private OffsetPosition lastEntry() {
312-
return inLock(
313-
lock,
313+
return inReadLock(
314+
remapLock,
314315
() -> {
315316
int entries = entries();
316317
if (entries == 0) {

fluss-server/src/main/java/com/alibaba/fluss/server/log/TimeIndex.java

Lines changed: 21 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.nio.MappedByteBuffer;
3030

3131
import static com.alibaba.fluss.utils.concurrent.LockUtils.inLock;
32+
import static com.alibaba.fluss.utils.concurrent.LockUtils.inReadLock;
3233

3334
/* This file is based on source code of Apache Kafka Project (https://kafka.apache.org/), licensed by the Apache
3435
* Software Foundation (ASF) under the Apache License, Version 2.0. See the NOTICE file distributed with this work for
@@ -90,16 +91,20 @@ public void sanityCheck() {
9091
TimestampOffset entry = lastEntry();
9192
long lastTimestamp = entry.timestamp;
9293
long lastOffset = entry.offset;
93-
if (entries() != 0 && lastTimestamp < timestamp(mmap(), 0)) {
94-
throw new CorruptIndexException(
95-
"Corrupt time index found, time index file ("
96-
+ file().getAbsolutePath()
97-
+ ") has "
98-
+ "non-zero size but the last timestamp is "
99-
+ lastTimestamp
100-
+ " which is less than the first timestamp "
101-
+ timestamp(mmap(), 0));
102-
}
94+
inReadLock(
95+
remapLock,
96+
() -> {
97+
if (entries() != 0 && lastTimestamp < timestamp(mmap(), 0)) {
98+
throw new CorruptIndexException(
99+
"Corrupt time index found, time index file ("
100+
+ file().getAbsolutePath()
101+
+ ") has "
102+
+ "non-zero size but the last timestamp is "
103+
+ lastTimestamp
104+
+ " which is less than the first timestamp "
105+
+ timestamp(mmap(), 0));
106+
}
107+
});
103108
if (entries() != 0 && lastOffset < baseOffset()) {
104109
throw new CorruptIndexException(
105110
"Corrupt time index found, time index file ("
@@ -170,8 +175,8 @@ public TimestampOffset lastEntry() {
170175
* @return The timestamp/offset pair at that entry
171176
*/
172177
public TimestampOffset entry(int n) {
173-
return maybeLock(
174-
lock,
178+
return inReadLock(
179+
remapLock,
175180
() -> {
176181
if (n >= entries()) {
177182
throw new IllegalArgumentException(
@@ -195,8 +200,8 @@ public TimestampOffset entry(int n) {
195200
* @return The time index entry found.
196201
*/
197202
public TimestampOffset lookup(long targetTimestamp) {
198-
return maybeLock(
199-
lock,
203+
return inReadLock(
204+
remapLock,
200205
() -> {
201206
ByteBuffer idx = mmap().duplicate();
202207
int slot = largestLowerBoundSlotFor(idx, targetTimestamp, IndexSearchType.KEY);
@@ -332,8 +337,8 @@ private int relativeOffset(ByteBuffer buffer, int n) {
332337

333338
/** Read the last entry from the index file. This operation involves disk access. */
334339
private TimestampOffset lastEntryFromIndexFile() {
335-
return inLock(
336-
lock,
340+
return inReadLock(
341+
remapLock,
337342
() -> {
338343
int entries = entries();
339344
if (entries == 0) {
Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package com.alibaba.fluss.server.log;
19+
20+
import org.junit.jupiter.api.Test;
21+
import org.junit.jupiter.api.io.TempDir;
22+
23+
import java.io.File;
24+
import java.io.IOException;
25+
import java.nio.ByteBuffer;
26+
import java.nio.MappedByteBuffer;
27+
28+
import static org.assertj.core.api.Assertions.assertThat;
29+
30+
/** Tests for {@link AbstractIndex}. */
31+
public class AbstractIndexTest {
32+
33+
private @TempDir File tempDir;
34+
35+
@Test
36+
public void testResizeInvokeUnmap() throws IOException {
37+
File testIndex = new File(tempDir, "test.index");
38+
TestIndex idx = new TestIndex(testIndex, 0L, 100, true);
39+
MappedByteBuffer oldMmap = idx.mmap();
40+
assertThat(idx.mmap()).isNotNull();
41+
assertThat(idx.unmapInvoked).isFalse();
42+
43+
boolean changed = idx.resize(80);
44+
assertThat(changed).isTrue();
45+
// Unmap should have been invoked after resize.
46+
assertThat(idx.unmapInvoked).isTrue();
47+
// old mmap should be unmapped.
48+
assertThat(idx.unmappedBuffer).isEqualTo(oldMmap);
49+
assertThat(idx.unmappedBuffer).isNotEqualTo(idx.mmap());
50+
}
51+
52+
private static class TestIndex extends AbstractIndex {
53+
private boolean unmapInvoked = false;
54+
private MappedByteBuffer unmappedBuffer = null;
55+
56+
public TestIndex(File file, long baseOffset, int maxIndexSize, boolean writable)
57+
throws IOException {
58+
super(file, baseOffset, maxIndexSize, writable);
59+
}
60+
61+
@Override
62+
protected int entrySize() {
63+
return 1;
64+
}
65+
66+
@Override
67+
protected IndexEntry parseEntry(ByteBuffer buffer, int n) {
68+
return null;
69+
}
70+
71+
@Override
72+
public void sanityCheck() {
73+
// unused
74+
}
75+
76+
@Override
77+
protected void truncate() {
78+
// unused
79+
}
80+
81+
@Override
82+
public void truncateTo(long offset) {
83+
// unused
84+
}
85+
86+
@Override
87+
public void forceUnmap() throws IOException {
88+
unmapInvoked = true;
89+
unmappedBuffer = mmap();
90+
}
91+
}
92+
}

0 commit comments

Comments
 (0)