Skip to content

Commit 08e84a1

Browse files
authored
Add is_replaying_history_events to allow logging in queries and validators (#1274)
Related: temporalio/features#718
1 parent 65044a3 commit 08e84a1

File tree

4 files changed

+34
-7
lines changed

4 files changed

+34
-7
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,3 +11,5 @@ temporalio/bridge/temporal_sdk_bridge*
1111
/.zed
1212
*.DS_Store
1313
tags
14+
/.claude
15+
tmpclaude-*

temporalio/worker/_workflow_instance.py

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,7 @@ def __init__(self, det: WorkflowInstanceDetails) -> None:
273273
self._is_replaying: bool = False
274274
self._random = random.Random(det.randomness_seed)
275275
self._read_only = False
276+
self._in_query_or_validator = False
276277

277278
# Patches we have been notified of and memoized patch responses
278279
self._patches_notified: set[str] = set()
@@ -618,7 +619,7 @@ async def run_update() -> None:
618619
)
619620

620621
if job.run_validator and defn.validator is not None:
621-
with self._as_read_only():
622+
with self._as_read_only(in_query_or_validator=True):
622623
self._inbound.handle_update_validator(handler_input)
623624
# Re-process arguments to avoid any problems caused by user mutation of them during validation
624625
args = self._process_handler_args(
@@ -710,7 +711,7 @@ def _apply_query_workflow(
710711
# Wrap entire bunch of work in a task
711712
async def run_query() -> None:
712713
try:
713-
with self._as_read_only():
714+
with self._as_read_only(in_query_or_validator=True):
714715
# Named query or dynamic
715716
defn = self._queries.get(job.query_type) or self._queries.get(None)
716717
if not defn:
@@ -1218,6 +1219,9 @@ def workflow_is_continue_as_new_suggested(self) -> bool:
12181219
def workflow_is_replaying(self) -> bool:
12191220
return self._is_replaying
12201221

1222+
def workflow_is_replaying_history_events(self) -> bool:
1223+
return self._is_replaying and not self._in_query_or_validator
1224+
12211225
def workflow_memo(self) -> Mapping[str, Any]:
12221226
if self._untyped_converted_memo is None:
12231227
self._untyped_converted_memo = {
@@ -2008,13 +2012,16 @@ def _add_command(self) -> temporalio.bridge.proto.workflow_commands.WorkflowComm
20082012
return self._current_completion.successful.commands.add()
20092013

20102014
@contextmanager
2011-
def _as_read_only(self) -> Iterator[None]:
2012-
prev_val = self._read_only
2015+
def _as_read_only(self, *, in_query_or_validator: bool) -> Iterator[None]:
2016+
prev_read_only = self._read_only
2017+
prev_in_query_or_validator = self._in_query_or_validator
20132018
self._read_only = True
2019+
self._in_query_or_validator = in_query_or_validator
20142020
try:
20152021
yield None
20162022
finally:
2017-
self._read_only = prev_val
2023+
self._read_only = prev_read_only
2024+
self._in_query_or_validator = prev_in_query_or_validator
20182025

20192026
def _assert_not_read_only(
20202027
self, action_attempted: str, *, allow_during_delete: bool = False
@@ -2191,7 +2198,7 @@ def _instantiate_workflow_object(self) -> Any:
21912198
if self._defn.name is None and self._defn.dynamic_config_fn is not None:
21922199
dynamic_config = None
21932200
try:
2194-
with self._as_read_only():
2201+
with self._as_read_only(in_query_or_validator=False):
21952202
dynamic_config = self._defn.dynamic_config_fn(workflow_instance)
21962203
except Exception as err:
21972204
logger.exception(

temporalio/workflow.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -740,6 +740,9 @@ def workflow_is_continue_as_new_suggested(self) -> bool: ...
740740
@abstractmethod
741741
def workflow_is_replaying(self) -> bool: ...
742742

743+
@abstractmethod
744+
def workflow_is_replaying_history_events(self) -> bool: ...
745+
743746
@abstractmethod
744747
def workflow_memo(self) -> Mapping[str, Any]: ...
745748

@@ -1444,11 +1447,24 @@ def _set_in_sandbox(v: bool) -> None:
14441447
def is_replaying() -> bool:
14451448
"""Whether the workflow is currently replaying.
14461449
1450+
This includes queries and update validators that occur during replay.
1451+
14471452
Returns:
14481453
True if the workflow is currently replaying
14491454
"""
14501455
return _Runtime.current().workflow_is_replaying()
14511456

1457+
@staticmethod
1458+
def is_replaying_history_events() -> bool:
1459+
"""Whether the workflow is replaying history events.
1460+
1461+
This excludes queries and update validators, which are live operations.
1462+
1463+
Returns:
1464+
True if replaying history events, False otherwise.
1465+
"""
1466+
return _Runtime.current().workflow_is_replaying_history_events()
1467+
14521468
@staticmethod
14531469
def is_sandbox_unrestricted() -> bool:
14541470
"""Whether the current block of code is not restricted via sandbox.
@@ -1602,7 +1618,7 @@ def process(
16021618

16031619
def isEnabledFor(self, level: int) -> bool:
16041620
"""Override to ignore replay logs."""
1605-
if not self.log_during_replay and unsafe.is_replaying():
1621+
if not self.log_during_replay and unsafe.is_replaying_history_events():
16061622
return False
16071623
return super().isEnabledFor(level)
16081624

tests/worker/test_workflow.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1986,6 +1986,7 @@ def my_update(self, value: str) -> None:
19861986

19871987
@workflow.query
19881988
def last_signal(self) -> str:
1989+
workflow.logger.info("Query called")
19891990
return self._last_signal
19901991

19911992

@@ -2021,6 +2022,7 @@ async def test_workflow_logging(client: Client):
20212022
assert capturer.find_log("Signal: signal 2")
20222023
assert capturer.find_log("Update: update 1")
20232024
assert capturer.find_log("Update: update 2")
2025+
assert capturer.find_log("Query called")
20242026
assert not capturer.find_log("Signal: signal 3")
20252027
# Also make sure it has some workflow info and correct funcName
20262028
record = capturer.find_log("Signal: signal 1")

0 commit comments

Comments
 (0)