Skip to content

Commit

Permalink
Add missing gRPC calls (#144)
Browse files Browse the repository at this point in the history
Fixes #143
  • Loading branch information
cretz authored Oct 20, 2023
1 parent ab4ded4 commit 1cde5c9
Show file tree
Hide file tree
Showing 2 changed files with 131 additions and 0 deletions.
8 changes: 8 additions & 0 deletions src/Temporalio/Bridge/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,9 @@ async fn call_workflow_service(
"CountWorkflowExecutions" => rpc_call!(client, call, count_workflow_executions),
"CreateSchedule" => rpc_call!(client, call, create_schedule),
"DeleteSchedule" => rpc_call!(client, call, delete_schedule),
"DeleteWorkflowExecution" => rpc_call!(client, call, delete_workflow_execution),
"DeprecateNamespace" => rpc_call!(client, call, deprecate_namespace),
"DescribeBatchOperation" => rpc_call!(client, call, describe_batch_operation),
"DescribeNamespace" => rpc_call!(client, call, describe_namespace),
"DescribeSchedule" => rpc_call!(client, call, describe_schedule),
"DescribeTaskQueue" => rpc_call!(client, call, describe_task_queue),
Expand All @@ -289,13 +291,17 @@ async fn call_workflow_service(
"GetWorkerBuildIdCompatibility" => {
rpc_call!(client, call, get_worker_build_id_compatibility)
}
"GetWorkerTaskReachability" => {
rpc_call!(client, call, get_worker_task_reachability)
}
"GetWorkflowExecutionHistory" => rpc_call!(client, call, get_workflow_execution_history),
"GetWorkflowExecutionHistoryReverse" => {
rpc_call!(client, call, get_workflow_execution_history_reverse)
}
"ListArchivedWorkflowExecutions" => {
rpc_call!(client, call, list_archived_workflow_executions)
}
"ListBatchOperations" => rpc_call!(client, call, list_batch_operations),
"ListClosedWorkflowExecutions" => rpc_call!(client, call, list_closed_workflow_executions),
"ListNamespaces" => rpc_call!(client, call, list_namespaces),
"ListOpenWorkflowExecutions" => rpc_call!(client, call, list_open_workflow_executions),
Expand Down Expand Up @@ -339,6 +345,8 @@ async fn call_workflow_service(
}
"SignalWorkflowExecution" => rpc_call!(client, call, signal_workflow_execution),
"StartWorkflowExecution" => rpc_call!(client, call, start_workflow_execution),
"StartBatchOperation" => rpc_call!(client, call, start_batch_operation),
"StopBatchOperation" => rpc_call!(client, call, stop_batch_operation),
"TerminateWorkflowExecution" => rpc_call!(client, call, terminate_workflow_execution),
"UpdateNamespace" => rpc_call!(client, call, update_namespace),
"UpdateSchedule" => rpc_call!(client, call, update_schedule),
Expand Down
123 changes: 123 additions & 0 deletions tests/Temporalio.Tests/Client/TemporalClientTests.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
namespace Temporalio.Tests.Client;

using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Reflection;
using Temporalio.Client;
using Temporalio.Runtime;
using Xunit;
using Xunit.Abstractions;

Expand Down Expand Up @@ -48,4 +52,123 @@ public async Task CreateLazy_Connection_NotConnectedUntilCallMade()
Assert.True(client.Connection.IsConnected);
client.Connection.RpcMetadata = new Dictionary<string, string>();
}

[Fact]
public async Task ConnectAsync_Connection_AllGrpcCallsSupported()
{
// The approach we'll take here is to just start the dev server and reflectively make each
// call in workflow service and operator service and check metrics to confirm that every
// call got that far into core. This catches cases where we didn't add all string call names
// in Rust for our C# methods.
var captureMeter = new CaptureRpcCallsMeter();
await using var env = await Temporalio.Testing.WorkflowEnvironment.StartLocalAsync(new()
{
Runtime = new(new()
{
Telemetry = new() { Metrics = new() { CustomMetricMeter = captureMeter } },
}),
});

// Check workflow service and operator service
await AssertAllRpcsAsync(captureMeter.Calls, env.Client.Connection.WorkflowService);
await AssertAllRpcsAsync(captureMeter.Calls, env.Client.Connection.OperatorService, skip: "AddOrUpdateRemoteCluster");
}

private static async Task AssertAllRpcsAsync<T>(
ConcurrentQueue<string> actualCalls, T service, params string[] skip)
where T : RpcService
{
// Clear actual calls
actualCalls.Clear();

// Make calls and populate expected calls
var expectedCalls = new List<string>();
foreach (var method in typeof(T).GetMethods(
BindingFlags.Instance | BindingFlags.Public | BindingFlags.DeclaredOnly))
{
// Add expected call sans Async suffix
var call = method.Name.Substring(0, method.Name.Length - 5);
if (skip.Contains(call))
{
continue;
}
expectedCalls.Add(call);
// Make call
var task = (Task)method.Invoke(
service,
new object?[] { Activator.CreateInstance(method.GetParameters()[0].ParameterType), null })!;
#pragma warning disable CA1031 // We're ok swallowing exceptions here
try
{
await task;
}
catch
{
}
#pragma warning restore CA1031
}

// Remove skip from actual calls too and then sort both and compare
var sortedActualCalls = actualCalls.Where(c => !skip.Contains(c)).ToList();
sortedActualCalls.Sort();
expectedCalls.Sort();
Assert.Equal(expectedCalls, actualCalls);
}

private class CaptureRpcCallsMeter : ICustomMetricMeter
{
public ConcurrentQueue<string> Calls { get; } = new();

public ICustomMetricCounter<T> CreateCounter<T>(string name, string? unit, string? description)
where T : struct => new CaptureRpcCallsMetric<T>(name, Calls);

public ICustomMetricGauge<T> CreateGauge<T>(string name, string? unit, string? description)
where T : struct => new CaptureRpcCallsMetric<T>(name, Calls);

public ICustomMetricHistogram<T> CreateHistogram<T>(string name, string? unit, string? description)
where T : struct => new CaptureRpcCallsMetric<T>(name, Calls);

public object CreateTags(
object? appendFrom, IReadOnlyCollection<KeyValuePair<string, object>> tags)
{
var dict = new Dictionary<string, object>();
if (appendFrom is Dictionary<string, object> appendFromDict)
{
foreach (var kv in appendFromDict)
{
dict[kv.Key] = kv.Value;
}
}
foreach (var kv in tags)
{
dict[kv.Key] = kv.Value;
}
return dict;
}
}

private record CaptureRpcCallsMetric<T>(string Name, ConcurrentQueue<string> Calls) :
ICustomMetricCounter<T>, ICustomMetricHistogram<T>, ICustomMetricGauge<T>
where T : struct
{
public void Add(T value, object tags)
{
if (Name == "temporal_request" || Name == "temporal_long_request")
{
var call = (string)((Dictionary<string, object>)tags)["operation"];
if (!Calls.Contains(call))
{
Calls.Enqueue(call);
}
}
}

public void Record(T value, object tags)
{
}

public void Set(T value, object tags)
{
}
}
}

0 comments on commit 1cde5c9

Please sign in to comment.