Skip to content

Commit

Permalink
Workflow update (#142)
Browse files Browse the repository at this point in the history
Fixes #100
  • Loading branch information
cretz authored Oct 25, 2023
1 parent bd986d7 commit ef02df3
Show file tree
Hide file tree
Showing 32 changed files with 3,524 additions and 343 deletions.
32 changes: 22 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,8 @@ public class GreetingWorkflow
}
}

[WorkflowSignal]
// WARNING: Workflow updates are experimental
[WorkflowUpdate]
public async Task UpdateGreetingParamsAsync(GreetingParams greetingParams) =>
this.greetingParamsUpdate = greetingParams;

Expand Down Expand Up @@ -467,6 +468,17 @@ Attributes that can be applied:
* `Dynamic = true` can be set for the query which makes the query a dynamic query meaning it will be called when
no other queries match. The call must accept a `string` for the query name and `Temporalio.Converters.IRawValue[]`
for the arguments. Only one dynamic query may be present on a workflow.
* `[WorkflowUpdate]` attribute may be present on any public method that handles updates.
* Update methods must return a `Task` (can be a `Task<TResult>`).
* The attribute can have a string argument for the update name. Otherwise the name is defaulted to the unqualified
method name with `Async` trimmed off the end if it is present.
* This attribute is not inherited and therefore must be explicitly set on any override.
* `Dynamic = true` can be set for the update which makes the update a dynamic update, meaning it will be called when
no other updates match. The call must accept a `string` for the update name and `Temporalio.Converters.IRawValue[]`
for the arguments. Only one dynamic update may be present on a workflow.
* A validator method can be created that is marked with the `[WorkflowUpdateValidator(nameof(MyUpdateMethod))]`
attribute. It must be `void` but accept the exact same parameters as the update method. This must be a read-only
method and if an exception is thrown, the update is failed without being stored in history.

##### Workflow Inheritance

Expand All @@ -482,9 +494,9 @@ strategy was intentionally done to avoid diamond problems with workflows and to
is a workflow (including the name defaulted) and what its entry point is. A workflow can only have one `[WorkflowRun]`
method.

`[WorkflowSignal]` and `[WorkflowQuery]` methods can be inherited from base classes/interfaces if the method is not
overridden. However, if the method is declared in the subclass, it must also have these attributes. The attributes
themselves are not inherited.
`[WorkflowSignal]`, `[WorkflowQuery]`, and `[WorkflowUpdate]` methods can be inherited from base classes/interfaces if
the method is not overridden. However, if the method is declared in the subclass, it must also have these attributes.
The attributes themselves are not inherited.

#### Running Workflows

