Skip to content

STREAMS API in Garnet #1131

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 30 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
75f20bd
Streams API Work in progress
ramananeesh Mar 9, 2025
73b3f20
Streams - sector aligned bufferpool for index work in progress
ramananeesh Mar 9, 2025
b4723ec
sector aligned bufferpool integration works
ramananeesh Mar 10, 2025
1068e53
testing code wip
ramananeesh Mar 10, 2025
53e204b
bug fixed
ramananeesh Mar 10, 2025
c64785a
fixed major bug in btree
ramananeesh Mar 21, 2025
9a6099e
updates
ramananeesh Mar 21, 2025
edaa741
fixed memory alloc by reverting to Marshal.AllocHGlobal
ramananeesh Mar 22, 2025
7710e56
code cleanup
ramananeesh Mar 22, 2025
0129aca
delete passes
ramananeesh Mar 22, 2025
fab86a8
Merge branch 'main' of github.com:ramananeesh/garnet into ramananeesh…
ramananeesh Mar 22, 2025
1be22c6
tying up the streams API
ramananeesh Mar 22, 2025
d8a41f4
added command to RespCommandsInfo
ramananeesh Mar 22, 2025
70f3bac
basic test with XADD and XLEN works
ramananeesh Mar 22, 2025
460d88f
added XRANGE and XDEL - basic tests work
ramananeesh Mar 22, 2025
579bd6e
fixed bug with stream id parsing/encode/decode
ramananeesh Mar 22, 2025
40af237
added ACL Tests
ramananeesh Mar 22, 2025
83acdc7
added RespStreamTests
ramananeesh Mar 23, 2025
5444205
minor cleanup
ramananeesh Mar 23, 2025
5cab8b7
added cleaner command strings for incorrect arguments
ramananeesh Mar 23, 2025
d3551d8
cleanup; removed TRIM code
ramananeesh Mar 23, 2025
dcd33b9
cleanup; added more documentation about node structure
ramananeesh Mar 23, 2025
8003334
more cleanup
ramananeesh Mar 23, 2025
5f92120
reverted visibility change to ByteArrayComparer to instead use .Instance
ramananeesh Mar 27, 2025
ede575d
removed most of Unsafe.AsPointer() instances from BTree code
ramananeesh Mar 27, 2025
6e4f239
swapped out memory allocation for B-tree to use NativeMemory.AlignedA…
ramananeesh Mar 27, 2025
ce15667
cleanup; removed unused imports + commented out code.
ramananeesh Mar 27, 2025
e66d113
bug fix - mismatch in allocates v/s deallocates for B-tree
ramananeesh Mar 27, 2025
70bed70
updates fixing PR comments
ramananeesh May 11, 2025
b356164
reverted an earlier change in Build.props following PR comment + chan…
ramananeesh May 12, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions Garnet.sln
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,8 @@ Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "NoOpModule", "playground\No
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "ETag", "samples\ETag\ETag.csproj", "{4FBA1587-BAFC-49F8-803A-D1CF431A26F5}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Btree", "playground\BTree\Btree.csproj", "{CE12831B-2805-469E-8208-759DC4B4862C}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -345,6 +347,14 @@ Global
{4FBA1587-BAFC-49F8-803A-D1CF431A26F5}.Release|Any CPU.Build.0 = Release|Any CPU
{4FBA1587-BAFC-49F8-803A-D1CF431A26F5}.Release|x64.ActiveCfg = Release|Any CPU
{4FBA1587-BAFC-49F8-803A-D1CF431A26F5}.Release|x64.Build.0 = Release|Any CPU
{CE12831B-2805-469E-8208-759DC4B4862C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{CE12831B-2805-469E-8208-759DC4B4862C}.Debug|Any CPU.Build.0 = Debug|Any CPU
{CE12831B-2805-469E-8208-759DC4B4862C}.Debug|x64.ActiveCfg = Debug|Any CPU
{CE12831B-2805-469E-8208-759DC4B4862C}.Debug|x64.Build.0 = Debug|Any CPU
{CE12831B-2805-469E-8208-759DC4B4862C}.Release|Any CPU.ActiveCfg = Release|Any CPU
{CE12831B-2805-469E-8208-759DC4B4862C}.Release|Any CPU.Build.0 = Release|Any CPU
{CE12831B-2805-469E-8208-759DC4B4862C}.Release|x64.ActiveCfg = Release|Any CPU
{CE12831B-2805-469E-8208-759DC4B4862C}.Release|x64.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand Down Expand Up @@ -381,6 +391,7 @@ Global
{A48412B4-FD60-467E-A5D9-F155CAB4F907} = {147FCE31-EC09-4C90-8E4D-37CA87ED18C3}
{D4C9A1A0-7053-F072-21F5-4E0C5827136D} = {69A71E2C-00E3-42F3-854E-BE157A24834E}
{4FBA1587-BAFC-49F8-803A-D1CF431A26F5} = {7068BB97-1958-4060-B5F1-859464592E56}
{CE12831B-2805-469E-8208-759DC4B4862C} = {69A71E2C-00E3-42F3-854E-BE157A24834E}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {2C02C405-4798-41CA-AF98-61EDFEF6772E}
Expand Down
10 changes: 10 additions & 0 deletions libs/common/RespReadUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -630,6 +630,16 @@ public static bool TryReadInt64WithLengthHeader(out long number, ref byte* ptr,
return true;
}

/// <summary>
/// Tries to read a Ulong from the given ASCII-encoded RESP string.
/// Note: this does not check for any length headers and is simply an accessor to TryReadUlong.
/// </summary>
/// <param name="number">If parsing was successful, contains the parsed ulong value.</param>
/// <param name="ptr">The starting position in the RESP string. Will be advanced if parsing is successful.</param>
/// <param name="end">The current end of the RESP string.</param>
/// <returns>True if a ulong was successfully parsed.</returns>
public static bool ReadUlong(out ulong number, ref byte* ptr, byte* end) => TryReadUInt64(ref ptr, end, out number, out _);

/// <summary>
/// Read long with length header
/// </summary>
Expand Down
118 changes: 118 additions & 0 deletions libs/resources/RespCommandsInfo.json
Original file line number Diff line number Diff line change
Expand Up @@ -4828,6 +4828,124 @@
}
]
},
{
"Command": "XADD",
"Name": "XADD",
"IsInternal": false,
"Arity": -5,
"Flags": "DenyOom, Fast, Write",
"FirstKey": 1,
"LastKey": 1,
"Step": 1,
"AclCategories": "Fast, Stream, Write",
"Tips": [
"nondeterministic_output"
],
"KeySpecifications": [
{
"BeginSearch": {
"TypeDiscriminator": "BeginSearchIndex",
"Index": 1
},
"FindKeys": {
"TypeDiscriminator": "FindKeysRange",
"LastKey": 0,
"KeyStep": 1,
"Limit": 0
},
"Notes": "UPDATE instead of INSERT because of the optional trimming feature",
"Flags": "RW, Update"
}
],
"SubCommands": null
},
{
"Command": "XDEL",
"Name": "XDEL",
"IsInternal": false,
"Arity": -3,
"Flags": "Fast, Write",
"FirstKey": 1,
"LastKey": 1,
"Step": 1,
"AclCategories": "Fast, Stream, Write",
"Tips": null,
"KeySpecifications": [
{
"BeginSearch": {
"TypeDiscriminator": "BeginSearchIndex",
"Index": 1
},
"FindKeys": {
"TypeDiscriminator": "FindKeysRange",
"LastKey": 0,
"KeyStep": 1,
"Limit": 0
},
"Notes": null,
"Flags": "RW, Delete"
}
],
"SubCommands": null
},
{
"Command": "XLEN",
"Name": "XLEN",
"IsInternal": false,
"Arity": 2,
"Flags": "Fast, ReadOnly",
"FirstKey": 1,
"LastKey": 1,
"Step": 1,
"AclCategories": "Fast, Read, Stream",
"Tips": null,
"KeySpecifications": [
{
"BeginSearch": {
"TypeDiscriminator": "BeginSearchIndex",
"Index": 1
},
"FindKeys": {
"TypeDiscriminator": "FindKeysRange",
"LastKey": 0,
"KeyStep": 1,
"Limit": 0
},
"Notes": null,
"Flags": "RO"
}
],
"SubCommands": null
},
{
"Command": "XRANGE",
"Name": "XRANGE",
"IsInternal": false,
"Arity": -4,
"Flags": "ReadOnly",
"FirstKey": 1,
"LastKey": 1,
"Step": 1,
"AclCategories": "Read, Slow, Stream",
"Tips": null,
"KeySpecifications": [
{
"BeginSearch": {
"TypeDiscriminator": "BeginSearchIndex",
"Index": 1
},
"FindKeys": {
"TypeDiscriminator": "FindKeysRange",
"LastKey": 0,
"KeyStep": 1,
"Limit": 0
},
"Notes": null,
"Flags": "RO, Access"
}
],
"SubCommands": null
},
{
"Command": "ZADD",
"Name": "ZADD",
Expand Down
177 changes: 177 additions & 0 deletions libs/server/BTreeIndex/BTree.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,177 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

using System;
using System.Collections.Generic;
using System.Runtime.InteropServices;

namespace Garnet.server.BTreeIndex
{
public unsafe partial class BTree
{
BTreeNode* root;
BTreeNode* head;
BTreeNode* tail;
byte* tailMinKey;
public static readonly int MAX_TREE_DEPTH = 10; // maximum allowed depth of the tree
static int DEFAULT_SPLIT_LEAF_POSITION = (BTreeNode.LEAF_CAPACITY + 1) / 2; // position at which leaf node is split
static int SPLIT_LEAF_POSITION = BTreeNode.LEAF_CAPACITY; // position at which leaf node is split
static int SPLIT_INTERNAL_POSITION = BTreeNode.INTERNAL_CAPACITY; // position at which internal node is split

BTreeNode*[] rootToTailLeaf; // array of nodes from root to tail leaf
public BTreeStats stats; // statistics about the tree

/// <summary>
/// Initializes a new instance of the <see cref="BTree"/> class.
/// </summary>
public BTree(uint sectorSize)
{
var memoryBlock = (IntPtr*)NativeMemory.AlignedAlloc((nuint)BTreeNode.PAGE_SIZE, (nuint)BTreeNode.PAGE_SIZE);
root = BTreeNode.Create(BTreeNodeType.Leaf, memoryBlock);
head = tail = root;
root->info->next = root->info->previous = null;
root->info->count = 0;
tailMinKey = null;
rootToTailLeaf = new BTreeNode*[MAX_TREE_DEPTH];
stats = new BTreeStats();
stats.depth = 1;
stats.numLeafNodes = 1;
stats.numAllocates = 1;
}

/// <summary>
/// Frees the memory allocated for a node
/// </summary>
/// <param name="node">BTreeNode to free from memory</param>
private void Free(ref BTreeNode* node)
{
if (node == null)
return;

// If this is an internal node, free all its children first
if (node->info->type == BTreeNodeType.Internal)
{
for (int i = 0; i <= node->info->count; i++)
{
var child = node->data.children[i];
Free(ref child);
node->data.children[i] = null;
}
}

// Free the memory handle
if (node->memoryHandle != null)
{
NativeMemory.Free(node->memoryHandle);
stats.numDeallocates++;
node = null;
}
}

/// <summary>
/// Frees the memory allocated for a node
/// </summary>
/// <param name="node"></param>
public static void FreeNode(ref BTreeNode* node)
{
if (node == null)
return;

// If this is an internal node, free all its children first
if (node->info->type == BTreeNodeType.Internal)
{
for (int i = 0; i <= node->info->count; i++)
{
var child = node->data.children[i];
FreeNode(ref child);
node->data.children[i] = null;
}
}

// Free the memory handle
if (node->memoryHandle != null)
{
NativeMemory.Free(node->memoryHandle);
node = null;
}
}

/// <summary>
/// Deallocates the memory allocated for the B+Tree
/// </summary>
public void Deallocate()
{
if (root == null)
return;
Free(ref root);
Console.WriteLine("free complete");
stats.printStats();
root = null;
head = null;
tail = null;
}

/// <summary>
/// Destructor for the B+tree
/// </summary>
~BTree()
{
Deallocate();
}

public ulong FastInserts => stats.totalFastInserts;
public ulong LeafCount => stats.numLeafNodes;
public ulong InternalCount => stats.numInternalNodes;

public ulong ValidCount => StatsValidCount();

public long RootValidCount => GetValidCount(root);

public long TailValidCount => GetValidCount(tail);

public long Count()
{
return stats.numKeys;
}
public ulong StatsValidCount()
{
return stats.numValidKeys;
}

public long GetValidCount(BTreeNode* node)
{
return node->info->validCount;
}

/// <summary>
/// Retrieves the first entry in the B+Tree (smallest key)
/// </summary>
/// <returns>entry fetched</returns>
public KeyValuePair<byte[], Value> First()
{
BTreeNode* leaf = head;
if (leaf == null)
{
return default;
}
byte[] keyBytes = new ReadOnlySpan<byte>(leaf->GetKey(0), BTreeNode.KEY_SIZE).ToArray();
return new KeyValuePair<byte[], Value>(keyBytes, leaf->GetValue(0));
}

/// <summary>
/// Retrieves the last entry in the B+Tree (largest key)
/// </summary>
/// <returns>entry fetched</returns>
public KeyValuePair<byte[], Value> Last()
{
BTreeNode* leaf = tail;
if (leaf == null)
{
return default;
}
byte[] keyBytes = new ReadOnlySpan<byte>(leaf->GetKey(leaf->info->count - 1), BTreeNode.KEY_SIZE).ToArray();
return new KeyValuePair<byte[], Value>(keyBytes, leaf->GetValue(leaf->info->count - 1));
}

}
}
32 changes: 32 additions & 0 deletions libs/server/BTreeIndex/BTreeDelete.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT license.

namespace Garnet.server.BTreeIndex
{
public unsafe partial class BTree
{
/// <summary>
/// Delete a key from the B+tree
/// </summary>
/// <param name="key">key to delete</param>
/// <returns>true if key was tombstoned</returns>
public bool Delete(byte* key)
{
BTreeNode* leaf = null;
var nodesTraversed = new BTreeNode*[MAX_TREE_DEPTH];

TraverseToLeaf(ref leaf, ref nodesTraversed, key);
var index = leaf->LowerBound(key);
if (index >= leaf->info->count || BTreeNode.Compare(key, leaf->GetKey(index)) != 0)
{
return false;
}

// insert a tombstone for the delete
leaf->InsertTombstone(index);
leaf->info->validCount--;
stats.numValidKeys--;
return true;
}
}
}
Loading