Skip to content
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions examples/simple.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,16 @@
# or:

@unleash = Unleash::Client.new(
url: 'https://unleash.herokuapp.com/api',
custom_http_headers: { 'Authorization': '943ca9171e2c884c545c5d82417a655fb77cec970cc3b78a8ff87f4406b495d0' },
url: 'https://app.unleash-hosted.com/demo/api',
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

re-using sdk-examples setup https://github.com/Unleash/unleash-sdk-examples/blob/main/Ruby/.env.example as heroku is deprecated

custom_http_headers: { 'Authorization': 'demo-app:dev.9fc74dd72d2b88bea5253c04240b21a54841f08d9918046ed55a06b5' },
app_name: 'simple-test',
instance_id: 'local-test-cli',
refresh_interval: 2,
metrics_interval: 2,
retry_limit: 2
)

# feature_name = "AwesomeFeature"
feature_name = "4343443"
feature_name = "example-flag"
unleash_context = Unleash::Context.new
unleash_context.user_id = 123

Expand Down
50 changes: 50 additions & 0 deletions examples/streaming.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
#!/usr/bin/env ruby

require 'unleash'
require 'unleash/context'

puts ">> START streaming.rb"

@unleash = Unleash::Client.new(
url: 'https://app.unleash-hosted.com/demo/api',
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

demo doesn't have streaming enabled so this needs to be changed to enterprise URL

custom_http_headers: { 'Authorization': 'demo-app:dev.9fc74dd72d2b88bea5253c04240b21a54841f08d9918046ed55a06b5' },
app_name: 'streaming-test',
instance_id: 'local-streaming-cli',
refresh_interval: 2,
metrics_interval: 2,
retry_limit: 2,
experimental_mode: { type: 'streaming' },
timeout: 5,
log_level: Logger::DEBUG
)

feature_name = "example-flag"
unleash_context = Unleash::Context.new
unleash_context.user_id = 123

puts "Waiting for client to initialize..."
sleep 2

100.times do
if @unleash.is_enabled?(feature_name, unleash_context)
puts "> #{feature_name} is enabled"
else
puts "> #{feature_name} is not enabled"
end
sleep 1
puts "---"
puts ""
puts ""
end
feature_name = "foobar"
if @unleash.is_enabled?(feature_name, unleash_context, true)
puts "> #{feature_name} is enabled"
else
puts "> #{feature_name} is not enabled"
end

puts "> shutting down client..."

@unleash.shutdown

puts ">> END streaming.rb"
30 changes: 26 additions & 4 deletions lib/unleash/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@
require 'unleash/toggle_fetcher'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I feel like there's a missing somewhere here and I think that's related to the difference between the toggle fetcher internals and LD's event source lib. The toggle fetching code splits the logic for fetching toggles and the concurrency responsibility into two classes, whereas with the event source lib it's a single entity. This means we can't treat them like interchangable ducks and I really think we should be able to do that. It feels very wrong to have a :streaming_client and :fetcher_scheduled_executor exposed as accesors when we can only ever have one of these and they do the same job in different ways. It's leading to a bunch of places in this PR where we're checking which we and making local decisions. If we created something like a fetch_client that composed of both the toggle_fetcher and scheduled_executor which exposes start and stop methods we could make a lot of that go away

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the root cause is who's driving the new updates: scheduler or streaming client handler. I will play around with a different split of responsibilities (as I did in the Java SDK) on Monday but I'm not sure we can have a drop-in replacement here

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You comment was spot on. Thank you for that. I managed to create streaming_client_executor and streaming_event_processor. streaming_client_executor can play a role of a fetcher_scheduled_executor (kept the same field name in attr jus tin case someone depends on it). Please let me know if you like it more now

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like it!

require 'unleash/metrics_reporter'
require 'unleash/scheduled_executor'
require 'unleash/streaming_client'
require 'unleash/variant'
require 'unleash/util/http'
require 'unleash/util/event_source_wrapper'
require 'logger'
require 'time'