Expand All @@ -501,9 +513,9 @@ var handle = await client.StartWorkflowAsync(
Console.WriteLine(
"Current greeting: {0}",
await handle.QueryWorkflowAsync(wf => wf.CurrentGreeting()));
// Change the params via signal
var signalArg = new GreetingParams(Salutation: "Aloha", Name: "John");
await handle.SignalWorkflowAsync(wf => wf.UpdateGreetingParamsAsync(signalArg));
// Change the params via update
var updateArg = new GreetingParams(Salutation: "Aloha", Name: "John");
await handle.ExecuteUpdateAsync(wf => wf.UpdateGreetingParamsAsync(updateArg));
// Tell it to complete via signal
await handle.SignalWorkflowAsync(wf => wf.CompleteWithGreetingAsync());
// Wait for workflow result
Expand Down Expand Up @@ -703,8 +715,8 @@ Here are the rules to disable:
```

* [CA1822](https://learn.microsoft.com/en-us/dotnet/fundamentals/code-analysis/quality-rules/ca1822) - This encourages
static methods when methods don't access instance state. Workflows however often use instance methods for run,
signals, or queries even if they could be static.
static methods when methods don't access instance state. Workflows however use instance methods for run, signals,
queries, or updates even if they could be static.
* [CA2007](https://learn.microsoft.com/en-us/dotnet/fundamentals/code-analysis/quality-rules/ca2007) - This encourages
users to use `ConfigureAwait` instead of directly waiting on a task. But in workflows, there is no benefit to this and
it just adds noise (and if used, needs to be `ConfigureAwait(true)` not `ConfigureAwait(false)`).
Expand All @@ -716,7 +728,7 @@ Here are the rules to disable:
non-crypto random instance.
* `CS1998` - This discourages use of `async` on async methods that don't `await`. But workflows handlers like signals
are often easier to write in one-line form this way, e.g.
`public async Task SignalSomething(string value) => this.value = value;`.
`public async Task SignalSomethingAsync(string value) => this.value = value;`.
* [VSTHRD105](https://github.com/microsoft/vs-threading/blob/main/doc/analyzers/VSTHRD105.md) - This is similar to
`CA2008` above in that use of implicit current scheduler is discouraged. That does not apply to workflows where it is
encouraged/required.
Expand Down
31 changes: 16 additions & 15 deletions src/Temporalio.Extensions.OpenTelemetry/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -117,21 +117,22 @@ still mostly supported.

### How Workflow Tracing Works

On workflow inbound calls for executing a workflow, handling a signal, or handling a query, a new diagnostic activity is
created _only when not replaying_ (query is never considered "replaying" in this case) but a new diagnostic workflow
activity wrapper is always created with the context from the Temporal header (i.e. the diagnostic activity created on
client workflow start). Although the diagnostic activity is started-then-stopped immediately, it becomes the parent for
diagnostic activities in that same-worker-process cached workflow instance. Workflows are removed from cache (and
therefore replayed from beginning on next run) when they throw an exception or are forced out of the cache for LRU
reasons.

Diagnostic activities created for signals and queries are parented to the client-outbound diagnostic activity that
started the workflow, and only _linked_ to the client-outbound diagnostic activity that invoked the signal/query.

If a workflow fails or if a workflow task fails (i.e. workflow suspension due to workflow/signal exception that was not
a Temporal exception), a _new_ diagnostic activity is created representing that failure. The diagnostic activity
representing the run of the workflow is not only already completed (as they all are) but it may not even be created
because this could be replaying on a different worker.
On workflow inbound calls for executing a workflow, handling a signal, handling a query, or handling an update, a new
diagnostic activity is created _only when not replaying_ (query is never considered "replaying" in this case) but a new
diagnostic workflow activity wrapper is always created with the context from the Temporal header (i.e. the diagnostic
activity created on client workflow start). Although the diagnostic activity is started-then-stopped immediately, it
becomes the parent for diagnostic activities in that same-worker-process cached workflow instance. Workflows are removed
from cache (and therefore replayed from beginning on next run) when they throw an exception or are forced out of the
cache for LRU reasons.

Diagnostic activities created for signals, queries, and updates are parented to the client-outbound diagnostic activity
that started the workflow, and only _linked_ to the client-outbound diagnostic activity that invoked the
signal/query/update.

If a workflow/update fails or if a workflow task fails (i.e. workflow suspension due to workflow/signal/update exception
that was not a Temporal exception), a _new_ diagnostic activity is created representing that failure. The diagnostic
activity representing the run of the workflow is not only already completed (as they all are) but it may not even be
created because this could be replaying on a different worker.

Outbound calls from a workflow for scheduling an activity, scheduling a local activity, starting a child, signalling a
child, or signalling an external workflow will create a diagnostic activity _only when not replaying_. This is then
Expand Down
95 changes: 95 additions & 0 deletions src/Temporalio.Extensions.OpenTelemetry/TracingInterceptor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,23 @@ public override async Task<TResult> QueryWorkflowAsync<TResult>(QueryWorkflowInp
}
}

public override async Task<WorkflowUpdateHandle<TResult>> StartWorkflowUpdateAsync<TResult>(
StartWorkflowUpdateInput input)
{
using (var activity = ClientSource.StartActivity(
$"UpdateWorkflow:{input.Update}",
kind: ActivityKind.Client,
parentContext: default,
tags: root.CreateWorkflowTags(input.Id)))
{
if (HeadersFromContext(input.Headers) is Dictionary<string, Payload> headers)
{
input = input with { Headers = headers };
}
return await base.StartWorkflowUpdateAsync<TResult>(input).ConfigureAwait(false);
}
}

/// <summary>
/// Serialize current context to headers if one exists.
/// </summary>
Expand Down Expand Up @@ -365,6 +382,84 @@ public override async Task HandleSignalAsync(HandleSignalInput input)
}
}

public override void ValidateUpdate(HandleUpdateInput input)
{
var prevBaggage = Baggage.Current;
WorkflowDiagnosticActivity? remoteActivity = null;
if (root.HeadersToContext(Workflow.Info.Headers) is PropagationContext ctx)
{
Baggage.Current = ctx.Baggage;
remoteActivity = WorkflowDiagnosticActivity.AttachFromContext(ctx.ActivityContext);
}
try
{
using (var activity = WorkflowsSource.TrackWorkflowDiagnosticActivity(
name: $"ValidateUpdate:{input.Update}",
kind: ActivityKind.Server,
tags: root.CreateInWorkflowTags(),
links: LinksFromHeaders(input.Headers),
inheritParentTags: false))
{
try
{
base.ValidateUpdate(input);
}
catch (Exception e)
{
activity.Activity?.RecordException(e);
throw;
}
}
}
finally
{
remoteActivity?.Dispose();
Baggage.Current = prevBaggage;
}
}

public override async Task<object?> HandleUpdateAsync(HandleUpdateInput input)
{
var prevBaggage = Baggage.Current;
WorkflowDiagnosticActivity? remoteActivity = null;
if (root.HeadersToContext(Workflow.Info.Headers) is PropagationContext ctx)
{
Baggage.Current = ctx.Baggage;
remoteActivity = WorkflowDiagnosticActivity.AttachFromContext(ctx.ActivityContext);
}
try
{
using (WorkflowsSource.TrackWorkflowDiagnosticActivity(
name: $"HandleUpdate:{input.Update}",
kind: ActivityKind.Server,
tags: root.CreateInWorkflowTags(),
links: LinksFromHeaders(input.Headers),
inheritParentTags: false))
{
try
{
return await base.HandleUpdateAsync(input).ConfigureAwait(true);
}
catch (Exception e)
{
// We make a new span for failure same as signal/workflow handlers do
var namePrefix = e is FailureException || e is OperationCanceledException ?
"CompleteUpdate" : "WorkflowTaskFailure";
WorkflowsSource.TrackWorkflowDiagnosticActivity(
name: $"{namePrefix}:{input.Update}",
updateActivity: act => act.RecordException(e)).
Dispose();
throw;
}
}
}
finally
{
remoteActivity?.Dispose();
Baggage.Current = prevBaggage;
}
}

private static void ApplyWorkflowException(Exception e)
{
// Continue as new is not an exception worth of recording
Expand Down
Loading

0 comments on commit ef02df3

Please sign in to comment.