Skip to content

Use a sqlite manager per stream that's kept around forever #302

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 18 additions & 81 deletions src/Speckle.Sdk/SQLite/SQLiteJsonCacheManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,84 +9,12 @@ namespace Speckle.Sdk.SQLite;
public partial interface ISqLiteJsonCacheManager : IDisposable;

[GenerateAutoInterface]
public sealed class SqLiteJsonCacheManager : ISqLiteJsonCacheManager
public sealed class SqLiteJsonCacheManager(ISqliteJsonCachePool pool, bool dispose) : ISqLiteJsonCacheManager
{
private readonly CacheDbCommandPool _pool;

public SqLiteJsonCacheManager(string path, int concurrency)
{
//disable pooling as we pool ourselves
var builder = new SqliteConnectionStringBuilder { Pooling = false, DataSource = path };
_pool = new CacheDbCommandPool(builder.ToString(), concurrency);
Initialize();
}

[AutoInterfaceIgnore]
public void Dispose() => _pool.Dispose();

private void Initialize()
{
// NOTE: used for creating partioned object tables.
//string[] HexChars = new string[] { "0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f" };
//var cart = new List<string>();
//foreach (var str in HexChars)
// foreach (var str2 in HexChars)
// cart.Add(str + str2);

_pool.Use(c =>
{
const string COMMAND_TEXT =
@"
CREATE TABLE IF NOT EXISTS objects(
hash TEXT PRIMARY KEY,
content TEXT
) WITHOUT ROWID;
";
using (var command = new SqliteCommand(COMMAND_TEXT, c))
{
command.ExecuteNonQuery();
}

// Insert Optimisations

//Note / Hack: This setting has the potential to corrupt the db.
//cmd = new SqliteCommand("PRAGMA synchronous=OFF;", Connection);
//cmd.ExecuteNonQuery();

using (SqliteCommand cmd1 = new("PRAGMA count_changes=OFF;", c))
{
cmd1.ExecuteNonQuery();
}

using (SqliteCommand cmd2 = new("PRAGMA temp_store=MEMORY;", c))
{
cmd2.ExecuteNonQuery();
}

using (SqliteCommand cmd3 = new("PRAGMA mmap_size = 30000000000;", c))
{
cmd3.ExecuteNonQuery();
}

using (SqliteCommand cmd4 = new("PRAGMA page_size = 32768;", c))
{
cmd4.ExecuteNonQuery();
}

using (SqliteCommand cmd5 = new("PRAGMA journal_mode='wal';", c))
{
cmd5.ExecuteNonQuery();
}
//do this to wait 5 seconds to avoid db lock exceptions, this is 0 by default
using (SqliteCommand cmd6 = new("PRAGMA busy_timeout=5000;", c))
{
cmd6.ExecuteNonQuery();
}
});
}
public ISqliteJsonCachePool Pool => pool;

public IReadOnlyCollection<(string Id, string Json)> GetAllObjects() =>
_pool.Use(
pool.Use(
CacheOperation.GetAll,
command =>
{
Expand All @@ -101,7 +29,7 @@ content TEXT
);

public void DeleteObject(string id) =>
_pool.Use(
pool.Use(
CacheOperation.Delete,
command =>
{
Expand All @@ -111,7 +39,7 @@ public void DeleteObject(string id) =>
);

public string? GetObject(string id) =>
_pool.Use(
pool.Use(
CacheOperation.Get,
command =>
{
Expand All @@ -125,7 +53,7 @@ public void SaveObject(string id, string json)
{
id.NotNullOrWhiteSpace();
json.NotNullOrWhiteSpace();
_pool.Use(
pool.Use(
CacheOperation.InsertOrIgnore,
command =>
{
Expand All @@ -138,7 +66,7 @@ public void SaveObject(string id, string json)

//This does an insert or replaces if already exists
public void UpdateObject(string id, string json) =>
_pool.Use(
pool.Use(
CacheOperation.InsertOrReplace,
command =>
{
Expand All @@ -149,7 +77,7 @@ public void UpdateObject(string id, string json) =>
);

public void SaveObjects(IEnumerable<(string id, string json)> items) =>
_pool.Use(
pool.Use(
CacheOperation.BulkInsertOrIgnore,
cmd =>
{
Expand Down Expand Up @@ -195,7 +123,7 @@ private bool CreateBulkInsert(SqliteCommand cmd, IEnumerable<(string id, string
}

public bool HasObject(string objectId) =>
_pool.Use(
pool.Use(
CacheOperation.Has,
command =>
{
Expand All @@ -205,4 +133,13 @@ public bool HasObject(string objectId) =>
return rowFound;
}
);

[AutoInterfaceIgnore]
public void Dispose()
{
if (dispose)
{
pool.Dispose();
}
}
}
42 changes: 36 additions & 6 deletions src/Speckle.Sdk/SQLite/SqLiteJsonCacheManagerFactory.cs
Original file line number Diff line number Diff line change
@@ -1,19 +1,49 @@
using Speckle.InterfaceGenerator;
using System.Collections.Concurrent;
using Speckle.InterfaceGenerator;
using Speckle.Sdk.Logging;
using Speckle.Sdk.Serialisation.Utilities;

namespace Speckle.Sdk.SQLite;

public partial interface ISqLiteJsonCacheManagerFactory : IDisposable;

[GenerateAutoInterface]
public class SqLiteJsonCacheManagerFactory : ISqLiteJsonCacheManagerFactory
public sealed class SqLiteJsonCacheManagerFactory : ISqLiteJsonCacheManagerFactory
{
public const int INITIAL_CONCURRENCY = 4;

private ISqLiteJsonCacheManager Create(string path, int concurrency) => new SqLiteJsonCacheManager(path, concurrency);
private readonly ConcurrentDictionary<string, ISqliteJsonCachePool> _pools = new();

[AutoInterfaceIgnore]
public void Dispose()
{
foreach (var pool in _pools)
{
pool.Value.Dispose();
}

_pools.Clear();
}

private ISqliteJsonCachePool Create(string path, int concurrency) => new SqliteJsonCachePool(path, concurrency);

public ISqLiteJsonCacheManager CreateForUser(string scope) =>
Create(Path.Combine(SpecklePathProvider.UserApplicationDataPath(), "Speckle", $"{scope}.db"), 1);
new SqLiteJsonCacheManager(
#pragma warning disable CA2000
//this is fine because we told SqLiteJsonCacheManager to dispose this
Create(Path.Combine(SpecklePathProvider.UserApplicationDataPath(), "Speckle", $"{scope}.db"), 1),
#pragma warning restore CA2000
true
);

public ISqLiteJsonCacheManager CreateFromStream(string streamId) =>
Create(SqlitePaths.GetDBPath(streamId), INITIAL_CONCURRENCY);
public ISqLiteJsonCacheManager CreateFromStream(string streamId)
{
if (!_pools.TryGetValue(streamId, out var pool))
{
pool = Create(SqlitePaths.GetDBPath(streamId), INITIAL_CONCURRENCY);
_pools.TryAdd(streamId, pool);
}
//never dispose pools for streams
return new SqLiteJsonCacheManager(pool, false);
}
}
99 changes: 99 additions & 0 deletions src/Speckle.Sdk/SQLite/SqliteJsonCachePool.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
using Microsoft.Data.Sqlite;
using Speckle.InterfaceGenerator;

namespace Speckle.Sdk.SQLite;

public partial interface ISqliteJsonCachePool : IDisposable
{
T Use<T>(CacheOperation type, Func<SqliteCommand, T> handler);
}

[GenerateAutoInterface]
public sealed class SqliteJsonCachePool : ISqliteJsonCachePool
{
private readonly CacheDbCommandPool _pool;

public SqliteJsonCachePool(string path, int concurrency)
{
Path = path;
Concurrency = concurrency;
//disable pooling as we pool ourselves
var builder = new SqliteConnectionStringBuilder { Pooling = false, DataSource = path };
_pool = new CacheDbCommandPool(builder.ToString(), concurrency);
Initialize();
}

public string Path { get; }
public int Concurrency { get; }

private void Initialize()
{
// NOTE: used for creating partioned object tables.
//string[] HexChars = new string[] { "0", "1", "2", "3", "4", "5", "6", "7", "8", "9", "a", "b", "c", "d", "e", "f" };
//var cart = new List<string>();
//foreach (var str in HexChars)
// foreach (var str2 in HexChars)
// cart.Add(str + str2);

_pool.Use(c =>
{
const string COMMAND_TEXT =
@"
CREATE TABLE IF NOT EXISTS objects(
hash TEXT PRIMARY KEY,
content TEXT
) WITHOUT ROWID;
";
using (var command = new SqliteCommand(COMMAND_TEXT, c))
{
command.ExecuteNonQuery();
}

// Insert Optimisations

//Note / Hack: This setting has the potential to corrupt the db.
//cmd = new SqliteCommand("PRAGMA synchronous=OFF;", Connection);
//cmd.ExecuteNonQuery();

using (SqliteCommand cmd1 = new("PRAGMA count_changes=OFF;", c))
{
cmd1.ExecuteNonQuery();
}

using (SqliteCommand cmd2 = new("PRAGMA temp_store=MEMORY;", c))
{
cmd2.ExecuteNonQuery();
}

using (SqliteCommand cmd3 = new("PRAGMA mmap_size = 30000000000;", c))
{
cmd3.ExecuteNonQuery();
}

using (SqliteCommand cmd4 = new("PRAGMA page_size = 32768;", c))
{
cmd4.ExecuteNonQuery();
}

using (SqliteCommand cmd5 = new("PRAGMA journal_mode='wal';", c))
{
cmd5.ExecuteNonQuery();
}
//do this to wait 5 seconds to avoid db lock exceptions, this is 0 by default
using (SqliteCommand cmd6 = new("PRAGMA busy_timeout=5000;", c))
{
cmd6.ExecuteNonQuery();
}
});
}

public void Use(Action<SqliteConnection> handler) => _pool.Use(handler);

public void Use(CacheOperation type, Action<SqliteCommand> handler) => _pool.Use(type, handler);

[AutoInterfaceIgnore]
public T Use<T>(CacheOperation type, Func<SqliteCommand, T> handler) => _pool.Use(type, handler);

[AutoInterfaceIgnore]
public void Dispose() => _pool.Dispose();
}
3 changes: 3 additions & 0 deletions src/Speckle.Sdk/Serialisation/V2/MemoryJsonCacheManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@ namespace Speckle.Sdk.Serialisation.V2;
public class MemoryJsonCacheManager(ConcurrentDictionary<Id, Json> jsonCache) : ISqLiteJsonCacheManager
#pragma warning restore CA1063
{
#pragma warning disable CA1065
public ISqliteJsonCachePool Pool => throw new NotImplementedException();
#pragma warning restore CA1065
public IReadOnlyCollection<(string Id, string Json)> GetAllObjects() =>
jsonCache.Select(x => (x.Key.Value, x.Value.Value)).ToList();

Expand Down
5 changes: 4 additions & 1 deletion src/Speckle.Sdk/ServiceRegistration.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,12 @@ SpeckleSdkOptions speckleSdkOptions
typeof(DeserializeProcess),
typeof(ObjectLoader),
typeof(TraversalRule),
typeof(Client)
typeof(Client),
typeof(SqliteJsonCachePool)
);
serviceCollection.AddMatchingInterfacesAsTransient(typeof(GraphQLRetry).Assembly);
//we want to make sqlite pools be singletons per stream so needs a singleton factory
serviceCollection.AddSingleton<ISqLiteJsonCacheManagerFactory, SqLiteJsonCacheManagerFactory>();
return serviceCollection;
}

Expand Down
3 changes: 3 additions & 0 deletions tests/Speckle.Sdk.Serialization.Tests/DetachedTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -373,6 +373,9 @@ CancellationToken cancellationToken

public class DummySendCacheManager(Dictionary<string, string> objects) : ISqLiteJsonCacheManager
{
#pragma warning disable CA1065
public ISqliteJsonCachePool Pool => throw new NotImplementedException();
#pragma warning restore CA1065
public void Dispose() { }

public IReadOnlyCollection<(string, string)> GetAllObjects() => throw new NotImplementedException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ namespace Speckle.Sdk.Serialization.Tests;

public class DummyCancellationSqLiteSendManager : ISqLiteJsonCacheManager
{
#pragma warning disable CA1065
public ISqliteJsonCachePool Pool => throw new NotImplementedException();
#pragma warning restore CA1065
public string? GetObject(string id) => null;

public void SaveObject(string id, string json) => throw new NotImplementedException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ namespace Speckle.Sdk.Serialization.Tests.Framework;

public class ExceptionSendCacheManager(bool? hasObject = null, int? exceptionsAfter = null) : ISqLiteJsonCacheManager
{
#pragma warning disable CA1065
public ISqliteJsonCachePool Pool => throw new NotImplementedException();
#pragma warning restore CA1065
private readonly object _lock = new();
private int _count;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,9 @@ namespace Speckle.Sdk.Testing.Framework;
public sealed class DummySqLiteReceiveManager(IReadOnlyDictionary<string, string> savedObjects)
: ISqLiteJsonCacheManager
{
#pragma warning disable CA1065
public ISqliteJsonCachePool Pool => throw new NotImplementedException();
#pragma warning restore CA1065
public void Dispose() { }

public IReadOnlyCollection<(string, string)> GetAllObjects() => throw new NotImplementedException();
Expand Down
3 changes: 3 additions & 0 deletions tests/Speckle.Sdk.Testing/Framework/DummySqLiteSendManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ namespace Speckle.Sdk.Testing.Framework;

public class DummySqLiteSendManager : ISqLiteJsonCacheManager
{
#pragma warning disable CA1065
public ISqliteJsonCachePool Pool => throw new NotImplementedException();
#pragma warning restore CA1065
public string? GetObject(string id) => throw new NotImplementedException();

public void SaveObject(string id, string json) => throw new NotImplementedException();
Expand Down
Loading