Skip to content

Commit

Permalink
Client API key (#203)
Browse files Browse the repository at this point in the history
Fixes #191
  • Loading branch information
cretz authored Mar 12, 2024
1 parent 070fba5 commit 3faaf2b
Show file tree
Hide file tree
Showing 12 changed files with 394 additions and 191 deletions.
452 changes: 282 additions & 170 deletions src/Temporalio/Bridge/Cargo.lock

Large diffs are not rendered by default.

7 changes: 3 additions & 4 deletions src/Temporalio/Bridge/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@ anyhow = "1.0"
futures = "0.3"
libc = "0.2"
log = "0.4"
parking_lot = "0.12"
prost = "0.11"
prost-types = "0.11"
prost = "0.12"
prost-types = "0.12"
# We rely on Cargo semver rules not updating a 0.x to 0.y. Per the rand
# documentation, before 1.0, minor 0.x updates _can_ break portability which can
# cause non-determinism.
Expand All @@ -28,7 +27,7 @@ temporal-sdk-core-protos = { version = "0.1.0", path = "./sdk-core/sdk-core-prot
tokio = "1.26"
tokio-stream = "0.1"
tokio-util = "0.7"
tonic = "0.9"
tonic = "0.11"
tracing = "0.1"
url = "2.2"

Expand Down
15 changes: 15 additions & 0 deletions src/Temporalio/Bridge/Client.cs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,21 @@ public void UpdateMetadata(IEnumerable<KeyValuePair<string, string>> metadata)
}
}

/// <summary>
/// Update client API key.
/// </summary>
/// <param name="apiKey">API key to set.</param>
public void UpdateApiKey(string? apiKey)
{
using (var scope = new Scope())
{
unsafe
{
Interop.Methods.client_update_api_key(Ptr, scope.ByteArray(apiKey));
}
}
}

/// <summary>
/// Make RPC call to Temporal.
/// </summary>
Expand Down
6 changes: 6 additions & 0 deletions src/Temporalio/Bridge/Interop/Interop.cs
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,9 @@ internal unsafe partial struct ClientOptions
[NativeTypeName("MetadataRef")]
public ByteArrayRef metadata;

[NativeTypeName("struct ByteArrayRef")]
public ByteArrayRef api_key;

[NativeTypeName("struct ByteArrayRef")]
public ByteArrayRef identity;

Expand Down Expand Up @@ -585,6 +588,9 @@ internal static unsafe partial class Methods
[DllImport("temporal_sdk_bridge", CallingConvention = CallingConvention.Cdecl, ExactSpelling = true)]
public static extern void client_update_metadata([NativeTypeName("struct Client *")] Client* client, [NativeTypeName("struct ByteArrayRef")] ByteArrayRef metadata);

[DllImport("temporal_sdk_bridge", CallingConvention = CallingConvention.Cdecl, ExactSpelling = true)]
public static extern void client_update_api_key([NativeTypeName("struct Client *")] Client* client, [NativeTypeName("struct ByteArrayRef")] ByteArrayRef api_key);

[DllImport("temporal_sdk_bridge", CallingConvention = CallingConvention.Cdecl, ExactSpelling = true)]
public static extern void client_rpc_call([NativeTypeName("struct Client *")] Client* client, [NativeTypeName("const struct RpcCallOptions *")] RpcCallOptions* options, void* user_data, [NativeTypeName("ClientRpcCallCallback")] IntPtr callback);

