Skip to content
This repository was archived by the owner on Dec 18, 2023. It is now read-only.

Full duplex Ws transfer using channels #110

Open
wants to merge 89 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
89 commits
Select commit Hold shift + click to select a range
9d80ed0
Add ConcurrentBufferReaderWriter
ProphetLamb Oct 31, 2022
140b995
Rename WsTx -> WsManager
ProphetLamb Oct 31, 2022
20681b5
Implement BufferedStreamReader
ProphetLamb Oct 31, 2022
9e6f2f2
Implement message based websocket communication
ProphetLamb Oct 31, 2022
d17533d
Allow waiting for EndOfMessage
ProphetLamb Oct 31, 2022
2c6f1d2
Pass the result for Received Task
ProphetLamb Oct 31, 2022
1f934a1
Add nullability attributes for legacy frameworks
ProphetLamb Oct 31, 2022
6e04ae8
add System.Theading.Channels for netstd
ProphetLamb Oct 31, 2022
d0cd73a
Remove useless CancellationToken overload
ProphetLamb Oct 31, 2022
f2de839
Move ws jsonrpc headers to different file
ProphetLamb Oct 31, 2022
c2f8ed4
Pass RecyclableMemoryStreamManager to WsChannelTx
ProphetLamb Oct 31, 2022
0003150
Add Task.Inv == Task.ConfigureAwait(false)
ProphetLamb Oct 31, 2022
9118437
Use .Inv() instead of .ConfigureAwait(false)
ProphetLamb Oct 31, 2022
8acc45f
Implement IDisposable for WsChannelRx & WsChannelTx
ProphetLamb Oct 31, 2022
ae9e72f
Remove TaskList
ProphetLamb Oct 31, 2022
6fa5829
Introduce WsHeader and WsHeaderWithMessage
ProphetLamb Oct 31, 2022
8844339
Add Span.ClipLength
ProphetLamb Oct 31, 2022
a7146ad
Use channel for message portion received events
ProphetLamb Oct 31, 2022
987cd31
Use new WsHeader... types for handlers
ProphetLamb Oct 31, 2022
27de306
Implement WsTxMessageMediator
ProphetLamb Oct 31, 2022
e82dd90
Move WsChannel* and WsMessage to separate files
ProphetLamb Oct 31, 2022
afebf84
Move BufferedStreamReader to Common
ProphetLamb Oct 31, 2022
7debd5e
Rename WsClient - WsClientSync
ProphetLamb Oct 31, 2022
7d73065
Rename EmptyList
ProphetLamb Oct 31, 2022
c202e1c
idk
ProphetLamb Oct 31, 2022
5574b99
Convert WsMessage into a Stream WsMessageReader
ProphetLamb Oct 31, 2022
8e45651
Refactoring
ProphetLamb Oct 31, 2022
582fadf
Refactoring
ProphetLamb Oct 31, 2022
574e334
Use local memory manager
ProphetLamb Nov 1, 2022
e6abf11
public constants in UpperCamelCase
ProphetLamb Nov 1, 2022
170b11e
Use ValidateReadonly for WsClientOptions
ProphetLamb Nov 1, 2022
c969e10
Add ValueArrayBuilder
ProphetLamb Nov 1, 2022
c4d1d41
Dont use string interpolation
ProphetLamb Nov 1, 2022
1723745
idk
ProphetLamb Nov 1, 2022
a821b98
Fix unterminated recursion for Inv
ProphetLamb Nov 1, 2022
bff795b
Inline base class DatabaseTestBase
ProphetLamb Nov 1, 2022
abcac4c
Initialize MemoryManager for default WsClientOptions
ProphetLamb Nov 1, 2022
8d76222
Refactoring
ProphetLamb Nov 1, 2022
12bef01
Prevent premature disposing of input stream
ProphetLamb Nov 1, 2022
6dc740d
Extract producer/consumer functionality into separate method
ProphetLamb Nov 1, 2022
eb49c68
Docs
ProphetLamb Nov 1, 2022
9e29cd5
Reorder open, close and dispose
ProphetLamb Nov 1, 2022
bbff464
Fix bug where AppendResultAsync would also read
ProphetLamb Nov 1, 2022
66eeb46
Do not dispose Tasks
ProphetLamb Nov 1, 2022
89b28d2
Reorder close
ProphetLamb Nov 1, 2022
11fa246
Handle OperationCanceledException from long running task when Closing
ProphetLamb Nov 1, 2022
ecc3dd5
Make cons/prod classes, bc of dispose cow
ProphetLamb Nov 1, 2022
622e037
Add WebsocketReceiveResult.ThrowCancelIfClosed
ProphetLamb Nov 1, 2022
2419a50
Mark throw methods correctly
ProphetLamb Nov 1, 2022
f062036
Prevent duplicate dispose
ProphetLamb Nov 1, 2022
bd27910
Use DisposingCache instead of ConcurrentDictionary
ProphetLamb Nov 1, 2022
b4dd9aa
Reorder validation out if lock
ProphetLamb Nov 1, 2022
6c39727
Implement IAsyncDisposable for DbHandle
ProphetLamb Nov 1, 2022
de5addf
Remove need for Rx channel
ProphetLamb Nov 1, 2022
5714fc1
Rename file to WsRx
ProphetLamb Nov 1, 2022
500a249
BufferStreamReader does not dispose underlying stream
ProphetLamb Nov 1, 2022
697690e
Remove unused classed
ProphetLamb Nov 1, 2022
0cd5722
Remove duplicate db.Close call
ProphetLamb Nov 1, 2022
1d692ee
Disable orderly closing of database when Disposing DbHandle
ProphetLamb Nov 1, 2022
1e48002
Remove ChannelRxMessagesMax from WsClientOptions
ProphetLamb Nov 1, 2022
0a11f40
Merge branch 'develop' into feature/improve-async-large-data-transfer
ProphetLamb Nov 1, 2022
e9e45f7
Merge branch 'develop' into feature/improve-async-large-data-transfer
ProphetLamb Nov 1, 2022
0c39462
Remove RecyclableMemoryStream dependency from Common
ProphetLamb Nov 1, 2022
8829bc7
Implement BoundedChannelPool
ProphetLamb Nov 1, 2022
8128e18
Simplify preprocessor directives
ProphetLamb Nov 1, 2022
b8d2a71
Mark Common as CLSCompliant
ProphetLamb Nov 1, 2022
21c03f4
Use BoundChannelPool in WsMessageReader
ProphetLamb Nov 1, 2022
bf84494
Add MessageChannelCapacity to options
ProphetLamb Nov 1, 2022
be90e1a
Make retrieve private
ProphetLamb Nov 2, 2022
4b34222
Remove redundant assertion
ProphetLamb Nov 2, 2022
6990242
Pass cancellation token to final block
ProphetLamb Nov 2, 2022
26cdb49
Delay waiting for channel to end of message
ProphetLamb Nov 2, 2022
48fc635
Slice buffer before parsing json
ProphetLamb Nov 2, 2022
d060cdf
Return obvious invalid state instead of throwing when failing to pars…
ProphetLamb Nov 2, 2022
be8e40e
Rename classes
ProphetLamb Nov 2, 2022
da6fc4d
Separate read method
ProphetLamb Nov 2, 2022
4ac56d9
Refactoring
ProphetLamb Nov 2, 2022
79ba821
Remove Open and Close locks
ProphetLamb Nov 2, 2022
ec09821
Enable async close
ProphetLamb Nov 2, 2022
34ad688
Implement IDisposable for DbHandle
ProphetLamb Nov 2, 2022
2d4c970
Remove duplicate Close
ProphetLamb Nov 2, 2022
4fe0914
Remove unused using directives
ProphetLamb Nov 2, 2022
3938902
Short-circuit reads
ProphetLamb Nov 2, 2022
f599319
Execute return array to pool
ProphetLamb Nov 2, 2022
8011146
Add Diagnostics EventSource to WsReceiverDeflater
ProphetLamb Nov 2, 2022
c574de4
Add Diagnostics EventSource to WsReceiverInflater
ProphetLamb Nov 2, 2022
57587ce
Fix duped GUID
ProphetLamb Nov 2, 2022
bffe2e0
Add ConsoleOutEventListener when using DbHandle
ProphetLamb Nov 2, 2022
6e4586c
Add ConsoleOutEventListener when using DbHandle
ProphetLamb Nov 2, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .editorconfig
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ csharp_style_var_for_built_in_types = false:none
csharp_style_var_when_type_is_apparent = false:none
dotnet_naming_rule.constants_rule.import_to_resharper = as_predefined
dotnet_naming_rule.constants_rule.severity = warning
dotnet_naming_rule.constants_rule.style = all_upper_style
dotnet_naming_rule.constants_rule.style = upper_camel_case_style
dotnet_naming_rule.constants_rule.symbols = constants_symbols
dotnet_naming_rule.private_constants_rule.import_to_resharper = as_predefined
dotnet_naming_rule.private_constants_rule.resharper_style = AaBb, NUM_ + AA_BB
Expand Down
212 changes: 206 additions & 6 deletions src/Common/ArrayBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

