Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log forwarding #153

Merged
merged 5 commits into from
Nov 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
267 changes: 155 additions & 112 deletions src/Temporalio/Bridge/Api/WorkflowActivation/WorkflowActivation.cs

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions src/Temporalio/Bridge/ByteArrayRef.cs
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,14 @@ public static ByteArrayRef FromUTF8(string s)
return new ByteArrayRef(StrictUTF8.GetBytes(s));
}

/// <summary>
/// Copy a byte array ref contents to a UTF8 string.
/// </summary>
/// <param name="byteArray">Byte array ref.</param>
/// <returns>String.</returns>
public static unsafe string ToUtf8(Interop.ByteArrayRef byteArray) =>
StrictUTF8.GetString(byteArray.data, (int)byteArray.size);

/// <summary>
/// Convert an enumerable set of metadata pairs to a byte array. No key or value may contain
/// a newline.
Expand Down
3 changes: 3 additions & 0 deletions src/Temporalio/Bridge/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions src/Temporalio/Bridge/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ prost-types = "0.11"
# cause non-determinism.
rand = "0.8.5"
rand_pcg = "0.3.1"
serde_json = "1.0"
temporal-client = { version = "0.1.0", path = "./sdk-core/client" }
temporal-sdk-core = { version = "0.1.0", path = "./sdk-core/core", features = ["ephemeral-server"] }
temporal-sdk-core-api = { version = "0.1.0", path = "./sdk-core/core-api" }
Expand All @@ -28,6 +29,7 @@ tokio = "1.26"
tokio-stream = "0.1"
tokio-util = "0.7"
tonic = "0.9"
tracing = "0.1"
url = "2.2"

[profile.release]
Expand Down
84 changes: 84 additions & 0 deletions src/Temporalio/Bridge/ForwardedLog.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;
using Microsoft.Extensions.Logging;

namespace Temporalio.Bridge
{
/// <summary>
/// Representation of log state for a Core log.
/// </summary>
/// <param name="Level">Log level.</param>
/// <param name="Target">Log target.</param>
/// <param name="Message">Log message.</param>
/// <param name="TimestampMilliseconds">Ms since Unix epoch.</param>
/// <param name="JsonFields">JSON fields, or null to not include. The keys are the field names
/// and the values are raw JSON strings.</param>
internal record ForwardedLog(
LogLevel Level,
string Target,
string Message,
ulong TimestampMilliseconds,
IReadOnlyDictionary<string, string>? JsonFields) : IReadOnlyList<KeyValuePair<string, object?>>
{
// Unfortunately DateTime.UnixEpoch not in standard library in all versions we need
private static readonly DateTime UnixEpoch = new(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);

/// <summary>
/// Gets the timestamp for this log.
/// </summary>
public DateTime Timestamp => UnixEpoch.AddMilliseconds(TimestampMilliseconds);

/// <inheritdoc />
public int Count => 5;

/// <inheritdoc />
public KeyValuePair<string, object?> this[int index]
{
get
{
switch (index)
{
case 0:
return new("Level", Level);
case 1:
return new("Target", Target);
case 2:
return new("Message", Message);
case 3:
return new("Timestamp", Timestamp);
case 4:
return new("JsonFields", JsonFields);
default:
#pragma warning disable CA2201 // We intentionally use this usually-internal-use-only exception
throw new IndexOutOfRangeException(nameof(index));
#pragma warning restore CA2201
}
}
}

/// <inheritdoc />
public IEnumerator<KeyValuePair<string, object?>> GetEnumerator()
{
for (int i = 0; i < Count; ++i)
{
yield return this[i];
}
}

/// <inheritdoc />
public override string ToString()
{
var message = $"[sdk_core::{Target}] {Message}";
if (JsonFields is { } jsonFields)
{
message += " " + string.Join(", ", jsonFields.Select(kv => $"{kv.Key}={kv.Value}"));
}
return message;
}

/// <inheritdoc />
IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}
}
36 changes: 34 additions & 2 deletions src/Temporalio/Bridge/Interop/Interop.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,15 @@

