Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix end to end tests #304

Merged
merged 6 commits into from
Jan 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/SeqCli/Cli/Commands/Bench/BenchCasesCollection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,5 @@ namespace SeqCli.Cli.Commands.Bench;
class BenchCasesCollection
{
// ReSharper disable once CollectionNeverUpdated.Global
public IList<BenchCase> Cases { get; } = new List<BenchCase>();
public IList<QueryBenchCase> Cases { get; } = new List<QueryBenchCase>();
}
197 changes: 155 additions & 42 deletions src/SeqCli/Cli/Commands/Bench/BenchCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,18 @@
// limitations under the License.

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Security.Cryptography;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Newtonsoft.Json;
using Seq.Api;
using Seq.Api.Model.Data;
using Seq.Api.Model.Signals;
using SeqCli.Cli.Features;
using SeqCli.Connection;
using SeqCli.Sample.Loader;
using SeqCli.Util;
using Serilog;
using Serilog.Context;
Expand Down Expand Up @@ -70,6 +73,8 @@ class BenchCommand : Command
string _reportingServerUrl = "";
string _reportingServerApiKey = "";
string _description = "";
bool _withIngestion = false;
bool _withQueries = false;

public BenchCommand(SeqConnectionFactory connectionFactory)
{
Expand All @@ -96,70 +101,102 @@ public BenchCommand(SeqConnectionFactory connectionFactory)
"description=",
"Optional description of the bench test run",
a => _description = a);
Options.Add(
"with-ingestion",
"Should the benchmark include sending events to Seq",
_ => _withIngestion = true);
Options.Add(
"with-queries",
"Should the benchmark include querying Seq",
_ => _withQueries = true);
}

protected override async Task<int> Run()
{
if (!_withIngestion && !_withQueries)
{
Log.Error("Use at least one of --with-ingestion and --with-queries");
return 1;
}

try
{
var (_, apiKey) = _connectionFactory.GetConnectionDetails(_connection);
var connection = _connectionFactory.Connect(_connection);
var seqVersion = (await connection.Client.GetRootAsync()).Version;
await using var reportingLogger = BuildReportingLogger();

var cases = ReadCases(_cases);
var runId = Guid.NewGuid().ToString("N")[..16];
CancellationTokenSource cancellationTokenSource = new ();
var cancellationToken = cancellationTokenSource.Token;

await using var reportingLogger = BuildReportingLogger();

using (!string.IsNullOrWhiteSpace(_description)
? LogContext.PushProperty("Description", _description)
: null)
{
reportingLogger.Information(
"Bench run {RunId} against {ServerUrl} ({SeqVersion}); {CaseCount} cases, {Runs} runs, from {Start} to {End}",
runId, connection.Client.ServerUrl, seqVersion, cases.Cases.Count, _runs, _range.Start, _range.End);
}

using (LogContext.PushProperty("RunId", runId))
using (LogContext.PushProperty("SeqVersion", seqVersion))
using (LogContext.PushProperty("WithIngestion", _withIngestion))
using (LogContext.PushProperty("WithQueries", _withQueries))
using (LogContext.PushProperty("Start", _range.Start))
using (LogContext.PushProperty("End", _range.End))
using (!string.IsNullOrWhiteSpace(_description)
? LogContext.PushProperty("Description", _description)
: null)
{
foreach (var c in cases.Cases.OrderBy(c => c.Id))
if (_withIngestion)
{
var timings = new BenchCaseTimings();
object? lastResult = null;
var t = IngestionBenchmark(reportingLogger, runId, connection, apiKey, seqVersion,
isQueryBench: _withQueries, cancellationToken)
.ContinueWith(t =>
{
if (t.Exception is not null)
{
return Console.Error.WriteLineAsync(t.Exception.Message);
}

foreach (var i in Enumerable.Range(1, _runs))
return Task.CompletedTask;
});

if (!_withQueries)
{
int benchDurationMs = 120_000;
await Task.Delay(benchDurationMs);
cancellationTokenSource.Cancel();

var response = await connection.Data.QueryAsync(
c.Query,
_range.Start,
_range.End,
c.SignalExpression != null ? SignalExpressionPart.Signal(c.SignalExpression) : null
"select count(*) from stream group by time(1s)",
DateTime.Now.Add(-1 * TimeSpan.FromMilliseconds(benchDurationMs))
);

timings.PushElapsed(response.Statistics.ElapsedMilliseconds);

if (response.Rows != null)

if (response.Slices == null)
{
var isScalarResult = response.Rows.Length == 1 && response.Rows[0].Length == 1;
if (isScalarResult && i == _runs)
{
lastResult = response.Rows[0][0];
}
throw new Exception("Failed to query ingestion benchmark results");
}

var counts = response.Slices.Skip(30) // ignore the warmup
.Select(s => Convert.ToDouble(s.Rows[0][0])) // extract per-second counts
.Where(c => c > 10000) // ignore any very small values
.ToArray();
counts = counts.SkipLast(5).ToArray(); // ignore warmdown
var countsMean = counts.Sum() / counts.Length;
var countsRSD = QueryBenchCaseTimings.StandardDeviation(counts) / countsMean;

using (LogContext.PushProperty("EventsPerSecond", counts))
{
reportingLogger.Information(
"Ingestion benchmark {Description} ran for {RunDuration:N0}ms; ingested {TotalIngested:N0} "
+ "at {EventsPerMinute:N0}events/min; with RSD {RelativeStandardDeviationPercentage,4:N1}%",
_description,
benchDurationMs,
counts.Sum(),
countsMean * 60,
countsRSD * 100);
}
}
}

using (lastResult != null ? LogContext.PushProperty("LastResult", lastResult) : null)
using (!string.IsNullOrWhiteSpace(c.SignalExpression)
? LogContext.PushProperty("SignalExpression", c.SignalExpression)
: null)
using (LogContext.PushProperty("StandardDeviationElapsed", timings.StandardDeviationElapsed))
using (LogContext.PushProperty("Query", c.Query))
{
reportingLogger.Information(
"Case {Id,-40} mean {MeanElapsed,5:N0} ms (first {FirstElapsed,5:N0} ms, min {MinElapsed,5:N0} ms, max {MaxElapsed,5:N0} ms, RSD {RelativeStandardDeviationElapsed,4:N2})",
c.Id, timings.MeanElapsed, timings.FirstElapsed, timings.MinElapsed, timings.MaxElapsed, timings.RelativeStandardDeviationElapsed);
}
if (_withQueries)
{
var collectedTimings = await QueryBenchmark(reportingLogger, runId, connection, seqVersion);
collectedTimings.LogSummary(_description);
cancellationTokenSource.Cancel();
}
}

Expand All @@ -172,6 +209,82 @@ protected override async Task<int> Run()
}
}

