[Bug] Updates conflict with replay #848

@ndtretyak

What are you really trying to do?

I'm using updates to allow the workflow to proceed to the next step.

Describe the bug

If an update occurs when the workflow execution is not cached on the worker, the update is effectively "lost", and I need to issue another update for it to be applied.
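
For reference, the intended ordering can be modeled without Temporal at all. The sketch below is a plain-asyncio stand-in, not SDK code: `FlagModel`, `run_loop`, and `expected_sequence` are hypothetical names, and the flag values and comments mirror the `can_run` debug prints in the reproduction workflow further down. It shows only the sequence one would expect when an update arrives while the loop is waiting. It does not reproduce the lost update, which needs a real worker restart.

```python
import asyncio


class FlagModel:
    """Plain-asyncio stand-in for the workflow's can_run flag (not Temporal code)."""

    def __init__(self) -> None:
        self.can_run = False
        self.log: list[tuple[bool, str]] = []
        self._changed = asyncio.Event()

    def set_can_run(self, value: bool, comment: str) -> None:
        # Record every change, like the debug print in the reproduction.
        self.log.append((value, comment))
        self.can_run = value
        self._changed.set()

    async def wait_until_can_run(self) -> None:
        # Rough analogue of waiting on a condition: block until the flag is True.
        while not self.can_run:
            self._changed.clear()
            await self._changed.wait()


async def run_loop(model: FlagModel, runs: list[str], max_runs: int) -> None:
    """Mirror the workflow loop: wait for the flag, run the job, reset the flag."""
    model.set_can_run(True, "workflow start")
    while len(runs) < max_runs:
        await model.wait_until_can_run()
        runs.append("job")  # stands in for executing the activity
        model.set_can_run(False, "activity completed")


async def expected_sequence() -> tuple[list[tuple[bool, str]], list[str]]:
    model = FlagModel()
    runs: list[str] = []
    task = asyncio.create_task(run_loop(model, runs, max_runs=2))
    await asyncio.sleep(0)  # let the loop finish its first pass and start waiting
    model.set_can_run(True, "update")  # the update arrives while the loop waits
    await task
    return model.log, runs


if __name__ == "__main__":
    log, runs = asyncio.run(expected_sequence())
    for value, comment in log:
        print("Setting can_run to:", value, "comment:", comment)
    print("job runs:", len(runs))
```

In this model the update leads to a second `job` run. The report above is that, after a worker restart, the first update does not.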

Minimal Reproduction

  1. Start the worker

  2. Start the workflow

    temporal workflow start --task-queue my-task-queue --type Workflow --workflow-id test --id-reuse-policy TerminateIfRunning
    
  3. Restart the worker

  4. Execute the update

    temporal workflow update execute --workflow-id test --name go
    
  5. Observe that no activity is scheduled when the update is handled. The debug prints in the workflow code below may help show what happens.

import asyncio
import concurrent.futures
import datetime

from temporalio import activity
from temporalio import workflow
from temporalio.client import Client
from temporalio.worker import Worker


@activity.defn
async def job() -> None:
    activity.logger.info("Activity started")


@workflow.defn(sandboxed=False)
class Workflow:
    def _set_can_run(self, value: bool, comment: str) -> None:
        print("Setting can_run to:", value, "comment:", comment)
        self.can_run = value

    @workflow.update
    def go(self) -> None:
        self._set_can_run(True, "update")

    @workflow.run
    async def run(self) -> None:
        self._set_can_run(True, "workflow start")

        def wait_condition() -> bool:
            print("Checking condition. result:", self.can_run)
            return self.can_run

        while True:
            await workflow.wait_condition(wait_condition)
            await workflow.execute_activity(job, start_to_close_timeout=datetime.timedelta(seconds=5))
            self._set_can_run(False, "activity completed")


async def main():
    client = await Client.connect("localhost:7233")

    with concurrent.futures.ThreadPoolExecutor(max_workers=100) as activity_executor:
        worker = Worker(
            client,
            task_queue="my-task-queue",
            workflows=[Workflow],
            activities=[job],
            activity_executor=activity_executor,
        )
        # Keep the executor open for as long as the worker runs.
        await worker.run()


if __name__ == "__main__":
    asyncio.run(main())
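
Steps 2 and 4 above can also be driven from a small script, which helps when repeating the reproduction. This is a minimal sketch that shells out to the same `temporal` CLI commands quoted in the steps. The helper names (`start_command`, `update_command`, `run_repro`) are hypothetical, and it assumes the `temporal` binary is on `PATH` and that the worker is restarted by hand at the prompt.

```python
import shlex
import subprocess

# Values taken from the reproduction steps above.
WORKFLOW_ID = "test"
TASK_QUEUE = "my-task-queue"


def start_command() -> list[str]:
    """Step 2: start the workflow, terminating any run with the same ID."""
    return [
        "temporal", "workflow", "start",
        "--task-queue", TASK_QUEUE,
        "--type", "Workflow",
        "--workflow-id", WORKFLOW_ID,
        "--id-reuse-policy", "TerminateIfRunning",
    ]


def update_command() -> list[str]:
    """Step 4: send the `go` update to the running workflow."""
    return [
        "temporal", "workflow", "update", "execute",
        "--workflow-id", WORKFLOW_ID,
        "--name", "go",
    ]


def run_repro() -> None:
    """Run steps 2 to 4, pausing so the worker can be restarted manually."""
    print("$", shlex.join(start_command()))
    subprocess.run(start_command(), check=True)
    input("Restart the worker now (step 3), then press Enter... ")
    print("$", shlex.join(update_command()))
    subprocess.run(update_command(), check=True)
```

Calling `run_repro()` with the worker already running covers steps 2 through 4. Step 5 (checking whether the activity was scheduled) is still done by reading the worker output.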

Environment/Versions

  • Temporal Version: 1.27.1
  • SDK Version: 1.11.0
