Skip to content

Commit 3c68e8e

Browse files
committed
Refactor LogRecord
1 parent ee661a3 commit 3c68e8e

8 files changed

Lines changed: 115 additions & 77 deletions

File tree

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.ratis.server.raftlog.segmented;
20+
21+
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
22+
import org.apache.ratis.server.protocol.TermIndex;
23+
import org.apache.ratis.server.raftlog.LogEntryHeader;
24+
import org.apache.ratis.util.Preconditions;
25+
26+
import java.util.Map;
27+
import java.util.Objects;
28+
import java.util.concurrent.ConcurrentNavigableMap;
29+
import java.util.concurrent.ConcurrentSkipListMap;
30+
31+
/**
32+
* Log record of a RaftLog segment.
33+
*/
34+
public final class LogRecord {
35+
public static LogRecord valueOf(long offset, LogEntryProto entry) {
36+
return new LogRecord(offset, LogEntryHeader.valueOf(entry));
37+
}
38+
39+
/** starting offset in the file */
40+
private final long offset;
41+
private final LogEntryHeader header;
42+
43+
private LogRecord(long offset, LogEntryHeader header) {
44+
this.offset = offset;
45+
this.header = header;
46+
}
47+
48+
public LogEntryHeader getHeader() {
49+
return header;
50+
}
51+
52+
public TermIndex getTermIndex() {
53+
return getHeader().getTermIndex();
54+
}
55+
56+
public long getOffset() {
57+
return offset;
58+
}
59+
60+
/**
61+
* A thread-safed mop supporting random read but NOT random write.
62+
* For write operations, it supports append, removeLast and clear.
63+
*/
64+
public static class AppendOnlyMap {
65+
private final ConcurrentNavigableMap<Long, LogRecord> map = new ConcurrentSkipListMap<>();
66+
67+
public int size() {
68+
return map.size();
69+
}
70+
71+
public LogRecord getFirst() {
72+
final Map.Entry<Long, LogRecord> first = map.firstEntry();
73+
return first != null? first.getValue() : null;
74+
}
75+
76+
public LogRecord getLast() {
77+
final Map.Entry<Long, LogRecord> last = map.lastEntry();
78+
return last != null? last.getValue() : null;
79+
}
80+
81+
public LogRecord get(long i) {
82+
return map.get(i);
83+
}
84+
85+
public long append(LogRecord record) {
86+
final long index = record.getTermIndex().getIndex();
87+
final LogRecord previous = map.put(index, record);
88+
Preconditions.assertNull(previous, "previous");
89+
return index;
90+
}
91+
92+
public LogRecord removeLast() {
93+
final Map.Entry<Long, LogRecord> last = map.pollLastEntry();
94+
return Objects.requireNonNull(last, "last == null").getValue();
95+
}
96+
97+
public void clear() {
98+
map.clear();
99+
}
100+
}
101+
}

ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/v2/LogSegment.java

Lines changed: 6 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,16 @@
1515
* See the License for the specific language governing permissions and
1616
* limitations under the License.
1717
*/
18+
1819
package org.apache.ratis.server.raftlog.segmented.v2;
1920

2021
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
2122
import org.apache.ratis.server.RaftServerConfigKeys.Log.CorruptionPolicy;
2223
import org.apache.ratis.server.metrics.SegmentedRaftLogMetrics;
2324
import org.apache.ratis.server.protocol.TermIndex;
24-
import org.apache.ratis.server.raftlog.LogEntryHeader;
2525
import org.apache.ratis.server.raftlog.LogProtoUtils;
2626
import org.apache.ratis.server.raftlog.RaftLogIOException;
27+
import org.apache.ratis.server.raftlog.segmented.LogRecord;
2728
import org.apache.ratis.server.storage.RaftStorage;
2829
import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
2930
import org.apache.ratis.thirdparty.com.google.common.cache.CacheLoader;
@@ -41,8 +42,6 @@
4142
import java.util.Map;
4243
import java.util.Objects;
4344
import java.util.concurrent.ConcurrentHashMap;
44-
import java.util.concurrent.ConcurrentNavigableMap;
45-
import java.util.concurrent.ConcurrentSkipListMap;
4645
import java.util.concurrent.atomic.AtomicInteger;
4746
import java.util.concurrent.atomic.AtomicLong;
4847
import java.util.concurrent.atomic.AtomicReference;
@@ -82,67 +81,6 @@ static long getEntrySize(LogEntryProto entry, Op op) {
8281
return serialized + CodedOutputStream.computeUInt32SizeNoTag(serialized) + 4L;
8382
}
8483

