Skip to content

Commit

Permalink
Expose LastFailure and LastResult from WorkflowInfo (#136)
Browse files Browse the repository at this point in the history
Fixes #133
  • Loading branch information
cretz authored Sep 22, 2023
1 parent 9d56fe8 commit 5cf7a4a
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 55 deletions.
5 changes: 5 additions & 0 deletions src/Temporalio/Worker/WorkflowInstance.cs
Original file line number Diff line number Diff line change
Expand Up @@ -146,13 +146,18 @@ public WorkflowInstance(WorkflowInstanceDetails details)
RunId: start.ParentWorkflowInfo.RunId,
WorkflowId: start.ParentWorkflowInfo.WorkflowId);
}
var lastFailure = start.ContinuedFailure == null ?
null : failureConverter.ToException(start.ContinuedFailure, PayloadConverter);
var lastResult = start.LastCompletionResult?.Payloads_.Select(v => new RawValue(v)).ToArray();
static string? NonEmptyOrNull(string s) => string.IsNullOrEmpty(s) ? null : s;
Info = new(
Attempt: start.Attempt,
ContinuedRunId: NonEmptyOrNull(start.ContinuedFromExecutionRunId),
CronSchedule: NonEmptyOrNull(start.CronSchedule),
ExecutionTimeout: start.WorkflowExecutionTimeout?.ToTimeSpan(),
Headers: start.Headers,
LastFailure: lastFailure,
LastResult: lastResult,
Namespace: details.Namespace,
Parent: parent,
RetryPolicy: start.RetryPolicy == null ? null : Common.RetryPolicy.FromProto(start.RetryPolicy),
Expand Down
5 changes: 5 additions & 0 deletions src/Temporalio/Workflows/WorkflowInfo.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using Temporalio.Common;
using Temporalio.Converters;

namespace Temporalio.Workflows
{
Expand All @@ -12,6 +13,8 @@ namespace Temporalio.Workflows
/// <param name="CronSchedule">Cron schedule if applicable.</param>
/// <param name="ExecutionTimeout">Execution timeout for the workflow.</param>
/// <param name="Headers">Headers from when the workflow was started.</param>
/// <param name="LastFailure">Failure if this workflow run is a continuation of a failure.</param>
/// <param name="LastResult">Successful result if this workflow is a continuation of a success.</param>
/// <param name="Namespace">Namespace for the workflow.</param>
/// <param name="Parent">Parent information for the workflow if this is a child.</param>
/// <param name="RetryPolicy">Retry policy for the workflow.</param>
Expand All @@ -32,6 +35,8 @@ public record WorkflowInfo(
string? CronSchedule,
TimeSpan? ExecutionTimeout,
IReadOnlyDictionary<string, Api.Common.V1.Payload>? Headers,
Exception? LastFailure,
IReadOnlyCollection<IRawValue>? LastResult,
string Namespace,
WorkflowInfo.ParentInfo? Parent,
RetryPolicy? RetryPolicy,
Expand Down
63 changes: 8 additions & 55 deletions tests/Temporalio.Tests/Client/TemporalClientScheduleTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public TemporalClientScheduleTests(ITestOutputHelper output, WorkflowEnvironment
[Fact]
public async Task CreateScheduleAsync_Basics_Succeeds()
{
await AssertNoSchedulesAsync();
await TestUtils.AssertNoSchedulesAsync(Client);

// Create a schedule with a lot of stuff
var arg = new KSWorkflowParams(new KSAction(Result: new("some result")));
Expand Down Expand Up @@ -202,13 +202,13 @@ await AssertMore.EqualEventuallyAsync(expectedIds, async () =>
});

// Delete when done
await DeleteAllSchedulesAsync();
await TestUtils.DeleteAllSchedulesAsync(Client);
}

[Fact]
public async Task CreateScheduleAsync_CalendarSpecDefaults_AreProper()
{
await AssertNoSchedulesAsync();
await TestUtils.AssertNoSchedulesAsync(Client);

var arg = new KSWorkflowParams(new KSAction(Result: new("some result")));
var handle = await Client.CreateScheduleAsync(
Expand Down Expand Up @@ -241,13 +241,13 @@ public async Task CreateScheduleAsync_CalendarSpecDefaults_AreProper()
}

// Delete when done
await DeleteAllSchedulesAsync();
await TestUtils.DeleteAllSchedulesAsync(Client);
}

[Fact]
public async Task CreateScheduleAsync_TriggerImmediately_Succeeds()
{
await AssertNoSchedulesAsync();
await TestUtils.AssertNoSchedulesAsync(Client);

// Create paused schedule that triggers immediately
var arg = new KSWorkflowParams(new KSAction(Result: new("some result")));
Expand All @@ -272,13 +272,13 @@ public async Task CreateScheduleAsync_TriggerImmediately_Succeeds()
await Client.GetWorkflowHandle(exec.WorkflowId, exec.FirstExecutionRunId).GetResultAsync<string>());

// Delete when done
await DeleteAllSchedulesAsync();
await TestUtils.DeleteAllSchedulesAsync(Client);
}

[Fact]
public async Task CreateScheduleAsync_Backfill_CreatesProperActions()
{
await AssertNoSchedulesAsync();
await TestUtils.AssertNoSchedulesAsync(Client);

// Create paused schedule that runs every minute and has two backfills
var now = DateTime.UtcNow;
Expand Down Expand Up @@ -326,53 +326,6 @@ await handle.BackfillAsync(new List<ScheduleBackfill>
Assert.Equal(6, (await handle.DescribeAsync()).Info.NumActions);

// Delete when done
await DeleteAllSchedulesAsync();
}

private async Task DeleteAllSchedulesAsync()
{
// We will try this 3 times
var tries = 0;
while (true)
{
await foreach (var sched in Client.ListSchedulesAsync())
{
try
{
await Client.GetScheduleHandle(sched.Id).DeleteAsync();
}
catch (RpcException e) when (e.Code == RpcException.StatusCode.NotFound)
{
// Ignore not-found errors
}
}
try
{
await AssertNoSchedulesAsync();
return;
}
catch
{
if (++tries >= 3)
{
throw;
}
}
}
}

private async Task AssertNoSchedulesAsync()
{
await AssertMore.EqualEventuallyAsync(
0,
async () =>
{
var count = 0;
await foreach (var sched in Client.ListSchedulesAsync())
{
count++;
}
return count;
});
await TestUtils.DeleteAllSchedulesAsync(Client);
}
}
49 changes: 49 additions & 0 deletions tests/Temporalio.Tests/TestUtils.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ namespace Temporalio.Tests;
using System.Net;
using System.Net.Sockets;
using Microsoft.Extensions.Logging;
using Temporalio.Client;
using Temporalio.Exceptions;

public static class TestUtils
{
Expand All @@ -23,6 +25,53 @@ public static int FreePort()
return port;
}

public static async Task DeleteAllSchedulesAsync(ITemporalClient client)
{
// We will try this 3 times
var tries = 0;
while (true)
{
await foreach (var sched in client.ListSchedulesAsync())
{
try
{
await client.GetScheduleHandle(sched.Id).DeleteAsync();
}
catch (RpcException e) when (e.Code == RpcException.StatusCode.NotFound)
{
// Ignore not-found errors
}
}
try
{
await AssertNoSchedulesAsync(client);
return;
}
catch
{
if (++tries >= 3)
{
throw;
}
}
}
}

public static async Task AssertNoSchedulesAsync(ITemporalClient client)
{
await AssertMore.EqualEventuallyAsync(
0,
async () =>
{
var count = 0;
await foreach (var sched in client.ListSchedulesAsync())
{
count++;
}
return count;
});
}

public record LogEntry(
LogLevel Level,
EventId EventId,
Expand Down
90 changes: 90 additions & 0 deletions tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ namespace Temporalio.Tests.Worker;
using Temporalio.Api.Enums.V1;
using Temporalio.Api.History.V1;
using Temporalio.Client;
using Temporalio.Client.Schedules;
using Temporalio.Common;
using Temporalio.Converters;
using Temporalio.Exceptions;
Expand Down Expand Up @@ -3265,6 +3266,95 @@ void AssertMetricDescriptionExists(string name, string description) =>
client);
}

[Workflow]
public class LastFailureWorkflow
{
[WorkflowRun]
public async Task RunAsync()
{
// First attempt fail, second attempt confirm failure is present
if (Workflow.Info.Attempt == 1)
{
throw new ApplicationFailureException(
"Intentional failure", details: new[] { "some detail" });
}
var err = Assert.IsType<ApplicationFailureException>(Workflow.Info.LastFailure);
Assert.Equal("Intentional failure", err.Message);
Assert.Equal("some detail", err.Details.ElementAt<string>(0));
}
}

[Fact]
public async Task ExecuteWorkflowAsync_LastFailure_ProperlyPresent()
{
await ExecuteWorkerAsync<LastFailureWorkflow>(async worker =>
{
await Env.Client.ExecuteWorkflowAsync(
(LastFailureWorkflow wf) => wf.RunAsync(),
new(id: $"workflow-{Guid.NewGuid()}", taskQueue: worker.Options.TaskQueue!)
{
RetryPolicy = new() { MaximumAttempts = 2 },
});
});
}

[Workflow]
public class LastResultWorkflow
{
[WorkflowRun]
public async Task<string> RunAsync()
{
var maybeLastResult = Workflow.Info.LastResult?.SingleOrDefault();
if (maybeLastResult is { } lastResult)
{
var lastResultStr = Workflow.PayloadConverter.ToValue<string>(lastResult);
return $"last result: {lastResultStr}";
}
return "no result";
}
}

[Fact]
public async Task ExecuteWorkflowAsync_LastResult_ProperlyPresent()
{
await TestUtils.AssertNoSchedulesAsync(Client);

await ExecuteWorkerAsync<LastResultWorkflow>(async worker =>
{
// Create schedule, trigger twice, confirm second got result of first
var schedAction = ScheduleActionStartWorkflow.Create(
(LastResultWorkflow wf) => wf.RunAsync(),
new WorkflowOptions(id: $"workflow-{Guid.NewGuid()}", taskQueue: worker.Options.TaskQueue!));
var sched = await Client.CreateScheduleAsync(
"sched-id",
new(schedAction, new ScheduleSpec()) { State = new() { Paused = true } });
async Task<string[]> AllResultsAsync()
{
var desc = await sched.DescribeAsync();
return await Task.WhenAll(desc.Info.RecentActions.Select(async res =>
{
var action = res.Action as ScheduleActionExecutionStartWorkflow;
var handle = Client.GetWorkflowHandle(
action!.WorkflowId) with
{ ResultRunId = action.FirstExecutionRunId };
return await handle.GetResultAsync<string>();
}));
}

// Check first result
await sched.TriggerAsync();
await AssertMore.EqualEventuallyAsync(new string[] { "no result" }, AllResultsAsync);

// Check both results
await sched.TriggerAsync();
await AssertMore.EqualEventuallyAsync(
new string[] { "no result", "last result: no result" },
AllResultsAsync);
});

await TestUtils.DeleteAllSchedulesAsync(Client);
}

private async Task ExecuteWorkerAsync<TWf>(
Func<TemporalWorker, Task> action,
TemporalWorkerOptions? options = null,
Expand Down

0 comments on commit 5cf7a4a

Please sign in to comment.