Skip to content

Commit

Permalink
Access current update info inside update handler (#268)
Browse files Browse the repository at this point in the history
Fixes #265
  • Loading branch information
cretz authored Jun 6, 2024
1 parent 5ed423f commit c299cec
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 0 deletions.
7 changes: 7 additions & 0 deletions src/Temporalio/Worker/WorkflowInstance.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ namespace Temporalio.Worker
internal class WorkflowInstance : TaskScheduler, IWorkflowInstance, IWorkflowContext
{
private static readonly string[] Newlines = new[] { "\r", "\n", "\r\n" };
private static readonly AsyncLocal<WorkflowUpdateInfo> CurrentUpdateInfoLocal = new();

private readonly TaskFactory taskFactory;
private readonly IFailureConverter failureConverter;
Expand Down Expand Up @@ -216,6 +217,9 @@ public WorkflowInstance(WorkflowInstanceDetails details)
/// <inheritdoc />
public int CurrentHistorySize { get; private set; }

/// <inheritdoc />
public WorkflowUpdateInfo? CurrentUpdateInfo => CurrentUpdateInfoLocal.Value;

/// <inheritdoc />
public WorkflowQueryDefinition? DynamicQuery
{
Expand Down Expand Up @@ -899,6 +903,9 @@ private void ApplyDoUpdate(DoUpdate update)
// Queue it up so it can run in workflow environment
_ = QueueNewTaskAsync(() =>
{
// Set the current update for the life of this task
CurrentUpdateInfoLocal.Value = new(Id: update.Id, Name: update.Name);

// Find update definition or reject
var updates = mutableUpdates.IsValueCreated ? mutableUpdates.Value : Definition.Updates;
if (!updates.TryGetValue(update.Name, out var updateDefn))
Expand Down
5 changes: 5 additions & 0 deletions src/Temporalio/Workflows/IWorkflowContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,11 @@ internal interface IWorkflowContext
/// </summary>
int CurrentHistorySize { get; }

/// <summary>
/// Gets value for <see cref="Workflow.CurrentUpdateInfo" />.
/// </summary>
WorkflowUpdateInfo? CurrentUpdateInfo { get; }

/// <summary>
/// Gets or sets value for <see cref="Workflow.DynamicQuery" />.
/// </summary>
Expand Down
10 changes: 10 additions & 0 deletions src/Temporalio/Workflows/Workflow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,16 @@ public static class Workflow
/// </remarks>
public static int CurrentHistorySize => Context.CurrentHistorySize;

/// <summary>
/// Gets the current workflow update handler for the caller if any.
/// </summary>
/// <remarks>
/// This set via a <see cref="AsyncLocal{T}" /> and therefore only visible inside the
/// handler and tasks it creates.
/// </remarks>
/// <remarks>WARNING: Workflow update is experimental and APIs may change.</remarks>
public static WorkflowUpdateInfo? CurrentUpdateInfo => Context.CurrentUpdateInfo;

/// <summary>
/// Gets or sets the current dynamic query handler. This can be null for no dynamic query
/// handling.
Expand Down
16 changes: 16 additions & 0 deletions src/Temporalio/Workflows/WorkflowUpdateInfo.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
namespace Temporalio.Workflows
{
/// <summary>
/// Information about the current update. This set via a
/// <see cref="System.Threading.AsyncLocal{T}" /> and therefore only visible inside the handler
/// and tasks it creates.
/// </summary>
/// <param name="Id">Current update ID.</param>
/// <param name="Name">Current update name.</param>
/// <remarks>WARNING: Workflow update is experimental and APIs may change.</remarks>
public record WorkflowUpdateInfo(
string Id,
string Name)
{
}
}
76 changes: 76 additions & 0 deletions tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4602,6 +4602,82 @@ public async Task ExecuteAsync_InvalidWorker_Fails()
Assert.Contains("Worker validation failed", err.Message);
}

[Workflow]
public class CurrentUpdateWorkflow
{
private readonly List<Task<string>> pendingGetUpdateIdTasks = new();

[WorkflowRun]
public async Task<string[]> RunAsync()
{
// Confirm no info
Assert.Null(Workflow.CurrentUpdateInfo);

// Wait for all tasks then return full set
await Workflow.WaitConditionAsync(() => pendingGetUpdateIdTasks.Count == 5);
var res = await Task.WhenAll(pendingGetUpdateIdTasks);

// Confirm again null then return
Assert.Null(Workflow.CurrentUpdateInfo);
return res;
}

[WorkflowUpdate]
public async Task<string> DoUpdateAsync()
{
Assert.Equal("DoUpdate", Workflow.CurrentUpdateInfo?.Name);
// Check that the simple helper awaited has the ID
Assert.Equal(Workflow.CurrentUpdateInfo?.Id, await GetUpdateIdAsync());

// Also schedule the task and wait for it in the main workflow to confirm it still gets
// the update ID
pendingGetUpdateIdTasks.Add(Task.Factory.StartNew(() => GetUpdateIdAsync()).Unwrap());

// Return
return Workflow.CurrentUpdateInfo?.Id ??
throw new InvalidOperationException("Missing update");
}

[WorkflowUpdateValidator(nameof(DoUpdateAsync))]
public void ValidateDoUpdate() =>
Assert.Equal("DoUpdate", Workflow.CurrentUpdateInfo?.Name);

private async Task<string> GetUpdateIdAsync()
{
await Workflow.DelayAsync(1);
return Workflow.CurrentUpdateInfo?.Id ??
throw new InvalidOperationException("Missing update");
}
}

[Fact]
public async Task ExecuteWorkflowAsync_CurrentUpdate_HasInfo()
{
await ExecuteWorkerAsync<CurrentUpdateWorkflow>(
async worker =>
{
// Start
var handle = await Client.StartWorkflowAsync(
(CurrentUpdateWorkflow wf) => wf.RunAsync(),
new(id: $"workflow-{Guid.NewGuid()}", taskQueue: worker.Options.TaskQueue!));

// Issue 5 updates concurrently and confirm they have the right IDs
var expected = new[] { "update1", "update2", "update3", "update4", "update5" };
var actual = await Task.WhenAll(
handle.ExecuteUpdateAsync(wf => wf.DoUpdateAsync(), new("update1")),
handle.ExecuteUpdateAsync(wf => wf.DoUpdateAsync(), new("update2")),
handle.ExecuteUpdateAsync(wf => wf.DoUpdateAsync(), new("update3")),
handle.ExecuteUpdateAsync(wf => wf.DoUpdateAsync(), new("update4")),
handle.ExecuteUpdateAsync(wf => wf.DoUpdateAsync(), new("update5")));
Assert.Equal(
new[] { "update1", "update2", "update3", "update4", "update5" }.ToHashSet(),
actual.ToHashSet());
Assert.Equal(
new[] { "update1", "update2", "update3", "update4", "update5" }.ToHashSet(),
(await handle.GetResultAsync()).ToHashSet());
});
}

internal static Task AssertTaskFailureContainsEventuallyAsync(
WorkflowHandle handle, string messageContains)
{
Expand Down

0 comments on commit c299cec

Please sign in to comment.