async Task IngestionBenchmark(Logger reportingLogger, string runId, SeqConnection connection, string? apiKey,
string seqVersion, bool isQueryBench, CancellationToken cancellationToken = default)
{
reportingLogger.Information(
"Ingestion bench run {RunId} against {ServerUrl} ({SeqVersion})",
runId, connection.Client.ServerUrl, seqVersion);

if (isQueryBench)
{
var simulationTasks = Enumerable.Range(1, 500)
.Select(i => Simulation.RunAsync(connection, apiKey, 10000, echoToStdout: false, cancellationToken))
.ToArray();
await Task.Delay(20_000); // how long to ingest before beginning queries
}
else
{
var simulationTasks = Enumerable.Range(1, 2000)
.Select(i => Simulation.RunAsync(connection, apiKey, 10000, echoToStdout: false, cancellationToken))
.ToArray();
}
}

async Task<QueryBenchRunResults> QueryBenchmark(Logger reportingLogger, string runId, SeqConnection connection, string seqVersion)
{
var cases = ReadCases(_cases);
QueryBenchRunResults queryBenchRunResults = new(reportingLogger);
reportingLogger.Information(
"Query benchmark run {RunId} against {ServerUrl} ({SeqVersion}); {CaseCount} cases, {Runs} runs, from {Start} to {End}",
runId, connection.Client.ServerUrl, seqVersion, cases.Cases.Count, _runs, _range.Start, _range.End);

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: extra whitespace here


foreach (var c in cases.Cases.OrderBy(c => c.Id)
.Concat(new [] { QueryBenchRunResults.FINAL_COUNT_CASE }))
{
var timings = new QueryBenchCaseTimings(c);
queryBenchRunResults.Add(timings);

foreach (var i in Enumerable.Range(1, _runs))
{
var response = await connection.Data.QueryAsync(
c.Query,
_range.Start,
_range.End,
c.SignalExpression != null ? SignalExpressionPart.Signal(c.SignalExpression) : null,
null,
TimeSpan.FromMinutes(4)
);

timings.PushElapsed(response.Statistics.ElapsedMilliseconds);

if (response.Rows != null)
{
var isScalarResult = response.Rows.Length == 1 && response.Rows[0].Length == 1;
if (isScalarResult && i == _runs)
{
timings.LastResult = response.Rows[0][0];
}
}
}

using (timings.LastResult != null ? LogContext.PushProperty("LastResult", timings.LastResult) : null)
using (!string.IsNullOrWhiteSpace(c.SignalExpression)
? LogContext.PushProperty("SignalExpression", c.SignalExpression)
: null)
using (LogContext.PushProperty("StandardDeviationElapsed", timings.StandardDeviationElapsed))
using (LogContext.PushProperty("Query", c.Query))
{
reportingLogger.Information(
"Case {Id,-40} ({LastResult}) mean {MeanElapsed,5:N0} ms (first {FirstElapsed,5:N0} ms, min {MinElapsed,5:N0} ms, max {MaxElapsed,5:N0} ms, RSD {RelativeStandardDeviationElapsed,4:N2})",
c.Id, timings.LastResult, timings.MeanElapsed, timings.FirstElapsed, timings.MinElapsed, timings.MaxElapsed, timings.RelativeStandardDeviationElapsed);
}
}

return queryBenchRunResults;
}

