Skip to content

Commit

Permalink
Merge pull request #304 from liammclennan/fix-end-to-end-tests
Browse files Browse the repository at this point in the history
Fix end to end tests
  • Loading branch information
nblumhardt authored Jan 2, 2024
2 parents 6bde141 + c2a7dbb commit 56c6a24
Show file tree
Hide file tree
Showing 7 changed files with 237 additions and 51 deletions.
2 changes: 1 addition & 1 deletion src/SeqCli/Cli/Commands/Bench/BenchCasesCollection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,5 @@ namespace SeqCli.Cli.Commands.Bench;
class BenchCasesCollection
{
// ReSharper disable once CollectionNeverUpdated.Global
public IList<BenchCase> Cases { get; } = new List<BenchCase>();
public IList<QueryBenchCase> Cases { get; } = new List<QueryBenchCase>();
}
197 changes: 155 additions & 42 deletions src/SeqCli/Cli/Commands/Bench/BenchCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,18 @@
// limitations under the License.

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Security.Cryptography;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Newtonsoft.Json;
using Seq.Api;
using Seq.Api.Model.Data;
using Seq.Api.Model.Signals;
using SeqCli.Cli.Features;
using SeqCli.Connection;
using SeqCli.Sample.Loader;
using SeqCli.Util;
using Serilog;
using Serilog.Context;
Expand Down Expand Up @@ -70,6 +73,8 @@ class BenchCommand : Command
string _reportingServerUrl = "";
string _reportingServerApiKey = "";
string _description = "";
bool _withIngestion = false;
bool _withQueries = false;

public BenchCommand(SeqConnectionFactory connectionFactory)
{
Expand All @@ -96,70 +101,102 @@ public BenchCommand(SeqConnectionFactory connectionFactory)
"description=",
"Optional description of the bench test run",
a => _description = a);
Options.Add(
"with-ingestion",
"Should the benchmark include sending events to Seq",
_ => _withIngestion = true);
Options.Add(
"with-queries",
"Should the benchmark include querying Seq",
_ => _withQueries = true);
}

protected override async Task<int> Run()
{
if (!_withIngestion && !_withQueries)
{
Log.Error("Use at least one of --with-ingestion and --with-queries");
return 1;
}

try
{
var (_, apiKey) = _connectionFactory.GetConnectionDetails(_connection);
var connection = _connectionFactory.Connect(_connection);
var seqVersion = (await connection.Client.GetRootAsync()).Version;
await using var reportingLogger = BuildReportingLogger();

var cases = ReadCases(_cases);
var runId = Guid.NewGuid().ToString("N")[..16];
CancellationTokenSource cancellationTokenSource = new ();
var cancellationToken = cancellationTokenSource.Token;

await using var reportingLogger = BuildReportingLogger();

using (!string.IsNullOrWhiteSpace(_description)
? LogContext.PushProperty("Description", _description)
: null)
{
reportingLogger.Information(
"Bench run {RunId} against {ServerUrl} ({SeqVersion}); {CaseCount} cases, {Runs} runs, from {Start} to {End}",
runId, connection.Client.ServerUrl, seqVersion, cases.Cases.Count, _runs, _range.Start, _range.End);
}

using (LogContext.PushProperty("RunId", runId))
using (LogContext.PushProperty("SeqVersion", seqVersion))
using (LogContext.PushProperty("WithIngestion", _withIngestion))
using (LogContext.PushProperty("WithQueries", _withQueries))
using (LogContext.PushProperty("Start", _range.Start))
using (LogContext.PushProperty("End", _range.End))
using (!string.IsNullOrWhiteSpace(_description)
? LogContext.PushProperty("Description", _description)
: null)
{
foreach (var c in cases.Cases.OrderBy(c => c.Id))
if (_withIngestion)
{
var timings = new BenchCaseTimings();
object? lastResult = null;
var t = IngestionBenchmark(reportingLogger, runId, connection, apiKey, seqVersion,
isQueryBench: _withQueries, cancellationToken)
.ContinueWith(t =>
{
if (t.Exception is not null)
{
return Console.Error.WriteLineAsync(t.Exception.Message);
}

foreach (var i in Enumerable.Range(1, _runs))
return Task.CompletedTask;
});

if (!_withQueries)
{
int benchDurationMs = 120_000;
await Task.Delay(benchDurationMs);
cancellationTokenSource.Cancel();

var response = await connection.Data.QueryAsync(
c.Query,
_range.Start,
_range.End,
c.SignalExpression != null ? SignalExpressionPart.Signal(c.SignalExpression) : null
"select count(*) from stream group by time(1s)",
DateTime.Now.Add(-1 * TimeSpan.FromMilliseconds(benchDurationMs))
);

timings.PushElapsed(response.Statistics.ElapsedMilliseconds);

if (response.Rows != null)

if (response.Slices == null)
{
var isScalarResult = response.Rows.Length == 1 && response.Rows[0].Length == 1;
if (isScalarResult && i == _runs)
{
lastResult = response.Rows[0][0];
}
throw new Exception("Failed to query ingestion benchmark results");
}

var counts = response.Slices.Skip(30) // ignore the warmup
.Select(s => Convert.ToDouble(s.Rows[0][0])) // extract per-second counts
.Where(c => c > 10000) // ignore any very small values
.ToArray();
counts = counts.SkipLast(5).ToArray(); // ignore warmdown
var countsMean = counts.Sum() / counts.Length;
var countsRSD = QueryBenchCaseTimings.StandardDeviation(counts) / countsMean;

using (LogContext.PushProperty("EventsPerSecond", counts))
{
reportingLogger.Information(
"Ingestion benchmark {Description} ran for {RunDuration:N0}ms; ingested {TotalIngested:N0} "
+ "at {EventsPerMinute:N0}events/min; with RSD {RelativeStandardDeviationPercentage,4:N1}%",
_description,
benchDurationMs,
counts.Sum(),
countsMean * 60,
countsRSD * 100);
}
}
}

using (lastResult != null ? LogContext.PushProperty("LastResult", lastResult) : null)
using (!string.IsNullOrWhiteSpace(c.SignalExpression)
? LogContext.PushProperty("SignalExpression", c.SignalExpression)
: null)
using (LogContext.PushProperty("StandardDeviationElapsed", timings.StandardDeviationElapsed))
using (LogContext.PushProperty("Query", c.Query))
{
reportingLogger.Information(
"Case {Id,-40} mean {MeanElapsed,5:N0} ms (first {FirstElapsed,5:N0} ms, min {MinElapsed,5:N0} ms, max {MaxElapsed,5:N0} ms, RSD {RelativeStandardDeviationElapsed,4:N2})",
c.Id, timings.MeanElapsed, timings.FirstElapsed, timings.MinElapsed, timings.MaxElapsed, timings.RelativeStandardDeviationElapsed);
}
if (_withQueries)
{
var collectedTimings = await QueryBenchmark(reportingLogger, runId, connection, seqVersion);
collectedTimings.LogSummary(_description);
cancellationTokenSource.Cancel();
}
}

Expand All @@ -172,6 +209,82 @@ protected override async Task<int> Run()
}
}