85-
static class LogRecord {
86-
/** starting offset in the file */
87-
private final long offset;
88-
private final LogEntryHeader logEntryHeader;
89-
90-
LogRecord(long offset, LogEntryProto entry) {
91-
this.offset = offset;
92-
this.logEntryHeader = LogEntryHeader.valueOf(entry);
93-
}
94-
95-
LogEntryHeader getLogEntryHeader() {
96-
return logEntryHeader;
97-
}
98-
99-
TermIndex getTermIndex() {
100-
return getLogEntryHeader().getTermIndex();
101-
}
102-
103-
long getOffset() {
104-
return offset;
105-
}
106-
}
107-
108-
private static class Records {
109-
private final ConcurrentNavigableMap<Long, LogRecord> map = new ConcurrentSkipListMap<>();
110-
111-
int size() {
112-
return map.size();
113-
}
114-
115-
LogRecord getFirst() {
116-
final Map.Entry<Long, LogRecord> first = map.firstEntry();
117-
return first != null? first.getValue() : null;
118-
}
119-
120-
LogRecord getLast() {
121-
final Map.Entry<Long, LogRecord> last = map.lastEntry();
122-
return last != null? last.getValue() : null;
123-
}
124-
125-
LogRecord get(long i) {
126-
return map.get(i);
127-
}
128-
129-
long append(LogRecord record) {
130-
final long index = record.getTermIndex().getIndex();
131-
final LogRecord previous = map.put(index, record);
132-
Preconditions.assertNull(previous, "previous");
133-
return index;
134-
}
135-
136-
LogRecord removeLast() {
137-
final Map.Entry<Long, LogRecord> last = map.pollLastEntry();
138-
return Objects.requireNonNull(last, "last == null").getValue();
139-
}
140-
141-
void clear() {
142-
map.clear();
143-
}
144-
}
145-
14684
static LogSegment newOpenSegment(RaftStorage storage, long start, SizeInBytes maxOpSize,
14785
SegmentedRaftLogMetrics raftLogMetrics) {
14886
Preconditions.assertTrue(start >= 0);
@@ -309,10 +247,8 @@ File getFile() {
309247
/** later replace it with a metric */
310248
private final AtomicInteger loadingTimes = new AtomicInteger();
311249

312-
/**
313-
* the list of records is more like the index of a segment
314-
*/
315-
private final Records records = new Records();
250+
/** Log records of a segment */
251+
private final LogRecord.AppendOnlyMap records = new LogRecord.AppendOnlyMap();
316252
/**
317253
* the entryCache caches the content of log entries.
318254
*/
@@ -365,7 +301,7 @@ private void append(boolean keepEntryInCache, LogEntryProto entry, Op op) {
365301
"gap between entries %s and %s", entry.getIndex(), currentLast.getTermIndex().getIndex());
366302
}
367303

368-
final LogRecord record = new LogRecord(totalFileSize, entry);
304+
final LogRecord record = LogRecord.valueOf(totalFileSize, entry);
369305
if (keepEntryInCache) {
370306
putEntryCache(record.getTermIndex(), entry, op);
371307
}
@@ -424,7 +360,7 @@ synchronized void truncate(long fromIndex) {
424360
final LogRecord removed = records.removeLast();
425361
Preconditions.assertSame(index, removed.getTermIndex().getIndex(), "removedIndex");
426362
removeEntryCache(removed.getTermIndex(), Op.REMOVE_CACHE);
427-
totalFileSize = removed.offset;
363+
totalFileSize = removed.getOffset();
428364
}
429365
isOpen = false;
430366
this.endIndex = fromIndex - 1;

ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/v2/SegmentedRaftLog.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,9 @@
2929
import org.apache.ratis.server.raftlog.RaftLog;
3030
import org.apache.ratis.server.raftlog.RaftLogBase;
3131
import org.apache.ratis.server.raftlog.RaftLogIOException;
32+
import org.apache.ratis.server.raftlog.segmented.LogRecord;
3233
import org.apache.ratis.server.storage.RaftStorageMetadata;
3334
import org.apache.ratis.server.storage.RaftStorage;
34-
import org.apache.ratis.server.raftlog.segmented.v2.LogSegment.LogRecord;
3535
import org.apache.ratis.server.raftlog.segmented.v2.SegmentedRaftLogCache.TruncateIndices;
3636
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
3737
import org.apache.ratis.statemachine.StateMachine;

ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/v2/SegmentedRaftLogCache.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,9 @@
2525
import org.apache.ratis.server.raftlog.LogEntryHeader;
2626
import org.apache.ratis.server.raftlog.LogProtoUtils;
2727
import org.apache.ratis.server.raftlog.RaftLog;
28+
import org.apache.ratis.server.raftlog.segmented.LogRecord;
2829
import org.apache.ratis.server.storage.RaftStorage;
2930
import org.apache.ratis.server.raftlog.segmented.v2.CacheInvalidationPolicy.CacheInvalidationPolicyDefault;
30-
import org.apache.ratis.server.raftlog.segmented.v2.LogSegment.LogRecord;
3131
import org.apache.ratis.util.AutoCloseableLock;
3232
import org.apache.ratis.util.AutoCloseableReadWriteLock;
3333
import org.apache.ratis.util.JavaUtils;
@@ -582,7 +582,7 @@ private static void getFromSegment(LogSegment segment, long startIndex,
582582
endIndex = Math.min(endIndex, startIndex + size - 1);
583583
int index = offset;
584584
for (long i = startIndex; i <= endIndex; i++) {
585-
entries[index++] = Optional.ofNullable(segment.getLogRecord(i)).map(LogRecord::getLogEntryHeader).orElse(null);
585+
entries[index++] = Optional.ofNullable(segment.getLogRecord(i)).map(LogRecord::getHeader).orElse(null);
586586
}
587587
}
588588

ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
import org.apache.ratis.server.raftlog.segmented.v2.SegmentedRaftLogFormat;
3737
import org.apache.ratis.server.RaftServerConfigKeys.Log;
3838
import org.apache.ratis.server.raftlog.segmented.v2.SegmentedRaftLogTestUtils;
39-
import org.apache.ratis.server.raftlog.segmented.TestSegmentedRaftLog;
39+
import org.apache.ratis.server.raftlog.segmented.v2.TestSegmentedRaftLog;
4040
import org.apache.ratis.statemachine.impl.SimpleStateMachine4Testing;
4141
import org.apache.ratis.statemachine.StateMachine;
4242
import org.apache.ratis.util.FileUtils;

ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/v2/TestLogSegment.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import org.apache.ratis.server.metrics.SegmentedRaftLogMetrics;
2929
import org.apache.ratis.server.protocol.TermIndex;
3030
import org.apache.ratis.server.raftlog.LogProtoUtils;
31+
import org.apache.ratis.server.raftlog.segmented.LogRecord;
3132
import org.apache.ratis.server.storage.RaftStorage;
3233
import org.apache.ratis.server.storage.RaftStorageTestUtils;
3334
import org.apache.ratis.thirdparty.com.google.protobuf.CodedOutputStream;
@@ -136,7 +137,7 @@ static void checkLogSegment(LogSegment segment, long start, long end,
136137

137138
long offset = SegmentedRaftLogFormat.getHeaderLength();
138139
for (long i = start; i <= end; i++) {
139-
LogSegment.LogRecord record = segment.getLogRecord(i);
140+
LogRecord record = segment.getLogRecord(i);
140141
final TermIndex ti = record.getTermIndex();
141142
Assertions.assertEquals(i, ti.getIndex());
142143
Assertions.assertEquals(term, ti.getTerm());

ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/v2/TestRaftLogReadWrite.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@
4444
import java.util.Collections;
4545
import java.util.List;
4646

47-
import static org.apache.ratis.server.raftlog.segmented.v2.TestLogSegment.ZERO_START_NULL_END;
47+
import static org.apache.ratis.server.raftlog.segmented.v2.t TestLogSegment.ZERO_START_NULL_END;
4848

4949
/**
5050
* Test basic functionality of LogReader, SegmentedRaftLogInputStream, and SegmentedRaftLogOutputStream.

ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/v2/TestSegmentedRaftLogCache.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
import org.apache.ratis.server.protocol.TermIndex;
2727
import org.apache.ratis.server.raftlog.LogEntryHeader;
2828
import org.apache.ratis.server.raftlog.LogProtoUtils;
29-
import org.apache.ratis.server.raftlog.segmented.v2.LogSegment.LogRecord;
29+
import org.apache.ratis.server.raftlog.segmented.LogRecord;
3030
import org.apache.ratis.server.raftlog.segmented.v2.SegmentedRaftLogCache.TruncationSegments;
3131
import org.junit.jupiter.api.AfterEach;
3232
import org.junit.jupiter.api.Assertions;

0 commit comments

Comments
 (0)