Skip to content

Commit 59f0004

Browse files
add periodic polling sample
1 parent ec6cb7e commit 59f0004

10 files changed

Lines changed: 209 additions & 2 deletions

File tree

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,9 @@ Prerequisites:
2727
* [encryption](encryption) - Demonstrates how to make a codec for end-to-end encryption.
2828
* [env_config](env_config) - Load client configuration from TOML files with programmatic overrides.
2929
* [message_passing_simple](message_passing_simple) - Simple workflow that accepts signals, queries, and updates.
30+
* [polling/frequent](polling/infrequent) - Implement a frequent polling mechanism inside an Activity.
3031
* [polling/infrequent](polling/infrequent) - Implement an infrequent polling mechanism using Temporal's automatic Activity Retry feature.
32+
* [polling/periodic_sequence](polling/infrequent) - Implement a periodic polling mechanism using a Child Workflow.
3133
* [rails_app](rails_app) - Basic Rails API application using Temporal workflows and activities.
3234
* [saga](saga) - Using undo/compensation using a very simplistic Saga pattern.
3335
* [sorbet_generic](sorbet_generic) - Proof of concept of how to do _advanced_ Sorbet typing with the SDK.
@@ -40,4 +42,3 @@ Prerequisites:
4042
To check format and test this repository, run:
4143

4244
bundle exec rake
43-
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
# Periodic Polling of a Sequence of Activities
2+
3+
This sample demonstrates how to use a Child Workflow for periodic Activity polling.
4+
5+
This is a rare scenario where polling requires execution of a Sequence of Activities, or Activity arguments need to change between polling retries. For this case we use a Child Workflow to call polling activities a set number of times in a loop and then periodically call Continue-As-New.
6+
7+
## How to Run
8+
9+
To run, first see [README.md](../README.md) for prerequisites.
10+
11+
1. **Start the Worker:**
12+
13+
Open a terminal and run the following command to start the worker process.
14+
The worker will listen for tasks on the `frequent-polling-sample` task queue.
15+
16+
```bash
17+
bundle exec ruby worker.rb
18+
```
19+
20+
You will see the worker log messages indicating it is calling the service.
21+
It will try several times, with a short delay between each attempt.
22+
23+
2. **Start the Workflow:**
24+
25+
In a separate terminal, run this command to start the workflow.
26+
This script will start the workflow and wait for its completion,
27+
printing the final result.
28+
29+
```bash
30+
bundle exec ruby starter.rb
31+
```
32+
33+
After a few seconds, the service will succeed.
34+
You will see the final result printed in the starter's terminal,
35+
and the worker will log the successful completion.
36+
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
# frozen_string_literal: true
2+
3+
require 'temporalio/workflow'
4+
require_relative 'compose_greeting_activity'
5+
require_relative 'test_service'
6+
7+
module Polling
8+
module PeriodicSequence
9+
class ChildWorkflow < Temporalio::Workflow::Definition
10+
def execute(name)
11+
4.times do
12+
begin
13+
return Temporalio::Workflow.execute_activity(
14+
ComposeGreetingActivity,
15+
{ greeting: 'Hello', name: name },
16+
retry_policy: Temporalio::RetryPolicy.new(
17+
max_attempts: 1
18+
),
19+
start_to_close_timeout: 4
20+
)
21+
rescue Temporalio::Error::ActivityError
22+
Temporalio::Workflow.logger.info('Activity failed')
23+
end
24+
Temporalio::Workflow.sleep(1)
25+
end
26+
raise Temporalio::Workflow::ContinueAsNewError, name
27+
end
28+
end
29+
end
30+
end
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
# frozen_string_literal: true
2+
3+
require 'temporalio/activity'
4+
require_relative 'test_service'
5+
6+
module Polling
7+
module PeriodicSequence
8+
class ComposeGreetingActivity < Temporalio::Activity::Definition
9+
def execute(input)
10+
activity_info = Temporalio::Activity::Context.current.info
11+
TestService.get_service_result(input, activity_info)
12+
end
13+
end
14+
end
15+
end
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
# frozen_string_literal: true
2+
3+
require 'temporalio/workflow'
4+
require_relative 'child_workflow'
5+
6+
module Polling
7+
module PeriodicSequence
8+
class GreetingWorkflow < Temporalio::Workflow::Definition
9+
def execute(name)
10+
Temporalio::Workflow.execute_child_workflow(ChildWorkflow, name)
11+
end
12+
end
13+
end
14+
end
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
# frozen_string_literal: true
2+
3+
require 'temporalio/client'
4+
require_relative 'greeting_workflow'
5+
6+
# Create a client
7+
client = Temporalio::Client.connect('localhost:7233', 'default')
8+
9+
# Run workflow
10+
puts 'Executing workflow'
11+
result = client.execute_workflow(
12+
Polling::PeriodicSequence::GreetingWorkflow,
13+
'World',
14+
id: "periodic-sequence-polling-sample-workflow-id-#{Time.now.to_i}",
15+
task_queue: 'periodic-sequence-polling-sample'
16+
)
17+
puts "Workflow result: #{result}"
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
# frozen_string_literal: true
2+
3+
module Polling
4+
module PeriodicSequence
5+
# A mock external service with simulated errors.
6+
module TestService
7+
class TestServiceError < StandardError; end
8+
9+
@attempts = Hash.new(0)
10+
ERROR_ATTEMPTS = 5
11+
12+
def get_service_result(input, activity_info)
13+
workflow_id = activity_info.workflow_id
14+
@attempts[workflow_id] ||= 0
15+
@attempts[workflow_id] += 1
16+
17+
# Fake delay to simulate service call
18+
sleep 0.01
19+
puts "Attempt #{@attempts[workflow_id]} of #{ERROR_ATTEMPTS} to invoke service"
20+
21+
raise TestServiceError, 'service is down' unless @attempts[workflow_id] == ERROR_ATTEMPTS
22+
23+
"#{input['greeting']}, #{input['name']}!"
24+
end
25+
module_function :get_service_result
26+
end
27+
end
28+
end
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# frozen_string_literal: true
2+
3+
require_relative 'greeting_workflow'
4+
require_relative 'child_workflow'
5+
require_relative 'compose_greeting_activity'
6+
require 'temporalio/client'
7+
require 'temporalio/worker'
8+
9+
# Create a client
10+
client = Temporalio::Client.connect('localhost:7233', 'default')
11+
12+
worker = Temporalio::Worker.new(
13+
client:,
14+
task_queue: 'periodic-sequence-polling-sample',
15+
workflows: [Polling::PeriodicSequence::GreetingWorkflow, Polling::PeriodicSequence::ChildWorkflow],
16+
activities: [Polling::PeriodicSequence::ComposeGreetingActivity]
17+
)
18+
19+
puts 'Starting worker (ctrl+c to exit)'
20+
worker.run(shutdown_signals: ['SIGINT'])

