From 9a0c066243a63e9abb0c7d83ccf50283689ea8a3 Mon Sep 17 00:00:00 2001 From: wangbin83 Date: Thu, 18 Apr 2024 15:26:59 +0800 Subject: [PATCH 1/7] Adding TIKV as a metadata storage method --- .../master/metastore/tikv/TiKVUtils.java | 145 ++++++++++++++++++ 1 file changed, 145 insertions(+) create mode 100755 core/common/src/main/java/alluxio/master/metastore/tikv/TiKVUtils.java diff --git a/core/common/src/main/java/alluxio/master/metastore/tikv/TiKVUtils.java b/core/common/src/main/java/alluxio/master/metastore/tikv/TiKVUtils.java new file mode 100755 index 000000000000..c5c94155b127 --- /dev/null +++ b/core/common/src/main/java/alluxio/master/metastore/tikv/TiKVUtils.java @@ -0,0 +1,145 @@ +package alluxio.master.metastore.tikv; + +import alluxio.resource.CloseableIterator; +import com.google.common.primitives.Longs; +import org.tikv.kvproto.Kvrpcpb; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Iterator; +import java.util.ListIterator; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Convenience methods for working with TiKV. + */ +public final class TiKVUtils { + private static final Logger LOG = LoggerFactory.getLogger(TiKVUtils.class); + + private TiKVUtils() {} // Utils class. 
+ + + /** + * @param str a String value + * @param long1 a long value + * @param long2 a long value + * @return a byte array holding the bytes of str, then the big-endian bytes of long1, then the big-endian bytes of long2 + */ + public static byte[] toByteArray(String str, long long1, long long2) { + byte[] strBytes = str.getBytes(); + + byte[] key = new byte[strBytes.length + 2 * Longs.BYTES]; + System.arraycopy(strBytes, 0, key, 0, strBytes.length); + for (int i = strBytes.length + Longs.BYTES - 1; i >= strBytes.length; i--) { + key[i] = (byte) (long1 & 0xffL); + long1 >>= Byte.SIZE; + } + for (int i = strBytes.length + 2 * Longs.BYTES - 1; i >= strBytes.length + Longs.BYTES; i--) { + key[i] = (byte) (long2 & 0xffL); + long2 >>= Byte.SIZE; + } + return key; + } + + /** + * @param str a string value + * @param n a long value + * @return a byte array holding the bytes of str followed by the big-endian bytes of n + */ + public static byte[] toByteArray(String str, long n) { + byte[] strBytes = str.getBytes(); + + byte[] key = new byte[Longs.BYTES + strBytes.length]; + System.arraycopy(strBytes, 0, key, 0, strBytes.length); + for (int i = key.length - 1; i >= strBytes.length; i--) { + key[i] = (byte) (n & 0xffL); + n >>= Byte.SIZE; + } + return key; + } + + /** + * @param str1 a string value written first + * @param n a long value + * @param str2 a string value written last + * @return a byte array holding the bytes of str1, then the big-endian bytes of n, then the bytes of str2 + */ + public static byte[] toByteArray(String str1, long n, String str2) { + byte[] strBytes1 = str1.getBytes(); + byte[] strBytes2 = str2.getBytes(); + + byte[] key = new byte[Longs.BYTES + strBytes1.length + strBytes2.length]; + System.arraycopy(strBytes1, 0, key, 0, strBytes1.length); + for (int i = strBytes1.length + Longs.BYTES - 1; i >= strBytes1.length; i--) { + key[i] = (byte) (n & 0xffL); + n >>= Byte.SIZE; + } + System.arraycopy(strBytes2, 0, key, strBytes1.length + Longs.BYTES, strBytes2.length); + return key; + } + + /** + * @param bytes an array of bytes + * 
@param start the place in the array to read the long from + * @return the long read from the eight bytes beginning at start + */ + public static long readLong(byte[] bytes, int start) { + return Longs.fromBytes(bytes[start], bytes[start + 1], bytes[start + 2], bytes[start + 3], + bytes[start + 4], bytes[start + 5], bytes[start + 6], bytes[start + 7]); + } + + + /** + * Used to parse current {@link ListIterator} element. + * + * @param <T> return type of parser's next method + */ + public interface TiKVIteratorParser<T> { + /** + * Parses and returns next element. + * + * @param iter {@link ListIterator} instance + * @return parsed value + * @throws Exception if parsing fails + */ + T next(ListIterator<Kvrpcpb.KvPair> iter) throws Exception; + } + + /** + * Wraps a {@link ListIterator} of tikv key-value pairs in a {@link CloseableIterator}. + * Once the parser throws, the returned iterator reports no further elements. + * + * @param tikvIterator the tikv iterator + * @param parser parser to produce iterated values from tikv key-value + * @param <T> iterator value type + * @return wrapped iterator + */ + public static <T> CloseableIterator<T> createCloseableIterator( + ListIterator<Kvrpcpb.KvPair> tikvIterator, TiKVIteratorParser<T> parser) { + AtomicBoolean valid = new AtomicBoolean(true); + Iterator<T> iter = new Iterator<T>() { + @Override + public boolean hasNext() { + return valid.get() && tikvIterator.hasNext(); + } + + @Override + public T next() { + try { + return parser.next(tikvIterator); + } catch (Exception exc) { + LOG.warn("Iteration aborted because of error", exc); + valid.set(false); + throw new RuntimeException(exc); + } finally { + if (!tikvIterator.hasNext()) { + valid.set(false); + } + } + } + }; + + return CloseableIterator.noopCloseable(iter); + } + +} From 0ca36cfd8d842173bea1dc2343ceb79241add72f Mon Sep 17 00:00:00 2001 From: wangbin83 Date: Fri, 5 Jul 2024 09:57:28 +0800 Subject: [PATCH 2/7] Adding block meta operations in tikv --- .../metastore/tikv/TiKVBlockMetaStore.java | 157 ++++++++++++++++++ 1 file changed, 157 insertions(+) create mode 100644 
core/server/master/src/main/java/alluxio/master/metastore/tikv/TiKVBlockMetaStore.java diff --git a/core/server/master/src/main/java/alluxio/master/metastore/tikv/TiKVBlockMetaStore.java b/core/server/master/src/main/java/alluxio/master/metastore/tikv/TiKVBlockMetaStore.java new file mode 100644 index 000000000000..1810a5040cd1 --- /dev/null +++ b/core/server/master/src/main/java/alluxio/master/metastore/tikv/TiKVBlockMetaStore.java @@ -0,0 +1,157 @@ +package alluxio.master.metastore.tikv; + +import alluxio.conf.Configuration; +import alluxio.conf.PropertyKey; +import alluxio.master.metastore.BlockMetaStore; +import alluxio.proto.meta.Block.BlockLocation; +import alluxio.proto.meta.Block.BlockMeta; +import alluxio.resource.CloseableIterator; + +import com.google.common.primitives.Longs; +import org.rocksdb.*; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.tikv.common.TiConfiguration; +import org.tikv.common.TiSession; +import org.tikv.common.exception.TiKVException; +import org.tikv.kvproto.Kvrpcpb; +import org.tikv.raw.RawKVClient; +import org.tikv.shade.com.google.protobuf.ByteString; + +import java.util.*; +import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.atomic.LongAdder; +import javax.annotation.concurrent.ThreadSafe; + +/** + * Block store backed by Tikv. 
+ */ +@ThreadSafe +public class TiKVBlockMetaStore implements BlockMetaStore { + private static final Logger LOG = LoggerFactory.getLogger(TiKVBlockMetaStore.class); + private static final String BLOCKS_DB_NAME = "blocks-tikv"; + private static final String BLOCK_META_COLUMN = "blockmeta"; + private static final String BLOCK_LOCATIONS_COLUMN = "blocklocations"; + private static final String ROCKS_STORE_NAME = "BlockStore"; + + private final List mToClose = new ArrayList<>(); + + private final LongAdder mSize = new LongAdder(); + + private TiConfiguration mBlockConf; + private TiSession mBlockSession; + private RawKVClient mBlockClient; + + /** + * Creates and initializes a tikv block store. + * + * @param baseDir not used by this implementation + */ + public TiKVBlockMetaStore(String baseDir) { + String hostConf = Configuration.getString(PropertyKey.MASTER_METASTORE_INODE_TIKV_CONNECTION); + try { + mBlockConf = TiConfiguration.createDefault(hostConf); + mBlockConf.setRawKVReadTimeoutInMS(20000); + mBlockConf.setRawKVWriteTimeoutInMS(20000); + mBlockConf.setKvMode(String.valueOf(TiConfiguration.KVMode.RAW)); + mBlockSession = TiSession.create(mBlockConf); + mBlockClient = mBlockSession.createRawClient(); + } catch (TiKVException e) { + throw new RuntimeException(e); + } + } + + @Override + public Optional<BlockMeta> getBlock(long id) { + byte[] meta; + ByteString key = ByteString.copyFrom(TiKVUtils.toByteArray(BLOCK_META_COLUMN, id)); + try { + Optional<ByteString> bytes = mBlockClient.get(key); + if (!bytes.isPresent()) { + return Optional.empty(); + } + meta = bytes.get().toByteArray(); + } catch (TiKVException e) { + throw new RuntimeException(e); + } + if (meta == null) { + return Optional.empty(); + } + try { + return Optional.of(BlockMeta.parseFrom(meta)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public void putBlock(long id, BlockMeta meta) { + ByteString key = 
ByteString.copyFrom(TiKVUtils.toByteArray(BLOCK_META_COLUMN, id)); + ByteString value = ByteString.copyFrom(meta.toByteArray()); + try { + Optional<ByteString> buf = mBlockClient.get(key); + mBlockClient.put(key, value); + // Count the block only when this put created the key. + if (!buf.isPresent()) { + mSize.increment(); + } + } catch (TiKVException e) { + throw new RuntimeException(e); + } + } + + @Override + public void removeBlock(long id) { + try { + ByteString key = ByteString.copyFrom(TiKVUtils.toByteArray(BLOCK_META_COLUMN, id)); + Optional<ByteString> buf = mBlockClient.get(key); + mBlockClient.delete(key); + // Shrink the counter only when the key actually existed before the delete. + if (buf.isPresent()) { + mSize.decrement(); + } + } catch (TiKVException e) { + throw new RuntimeException(e); + } + } + + // TODO: clear() only resets the local counter; no keys are deleted from TiKV yet. + @Override + public void clear() { + mSize.reset(); + LOG.info("clear TiKVBlockStore"); + } + + @Override + public long size() { + return mSize.longValue(); + } + + @Override + public void close() { + mSize.reset(); + LOG.info("Closing TiKVBlockStore and recycling all TiKV JNI objects"); + mBlockClient.close(); + try { + mBlockSession.close(); + } catch (Exception e) { + LOG.warn("Failed to close TiKV block session", e); + } + LOG.info("TiKVBlockStore closed"); + } + + + @Override + public CloseableIterator<Block> getCloseableIterator() { + ListIterator<Kvrpcpb.KvPair> iterator = mBlockClient + .scanPrefix(ByteString.copyFromUtf8(BLOCK_META_COLUMN)).listIterator(); + + return TiKVUtils.createCloseableIterator(iterator, + (iter) -> { + Kvrpcpb.KvPair kv = iter.next(); + byte[] key = kv.getKey().toByteArray(); + return new Block(TiKVUtils.readLong(key, BLOCK_META_COLUMN.length()), + BlockMeta.parseFrom(kv.getValue().toByteArray())); + } + ); + } + +} From 49635e1db6b41ed66db4c344c81bde7817d499f4 Mon Sep 17 00:00:00 2001 From: wangbin83 Date: Fri, 5 Jul 2024 10:13:04 +0800 Subject: [PATCH 3/7] Adding block location operation in tikv --- .../metastore/tikv/TiKVBlockMetaStore.java | 37 +++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/core/server/master/src/main/java/alluxio/master/metastore/tikv/TiKVBlockMetaStore.java 
b/core/server/master/src/main/java/alluxio/master/metastore/tikv/TiKVBlockMetaStore.java index 1810a5040cd1..6346655cd683 100644 --- a/core/server/master/src/main/java/alluxio/master/metastore/tikv/TiKVBlockMetaStore.java +++ b/core/server/master/src/main/java/alluxio/master/metastore/tikv/TiKVBlockMetaStore.java @@ -138,6 +138,43 @@ public void close() { LOG.info("TiKVBlockStore closed"); } + @Override + public List<BlockLocation> getLocations(long id) { + + // Location keys are column + block id + worker id, so column + block id selects every worker of one block. + ListIterator<Kvrpcpb.KvPair> iter = mBlockClient + .scanPrefix(ByteString.copyFrom(TiKVUtils.toByteArray(BLOCK_LOCATIONS_COLUMN, id))).listIterator(); + List<BlockLocation> locations = new ArrayList<>(); + while (iter.hasNext()) { + try { + locations.add(BlockLocation.parseFrom(iter.next().getValue().toByteArray())); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + return locations; + + } + + @Override + public void addLocation(long id, BlockLocation location) { + ByteString key = ByteString.copyFrom(TiKVUtils.toByteArray(BLOCK_LOCATIONS_COLUMN, id, location.getWorkerId())); + ByteString value = ByteString.copyFrom(location.toByteArray()); + try { + mBlockClient.put(key, value); + } catch (TiKVException e) { + throw new RuntimeException(e); + } + } + + @Override + public void removeLocation(long blockId, long workerId) { + ByteString key = ByteString.copyFrom(TiKVUtils.toByteArray(BLOCK_LOCATIONS_COLUMN, blockId, workerId)); + try { + mBlockClient.delete(key); + } catch (TiKVException e) { + throw new RuntimeException(e); + } + } @Override public CloseableIterator<Block> getCloseableIterator() { From ff8171bb45edcdcea8fbfbd1c95eadf26829e2b6 Mon Sep 17 00:00:00 2001 From: wangbin83 Date: Fri, 5 Jul 2024 10:18:29 +0800 Subject: [PATCH 4/7] Adding inode metadata operations in tikv --- .../master/metastore/tikv/TiKVInodeStore.java | 356 ++++++++++++++++++ 1 file changed, 356 insertions(+) create mode 100644 core/server/master/src/main/java/alluxio/master/metastore/tikv/TiKVInodeStore.java diff --git 
a/core/server/master/src/main/java/alluxio/master/metastore/tikv/TiKVInodeStore.java b/core/server/master/src/main/java/alluxio/master/metastore/tikv/TiKVInodeStore.java new file mode 100644 index 000000000000..23104254cac8 --- /dev/null +++ b/core/server/master/src/main/java/alluxio/master/metastore/tikv/TiKVInodeStore.java @@ -0,0 +1,356 @@ +package alluxio.master.metastore.tikv; + + +import alluxio.Client; +import alluxio.collections.Pair; +import alluxio.conf.Configuration; +import alluxio.conf.PropertyKey; +import alluxio.master.file.meta.EdgeEntry; +import alluxio.master.file.meta.Inode; +import alluxio.master.file.meta.InodeDirectoryView; +import alluxio.master.file.meta.InodeView; +import alluxio.master.file.meta.MutableInode; +import alluxio.master.journal.checkpoint.CheckpointInputStream; +import alluxio.master.journal.checkpoint.CheckpointName; +import alluxio.master.journal.checkpoint.CheckpointOutputStream; +import alluxio.master.journal.checkpoint.CheckpointType; +import alluxio.master.metastore.InodeStore; +import alluxio.master.metastore.ReadOption; +import alluxio.proto.meta.InodeMeta; +import alluxio.resource.CloseableIterator; + +import com.google.common.base.Preconditions; +import com.google.common.primitives.Longs; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.tikv.common.TiConfiguration; +import org.tikv.common.TiSession; +import org.tikv.common.exception.TiKVException; +import org.tikv.kvproto.Kvrpcpb; +import org.tikv.raw.RawKVClient; +import org.tikv.shade.com.google.protobuf.ByteString; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; +import javax.annotation.Nullable; +import javax.annotation.concurrent.ThreadSafe; + +/** + * File store backed by Tikv. 
+ */ +@ThreadSafe +public class TiKVInodeStore implements InodeStore { + private static final Logger LOG = LoggerFactory.getLogger(TiKVInodeStore.class); + private static final String INODES_DB_NAME = "inodes-tikv"; + private static final String INODES_COLUMN = "inodes"; + private static final String EDGES_COLUMN = "edges"; + private static final String ROCKS_STORE_NAME = "InodeStore"; + + private TiConfiguration mInodeConf; + private TiSession mInodeSession; + private RawKVClient mInodeClient; + + /** + * Creates and initializes a tikv inode store. + * + * @param baseDir not used by this implementation + */ + public TiKVInodeStore(String baseDir) { + String hostConf = Configuration.getString(PropertyKey.MASTER_METASTORE_INODE_TIKV_CONNECTION); + try { + mInodeConf = TiConfiguration.createDefault(hostConf); + mInodeConf.setRawKVBatchWriteTimeoutInMS(30000); + mInodeConf.setRawKVReadTimeoutInMS(20000); + mInodeConf.setRawKVWriteTimeoutInMS(20000); + mInodeConf.setKvMode(String.valueOf(TiConfiguration.KVMode.RAW)); + mInodeSession = TiSession.create(mInodeConf); + mInodeClient = mInodeSession.createRawClient(); + } catch (TiKVException e) { + throw new RuntimeException(e); + } + + } + + /** + * Creates a tikv inode store against an explicitly given address; added for tests. + */ + public TiKVInodeStore(String baseDir, String hostConf) { + try { + mInodeConf = TiConfiguration.createDefault(hostConf); + mInodeConf.setRawKVBatchWriteTimeoutInMS(30000); + mInodeSession = TiSession.create(mInodeConf); + mInodeClient = mInodeSession.createRawClient(); + } catch (TiKVException e) { + throw new RuntimeException(e); + } + } + + + @Override + public void remove(Long inodeId) { + try { + ByteString id = ByteString.copyFrom(TiKVUtils.toByteArray(INODES_COLUMN, inodeId)); + mInodeClient.delete(id); + } catch (TiKVException e) { + throw new RuntimeException(e); + } + } + + @Override + public void writeInode(MutableInode<?> inode) { + try { + ByteString key = ByteString.copyFrom(TiKVUtils.toByteArray(INODES_COLUMN, 
inode.getId())); + ByteString value = ByteString.copyFrom(inode.toProto().toByteArray()); + mInodeClient.put(key,value); + } catch (TiKVException e) { + throw new RuntimeException(e); + } + } + + @Override + public WriteBatch createWriteBatch() { + return new TiKVInodeStore.TiKVWriteBatch(); + } + + // TODO: clear() only logs; no keys are deleted from TiKV yet. + @Override + public void clear() { + LOG.info("clear TiKVInodeStore"); + } + + @Override + public void addChild(long parentId, String childName, Long childId) { + try { + ByteString key = ByteString.copyFrom(TiKVUtils.toByteArray(EDGES_COLUMN, parentId, childName)); + ByteString value = ByteString.copyFrom(Longs.toByteArray(childId)); + mInodeClient.put(key,value); + } catch (TiKVException e) { + throw new RuntimeException(e); + } + } + + @Override + public void removeChild(long parentId, String name) { + try { + ByteString id = ByteString.copyFrom(TiKVUtils.toByteArray(EDGES_COLUMN, parentId, name)); + mInodeClient.delete(id); + } catch (TiKVException e) { + throw new RuntimeException(e); + } + } + + @Override + public Optional<MutableInode<?>> getMutable(long id, ReadOption option) { + byte[] inode; + + try { + ByteString key = ByteString.copyFrom(TiKVUtils.toByteArray(INODES_COLUMN, id)); + Optional<ByteString> bytes = mInodeClient.get(key); + if (!bytes.isPresent()) { + return Optional.empty(); + } + inode = bytes.get().toByteArray(); + } catch (TiKVException e) { + throw new RuntimeException(e); + } + if (inode == null) { + return Optional.empty(); + } + try { + return Optional.of(MutableInode.fromProto(InodeMeta.Inode.parseFrom(inode))); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Override + public CloseableIterator<Long> getChildIds(Long inodeId, ReadOption option) { + + ByteString bytesPrefix; + bytesPrefix = ByteString.copyFrom(TiKVUtils.toByteArray(EDGES_COLUMN, inodeId)); + ListIterator<Kvrpcpb.KvPair> iter = mInodeClient.scanPrefix(bytesPrefix).listIterator(); + + TiKVIter tikvIter = new TiKVIter(iter); + Stream<Long> idStream = StreamSupport.stream(Spliterators + 
.spliteratorUnknownSize(tikvIter, Spliterator.ORDERED), false); + return CloseableIterator.noopCloseable(idStream.iterator()); + } + + @Override + public Optional<Long> getChildId(Long inodeId, String name, ReadOption option) { + byte[] id; + try { + Optional<ByteString> bytes = mInodeClient + .get(ByteString.copyFrom(TiKVUtils.toByteArray(EDGES_COLUMN, inodeId, name))); + if (!bytes.isPresent()) { + return Optional.empty(); + } + id = bytes.get().toByteArray(); + } catch (TiKVException e) { + throw new RuntimeException(e); + } + if (id == null) { + return Optional.empty(); + } + return Optional.of(Longs.fromByteArray(id)); + } + + static class TiKVIter implements Iterator<Long> { + + final ListIterator<Kvrpcpb.KvPair> mIter; + + TiKVIter(ListIterator<Kvrpcpb.KvPair> tikvIterator) { + mIter = tikvIterator; + } + + + @Override + public boolean hasNext() { + return mIter.hasNext(); + } + + @Override + public Long next() { + Long l = Longs.fromByteArray(mIter.next().getValue().toByteArray()); + return l; + } + } + + @Override + public Optional<Inode> getChild(Long inodeId, String name, ReadOption option) { + return getChildId(inodeId, name).flatMap(id -> { + Optional<Inode> child = get(id); + if (!child.isPresent()) { + LOG.warn("Found child edge {}->{}={}, but inode {} does not exist", inodeId, name, + id, id); + } + return child; + }); + } + + @Override + public boolean hasChildren(InodeDirectoryView inode, ReadOption option) { + ByteString bytesPrefix; + bytesPrefix = ByteString.copyFrom(TiKVUtils.toByteArray(EDGES_COLUMN, inode.getId())); + try { + // One matching edge key is enough; nothing may be consumed before asking hasNext(). + return mInodeClient.scanPrefix(bytesPrefix).listIterator().hasNext(); + } catch (TiKVException e) { + // Surface store failures instead of reporting the directory as empty. + throw new RuntimeException(e); + } + } + + @Override + public Set<EdgeEntry> allEdges() { + Set<EdgeEntry> edges = new HashSet<>(); + ListIterator<Kvrpcpb.KvPair> iter = mInodeClient + .scanPrefix(ByteString.copyFromUtf8(EDGES_COLUMN)).listIterator(); + while (iter.hasNext()) { + Kvrpcpb.KvPair kv = iter.next(); + byte[] key = kv.getKey().toByteArray(); + long parentId = 
TiKVUtils.readLong(key, EDGES_COLUMN.length()); + String childName = new String(key, EDGES_COLUMN.length() + Longs.BYTES, + key.length - Longs.BYTES - EDGES_COLUMN.length()); + long childId = Longs.fromByteArray(kv.getValue().toByteArray()); + edges.add(new EdgeEntry(parentId, childName, childId)); + } + return edges; + } + + @Override + public Set<MutableInode<?>> allInodes() { + Set<MutableInode<?>> inodes = new HashSet<>(); + ListIterator<Kvrpcpb.KvPair> iter = mInodeClient + .scanPrefix(ByteString.copyFromUtf8(INODES_COLUMN)).listIterator(); + while (iter.hasNext()) { + Kvrpcpb.KvPair kv = iter.next(); + long key = TiKVUtils.readLong(kv.getKey().toByteArray(), INODES_COLUMN.length()); + inodes.add(getMutable(key, ReadOption.defaults()).get()); + } + return inodes; + } + + /** + * The name is intentional, in order to distinguish from the {@code Iterable} interface. + * + * @return an iterator over stored inodes + */ + public CloseableIterator<InodeView> getCloseableIterator() { + ListIterator<Kvrpcpb.KvPair> iterator = mInodeClient + .scanPrefix(ByteString.copyFromUtf8(INODES_COLUMN)).listIterator(); + return TiKVUtils.createCloseableIterator(iterator, + (iter) -> { + Kvrpcpb.KvPair kv = iter.next(); + // The inode id follows the column prefix inside the key, so skip the prefix before decoding. + return getMutable(TiKVUtils.readLong(kv.getKey().toByteArray(), INODES_COLUMN.length()), ReadOption.defaults()).get(); + } + ); + } + + @Override + public boolean supportsBatchWrite() { + return false; + } + + private class TiKVWriteBatch implements WriteBatch {} + + + @Override + public void close() { + LOG.info("Closing TIKVInodeStore and recycling all TIKV JNI objects"); + mInodeClient.close(); + try { + mInodeSession.close(); + } catch (Exception e) { + LOG.warn("Failed to close TiKV inode session", e); + } + LOG.info("TIKVInodeStore closed"); + } + + + /** + * @return a newline-delimited string representing the state of the inode store. 
This is useful + * for debugging purposes + */ + public String toStringEntries() { + StringBuilder sb = new StringBuilder(); + + ListIterator<Kvrpcpb.KvPair> inodeIter = mInodeClient + .scanPrefix(ByteString.copyFromUtf8(INODES_COLUMN)).listIterator(); + while (inodeIter.hasNext()) { + MutableInode<?> inode; + Kvrpcpb.KvPair inodeKV = inodeIter.next(); + try { + inode = MutableInode.fromProto(InodeMeta.Inode.parseFrom(inodeKV.getValue().toByteArray())); + } catch (Exception e) { + throw new RuntimeException(e); + } + // The id is stored as eight binary bytes after the column prefix, not as text. + sb.append("Inode ").append(TiKVUtils.readLong(inodeKV.getKey().toByteArray(), INODES_COLUMN.length())).append(": ") + .append(inode).append("\n"); + } + + ListIterator<Kvrpcpb.KvPair> edgeIter = mInodeClient + .scanPrefix(ByteString.copyFromUtf8(EDGES_COLUMN)).listIterator(); + while (edgeIter.hasNext()) { + Kvrpcpb.KvPair edgeKV = edgeIter.next(); + byte[] key = edgeKV.getKey().toByteArray(); + byte[] id = new byte[Longs.BYTES]; + byte[] name = new byte[key.length - Longs.BYTES - EDGES_COLUMN.length()]; + System.arraycopy(key, EDGES_COLUMN.length(), id, 0, Longs.BYTES); + System.arraycopy(key, EDGES_COLUMN.length() + Longs.BYTES, + name, 0, key.length - Longs.BYTES - EDGES_COLUMN.length()); + // Edge values are written with Longs.toByteArray, so decode them the same way. + sb.append(String.format("<%s,%s>->%s%n", Longs.fromByteArray(id), new String(name), + Longs.fromByteArray(edgeKV.getValue().toByteArray()))); + } + + return sb.toString(); + } + +} From 267977f80019528baf8b9463610446aa7230a782 Mon Sep 17 00:00:00 2001 From: wangbin83 Date: Fri, 5 Jul 2024 10:24:53 +0800 Subject: [PATCH 5/7] Add checkpoint operation to inode metadata in tikv --- .../master/metastore/tikv/TiKVInodeStore.java | 24 +++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/core/server/master/src/main/java/alluxio/master/metastore/tikv/TiKVInodeStore.java b/core/server/master/src/main/java/alluxio/master/metastore/tikv/TiKVInodeStore.java index 23104254cac8..7d9ed25a1e47 100644 --- a/core/server/master/src/main/java/alluxio/master/metastore/tikv/TiKVInodeStore.java +++ 
b/core/server/master/src/main/java/alluxio/master/metastore/tikv/TiKVInodeStore.java @@ -294,6 +294,30 @@ public CloseableIterator<InodeView> getCloseableIterator() { ); } + @Override + public CheckpointName getCheckpointName() { + return CheckpointName.TIKV_INODE_STORE; + } + + // TODO: only an empty checkpoint stream is produced; no inode data is written yet. + @Override + public void writeToCheckpoint(OutputStream output) throws IOException, InterruptedException { + LOG.info("Creating tikv checkpoint"); + output = new CheckpointOutputStream(output, CheckpointType.JOURNAL_ENTRY); + output.flush(); + LOG.info("Completed tikv checkpoint"); + } + + // TODO: only the checkpoint type is validated; nothing is read back yet. + @Override + public void restoreFromCheckpoint(CheckpointInputStream input) throws IOException { + LOG.info("Restoring tikv from checkpoint"); + Preconditions.checkState(input.getType() == CheckpointType.JOURNAL_ENTRY, + "Unrecognized checkpoint type when restoring %s: %s", getCheckpointName(), + input.getType()); + LOG.info("Restored tikv checkpoint"); + } + @Override public boolean supportsBatchWrite() { return false; From e66ff33016c722c63288a2df30e7ac13a3ca6923 Mon Sep 17 00:00:00 2001 From: wangbin83 Date: Fri, 5 Jul 2024 10:40:27 +0800 Subject: [PATCH 6/7] Add batch operations to inode metadata in tikv --- .../master/metastore/tikv/TiKVInodeStore.java | 47 ++++++++++++++++++- 1 file changed, 46 insertions(+), 1 deletion(-) diff --git a/core/server/master/src/main/java/alluxio/master/metastore/tikv/TiKVInodeStore.java b/core/server/master/src/main/java/alluxio/master/metastore/tikv/TiKVInodeStore.java index 7d9ed25a1e47..2c4b6aeda00d 100644 --- a/core/server/master/src/main/java/alluxio/master/metastore/tikv/TiKVInodeStore.java +++ b/core/server/master/src/main/java/alluxio/master/metastore/tikv/TiKVInodeStore.java @@ -323,8 +323,53 @@ public boolean supportsBatchWrite() { return false; } - private class TiKVWriteBatch implements WriteBatch {} + private class TiKVWriteBatch implements WriteBatch { + // Pending puts, keyed by the encoded TiKV key. + ConcurrentHashMap<ByteString, ByteString> mInodeMap = new ConcurrentHashMap<>(); + ConcurrentHashMap<ByteString, ByteString> mEdgeMap = new 
ConcurrentHashMap<>(); + // Keys whose removal has to reach TiKV on commit; kept disjoint from the pending puts. + Set<ByteString> mDeletes = ConcurrentHashMap.newKeySet(); + + @Override + public void writeInode(MutableInode<?> inode) { + ByteString key = ByteString.copyFrom(TiKVUtils.toByteArray(INODES_COLUMN, inode.getId())); + ByteString value = ByteString.copyFrom(inode.toProto().toByteArray()); + mDeletes.remove(key); + mInodeMap.put(key,value); + } + + @Override + public void removeInode(Long key) { + ByteString k = ByteString.copyFrom(TiKVUtils.toByteArray(INODES_COLUMN, key)); + mInodeMap.remove(k); + // Dropping the pending put is not enough when the key is already stored. + mDeletes.add(k); + } + + @Override + public void addChild(Long parentId, String childName, Long childId) { + ByteString key = ByteString.copyFrom(TiKVUtils.toByteArray(EDGES_COLUMN, parentId, childName)); + ByteString value = ByteString.copyFrom(Longs.toByteArray(childId)); + mDeletes.remove(key); + mEdgeMap.put(key,value); + } + + @Override + public void removeChild(Long parentId, String childName) { + ByteString k = ByteString.copyFrom(TiKVUtils.toByteArray(EDGES_COLUMN, parentId, childName)); + mEdgeMap.remove(k); + mDeletes.add(k); + } + + @Override + public void commit() { + try { + mInodeClient.batchPut(mInodeMap); + mInodeClient.batchPut(mEdgeMap); + for (ByteString k : mDeletes) { + mInodeClient.delete(k); + } + } catch (TiKVException e) { + throw new RuntimeException(e); + } + } + + @Override + public void close() { + mInodeMap.clear(); + mEdgeMap.clear(); + mDeletes.clear(); + } + } @Override public void close() { From c31e61a23df3e3dec86616b5a3a10d246ba7c4ec Mon Sep 17 00:00:00 2001 From: wangbin83 Date: Fri, 5 Jul 2024 11:17:21 +0800 Subject: [PATCH 7/7] Improve the metadata storage engine tikv calling process --- .../main/java/alluxio/conf/PropertyKey.java | 30 +++++++++++++++++-- .../master/metastore/MetastoreType.java | 3 +- .../journal/checkpoint/CheckpointName.java | 1 + .../main/java/alluxio/master/MasterUtils.java | 10 +++++++ 4 files changed, 40 insertions(+), 4 deletions(-) diff --git a/core/common/src/main/java/alluxio/conf/PropertyKey.java b/core/common/src/main/java/alluxio/conf/PropertyKey.java index a10a8bababf8..5a59f11ae7ae 100755 --- a/core/common/src/main/java/alluxio/conf/PropertyKey.java +++ 
b/core/common/src/main/java/alluxio/conf/PropertyKey.java @@ -3414,12 +3414,19 @@ public String toString() { .build(); public static final PropertyKey MASTER_TIERED_STORE_GLOBAL_LEVEL1_ALIAS = stringBuilder(Name.MASTER_TIERED_STORE_GLOBAL_LEVEL1_ALIAS) - .setDefaultValue(Constants.MEDIUM_SSD) + .setDefaultValue(Constants.MEDIUM_PMEM) .setDescription("The name of the second highest storage tier in the entire system.") .setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE) .setScope(Scope.MASTER) .build(); public static final PropertyKey MASTER_TIERED_STORE_GLOBAL_LEVEL2_ALIAS = + stringBuilder(Name.MASTER_TIERED_STORE_GLOBAL_LEVEL2_ALIAS) + .setDefaultValue(Constants.MEDIUM_SSD) + .setDescription("The name of the third highest storage tier in the entire system.") + .setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE) + .setScope(Scope.MASTER) + .build(); + public static final PropertyKey MASTER_TIERED_STORE_GLOBAL_LEVEL3_ALIAS = - stringBuilder(Name.MASTER_TIERED_STORE_GLOBAL_LEVEL2_ALIAS) + stringBuilder(Name.MASTER_TIERED_STORE_GLOBAL_LEVEL3_ALIAS) .setDefaultValue(Constants.MEDIUM_HDD) - .setDescription("The name of the third highest storage tier in the entire system.") + .setDescription("The name of the fourth highest storage tier in the entire system.") .setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE) .setScope(Scope.MASTER) .build(); @@ -3428,14 +3435,14 @@ public String toString() { .build(); public static final PropertyKey MASTER_TIERED_STORE_GLOBAL_LEVELS = intBuilder(Name.MASTER_TIERED_STORE_GLOBAL_LEVELS) - .setDefaultValue(3) + .setDefaultValue(4) .setDescription("The total number of storage tiers in the system.") .setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE) .setScope(Scope.MASTER) .build(); public static final PropertyKey MASTER_TIERED_STORE_GLOBAL_MEDIUMTYPE = listBuilder(Name.MASTER_TIERED_STORE_GLOBAL_MEDIUMTYPE) - .setDefaultValue("MEM,SSD,HDD") + .setDefaultValue("MEM,PMEM,SSD,HDD") .setDescription("The list of medium types we support in the system.") .setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE) .setScope(Scope.MASTER) @@ -5104,6 +5111,12 @@ public String toString() { .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) .setScope(Scope.WORKER) 
.build(); + public static final PropertyKey WORKER_TIERED_STORE_LEVEL3_ALIAS = + new Builder(PropertyType.STRING, Template.WORKER_TIERED_STORE_LEVEL_ALIAS, 3) + .setDescription("The alias of the four storage tier on this worker.") + .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) + .setScope(Scope.WORKER) + .build(); public static final PropertyKey WORKER_TIERED_STORE_LEVEL2_DIRS_PATH = new Builder(PropertyType.LIST, Optional.of(","), Template.WORKER_TIERED_STORE_LEVEL_DIRS_PATH, 2) @@ -7732,6 +7745,13 @@ public String toString() { .setConsistencyCheckLevel(ConsistencyCheckLevel.WARN) .setScope(Scope.CLIENT) .build(); + public static final PropertyKey MASTER_METASTORE_INODE_TIKV_CONNECTION = + stringBuilder(Name.MASTER_METASTORE_INODE_TIKV_CONNECTION) + .setDefaultValue("127.0.0.1:2379") + .setDescription("The connection of tikv for backing store.") + .setConsistencyCheckLevel(ConsistencyCheckLevel.ENFORCE) + .setScope(Scope.MASTER) + .build(); /** * @deprecated This key is used for testing. It is always deprecated. 
*/ @@ -8398,6 +8418,8 @@ public static final class Name { "alluxio.master.tieredstore.global.level1.alias"; public static final String MASTER_TIERED_STORE_GLOBAL_LEVEL2_ALIAS = "alluxio.master.tieredstore.global.level2.alias"; + public static final String MASTER_TIERED_STORE_GLOBAL_LEVEL3_ALIAS = + "alluxio.master.tieredstore.global.level3.alias"; public static final String MASTER_TIERED_STORE_GLOBAL_LEVELS = "alluxio.master.tieredstore.global.levels"; public static final String MASTER_TIERED_STORE_GLOBAL_MEDIUMTYPE = @@ -9304,6 +9326,8 @@ public static final class Name { "alluxio.hadoop.kerberos.keytab.login.autorenewal"; public static final String HADOOP_CHECKSUM_COMBINE_MODE = "alluxio.hadoop.checksum.combine.mode"; + public static final String MASTER_METASTORE_INODE_TIKV_CONNECTION = + "alluxio.master.metastore.inode.tikv.connection"; private Name() {} // prevent instantiation } diff --git a/core/common/src/main/java/alluxio/master/metastore/MetastoreType.java b/core/common/src/main/java/alluxio/master/metastore/MetastoreType.java index ffb9a0846604..20512637e0cf 100644 --- a/core/common/src/main/java/alluxio/master/metastore/MetastoreType.java +++ b/core/common/src/main/java/alluxio/master/metastore/MetastoreType.java @@ -16,5 +16,6 @@ */ public enum MetastoreType { HEAP, - ROCKS + ROCKS, + TIKV } diff --git a/core/server/common/src/main/java/alluxio/master/journal/checkpoint/CheckpointName.java b/core/server/common/src/main/java/alluxio/master/journal/checkpoint/CheckpointName.java index 8870f7ec5d4f..30a4be7ff8ba 100644 --- a/core/server/common/src/main/java/alluxio/master/journal/checkpoint/CheckpointName.java +++ b/core/server/common/src/main/java/alluxio/master/journal/checkpoint/CheckpointName.java @@ -42,4 +42,5 @@ public enum CheckpointName { TTL_BUCKET_LIST, SCHEDULER, SNAPSHOT_ID, + TIKV_INODE_STORE, } diff --git a/core/server/master/src/main/java/alluxio/master/MasterUtils.java b/core/server/master/src/main/java/alluxio/master/MasterUtils.java index 
ccc1205f7ba1..42947fbadc78 100644 --- a/core/server/master/src/main/java/alluxio/master/MasterUtils.java +++ b/core/server/master/src/main/java/alluxio/master/MasterUtils.java @@ -22,6 +22,8 @@ import alluxio.master.metastore.heap.HeapInodeStore; import alluxio.master.metastore.rocks.RocksBlockMetaStore; import alluxio.master.metastore.rocks.RocksInodeStore; +import alluxio.master.metastore.tikv.TiKVInodeStore; +import alluxio.master.metastore.tikv.TiKVBlockMetaStore; import alluxio.util.CommonUtils; import java.util.ArrayList; @@ -70,6 +72,8 @@ public static BlockMetaStore.Factory getBlockStoreFactory(String baseDir) { return HeapBlockMetaStore::new; case ROCKS: return () -> new RocksBlockMetaStore(baseDir); + case TIKV: + return () -> new TiKVBlockMetaStore(baseDir); default: throw new IllegalStateException("Unknown metastore type: " + type); } @@ -92,6 +96,12 @@ public static InodeStore.Factory getInodeStoreFactory(String baseDir) { } else { return lockManager -> new CachingInodeStore(new RocksInodeStore(baseDir), lockManager); } + case TIKV: + if (Configuration.getInt(PropertyKey.MASTER_METASTORE_INODE_CACHE_MAX_SIZE) == 0) { + return lockManager -> new TiKVInodeStore(baseDir); + } else { + return lockManager -> new CachingInodeStore(new TiKVInodeStore(baseDir), lockManager); + } default: throw new IllegalStateException("Unknown metastore type: " + type); }