Skip to content

Commit c9269d8

Browse files
author
Jacques Kang
committed
fixing #63 Exception on receiving larger message
1 parent 704df39 commit c9269d8

File tree

2 files changed

+88
-4
lines changed

2 files changed

+88
-4
lines changed

src/JKang.IpcServiceFramework.Core/IO/IpcReader.cs

+34-4
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,41 @@ public IpcReader(Stream stream, IIpcMessageSerializer serializer, bool leaveOpen
3737

3838
private async Task<byte[]> ReadMessageAsync(CancellationToken cancellationToken)
3939
{
40-
await _stream.ReadAsync(_lengthBuffer, 0, _lengthBuffer.Length, cancellationToken).ConfigureAwait(false);
41-
int length = _lengthBuffer[0] | _lengthBuffer[1] << 8 | _lengthBuffer[2] << 16 | _lengthBuffer[3] << 24;
40+
int headerLength = await _stream.ReadAsync(_lengthBuffer, 0, _lengthBuffer.Length, cancellationToken);
4241

43-
byte[] bytes = new byte[length];
44-
await _stream.ReadAsync(bytes, 0, length, cancellationToken).ConfigureAwait(false);
42+
if (headerLength != 4)
43+
{
44+
throw new ArgumentOutOfRangeException($"Header length must be 4 but was {headerLength}");
45+
}
46+
47+
int expectedLength = _lengthBuffer[0] | _lengthBuffer[1] << 8 | _lengthBuffer[2] << 16 | _lengthBuffer[3] << 24;
48+
byte[] bytes = new byte[expectedLength];
49+
int totalBytesReceived = 0;
50+
int remainingBytes = expectedLength;
51+
52+
using (var ms = new MemoryStream())
53+
{
54+
while (totalBytesReceived < expectedLength)
55+
{
56+
int dataLength = await _stream.ReadAsync(bytes, 0, remainingBytes, cancellationToken);
57+
58+
if (dataLength == 0)
59+
{
60+
break; // end of stream or stream shut down.
61+
}
62+
63+
ms.Write(bytes, 0, dataLength);
64+
totalBytesReceived += dataLength;
65+
remainingBytes -= dataLength;
66+
}
67+
68+
bytes = ms.ToArray();
69+
}
70+
71+
if (totalBytesReceived != expectedLength)
72+
{
73+
throw new System.ArgumentOutOfRangeException($"Data length must be {expectedLength} but was {totalBytesReceived}");
74+
}
4575

4676
return bytes;
4777
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
using AutoFixture.Xunit2;
2+
using Microsoft.Extensions.DependencyInjection;
3+
using System;
4+
using System.Collections.Generic;
5+
using System.Globalization;
6+
using System.IO;
7+
using System.Net;
8+
using System.Numerics;
9+
using System.Threading;
10+
using System.Threading.Tasks;
11+
using Xunit;
12+
13+
namespace JKang.IpcServiceFramework.IntegrationTests
14+
{
15+
public class EdgeCaseTest : IDisposable
16+
{
17+
private readonly CancellationTokenSource _cancellationToken;
18+
private readonly int _port;
19+
private readonly IpcServiceClient<ITestService> _client;
20+
21+
public EdgeCaseTest()
22+
{
23+
// configure DI
24+
IServiceCollection services = new ServiceCollection()
25+
.AddIpc(builder => builder.AddNamedPipe().AddService<ITestService, TestService>());
26+
_port = new Random().Next(10000, 50000);
27+
IIpcServiceHost host = new IpcServiceHostBuilder(services.BuildServiceProvider())
28+
.AddTcpEndpoint<ITestService>(
29+
name: Guid.NewGuid().ToString(),
30+
ipEndpoint: IPAddress.Loopback,
31+
port: _port)
32+
.Build();
33+
_cancellationToken = new CancellationTokenSource();
34+
host.RunAsync(_cancellationToken.Token);
35+
36+
_client = new IpcServiceClientBuilder<ITestService>()
37+
.UseTcp(IPAddress.Loopback, _port)
38+
.Build();
39+
}
40+
41+
[Fact]
42+
public async Task HugeMessage()
43+
{
44+
byte[] buffer = new byte[100000000]; // 100MB
45+
new Random().NextBytes(buffer);
46+
byte[] result = await _client.InvokeAsync(x => x.ReverseBytes(buffer));
47+
}
48+
49+
public void Dispose()
50+
{
51+
_cancellationToken.Cancel();
52+
}
53+
}
54+
}

0 commit comments

Comments
 (0)