test/polling/frequent/greeting_workflow_test.rb

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,6 @@ module Polling
1212
module Frequent
1313
class GreetingWorkflowTest < Test
1414
def test_workflow_completes_after_polling
15-
# skip_if_not_x86!
1615
task_queue = "tq-#{SecureRandom.uuid}"
1716

1817
Temporalio::Testing::WorkflowEnvironment.start_local do |env|
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
# frozen_string_literal: true
2+
3+
require 'test'
4+
require 'securerandom'
5+
require 'temporalio/testing'
6+
require 'temporalio/worker'
7+
require 'polling/periodic_sequence/greeting_workflow'
8+
require 'polling/periodic_sequence/compose_greeting_activity'
9+
require 'polling/periodic_sequence/test_service'
10+
11+
module Polling
12+
module PeriodicSequence
13+
class GreetingWorkflowTest < Test
14+
def test_workflow_completes_after_polling
15+
task_queue = "tq-#{SecureRandom.uuid}"
16+
17+
Temporalio::Testing::WorkflowEnvironment.start_time_skipping do |env|
18+
worker = Temporalio::Worker.new(
19+
client: env.client,
20+
task_queue: task_queue,
21+
activities: [Polling::PeriodicSequence::ComposeGreetingActivity],
22+
workflows: [Polling::PeriodicSequence::GreetingWorkflow, Polling::PeriodicSequence::ChildWorkflow]
23+
)
24+
25+
handle = env.client.start_workflow(
26+
Polling::PeriodicSequence::GreetingWorkflow,
27+
'Temporal',
28+
id: "wf-#{SecureRandom.uuid}",
29+
task_queue: task_queue
30+
)
31+
32+
worker.run do
33+
env.sleep(5)
34+
# Wait for the workflow to complete and assert its result
35+
result = handle.result
36+
assert_equal 'Hello, Temporal!', result
37+
end
38+
39+
child_started_event = handle.fetch_history_events.filter_map do |e|
40+
e.child_workflow_execution_started_event_attributes&.workflow_type&.name
41+
end.first
42+
assert_equal 'ChildWorkflow', child_started_event
43+
end
44+
end
45+
end
46+
end
47+
end

0 commit comments

Comments
 (0)