async Task IngestionBenchmark(Logger reportingLogger, string runId, SeqConnection connection, string? apiKey,
string seqVersion, bool isQueryBench, CancellationToken cancellationToken = default)
{
reportingLogger.Information(
"Ingestion bench run {RunId} against {ServerUrl} ({SeqVersion})",
runId, connection.Client.ServerUrl, seqVersion);

if (isQueryBench)
{
var simulationTasks = Enumerable.Range(1, 500)
.Select(i => Simulation.RunAsync(connection, apiKey, 10000, echoToStdout: false, cancellationToken))
.ToArray();
await Task.Delay(20_000); // how long to ingest before beginning queries
}
else
{
var simulationTasks = Enumerable.Range(1, 2000)
.Select(i => Simulation.RunAsync(connection, apiKey, 10000, echoToStdout: false, cancellationToken))
.ToArray();
}
}

async Task<QueryBenchRunResults> QueryBenchmark(Logger reportingLogger, string runId, SeqConnection connection, string seqVersion)
{
var cases = ReadCases(_cases);
QueryBenchRunResults queryBenchRunResults = new(reportingLogger);
reportingLogger.Information(
"Query benchmark run {RunId} against {ServerUrl} ({SeqVersion}); {CaseCount} cases, {Runs} runs, from {Start} to {End}",
runId, connection.Client.ServerUrl, seqVersion, cases.Cases.Count, _runs, _range.Start, _range.End);


foreach (var c in cases.Cases.OrderBy(c => c.Id)
.Concat(new [] { QueryBenchRunResults.FINAL_COUNT_CASE }))
{
var timings = new QueryBenchCaseTimings(c);
queryBenchRunResults.Add(timings);

foreach (var i in Enumerable.Range(1, _runs))
{
var response = await connection.Data.QueryAsync(
c.Query,
_range.Start,
_range.End,
c.SignalExpression != null ? SignalExpressionPart.Signal(c.SignalExpression) : null,
null,
TimeSpan.FromMinutes(4)
);

timings.PushElapsed(response.Statistics.ElapsedMilliseconds);

if (response.Rows != null)
{
var isScalarResult = response.Rows.Length == 1 && response.Rows[0].Length == 1;
if (isScalarResult && i == _runs)
{
timings.LastResult = response.Rows[0][0];
}
}
}

using (timings.LastResult != null ? LogContext.PushProperty("LastResult", timings.LastResult) : null)
using (!string.IsNullOrWhiteSpace(c.SignalExpression)
? LogContext.PushProperty("SignalExpression", c.SignalExpression)
: null)
using (LogContext.PushProperty("StandardDeviationElapsed", timings.StandardDeviationElapsed))
using (LogContext.PushProperty("Query", c.Query))
{
reportingLogger.Information(
"Case {Id,-40} ({LastResult}) mean {MeanElapsed,5:N0} ms (first {FirstElapsed,5:N0} ms, min {MinElapsed,5:N0} ms, max {MaxElapsed,5:N0} ms, RSD {RelativeStandardDeviationElapsed,4:N2})",
c.Id, timings.LastResult, timings.MeanElapsed, timings.FirstElapsed, timings.MinElapsed, timings.MaxElapsed, timings.RelativeStandardDeviationElapsed);
}
}

return queryBenchRunResults;
}