// Modified for generic types and not backed by a pool

using System.Buffers;
using System.Diagnostics;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
Expand Down Expand Up @@ -63,12 +64,6 @@ public ref T this[int index]
}
}

public override string ToString()
{
string s = Raw.Slice(0, _pos).ToString();
return s;
}

/// <summary>Returns the underlying storage of the builder.</summary>
public Span<T> Raw => _array.AsSpan();

Expand Down Expand Up @@ -185,3 +180,208 @@ private void Grow(int additionalCapacityBeyondPos)
_array = newArray;
}
}

internal ref struct PoolArrayBuilder<T>
{
private T[]? _array;
private Span<T> _raw;
private int _pos;

public PoolArrayBuilder(Span<T> initialBuffer)
{
_array = null;
_raw = initialBuffer;
_pos = 0;
}

public PoolArrayBuilder(int initialCapacity)
{
_array = ArrayPool<T>.Shared.Rent(initialCapacity);
_raw = _array;
_pos = 0;
}

public int Length
{
get => _pos;
set
{
Debug.Assert(value >= 0);
Debug.Assert(value <= _raw.Length);
_pos = value;
}
}

public bool IsEmpty => 0 >= (uint)_pos;

public bool IsDefault => _raw.IsEmpty && _array is null && _pos == 0;

public int Capacity => _raw.Length;

public void EnsureCapacity(int capacity)
{
// This is not expected to be called this with negative capacity
Debug.Assert(capacity >= 0);

if ((uint)capacity > (uint)_raw.Length)
Grow(capacity - _pos);
}

/// <summary>
/// Get a pinnable reference to the builder.
/// Does not ensure there is a null char after <see cref="Length"/>
/// This overload is pattern matched in the C# 7.3+ compiler so you can omit
/// the explicit method call, and write eg "fixed (char* c = builder)"
/// </summary>
public ref T GetPinnableReference()
{
return ref MemoryMarshal.GetReference(_raw);
}

public ref T this[int index]
{
get
{
Debug.Assert(index < _pos);
return ref _raw[index];
}
}

/// <summary>Returns the underlying storage of the builder.</summary>
public Span<T> Raw => _raw;

public ReadOnlySpan<T> AsSpan() => _raw.Slice(0, _pos);
public ReadOnlySpan<T> AsSpan(int start) => _raw.Slice(start, _pos - start);
public ReadOnlySpan<T> AsSpan(int start, int length) => _raw.Slice(start, length);

public ArraySegment<T> AsSegment() => new(_array ?? Array.Empty<T>());
public ArraySegment<T> AsSegment(int start) => new(_array ?? Array.Empty<T>(), start, _pos - start);
public ArraySegment<T> AsSegment(int start, int length) => new(_array ?? Array.Empty<T>(), start, length);

public bool TryCopyTo(Span<T> destination, out int written) {
if (_raw.Slice(0, _pos).TryCopyTo(destination))
{
written = _pos;
return true;
}

written = 0;
return false;
}

public void Insert(int index, in T value, int count)
{
if (_pos > _raw.Length - count)
{
Grow(count);
}

int remaining = _pos - index;
_raw.Slice(index, remaining).CopyTo(_raw.Slice(index + count));
_raw.Slice(index, count).Fill(value);
_pos += count;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public void Append(T c)
{
int pos = _pos;
if ((uint)pos < (uint)_raw.Length)
{
_raw[pos] = c;
_pos = pos + 1;
}
else
{
GrowAndAppend(c);
}
}

public void Append(in T c, int count)
{
if (_pos > _raw.Length - count)
{
Grow(count);
}

Span<T> dst = _raw.Slice(_pos, count);
for (int i = 0; i < dst.Length; i++)
{
dst[i] = c;
}
_pos += count;
}

public void Append(ReadOnlySpan<T> value)
{
int pos = _pos;
if (pos > _raw.Length - value.Length)
{
Grow(value.Length);
}

value.CopyTo(_raw.Slice(_pos));
_pos += value.Length;
}

[MethodImpl(MethodImplOptions.AggressiveInlining)]
public Span<T> AppendSpan(int length)
{
int origPos = _pos;
if (origPos > _raw.Length - length)
{
Grow(length);
}

_pos = origPos + length;
return _raw.Slice(origPos, length);
}

[MethodImpl(MethodImplOptions.NoInlining)]
private void GrowAndAppend(T c)
{
Grow(1);
Append(c);
}

/// <summary>
/// Resize the internal buffer either by doubling current buffer size or
/// by adding <paramref name="additionalCapacityBeyondPos"/> to
/// <see cref="_pos"/> whichever is greater.
/// </summary>
/// <param name="additionalCapacityBeyondPos">
/// Number of chars requested beyond current position.
/// </param>
[MethodImpl(MethodImplOptions.NoInlining)]
private void Grow(int additionalCapacityBeyondPos)
{
Debug.Assert(additionalCapacityBeyondPos > 0);
Debug.Assert(_pos > _raw.Length - additionalCapacityBeyondPos, "Grow called incorrectly, no resize is needed.");

T[] newArray = ArrayPool<T>.Shared.Rent(
(int)Math.Max((uint)(_pos + additionalCapacityBeyondPos), (uint)_raw.Length * 2)
);
_raw.Slice(0, _pos).CopyTo(newArray);
_raw = newArray;
if (_array is not null) {
ArrayPool<T>.Shared.Return(_array);
}
_array = newArray;
}

public void Dispose() {
var array = _array;
_array = null;
_raw = default;
if (array is not null) {
ArrayPool<T>.Shared.Return(array);
}
}

public T[] ToArray() {
var array = new T[_pos];
_raw.Slice(0, _pos).CopyTo(array.AsSpan(0, _pos));
Dispose();
return array;
}
}
1 change: 1 addition & 0 deletions src/Common/AssemblyInfo.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
using System.Runtime.CompilerServices;

[assembly: CLSCompliant(true)]
[assembly: InternalsVisibleTo("SurrealDB.Abstractions")]
[assembly: InternalsVisibleTo("SurrealDB.Configuration")]
[assembly: InternalsVisibleTo("SurrealDB.Driver.Rest")]
Expand Down
Loading