Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions examples/simple.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,16 @@
# or:

@unleash = Unleash::Client.new(
url: 'https://unleash.herokuapp.com/api',
custom_http_headers: { 'Authorization': '943ca9171e2c884c545c5d82417a655fb77cec970cc3b78a8ff87f4406b495d0' },
url: 'https://app.unleash-hosted.com/demo/api',
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

re-using sdk-examples setup https://github.com/Unleash/unleash-sdk-examples/blob/main/Ruby/.env.example as heroku is deprecated

custom_http_headers: { 'Authorization': 'demo-app:dev.9fc74dd72d2b88bea5253c04240b21a54841f08d9918046ed55a06b5' },
app_name: 'simple-test',
instance_id: 'local-test-cli',
refresh_interval: 2,
metrics_interval: 2,
retry_limit: 2
)

# feature_name = "AwesomeFeature"
feature_name = "4343443"
feature_name = "example-flag"
unleash_context = Unleash::Context.new
unleash_context.user_id = 123

Expand Down
50 changes: 50 additions & 0 deletions examples/streaming.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#!/usr/bin/env ruby

require 'unleash'
require 'unleash/context'

puts ">> START streaming.rb"

@unleash = Unleash::Client.new(
url: 'https://app.unleash-hosted.com/demo/api',
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

demo doesn't have streaming enabled so this needs to be changed to enterprise URL

custom_http_headers: { 'Authorization': 'demo-app:dev.9fc74dd72d2b88bea5253c04240b21a54841f08d9918046ed55a06b5' },
app_name: 'streaming-test',
instance_id: 'local-streaming-cli',
refresh_interval: 2,
metrics_interval: 2,
retry_limit: 2,
experimental_mode: { type: 'streaming' },
timeout: 5,
log_level: Logger::DEBUG
)

feature_name = "example-flag"
unleash_context = Unleash::Context.new
unleash_context.user_id = 123

puts "Waiting for client to initialize..."
sleep 2

100.times do
if @unleash.is_enabled?(feature_name, unleash_context)
puts "> #{feature_name} is enabled"
else
puts "> #{feature_name} is not enabled"
end
sleep 1
puts "---"
puts ""
puts ""
end
feature_name = "foobar"
if @unleash.is_enabled?(feature_name, unleash_context, true)
puts "> #{feature_name} is enabled"
else
puts "> #{feature_name} is not enabled"
end

puts "> shutting down client..."

@unleash.shutdown

puts ">> END streaming.rb"
26 changes: 22 additions & 4 deletions lib/unleash/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@
require 'unleash/toggle_fetcher'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like there's a missing somewhere here and I think that's related to the difference between the toggle fetcher internals and LD's event source lib. The toggle fetching code splits the logic for fetching toggles and the concurrency responsibility into two classes, whereas with the event source lib it's a single entity. This means we can't treat them like interchangable ducks and I really think we should be able to do that. It feels very wrong to have a :streaming_client and :fetcher_scheduled_executor exposed as accesors when we can only ever have one of these and they do the same job in different ways. It's leading to a bunch of places in this PR where we're checking which we and making local decisions. If we created something like a fetch_client that composed of both the toggle_fetcher and scheduled_executor which exposes start and stop methods we could make a lot of that go away

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the root cause is who's driving the new updates: scheduler or streaming client handler. I will play around with a different split of responsibilities (as I did in the Java SDK) on Monday but I'm not sure we can have a drop-in replacement here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You comment was spot on. Thank you for that. I managed to create streaming_client_executor and streaming_event_processor. streaming_client_executor can play a role of a fetcher_scheduled_executor (kept the same field name in attr jus tin case someone depends on it). Please let me know if you like it more now

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like it!

require 'unleash/metrics_reporter'
require 'unleash/scheduled_executor'
require 'unleash/streaming_client'
require 'unleash/variant'
require 'unleash/util/http'
require 'ld-eventsource'
require 'logger'
require 'time'