/// <summary>
/// Build a second Serilog logger for logging benchmark results.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace SeqCli.Cli.Commands.Bench;

// ReSharper disable ClassNeverInstantiated.Global AutoPropertyCanBeMadeGetOnly.Global UnusedAutoPropertyAccessor.Global

class BenchCase
class QueryBenchCase
{
public string Id { get; set; } = null!;
public string Query { get; set; } = null!;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,40 @@
namespace SeqCli.Cli.Commands.Bench;

/*
* Collects benchmarking elapsed time measurements and calculates statistics.
* Collects benchmarking elapsed time measurements and calculates statistics.
*
* The results for one bench case.
*/
class BenchCaseTimings
class QueryBenchCaseTimings
{
readonly QueryBenchCase _queryBenchCase;
readonly List<double> _timings = new();

object? _lastResult;

public double MeanElapsed => _timings.Sum() / _timings.Count;
public double MinElapsed => _timings.Min();
public double MaxElapsed => _timings.Max();
public double FirstElapsed => _timings.First();
public double StandardDeviationElapsed => StandardDeviation(_timings);
public double RelativeStandardDeviationElapsed => StandardDeviation(_timings) / MeanElapsed;

public object? LastResult
{
get => _lastResult;
set => _lastResult = value;
}

public QueryBenchCaseTimings(QueryBenchCase queryBenchCase)
{
_queryBenchCase = queryBenchCase;
}

public void PushElapsed(double elapsed)
{
_timings.Add(elapsed);
}

static double StandardDeviation(ICollection<double> population)
public static double StandardDeviation(ICollection<double> population)
{
if (population.Count < 2)
{
Expand All @@ -47,4 +62,6 @@ static double StandardDeviation(ICollection<double> population)
var mean = population.Sum() / population.Count;
return Math.Sqrt(population.Select(e => Math.Pow(e - mean, 2)).Sum() / (population.Count - 1));
}

public string Id => _queryBenchCase.Id;
}
55 changes: 55 additions & 0 deletions src/SeqCli/Cli/Commands/Bench/QueryBenchRunResults.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
using System;
using System.Collections.Generic;
using System.Linq;
using Serilog.Core;

namespace SeqCli.Cli.Commands.Bench;

class QueryBenchRunResults
{
readonly Logger _reportingLogger;
List<QueryBenchCaseTimings> _collectedTimings = new();

public static QueryBenchCase FINAL_COUNT_CASE = new()
{
Id = "final-count-star",
Query = "select count(*) from stream",
};

public QueryBenchRunResults(Logger reportingLogger)
{
_reportingLogger = reportingLogger;
}

public void Add(QueryBenchCaseTimings caseTimings)
{
_collectedTimings.Add(caseTimings);
}

public void LogSummary(string description)
{
_reportingLogger.Information(
"Query benchmark {Description} complete in {TotalMeanElapsed:N0} ms with {MeanRelativeStandardDeviationPercentage:N1}% deviation, processed {FinalEventCount:N0} events at {EventsPerMs:N0} events/ms",
description,
TotalMeanElapsed(),
MeanRelativeStandardDeviationPercentage(),
FinalEventCount(),
FinalEventCount() * _collectedTimings.Count / TotalMeanElapsed());
}

private double TotalMeanElapsed()
{
return _collectedTimings.Sum(c => c.MeanElapsed);
}

private double MeanRelativeStandardDeviationPercentage()
{
return _collectedTimings.Average(c => c.RelativeStandardDeviationElapsed) * 100;
}

private int FinalEventCount()
{
var benchCase = _collectedTimings.Single(c => c.Id == FINAL_COUNT_CASE.Id);
return Convert.ToInt32(benchCase.LastResult);
}
}
Loading

0 comments on commit 56c6a24

Please sign in to comment.