Skip to content

Commit a289d03

Browse files
authored
Benchmark and Duplex Pipe Concurrent Usage Fixes (#27)
1 parent 4606bef commit a289d03

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

50 files changed

+1130
-748
lines changed

README.md

+19
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,25 @@ await client.ConnectAsync();
6565

6666
await client.Proxy.UpdateInfoAndWait(1, 2, "Custom Status");
6767
```
68+
## Benchmarks
69+
```
70+
BenchmarkDotNet v0.13.8, Windows 11 (10.0.22621.2283/22H2/2022Update/SunValley2)
71+
AMD Ryzen 9 3900X, 1 CPU, 24 logical and 12 physical cores
72+
.NET SDK 8.0.100-rc.1.23455.8
73+
[Host] : .NET 7.0.10 (7.0.1023.36312), X64 RyuJIT AVX2
74+
Job-ASQQIE : .NET 7.0.10 (7.0.1023.36312), X64 RyuJIT AVX2
75+
76+
Platform=X64 Runtime=.NET 7.0 MaxIterationCount=5
77+
MaxWarmupIterationCount=3 MinIterationCount=3 MinWarmupIterationCount=1
78+
```
79+
80+
| Method | Mean | Error | StdDev | Op/s | Gen0 | Gen1 | Allocated |
81+
|------------------------------------- |---------:|---------:|---------:|---------:|-------:|-------:|----------:|
82+
| InvocationNoArgument | 38.66 us | 2.070 us | 0.537 us | 25,867.9 | 0.0610 | - | 681 B |
83+
| InvocationUnmanagedArgument | 38.35 us | 2.496 us | 0.648 us | 26,078.6 | 0.0610 | - | 737 B |
84+
| InvocationUnmanagedMultipleArguments | 38.98 us | 2.044 us | 0.531 us | 25,654.8 | 0.0610 | - | 785 B |
85+
| InvocationNoArgumentWithResult | 38.22 us | 0.752 us | 0.195 us | 26,166.8 | 0.0610 | - | 721 B |
86+
| InvocationWithDuplexPipe_Upload | 64.48 us | 2.690 us | 0.416 us | 15,509.1 | 2.0752 | 0.4883 | 14142 B |
6887

6988
## Method Invocation Table
7089
Some methods are handled differently based upon the arguments passed and there are limitations placed upon the types of arguments which can be used together. Most of these incompatibilities handled with Diagnostic Errors provided by the `NexNet.Generator`. Below is a table which shows valid combinations of arguments and return values.

src/NexNet.IntegrationTests/BaseTests.cs

+6-1
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
using NexNet.Quic;
99
using NexNet.Transports;
1010
using NUnit.Framework;
11+
using NUnit.Framework.Interfaces;
1112
using NUnit.Framework.Internal;
1213

1314
namespace NexNet.IntegrationTests;
@@ -38,7 +39,6 @@ public enum Type
3839
[OneTimeSetUp]
3940
public void OneTimeSetUp()
4041
{
41-
4242
//_logger = new ConsoleLogger();
4343
Trace.Listeners.Add(new ConsoleTraceListener());
4444
_socketDirectory = Directory.CreateTempSubdirectory("socketTests");
@@ -62,6 +62,11 @@ public virtual void SetUp()
6262
[TearDown]
6363
public virtual void TearDown()
6464
{
65+
if (TestContext.CurrentContext.Result.Outcome != ResultState.Success)
66+
{
67+
_logger.Flush(TestContext.Out);
68+
}
69+
6570
CurrentPath = null;
6671
CurrentTcpPort = null;
6772
CurrentUdpPort = null;
+49-10
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,35 @@
1-
using System.Diagnostics;
2-
using NUnit.Framework;
1+
using System;
2+
using System.Diagnostics;
3+
using System.IO;
34

45
namespace NexNet.IntegrationTests;
56

67
public class ConsoleLogger : INexusLogger
78
{
89
private readonly string _prefix = "";
910
private readonly Stopwatch _sw;
10-
private readonly TextWriter _outWriter;
11+
private readonly string[]? _lines;
12+
private int _currentLineIndex = 0;
13+
private int _totalLinesWritten = 0;
1114

12-
public string? Category { get; }
15+
public string? Category { get; set; }
1316

1417
public bool LogEnabled { get; set; } = true;
1518

1619
private readonly ConsoleLogger _baseLogger;
1720

18-
public ConsoleLogger()
21+
public INexusLogger.LogLevel MinLogLevel { get; set; } = INexusLogger.LogLevel.Trace;
22+
23+
public int TotalLinesWritten
24+
{
25+
get => _totalLinesWritten;
26+
}
27+
28+
public ConsoleLogger(int maxLines = 200)
1929
{
30+
_lines = new string[maxLines];
2031
_baseLogger = this;
2132
_sw = Stopwatch.StartNew();
22-
_outWriter = TestContext.Out;
2333
}
2434

2535
private ConsoleLogger(ConsoleLogger baseLogger, string? category, string prefix = "")
@@ -28,7 +38,6 @@ private ConsoleLogger(ConsoleLogger baseLogger, string? category, string prefix
2838
_prefix = prefix;
2939
Category = category;
3040
_sw = baseLogger._sw;
31-
_outWriter = baseLogger._outWriter;
3241
}
3342

3443

@@ -37,16 +46,46 @@ public void Log(INexusLogger.LogLevel logLevel, string? category, Exception? exc
3746
if (!_baseLogger.LogEnabled)
3847
return;
3948

40-
_outWriter.WriteLine($"[{_sw.ElapsedTicks/(double)Stopwatch.Frequency:0.000000}]{_prefix} [{category}]: {message} {exception}");
49+
if (logLevel < MinLogLevel)
50+
return;
51+
52+
lock (_baseLogger._lines!)
53+
{
54+
_baseLogger._lines[_baseLogger._currentLineIndex] =
55+
$"[{_sw.ElapsedTicks / (double)Stopwatch.Frequency:0.000000}]{_prefix} [{category}]: {message} {exception}";
56+
_baseLogger._currentLineIndex = (_baseLogger._currentLineIndex + 1) % _baseLogger._lines.Length;
57+
_baseLogger._totalLinesWritten++;
58+
}
4159
}
4260

4361
public INexusLogger CreateLogger(string? category)
4462
{
45-
return new ConsoleLogger(_baseLogger, category, _prefix);
63+
return new ConsoleLogger(_baseLogger, category, _prefix) { MinLogLevel = MinLogLevel };
4664
}
4765

4866
public INexusLogger CreateLogger(string? category, string prefix)
4967
{
50-
return new ConsoleLogger(_baseLogger, category, prefix);
68+
return new ConsoleLogger(_baseLogger, category, prefix) { MinLogLevel = MinLogLevel };
69+
}
70+
71+
public void Flush(TextWriter writer)
72+
{
73+
if (_baseLogger._totalLinesWritten == 0)
74+
return;
75+
76+
var startIndex = _baseLogger._currentLineIndex;
77+
var maxLines = _baseLogger._lines!.Length;
78+
79+
if (_baseLogger._totalLinesWritten > maxLines)
80+
{
81+
writer.WriteLine($"Truncating Log. Showing only last {maxLines} out of {_baseLogger._totalLinesWritten} total lines written.");
82+
}
83+
for (int i = 0; i < maxLines; i++)
84+
{
85+
writer.WriteLine(_baseLogger._lines![(startIndex + i) % maxLines]);
86+
}
87+
88+
_baseLogger._currentLineIndex = 0;
89+
_baseLogger._totalLinesWritten = 0;
5190
}
5291
}

src/NexNet.IntegrationTests/NexusServerTests_NexusInvocations.cs

+10-10
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ public async Task NexusInvokesOnAll(Type type)
161161
connectedNexus.OnConnectedEvent = async nexus =>
162162
{
163163
// Second connection
164-
if (++connectedCount == 2)
164+
if (Interlocked.Increment(ref connectedCount) == 2)
165165
{
166166
await nexus.Context.Clients.All.ClientTask();
167167
}
@@ -199,7 +199,7 @@ public async Task NexusInvokesOnGroup(Type type)
199199
{
200200
nexus.Context.Groups.Add("group");
201201
// Second connection
202-
if (++connectedCount == 2)
202+
if (Interlocked.Increment(ref connectedCount) == 2)
203203
{
204204
await nexus.Context.Clients.Group("group").ClientTask();
205205
}
@@ -234,7 +234,7 @@ public async Task NexusInvokesOnGroups(Type type)
234234
{
235235
connectedNexus.OnConnectedEvent = async nexus =>
236236
{
237-
if (++connectedCount == 1) {
237+
if (Interlocked.Increment(ref connectedCount) == 1) {
238238
nexus.Context.Groups.Add("group");
239239
}
240240
// Second connection
@@ -276,7 +276,7 @@ public async Task NexusInvokesOnOthers(Type type)
276276
connectedNexus.OnConnectedEvent = async nexus =>
277277
{
278278
// Second connection
279-
if (++connectedCount == 2)
279+
if (Interlocked.Increment(ref connectedCount) == 2)
280280
{
281281
await nexus.Context.Clients.Others.ClientTask();
282282
await Task.Delay(10);
@@ -288,8 +288,8 @@ public async Task NexusInvokesOnOthers(Type type)
288288
var (client1, clientNexus1) = CreateClient(CreateClientConfig(type));
289289
var (client2, clientNexus2) = CreateClient(CreateClientConfig(type));
290290
#pragma warning disable CS1998
291-
clientNexus1.ClientTaskEvent = async _ => invocationCount++;
292-
clientNexus2.ClientTaskEvent = async _ => invocationCount++;
291+
clientNexus1.ClientTaskEvent = async _ => Interlocked.Increment(ref invocationCount);
292+
clientNexus2.ClientTaskEvent = async _ => Interlocked.Increment(ref invocationCount);
293293
#pragma warning restore CS1998
294294
await server.StartAsync().Timeout(1);
295295

@@ -316,7 +316,7 @@ public async Task NexusInvokesOnClient(Type type)
316316
connectedNexus.OnConnectedEvent = async nexus =>
317317
{
318318
// Second connection
319-
if (++connectedCount == 2)
319+
if (Interlocked.Increment(ref connectedCount) == 2)
320320
{
321321
await nexus.Context.Clients.Client(nexus.Context.Id).ClientTask();
322322
await Task.Delay(10);
@@ -328,8 +328,8 @@ public async Task NexusInvokesOnClient(Type type)
328328
var (client1, clientNexus1) = CreateClient(CreateClientConfig(type));
329329
var (client2, clientNexus2) = CreateClient(CreateClientConfig(type));
330330
#pragma warning disable CS1998
331-
clientNexus1.ClientTaskEvent = async _ => invocationCount++;
332-
clientNexus2.ClientTaskEvent = async _ => invocationCount++;
331+
clientNexus1.ClientTaskEvent = async _ => Interlocked.Increment(ref invocationCount);
332+
clientNexus2.ClientTaskEvent = async _ => Interlocked.Increment(ref invocationCount);
333333
#pragma warning restore CS1998
334334
await server.StartAsync().Timeout(1);
335335

@@ -356,7 +356,7 @@ public async Task NexusInvokesOnClients(Type type)
356356
connectedNexus.OnConnectedEvent = async nexus =>
357357
{
358358
// Second connection
359-
if (++connectedCount == 2)
359+
if (Interlocked.Increment(ref connectedCount) == 2)
360360
{
361361
// ReSharper disable once AccessToModifiedClosure
362362
var clientIds = server!.GetContext().Clients.GetIds().ToArray();

src/NexNet.IntegrationTests/Pipes/NexusChannelReaderTests.cs

+8-24
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,12 @@
66

77
namespace NexNet.IntegrationTests.Pipes;
88

9-
internal class NexusChannelReaderTests
9+
internal class NexusChannelReaderTests : NexusChannelTestBase
1010
{
1111
[Test]
1212
public async Task ReadsData()
1313
{
14-
var pipeReader = new NexusPipeReader(new DummyPipeStateManager());
14+
var pipeReader = new NexusPipeReader(new DummyPipeStateManager(), null, true, 0, 0, 0);
1515
var reader = new NexusChannelReader<ComplexMessage>(pipeReader);
1616

1717
var baseObject = ComplexMessage.Random();
@@ -48,7 +48,7 @@ public async Task ReadsData()
4848
[Test]
4949
public async Task ReadsPartialData()
5050
{
51-
var pipeReader = new NexusPipeReader(new DummyPipeStateManager());
51+
var pipeReader = new NexusPipeReader(new DummyPipeStateManager(), null, true, 0, 0, 0);
5252
var reader = new NexusChannelReader<ComplexMessage>(pipeReader);
5353

5454
var baseObject = ComplexMessage.Random();
@@ -87,7 +87,7 @@ public async Task ReadsPartialData()
8787
[Test]
8888
public async Task CancelsReadDelayed()
8989
{
90-
var pipeReader = new NexusPipeReader(new DummyPipeStateManager());
90+
var pipeReader = new NexusPipeReader(new DummyPipeStateManager(), null, true, 0, 0, 0);
9191
var reader = new NexusChannelReader<ComplexMessage>(pipeReader);
9292
var cts = new CancellationTokenSource(100);
9393
var result = await reader.ReadAsync(cts.Token).Timeout(1);
@@ -100,7 +100,7 @@ public async Task CancelsReadDelayed()
100100
[Test]
101101
public async Task CancelsReadImmediate()
102102
{
103-
var pipeReader = new NexusPipeReader(new DummyPipeStateManager());
103+
var pipeReader = new NexusPipeReader(new DummyPipeStateManager(), null, true, 0, 0, 0);
104104
var reader = new NexusChannelReader<ComplexMessage>(pipeReader);
105105
var cts = new CancellationTokenSource(100);
106106
cts.Cancel();
@@ -114,7 +114,7 @@ public async Task CancelsReadImmediate()
114114
[Test]
115115
public async Task Completes()
116116
{
117-
var pipeReader = new NexusPipeReader(new DummyPipeStateManager());
117+
var pipeReader = new NexusPipeReader(new DummyPipeStateManager(), null, true, 0, 0, 0);
118118
var reader = new NexusChannelReader<ComplexMessage>(pipeReader);
119119
// ReSharper disable once MethodHasAsyncOverload
120120
await pipeReader.CompleteAsync();
@@ -130,7 +130,7 @@ public async Task Completes()
130130
public async Task WaitsForFullData()
131131
{
132132
var tcs = new TaskCompletionSource();
133-
var pipeReader = new NexusPipeReader(new DummyPipeStateManager());
133+
var pipeReader = new NexusPipeReader(new DummyPipeStateManager(), null, true, 0, 0, 0);
134134
var reader = new NexusChannelReader<ComplexMessage>(pipeReader);
135135
var baseObject = ComplexMessage.Random();
136136
var bytes = new ReadOnlySequence<byte>(MemoryPackSerializer.Serialize(baseObject));
@@ -153,7 +153,7 @@ public async Task WaitsForFullData()
153153
public async Task ReadsMultiple()
154154
{
155155
const int iterations = 1000;
156-
var pipeReader = new NexusPipeReader(new DummyPipeStateManager());
156+
var pipeReader = new NexusPipeReader(new DummyPipeStateManager(), null, true, 0, 0, 0);
157157
var reader = new NexusChannelReader<ComplexMessage>(pipeReader);
158158
var baseObject = ComplexMessage.Random();
159159
var bytes = new ReadOnlySequence<byte>(MemoryPackSerializer.Serialize(baseObject));
@@ -171,20 +171,4 @@ public async Task ReadsMultiple()
171171

172172
Assert.AreEqual(iterations, result.Count());
173173
}
174-
private class DummyPipeStateManager : IPipeStateManager
175-
{
176-
public ushort Id { get; } = 0;
177-
public ValueTask NotifyState()
178-
{
179-
return default;
180-
}
181-
182-
public bool UpdateState(NexusDuplexPipe.State updatedState, bool remove = false)
183-
{
184-
CurrentState |= updatedState;
185-
return true;
186-
}
187-
188-
public NexusDuplexPipe.State CurrentState { get; private set; } = NexusDuplexPipe.State.Ready;
189-
}
190174
}

src/NexNet.IntegrationTests/Pipes/NexusChannelReaderUnmanagedTests.cs

+8-8
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ internal class NexusChannelReaderUnmanagedTests
1919
public async Task ReadsData<T>(T inputData)
2020
where T : unmanaged
2121
{
22-
var pipeReader = new NexusPipeReader(new DummyPipeStateManager());
22+
var pipeReader = new NexusPipeReader(new DummyPipeStateManager(), null, true, 0, 0, 0);
2323

2424
var reader = new NexusChannelReaderUnmanaged<T>(pipeReader);
2525
await pipeReader.BufferData(Utilities.GetBytes(inputData));
@@ -32,7 +32,7 @@ public async Task ReadsData<T>(T inputData)
3232
public async Task CancelsReadDelayed<T>()
3333
where T : unmanaged
3434
{
35-
var pipeReader = new NexusPipeReader(new DummyPipeStateManager());
35+
var pipeReader = new NexusPipeReader(new DummyPipeStateManager(), null, true, 0, 0, 0);
3636
var reader = new NexusChannelReaderUnmanaged<long>(pipeReader);
3737
var cts = new CancellationTokenSource(100);
3838
var result = await reader.ReadAsync(cts.Token).Timeout(1);
@@ -45,7 +45,7 @@ public async Task CancelsReadDelayed<T>()
4545
[Test]
4646
public async Task CancelsReadImmediate()
4747
{
48-
var pipeReader = new NexusPipeReader(new DummyPipeStateManager());
48+
var pipeReader = new NexusPipeReader(new DummyPipeStateManager(), null, true, 0, 0, 0);
4949
var reader = new NexusChannelReaderUnmanaged<long>(pipeReader);
5050
var cts = new CancellationTokenSource(100);
5151
cts.Cancel();
@@ -59,7 +59,7 @@ public async Task CancelsReadImmediate()
5959
[Test]
6060
public async Task Completes()
6161
{
62-
var pipeReader = new NexusPipeReader(new DummyPipeStateManager());
62+
var pipeReader = new NexusPipeReader(new DummyPipeStateManager(), null, true, 0, 0, 0);
6363
var reader = new NexusChannelReaderUnmanaged<long>(pipeReader);
6464

6565
// ReSharper disable once MethodHasAsyncOverload
@@ -87,7 +87,7 @@ public async Task WaitsForFullData<T>(T inputData)
8787
where T : unmanaged
8888
{
8989
var tcs = new TaskCompletionSource();
90-
var pipeReader = new NexusPipeReader(new DummyPipeStateManager());
90+
var pipeReader = new NexusPipeReader(new DummyPipeStateManager(), null, true, 0, 0, 0);
9191
var reader = new NexusChannelReaderUnmanaged<T>(pipeReader);
9292

9393
_ = Task.Run(async () =>
@@ -121,7 +121,7 @@ public async Task ReadsMultiple<T>(T inputData)
121121
where T : unmanaged
122122
{
123123
const int iterations = 1000;
124-
var pipeReader = new NexusPipeReader(new DummyPipeStateManager());
124+
var pipeReader = new NexusPipeReader(new DummyPipeStateManager(), null, true, 0, 0, 0);
125125
var reader = new NexusChannelReaderUnmanaged<T>(pipeReader);
126126
var data = Utilities.GetBytes(inputData);
127127
var count = 0;
@@ -155,7 +155,7 @@ public async Task ReadsMultipleParallel<T>(T inputData)
155155
where T : unmanaged
156156
{
157157
const int iterations = 1000;
158-
var pipeReader = new NexusPipeReader(new DummyPipeStateManager());
158+
var pipeReader = new NexusPipeReader(new DummyPipeStateManager(), null, true, 0, 0, 0);
159159
var reader = new NexusChannelReaderUnmanaged<T>(pipeReader);
160160
var data = Utilities.GetBytes(inputData);
161161
var count = 0;
@@ -202,7 +202,7 @@ await Task.Run(async () =>
202202
public async Task ReadsWithPartialWrites<T>(T inputData)
203203
where T : unmanaged
204204
{
205-
var pipeReader = new NexusPipeReader(new DummyPipeStateManager());
205+
var pipeReader = new NexusPipeReader(new DummyPipeStateManager(), null, true, 0, 0, 0);
206206
var reader = new NexusChannelReaderUnmanaged<T>(pipeReader);
207207
var data = Utilities.GetBytes(inputData);
208208

0 commit comments

Comments
 (0)