module Unleash
class Client
attr_accessor :fetcher_scheduled_executor, :metrics_scheduled_executor
attr_accessor :fetcher_scheduled_executor, :metrics_scheduled_executor, :streaming_client

# rubocop:disable Metrics/AbcSize
def initialize(*opts)
Expand All @@ -21,7 +23,8 @@ def initialize(*opts)
Unleash.engine = YggdrasilEngine.new
Unleash.engine.register_custom_strategies(Unleash.configuration.strategies.custom_strategies)

Unleash.toggle_fetcher = Unleash::ToggleFetcher.new Unleash.engine
Unleash.toggle_fetcher = Unleash::ToggleFetcher.new Unleash.engine unless Unleash.configuration.streaming_mode?
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fetcher makes initial HTTP call on instance creation. We don't want that for how with streaming to avoid cross locks between streaming client and polling client


if Unleash.configuration.disable_client
Unleash.logger.warn "Unleash::Client is disabled! Will only return default (or bootstrapped if available) results!"
Unleash.logger.warn "Unleash::Client is disabled! Metrics and MetricsReporter are also disabled!"
Expand All @@ -30,7 +33,13 @@ def initialize(*opts)
end

register
start_toggle_fetcher

if Unleash.configuration.streaming_mode?
start_streaming_client
else
start_toggle_fetcher
end

start_metrics unless Unleash.configuration.disable_metrics
end
# rubocop:enable Metrics/AbcSize
Expand Down Expand Up @@ -105,7 +114,11 @@ def shutdown
# quick shutdown: just kill running threads
def shutdown!
unless Unleash.configuration.disable_client
self.fetcher_scheduled_executor.exit
if Unleash.configuration.streaming_mode?
self.streaming_client.stop
else
self.fetcher_scheduled_executor.exit
end
self.metrics_scheduled_executor.exit unless Unleash.configuration.disable_metrics
end
end
Expand Down Expand Up @@ -140,6 +153,11 @@ def start_toggle_fetcher
end
end

def start_streaming_client
self.streaming_client = Unleash::StreamingClient.new Unleash.engine
self.streaming_client.start
end

