Skip to content

Commit 410f2d5

Browse files
authored
Infrequent polling benign exception (#42)
* in progress * add infrequent polling sample with BENIGN exception * lint fixes * robocop fixes * review changes * review fixes, added test * lint fixes and test skip on certain envs
1 parent c87fe43 commit 410f2d5

9 files changed

Lines changed: 186 additions & 0 deletions

File tree

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ Prerequisites:
2828
* [context_propagation](context_propagation) - Use interceptors to propagate thread/fiber local data from clients
2929
through workflows/activities.
3030
* [message_passing_simple](message_passing_simple) - Simple workflow that accepts signals, queries, and updates.
31+
* [polling/infrequent](polling/infrequent) - Implement an infrequent polling mechanism using Temporal's automatic Activity Retry feature.
3132
* [rails_app](rails_app) - Basic Rails API application using Temporal workflows and activities.
3233
* [sorbet_generic](sorbet_generic) - Proof of concept of how to do _advanced_ Sorbet typing with the SDK.
3334
* [worker_specific_task_queues](worker_specific_task_queues) - Use a unique Task Queue for each Worker to run a sequence of Activities on the same Worker.

polling/infrequent/README.md

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
# Infrequent Polling using Activity Retries
2+
3+
This sample demonstrates how to implement an infrequent polling mechanism using Temporal's automatic Activity Retry feature.
4+
5+
## Purpose
6+
7+
A common requirement for a workflow is to poll an external service until a process is complete. This polling often needs to happen at infrequent intervals (e.g., once a minute or once an hour).
8+
9+
Instead of implementing a `while` loop with a `sleep` call inside the workflow, which can lead to very long-running workflows with large histories, we can offload this logic to Temporal's built-in retry mechanism. This is the more robust and recommended pattern.
10+
11+
This sample shows a workflow that calls an activity. The activity simulates a service that is not immediately available by raising an exception. The workflow configures a `RetryPolicy` on the activity, telling the Temporal Cluster to automatically retry it after a set interval. The workflow itself remains simple and clean, only seeing the final successful result or a terminal failure.
12+
13+
## How to Run
14+
15+
1. **Start the Worker:**
16+
17+
Open a terminal and run the following command to start the worker process. The worker will listen for tasks on the `infrequent-polling-sample` task queue.
18+
19+
```bash
20+
bundle exec ruby polling/infrequent/worker.rb
21+
```
22+
23+
You will see the worker log messages indicating it is attempting to run the activity. It will try several times, with a 60-second delay between each attempt.
24+
25+
2. **Start the Workflow:**
26+
27+
In a separate terminal, run this command to start the workflow. This script will start the workflow and wait for its completion, printing the final result.
28+
29+
```bash
30+
bundle exec ruby polling/infrequent/starter.rb
31+
```
32+
33+
After about 4 minutes (4 failed attempts with a 60s delay), the service will succeed. You will see the final result printed in the starter's terminal, and the worker will log the successful completion.
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
# frozen_string_literal: true
2+
3+
require 'temporalio/activity'
4+
require_relative 'test_service'
5+
6+
module Polling
7+
module Infrequent
8+
class ComposeGreetingActivity < Temporalio::Activity::Definition
9+
def execute(input)
10+
activity_info = Temporalio::Activity::Context.current.info
11+
TestService.get_service_result(input, activity_info)
12+
rescue TestService::TestServiceError => e
13+
raise Temporalio::Error::ApplicationError.new(
14+
e.message,
15+
category: Temporalio::Error::ApplicationError::Category::BENIGN
16+
)
17+
end
18+
end
19+
end
20+
end
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
# frozen_string_literal: true
2+
3+
require 'temporalio/workflow'
4+
require_relative 'compose_greeting_activity'
5+
6+
module Polling
7+
module Infrequent
8+
class GreetingWorkflow < Temporalio::Workflow::Definition
9+
def execute(name)
10+
Temporalio::Workflow.execute_activity(
11+
ComposeGreetingActivity,
12+
{ greeting: 'Hello', name: name },
13+
retry_policy: Temporalio::RetryPolicy.new(
14+
initial_interval: 1, # seconds
15+
backoff_coefficient: 1.0
16+
),
17+
start_to_close_timeout: 2
18+
)
19+
end
20+
end
21+
end
22+
end

polling/infrequent/starter.rb

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
# frozen_string_literal: true
2+
3+
require 'temporalio/client'
4+
require_relative 'greeting_workflow'
5+
6+
# Create a client
7+
client = Temporalio::Client.connect('localhost:7233', 'default')
8+
9+
# Run workflow
10+
puts 'Executing workflow'
11+
result = client.execute_workflow(
12+
Polling::Infrequent::GreetingWorkflow,
13+
'World',
14+
id: "infrequent-polling-sample-workflow-id-#{Time.now.to_i}",
15+
task_queue: 'infrequent-polling-sample'
16+
)
17+
puts "Workflow result: #{result}"

polling/infrequent/test_service.rb

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
# frozen_string_literal: true
2+
3+
module Polling
4+
module Infrequent
5+
# A mock external service with simulated errors.
6+
module TestService
7+
class TestServiceError < StandardError; end
8+
9+
@attempts = Hash.new(0)
10+
ERROR_ATTEMPTS = 5
11+
12+
def get_service_result(input, activity_info)
13+
workflow_id = activity_info.workflow_id
14+
@attempts[workflow_id] ||= 0
15+
@attempts[workflow_id] += 1
16+
17+
puts "Attempt #{@attempts[workflow_id]} of #{ERROR_ATTEMPTS} to invoke service"
18+
19+
raise TestServiceError, 'service is down' unless @attempts[workflow_id] == ERROR_ATTEMPTS
20+
21+
"#{input['greeting']}, #{input['name']}!"
22+
end
23+
module_function :get_service_result
24+
end
25+
end
26+
end

polling/infrequent/worker.rb

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
# frozen_string_literal: true
2+
3+
require_relative 'greeting_workflow'
4+
require_relative 'compose_greeting_activity'
5+
require 'temporalio/client'
6+
require 'temporalio/worker'
7+
8+
# Create a client
9+
client = Temporalio::Client.connect('localhost:7233', 'default')
10+
11+
worker = Temporalio::Worker.new(
12+
client:,
13+
task_queue: 'infrequent-polling-sample',
14+
workflows: [Polling::Infrequent::GreetingWorkflow],
15+
activities: [Polling::Infrequent::ComposeGreetingActivity]
16+
)
17+
18+
puts 'Starting worker (ctrl+c to exit)'
19+
worker.run(shutdown_signals: ['SIGINT'])
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
# frozen_string_literal: true
2+
3+
require 'test'
4+
require 'securerandom'
5+
require 'temporalio/testing'
6+
require 'temporalio/worker'
7+
require 'polling/infrequent/greeting_workflow'
8+
require 'polling/infrequent/compose_greeting_activity'
9+
require 'polling/infrequent/test_service'
10+
11+
module Polling
12+
module Infrequent
13+
class GreetingWorkflowTest < Test
14+
def test_workflow_completes_after_polling
15+
skip_if_not_x86!
16+
task_queue = "tq-#{SecureRandom.uuid}"
17+
18+
Temporalio::Testing::WorkflowEnvironment.start_time_skipping do |env|
19+
worker = Temporalio::Worker.new(
20+
client: env.client,
21+
task_queue: task_queue,
22+
activities: [Polling::Infrequent::ComposeGreetingActivity],
23+
workflows: [Polling::Infrequent::GreetingWorkflow]
24+
)
25+
26+
handle = env.client.start_workflow(
27+
Polling::Infrequent::GreetingWorkflow,
28+
'Temporal',
29+
id: "wf-#{SecureRandom.uuid}",
30+
task_queue: task_queue
31+
)
32+
33+
worker.run do
34+
# Advance time forward to allow for the 4 retries (4 * 60s) plus a buffer
35+
env.sleep(241)
36+
37+
# Wait for the workflow to complete and assert its result
38+
result = handle.result
39+
assert_equal('Hello, Temporal!', result)
40+
end
41+
end
42+
end
43+
end
44+
end
45+
end

test/test.rb

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,4 +3,7 @@
33
require 'minitest/autorun'
44

55
class Test < Minitest::Test
6+
def skip_if_not_x86!
7+
skip('Test only supported on x86') unless RbConfig::CONFIG['host_cpu'] == 'x86_64'
8+
end
69
end

0 commit comments

Comments
 (0)