/// <summary>
/// Build a second Serilog logger for logging benchmark results.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ namespace SeqCli.Cli.Commands.Bench;

// ReSharper disable ClassNeverInstantiated.Global AutoPropertyCanBeMadeGetOnly.Global UnusedAutoPropertyAccessor.Global

class BenchCase
class QueryBenchCase
{
public string Id { get; set; } = null!;
public string Query { get; set; } = null!;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,40 @@
namespace SeqCli.Cli.Commands.Bench;

/*
* Collects benchmarking elapsed time measurements and calculates statistics.
* Collects benchmarking elapsed time measurements and calculates statistics.
*
* The results for one bench case.
*/
class BenchCaseTimings
class QueryBenchCaseTimings
{
readonly QueryBenchCase _queryBenchCase;
readonly List<double> _timings = new();

object? _lastResult;

public double MeanElapsed => _timings.Sum() / _timings.Count;
public double MinElapsed => _timings.Min();
public double MaxElapsed => _timings.Max();
public double FirstElapsed => _timings.First();
public double StandardDeviationElapsed => StandardDeviation(_timings);
public double RelativeStandardDeviationElapsed => StandardDeviation(_timings) / MeanElapsed;

public object? LastResult
{
get => _lastResult;
set => _lastResult = value;
}

public QueryBenchCaseTimings(QueryBenchCase queryBenchCase)
{
_queryBenchCase = queryBenchCase;
}

public void PushElapsed(double elapsed)
{
_timings.Add(elapsed);
}

static double StandardDeviation(ICollection<double> population)
public static double StandardDeviation(ICollection<double> population)
{
if (population.Count < 2)
{
Expand All @@ -47,4 +62,6 @@ static double StandardDeviation(ICollection<double> population)
var mean = population.Sum() / population.Count;
return Math.Sqrt(population.Select(e => Math.Pow(e - mean, 2)).Sum() / (population.Count - 1));
}

public string Id => _queryBenchCase.Id;
}
55 changes: 55 additions & 0 deletions src/SeqCli/Cli/Commands/Bench/QueryBenchRunResults.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
using System;
using System.Collections.Generic;
using System.Linq;
using Serilog.Core;

namespace SeqCli.Cli.Commands.Bench;

class QueryBenchRunResults
{
readonly Logger _reportingLogger;
List<QueryBenchCaseTimings> _collectedTimings = new();

public static QueryBenchCase FINAL_COUNT_CASE = new()
{
Id = "final-count-star",
Query = "select count(*) from stream",
};

public QueryBenchRunResults(Logger reportingLogger)
{
_reportingLogger = reportingLogger;
}

public void Add(QueryBenchCaseTimings caseTimings)
{
_collectedTimings.Add(caseTimings);
}

public void LogSummary(string description)
{
_reportingLogger.Information(
"Query benchmark {Description} complete in {TotalMeanElapsed:N0} ms with {MeanRelativeStandardDeviationPercentage:N1}% deviation, processed {FinalEventCount:N0} events at {EventsPerMs:N0} events/ms",
description,
TotalMeanElapsed(),
MeanRelativeStandardDeviationPercentage(),
FinalEventCount(),
FinalEventCount() * _collectedTimings.Count / TotalMeanElapsed());
}

private double TotalMeanElapsed()
{
return _collectedTimings.Sum(c => c.MeanElapsed);
}

private double MeanRelativeStandardDeviationPercentage()
{
return _collectedTimings.Average(c => c.RelativeStandardDeviationElapsed) * 100;
}

private int FinalEventCount()
{
var benchCase = _collectedTimings.Single(c => c.Id == FINAL_COUNT_CASE.Id);
return Convert.ToInt32(benchCase.LastResult);
}
}
Loading