def start_metrics
Unleash.reporter = Unleash::MetricsReporter.new
self.metrics_scheduled_executor = Unleash::ScheduledExecutor.new(
Expand Down
18 changes: 16 additions & 2 deletions lib/unleash/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ class Configuration
:log_level,
:bootstrap_config,
:strategies,
:use_delta_api
:use_delta_api,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should deprecate this property as we have a standardized experimental mode in Node SDK that is either {type: 'streaming'} or {type: 'polling', format: 'delta'} or {type: 'polling', format: 'full'}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think having a standard with interface between language and SDK is too important here, most of the SDKs provide interfaces that are comfortable in that language.

I do agree that we should deprecate this and use your pattern though

:experimental_mode
attr_reader :connection_id

def initialize(opts = {})
Expand Down Expand Up @@ -67,7 +68,9 @@ def fetch_toggles_uri
uri = nil
## Personal feeling but Rubocop's suggestion here is too dense to be properly readable
# rubocop:disable Style/ConditionalAssignment
if self.use_delta_api
if streaming_mode?
uri = URI("#{self.url_stripped_of_slash}/client/streaming")
elsif self.use_delta_api || polling_with_delta?
uri = URI("#{self.url_stripped_of_slash}/client/delta")
else
uri = URI("#{self.url_stripped_of_slash}/client/features")
Expand All @@ -93,6 +96,16 @@ def use_bootstrap?
self.bootstrap_config&.valid?
end

def streaming_mode?
self.experimental_mode.is_a?(Hash) && self.experimental_mode[:type] == 'streaming'
end

def polling_with_delta?
self.experimental_mode.is_a?(Hash) &&
self.experimental_mode[:type] == 'polling' &&
self.experimental_mode[:format] == 'delta'
end

private

def set_defaults
Expand All @@ -112,6 +125,7 @@ def set_defaults
self.bootstrap_config = nil
self.strategies = Unleash::Strategies.new
self.use_delta_api = false
self.experimental_mode = nil

self.custom_http_headers = {}
@connection_id = SecureRandom.uuid
Expand Down
93 changes: 93 additions & 0 deletions lib/unleash/streaming_client.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
require 'ld-eventsource'
require 'json'

module Unleash
class StreamingClient
attr_accessor :event_source, :toggle_engine, :running, :mutex

def initialize(toggle_engine)
self.toggle_engine = toggle_engine
self.event_source = nil
self.running = false
self.mutex = Mutex.new
end

def start
self.mutex.synchronize do
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

similar synchronization pattern to toggle_fetcher so that we guard running property

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to guard running? That should only ever be a problem if two threads are calling start and stop on the same instance in quick succession, surely?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No mutex needed in the scheduler for SSEs - lifecycle methods called from single thread. But in the event processor event processing happens from EventSource background threads that could potentially be concurrent so keeping it there

return if self.running || Unleash.configuration.disable_client

Unleash.logger.debug "Starting streaming from URL: #{Unleash.configuration.fetch_toggles_uri}"

headers = (Unleash.configuration.http_headers || {}).dup

self.event_source = SSE::Client.new(
Unleash.configuration.fetch_toggles_uri.to_s,
headers: headers,
read_timeout: 60, # start a new SSE connection when no heartbeat received in 1 minute
reconnect_time: 2,
connect_timeout: 10,
logger: Unleash.logger
)

self.event_source.on_event do |event|
handle_event(event)
end

self.event_source.on_error do |error|
Unleash.logger.warn "Streaming error: #{error}"
end

self.running = true
end
end

def stop
self.mutex.synchronize do
return unless self.running

Unleash.logger.info "Stopping streaming client"
self.running = false
self.event_source&.close
self.event_source = nil
end
end

def running?
self.mutex.synchronize{ self.running }
end

private

def handle_event(event)
case event.type.to_s
when 'unleash-connected'
Unleash.logger.debug "Streaming client connected"
handle_connected_event(event)
when 'unleash-updated'
Unleash.logger.debug "Received streaming update"
handle_updated_event(event)
else
Unleash.logger.debug "Received unknown event type: #{event.type}"
end
rescue StandardError => e
Unleash.logger.error "Error handling streaming event: #{e.message}"
end

def handle_connected_event(event)
Unleash.logger.debug "Processing initial hydration data"
handle_updated_event(event)
end

def handle_updated_event(event)
self.mutex.synchronize do
self.toggle_engine.take_state(event.data)
end

# TODO: update backup file
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

state engine needs to expose current state so we can save it . Both streaming and polling with delta will need it

rescue JSON::ParserError => e
Unleash.logger.error "Failed to parse streaming event data: #{e.message}"
rescue StandardError => e
Unleash.logger.error "Error processing delta update: #{e.message}"
end
end
end
6 changes: 6 additions & 0 deletions spec/unleash/configuration_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,12 @@
expect(config.fetch_toggles_uri.to_s).to eq('https://testurl/api/client/features?project=test-project')
end

it "should build the correct unleash endpoint when streaming mode is enabled" do
config = Unleash::Configuration.new(url: 'https://testurl/api', app_name: 'test-app')
config.experimental_mode = { type: 'streaming' }
expect(config.fetch_toggles_uri.to_s).to eq('https://testurl/api/client/streaming')
end

it "should allow hashes for custom_http_headers via yield" do
Unleash.configure do |config|
config.url = 'http://test-url/'
Expand Down
1 change: 1 addition & 0 deletions unleash-client.gemspec
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ Gem::Specification.new do |spec|
spec.require_paths = ["lib"]
spec.required_ruby_version = ">= 2.7"

spec.add_dependency "ld-eventsource", "~> 2.2.0"
spec.add_dependency "yggdrasil-engine", "~> 1.0.4"

spec.add_dependency "base64", "~> 0.3.0"
Expand Down
Loading