Expand Down
1 change: 1 addition & 0 deletions src/Temporalio/Bridge/OptionsExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,7 @@ public static unsafe Interop.ClientOptions ToInteropOptions(
client_name = ClientName.Ref,
client_version = ClientVersion.Ref,
metadata = scope.Metadata(options.RpcMetadata),
api_key = scope.ByteArray(options.ApiKey),
identity = scope.ByteArray(options.Identity),
tls_options =
options.Tls == null ? null : scope.Pointer(options.Tls.ToInteropOptions(scope)),
Expand Down
3 changes: 3 additions & 0 deletions src/Temporalio/Bridge/include/temporal-sdk-bridge.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ typedef struct ClientOptions {
struct ByteArrayRef client_name;
struct ByteArrayRef client_version;
MetadataRef metadata;
struct ByteArrayRef api_key;
struct ByteArrayRef identity;
const struct ClientTlsOptions *tls_options;
const struct ClientRetryOptions *retry_options;
Expand Down Expand Up @@ -405,6 +406,8 @@ void client_free(struct Client *client);

void client_update_metadata(struct Client *client, struct ByteArrayRef metadata);

void client_update_api_key(struct Client *client, struct ByteArrayRef api_key);

/**
* Client, options, and user data must live through callback.
*/
Expand Down
29 changes: 18 additions & 11 deletions src/Temporalio/Bridge/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,7 @@ use crate::CancellationToken;
use crate::MetadataRef;
use crate::UserDataHandle;

use parking_lot::RwLock;
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use temporal_client::{
ClientKeepAliveConfig, ClientOptions as CoreClientOptions, ClientOptionsBuilder,
Expand All @@ -23,6 +21,7 @@ pub struct ClientOptions {
client_name: ByteArrayRef,
client_version: ByteArrayRef,
metadata: MetadataRef,
api_key: ByteArrayRef,
identity: ByteArrayRef,
tls_options: *const ClientTlsOptions,
retry_options: *const ClientRetryOptions,
Expand Down Expand Up @@ -83,13 +82,6 @@ pub extern "C" fn client_connect(
let runtime = unsafe { &mut *runtime };
// Convert opts
let options = unsafe { &*options };
let headers = if options.metadata.size == 0 {
None
} else {
Some(Arc::new(RwLock::new(
options.metadata.to_string_map_on_newlines(),
)))
};
let core_options: CoreClientOptions = match options.try_into() {
Ok(v) => v,
Err(err) => {
Expand All @@ -110,7 +102,7 @@ pub extern "C" fn client_connect(
let core = runtime.core.clone();
runtime.core.tokio_handle().spawn(async move {
match core_options
.connect_no_namespace(core.telemetry().get_temporal_metric_meter(), headers)
.connect_no_namespace(core.telemetry().get_temporal_metric_meter())
.await
{
Ok(core) => {
Expand Down Expand Up @@ -151,6 +143,15 @@ pub extern "C" fn client_update_metadata(client: *mut Client, metadata: ByteArra
.set_headers(metadata.to_string_map_on_newlines());
}

#[no_mangle]
pub extern "C" fn client_update_api_key(client: *mut Client, api_key: ByteArrayRef) {
let client = unsafe { &*client };
client
.core
.get_client()
.set_api_key(api_key.to_option_string());
}

#[repr(C)]
pub struct RpcCallOptions {
service: RpcService,
Expand Down Expand Up @@ -441,7 +442,13 @@ impl TryFrom<&ClientOptions> for CoreClientOptions {
.retry_config(
unsafe { opts.retry_options.as_ref() }.map_or(RetryConfig::default(), |c| c.into()),
)
.keep_alive(unsafe { opts.keep_alive_options.as_ref() }.map(Into::into));
.keep_alive(unsafe { opts.keep_alive_options.as_ref() }.map(Into::into))
.headers(if opts.metadata.size == 0 {
None
} else {
Some(opts.metadata.to_string_map_on_newlines())
})
.api_key(opts.api_key.to_option_string());
if let Some(tls_config) = unsafe { opts.tls_options.as_ref() } {
opts_builder.tls_cfg(tls_config.try_into()?);
}
Expand Down
24 changes: 20 additions & 4 deletions src/Temporalio/Client/ITemporalConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,31 @@ namespace Temporalio.Client
public interface ITemporalConnection : IBridgeClientProvider
{
/// <summary>
/// Gets or sets the current RPC metadata (i.e. the headers). This can be updated which will
/// apply to all future calls the client makes including inside a worker. Setting this value
/// is thread safe. When setting, this will error if the client is not already connected
/// (e.g. a lazy client has not made a call).
/// Gets or sets the current RPC metadata (i.e. the headers).
/// </summary>
/// <remarks>
/// This can be updated which will apply to all future calls the client makes including
/// inside a worker. Setting this value is thread safe. When setting, this will error if the
/// client is not already connected (e.g. a lazy client has not made a call).
/// </remarks>
/// <exception cref="System.InvalidOperationException">Client is not already
/// connected.</exception>
IReadOnlyCollection<KeyValuePair<string, string>> RpcMetadata { get; set; }

/// <summary>
/// Gets or sets the current API key.
/// </summary>
/// <remarks>
/// This is the "Authorization" HTTP header for every call, with "Bearer " prepended. This
/// is only set if the RPC metadata doesn't already have an "Authorization" key. This can be
/// updated which will apply to all future calls the client makes including inside a worker.
/// Setting this value is thread safe. When setting, this will error if the client is not
/// already connected (e.g. a lazy client has not made a call).
/// </remarks>
/// <exception cref="System.InvalidOperationException">Client is not already
/// connected.</exception>
string? ApiKey { get; set; }

/// <summary>
/// Gets the raw workflow service.
/// </summary>
Expand Down
34 changes: 33 additions & 1 deletion src/Temporalio/Client/TemporalConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ public sealed class TemporalConnection : ITemporalConnection
// Not set if not lazy
private readonly SemaphoreSlim? semaphoreForLazyClient;
private readonly object rpcMetadataLock = new();
private readonly object apiKeyLock = new();
private Bridge.Client? client;
private IReadOnlyCollection<KeyValuePair<string, string>> rpcMetadata;
private string? apiKey;

private TemporalConnection(TemporalConnectionOptions options, bool lazy)
{
Expand All @@ -40,6 +42,7 @@ private TemporalConnection(TemporalConnectionOptions options, bool lazy)
{
rpcMetadata = new List<KeyValuePair<string, string>>(options.RpcMetadata);
}
apiKey = options.ApiKey;
// Set default identity if unset
options.Identity ??= System.Diagnostics.Process.GetCurrentProcess().Id
+ "@"
Expand Down Expand Up @@ -78,7 +81,7 @@ public IReadOnlyCollection<KeyValuePair<string, string>> RpcMetadata
{
throw new InvalidOperationException("Cannot set RPC metadata if client never connected");
}
lock (rpcMetadata)
lock (rpcMetadataLock)
{
// Set on Rust side first to prevent errors from affecting field
#pragma warning disable VSTHRD002 // We know it's completed
Expand All @@ -90,6 +93,35 @@ public IReadOnlyCollection<KeyValuePair<string, string>> RpcMetadata
}
}

/// <inheritdoc />
public string? ApiKey
{
get
{
lock (apiKeyLock)
{
return apiKey;
}
}

set
{
var client = this.client;
if (client == null)
{
throw new InvalidOperationException("Cannot set API key if client never connected");
}
lock (apiKeyLock)
{
// Set on Rust side first to prevent errors from affecting field
#pragma warning disable VSTHRD002 // We know it's completed
client.UpdateApiKey(value);
#pragma warning restore VSTHRD002
apiKey = value;
}
}
}

/// <inheritdoc />
public WorkflowService WorkflowService { get; private init; }

Expand Down
11 changes: 11 additions & 0 deletions src/Temporalio/Client/TemporalConnectionOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,17 @@ public TemporalConnectionOptions()
/// <seealso cref="RpcOptions.Metadata" />
public IReadOnlyCollection<KeyValuePair<string, string>>? RpcMetadata { get; set; }

/// <summary>
/// Gets or sets the API key for all calls.
/// </summary>
/// <remarks>
/// This is the "Authorization" HTTP header for every call, with "Bearer " prepended. This
/// is only set if the RPC metadata doesn't already have an "Authorization" key. Note, this
/// is only the initial value, updates will not be applied. Use
/// <see cref="ITemporalConnection.ApiKey" /> property setter to update.
/// </remarks>
public string? ApiKey { get; set; }

/// <summary>
/// Gets or sets the identity for this connection.
/// </summary>
Expand Down
1 change: 1 addition & 0 deletions tests/Temporalio.Tests/Client/TemporalClientTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public async Task ConnectAsync_Connection_Succeeds()
// Update some headers and call again
// TODO(cretz): Find way to confirm this works without running our own gRPC server
Client.Connection.RpcMetadata = new Dictionary<string, string> { ["header"] = "value" };
Client.Connection.ApiKey = "my-api-key";
resp = await Client.Connection.WorkflowService.GetSystemInfoAsync(
new Api.WorkflowService.V1.GetSystemInfoRequest());
Assert.NotEmpty(resp.ServerVersion);
Expand Down

0 comments on commit 3faaf2b

Please sign in to comment.