module Unleash
class Client
attr_accessor :fetcher_scheduled_executor, :metrics_scheduled_executor
attr_accessor :fetcher_scheduled_executor, :metrics_scheduled_executor, :streaming_client

# rubocop:disable Metrics/AbcSize
def initialize(*opts)
Expand All @@ -21,7 +23,8 @@ def initialize(*opts)
Unleash.engine = YggdrasilEngine.new
Unleash.engine.register_custom_strategies(Unleash.configuration.strategies.custom_strategies)

Unleash.toggle_fetcher = Unleash::ToggleFetcher.new Unleash.engine
Unleash.toggle_fetcher = Unleash::ToggleFetcher.new Unleash.engine unless Unleash.configuration.streaming_mode?
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fetcher makes initial HTTP call on instance creation. We don't want that for how with streaming to avoid cross locks between streaming client and polling client


if Unleash.configuration.disable_client
Unleash.logger.warn "Unleash::Client is disabled! Will only return default (or bootstrapped if available) results!"
Unleash.logger.warn "Unleash::Client is disabled! Metrics and MetricsReporter are also disabled!"
Expand All @@ -30,7 +33,9 @@ def initialize(*opts)
end

register
start_toggle_fetcher

initialize_client_mode

start_metrics unless Unleash.configuration.disable_metrics
end
# rubocop:enable Metrics/AbcSize
Expand Down Expand Up @@ -105,7 +110,11 @@ def shutdown
# quick shutdown: just kill running threads
def shutdown!
unless Unleash.configuration.disable_client
self.fetcher_scheduled_executor.exit
if Unleash.configuration.streaming_mode?
self.streaming_client.stop
else
self.fetcher_scheduled_executor.exit
end
self.metrics_scheduled_executor.exit unless Unleash.configuration.disable_metrics
end
end
Expand Down Expand Up @@ -140,6 +149,11 @@ def start_toggle_fetcher
end
end

def start_streaming_client
self.streaming_client = Unleash::StreamingClient.new Unleash.engine
self.streaming_client.start
end

