Skip to content

Commit efe6c17

Browse files
authored
Saga sample (#47)
Fixes #14
1 parent 3dc2e07 commit efe6c17

7 files changed

Lines changed: 237 additions & 0 deletions

File tree

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ Prerequisites:
3030
* [message_passing_simple](message_passing_simple) - Simple workflow that accepts signals, queries, and updates.
3131
* [polling/infrequent](polling/infrequent) - Implement an infrequent polling mechanism using Temporal's automatic Activity Retry feature.
3232
* [rails_app](rails_app) - Basic Rails API application using Temporal workflows and activities.
33+
* [saga](saga) - Using undo/compensation using a very simplistic Saga pattern.
3334
* [sorbet_generic](sorbet_generic) - Proof of concept of how to do _advanced_ Sorbet typing with the SDK.
3435
* [worker_specific_task_queues](worker_specific_task_queues) - Use a unique Task Queue for each Worker to run a sequence of Activities on the same Worker.
3536

saga/README.md

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
# Saga
2+
3+
This sample demonstrates undo/compensation using a very simplistic Saga pattern.
4+
5+
To run, first see [README.md](../README.md) for prerequisites. Then, in another terminal, start the Ruby worker
6+
from this directory:
7+
8+
bundle exec ruby worker.rb
9+
10+
Finally in another terminal, use the Ruby client to the workflow from this directory:
11+
12+
bundle exec ruby starter.rb
13+
14+
By intention, this will fail with an exception and backtrace. The exception will be a
15+
`Temporalio::Error::WorkflowFailedError` with a cause of `Temporalio::Error::ActivityError` that will have a
16+
"Simulated failure" message.
17+
18+
Looking at the worker side, the following logs will be visible (adjusted for clarity):
19+
20+
```
21+
INFO -- : Withdrawing 100 from acc1000. Reference ID: 1324
22+
INFO -- : Depositing 100 into acc2000. Reference ID: 1324
23+
INFO -- : Simulate failure to trigger compensation. Reference ID: 1324
24+
WARN -- : Completing activity as failed
25+
WARN -- : Simulated failure
26+
<backtrace omitted>
27+
INFO -- : Undoing deposit of 100 into acc2000. Reference ID: 1324
28+
INFO -- : Undoing withdraw of 100 from acc1000. Reference ID: 1324
29+
```
30+
31+
This shows a withdraw and deposit activity completing, but then an activity raised an error (by intention in this
32+
sample), so we undo the deposit/withdraw in reverse before re-raising that error. These steps that were performed are
33+
also visible in the UI.

saga/activities.rb

Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
# frozen_string_literal: true
2+
3+
require 'temporalio/activity'
4+
5+
# To get JSON additions for struct
6+
require 'json/add/struct'
7+
8+
module Saga
9+
module Activities
10+
# Transfer details parameter we use everywhere
11+
TransferDetails = Struct.new(
12+
:amount,
13+
:from_account,
14+
:to_account,
15+
:reference_id
16+
)
17+
18+
class Withdraw < Temporalio::Activity::Definition
19+
def execute(details)
20+
Temporalio::Activity::Context.current.logger.info(
21+
"Withdrawing #{details.amount} from #{details.from_account}. Reference ID: #{details.reference_id}"
22+
)
23+
end
24+
end
25+
26+
class WithdrawCompensation < Temporalio::Activity::Definition
27+
def execute(details)
28+
Temporalio::Activity::Context.current.logger.info(
29+
"Undoing withdraw of #{details.amount} from #{details.from_account}. Reference ID: #{details.reference_id}"
30+
)
31+
end
32+
end
33+
34+
class Deposit < Temporalio::Activity::Definition
35+
def execute(details)
36+
Temporalio::Activity::Context.current.logger.info(
37+
"Depositing #{details.amount} into #{details.to_account}. Reference ID: #{details.reference_id}"
38+
)
39+
end
40+
end
41+
42+
class DepositCompensation < Temporalio::Activity::Definition
43+
def execute(details)
44+
Temporalio::Activity::Context.current.logger.info(
45+
"Undoing deposit of #{details.amount} into #{details.to_account}. Reference ID: #{details.reference_id}"
46+
)
47+
end
48+
end
49+
50+
class SomethingThatFails < Temporalio::Activity::Definition
51+
def execute(details)
52+
Temporalio::Activity::Context.current.logger.info(
53+
"Simulate failure to trigger compensation. Reference ID: #{details.reference_id}"
54+
)
55+
raise Temporalio::Error::ApplicationError.new('Simulated failure', non_retryable: true)
56+
end
57+
end
58+
end
59+
end

saga/saga_workflow.rb

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
# frozen_string_literal: true
2+
3+
require 'temporalio/workflow'
4+
require_relative 'activities'
5+
6+
module Saga
7+
class SagaWorkflow < Temporalio::Workflow::Definition
8+
def execute(details)
9+
# Collect compensation activities (aka activities to perform undo) in an array
10+
compensations = []
11+
12+
# Perform some actions. Notice how we add compensations _before_ executing the associated activities. This is a
13+
# user choice, but usually it is best before because the activity may have run but the end of it failed or timed
14+
# out. The compensation should be smart enough to check what is about to undo before it does it.
15+
16+
# Withdraw money
17+
compensations << Activities::WithdrawCompensation
18+
Temporalio::Workflow.execute_activity(Activities::Withdraw, details, start_to_close_timeout: 30)
19+
20+
# Deposit money
21+
compensations << Activities::DepositCompensation
22+
Temporalio::Workflow.execute_activity(Activities::Deposit, details, start_to_close_timeout: 30)
23+
24+
# Simulate a failure. This simulates a failure after withdraw and deposit, but this could just as easily be a
25+
# failure with either of those.
26+
Temporalio::Workflow.execute_activity(Activities::SomethingThatFails, details, start_to_close_timeout: 30)
27+
28+
# Never reached
29+
nil
30+
rescue StandardError
31+
# Perform the compensations in reverse. It is user choice on whether a compensation failure should be allowed to
32+
# raise thereby swallowing the existing one, or if it should be swallowed. This sample raises the compensation
33+
# error because either error fails the workflow anyways and both errors are going to be visible in history.
34+
compensations.reverse_each do |compensating_activity|
35+
Temporalio::Workflow.execute_activity(compensating_activity, details, start_to_close_timeout: 30)
36+
end
37+
38+
# Re-raise
39+
raise
40+
end
41+
end
42+
end

saga/starter.rb

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
# frozen_string_literal: true
2+
3+
require 'temporalio/client'
4+
require_relative 'activities'
5+
require_relative 'saga_workflow'
6+
7+
# Create a Temporal client
8+
client = Temporalio::Client.connect('localhost:7233', 'default')
9+
10+
# Run workflow that we know will fail
11+
client.execute_workflow(
12+
Saga::SagaWorkflow,
13+
Saga::Activities::TransferDetails.new(
14+
amount: 100,
15+
from_account: 'acc1000',
16+
to_account: 'acc2000',
17+
reference_id: '1324'
18+
),
19+
id: 'saga-sample-workflow-id',
20+
task_queue: 'saga-sample'
21+
)

saga/worker.rb

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# frozen_string_literal: true
2+
3+
require 'logger'
4+
require 'temporalio/client'
5+
require 'temporalio/worker'
6+
require_relative 'activities'
7+
require_relative 'saga_workflow'
8+
9+
# Create a Temporal client
10+
client = Temporalio::Client.connect(
11+
'localhost:7233', 'default',
12+
# Enable info logging to see our activity logs
13+
logger: Logger.new($stdout, level: Logger::INFO)
14+
)
15+
16+
# Create worker with the activities and workflow
17+
worker = Temporalio::Worker.new(
18+
client:,
19+
task_queue: 'saga-sample',
20+
activities: [Saga::Activities::Withdraw, Saga::Activities::WithdrawCompensation,
21+
Saga::Activities::Deposit, Saga::Activities::DepositCompensation,
22+
Saga::Activities::SomethingThatFails],
23+
workflows: [Saga::SagaWorkflow]
24+
)
25+
26+
# Run the worker until SIGINT
27+
puts 'Starting worker (ctrl+c to exit)'
28+
worker.run(shutdown_signals: ['SIGINT'])

test/saga/saga_workflow_test.rb

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
# frozen_string_literal: true
2+
3+
require 'test'
4+
require 'securerandom'
5+
require 'saga/activities'
6+
require 'saga/saga_workflow'
7+
require 'temporalio/testing'
8+
require 'temporalio/worker'
9+
require 'polling/infrequent/compose_greeting_activity'
10+
require 'polling/infrequent/test_service'
11+
12+
module Saga
13+
class SagaWorkflowTest < Test
14+
def test_workflow_runs_compensations
15+
# Run worker in test environment
16+
Temporalio::Testing::WorkflowEnvironment.start_local do |env|
17+
worker = Temporalio::Worker.new(
18+
client: env.client,
19+
task_queue: "tq-#{SecureRandom.uuid}",
20+
activities: [Activities::Withdraw, Activities::WithdrawCompensation,
21+
Activities::Deposit, Activities::DepositCompensation,
22+
Activities::SomethingThatFails],
23+
workflows: [SagaWorkflow]
24+
)
25+
worker.run do
26+
# Start workflow
27+
handle = env.client.start_workflow(
28+
SagaWorkflow,
29+
Saga::Activities::TransferDetails.new(
30+
amount: 100,
31+
from_account: 'acc1000',
32+
to_account: 'acc2000',
33+
reference_id: '1324'
34+
),
35+
id: "wf-#{SecureRandom.uuid}",
36+
task_queue: worker.task_queue
37+
)
38+
39+
# Confirm it failed as expected
40+
err = assert_raises(Temporalio::Error::WorkflowFailedError) { handle.result }
41+
assert_instance_of(Temporalio::Error::ActivityError, err.cause)
42+
assert_instance_of(Temporalio::Error::ApplicationError, err.cause.cause)
43+
assert_equal('Simulated failure', err.cause.cause.message)
44+
45+
# Confirm last two events are the compensations
46+
activity_events = handle.fetch_history_events.map(&:activity_task_scheduled_event_attributes).compact
47+
assert_equal('DepositCompensation', activity_events[-2].activity_type.name)
48+
assert_equal('WithdrawCompensation', activity_events[-1].activity_type.name)
49+
end
50+
end
51+
end
52+
end
53+
end

0 commit comments

Comments
 (0)