diff --git a/Garnet.sln b/Garnet.sln
index e0cffb087bb..d3e5c8f1723 100644
--- a/Garnet.sln
+++ b/Garnet.sln
@@ -113,6 +113,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NoOpModule", "playground\No
 EndProject
 Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ETag", "samples\ETag\ETag.csproj", "{4FBA1587-BAFC-49F8-803A-D1CF431A26F5}"
 EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Btree", "playground\BTree\Btree.csproj", "{CE12831B-2805-469E-8208-759DC4B4862C}"
+EndProject
 Global
 	GlobalSection(SolutionConfigurationPlatforms) = preSolution
 		Debug|Any CPU = Debug|Any CPU
@@ -345,6 +347,14 @@ Global
 		{4FBA1587-BAFC-49F8-803A-D1CF431A26F5}.Release|Any CPU.Build.0 = Release|Any CPU
 		{4FBA1587-BAFC-49F8-803A-D1CF431A26F5}.Release|x64.ActiveCfg = Release|Any CPU
 		{4FBA1587-BAFC-49F8-803A-D1CF431A26F5}.Release|x64.Build.0 = Release|Any CPU
+		{CE12831B-2805-469E-8208-759DC4B4862C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+		{CE12831B-2805-469E-8208-759DC4B4862C}.Debug|Any CPU.Build.0 = Debug|Any CPU
+		{CE12831B-2805-469E-8208-759DC4B4862C}.Debug|x64.ActiveCfg = Debug|Any CPU
+		{CE12831B-2805-469E-8208-759DC4B4862C}.Debug|x64.Build.0 = Debug|Any CPU
+		{CE12831B-2805-469E-8208-759DC4B4862C}.Release|Any CPU.ActiveCfg = Release|Any CPU
+		{CE12831B-2805-469E-8208-759DC4B4862C}.Release|Any CPU.Build.0 = Release|Any CPU
+		{CE12831B-2805-469E-8208-759DC4B4862C}.Release|x64.ActiveCfg = Release|Any CPU
+		{CE12831B-2805-469E-8208-759DC4B4862C}.Release|x64.Build.0 = Release|Any CPU
 	EndGlobalSection
 	GlobalSection(SolutionProperties) = preSolution
 		HideSolutionNode = FALSE
@@ -381,6 +391,7 @@ Global
 		{A48412B4-FD60-467E-A5D9-F155CAB4F907} = {147FCE31-EC09-4C90-8E4D-37CA87ED18C3}
 		{D4C9A1A0-7053-F072-21F5-4E0C5827136D} = {69A71E2C-00E3-42F3-854E-BE157A24834E}
 		{4FBA1587-BAFC-49F8-803A-D1CF431A26F5} = {7068BB97-1958-4060-B5F1-859464592E56}
+		{CE12831B-2805-469E-8208-759DC4B4862C} = {69A71E2C-00E3-42F3-854E-BE157A24834E}
 	EndGlobalSection
 	GlobalSection(ExtensibilityGlobals) = postSolution
 		SolutionGuid = {2C02C405-4798-41CA-AF98-61EDFEF6772E}
diff --git a/libs/common/RespReadUtils.cs b/libs/common/RespReadUtils.cs
index d0c179cb65c..b78e28ee5a7 100644
--- a/libs/common/RespReadUtils.cs
+++ b/libs/common/RespReadUtils.cs
@@ -630,6 +630,16 @@ public static bool TryReadInt64WithLengthHeader(out long number, ref byte* ptr,
             return true;
         }
 
+        ///
+        /// Tries to read a ulong from the given ASCII-encoded RESP string.
+        /// Note: this does not check for any length headers and is simply a thin wrapper over TryReadUInt64.
+        ///
+        /// If parsing was successful, contains the parsed ulong value.
+        /// The starting position in the RESP string. Will be advanced if parsing is successful.
+        /// The current end of the RESP string.
+        /// True if a ulong was successfully parsed.
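+        /// Example (illustrative): given the ASCII digits "12345", this parses the value 12345 and advances ptr past the digits consumed.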
+ public static bool ReadUlong(out ulong number, ref byte* ptr, byte* end) => TryReadUInt64(ref ptr, end, out number, out _); + /// /// Read long with length header /// diff --git a/libs/resources/RespCommandsInfo.json b/libs/resources/RespCommandsInfo.json index c0a7b00a31f..026bf1a5b5d 100644 --- a/libs/resources/RespCommandsInfo.json +++ b/libs/resources/RespCommandsInfo.json @@ -4828,6 +4828,124 @@ } ] }, + { + "Command": "XADD", + "Name": "XADD", + "IsInternal": false, + "Arity": -5, + "Flags": "DenyOom, Fast, Write", + "FirstKey": 1, + "LastKey": 1, + "Step": 1, + "AclCategories": "Fast, Stream, Write", + "Tips": [ + "nondeterministic_output" + ], + "KeySpecifications": [ + { + "BeginSearch": { + "TypeDiscriminator": "BeginSearchIndex", + "Index": 1 + }, + "FindKeys": { + "TypeDiscriminator": "FindKeysRange", + "LastKey": 0, + "KeyStep": 1, + "Limit": 0 + }, + "Notes": "UPDATE instead of INSERT because of the optional trimming feature", + "Flags": "RW, Update" + } + ], + "SubCommands": null + }, + { + "Command": "XDEL", + "Name": "XDEL", + "IsInternal": false, + "Arity": -3, + "Flags": "Fast, Write", + "FirstKey": 1, + "LastKey": 1, + "Step": 1, + "AclCategories": "Fast, Stream, Write", + "Tips": null, + "KeySpecifications": [ + { + "BeginSearch": { + "TypeDiscriminator": "BeginSearchIndex", + "Index": 1 + }, + "FindKeys": { + "TypeDiscriminator": "FindKeysRange", + "LastKey": 0, + "KeyStep": 1, + "Limit": 0 + }, + "Notes": null, + "Flags": "RW, Delete" + } + ], + "SubCommands": null + }, + { + "Command": "XLEN", + "Name": "XLEN", + "IsInternal": false, + "Arity": 2, + "Flags": "Fast, ReadOnly", + "FirstKey": 1, + "LastKey": 1, + "Step": 1, + "AclCategories": "Fast, Read, Stream", + "Tips": null, + "KeySpecifications": [ + { + "BeginSearch": { + "TypeDiscriminator": "BeginSearchIndex", + "Index": 1 + }, + "FindKeys": { + "TypeDiscriminator": "FindKeysRange", + "LastKey": 0, + "KeyStep": 1, + "Limit": 0 + }, + "Notes": null, + "Flags": "RO" + } + ], + "SubCommands": null + }, + { + "Command": "XRANGE", + "Name": "XRANGE", + "IsInternal": false, + "Arity": -4, + "Flags": "ReadOnly", + "FirstKey": 1, + "LastKey": 1, + "Step": 1, + "AclCategories": "Read, Slow, Stream", + "Tips": null, + "KeySpecifications": [ + { + "BeginSearch": { + "TypeDiscriminator": "BeginSearchIndex", + "Index": 1 + }, + "FindKeys": { + "TypeDiscriminator": "FindKeysRange", + "LastKey": 0, + "KeyStep": 1, + "Limit": 0 + }, + "Notes": null, + "Flags": "RO, Access" + } + ], + "SubCommands": null + }, { "Command": "ZADD", "Name": "ZADD", diff --git a/libs/server/BTreeIndex/BTree.cs b/libs/server/BTreeIndex/BTree.cs new file mode 100644 index 00000000000..8ee6ca78f3c --- /dev/null +++ b/libs/server/BTreeIndex/BTree.cs @@ -0,0 +1,177 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. 
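+
+// Design note: this B+Tree keeps fixed-size 16-byte keys in 4KB pages and is tuned for
+// append-mostly workloads: inserts go straight to the tail leaf, and deletes only tombstone
+// entries instead of rebalancing the tree.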
+
+using System;
+using System.Collections.Generic;
+using System.Runtime.InteropServices;
+
+namespace Garnet.server.BTreeIndex
+{
+    public unsafe partial class BTree
+    {
+        BTreeNode* root;
+        BTreeNode* head;
+        BTreeNode* tail;
+        byte* tailMinKey;
+        public static readonly int MAX_TREE_DEPTH = 10; // maximum allowed depth of the tree
+        static int DEFAULT_SPLIT_LEAF_POSITION = (BTreeNode.LEAF_CAPACITY + 1) / 2; // default (midpoint) position at which a leaf node is split
+        static int SPLIT_LEAF_POSITION = BTreeNode.LEAF_CAPACITY; // append-optimized split position: the old leaf stays full and the new tail leaf starts nearly empty
+        static int SPLIT_INTERNAL_POSITION = BTreeNode.INTERNAL_CAPACITY; // position at which an internal node is split
+
+        BTreeNode*[] rootToTailLeaf; // array of nodes from root to tail leaf
+        public BTreeStats stats; // statistics about the tree
+
+        ///
+        /// Initializes a new instance of the BTree class.
+        ///
+        public BTree(uint sectorSize)
+        {
+            var memoryBlock = (IntPtr*)NativeMemory.AlignedAlloc((nuint)BTreeNode.PAGE_SIZE, (nuint)BTreeNode.PAGE_SIZE);
+            root = BTreeNode.Create(BTreeNodeType.Leaf, memoryBlock);
+            head = tail = root;
+            root->info->next = root->info->previous = null;
+            root->info->count = 0;
+            tailMinKey = null;
+            rootToTailLeaf = new BTreeNode*[MAX_TREE_DEPTH];
+            stats = new BTreeStats();
+            stats.depth = 1;
+            stats.numLeafNodes = 1;
+            stats.numAllocates = 1;
+        }
+
+        ///
+        /// Frees the memory allocated for a node (and, for internal nodes, its entire subtree)
+        ///
+        /// BTreeNode to free from memory
+        private void Free(ref BTreeNode* node)
+        {
+            if (node == null)
+                return;
+
+            // If this is an internal node, free all its children first
+            if (node->info->type == BTreeNodeType.Internal)
+            {
+                for (int i = 0; i <= node->info->count; i++)
+                {
+                    var child = node->data.children[i];
+                    Free(ref child);
+                    node->data.children[i] = null;
+                }
+            }
+
+            // Free the memory handle
+            if (node->memoryHandle != null)
+            {
+                NativeMemory.Free(node->memoryHandle);
+                stats.numDeallocates++;
+                node = null;
+            }
+        }
+
+        ///
+        /// Frees the memory allocated for a node without updating allocation statistics
+        ///
+        ///
+        public static void FreeNode(ref BTreeNode* node)
+        {
+            if (node == null)
+                return;
+
+            // If this is an internal node, free all its children first
+            if (node->info->type == BTreeNodeType.Internal)
+            {
+                for (int i = 0; i <= node->info->count; i++)
+                {
+                    var child = node->data.children[i];
+                    FreeNode(ref child);
+                    node->data.children[i] = null;
+                }
+            }
+
+            // Free the memory handle
+            if (node->memoryHandle != null)
+            {
+                NativeMemory.Free(node->memoryHandle);
+                node = null;
+            }
+        }
+
+        ///
+        /// Deallocates the memory allocated for the B+Tree
+        ///
+        public void Deallocate()
+        {
+            if (root == null)
+                return;
+            Free(ref root);
+            root = null;
+            head = null;
+            tail = null;
+        }
+
+        ///
+        /// Destructor for the B+tree
+        ///
+        ~BTree()
+        {
+            Deallocate();
+        }
+
+        public ulong FastInserts => stats.totalFastInserts;
+        public ulong LeafCount => stats.numLeafNodes;
+        public ulong InternalCount => stats.numInternalNodes;
+
+        public ulong ValidCount => StatsValidCount();
+
+        public long RootValidCount => GetValidCount(root);
+
+        public long TailValidCount => GetValidCount(tail);
+
+        public long Count()
+        {
+            return stats.numKeys;
+        }
+        public ulong StatsValidCount()
+        {
+            return stats.numValidKeys;
+        }
+
+        public long GetValidCount(BTreeNode* node)
+        {
+            return node->info->validCount;
+        }
+
+        ///
+        /// Retrieves the first entry in the B+Tree (smallest key)
+        ///
+        /// entry fetched
+        public KeyValuePair<byte[], Value> First()
+        {
+            BTreeNode* leaf = head;
+            if (leaf == null)
+            {
+                return default;
+            }
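+            // copy the fixed-size key into a managed array so the caller owns a stable copy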
+            byte[] keyBytes = new ReadOnlySpan<byte>(leaf->GetKey(0), BTreeNode.KEY_SIZE).ToArray();
+            return new KeyValuePair<byte[], Value>(keyBytes, leaf->GetValue(0));
+        }
+
+        ///
+        /// Retrieves the last entry in the B+Tree (largest key)
+        ///
+        /// entry fetched
+        public KeyValuePair<byte[], Value> Last()
+        {
+            BTreeNode* leaf = tail;
+            if (leaf == null)
+            {
+                return default;
+            }
+            byte[] keyBytes = new ReadOnlySpan<byte>(leaf->GetKey(leaf->info->count - 1), BTreeNode.KEY_SIZE).ToArray();
+            return new KeyValuePair<byte[], Value>(keyBytes, leaf->GetValue(leaf->info->count - 1));
+        }
+
+    }
+}
\ No newline at end of file
diff --git a/libs/server/BTreeIndex/BTreeDelete.cs b/libs/server/BTreeIndex/BTreeDelete.cs
new file mode 100644
index 00000000000..07097b04831
--- /dev/null
+++ b/libs/server/BTreeIndex/BTreeDelete.cs
@@ -0,0 +1,32 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT license.
+
+namespace Garnet.server.BTreeIndex
+{
+    public unsafe partial class BTree
+    {
+        ///
+        /// Delete a key from the B+tree
+        ///
+        /// key to delete
+        /// true if key was tombstoned
+        public bool Delete(byte* key)
+        {
+            BTreeNode* leaf = null;
+            var nodesTraversed = new BTreeNode*[MAX_TREE_DEPTH];
+
+            TraverseToLeaf(ref leaf, ref nodesTraversed, key);
+            var index = leaf->LowerBound(key);
+            if (index >= leaf->info->count || BTreeNode.Compare(key, leaf->GetKey(index)) != 0)
+            {
+                return false;
+            }
+
+            // treat an already-tombstoned key as not found; otherwise validCount would be decremented twice
+            if (!leaf->GetValue(index).Valid)
+            {
+                return false;
+            }
+
+            // insert a tombstone for the delete
+            leaf->InsertTombstone(index);
+            leaf->info->validCount--;
+            stats.numValidKeys--;
+            return true;
+        }
+    }
+}
\ No newline at end of file
diff --git a/libs/server/BTreeIndex/BTreeInsert.cs b/libs/server/BTreeIndex/BTreeInsert.cs
new file mode 100644
index 00000000000..239fb5f00ee
--- /dev/null
+++ b/libs/server/BTreeIndex/BTreeInsert.cs
@@ -0,0 +1,343 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT license.
+
+using System;
+using System.Runtime.InteropServices;
+
+namespace Garnet.server.BTreeIndex
+{
+    public unsafe partial class BTree
+    {
+        ///
+        /// Insert a key-value pair into the B+tree. Directly inserts into the tail leaf node.
+ /// + /// + /// + /// true if insertion is successful + public bool Insert(byte* key, Value value) + { + BTreeNode* leaf = null; + stats.totalFastInserts++; + stats.totalInserts++; + stats.numKeys++; + stats.numValidKeys++; + leaf = tail; + return InsertToLeafNode(ref leaf, ref rootToTailLeaf, key, value, true); + } + + public bool Insert(byte* key,ReadOnlySpan keySpan, Value value) + { + BTreeNode* leaf = null; + stats.totalFastInserts++; + stats.totalInserts++; + stats.numKeys++; + stats.numValidKeys++; + leaf = tail; + return InsertToLeafNode(ref leaf, ref rootToTailLeaf, key, value, true); + } + public bool InsertToLeafNode(ref BTreeNode* leaf, ref BTreeNode*[] nodesTraversed, byte* key, Value value, bool appendToLeaf = false) + { + int index; + if(appendToLeaf) + { + // if leaf has space + if (leaf->info->count < BTreeNode.LEAF_CAPACITY) + { + // append to end of leaf node + leaf->SetKey(leaf->info->count, key); + leaf->SetValue(leaf->info->count, value); + leaf->info->count++; + leaf->info->validCount++; + return true; + } + index = leaf->info->count; + return SplitLeafNode(ref leaf, ref nodesTraversed, key, value, index); + } + + // find the index where the key should be inserted + index = leaf->LowerBound(key); + if (index < leaf->info->count && BTreeNode.Compare(key, leaf->GetKey(index)) == 0) + { + // insert is actually an update + leaf->SetValue(index, value); + return false; + } + + if (leaf->info->count < BTreeNode.LEAF_CAPACITY) + { + // move keys to the right of index + var sourceSpan = new ReadOnlySpan(leaf->keys + index * BTreeNode.KEY_SIZE, (leaf->info->count - index) * BTreeNode.KEY_SIZE); + var destinationSpan = new Span(leaf->keys + ((index + 1) * BTreeNode.KEY_SIZE), (leaf->info->count - index) * BTreeNode.KEY_SIZE); + sourceSpan.CopyTo(destinationSpan); + + leaf->SetKey(index, key); + leaf->SetValue(index, value); + leaf->info->count++; + leaf->info->validCount++; + return true; + } + return SplitLeafNode(ref leaf, ref nodesTraversed, key, value, index); + } + + public bool SplitLeafNode(ref BTreeNode* leaf, ref BTreeNode*[] nodesTraversed, byte* key, Value value, int index) + { + var memoryBlock = (IntPtr*)NativeMemory.AlignedAlloc((nuint)BTreeNode.PAGE_SIZE, (nuint)BTreeNode.PAGE_SIZE); + stats.numAllocates++; + BTreeNode* newLeaf = BTreeNode.Create(BTreeNodeType.Leaf, memoryBlock); + + leaf->info->count = SPLIT_LEAF_POSITION; + newLeaf->info->previous = leaf; + newLeaf->info->next = leaf->info->next; + newLeaf->info->count = BTreeNode.LEAF_CAPACITY + 1 - SPLIT_LEAF_POSITION; + leaf->info->next = newLeaf; + stats.numLeafNodes++; + + // scan the keys from splitLeafPos to get the number of valid keys in the new leaf + uint newLeafValidCount = 0; + for (var i = SPLIT_LEAF_POSITION; i < BTreeNode.LEAF_CAPACITY; i++) + { + if (leaf->data.values[i].Valid) + { + newLeafValidCount++; + } + } + leaf->info->validCount -= newLeafValidCount; + newLeaf->info->validCount = newLeafValidCount; + // insert the new key to either the old node or the newly created node, based on the index + if (index >= leaf->info->count) + { + // new key goes to the new leaf + var newIndex = index - leaf->info->count; + + // move the keys from old node to the new node using ReadOnlySpan + var sourceSpan = new ReadOnlySpan(leaf->keys + index * BTreeNode.KEY_SIZE, newIndex * BTreeNode.KEY_SIZE); + var destinationSpan = new Span(newLeaf->keys, newIndex * BTreeNode.KEY_SIZE); + sourceSpan.CopyTo(destinationSpan); + + // add key to new leaf + newLeaf->SetKey(newIndex, key); + + var existingLeafKeysSpan 
= new ReadOnlySpan(leaf->keys + index * BTreeNode.KEY_SIZE, (BTreeNode.LEAF_CAPACITY - index) * BTreeNode.KEY_SIZE); + var newLeafKeysSpan = new Span(newLeaf->keys + (newIndex + 1) * BTreeNode.KEY_SIZE, (BTreeNode.LEAF_CAPACITY - index) * BTreeNode.KEY_SIZE); + existingLeafKeysSpan.CopyTo(newLeafKeysSpan); + + var existingLeafValuesSpan = new ReadOnlySpan(leaf->data.values + leaf->info->count, newIndex * sizeof(Value)); + var newLeafValuesSpan = new Span(newLeaf->data.values, newIndex * sizeof(Value)); + existingLeafValuesSpan.CopyTo(newLeafValuesSpan); + newLeaf->SetValue(newIndex, value); + + var existingLeafValuesSpan2 = new ReadOnlySpan(leaf->data.values + index, (BTreeNode.LEAF_CAPACITY - index) * sizeof(Value)); + var newLeafValuesSpan2 = new Span(newLeaf->data.values + newIndex + 1, (BTreeNode.LEAF_CAPACITY - index) * sizeof(Value)); + existingLeafValuesSpan2.CopyTo(newLeafValuesSpan2); + newLeaf->info->validCount++; + } + else + { + var existingLeafKeysSpan = new ReadOnlySpan(leaf->keys + (leaf->info->count - 1) * BTreeNode.KEY_SIZE, newLeaf->info->count * BTreeNode.KEY_SIZE); + var newLeafKeysSpan = new Span(newLeaf->keys, newLeaf->info->count * BTreeNode.KEY_SIZE); + existingLeafKeysSpan.CopyTo(newLeafKeysSpan); + + var existingLeafKeysSpan2 = new ReadOnlySpan(leaf->keys + index * BTreeNode.KEY_SIZE, (leaf->info->count - index - 1) * BTreeNode.KEY_SIZE); + var newLeafKeysSpan2 = new Span(leaf->keys + ((index + 1) * BTreeNode.KEY_SIZE), (leaf->info->count - index - 1) * BTreeNode.KEY_SIZE); + existingLeafKeysSpan2.CopyTo(newLeafKeysSpan2); + leaf->SetKey(index, key); + + var existingLeafValuesSpan = new ReadOnlySpan(leaf->data.values + leaf->info->count - 1, newLeaf->info->count * sizeof(Value)); + var newLeafValuesSpan = new Span(newLeaf->data.values, newLeaf->info->count * sizeof(Value)); + existingLeafValuesSpan.CopyTo(newLeafValuesSpan); + + var existingLeafValuesSpan2 = new ReadOnlySpan(leaf->data.values + index, (leaf->info->count - index - 1) * sizeof(Value)); + var newLeafValuesSpan2 = new Span(leaf->data.values + index + 1, (leaf->info->count - index - 1) * sizeof(Value)); + existingLeafValuesSpan2.CopyTo(newLeafValuesSpan2); + leaf->SetValue(index, value); + leaf->info->validCount++; + } + + uint validCount = 0; + // the leaf that is split will also be the tail node; so update the tail pointer + if (leaf == tail) + { + tail = newLeaf; + tailMinKey = newLeaf->GetKey(0); + rootToTailLeaf[0] = newLeaf; + // validCount in internal nodes of the index excludes the validCount of the tail leaf node (optimizing for performance to avoid traversal) + // thus, when we split the tail leaf, we push up the validCount of the leaf that we split to the internal node + validCount = leaf->info->validCount; + } + + // update the parent node with the new key + PushUpKeyInInternalNode(ref nodesTraversed, newLeaf->GetKey(0), ref newLeaf, SPLIT_INTERNAL_POSITION, validCount); + return true; + } + + public void PushUpKeyInInternalNode(ref BTreeNode*[] nodesTraversed, byte* key, ref BTreeNode* child, int splitPos, uint newValidCount) + { + int i; + // starts from parent of leaf node that triggered the push-up. + // if the parent has space, insert the key and child pointer, and return. Otherwise, split and cascade up. 
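+            // nodesTraversed[0] is the leaf that triggered the split; indices 1..depth-1 are its ancestors, so the cascade starts at level 1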
+ for (i = 1; i < stats.depth; i++) + { + var node = nodesTraversed[i]; + var index = node->UpperBound(key); + + if (node->info->count < BTreeNode.INTERNAL_CAPACITY) + { + // we can insert + InsertToInternalNodeWithinCapacity(ref node, key, ref child, ref nodesTraversed, index, newValidCount); + + // update validCounts in the parent nodes + for (var j = i + 1; j < stats.depth; j++) + { + nodesTraversed[j]->info->validCount += newValidCount; + } + return; + } + + // split internal node + var newNode = SplitInternalNode(ref node, ref nodesTraversed, ref key, ref child, splitPos, index, i); + if (rootToTailLeaf[i] == node && tail != head && BTreeNode.Compare(key, tailMinKey) <= 0) + { + rootToTailLeaf[i] = newNode; + } + child = newNode; + } + // split root + CreateNewRoot(key, child); + } + + public void InsertToInternalNodeWithinCapacity(ref BTreeNode* node, byte* key, ref BTreeNode* child, ref BTreeNode*[] nodesTraversed, int index, uint newValidCount) + { + // move all keys to the right + var sourceSpan = new ReadOnlySpan(node->keys + index * BTreeNode.KEY_SIZE, (node->info->count - index) * BTreeNode.KEY_SIZE); + var destinationSpan = new Span(node->keys + ((index + 1) * BTreeNode.KEY_SIZE), (node->info->count - index) * BTreeNode.KEY_SIZE); + sourceSpan.CopyTo(destinationSpan); + + // move all children starting from index+1 to the right using a for loop + for (var j = node->info->count; j > index; j--) + { + node->SetChild(j + 1, node->GetChild(j)); + } + + // insert + node->SetKey(index, key); + node->SetChild(index + 1, child); + node->info->count++; + node->info->validCount += newValidCount; + } + + public BTreeNode* CreateInternalNode(ref BTreeNode* node, int splitPos) + { + var memoryBlock = (IntPtr*)NativeMemory.AlignedAlloc((nuint)BTreeNode.PAGE_SIZE, (nuint)BTreeNode.PAGE_SIZE); + stats.numAllocates++; + BTreeNode* newNode = BTreeNode.Create(BTreeNodeType.Internal, memoryBlock); + stats.numInternalNodes++; + node->info->count = splitPos; + newNode->info->count = BTreeNode.INTERNAL_CAPACITY - splitPos; + newNode->info->next = node->info->next; + newNode->info->previous = node; + node->info->next = newNode; + return newNode; + } + + public BTreeNode* SplitInternalNode(ref BTreeNode* nodeToSplit, ref BTreeNode*[] nodesTraversed, ref byte* key, ref BTreeNode* child, int splitPos, int index, int level) + { + var newNode = CreateInternalNode(ref nodeToSplit, splitPos); + + // scan keys from splitPos to get number of valid keys in the new node + uint newValidCount = 0; + for (int i = splitPos; i < BTreeNode.INTERNAL_CAPACITY; i++) + { + if (nodeToSplit->GetChild(i) != null) + { + newValidCount += nodeToSplit->GetChild(i)->info->validCount; + } + } + newNode->info->validCount = newValidCount; + + if (index > nodeToSplit->info->count) + { + // child goes to newNode + var sourceSpan = new ReadOnlySpan(nodeToSplit->keys + (nodeToSplit->info->count + 1) * BTreeNode.KEY_SIZE, (index - nodeToSplit->info->count - 1) * BTreeNode.KEY_SIZE); + var destinationSpan = new Span(newNode->keys, (index - nodeToSplit->info->count - 1) * BTreeNode.KEY_SIZE); + sourceSpan.CopyTo(destinationSpan); + + var existingNodeKeysSpan = new ReadOnlySpan(nodeToSplit->keys + index * BTreeNode.KEY_SIZE, (BTreeNode.INTERNAL_CAPACITY - index) * BTreeNode.KEY_SIZE); + var newNodeKeysSpan = new Span(newNode->keys + (index - nodeToSplit->info->count) * BTreeNode.KEY_SIZE, (BTreeNode.INTERNAL_CAPACITY - index) * BTreeNode.KEY_SIZE); + existingNodeKeysSpan.CopyTo(newNodeKeysSpan); + newNode->SetKey(index - 
nodeToSplit->info->count - 1, key); + + var existingNodeChildrenSpan = new ReadOnlySpan(nodeToSplit->data.children + 1 + nodeToSplit->info->count, (index - nodeToSplit->info->count) * sizeof(BTreeNode*)); + var newNodeChildrenSpan = new Span(newNode->data.children, (index - nodeToSplit->info->count) * sizeof(BTreeNode*)); + existingNodeChildrenSpan.CopyTo(newNodeChildrenSpan); + + var existingNodeChildrenSpan2 = new ReadOnlySpan(nodeToSplit->data.children + 1 + index, newNode->info->count * sizeof(BTreeNode*)); + var newNodeChildrenSpan2 = new Span(newNode->data.children + 1 + index - nodeToSplit->info->count, newNode->info->count * sizeof(BTreeNode*)); + existingNodeChildrenSpan2.CopyTo(newNodeChildrenSpan2); + newNode->SetChild(index - nodeToSplit->info->count, child); + key = nodeToSplit->GetKey(nodeToSplit->info->count); + } + else if (index == nodeToSplit->info->count) + { + var sourceSpan = new ReadOnlySpan(nodeToSplit->keys + nodeToSplit->info->count * BTreeNode.KEY_SIZE, newNode->info->count * BTreeNode.KEY_SIZE); + var destinationSpan = new Span(newNode->keys, newNode->info->count * BTreeNode.KEY_SIZE); + sourceSpan.CopyTo(destinationSpan); + + var existingNodeChildrenSpan = new ReadOnlySpan(nodeToSplit->data.children + 1 + nodeToSplit->info->count, newNode->info->count * sizeof(BTreeNode*)); + var newNodeChildrenSpan = new Span(newNode->data.children + 1, newNode->info->count * sizeof(BTreeNode*)); + existingNodeChildrenSpan.CopyTo(newNodeChildrenSpan); + newNode->SetChild(0, child); + } + else + { + // child goes to old node + var sourceSpan = new ReadOnlySpan(nodeToSplit->keys + nodeToSplit->info->count * BTreeNode.KEY_SIZE, newNode->info->count * BTreeNode.KEY_SIZE); + var destinationSpan = new Span(newNode->keys, newNode->info->count * BTreeNode.KEY_SIZE); + sourceSpan.CopyTo(destinationSpan); + + var existingNodeKeysSpan = new ReadOnlySpan(nodeToSplit->keys + index * BTreeNode.KEY_SIZE, (nodeToSplit->info->count - index) * BTreeNode.KEY_SIZE); + var newNodeKeysSpan = new Span(nodeToSplit->keys + ((index + 1) * BTreeNode.KEY_SIZE), (nodeToSplit->info->count - index) * BTreeNode.KEY_SIZE); + existingNodeKeysSpan.CopyTo(newNodeKeysSpan); + nodeToSplit->SetKey(index, key); + + var existingNodeChildrenSpan = new ReadOnlySpan(nodeToSplit->data.children + nodeToSplit->info->count, newNode->info->count * sizeof(BTreeNode*)); + var newNodeChildrenSpan = new Span(newNode->data.children, newNode->info->count * sizeof(BTreeNode*)); + existingNodeChildrenSpan.CopyTo(newNodeChildrenSpan); + + var existingNodeChildrenSpan2 = new ReadOnlySpan(nodeToSplit->data.children + index + 1, (nodeToSplit->info->count - index + 1) * sizeof(BTreeNode*)); + var newNodeChildrenSpan2 = new Span(nodeToSplit->data.children + index + 2, (nodeToSplit->info->count - index + 1) * sizeof(BTreeNode*)); + existingNodeChildrenSpan2.CopyTo(newNodeChildrenSpan2); + nodeToSplit->SetChild(index + 1, child); + key = nodeToSplit->GetKey(nodeToSplit->info->count); + } + + return newNode; + } + + + public void CreateNewRoot(byte* key, BTreeNode* newlySplitNode) + { + var memoryBlock = (IntPtr*)NativeMemory.AlignedAlloc((nuint)BTreeNode.PAGE_SIZE, (nuint)BTreeNode.PAGE_SIZE); + stats.numAllocates++; + BTreeNode* newRoot = BTreeNode.Create(BTreeNodeType.Internal, memoryBlock); + + // Set the new root's key. + newRoot->info->count = 1; + newRoot->SetKey(0, key); + + // Set children: left child is the old root; right child is the newly split node. 
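+            // routing: keys that compare below the separator descend into the old root's subtree, all others into the newly split sibling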
+ newRoot->SetChild(0, root); + newRoot->SetChild(1, newlySplitNode); + + newRoot->info->validCount = root->info->validCount; + if (newlySplitNode != tail) + { + newRoot->info->validCount += newlySplitNode->info->validCount; + } + newRoot->info->next = newRoot->info->previous = null; + root = newRoot; + rootToTailLeaf[stats.depth] = newRoot; + stats.depth++; + stats.numInternalNodes++; + } + } +} \ No newline at end of file diff --git a/libs/server/BTreeIndex/BTreeInternals.cs b/libs/server/BTreeIndex/BTreeInternals.cs new file mode 100644 index 00000000000..d7b243beb3e --- /dev/null +++ b/libs/server/BTreeIndex/BTreeInternals.cs @@ -0,0 +1,329 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +using System; +using System.Numerics; +using System.Runtime.InteropServices; +using System.Runtime.Intrinsics.X86; + +namespace Garnet.server.BTreeIndex +{ + + public enum BTreeNodeType + { + Internal, + Leaf + } + + /// + /// Represents information stored in a node in the B+tree + /// + [StructLayout(LayoutKind.Explicit)] + public unsafe struct NodeData + { + [FieldOffset(0)] + public Value* values; + [FieldOffset(0)] + public BTreeNode** children; + } + + [StructLayout(LayoutKind.Explicit, Size = sizeof(byte) + sizeof(ulong))] + public struct Value + { + [FieldOffset(0)] + public byte valid; + [FieldOffset(1)] + public ulong address; + + public bool Valid + { + get + { + return valid == 1; + } + set + { + valid = (byte)(value ? 1 : 0); + } + } + + public Value(ulong value) + { + this.valid = 1; + this.address = value; + } + } + + public unsafe struct NodeInfo + { + public BTreeNodeType type; + public int count; + public BTreeNode* next; + public BTreeNode* previous; + public uint validCount; // valid keys (non-tombstone keys) in the node. + } + + /// + /// Represents a node in the B+tree + /// Memory layout: + /// +-----------------------------------+ + /// | BTreeNode (HEADER_SIZE bytes) | + /// | - NodeInfo* info | + /// | - NodeData data | + /// | - byte* keys | + /// | - IntPtr* memoryHandle | + /// +-----------------------------------+ + /// | NodeInfo (METADATA_SIZE bytes) | + /// | - BTreeNodeType type | + /// | - int count | + /// | - BTreeNode* next | + /// | - BTreeNode* previous | + /// | - uint validCount | + /// +-----------------------------------+ + /// | Keys array: capacity * KEY_SIZE | + /// +-----------------------------------+ + /// | Data array: either Value[] (leaf) | + /// | or BTreeNode*[] (internal) | + /// +-----------------------------------+ + /// Expects an allocated block of memory (of size BTreeNode.PAGE_SIZE) to be passed as handle + /// Stores handle for deallocation + /// BTreeNode struct also contained within the 4KB block to allow pointers to created nodes to be passed around + /// as well as allow for on-demand allocation/deallocation. + /// NOTE: currently reverted to MemoryMarshal for allocation of handles due to undefined behavior with SectorAlignedMemory. + /// + public unsafe struct BTreeNode + { + public static int HEADER_SIZE = sizeof(BTreeNode); + public static int PAGE_SIZE = 4096; // This must be increased if you want to store the BTreeNode header in the block. + public static int KEY_SIZE = 16; // key size in bytes. 
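+        // Capacity arithmetic: the page space left after the header and metadata is divided by the
+        // per-slot cost (key bytes plus a value for leaves, or a child pointer for internal nodes);
+        // internal nodes also reserve room for one extra child pointer.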
+        public static int METADATA_SIZE = sizeof(NodeInfo);
+        public static int LEAF_CAPACITY = (PAGE_SIZE - HEADER_SIZE - METADATA_SIZE) / (KEY_SIZE + sizeof(Value));
+        public static int INTERNAL_CAPACITY = (PAGE_SIZE - HEADER_SIZE - METADATA_SIZE - sizeof(BTreeNode*)) / (KEY_SIZE + sizeof(BTreeNode*));
+
+        public NodeInfo* info;
+        public NodeData data;
+        public byte* keys;
+        public IntPtr* memoryHandle;
+
+        public static BTreeNode* Create(BTreeNodeType type, IntPtr* handle)
+        {
+            // Place the node header at the beginning of the block.
+            BTreeNode* node = (BTreeNode*)handle;
+            node->memoryHandle = handle;
+
+            // Define the start of the payload right after the header.
+            byte* payloadPtr = (byte*)(handle) + HEADER_SIZE;
+
+            // The NodeInfo will be stored at the start of the payload.
+            node->info = (NodeInfo*)payloadPtr;
+            node->info->type = type;
+            node->info->count = 0;
+            node->info->next = null;
+            node->info->previous = null;
+            node->info->validCount = 0;
+
+            // The keys array follows the NodeInfo.
+            byte* keysPtr = payloadPtr + METADATA_SIZE;
+            node->keys = keysPtr;
+
+            int capacity = (type == BTreeNodeType.Leaf) ? LEAF_CAPACITY : INTERNAL_CAPACITY;
+            int keysSize = capacity * KEY_SIZE;
+            byte* dataSectionPtr = keysPtr + keysSize;
+
+            // Set up NodeData in-place.
+            if (type == BTreeNodeType.Leaf)
+            {
+                node->data.values = (Value*)dataSectionPtr;
+            }
+            else
+            {
+                node->data.children = (BTreeNode**)dataSectionPtr;
+            }
+
+            return node;
+        }
+
+        public byte* GetKey(int index)
+        {
+            byte* keyAddress = keys + (index * KEY_SIZE);
+            return keyAddress;
+        }
+
+        public void SetKey(int index, byte* keyData)
+        {
+            var sourceSpan = new ReadOnlySpan<byte>(keyData, KEY_SIZE);
+            var destinationSpan = new Span<byte>(keys + (index * KEY_SIZE), KEY_SIZE);
+            sourceSpan.CopyTo(destinationSpan);
+        }
+
+        public void SetChild(int index, BTreeNode* child)
+        {
+            data.children[index] = child;
+        }
+
+        public BTreeNode* GetChild(int index)
+        {
+            return data.children[index];
+        }
+
+        public void SetValue(int index, Value value)
+        {
+            data.values[index] = value;
+        }
+
+        public Value GetValue(int index)
+        {
+            return data.values[index];
+        }
+
+        public void SetValueValid(int index, bool valid)
+        {
+            data.values[index].Valid = valid;
+        }
+
+        public void InsertTombstone(int index)
+        {
+            data.values[index].Valid = false;
+        }
+
+        ///
+        /// Returns the index of the first key greater than the given key
+        ///
+        ///
+        ///
+        public int UpperBound(byte* key)
+        {
+            if (info->count == 0)
+            {
+                return 0;
+            }
+            int left = 0, right = info->count - 1;
+            while (left <= right)
+            {
+                var mid = left + (right - left) / 2;
+                byte* midKey = GetKey(mid);
+                int cmp = Compare(key, midKey);
+                if (cmp < 0)
+                {
+                    right = mid - 1;
+                }
+                else
+                {
+                    left = mid + 1;
+                }
+            }
+            return left;
+        }
+
+        ///
+        /// Returns the index of the given key if present; otherwise the index of the first key greater than it (the insertion position)
+        ///
+        ///
+        ///
+        public int LowerBound(byte* key)
+        {
+            if (info->count == 0)
+            {
+                return 0;
+            }
+            int left = 0, right = info->count - 1;
+            while (left <= right)
+            {
+                var mid = left + (right - left) / 2;
+                byte* midKey = GetKey(mid);
+                int cmp = Compare(midKey, key);
+                if (cmp == 0)
+                {
+                    return mid;
+                }
+                else if (cmp < 0)
+                {
+                    left = mid + 1;
+                }
+                else
+                {
+                    right = mid - 1;
+                }
+            }
+            return left;
+        }
+
+        ///
+        /// Compares two keys
+        ///
+        ///
+        ///
+        /// -1 if key1 is less than key2; 0 if key1 == key2; 1 if key1 > key2
+        public static int Compare(byte* key1, byte* key2)
+        {
+            if (Sse2.IsSupported)
+            {
+                var v1 = Sse2.LoadVector128(key1);
+                var v2 = Sse2.LoadVector128(key2);
+
+                var mask =
Sse2.MoveMask(Sse2.CompareEqual(v1, v2)); + + if (mask != 0xFFFF) // Not all bytes are equal + { + // Find the index of the first differing byte + int index = BitOperations.TrailingZeroCount(~mask); // Invert mask to find first zero (differing byte) + return key1[index] < key2[index] ? -1 : 1; + } + + return 0; // Arrays are equal + } + else + { + return new Span(key1, KEY_SIZE).SequenceCompareTo(new Span(key2, KEY_SIZE)); + } + } + } + + /// + /// Statistics about the B+Tree + /// + public struct BTreeStats + { + // general index stats + public int depth; + public ulong numLeafNodes; + public ulong numInternalNodes; + + // workload specific stats + public long totalInserts; // cumulative number of inserts to the index + public long totalDeletes; // cumulative number of deletes to the index + public ulong totalFastInserts; // cumulative number of fast inserts to the index + public long numKeys; // number of keys currently indexed + public ulong numValidKeys; // number of keys that are not tombstoned + public ulong numAllocates; + public ulong numDeallocates; + public BTreeStats() + { + depth = 0; + numLeafNodes = 0; + numInternalNodes = 0; + totalInserts = 0; + totalDeletes = 0; + totalFastInserts = 0; + numKeys = 0; + numValidKeys = 0; + numAllocates = 0; + numDeallocates = 0; + } + + public void printStats() + { + Console.WriteLine($"Depth: {depth}"); + Console.WriteLine($"Number of leaf nodes: {numLeafNodes}"); + Console.WriteLine($"Number of internal nodes: {numInternalNodes}"); + Console.WriteLine($"Total inserts: {totalInserts}"); + Console.WriteLine($"Total deletes: {totalDeletes}"); + Console.WriteLine($"Total fast inserts: {totalFastInserts}"); + Console.WriteLine($"Number of keys: {numKeys}"); + Console.WriteLine($"Number of valid keys: {numValidKeys}"); + Console.WriteLine($"Number of allocates: {numAllocates}"); + Console.WriteLine($"Number of deallocates: {numDeallocates}"); + } + } +} \ No newline at end of file diff --git a/libs/server/BTreeIndex/BTreeLookup.cs b/libs/server/BTreeIndex/BTreeLookup.cs new file mode 100644 index 00000000000..e5837a64327 --- /dev/null +++ b/libs/server/BTreeIndex/BTreeLookup.cs @@ -0,0 +1,156 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +using System.Collections.Generic; +using System.Diagnostics; + +namespace Garnet.server.BTreeIndex +{ + public unsafe partial class BTree + { + /// + /// Point lookup in the index + /// + /// lookup key + /// + public Value Get(byte* key) + { + BTreeNode* leaf = null; + var nodesTraversed = new BTreeNode*[MAX_TREE_DEPTH]; + TraverseToLeaf(ref leaf, ref nodesTraversed, key); + + var index = leaf->LowerBound(key); + if (index < leaf->info->count && BTreeNode.Compare(key, leaf->GetKey(index)) == 0) + { + var value = leaf->GetValue(index); + if (value.Valid) + { + return value; + } + } + return default; + } + + /// + /// Range lookup in the index + /// + /// start key for the range lookup + /// end key for the range lookup + /// address of the start key + /// address of end key + /// list of tombstones + /// limit entries scanned in the range lookup + /// reverse lookup + /// + public int Get(byte* start, byte* end, out Value startVal, out Value endVal, out List tombstones, long limit = -1, bool reverse = false) + { + Debug.Assert(reverse ? 
+ BTreeNode.Compare(start, end) >= 0 : BTreeNode.Compare(start, end) <= 0, + "Start key should be less than or equal to end key"); + int count = 0; + tombstones = new List(); + BTreeNode* startLeaf = null, endLeaf = null; + BTreeNode*[] nodesTraversed = new BTreeNode*[MAX_TREE_DEPTH]; + int startIndex, endIndex; + + // find the leaf node for the start key + TraverseToLeaf(ref startLeaf, ref nodesTraversed, start); + // find the leaf node for the end key + TraverseToLeaf(ref endLeaf, ref nodesTraversed, end); + + if (reverse) + { + // find the first slot > start and subtract one index to get the start index + startIndex = startLeaf->UpperBound(start) - 1; + startVal = startLeaf->GetValue(startIndex); + + // find the first value greater than equal to key and that will be the last index + endIndex = endLeaf->LowerBound(end); + endVal = endLeaf->GetValue(endIndex); + } + else + { + // find the first key in the start leaf that is greater than or equal to the start key + startIndex = startLeaf->LowerBound(start); + startVal = startLeaf->GetValue(startIndex); + // find the last key in the end leaf that is less than or equal to the end key + endIndex = endLeaf->UpperBound(end) - 1; + endVal = endLeaf->GetValue(endIndex); + } + + // iterate over the leaves between startLeaf[startIndex] and endLeaf[endIndex] (inclusive) and collect all tombstones + BTreeNode* leaf = startLeaf; + uint numScanned = 0; + while (leaf != null) + { + int first, last; + bool scanComplete = false; + if (reverse) + { + // we would like an inverse traversal + first = leaf == startLeaf ? startIndex : leaf->info->count - 1; + last = leaf == endLeaf ? endIndex : 0; + } + else + { + last = leaf == endLeaf ? endIndex : leaf->info->count - 1; + first = leaf == startLeaf ? startIndex : 0; + } + + for (var i = first; ;) + { + numScanned++; + var value = leaf->GetValue(i); + if (!value.Valid) + { + tombstones.Add(leaf->GetValue(i)); + } + else + { + // entry will be part of result set + count++; + if (limit != -1 && count >= limit) + { + // update address as required + if (reverse) + { + startVal = value; + } + else + { + endVal = value; + } + scanComplete = true; + break; + } + } + + if (reverse) + { + if (i <= last) + { + break; + } + i--; + } + else + { + if (i >= last) + { + break; + } + i++; + } + } + + if (leaf == endLeaf || scanComplete) + { + break; + } + + leaf = reverse ? leaf->info->previous : leaf->info->next; + } + return count; + } + } +} \ No newline at end of file diff --git a/libs/server/BTreeIndex/BTreeTraverse.cs b/libs/server/BTreeIndex/BTreeTraverse.cs new file mode 100644 index 00000000000..8c9d482045f --- /dev/null +++ b/libs/server/BTreeIndex/BTreeTraverse.cs @@ -0,0 +1,50 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. 
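+
+// Traversal records the node visited at each level in nodesTraversed (leaf at index 0) by
+// following UpperBound at every internal node; insertions later use this path to cascade splits.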
+ +namespace Garnet.server.BTreeIndex +{ + public unsafe partial class BTree + { + public byte* TraverseToLeaf(ref BTreeNode* node, ref BTreeNode*[] nodesTraversed, byte* key) + { + byte* leafMax = null; + BTreeNode* child = root; + for (var i = stats.depth - 1; i > 0; --i) + { + node = child; + nodesTraversed[i] = child; + var slot = node->UpperBound(key); + if (slot != node->info->count) + { + leafMax = node->GetKey(slot); + } + child = node->GetChild(slot); + } + node = child; + nodesTraversed[0] = child; + return leafMax; + } + + public byte* TraverseToLeaf(ref BTreeNode* node, ref BTreeNode*[] nodesTraversed, byte* key, out int[] slots) + { + slots = new int[MAX_TREE_DEPTH]; + byte* leafMax = null; + BTreeNode* child = root; + for (var i = stats.depth - 1; i > 0; --i) + { + node = child; + nodesTraversed[i] = child; + var slot = node->UpperBound(key); + slots[i] = slot; + if (slot != node->info->count) + { + leafMax = node->GetKey(slot); + } + child = node->GetChild(slot); + } + node = child; + nodesTraversed[0] = child; + return leafMax; + } + } +} \ No newline at end of file diff --git a/libs/server/Resp/CmdStrings.cs b/libs/server/Resp/CmdStrings.cs index b0ba2d0719f..60e963c7c32 100644 --- a/libs/server/Resp/CmdStrings.cs +++ b/libs/server/Resp/CmdStrings.cs @@ -262,6 +262,10 @@ static partial class CmdStrings public static ReadOnlySpan RESP_ERR_DEUBG_DISALLOWED => @"ERR DEBUG command not allowed. If the EnableDebugCommand option is set to ""local"", you can run it from a local connection, otherwise you need to set this option in the configuration file, and then restart the server."u8; + public static ReadOnlySpan RESP_ERR_XADD_WRONG_NUM_ARGS => "ERR wrong number of arguments for 'xadd' command"u8; + public static ReadOnlySpan RESP_ERR_XLEN_WRONG_NUM_ARGS => "ERR wrong number of arguments for 'xlen' command"u8; + public static ReadOnlySpan RESP_ERR_XRANGE_WRONG_NUM_ARGS => "ERR wrong number of arguments for 'xrange' command"u8; + public static ReadOnlySpan RESP_ERR_XDEL_WRONG_NUM_ARGS => "ERR wrong number of arguments for 'xdel' command"u8; /// /// Response string templates diff --git a/libs/server/Resp/Parser/RespCommand.cs b/libs/server/Resp/Parser/RespCommand.cs index 49e7e70c526..c3174f540fc 100644 --- a/libs/server/Resp/Parser/RespCommand.cs +++ b/libs/server/Resp/Parser/RespCommand.cs @@ -184,6 +184,10 @@ public enum RespCommand : ushort SREM, SUNIONSTORE, UNLINK, + XADD, + XLEN, + XRANGE, + XDEL, ZADD, ZDIFFSTORE, ZINCRBY, @@ -939,6 +943,21 @@ private RespCommand FastParseArrayCommand(ref int count, ref ReadOnlySpan } break; + case 'X': + if (*(ulong*)(ptr + 2) == MemoryMarshal.Read("\r\nXADD\r\n"u8)) + { + return RespCommand.XADD; + } + else if (*(ulong*)(ptr + 2) == MemoryMarshal.Read("\r\nXLEN\r\n"u8)) + { + return RespCommand.XLEN; + } + else if (*(ulong*)(ptr + 2) == MemoryMarshal.Read("\r\nXDEL\r\n"u8)) + { + return RespCommand.XDEL; + } + break; + case 'Z': if (*(ulong*)(ptr + 2) == MemoryMarshal.Read("\r\nZADD\r\n"u8)) { @@ -1280,6 +1299,13 @@ private RespCommand FastParseArrayCommand(ref int count, ref ReadOnlySpan return RespCommand.UNLINK; } break; + + case 'X': + if (*(ulong*)(ptr + 4) == MemoryMarshal.Read("XRANGE\r\n"u8)) + { + return RespCommand.XRANGE; + } + break; case 'Z': if (*(ulong*)(ptr + 4) == MemoryMarshal.Read("ZCOUNT\r\n"u8)) diff --git a/libs/server/Resp/RespServerSession.cs b/libs/server/Resp/RespServerSession.cs index cb9cec8afb9..9d7e1e9f868 100644 --- a/libs/server/Resp/RespServerSession.cs +++ b/libs/server/Resp/RespServerSession.cs @@ -191,6 
+191,11 @@ internal sealed unsafe partial class RespServerSession : ServerSessionBase
         // Threshold for slow log in ticks (0 means disabled)
         readonly long slowLogThreshold;
 
+        ///
+        /// Stream cache for the session
+        ///
+        internal readonly SessionStreamCache sessionStreamCache;
+
         public RespServerSession(
             long id,
             INetworkSender networkSender,
@@ -249,6 +254,10 @@ public RespServerSession(
                 if (this.networkSender.GetMaxSizeSettings?.MaxOutputSize < sizeof(int))
                     this.networkSender.GetMaxSizeSettings.MaxOutputSize = sizeof(int);
             }
+
+            // grab stream manager from storeWrapper
+            this.streamManager = storeWrapper.streamManager;
+            sessionStreamCache = new SessionStreamCache();
         }
 
         internal void SetUserHandle(UserHandle userHandle)
@@ -806,6 +815,11 @@ private bool ProcessArrayCommands<TGarnetApi>(RespCommand cmd, ref TGarnetApi st
                 RespCommand.SUNIONSTORE => SetUnionStore(ref storageApi),
                 RespCommand.SDIFF => SetDiff(ref storageApi),
                 RespCommand.SDIFFSTORE => SetDiffStore(ref storageApi),
+                // Stream Commands
+                RespCommand.XADD => StreamAdd(),
+                RespCommand.XLEN => StreamLength(),
+                RespCommand.XDEL => StreamDelete(),
+                RespCommand.XRANGE => StreamRange(),
                 _ => ProcessOtherCommands(cmd, ref storageApi)
             };
             return success;
diff --git a/libs/server/Resp/StreamCommands.cs b/libs/server/Resp/StreamCommands.cs
new file mode 100644
index 00000000000..7ae68bb4473
--- /dev/null
+++ b/libs/server/Resp/StreamCommands.cs
@@ -0,0 +1,191 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT license.
+
+using System;
+using Garnet.common;
+using Tsavorite.core;
+
+namespace Garnet.server
+{
+    internal sealed unsafe partial class RespServerSession : ServerSessionBase
+    {
+        readonly StreamManager streamManager;
+
+        ///
+        /// XADD
+        ///
+        ///
+        private unsafe bool StreamAdd()
+        {
+            if (parseState.Count < 4)
+            {
+                return AbortWithErrorMessage(CmdStrings.RESP_ERR_XADD_WRONG_NUM_ARGS);
+            }
+
+            // Parse the stream key.
+            var key = parseState.GetArgSliceByRef(0);
+
+            // Parse the id. We parse as string for easy pattern matching.
+            var idGiven = parseState.GetArgSliceByRef(1);
+
+            // get the number of the remaining key-value pairs
+            var numPairs = parseState.Count - 2;
+
+            // grab the rest of the input that will mainly be k-v pairs as entry to the stream.
+            byte* vPtr = parseState.GetArgSliceByRef(2).ptr - sizeof(int);
+            int vsize = (int)(recvBufferPtr + endReadHead - vPtr);
+            SpanByteAndMemory _output = new SpanByteAndMemory(dcurr, (int)(dend - dcurr));
+
+            if (sessionStreamCache.TryGetStreamFromCache(key.Span, out StreamObject cachedStream))
+            {
+                cachedStream.AddEntry(vPtr, vsize, idGiven, numPairs, ref _output);
+            }
+            else
+            {
+                streamManager.StreamAdd(key, idGiven, vPtr, vsize, numPairs, ref _output, out byte[] lastStreamKey, out StreamObject lastStream);
+                // since we added to a new stream that was not in the cache, try adding it to the cache
+                sessionStreamCache.TryAddStreamToCache(lastStreamKey, lastStream);
+            }
+            _ = ProcessOutputWithHeader(_output);
+            return true;
+        }
+
+        ///
+        /// XLEN
+        ///
+        ///
+        private bool StreamLength()
+        {
+            if (parseState.Count != 1)
+            {
+                return AbortWithErrorMessage(CmdStrings.RESP_ERR_XLEN_WRONG_NUM_ARGS);
+            }
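+            // parse the stream key and serve the length from the session stream cache when possible;
+            // XLEN counts only live entries, i.e. tombstoned (XDEL-ed) IDs are excluded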
+            var key = parseState.GetArgSliceByRef(0);
+
+            ulong streamLength;
+
+            // check if the stream exists in cache
+            if (sessionStreamCache.TryGetStreamFromCache(key.Span, out StreamObject cachedStream))
+            {
+                streamLength = cachedStream.Length();
+            }
+            else
+            {
+                streamLength = streamManager.StreamLength(key);
+            }
+            // write back result
+            while (!RespWriteUtils.TryWriteInt64((long)streamLength, ref dcurr, dend))
+                SendAndReset();
+            return true;
+        }
+
+        ///
+        /// XRANGE
+        ///
+        ///
+        public unsafe bool StreamRange()
+        {
+            // command is of format: XRANGE key start end [COUNT count]
+            // we expect at least 3 arguments
+            if (parseState.Count < 3)
+            {
+                return AbortWithErrorMessage(CmdStrings.RESP_ERR_XRANGE_WRONG_NUM_ARGS);
+            }
+
+            // parse the stream key
+            var key = parseState.GetArgSliceByRef(0);
+
+            // parse start and end IDs
+            var startId = parseState.GetArgSliceByRef(1).ToString();
+            var endId = parseState.GetArgSliceByRef(2).ToString();
+
+            int count = -1;
+            if (parseState.Count > 3)
+            {
+                // the optional argument must be the pair COUNT <count>
+                if (parseState.Count != 5 || !parseState.GetArgSliceByRef(3).ToString().Equals("COUNT", StringComparison.OrdinalIgnoreCase))
+                {
+                    return AbortWithErrorMessage(CmdStrings.RESP_ERR_GENERIC_SYNTAX_ERROR);
+                }
+                // parse the count argument
+                var countStr = parseState.GetArgSliceByRef(4).ToString();
+                if (!int.TryParse(countStr, out count))
+                {
+                    return AbortWithErrorMessage(CmdStrings.RESP_ERR_GENERIC_SYNTAX_ERROR);
+                }
+            }
+
+            SpanByteAndMemory _output = new SpanByteAndMemory(dcurr, (int)(dend - dcurr));
+
+            bool success = false;
+
+            // check if the stream exists in cache
+            if (sessionStreamCache.TryGetStreamFromCache(key.Span, out StreamObject cachedStream))
+            {
+                cachedStream.ReadRange(startId, endId, count, ref _output);
+                success = true;
+            }
+            else
+            {
+                success = streamManager.StreamRange(key, startId, endId, count, ref _output);
+            }
+
+            if (success)
+            {
+                _ = ProcessOutputWithHeader(_output);
+            }
+            else
+            {
+                // return an empty array when the stream does not exist
+                while (!RespWriteUtils.TryWriteArrayLength(0, ref dcurr, dend))
+                    SendAndReset();
+            }
+
+            return true;
+        }
+
+        public bool StreamDelete()
+        {
+            // command is of format: XDEL key id [id ...]
+            // we expect at least 2 arguments
+            if (parseState.Count < 2)
+            {
+                return AbortWithErrorMessage(CmdStrings.RESP_ERR_XDEL_WRONG_NUM_ARGS);
+            }
+
+            // parse the stream key
+            var key = parseState.GetArgSliceByRef(0);
+            int deletedCount = 0;
+
+            // for every id, parse and delete the stream entry
+            for (int i = 1; i < parseState.Count; i++)
+            {
+                // parse the id as string
+                var idGiven = parseState.GetArgSliceByRef(i);
+
+                bool deleted;
+                // check if the stream exists in cache
+                if (sessionStreamCache.TryGetStreamFromCache(key.Span, out StreamObject cachedStream))
+                {
+                    deleted = cachedStream.DeleteEntry(idGiven);
+                }
+                else
+                {
+                    // delete the entry in the stream from the streamManager
+                    deleted = streamManager.StreamDelete(key, idGiven, out StreamObject lastStream);
+                    if (lastStream != null)
+                    {
+                        // since we deleted from a stream that was not in the cache, try adding it to the cache
+                        sessionStreamCache.TryAddStreamToCache(key.ToArray(), lastStream);
+                    }
+                }
+
+                if (deleted)
+                    deletedCount++;
+            }
+
+            // write back the number of entries deleted
+            while (!RespWriteUtils.TryWriteInt64(deletedCount, ref dcurr, dend))
+                SendAndReset();
+            return true;
+        }
+
+    }
+}
\ No newline at end of file
diff --git a/libs/server/Servers/GarnetServerOptions.cs b/libs/server/Servers/GarnetServerOptions.cs
index fd6a0d8f79e..db53263de40 100644
--- a/libs/server/Servers/GarnetServerOptions.cs
+++ b/libs/server/Servers/GarnetServerOptions.cs
@@ -456,6 +456,16 @@ public class GarnetServerOptions : ServerOptions
         ///
         public StateMachineDriver StateMachineDriver;
 
+        ///
+        /// Page size of the log used per STREAM
+        ///
+        public string StreamPageSize = "4m";
+
+        ///
+        /// Memory size of the log used per STREAM
+        ///
+        public string StreamMemorySize = "1g";
+
         ///
         /// Constructor
         ///
@@ -617,6 +627,32 @@ public KVSettings GetSettings(ILoggerFactory loggerFactory,
             return kvSettings;
         }
 
+        ///
+        /// Get stream page size
+        ///
+        ///
+        public long StreamPageSizeBytes()
+        {
+            long size = ParseSize(StreamPageSize);
+            long adjustedSize = PreviousPowerOf2(size);
+            if (size != adjustedSize)
+                logger?.LogInformation("Warning: using lower stream page size than specified (rounded down to a power of 2)");
+            return adjustedSize;
+        }
+
+        ///
+        /// Get stream memory size
+        ///
+        ///
+        public long StreamMemorySizeBytes()
+        {
+            long size = ParseSize(StreamMemorySize);
+            long adjustedSize = PreviousPowerOf2(size);
+            if (size != adjustedSize)
+                logger?.LogInformation("Warning: using lower stream memory size than specified (rounded down to a power of 2)");
+            return adjustedSize;
+        }
+
         ///
         /// Get memory size
         ///
diff --git a/libs/server/StoreWrapper.cs b/libs/server/StoreWrapper.cs
index d6438f48cee..aaf8af35c1b 100644
--- a/libs/server/StoreWrapper.cs
+++ b/libs/server/StoreWrapper.cs
@@ -124,6 +124,8 @@ public sealed class StoreWrapper : IDisposable
         ///
         public readonly int databaseNum = 1;
 
+        internal readonly StreamManager streamManager;
+
         ///
         /// Constructor
         ///
@@ -212,6 +214,9 @@ public StoreWrapper(
                 clusterProvider = clusterFactory.CreateClusterProvider(this);
             ctsCommit = new();
             run_id = Generator.CreateHexId();
+
+            // initialize stream manager
+            this.streamManager = new StreamManager(serverOptions.StreamPageSizeBytes(), serverOptions.StreamMemorySizeBytes(), 0);
         }
 
         ///
diff --git a/libs/server/Stream/SessionStreamCache.cs b/libs/server/Stream/SessionStreamCache.cs
new file mode 100644
index 00000000000..4f6ee3ba288
--- /dev/null
+++ b/libs/server/Stream/SessionStreamCache.cs
@@ -0,0 +1,59 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT license.
+
+using System;
+using System.Collections.Generic;
+
+namespace Garnet.server
+{
+    internal class SessionStreamCache
+    {
+        const int DefaultCacheSize = 16;
+        readonly Dictionary<byte[], StreamObject> streamCache = new Dictionary<byte[], StreamObject>(DefaultCacheSize, ByteArrayComparer.Instance);
+        readonly byte[][] streamKeysCache = new byte[DefaultCacheSize][];
+        int cachedStreamsCount = 0;
+        int front = 0;
+
+        public SessionStreamCache()
+        { }
+
+        ///
+        /// Look up a stream in the cache. The cache is expected to be small.
+        ///
+        /// name of stream to lookup
+        /// stream found from the cache
+        /// true if stream exists in cache
+        public bool TryGetStreamFromCache(ReadOnlySpan<byte> key, out StreamObject stream)
+        {
+            return streamCache.TryGetValue(key.ToArray(), out stream);
+        }
+
+        ///
+        /// Add a stream to the cache. If the cache is full, the oldest cached stream is evicted to make room.
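+        /// Eviction is FIFO: streamKeysCache is a circular buffer, and once the cache is full, front points at the oldest entry, which is evicted next.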
+ /// + /// name of stream + /// reference to stream object + /// true if successfully added + public bool TryAddStreamToCache(byte[] key, StreamObject stream) + { + if (cachedStreamsCount < DefaultCacheSize) + { + streamCache.Add(key, stream); + // add to circular array and update front + streamKeysCache[front] = key; + front = (front + 1) % DefaultCacheSize; + cachedStreamsCount++; + return true; + } + + streamCache.Remove(streamKeysCache[front]); + streamCache.Add(key, stream); + // add to circular array where we removed the oldest stream + streamKeysCache[front] = key; + front = (front + 1) % DefaultCacheSize; + // we don't need to update cachedStreamsCount since we added and removed a stream + return true; + + } + } +} \ No newline at end of file diff --git a/libs/server/Stream/Stream.cs b/libs/server/Stream/Stream.cs new file mode 100644 index 00000000000..ee29b318f2d --- /dev/null +++ b/libs/server/Stream/Stream.cs @@ -0,0 +1,568 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +using System; +using Tsavorite.core; +using Garnet.server.BTreeIndex; +using Garnet.common; +using System.Diagnostics; +using System.Buffers; +using System.Runtime.CompilerServices; +using System.Buffers.Binary; + +namespace Garnet.server +{ + public class StreamObject : IDisposable + { + readonly IDevice device; + readonly TsavoriteLog log; + readonly BTree index; + StreamID lastId; + long totalEntriesAdded; + SingleWriterMultiReaderLock _lock; + + /// + /// Constructor + /// + /// Directory where the log will be stored + /// Page size of the log used for the stream + public StreamObject(string logDir, long pageSize, long memorySize, int safeTailRefreshFreqMs) + { + device = logDir == null ? new NullDevice() : Devices.CreateLogDevice("streamLogs/" + logDir + "/streamLog", preallocateFile: false); + log = new TsavoriteLog(new TsavoriteLogSettings { LogDevice = device, PageSize = pageSize, MemorySize = memorySize, SafeTailRefreshFrequencyMs = safeTailRefreshFreqMs }); + index = new BTree(device.SectorSize); + totalEntriesAdded = 0; + lastId = default; + _lock = new SingleWriterMultiReaderLock(); + } + + /// + /// Increment the stream ID + /// + /// carries the incremented stream id + public void IncrementID(ref StreamID incrementedID) + { + var originalMs = lastId.getMS(); + var originalSeq = lastId.getSeq(); + + if (originalMs == long.MaxValue) + { + incrementedID = default; + return; + } + + var newMs = originalMs; + var newSeq = originalSeq + 1; + + // if seq overflows, increment timestamp and reset seq + if (newSeq == 0) + { + newMs += 1; + newSeq = 0; + } + + incrementedID.setMS(newMs); + incrementedID.setSeq(newSeq); + + } + + /// + /// Generate the next stream ID + /// + /// StreamID generated + public unsafe void GenerateNextID(ref StreamID id) + { + ulong timestamp = (ulong)Stopwatch.GetTimestamp() / (ulong)(Stopwatch.Frequency / 1000); + + // read existing timestamp in big endian format + var lastTs = lastId.getMS(); + // if this is the first entry or timestamp is greater than last added entry + if (totalEntriesAdded == 0 || timestamp > lastTs) + { + // this will write timestamp in big endian format + id.setMS(timestamp); + id.setSeq(0); + return; + } + // if timestamp is same as last added entry, increment the sequence number + // if seq overflows, increment timestamp and reset the sequence number + IncrementID(ref id); + } + + // TODO: implement this using parseState functions without operating with RespReadUtils + unsafe bool parseIDString(ArgSlice idSlice, ref 
StreamID id) + { + // if we have to auto-generate the whole ID + if (*idSlice.ptr == '*' && idSlice.length == 1) + { + GenerateNextID(ref id); + return true; + } + + var lastIdDecodedTs = lastId.getMS(); + + // parse user-defined ID + // can be of following formats: + // 1. ts (seq = 0) + // 2. ts-* (auto-generate seq number) + // 3. ts-seq + + // last character is a * + if (*(idSlice.ptr + idSlice.length - 1) == '*') + { + // has to be of format ts-*, check if '-' is the preceding character + if (*(idSlice.ptr + idSlice.length - 2) != '-') + { + return false; + } + // parse the timestamp + // slice the id to remove the last two characters + var slicedId = new ArgSlice(idSlice.ptr, idSlice.length - 2); + var idEnd = idSlice.ptr + idSlice.length - 2; + if (!RespReadUtils.ReadUlong(out ulong timestamp, ref idSlice.ptr, idEnd)) + { + return false; + } + + // check if timestamp is greater than last added entry's decoded ts + if (totalEntriesAdded != 0 && timestamp < lastIdDecodedTs) + { + return false; + } + else if (totalEntriesAdded != 0 && timestamp == lastIdDecodedTs) + { + IncrementID(ref id); + } + else + { + id.setMS(timestamp); + id.setSeq(0); + } + } + else + { + // find index of '-' in the id + int index = -1; + for (int i = 0; i < idSlice.length; i++) + { + if (*(idSlice.ptr + i) == '-') + { + index = i; + break; + } + } + // if '-' is not found, format should be just ts + if (index == -1) + { + if (!RespReadUtils.ReadUlong(out ulong timestamp, ref idSlice.ptr, idSlice.ptr + idSlice.length)) + { + return false; + } + // check if timestamp is greater than last added entry + if (totalEntriesAdded != 0 && timestamp < lastIdDecodedTs) + { + return false; + } + else if (totalEntriesAdded != 0 && timestamp == lastIdDecodedTs) + { + IncrementID(ref id); + } + else + { + id.setMS(timestamp); + id.setSeq(0); + } + } + else + { + // parse the timestamp + // slice the id to remove everything after '-' + var slicedId = new ArgSlice(idSlice.ptr, index); + var slicedSeq = new ArgSlice(idSlice.ptr + index + 1, idSlice.length - index - 1); + if (!RespReadUtils.ReadUlong(out ulong timestamp, ref idSlice.ptr, idSlice.ptr + index)) + { + return false; + } + var seqBegin = idSlice.ptr + index + 1; + var seqEnd = idSlice.ptr + idSlice.length; + if (!RespReadUtils.ReadUlong(out ulong seq, ref seqBegin, seqEnd)) + { + return false; + } + + if (totalEntriesAdded != 0 && timestamp < lastIdDecodedTs) + { + return false; + } + else if (totalEntriesAdded != 0 && timestamp == lastIdDecodedTs) + { + if (seq <= lastId.seq) + { + return false; + } + } + // use ID and seq given by user + // encode while storing + id.setMS(timestamp); + id.setSeq(seq); + } + } + + return true; + } + + /// + /// Adds an entry or item to the stream + /// + /// byte array of the entry to store in the stream + /// True if entry is added successfully + public unsafe void AddEntry(byte* value, int valueLength, ArgSlice idSlice, int numPairs, ref SpanByteAndMemory output) + { + byte* ptr = output.SpanByte.ToPointer(); + var curr = ptr; + var end = curr + output.Length; + MemoryHandle ptrHandle = default; + bool isMemory = false; + byte* tmpPtr = null; + StreamID id = default; + // take a lock to ensure thread safety + _lock.WriteLock(); + try + { + bool canParseID = parseIDString(idSlice, ref id); + if (!canParseID) + { + while (!RespWriteUtils.TryWriteError("ERR Syntax", ref curr, end)) + ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end); + return; + } + + // add the entry to the log + { + 
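+                    // the entry is appended to the Tsavorite log; the B+Tree index then maps the
+                    // 16-byte stream ID to the log address returned by the enqueue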
+        /// <summary>
+        /// Adds an entry or item to the stream
+        /// </summary>
+        /// <param name="value">pointer to the payload to store in the stream</param>
+        /// <param name="valueLength">length of the payload</param>
+        /// <param name="idSlice">ID, or ID pattern, given for the entry</param>
+        /// <param name="numPairs">number of key-value pairs in the payload</param>
+        /// <param name="output">RESP output; receives the ID of the added entry, or an error</param>
+        public unsafe void AddEntry(byte* value, int valueLength, ArgSlice idSlice, int numPairs, ref SpanByteAndMemory output)
+        {
+            byte* ptr = output.SpanByte.ToPointer();
+            var curr = ptr;
+            var end = curr + output.Length;
+            MemoryHandle ptrHandle = default;
+            bool isMemory = false;
+            StreamID id = default;
+            // take a lock to ensure thread safety
+            _lock.WriteLock();
+            try
+            {
+                bool canParseID = parseIDString(idSlice, ref id);
+                if (!canParseID)
+                {
+                    while (!RespWriteUtils.TryWriteError("ERR Syntax", ref curr, end))
+                        ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);
+                    return;
+                }
+
+                // add the entry to the log
+                bool enqueueInLog = log.TryEnqueueStreamEntry(id.idBytes, sizeof(StreamID), numPairs, value, valueLength, out long retAddress);
+                if (!enqueueInLog)
+                {
+                    while (!RespWriteUtils.TryWriteError("ERR StreamAdd failed", ref curr, end))
+                        ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);
+                    return;
+                }
+
+                // index the entry's log address under its ID
+                var streamValue = new Value((ulong)retAddress);
+                bool added = index.Insert((byte*)Unsafe.AsPointer(ref id.idBytes[0]), streamValue);
+                if (!added)
+                {
+                    while (!RespWriteUtils.TryWriteError("ERR StreamAdd failed", ref curr, end))
+                        ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);
+                    return;
+                }
+                // copy the encoded ms and seq
+                lastId.ms = id.ms;
+                lastId.seq = id.seq;
+
+                totalEntriesAdded++;
+                // write back the decoded ID of the entry added
+                string idString = $"{id.getMS()}-{id.getSeq()}";
+                while (!RespWriteUtils.TryWriteSimpleString(idString, ref curr, end))
+                    ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);
+            }
+            finally
+            {
+                if (isMemory) ptrHandle.Dispose();
+                output.Length = (int)(curr - ptr) + sizeof(ObjectOutputHeader);
+                _lock.WriteUnlock();
+            }
+        }
+
+        /// <summary>
+        /// Get the current length of the stream (the number of entries in the stream)
+        /// </summary>
+        /// <returns>length of the stream</returns>
+        public ulong Length()
+        {
+            ulong len = 0;
+            _lock.ReadLock();
+            try
+            {
+                // get the length of the stream from the index, excluding tombstones
+                len = index.ValidCount;
+            }
+            finally
+            {
+                _lock.ReadUnlock();
+            }
+            return len;
+        }
+
+        /// <summary>
+        /// Deletes an entry from the stream
+        /// </summary>
+        /// <param name="idSlice">id of the stream entry to delete</param>
+        /// <returns>true if the entry was deleted successfully</returns>
+        public unsafe bool DeleteEntry(ArgSlice idSlice)
+        {
+            // first parse the id
+            if (!parseCompleteID(idSlice, out StreamID entryID))
+            {
+                return false;
+            }
+            bool deleted = false;
+            // take a lock to delete from the index
+            _lock.WriteLock();
+            try
+            {
+                deleted = index.Delete((byte*)Unsafe.AsPointer(ref entryID.idBytes[0]));
+            }
+            finally
+            {
+                _lock.WriteUnlock();
+            }
+            return deleted;
+        }
+
+        /// <summary>
+        /// Parses a complete ts-seq ID from a string
+        /// </summary>
+        public bool ParseCompleteStreamIDFromString(string idString, out StreamID id)
+        {
+            id = default;
+            string[] parts = idString.Split('-');
+            if (parts.Length != 2)
+            {
+                return false;
+            }
+            if (!ulong.TryParse(parts[0], out ulong timestamp))
+            {
+                return false;
+            }
+            if (!ulong.TryParse(parts[1], out ulong seq))
+            {
+                return false;
+            }
+
+            id.setMS(timestamp);
+            id.setSeq(seq);
+            return true;
+        }
+
+        /// <summary>
+        /// Parses an ID of the form ts or ts-seq from a string; bare "-" and "+" are rejected
+        /// </summary>
+        public bool ParseStreamIDFromString(string idString, out StreamID id)
+        {
+            id = default;
+            if (idString == "-" || idString == "+")
+            {
+                return false;
+            }
+            if (!idString.Contains('-'))
+            {
+                if (!ulong.TryParse(idString, out ulong ms))
+                {
+                    return false;
+                }
+                id.setMS(ms);
+                id.setSeq(0);
+                return true;
+            }
+            return ParseCompleteStreamIDFromString(idString, out id);
+        }
+
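+        // Range bounds accepted by ReadRange below (illustrative examples, assuming
+        // the parsing rules above):
+        //   "-"    smallest ID in the stream (start bound only)
+        //   "+"    largest ID in the stream (end bound only)
+        //   "5"    expands to 5-0 as a start bound; as an end bound, covers all of timestamp 5
+        //   "5-3"  the exact ID 5-3
+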
+        /// <summary>
+        /// Read entries in the given range of IDs from the stream
+        /// </summary>
+        /// <param name="min">start of the range</param>
+        /// <param name="max">end of the range</param>
+        /// <param name="limit">maximum number of entries to return (-1 for no limit)</param>
+        /// <param name="output">RESP output buffer</param>
+        public unsafe void ReadRange(string min, string max, int limit, ref SpanByteAndMemory output)
+        {
+            _lock.ReadLock();
+            try
+            {
+                if (index.Count() == 0)
+                {
+                    return;
+                }
+
+                long startAddr, endAddr;
+                StreamID startID, endID;
+                if (min == "-")
+                {
+                    byte[] idBytes = index.First().Key;
+                    startID = new StreamID(idBytes);
+                }
+                else if (!ParseStreamIDFromString(min, out startID))
+                {
+                    return;
+                }
+                if (max == "+")
+                {
+                    byte[] idBytes = index.Last().Key;
+                    endID = new StreamID(idBytes);
+                }
+                else
+                {
+                    if (!ParseStreamIDFromString(max, out endID))
+                    {
+                        return;
+                    }
+                    // if only a timestamp was given as the end bound, include every
+                    // sequence number under that timestamp
+                    if (!max.Contains('-'))
+                    {
+                        endID.setSeq(ulong.MaxValue);
+                    }
+                }
+
+                int count = index.Get((byte*)Unsafe.AsPointer(ref startID.idBytes[0]), (byte*)Unsafe.AsPointer(ref endID.idBytes[0]), out Value startVal, out Value endVal, out var tombstones, limit);
+                startAddr = (long)startVal.address;
+                endAddr = (long)endVal.address + 1;
+
+                byte* ptr = output.SpanByte.ToPointer();
+                var curr = ptr;
+                var end = curr + output.Length;
+                MemoryHandle ptrHandle = default;
+                bool isMemory = false;
+                byte* tmpPtr = null;
+                int tmpSize = 0;
+                long readCount = 0;
+
+                try
+                {
+                    using (var iter = log.Scan(startAddr, endAddr, scanUncommitted: true))
+                    {
+                        // write the length of how many entries we will print out
+                        while (!RespWriteUtils.TryWriteArrayLength(count, ref curr, end))
+                            ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);
+
+                        byte* e;
+                        while (iter.GetNext(out var entry, out _, out long currentAddress, out long nextAddress))
+                        {
+                            // skip the entry if its address matches a tombstone
+                            var current = new Value((ulong)currentAddress);
+                            var tombstoneFound = false;
+                            foreach (var tombstone in tombstones)
+                            {
+                                if (tombstone.address == current.address)
+                                {
+                                    tombstoneFound = true;
+                                    break;
+                                }
+                            }
+                            if (tombstoneFound)
+                            {
+                                continue;
+                            }
+
+                            var entryBytes = entry.AsSpan();
+                            // parse the ID of the entry, which is the first 16 bytes
+                            var idBytes = entryBytes.Slice(0, 16);
+                            var ts = BinaryPrimitives.ReadUInt64BigEndian(idBytes.Slice(0, 8));
+                            var seq = BinaryPrimitives.ReadUInt64BigEndian(idBytes.Slice(8, 8));
+                            string idString = $"{ts}-{seq}";
+                            Span<byte> numPairsBytes = entryBytes.Slice(16, 4);
+                            int numPairs = BitConverter.ToInt32(numPairsBytes);
+                            Span<byte> value = entryBytes.Slice(20);
+
+                            // we can already write back the ID that we read
+                            while (!RespWriteUtils.TryWriteArrayLength(2, ref curr, end))
+                                ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);
+                            while (!RespWriteUtils.TryWriteSimpleString(idString, ref curr, end))
+                                ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);
+
+                            // write the array length for the number of key-value pairs in the entry
+                            while (!RespWriteUtils.TryWriteArrayLength(numPairs, ref curr, end))
+                                ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);
+
+                            // write the key-value pairs
+                            fixed (byte* p = value)
+                            {
+                                e = p;
+                                int read = 0;
+                                // the payload end is fixed; compute it once from the base pointer
+                                byte* payloadEnd = p + value.Length;
+                                while (value.Length - read >= 4)
+                                {
+                                    var orig = e;
+                                    if (!RespReadUtils.TryReadPtrWithLengthHeader(ref tmpPtr, ref tmpSize, ref e, payloadEnd))
+                                    {
+                                        return;
+                                    }
+                                    var o = new Span<byte>(tmpPtr, tmpSize).ToArray();
+                                    while (!RespWriteUtils.TryWriteBulkString(o, ref curr, end))
+                                        ObjectUtils.ReallocateOutput(ref output, ref isMemory, ref ptr, ref ptrHandle, ref curr, ref end);
+                                    read += (int)(e - orig);
+                                }
+                            }
+                            readCount++;
+                            if (limit != -1 && readCount == limit)
+                            {
+                                break;
+                            }
+                        }
+                    }
+                }
+                finally
+                {
+                    if (isMemory) ptrHandle.Dispose();
+                    output.Length = (int)(curr - ptr) + sizeof(ObjectOutputHeader);
+                }
+            }
+            finally
+            {
+                _lock.ReadUnlock();
+            }
+        }
+
+        unsafe bool parseCompleteID(ArgSlice idSlice, out StreamID streamID)
+        {
+            streamID = default;
+            // a complete ID is of the format ts-seq, where both ts and seq are ulong;
+            // find the index of '-' in the id
+            int index = -1;
+            for (int i = 0; i < idSlice.length; i++)
+            {
+                if (*(idSlice.ptr + i) == '-')
+                {
+                    index = i;
+                    break;
+                }
+            }
+            // a complete ID must contain '-'
+            if (index == -1)
+            {
+                return false;
+            }
+            // capture the base pointer first, since ReadUlong advances idSlice.ptr
+            var idBegin = idSlice.ptr;
+            // parse the timestamp
+            if (!RespReadUtils.ReadUlong(out ulong timestamp, ref idSlice.ptr, idBegin + index))
+            {
+                return false;
+            }
+            // parse the sequence number, bounded by the end of the original slice
+            var seqBegin = idBegin + index + 1;
+            if (!RespReadUtils.ReadUlong(out ulong seq, ref seqBegin, idBegin + idSlice.length))
+            {
+                return false;
+            }
+            streamID.setMS(timestamp);
+            streamID.setSeq(seq);
+            return true;
+        }
+
+        /// <inheritdoc />
+        public void Dispose()
+        {
+            log.Dispose();
+            device.Dispose();
+        }
+    }
+}
\ No newline at end of file
diff --git a/libs/server/Stream/StreamID.cs b/libs/server/Stream/StreamID.cs
new file mode 100644
index 00000000000..a2ab0a99fb9
--- /dev/null
+++ b/libs/server/Stream/StreamID.cs
@@ -0,0 +1,64 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT license.
+
+using System;
+using System.Buffers.Binary;
+using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
+
+namespace Garnet.server
+{
+    /// <summary>
+    /// Represents a GarnetStreamID, which is a 128-bit identifier for an entry in a stream.
+    /// Both words are stored in big-endian format, so the raw bytes sort in ID order.
+    /// </summary>
+    [StructLayout(LayoutKind.Explicit)]
+    public unsafe struct StreamID
+    {
+        [FieldOffset(0)]
+        public ulong ms;
+        [FieldOffset(8)]
+        public ulong seq;
+        [FieldOffset(0)]
+        public fixed byte idBytes[16];
+
+        public StreamID(ulong ms, ulong seq)
+        {
+            BinaryPrimitives.WriteUInt64BigEndian(new Span<byte>(Unsafe.AsPointer(ref this.ms), 8), ms);
+            BinaryPrimitives.WriteUInt64BigEndian(new Span<byte>(Unsafe.AsPointer(ref this.seq), 8), seq);
+        }
+
+        /// <summary>
+        /// Encodes and stores the timestamp in big-endian format
+        /// </summary>
+        public void setMS(ulong ms)
+        {
+            BinaryPrimitives.WriteUInt64BigEndian(new Span<byte>(Unsafe.AsPointer(ref this.ms), 8), ms);
+        }
+
+        /// <summary>
+        /// Encodes and stores the sequence number in big-endian format
+        /// </summary>
+        public void setSeq(ulong seq)
+        {
+            BinaryPrimitives.WriteUInt64BigEndian(new Span<byte>(Unsafe.AsPointer(ref this.seq), 8), seq);
+        }
+
+        /// <summary>
+        /// Decodes and returns the timestamp
+        /// </summary>
+        public ulong getMS()
+        {
+            return BinaryPrimitives.ReadUInt64BigEndian(new Span<byte>(Unsafe.AsPointer(ref this.ms), 8));
+        }
+
+        /// <summary>
+        /// Decodes and returns the sequence number
+        /// </summary>
+        public ulong getSeq()
+        {
+            return BinaryPrimitives.ReadUInt64BigEndian(new Span<byte>(Unsafe.AsPointer(ref this.seq), 8));
+        }
+
+        public unsafe StreamID(byte[] inputBytes)
+        {
+            if (inputBytes.Length != 16)
+            {
+                throw new ArgumentException("idBytes must be 16 bytes");
+            }
+
+            fixed (byte* idBytesPtr = idBytes)
+            {
+                var sourceSpan = new ReadOnlySpan<byte>(inputBytes);
+                var destinationSpan = new Span<byte>(idBytesPtr, 16);
+                sourceSpan.CopyTo(destinationSpan);
+            }
+        }
+    }
+}
\ No newline at end of file
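The big-endian encoding above is what lets the B+tree index compare 16-byte IDs with a plain byte-wise comparison and still respect numeric (ms, seq) order. A minimal sketch of that invariant (the comparer below is illustrative, not an API of this change):

    using System.Diagnostics;
    using Garnet.server;

    static class StreamIDOrderCheck
    {
        // byte-wise, most-significant-byte-first comparison, as an index over raw keys would do
        static unsafe int Compare(StreamID x, StreamID y)
        {
            for (int i = 0; i < 16; i++)
            {
                int d = x.idBytes[i] - y.idBytes[i];
                if (d != 0) return d;
            }
            return 0;
        }

        static void Check()
        {
            // numeric order (5,1) < (6,0) is preserved by the raw byte order
            Debug.Assert(Compare(new StreamID(5, 1), new StreamID(6, 0)) < 0);
            // within a timestamp, the sequence number breaks the tie
            Debug.Assert(Compare(new StreamID(5, 1), new StreamID(5, 2)) < 0);
        }
    }
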
diff --git a/libs/server/Stream/StreamManager.cs b/libs/server/Stream/StreamManager.cs
new file mode 100644
index 00000000000..7d8574ff7a7
--- /dev/null
+++ b/libs/server/Stream/StreamManager.cs
@@ -0,0 +1,194 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT license.
+
+using System;
+using System.Collections.Generic;
+using Garnet.common;
+using Tsavorite.core;
+
+namespace Garnet.server
+{
+    public sealed class StreamManager : IDisposable
+    {
+        private Dictionary<byte[], StreamObject> streams;
+        long defPageSize;
+        long defMemorySize;
+        int safeTailRefreshFreqMs;
+
+        SingleWriterMultiReaderLock _lock = new SingleWriterMultiReaderLock();
+
+        public StreamManager(long pageSize, long memorySize, int safeTailRefreshFreqMs)
+        {
+            streams = new Dictionary<byte[], StreamObject>(ByteArrayComparer.Instance);
+            defPageSize = pageSize;
+            defMemorySize = memorySize;
+            this.safeTailRefreshFreqMs = safeTailRefreshFreqMs;
+        }
+
+        /// <summary>
+        /// Add a new entry to the stream
+        /// </summary>
+        /// <param name="keySlice">key/name of the stream</param>
+        /// <param name="idSlice">id of the stream entry</param>
+        /// <param name="value">payload to the stream</param>
+        /// <param name="valueLength">length of payload to the stream</param>
+        /// <param name="numPairs"># k-v pairs in the payload</param>
+        /// <param name="output">RESP output buffer</param>
+        /// <param name="streamKey">key of last stream accessed (for cache)</param>
+        /// <param name="lastStream">reference to last stream accessed (for cache)</param>
+        public unsafe void StreamAdd(ArgSlice keySlice, ArgSlice idSlice, byte* value, int valueLength, int numPairs, ref SpanByteAndMemory output, out byte[] streamKey, out StreamObject lastStream)
+        {
+            // copy the key, to store it in the dictionary
+            byte[] key = new byte[keySlice.Length];
+            fixed (byte* keyPtr = key)
+                Buffer.MemoryCopy(keySlice.ptr, keyPtr, keySlice.Length, keySlice.Length);
+            bool foundStream = false;
+            StreamObject stream;
+            lastStream = null;
+            streamKey = null;
+            _lock.ReadLock();
+            try
+            {
+                foundStream = streams.TryGetValue(key, out stream);
+                if (foundStream)
+                {
+                    stream.AddEntry(value, valueLength, idSlice, numPairs, ref output);
+                    // update last accessed stream key
+                    lastStream = stream;
+                    streamKey = key;
+                }
+            }
+            finally
+            {
+                _lock.ReadUnlock();
+            }
+            if (foundStream)
+            {
+                return;
+            }
+            // the stream does not exist yet: take a write lock to create it
+            _lock.WriteLock();
+            try
+            {
+                // re-check, in case another thread created the stream in the meantime
+                foundStream = streams.TryGetValue(key, out stream);
+                if (!foundStream)
+                {
+                    // stream was not found with this key, so create a new one
+                    StreamObject newStream = new StreamObject(null, defPageSize, defMemorySize, safeTailRefreshFreqMs);
+                    newStream.AddEntry(value, valueLength, idSlice, numPairs, ref output);
+                    streams.TryAdd(key, newStream);
+                    streamKey = key;
+                    lastStream = newStream;
+                }
+                else
+                {
+                    stream.AddEntry(value, valueLength, idSlice, numPairs, ref output);
+                    lastStream = stream;
+                    streamKey = key;
+                }
+            }
+            finally
+            {
+                _lock.WriteUnlock();
+            }
+        }
+
+        /// <summary>
+        /// Get the length of a particular stream
+        /// </summary>
+        /// <param name="keySlice">key of the stream whose length to obtain</param>
+        /// <returns>length of the stream; 0 if the stream does not exist</returns>
+        public unsafe ulong StreamLength(ArgSlice keySlice)
+        {
+            var key = keySlice.ToArray();
+            if (streams != null)
+            {
+                bool foundStream = streams.TryGetValue(key, out StreamObject stream);
+                if (foundStream)
+                {
+                    return stream.Length();
+                }
+            }
+            // treat a non-existent stream as empty
+            return 0;
+        }
+
+        /// <summary>
+        /// Perform a range scan in a stream
+        /// </summary>
+        /// <param name="keySlice">key/name of the stream</param>
+        /// <param name="start">start of the range</param>
+        /// <param name="end">end of the range</param>
+        /// <param name="count">threshold to limit scanning</param>
+        /// <param name="output">RESP output buffer</param>
+        /// <returns>true if the stream exists and was scanned</returns>
+        public unsafe bool StreamRange(ArgSlice keySlice, string start, string end, int count, ref SpanByteAndMemory output)
+        {
+            var key = keySlice.ToArray();
+            if (streams != null && streams.Count > 0)
+            {
+                bool foundStream = streams.TryGetValue(key, out StreamObject stream);
+                if (foundStream)
+                {
+                    stream.ReadRange(start, end, count, ref output);
+                    return true;
+                }
+            }
+            return false;
+        }
+
+        /// <summary>
+        /// Delete an entry from a stream
+        /// </summary>
+        /// <param name="keySlice">key/name of the stream to delete from</param>
+        /// <param name="idSlice">id of the stream entry to delete</param>
+        /// <param name="lastSeenStream">last accessed stream (for cache)</param>
+        /// <returns>true if the entry was deleted</returns>
+        public bool StreamDelete(ArgSlice keySlice, ArgSlice idSlice, out StreamObject lastSeenStream)
+        {
+            bool foundStream;
+            var key = keySlice.ToArray();
+            StreamObject stream;
+            lastSeenStream = null;
+            if (streams != null)
+            {
+                foundStream = streams.TryGetValue(key, out stream);
+                if (foundStream)
+                {
+                    lastSeenStream = stream;
+                    return stream.DeleteEntry(idSlice);
+                }
+            }
+            return false;
+        }
+
+        /// <inheritdoc />
+        public void Dispose()
+        {
+            if (streams != null)
+            {
+                _lock.WriteLock();
+                try
+                {
+                    foreach (var stream in streams.Values)
+                    {
+                        stream.Dispose();
+                    }
+
+                    streams.Clear();
+                }
+                finally
+                {
+                    _lock.WriteUnlock();
+                }
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/TsavoriteLog.cs b/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/TsavoriteLog.cs
index 9cacbed79a9..70490cdf775 100644
--- a/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/TsavoriteLog.cs
+++ b/libs/storage/Tsavorite/cs/src/core/TsavoriteLog/TsavoriteLog.cs
@@ -844,6 +844,45 @@ public unsafe bool TryEnqueue(ReadOnlySpan<byte> entry, out long logicalAddress)
             return true;
         }
 
+        /// <summary>
+        /// Append a stream entry to the log: a 16-byte stream ID, followed by the
+        /// number of key-value pairs, followed by the entry payload.
+        /// </summary>
+        public unsafe bool TryEnqueueStreamEntry(byte* id, int idLength, int numPairs, byte* entry, int entryLength, out long logicalAddress)
+        {
+            logicalAddress = 0;
+            var length = idLength + sizeof(int) + entryLength;
+            int allocatedLength = headerSize + Align(length);
+            ValidateAllocatedLength(allocatedLength);
+
+            epoch.Resume();
+
+            if (commitNum == long.MaxValue) throw new TsavoriteException("Attempting to enqueue into a completed log");
+
+            logicalAddress = allocator.TryAllocateRetryNow(allocatedLength);
+            if (logicalAddress == 0)
+            {
+                epoch.Suspend();
+                if (cannedException != null) throw cannedException;
+                return false;
+            }
+
+            var physicalAddress = allocator.GetPhysicalAddress(logicalAddress);
+            // start writing: first copy the id as two 8-byte words
+            // (idLength is expected to be sizeof(StreamID) == 16)
+            *(long*)(headerSize + physicalAddress) = *(long*)id;
+            *(long*)(headerSize + physicalAddress + 8) = *(long*)(id + sizeof(long));
+            // then copy the number of pairs
+            *(int*)(headerSize + physicalAddress + idLength) = numPairs;
+            // then copy the entry payload
+            Buffer.MemoryCopy(entry, (void*)(headerSize + physicalAddress + idLength + sizeof(int)), entryLength, entryLength);
+
+            SetHeader(length, (byte*)physicalAddress);
+            safeTailRefreshEntryEnqueued?.Signal();
+            epoch.Suspend();
+            if (AutoCommit) Commit();
+            return true;
+        }
+
         /// <summary>
         /// Append a user-defined blittable struct header atomically to the log.
         /// </summary>
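For reference, the on-log layout written by TryEnqueueStreamEntry is [16-byte big-endian ID][int32 numPairs][payload], which is exactly what ReadRange in Stream.cs slices back apart. A hedged sketch of decoding one scanned entry under that assumption (the helper name and tuple shape are illustrative):

    using System;
    using System.Buffers.Binary;

    static class StreamEntryLayout
    {
        // decodes an entry as laid out by TryEnqueueStreamEntry
        public static (ulong ms, ulong seq, int numPairs, byte[] payload) Decode(ReadOnlySpan<byte> entry)
        {
            var ms = BinaryPrimitives.ReadUInt64BigEndian(entry.Slice(0, 8));
            var seq = BinaryPrimitives.ReadUInt64BigEndian(entry.Slice(8, 8));
            var numPairs = BitConverter.ToInt32(entry.Slice(16, 4));
            return (ms, seq, numPairs, entry.Slice(20).ToArray());
        }
    }
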
diff --git a/playground/BTree/Btree.csproj b/playground/BTree/Btree.csproj
new file mode 100644
index 00000000000..b4678381019
--- /dev/null
+++ b/playground/BTree/Btree.csproj
@@ -0,0 +1,18 @@
+<Project Sdk="Microsoft.NET.Sdk">
+
+  <PropertyGroup>
+    <OutputType>Exe</OutputType>
+    <TargetFramework>net8.0</TargetFramework>
+    <ImplicitUsings>enable</ImplicitUsings>
+    <Nullable>enable</Nullable>
+  </PropertyGroup>
+
+  <ItemGroup>
+    <ProjectReference Include="..\..\libs\server\Garnet.server.csproj" />
+  </ItemGroup>
+
+</Project>
diff --git a/playground/BTree/Program.cs b/playground/BTree/Program.cs
new file mode 100644
index 00000000000..8655cb50bff
--- /dev/null
+++ b/playground/BTree/Program.cs
@@ -0,0 +1,156 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT license.
+
+using System.Diagnostics;
+using System.Runtime.CompilerServices;
+using Garnet.server;
+using Garnet.server.BTreeIndex;
+
+class Program
+{
+    /// <summary>
+    /// Playground for the B+tree index implementation
+    /// </summary>
+    /// <param name="args">-N sets the number of keys; --verb enables verbose output</param>
+    static unsafe void Main(string[] args)
+    {
+        var tree = new BTree((uint)BTreeNode.PAGE_SIZE);
+        ulong N = 400000;
+        bool verbose = true;
+        for (int i = 0; i < args.Length; i++)
+        {
+            if (args[i] == "--verb")
+            {
+                verbose = true;
+            }
+            else if (args[i] == "-N")
+            {
+                N = ulong.Parse(args[i + 1]);
+                break;
+            }
+        }
+        StreamID[] streamIDs = new StreamID[N];
+        long duration = 0;
+        long dur2 = 0;
+        var rand = new Random();
+        for (ulong i = 0; i < N; i++)
+        {
+            StreamID x = new StreamID(i + 1, 0);
+            Debug.Assert(x.ms > 0);
+            streamIDs[i] = x;
+        }
+        long start = Stopwatch.GetTimestamp();
+        Stopwatch sw = new Stopwatch();
+        sw.Start();
+        for (ulong i = 0; i < N; i++)
+        {
+            tree.Insert((byte*)Unsafe.AsPointer(ref streamIDs[i].idBytes[0]), new Value(i + 1));
+            var value = tree.Get((byte*)Unsafe.AsPointer(ref streamIDs[i].idBytes[0]));
+            Debug.Assert(value.address == i + 1);
+        }
+        sw.Stop();
+        dur2 = sw.ElapsedTicks;
+        duration += Stopwatch.GetTimestamp() - start;
+        double nanosecondsPerTick = 1_000_000_000.0 / Stopwatch.Frequency;
+        if (verbose)
+        {
+            Console.WriteLine("Insertion done");
+            Console.WriteLine("Number of Fast Inserts = " + tree.FastInserts);
+            Console.WriteLine("Number of Leaves = " + tree.LeafCount);
+            Console.WriteLine("Number of Internal Nodes = " + tree.InternalCount);
+            Console.WriteLine("Time for insertion = " + (double)dur2 * nanosecondsPerTick + " ns");
+        }
+        long insertion_time = (long)(dur2 * nanosecondsPerTick);
+        sw.Reset();
+
+        // point lookups
+        sw.Start();
+        for (ulong i = 0; i < N; i++)
+        {
+            var value = tree.Get((byte*)Unsafe.AsPointer(ref streamIDs[i].idBytes[0]));
+            Debug.Assert(value.address == i + 1);
+        }
+        sw.Stop();
+        long query_time = (long)(sw.ElapsedTicks * nanosecondsPerTick);
+        if (verbose)
+        {
+            Console.WriteLine("Time for querying = " + query_time + " ns");
+        }
+        sw.Reset();
+        Console.WriteLine("All inserted keys found");
+
+        // forward range queries at a few selectivities
+        double[] selectivities = [0.01, 0.05, 0.1];
+        long[] range_query_times = new long[selectivities.Length];
+        Value[] startVal = new Value[selectivities.Length];
+        Value[] endVal = new Value[selectivities.Length];
+        List<Value>[] list = new List<Value>[selectivities.Length];
+        for (int i = 0; i < selectivities.Length; i++)
+        {
+            double selectivity = selectivities[i];
+            ulong startIdx, endIdx;
+            do
+            {
+                // pick a random start index from 0 to N such that the range fits
+                startIdx = (ulong)rand.Next(0, (int)N);
+                endIdx = (ulong)(startIdx + (N * selectivity));
+            } while (endIdx >= N);
+            sw.Start();
+            var count = tree.Get((byte*)Unsafe.AsPointer(ref streamIDs[startIdx].idBytes[0]), (byte*)Unsafe.AsPointer(ref streamIDs[endIdx].idBytes[0]), out startVal[i], out endVal[i], out list[i]);
+            Debug.Assert(count == (int)(endIdx - startIdx + 1));
+            sw.Stop();
+            range_query_times[i] = (long)(sw.ElapsedTicks * nanosecondsPerTick);
+            if (verbose)
+            {
+                Console.WriteLine("Time for range query " + (i + 1) + " = " + range_query_times[i] + " ns");
+            }
+            sw.Reset();
+        }
+        if (verbose)
+            Console.WriteLine("Range query check passed");
+
+        // now let's delete some keys
+        sw.Reset();
+        int num_deletes = 100;
+        int num_successfully_deleted = 0;
+        for (int i = 0; i < num_deletes; i++)
+        {
+            // generate a random index to delete; stay below N - 600 so the random
+            // deletes cannot disturb the tail range [N-500, N-1] checked below
+            int idx = rand.Next(0, (int)(N - 600));
+            sw.Start();
+            bool val = tree.Delete((byte*)Unsafe.AsPointer(ref streamIDs[idx].idBytes[0]));
+            sw.Stop();
+            if (val)
+            {
+                num_successfully_deleted++;
+            }
+        }
+        long deleteTime = (long)(sw.ElapsedTicks * nanosecondsPerTick);
+        if (verbose)
+        {
+            Console.WriteLine("Number of keys deleted = " + num_successfully_deleted);
+            Console.WriteLine("Time for deletion = " + deleteTime + " ns");
+        }
+
+        tree.Delete((byte*)Unsafe.AsPointer(ref streamIDs[N - 400].idBytes[0]));
+        tree.Delete((byte*)Unsafe.AsPointer(ref streamIDs[N - 300].idBytes[0]));
+        tree.Delete((byte*)Unsafe.AsPointer(ref streamIDs[N - 200].idBytes[0]));
+        tree.Delete((byte*)Unsafe.AsPointer(ref streamIDs[N - 100].idBytes[0]));
+
+        // do a range query over the tail to check the tombstones again
+        tree.Get((byte*)Unsafe.AsPointer(ref streamIDs[N - 500].idBytes[0]), (byte*)Unsafe.AsPointer(ref streamIDs[N - 1].idBytes[0]), out Value startVal1, out Value endVal1, out List<Value> tombstones);
+        Debug.Assert(tombstones.Count == 4);
+        Console.WriteLine("Delete check passed");
+
+        // print all times collected in a csv format
+        Console.WriteLine(insertion_time + ", " + query_time + ", " + range_query_times[0] + ", " + range_query_times[1] + ", " + range_query_times[2] + ", " + deleteTime);
+        tree.Deallocate();
+        Console.WriteLine("Num allocates = " + tree.stats.numAllocates);
+        Console.WriteLine("Num deallocates = " + tree.stats.numDeallocates);
+        Console.WriteLine("All checks passed");
+    }
+}
\ No newline at end of file
diff --git a/playground/CommandInfoUpdater/SupportedCommand.cs b/playground/CommandInfoUpdater/SupportedCommand.cs
index 9163630282a..64c64a1b12e 100644
--- a/playground/CommandInfoUpdater/SupportedCommand.cs
+++ b/playground/CommandInfoUpdater/SupportedCommand.cs
@@ -303,6 +303,10 @@ public class SupportedCommand
             new("WATCH", RespCommand.WATCH),
             new("WATCHMS", RespCommand.WATCHMS),
             new("WATCHOS", RespCommand.WATCHOS),
+            new("XADD", RespCommand.XADD),
+            new("XDEL", RespCommand.XDEL),
+            new("XLEN", RespCommand.XLEN),
+            new("XRANGE", RespCommand.XRANGE),
             new("ZADD", RespCommand.ZADD),
             new("ZCARD", RespCommand.ZCARD),
             new("ZCOUNT", RespCommand.ZCOUNT),
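The B+tree tests that follow lean on the tombstone behavior seen in the playground above: Delete marks a key rather than physically removing it, ValidCount excludes tombstoned keys, and a range Get reports tombstoned values separately. A small sketch of that contract (assumed from the usage above, not a definitive API description):

    using System.Runtime.CompilerServices;
    using Garnet.server;
    using Garnet.server.BTreeIndex;

    static class TombstoneSketch
    {
        static unsafe void Check()
        {
            var tree = new BTree((uint)BTreeNode.PAGE_SIZE);
            var id = new StreamID(1, 0);
            tree.Insert((byte*)Unsafe.AsPointer(ref id.idBytes[0]), new Value(42));
            // Delete tombstones the key instead of physically removing it,
            tree.Delete((byte*)Unsafe.AsPointer(ref id.idBytes[0]));
            // so live counts (what stream Length/XLEN reports) come from ValidCount
            System.Diagnostics.Debug.Assert(tree.ValidCount == 0);
            tree.Deallocate();
        }
    }
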
diff --git a/test/Garnet.test/BTreeTests.cs b/test/Garnet.test/BTreeTests.cs
new file mode 100644
index 00000000000..c41cd5d9754
--- /dev/null
+++ b/test/Garnet.test/BTreeTests.cs
@@ -0,0 +1,135 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT license.
+
+using System;
+using System.Collections.Generic;
+using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
+using Garnet.server;
+using Garnet.server.BTreeIndex;
+using NUnit.Framework;
+using NUnit.Framework.Legacy;
+
+namespace Garnet.test
+{
+    using Value = Value;
+
+    [TestFixture]
+    public unsafe class BTreeTests
+    {
+        static StreamID[] streamIDs;
+        static ulong N = 50000;
+
+        [SetUp]
+        public void Setup()
+        {
+            streamIDs = new StreamID[N];
+            for (ulong i = 0; i < N; i++)
+            {
+                streamIDs[i] = new StreamID(i + 1, 0);
+            }
+        }
+
+        [TearDown]
+        public void TearDown()
+        { }
+
+        [Test]
+        [Category("INIT")]
+        public void InitBTreeLeafNode()
+        {
+            // allocate one page-aligned page for the node
+            var memoryBlock = (IntPtr*)NativeMemory.AlignedAlloc((nuint)BTreeNode.PAGE_SIZE, (nuint)BTreeNode.PAGE_SIZE);
+            var leaf = BTreeNode.Create(BTreeNodeType.Leaf, memoryBlock);
+            ClassicAssert.AreEqual(leaf->info->type, BTreeNodeType.Leaf);
+            ClassicAssert.AreEqual(leaf->info->count, 0);
+
+            // free the leaf
+            BTree.FreeNode(ref leaf);
+            leaf = null;
+        }
+
+        [Test]
+        [Category("INSERT")]
+        public void Insert()
+        {
+            var tree = new BTree((uint)BTreeNode.PAGE_SIZE);
+            ClassicAssert.AreEqual(tree.FastInserts, 0);
+            ClassicAssert.AreEqual(tree.LeafCount, 1);
+            ClassicAssert.AreEqual(tree.InternalCount, 0);
+
+            for (ulong i = 0; i < N; i++)
+            {
+                tree.Insert((byte*)Unsafe.AsPointer(ref streamIDs[i].idBytes[0]), new Value(i + 1));
+            }
+            ClassicAssert.AreEqual(tree.FastInserts, N);
+            tree.Deallocate();
+        }
+
+        [Test]
+        [Category("LOOKUP")]
+        public void PointLookup()
+        {
+            var tree = new BTree((uint)BTreeNode.PAGE_SIZE);
+
+            for (ulong i = 0; i < N; i++)
+            {
+                tree.Insert((byte*)Unsafe.AsPointer(ref streamIDs[i].idBytes[0]), new Value(streamIDs[i].ms));
+            }
+
+            for (ulong i = 0; i < N; i++)
+            {
+                ClassicAssert.AreEqual(tree.Get((byte*)Unsafe.AsPointer(ref streamIDs[i].idBytes[0])).address, streamIDs[i].ms);
+            }
+
+            tree.Deallocate();
+        }
+
+        [Test]
+        [Category("LOOKUP")]
+        public void RangeLookup()
+        {
+            var tree = new BTree(4096);
+
+            for (ulong i = 0; i < N; i++)
+            {
+                tree.Insert((byte*)Unsafe.AsPointer(ref streamIDs[i].idBytes[0]), new Value(streamIDs[i].ms));
+            }
+
+            int count = tree.Get((byte*)Unsafe.AsPointer(ref streamIDs[N - 200].idBytes[0]), (byte*)Unsafe.AsPointer(ref streamIDs[N - 1].idBytes[0]), out Value startVal, out Value endVal, out List<Value> list);
+            ClassicAssert.AreEqual(count, N - 1 - (N - 200) + 1);
+            ClassicAssert.AreEqual(list.Count, 0);
+            ClassicAssert.AreEqual(startVal.address, streamIDs[N - 200].ms);
+            ClassicAssert.AreEqual(endVal.address, streamIDs[N - 1].ms);
+
+            tree.Deallocate();
+        }
+
+        [Test]
+        [Category("Delete")]
+        public void Delete()
+        {
+            var tree = new BTree((uint)BTreeNode.PAGE_SIZE);
+            for (ulong i = 0; i < N; i++)
+            {
+                tree.Insert((byte*)Unsafe.AsPointer(ref streamIDs[i].idBytes[0]), new Value(streamIDs[i].ms));
+            }
+
+            // delete 10% of keys at random
+            Random rand = new Random();
+            uint delCount = 0;
+            for (ulong i = 0; i < N / 10; i++)
+            {
+                ulong idx = (ulong)rand.Next(0, (int)N);
+                bool deleted = tree.Delete((byte*)Unsafe.AsPointer(ref streamIDs[idx].idBytes[0]));
+                if (deleted)
+                {
+                    delCount++;
+                }
+            }
+            ClassicAssert.AreEqual(tree.ValidCount, N - delCount);
+            tree.Deallocate();
+        }
+    }
+}
diff --git a/test/Garnet.test/Resp/ACL/RespCommandTests.cs b/test/Garnet.test/Resp/ACL/RespCommandTests.cs
index 1a2e8924b3d..1d431154762 100644
--- a/test/Garnet.test/Resp/ACL/RespCommandTests.cs
+++ b/test/Garnet.test/Resp/ACL/RespCommandTests.cs
@@ -6346,6 +6346,67 @@ static async Task DoGeoSearchStoreAsync(GarnetClient client)
             }
         }
 
+        [Test]
+        public async Task XADDACLsAsync()
+        {
+            int count = 0;
+            await CheckCommandsAsync(
+                "XADD",
+                [DoXAddAsync]
+            );
+
+            async Task DoXAddAsync(GarnetClient client)
+            {
+                // use a distinct field name per invocation
+                string val = await client.ExecuteForStringResultAsync("XADD", ["foo", "*", $"bar--{count++}", "fizz"]);
+                ClassicAssert.IsNotNull(val);
+            }
+        }
+
+        [Test]
+        public async Task XLENACLsAsync()
+        {
+            await CheckCommandsAsync(
+                "XLEN",
+                [DoXLenAsync]
+            );
+
+            async Task DoXLenAsync(GarnetClient client)
+            {
+                long val = await client.ExecuteForLongResultAsync("XLEN", ["foo"]);
+                ClassicAssert.AreEqual(0, val);
+            }
+        }
+
+        [Test]
+        public async Task XRangeACLsAsync()
+        {
+            await CheckCommandsAsync(
+                "XRANGE",
+                [DoXRangeAsync]
+            );
+
+            async Task DoXRangeAsync(GarnetClient client)
+            {
+                var val = await client.ExecuteForStringArrayResultAsync("XRANGE", ["foo", "-", "+"]);
+                ClassicAssert.AreEqual(0, val.Length);
+            }
+        }
+
+        [Test]
+        public async Task XDELACLsAsync()
+        {
+            await CheckCommandsAsync(
+                "XDEL",
+                [DoXDelAsync]
+            );
+
+            async Task DoXDelAsync(GarnetClient client)
+            {
+                long val = await client.ExecuteForLongResultAsync("XDEL", ["foo", "1"]);
+                ClassicAssert.AreEqual(0, val);
+            }
+        }
+
         [Test]
         public async Task ZAddACLsAsync()
         {
diff --git a/test/Garnet.test/RespStreamTests.cs b/test/Garnet.test/RespStreamTests.cs
new file mode 100644
index 00000000000..78c636e7a9e
--- /dev/null
+++ b/test/Garnet.test/RespStreamTests.cs
@@ -0,0 +1,210 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT license.
+
+using System;
+using System.Collections.Generic;
+using System.Globalization;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+using Embedded.server;
+using Garnet.common;
+using Garnet.server;
+using NUnit.Framework;
+using NUnit.Framework.Legacy;
+using StackExchange.Redis;
+using Tsavorite.core;
+
+namespace Garnet.test
+{
+    [TestFixture]
+    public class RespStreamTests
+    {
+        protected GarnetServer server;
+        const string chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
+        Random random;
+        static ulong N = 5;
+
+        [SetUp]
+        public void Setup()
+        {
+            TestUtils.DeleteDirectory(TestUtils.MethodTestDir, wait: true);
+            server = TestUtils.CreateGarnetServer(TestUtils.MethodTestDir, lowMemory: true);
+            server.Start();
+            random = new Random();
+
+            // write to one stream to test range scans
+            var streamKey = "rangeScan";
+            using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig());
+            var db = redis.GetDatabase(0);
+            for (ulong i = 0; i < N; i++)
+            {
+                var entryKey = GenerateRandomString(4);
+                var entryValue = GenerateRandomString(4);
+                var retId = db.StreamAdd(streamKey, entryKey, entryValue);
+            }
+        }
+
+        [TearDown]
+        public void TearDown()
+        {
+            server.Dispose();
+            TestUtils.DeleteDirectory(TestUtils.MethodTestDir);
+        }
+
+        /// <summary>
+        /// Generates a random ASCII string of the given length
+        /// </summary>
+        public string GenerateRandomString(int length)
+        {
+            return new string(Enumerable.Repeat(chars, length)
+                              .Select(s => s[random.Next(s.Length)]).ToArray());
+        }
+
+        #region STREAMIDTests
+        [Test]
+        public void StreamAddAutoGenIdTest()
+        {
+            using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig());
+            var db = redis.GetDatabase(0);
+
+            var streamKey = "add";
+            var entryKey = GenerateRandomString(4);
+            var entryValue = GenerateRandomString(4);
+            var retId = db.StreamAdd(streamKey, entryKey, entryValue);
+            ClassicAssert.IsTrue(retId.ToString().Contains("-"));
+        }
+
+        [Test]
+        public void StreamAddUserDefinedTsTest()
+        {
+            using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig());
+            var db = redis.GetDatabase(0);
+
+            var streamKey = "addTs";
+            var entryKey = GenerateRandomString(4);
+            var entryValue = GenerateRandomString(4);
+            // supply only a timestamp; the sequence number is auto-generated
+            var retId = db.StreamAdd(streamKey, entryKey, entryValue, $"{1}");
+            ClassicAssert.IsTrue(retId.ToString().Contains("-"));
+        }
+
+        [Test]
+        public void StreamAddUserDefinedIdTest()
+        {
+            using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig());
+            var db = redis.GetDatabase(0);
+
+            var streamKey = "addId";
+            var entryKey = GenerateRandomString(4);
+            var entryValue = GenerateRandomString(4);
+            var retId = db.StreamAdd(streamKey, entryKey, entryValue, $"{1}-0");
+            ClassicAssert.IsTrue(retId.ToString().Contains("-"));
+        }
+        #endregion
+
+        #region STREAMOperationsTests
+        [Test]
+        public void StreamAddAndLengthTest()
+        {
+            using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig());
+            var db = redis.GetDatabase(0);
+
+            var streamKey = "length";
+            var count = 0;
+            for (ulong i = 0; i < N; i++)
+            {
+                var entryKey = GenerateRandomString(4);
+                var entryValue = GenerateRandomString(4);
+                var retId = db.StreamAdd(streamKey, entryKey, entryValue);
+                count++;
+            }
+            ClassicAssert.AreEqual(count, N);
+
+            var length = db.StreamLength(streamKey);
+            ClassicAssert.AreEqual(length, N);
+        }
+
+        [Test]
+        public void StreamRangeExistingTest()
+        {
+            using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig());
+            var db = redis.GetDatabase(0);
+            var streamKey = "rangeScan";
+            var range = db.StreamRange(streamKey, "-", "+");
+            ClassicAssert.AreEqual(range.Length, N);
+        }
+
+        [Test]
+        public void StreamRangeNonExistingTest()
+        {
+            using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig());
+            var db = redis.GetDatabase(0);
+            var streamKey = "nonExistingRangeScan";
+            var range = db.StreamRange(streamKey, "-", "+");
+            ClassicAssert.AreEqual(range.Length, 0);
+        }
+
+        [Test]
+        public void StreamRangeWithCountTest()
+        {
+            using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig());
+            var db = redis.GetDatabase(0);
+            var streamKey = "rangeScan";
+            int limit = 2;
+            var range = db.StreamRange(streamKey, "-", "+", limit);
+            ClassicAssert.AreEqual(range.Length, limit);
+        }
+
+        [Test]
+        public void StreamDeleteSingleTest()
+        {
+            using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig());
+            var db = redis.GetDatabase(0);
+
+            var streamKey = "delOne";
+            var entryKey = GenerateRandomString(4);
+            var entryValue = GenerateRandomString(4);
+            var retId = db.StreamAdd(streamKey, entryKey, entryValue, $"{1}-0");
+
+            var delCount = db.StreamDelete(streamKey, [retId]);
+            ClassicAssert.AreEqual(delCount, 1);
+        }
+
+        [Test]
+        [Category("Delete")]
+        public void StreamDeleteMultipleTest()
+        {
+            using var redis = ConnectionMultiplexer.Connect(TestUtils.GetConfig());
+            var db = redis.GetDatabase(0);
+
+            var streamKey = "delMultiple";
+            var count = 0;
+            for (ulong i = 0; i < N; i++)
+            {
+                var entryKey = GenerateRandomString(4);
+                var entryValue = GenerateRandomString(4);
+                var retId = db.StreamAdd(streamKey, entryKey, entryValue, $"{i + 1}-0");
+                count++;
+            }
+            ClassicAssert.AreEqual(count, N);
+
+            // pick 2 unique arbitrary indices between 0 and N, stored in a set
+            int numToDelete = 2;
+            var indices = new HashSet<int>();
+            while (indices.Count < numToDelete)
+            {
+                indices.Add(random.Next(0, (int)N));
+            }
+
+            var eIds = new RedisValue[numToDelete];
+            int c = 0;
+            foreach (var idx in indices)
+            {
+                eIds[c++] = $"{idx + 1}-0";
+            }
+
+            var delCount = db.StreamDelete(streamKey, eIds);
+            ClassicAssert.AreEqual(delCount, indices.Count);
+        }
+        #endregion
+    }
+}
\ No newline at end of file
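Taken together, the commands added in this change support the following end-to-end flow (a sketch using StackExchange.Redis as in the tests above; key and field names are illustrative):

    using StackExchange.Redis;

    class StreamSmokeTest
    {
        static void Run(IDatabase db)
        {
            // XADD with an auto-generated ID ("*" under the covers)
            var id = db.StreamAdd("device:1", "temp", "21.5");

            // XLEN reflects the insert
            var len = db.StreamLength("device:1");           // 1

            // XRANGE over the full range returns the entry just added
            var entries = db.StreamRange("device:1", "-", "+");

            // XDEL tombstones the entry, so the stream reads as empty again
            var deleted = db.StreamDelete("device:1", [id]); // 1
        }
    }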