namespace Temporalio.Bridge.Interop
{
internal enum ForwardedLogLevel
{
Trace = 0,
Debug,
Info,
Warn,
Error,
}

internal enum MetricAttributeValueType
{
String = 1,
Expand Down Expand Up @@ -44,6 +53,10 @@ internal partial struct EphemeralServer
{
}

internal partial struct ForwardedLog
{
}

internal partial struct MetricAttributes
{
}
Expand Down Expand Up @@ -251,13 +264,16 @@ internal unsafe partial struct RuntimeOrFail
public ByteArray* fail;
}

[UnmanagedFunctionPointer(CallingConvention.Cdecl)]
internal unsafe delegate void ForwardedLogCallback([NativeTypeName("enum ForwardedLogLevel")] ForwardedLogLevel level, [NativeTypeName("const struct ForwardedLog *")] ForwardedLog* log);

internal partial struct LoggingOptions
{
[NativeTypeName("struct ByteArrayRef")]
public ByteArrayRef filter;

[NativeTypeName("bool")]
public byte forward;
[NativeTypeName("ForwardedLogCallback")]
public IntPtr forward_to;
}

internal partial struct OpenTelemetryOptions
Expand Down Expand Up @@ -622,6 +638,22 @@ internal static unsafe partial class Methods
[DllImport("temporal_sdk_bridge", CallingConvention = CallingConvention.Cdecl, ExactSpelling = true)]
public static extern void byte_array_free([NativeTypeName("struct Runtime *")] Runtime* runtime, [NativeTypeName("const struct ByteArray *")] ByteArray* bytes);

[DllImport("temporal_sdk_bridge", CallingConvention = CallingConvention.Cdecl, ExactSpelling = true)]
[return: NativeTypeName("struct ByteArrayRef")]
public static extern ByteArrayRef forwarded_log_target([NativeTypeName("const struct ForwardedLog *")] ForwardedLog* log);

[DllImport("temporal_sdk_bridge", CallingConvention = CallingConvention.Cdecl, ExactSpelling = true)]
[return: NativeTypeName("struct ByteArrayRef")]
public static extern ByteArrayRef forwarded_log_message([NativeTypeName("const struct ForwardedLog *")] ForwardedLog* log);

[DllImport("temporal_sdk_bridge", CallingConvention = CallingConvention.Cdecl, ExactSpelling = true)]
[return: NativeTypeName("uint64_t")]
public static extern ulong forwarded_log_timestamp_millis([NativeTypeName("const struct ForwardedLog *")] ForwardedLog* log);

[DllImport("temporal_sdk_bridge", CallingConvention = CallingConvention.Cdecl, ExactSpelling = true)]
[return: NativeTypeName("struct ByteArrayRef")]
public static extern ByteArrayRef forwarded_log_fields_json([NativeTypeName("const struct ForwardedLog *")] ForwardedLog* log);

[DllImport("temporal_sdk_bridge", CallingConvention = CallingConvention.Cdecl, ExactSpelling = true)]
public static extern void ephemeral_server_start_dev_server([NativeTypeName("struct Runtime *")] Runtime* runtime, [NativeTypeName("const struct DevServerOptions *")] DevServerOptions* options, void* user_data, [NativeTypeName("EphemeralServerStartCallback")] IntPtr callback);

Expand Down
3 changes: 2 additions & 1 deletion src/Temporalio/Bridge/OptionsExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,8 @@ public static unsafe Interop.LoggingOptions ToInteropOptions(
return new Interop.LoggingOptions()
{
filter = scope.ByteArray(options.Filter.FilterString),
forward = (byte)0,
// Forward callback is set in the Runtime constructor
// forward_to = <set-elsewhere>
};
}

Expand Down
113 changes: 111 additions & 2 deletions src/Temporalio/Bridge/Runtime.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
using System;
using System.Collections.Generic;
using System.Runtime.InteropServices;
using System.Text.Json;
using Microsoft.Extensions.Logging;

namespace Temporalio.Bridge
{
Expand All @@ -8,6 +11,13 @@ namespace Temporalio.Bridge
/// </summary>
internal class Runtime : SafeHandle
{
private static readonly Func<ForwardedLog, Exception?, string> ForwardLogMessageFormatter =
LogMessageFormatter;

private readonly bool forwardLoggerIncludeFields;
private readonly GCHandle? forwardLoggerCallback;
private ILogger? forwardLogger;

/// <summary>
/// Initializes a new instance of the <see cref="Runtime"/> class.
/// </summary>
Expand All @@ -20,11 +30,29 @@ public Runtime(Temporalio.Runtime.TemporalRuntimeOptions options)
{
unsafe
{
// Setup forwarding logger
if (options.Telemetry.Logging?.Forwarding is { } forwarding)
{
if (forwarding.Logger == null)
{
throw new ArgumentException("Must have logger on forwarding options");
}
forwardLogger = forwarding.Logger;
forwardLoggerIncludeFields = forwarding.IncludeFields;
}

// WARNING: It is important that this options is immediately passed to new
// because we have allocated a pointer for the custom meter which can only be
// freed on the Rust side on error
var runtimeOptions = scope.Pointer(options.ToInteropOptions(scope));
var res = Interop.Methods.runtime_new(runtimeOptions);
var runtimeOptions = options.ToInteropOptions(scope);
// Set log forwarding if enabled
if (forwardLogger != null)
{
forwardLoggerCallback = GCHandle.Alloc(new Interop.ForwardedLogCallback(OnLog));
runtimeOptions.telemetry->logging->forward_to =
Marshal.GetFunctionPointerForDelegate(forwardLoggerCallback.Value.Target!);
}
var res = Interop.Methods.runtime_new(scope.Pointer(runtimeOptions));
// If it failed, copy byte array, free runtime and byte array. Otherwise just
// return runtime.
if (res.fail != null)
Expand Down Expand Up @@ -56,6 +84,45 @@ public Runtime(Temporalio.Runtime.TemporalRuntimeOptions options)
/// </summary>
internal Lazy<MetricMeter> MetricMeter { get; private init; }

/// <summary>
/// Read a JSON object into string keys and raw JSON values.
/// </summary>
/// <param name="bytes">Byte span.</param>
/// <returns>Keys and raw values or null.</returns>
internal static unsafe IReadOnlyDictionary<string, string>? ReadJsonObjectToRawValues(
ReadOnlySpan<byte> bytes)
{
var reader = new Utf8JsonReader(bytes);
// Expect start object
if (!reader.Read() || reader.TokenType != JsonTokenType.StartObject)
{
return null;
}
// Property names one at a time
var ret = new Dictionary<string, string>();
fixed (byte* ptr = bytes)
{
while (reader.Read() && reader.TokenType != JsonTokenType.EndObject)
{
if (reader.TokenType != JsonTokenType.PropertyName)
{
return null;
}
var propertyName = reader.GetString()!;
// Read and skip and capture
if (!reader.Read())
{
return null;
}
var beginIndex = (int)reader.TokenStartIndex;
reader.Skip();
ret[propertyName] = ByteArrayRef.StrictUTF8.GetString(
ptr + beginIndex, (int)reader.BytesConsumed - beginIndex);
}
}
return ret;
}

/// <summary>
/// Free a byte array.
/// </summary>
Expand All @@ -68,8 +135,50 @@ internal unsafe void FreeByteArray(Interop.ByteArray* byteArray)
/// <inheritdoc />
protected override unsafe bool ReleaseHandle()
{
forwardLogger = null;
forwardLoggerCallback?.Free();
Interop.Methods.runtime_free(Ptr);
return true;
}

private static string LogMessageFormatter(ForwardedLog state, Exception? error) =>
state.ToString();

private unsafe void OnLog(Interop.ForwardedLogLevel coreLevel, Interop.ForwardedLog* coreLog)
{
if (forwardLogger is not { } logger)
{
return;
}
// Fortunately the Core log levels integers match .NET ones
var level = (LogLevel)coreLevel;
// Go no further if not enabled
if (!logger.IsEnabled(level))
{
return;
}
// If the fields are requested, we will try to convert from JSON
IReadOnlyDictionary<string, string>? jsonFields = null;
if (forwardLoggerIncludeFields)
{
try
{
var fieldBytes = Interop.Methods.forwarded_log_fields_json(coreLog);
jsonFields = ReadJsonObjectToRawValues(new(fieldBytes.data, (int)fieldBytes.size));
}
#pragma warning disable CA1031 // We are ok swallowing all exceptions
catch
{
}
#pragma warning restore CA1031
}
var log = new ForwardedLog(
Level: level,
Target: ByteArrayRef.ToUtf8(Interop.Methods.forwarded_log_target(coreLog)),
Message: ByteArrayRef.ToUtf8(Interop.Methods.forwarded_log_message(coreLog)),
TimestampMilliseconds: Interop.Methods.forwarded_log_timestamp_millis(coreLog),
JsonFields: jsonFields);
logger.Log(level, 0, log, null, ForwardLogMessageFormatter);
}
}
}
Loading