def start_metrics
Unleash.reporter = Unleash::MetricsReporter.new
self.metrics_scheduled_executor = Unleash::ScheduledExecutor.new(
Expand Down Expand Up @@ -172,5 +186,13 @@ def disabled_variant
def first_fetch_is_eager
Unleash.configuration.use_bootstrap?
end

def initialize_client_mode
if Unleash.configuration.streaming_mode?
start_streaming_client
else
start_toggle_fetcher
end
end
end
end
20 changes: 18 additions & 2 deletions lib/unleash/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ class Configuration
:log_level,
:bootstrap_config,
:strategies,
:use_delta_api
:use_delta_api,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we should deprecate this property as we have a standardized experimental mode in Node SDK that is either {type: 'streaming'} or {type: 'polling', format: 'delta'} or {type: 'polling', format: 'full'}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think having a standard with interface between language and SDK is too important here, most of the SDKs provide interfaces that are comfortable in that language.

I do agree that we should deprecate this and use your pattern though

:experimental_mode
attr_reader :connection_id

def initialize(opts = {})
Expand Down Expand Up @@ -67,7 +68,9 @@ def fetch_toggles_uri
uri = nil
## Personal feeling but Rubocop's suggestion here is too dense to be properly readable
# rubocop:disable Style/ConditionalAssignment
if self.use_delta_api
if streaming_mode?
uri = URI("#{self.url_stripped_of_slash}/client/streaming")
elsif self.use_delta_api || polling_with_delta?
uri = URI("#{self.url_stripped_of_slash}/client/delta")
else
uri = URI("#{self.url_stripped_of_slash}/client/features")
Expand All @@ -93,6 +96,18 @@ def use_bootstrap?
self.bootstrap_config&.valid?
end

def streaming_mode?
return false if RUBY_ENGINE == 'jruby'
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should consider raising an exception here or logging an error. I would be very annoyed if an SDK I used didn't do what I told it and quietly did something different

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fair point


self.experimental_mode.is_a?(Hash) && self.experimental_mode[:type] == 'streaming'
end

def polling_with_delta?
self.experimental_mode.is_a?(Hash) &&
self.experimental_mode[:type] == 'polling' &&
self.experimental_mode[:format] == 'delta'
end

private

def set_defaults
Expand All @@ -112,6 +127,7 @@ def set_defaults
self.bootstrap_config = nil
self.strategies = Unleash::Strategies.new
self.use_delta_api = false
self.experimental_mode = nil

self.custom_http_headers = {}
@connection_id = SecureRandom.uuid
Expand Down
107 changes: 107 additions & 0 deletions lib/unleash/streaming_client.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
require 'json'

module Unleash
class StreamingClient
attr_accessor :event_source, :toggle_engine, :running, :mutex

def initialize(toggle_engine)
self.toggle_engine = toggle_engine
self.event_source = nil
self.running = false
self.mutex = Mutex.new
end

def start
self.mutex.synchronize do
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

similar synchronization pattern to toggle_fetcher so that we guard running property

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to guard running? That should only ever be a problem if two threads are calling start and stop on the same instance in quick succession, surely?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No mutex needed in the scheduler for SSEs - lifecycle methods called from single thread. But in the event processor event processing happens from EventSource background threads that could potentially be concurrent so keeping it there

return if self.running || Unleash.configuration.disable_client

Unleash.logger.debug "Starting streaming from URL: #{Unleash.configuration.fetch_toggles_uri}"

self.event_source = create_event_source
return if self.event_source.nil?

setup_event_handlers

self.running = true
end
end

def stop
self.mutex.synchronize do
return unless self.running

Unleash.logger.info "Stopping streaming client"
self.running = false
self.event_source&.close
self.event_source = nil
end
end

def running?
self.mutex.synchronize{ self.running }
end

private

def handle_event(event)
case event.type.to_s
when 'unleash-connected'
Unleash.logger.debug "Streaming client connected"
handle_connected_event(event)
when 'unleash-updated'
Unleash.logger.debug "Received streaming update"
handle_updated_event(event)
else
Unleash.logger.debug "Received unknown event type: #{event.type}"
end
rescue StandardError => e
Unleash.logger.error "Error handling streaming event: #{e.message}"
end

def handle_connected_event(event)
Unleash.logger.debug "Processing initial hydration data"
handle_updated_event(event)
end

def handle_updated_event(event)
self.mutex.synchronize do
self.toggle_engine.take_state(event.data)
end

# TODO: update backup file
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

state engine needs to expose current state so we can save it . Both streaming and polling with delta will need it

rescue JSON::ParserError => e
Unleash.logger.error "Failed to parse streaming event data: #{e.message}"
rescue StandardError => e
Unleash.logger.error "Error processing delta update: #{e.message}"
end

def create_event_source
sse_client = Unleash::Util::EventSourceWrapper.client
if sse_client.nil?
Unleash.logger.warn "Streaming mode is not available. Falling back to polling."
return nil
end

headers = (Unleash.configuration.http_headers || {}).dup

sse_client.new(
Unleash.configuration.fetch_toggles_uri.to_s,
headers: headers,
read_timeout: 60, # start a new SSE connection when no heartbeat received in 1 minute
reconnect_time: 2,
connect_timeout: 10,
logger: Unleash.logger
)
end

def setup_event_handlers
self.event_source.on_event do |event|
handle_event(event)
end

self.event_source.on_error do |error|
Unleash.logger.warn "Streaming error: #{error}"
end
end
end
end
17 changes: 17 additions & 0 deletions lib/unleash/util/event_source_wrapper.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
module Unleash
module Util
module EventSourceWrapper
def self.client
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I'm a bit tempted to fail hard here rather than fall back to polling. I'm kinda okay with errors that happen on startup and I think it's better to force folks to set their stuff up correctly. We can always relax it later but if we do this now, I doubt we can enforce it later

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

return nil if RUBY_ENGINE == 'jruby'

begin
require 'ld-eventsource'
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

don't let this dependency spread throughout the code

SSE::Client
rescue LoadError => e
Unleash.logger.error "Failed to load ld-eventsource: #{e.message}"
nil
end
end
end
end
end
Loading
Loading