Skip to content

Commit a9f4e12

Browse files
authored
Add IsReplayingHistoryEvents to allow logging in queries and validators (#594)
Related to temporalio/features#718
1 parent cc9f3b1 commit a9f4e12

6 files changed

Lines changed: 57 additions & 12 deletions

File tree

src/Temporalio/Worker/ReplaySafeLogger.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ public void Log<TState>(
3030
Exception exception,
3131
Func<TState, Exception, string> formatter)
3232
{
33-
if (!Workflows.Workflow.Unsafe.IsReplaying)
33+
if (!Workflows.Workflow.Unsafe.IsReplayingHistoryEvents)
3434
{
3535
underlying.Log<TState>(logLevel, eventId, state, exception, formatter);
3636
}

src/Temporalio/Worker/ReplaySafeMetricMeter.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ internal Counter(MetricCounter<T> underlying)
4646
public override void Add(
4747
T value, IEnumerable<KeyValuePair<string, object>>? extraTags = null)
4848
{
49-
if (!Workflows.Workflow.Unsafe.IsReplaying)
49+
if (!Workflows.Workflow.Unsafe.IsReplayingHistoryEvents)
5050
{
5151
underlying.Add(value, extraTags);
5252
}
@@ -68,7 +68,7 @@ internal Histogram(MetricHistogram<T> underlying)
6868
public override void Record(
6969
T value, IEnumerable<KeyValuePair<string, object>>? extraTags = null)
7070
{
71-
if (!Workflows.Workflow.Unsafe.IsReplaying)
71+
if (!Workflows.Workflow.Unsafe.IsReplayingHistoryEvents)
7272
{
7373
underlying.Record(value, extraTags);
7474
}
@@ -90,7 +90,7 @@ internal Gauge(MetricGauge<T> underlying)
9090
public override void Set(
9191
T value, IEnumerable<KeyValuePair<string, object>>? extraTags = null)
9292
{
93-
if (!Workflows.Workflow.Unsafe.IsReplaying)
93+
if (!Workflows.Workflow.Unsafe.IsReplayingHistoryEvents)
9494
{
9595
underlying.Set(value, extraTags);
9696
}

src/Temporalio/Worker/WorkflowInstance.cs

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -95,6 +95,7 @@ internal class WorkflowInstance : TaskScheduler, IWorkflowInstance, IWorkflowCon
9595
private bool workflowInitialized;
9696
private bool applyModernEventLoopLogic;
9797
private bool dynamicOptionsGetterInvoked;
98+
private bool inQueryOrValidator;
9899

99100
/// <summary>
100101
/// Initializes a new instance of the <see cref="WorkflowInstance"/> class.
@@ -352,6 +353,9 @@ public object Instance
352353
/// <inheritdoc />
353354
public bool IsReplaying { get; private set; }
354355

356+
/// <inheritdoc />
357+
public bool IsReplayingHistoryEvents => IsReplaying && !inQueryOrValidator;
358+
355359
/// <inheritdoc />
356360
public ILogger Logger => replaySafeLogger;
357361

@@ -1188,12 +1192,20 @@ private Task ApplyDoUpdateAsync(DoUpdate update)
11881192
{
11891193
// Capture command count so we can ensure it is unchanged after call
11901194
var origCmdCount = completion?.Successful?.Commands?.Count ?? 0;
1191-
inbound.Value.ValidateUpdate(new(
1192-
Id: update.Id,
1193-
Update: update.Name,
1194-
Definition: updateDefn,
1195-
Args: DecodeUpdateArgs(),
1196-
Headers: update.Headers));
1195+
try
1196+
{
1197+
inQueryOrValidator = true;
1198+
inbound.Value.ValidateUpdate(new(
1199+
Id: update.Id,
1200+
Update: update.Name,
1201+
Definition: updateDefn,
1202+
Args: DecodeUpdateArgs(),
1203+
Headers: update.Headers));
1204+
}
1205+
finally
1206+
{
1207+
inQueryOrValidator = false;
1208+
}
11971209
// If the command count changed, we need to issue a task failure
11981210
var newCmdCount = completion?.Successful?.Commands?.Count ?? 0;
11991211
if (origCmdCount != newCmdCount)
@@ -1367,6 +1379,7 @@ private void ApplyQueryWorkflow(QueryWorkflow query)
13671379
var origCmdCount = completion?.Successful?.Commands?.Count;
13681380
try
13691381
{
1382+
inQueryOrValidator = true;
13701383
WorkflowQueryDefinition? queryDefn;
13711384
object? resultObj;
13721385

@@ -1437,6 +1450,10 @@ private void ApplyQueryWorkflow(QueryWorkflow query)
14371450
});
14381451
return Task.CompletedTask;
14391452
}
1453+
finally
1454+
{
1455+
inQueryOrValidator = false;
1456+
}
14401457
// Check for commands but don't include null counts in check since Successful is
14411458
// unset by other completion failures
14421459
var newCmdCount = completion?.Successful?.Commands?.Count;

src/Temporalio/Workflows/IWorkflowContext.cs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,11 @@ internal interface IWorkflowContext
8888
/// </summary>
8989
bool IsReplaying { get; }
9090

91+
/// <summary>
92+
/// Gets a value indicating whether <see cref="Workflow.Unsafe.IsReplayingHistoryEvents" /> is true.
93+
/// </summary>
94+
bool IsReplayingHistoryEvents { get; }
95+
9196
/// <summary>
9297
/// Gets value for <see cref="Workflow.Logger" />.
9398
/// </summary>

src/Temporalio/Workflows/Workflow.cs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1393,11 +1393,21 @@ public static class Unsafe
13931393
/// Gets a value indicating whether this workflow is replaying.
13941394
/// </summary>
13951395
/// <remarks>
1396-
/// This should not be used for most cases. It is only valuable for advanced cases like
1397-
/// preventing a log or metric from being recorded on replay.
1396+
/// This returns true during replay, including during queries and update validators that
1397+
/// are executed during a replaying workflow task. This should not be used for most cases.
13981398
/// </remarks>
13991399
public static bool IsReplaying => Context.IsReplaying;
14001400

1401+
/// <summary>
1402+
/// Gets a value indicating whether this workflow is replaying history events.
1403+
/// </summary>
1404+
/// <remarks>
1405+
/// This returns true during replay, but false during queries and update validators even
1406+
/// when they are executed during a replaying workflow task. This should not be used for
1407+
/// most cases.
1408+
/// </remarks>
1409+
public static bool IsReplayingHistoryEvents => Context.IsReplayingHistoryEvents;
1410+
14011411
/// <summary>
14021412
/// Disables the event listener that catches invalid calls and thread operations in
14031413
/// workflows. This is the equivalent of

tests/Temporalio.Tests/Worker/WorkflowWorkerTests.cs

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6052,6 +6052,13 @@ public async Task UpdateAsync()
60526052
Workflow.Logger.LogInformation("In update");
60536053
complete.SetResult();
60546054
}
6055+
6056+
[WorkflowQuery]
6057+
public string QueryWithLogging()
6058+
{
6059+
Workflow.Logger.LogInformation("In query");
6060+
return "query-result";
6061+
}
60556062
}
60566063

60576064
[Fact]
@@ -6066,6 +6073,8 @@ await ExecuteWorkerAsync<UpdateLogWorkflow>(
60666073
var handle = await Client.StartWorkflowAsync(
60676074
(UpdateLogWorkflow wf) => wf.RunAsync(),
60686075
new(id: $"workflow-{Guid.NewGuid()}", taskQueue: worker.Options.TaskQueue!));
6076+
// Execute query to confirm it logs
6077+
Assert.Equal("query-result", await handle.QueryAsync(wf => wf.QueryWithLogging()));
60696078
await handle.ExecuteUpdateAsync(
60706079
wf => wf.UpdateAsync(), new() { Id = "my-update-id" });
60716080
await handle.GetResultAsync();
@@ -6084,6 +6093,10 @@ await handle.ExecuteUpdateAsync(
60846093
Assert.Equal("UpdateLogWorkflow", updateLog.ScopeValues["WorkflowType"]);
60856094
Assert.Equal("my-update-id", updateLog.ScopeValues["UpdateId"]);
60866095
Assert.Equal("Update", updateLog.ScopeValues["UpdateName"]);
6096+
// Check query log - should be present now with IsReplayingHistoryEvents
6097+
var queryLog = Assert.Single(loggerFactory.Logs, l => l.Formatted == "In query");
6098+
Assert.Equal("UpdateLogWorkflow", queryLog.ScopeValues["WorkflowType"]);
6099+
Assert.False(queryLog.ScopeValues.ContainsKey("UpdateId"));
60876100
}
60886101

60896102
[Workflow]

0 commit comments

Comments
 (0)