From 7fb2b860c2e59ad962261cbed3b675d235cc5eab Mon Sep 17 00:00:00 2001 From: kwasniew Date: Tue, 5 Aug 2025 12:09:54 +0200 Subject: [PATCH 01/18] feat: streaming support --- examples/simple.rb | 7 +-- examples/streaming.rb | 50 ++++++++++++++++ lib/unleash/client.rb | 26 +++++++-- lib/unleash/configuration.rb | 18 +++++- lib/unleash/streaming_client.rb | 93 ++++++++++++++++++++++++++++++ spec/unleash/configuration_spec.rb | 6 ++ unleash-client.gemspec | 1 + 7 files changed, 191 insertions(+), 10 deletions(-) create mode 100755 examples/streaming.rb create mode 100644 lib/unleash/streaming_client.rb diff --git a/examples/simple.rb b/examples/simple.rb index 4d74d0d6..fb1acafe 100755 --- a/examples/simple.rb +++ b/examples/simple.rb @@ -18,8 +18,8 @@ # or: @unleash = Unleash::Client.new( - url: 'https://unleash.herokuapp.com/api', - custom_http_headers: { 'Authorization': '943ca9171e2c884c545c5d82417a655fb77cec970cc3b78a8ff87f4406b495d0' }, + url: 'https://app.unleash-hosted.com/demo/api', + custom_http_headers: { 'Authorization': 'demo-app:dev.9fc74dd72d2b88bea5253c04240b21a54841f08d9918046ed55a06b5' }, app_name: 'simple-test', instance_id: 'local-test-cli', refresh_interval: 2, @@ -27,8 +27,7 @@ retry_limit: 2 ) -# feature_name = "AwesomeFeature" -feature_name = "4343443" +feature_name = "example-flag" unleash_context = Unleash::Context.new unleash_context.user_id = 123 diff --git a/examples/streaming.rb b/examples/streaming.rb new file mode 100755 index 00000000..e43d989c --- /dev/null +++ b/examples/streaming.rb @@ -0,0 +1,50 @@ +#!/usr/bin/env ruby + +require 'unleash' +require 'unleash/context' + +puts ">> START streaming.rb" + +@unleash = Unleash::Client.new( + url: 'https://app.unleash-hosted.com/demo/api', + custom_http_headers: { 'Authorization': 'demo-app:dev.9fc74dd72d2b88bea5253c04240b21a54841f08d9918046ed55a06b5' }, + app_name: 'streaming-test', + instance_id: 'local-streaming-cli', + refresh_interval: 2, + metrics_interval: 2, + retry_limit: 2, + experimental_mode: { type: 'streaming' }, + timeout: 5, + log_level: Logger::DEBUG +) + +feature_name = "example-flag" +unleash_context = Unleash::Context.new +unleash_context.user_id = 123 + +puts "Waiting for client to initialize..." +sleep 2 + +100.times do + if @unleash.is_enabled?(feature_name, unleash_context) + puts "> #{feature_name} is enabled" + else + puts "> #{feature_name} is not enabled" + end + sleep 1 + puts "---" + puts "" + puts "" +end +feature_name = "foobar" +if @unleash.is_enabled?(feature_name, unleash_context, true) + puts "> #{feature_name} is enabled" +else + puts "> #{feature_name} is not enabled" +end + +puts "> shutting down client..." + +@unleash.shutdown + +puts ">> END streaming.rb" diff --git a/lib/unleash/client.rb b/lib/unleash/client.rb index 9538a86c..d994cb2e 100644 --- a/lib/unleash/client.rb +++ b/lib/unleash/client.rb @@ -2,14 +2,16 @@ require 'unleash/toggle_fetcher' require 'unleash/metrics_reporter' require 'unleash/scheduled_executor' +require 'unleash/streaming_client' require 'unleash/variant' require 'unleash/util/http' +require 'ld-eventsource' require 'logger' require 'time' module Unleash class Client - attr_accessor :fetcher_scheduled_executor, :metrics_scheduled_executor + attr_accessor :fetcher_scheduled_executor, :metrics_scheduled_executor, :streaming_client # rubocop:disable Metrics/AbcSize def initialize(*opts) @@ -21,7 +23,8 @@ def initialize(*opts) Unleash.engine = YggdrasilEngine.new Unleash.engine.register_custom_strategies(Unleash.configuration.strategies.custom_strategies) - Unleash.toggle_fetcher = Unleash::ToggleFetcher.new Unleash.engine + Unleash.toggle_fetcher = Unleash::ToggleFetcher.new Unleash.engine unless Unleash.configuration.streaming_mode? + if Unleash.configuration.disable_client Unleash.logger.warn "Unleash::Client is disabled! Will only return default (or bootstrapped if available) results!" Unleash.logger.warn "Unleash::Client is disabled! Metrics and MetricsReporter are also disabled!" @@ -30,7 +33,13 @@ def initialize(*opts) end register - start_toggle_fetcher + + if Unleash.configuration.streaming_mode? + start_streaming_client + else + start_toggle_fetcher + end + start_metrics unless Unleash.configuration.disable_metrics end # rubocop:enable Metrics/AbcSize @@ -105,7 +114,11 @@ def shutdown # quick shutdown: just kill running threads def shutdown! unless Unleash.configuration.disable_client - self.fetcher_scheduled_executor.exit + if Unleash.configuration.streaming_mode? + self.streaming_client.stop + else + self.fetcher_scheduled_executor.exit + end self.metrics_scheduled_executor.exit unless Unleash.configuration.disable_metrics end end @@ -140,6 +153,11 @@ def start_toggle_fetcher end end + def start_streaming_client + self.streaming_client = Unleash::StreamingClient.new Unleash.engine + self.streaming_client.start + end + def start_metrics Unleash.reporter = Unleash::MetricsReporter.new self.metrics_scheduled_executor = Unleash::ScheduledExecutor.new( diff --git a/lib/unleash/configuration.rb b/lib/unleash/configuration.rb index 7a32ad7f..a8788324 100644 --- a/lib/unleash/configuration.rb +++ b/lib/unleash/configuration.rb @@ -22,7 +22,8 @@ class Configuration :log_level, :bootstrap_config, :strategies, - :use_delta_api + :use_delta_api, + :experimental_mode attr_reader :connection_id def initialize(opts = {}) @@ -67,7 +68,9 @@ def fetch_toggles_uri uri = nil ## Personal feeling but Rubocop's suggestion here is too dense to be properly readable # rubocop:disable Style/ConditionalAssignment - if self.use_delta_api + if streaming_mode? + uri = URI("#{self.url_stripped_of_slash}/client/streaming") + elsif self.use_delta_api || polling_with_delta? uri = URI("#{self.url_stripped_of_slash}/client/delta") else uri = URI("#{self.url_stripped_of_slash}/client/features") @@ -93,6 +96,16 @@ def use_bootstrap? self.bootstrap_config&.valid? end + def streaming_mode? + self.experimental_mode.is_a?(Hash) && self.experimental_mode[:type] == 'streaming' + end + + def polling_with_delta? + self.experimental_mode.is_a?(Hash) && + self.experimental_mode[:type] == 'polling' && + self.experimental_mode[:format] == 'delta' + end + private def set_defaults @@ -112,6 +125,7 @@ def set_defaults self.bootstrap_config = nil self.strategies = Unleash::Strategies.new self.use_delta_api = false + self.experimental_mode = nil self.custom_http_headers = {} @connection_id = SecureRandom.uuid diff --git a/lib/unleash/streaming_client.rb b/lib/unleash/streaming_client.rb new file mode 100644 index 00000000..c2509dc0 --- /dev/null +++ b/lib/unleash/streaming_client.rb @@ -0,0 +1,93 @@ +require 'ld-eventsource' +require 'json' + +module Unleash + class StreamingClient + attr_accessor :event_source, :toggle_engine, :running, :mutex + + def initialize(toggle_engine) + self.toggle_engine = toggle_engine + self.event_source = nil + self.running = false + self.mutex = Mutex.new + end + + def start + self.mutex.synchronize do + return if self.running || Unleash.configuration.disable_client + + Unleash.logger.debug "Starting streaming from URL: #{Unleash.configuration.fetch_toggles_uri}" + + headers = (Unleash.configuration.http_headers || {}).dup + + self.event_source = SSE::Client.new( + Unleash.configuration.fetch_toggles_uri.to_s, + headers: headers, + read_timeout: 60, # start a new SSE connection when no heartbeat received in 1 minute + reconnect_time: 2, + connect_timeout: 10, + logger: Unleash.logger + ) + + self.event_source.on_event do |event| + handle_event(event) + end + + self.event_source.on_error do |error| + Unleash.logger.warn "Streaming error: #{error}" + end + + self.running = true + end + end + + def stop + self.mutex.synchronize do + return unless self.running + + Unleash.logger.info "Stopping streaming client" + self.running = false + self.event_source&.close + self.event_source = nil + end + end + + def running? + self.mutex.synchronize{ self.running } + end + + private + + def handle_event(event) + case event.type.to_s + when 'unleash-connected' + Unleash.logger.debug "Streaming client connected" + handle_connected_event(event) + when 'unleash-updated' + Unleash.logger.debug "Received streaming update" + handle_updated_event(event) + else + Unleash.logger.debug "Received unknown event type: #{event.type}" + end + rescue StandardError => e + Unleash.logger.error "Error handling streaming event: #{e.message}" + end + + def handle_connected_event(event) + Unleash.logger.debug "Processing initial hydration data" + handle_updated_event(event) + end + + def handle_updated_event(event) + self.mutex.synchronize do + self.toggle_engine.take_state(event.data) + end + + # TODO: update backup file + rescue JSON::ParserError => e + Unleash.logger.error "Failed to parse streaming event data: #{e.message}" + rescue StandardError => e + Unleash.logger.error "Error processing delta update: #{e.message}" + end + end +end diff --git a/spec/unleash/configuration_spec.rb b/spec/unleash/configuration_spec.rb index e8dc83d3..59d1705c 100644 --- a/spec/unleash/configuration_spec.rb +++ b/spec/unleash/configuration_spec.rb @@ -105,6 +105,12 @@ expect(config.fetch_toggles_uri.to_s).to eq('https://testurl/api/client/features?project=test-project') end + it "should build the correct unleash endpoint when streaming mode is enabled" do + config = Unleash::Configuration.new(url: 'https://testurl/api', app_name: 'test-app') + config.experimental_mode = { type: 'streaming' } + expect(config.fetch_toggles_uri.to_s).to eq('https://testurl/api/client/streaming') + end + it "should allow hashes for custom_http_headers via yield" do Unleash.configure do |config| config.url = 'http://test-url/' diff --git a/unleash-client.gemspec b/unleash-client.gemspec index cb23ebed..1cfc95ac 100644 --- a/unleash-client.gemspec +++ b/unleash-client.gemspec @@ -23,6 +23,7 @@ Gem::Specification.new do |spec| spec.require_paths = ["lib"] spec.required_ruby_version = ">= 2.7" + spec.add_dependency "ld-eventsource", "~> 2.2.0" spec.add_dependency "yggdrasil-engine", "~> 1.0.4" spec.add_dependency "base64", "~> 0.3.0" From d52542a4661ae00fdb43cb817d6e12a4554a6b5f Mon Sep 17 00:00:00 2001 From: kwasniew Date: Tue, 5 Aug 2025 14:00:54 +0200 Subject: [PATCH 02/18] test: streaming client --- spec/unleash/client_spec.rb | 83 +++++++++++++++++++++++++++++++++++++ 1 file changed, 83 insertions(+) diff --git a/spec/unleash/client_spec.rb b/spec/unleash/client_spec.rb index 93292a1a..37476e42 100644 --- a/spec/unleash/client_spec.rb +++ b/spec/unleash/client_spec.rb @@ -761,4 +761,87 @@ def enabled?(_params, context) unleash_client.is_enabled?('featureX', Unleash::Context.new({})) ).to be false end + + describe "streaming mode" do + it "should process unleash-connected event" do + WebMock.stub_request(:post, "http://test-url/client/register") + .to_return(status: 200, body: "", headers: {}) + + sse_response_body = <<~SSE + event: unleash-connected + data: {"version":1,"features":[{"name":"test-feature","enabled":true,"strategies":[{"name":"default"}]}]} + + SSE + + WebMock.stub_request(:get, "http://test-url/client/streaming") + .with(headers: { 'X-API-KEY' => '123' }) + .to_return( + status: 200, + body: sse_response_body, + headers: { 'Content-Type' => 'text/event-stream' } + ) + + Unleash.configure do |config| + config.url = 'http://test-url/' + config.app_name = 'my-test-app' + config.instance_id = 'rspec/test' + config.disable_metrics = true + config.custom_http_headers = { 'X-API-KEY' => '123' } + config.experimental_mode = { type: 'streaming' } + end + + unleash_client = Unleash::Client.new + + sleep(0.1) + + expect(WebMock).to have_requested(:get, "http://test-url/client/streaming") + .with(headers: { 'X-API-KEY' => '123' }) + + expect(unleash_client.is_enabled?('test-feature')).to be true + + unleash_client.shutdown! + end + + it "should process unleash-updated event" do + WebMock.stub_request(:post, "http://test-url/client/register") + .to_return(status: 200, body: "", headers: {}) + + sse_response_body = <<~SSE + event: unleash-updated + data: {"version":1,"features":[{"name":"test-feature","enabled":true,"strategies":[{"name":"default"}]}]} + + SSE + + WebMock.stub_request(:get, "http://test-url/client/streaming") + .with(headers: { 'X-API-KEY' => '123' }) + .to_return( + status: 200, + body: sse_response_body, + headers: { 'Content-Type' => 'text/event-stream' } + ) + + Unleash.configure do |config| + config.url = 'http://test-url/' + config.app_name = 'my-test-app' + config.instance_id = 'rspec/test' + config.disable_metrics = true + config.custom_http_headers = { 'X-API-KEY' => '123' } + config.experimental_mode = { type: 'streaming' } + end + + unleash_client = Unleash::Client.new + + expect(unleash_client.streaming_client).to be_a(Unleash::StreamingClient) + expect(unleash_client.streaming_client.running?).to be true + + sleep(0.1) + + expect(WebMock).to have_requested(:get, "http://test-url/client/streaming") + .with(headers: { 'X-API-KEY' => '123' }) + + expect(unleash_client.is_enabled?('test-feature')).to be true + + unleash_client.shutdown! + end + end end From 739a856f951e4ffc8d8db96b9519835d0a2274c1 Mon Sep 17 00:00:00 2001 From: kwasniew Date: Tue, 5 Aug 2025 14:22:28 +0200 Subject: [PATCH 03/18] test: streaming client fix for jruby --- unleash-client.gemspec | 1 + 1 file changed, 1 insertion(+) diff --git a/unleash-client.gemspec b/unleash-client.gemspec index 1cfc95ac..eb5839c1 100644 --- a/unleash-client.gemspec +++ b/unleash-client.gemspec @@ -23,6 +23,7 @@ Gem::Specification.new do |spec| spec.require_paths = ["lib"] spec.required_ruby_version = ">= 2.7" + spec.add_dependency "http", "< 5.0.0" spec.add_dependency "ld-eventsource", "~> 2.2.0" spec.add_dependency "yggdrasil-engine", "~> 1.0.4" From 237e448140c62f96929dafee6784c61c463f3ce3 Mon Sep 17 00:00:00 2001 From: kwasniew Date: Tue, 5 Aug 2025 14:52:59 +0200 Subject: [PATCH 04/18] feat: jruby falls back to polling --- lib/unleash/client.rb | 5 +++++ lib/unleash/configuration.rb | 2 ++ spec/unleash/client_spec.rb | 4 ++++ unleash-client.gemspec | 1 - 4 files changed, 11 insertions(+), 1 deletion(-) diff --git a/lib/unleash/client.rb b/lib/unleash/client.rb index d994cb2e..658ab897 100644 --- a/lib/unleash/client.rb +++ b/lib/unleash/client.rb @@ -37,6 +37,11 @@ def initialize(*opts) if Unleash.configuration.streaming_mode? start_streaming_client else + if RUBY_ENGINE == 'jruby' && + Unleash.configuration.experimental_mode.is_a?(Hash) && + Unleash.configuration.experimental_mode[:type] == 'streaming' + Unleash.logger.warn "Streaming mode is disabled on JRuby. Falling back to polling." + end start_toggle_fetcher end diff --git a/lib/unleash/configuration.rb b/lib/unleash/configuration.rb index a8788324..5cdb4532 100644 --- a/lib/unleash/configuration.rb +++ b/lib/unleash/configuration.rb @@ -97,6 +97,8 @@ def use_bootstrap? end def streaming_mode? + return false if RUBY_ENGINE == 'jruby' + self.experimental_mode.is_a?(Hash) && self.experimental_mode[:type] == 'streaming' end diff --git a/spec/unleash/client_spec.rb b/spec/unleash/client_spec.rb index 37476e42..76483a36 100644 --- a/spec/unleash/client_spec.rb +++ b/spec/unleash/client_spec.rb @@ -763,6 +763,10 @@ def enabled?(_params, context) end describe "streaming mode" do + before(:context) do + skip "Streaming mode is not supported on JRuby" if RUBY_ENGINE == 'jruby' + end + it "should process unleash-connected event" do WebMock.stub_request(:post, "http://test-url/client/register") .to_return(status: 200, body: "", headers: {}) diff --git a/unleash-client.gemspec b/unleash-client.gemspec index eb5839c1..1cfc95ac 100644 --- a/unleash-client.gemspec +++ b/unleash-client.gemspec @@ -23,7 +23,6 @@ Gem::Specification.new do |spec| spec.require_paths = ["lib"] spec.required_ruby_version = ">= 2.7" - spec.add_dependency "http", "< 5.0.0" spec.add_dependency "ld-eventsource", "~> 2.2.0" spec.add_dependency "yggdrasil-engine", "~> 1.0.4" From c9c842b17b819247b4c4b73285f86214bd06c13c Mon Sep 17 00:00:00 2001 From: kwasniew Date: Tue, 5 Aug 2025 15:00:04 +0200 Subject: [PATCH 05/18] feat: jruby falls back to polling --- lib/unleash/client.rb | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/lib/unleash/client.rb b/lib/unleash/client.rb index 658ab897..6eb5d9be 100644 --- a/lib/unleash/client.rb +++ b/lib/unleash/client.rb @@ -34,16 +34,7 @@ def initialize(*opts) register - if Unleash.configuration.streaming_mode? - start_streaming_client - else - if RUBY_ENGINE == 'jruby' && - Unleash.configuration.experimental_mode.is_a?(Hash) && - Unleash.configuration.experimental_mode[:type] == 'streaming' - Unleash.logger.warn "Streaming mode is disabled on JRuby. Falling back to polling." - end - start_toggle_fetcher - end + initialize_client_mode start_metrics unless Unleash.configuration.disable_metrics end @@ -195,5 +186,18 @@ def disabled_variant def first_fetch_is_eager Unleash.configuration.use_bootstrap? end + + def initialize_client_mode + if Unleash.configuration.streaming_mode? + start_streaming_client + else + if RUBY_ENGINE == 'jruby' && + Unleash.configuration.experimental_mode.is_a?(Hash) && + Unleash.configuration.experimental_mode[:type] == 'streaming' + Unleash.logger.warn "Streaming mode is disabled on JRuby. Falling back to polling." + end + start_toggle_fetcher + end + end end end From 8c26899f9309f276bcc701c8b5ddf9f06ceddfd1 Mon Sep 17 00:00:00 2001 From: kwasniew Date: Tue, 5 Aug 2025 15:48:23 +0200 Subject: [PATCH 06/18] feat: jruby falls back to polling --- lib/unleash/client.rb | 12 +-- lib/unleash/streaming_client.rb | 51 +++++++----- spec/unleash/client_spec.rb | 138 ++++++++++++++++---------------- unleash-client.gemspec | 2 +- 4 files changed, 108 insertions(+), 95 deletions(-) diff --git a/lib/unleash/client.rb b/lib/unleash/client.rb index 6eb5d9be..e8c2941c 100644 --- a/lib/unleash/client.rb +++ b/lib/unleash/client.rb @@ -5,7 +5,7 @@ require 'unleash/streaming_client' require 'unleash/variant' require 'unleash/util/http' -require 'ld-eventsource' +require 'unleash/util/event_source_wrapper' require 'logger' require 'time' @@ -189,13 +189,13 @@ def first_fetch_is_eager def initialize_client_mode if Unleash.configuration.streaming_mode? - start_streaming_client - else - if RUBY_ENGINE == 'jruby' && - Unleash.configuration.experimental_mode.is_a?(Hash) && - Unleash.configuration.experimental_mode[:type] == 'streaming' + if RUBY_ENGINE == 'jruby' Unleash.logger.warn "Streaming mode is disabled on JRuby. Falling back to polling." + start_toggle_fetcher + else + start_streaming_client end + else start_toggle_fetcher end end diff --git a/lib/unleash/streaming_client.rb b/lib/unleash/streaming_client.rb index c2509dc0..b80dacb8 100644 --- a/lib/unleash/streaming_client.rb +++ b/lib/unleash/streaming_client.rb @@ -1,4 +1,3 @@ -require 'ld-eventsource' require 'json' module Unleash @@ -18,24 +17,10 @@ def start Unleash.logger.debug "Starting streaming from URL: #{Unleash.configuration.fetch_toggles_uri}" - headers = (Unleash.configuration.http_headers || {}).dup + create_event_source + return if self.event_source.nil? - self.event_source = SSE::Client.new( - Unleash.configuration.fetch_toggles_uri.to_s, - headers: headers, - read_timeout: 60, # start a new SSE connection when no heartbeat received in 1 minute - reconnect_time: 2, - connect_timeout: 10, - logger: Unleash.logger - ) - - self.event_source.on_event do |event| - handle_event(event) - end - - self.event_source.on_error do |error| - Unleash.logger.warn "Streaming error: #{error}" - end + setup_event_handlers self.running = true end @@ -89,5 +74,35 @@ def handle_updated_event(event) rescue StandardError => e Unleash.logger.error "Error processing delta update: #{e.message}" end + + def create_event_source + sse_client = Unleash::Util::EventSourceWrapper.client + if sse_client.nil? + Unleash.logger.warn "Streaming mode is not available. Falling back to polling." + self.event_source = nil + return + end + + headers = (Unleash.configuration.http_headers || {}).dup + + self.event_source = sse_client.new( + Unleash.configuration.fetch_toggles_uri.to_s, + headers: headers, + read_timeout: 60, # start a new SSE connection when no heartbeat received in 1 minute + reconnect_time: 2, + connect_timeout: 10, + logger: Unleash.logger + ) + end + + def setup_event_handlers + self.event_source.on_event do |event| + handle_event(event) + end + + self.event_source.on_error do |error| + Unleash.logger.warn "Streaming error: #{error}" + end + end end end diff --git a/spec/unleash/client_spec.rb b/spec/unleash/client_spec.rb index 76483a36..c9f990d2 100644 --- a/spec/unleash/client_spec.rb +++ b/spec/unleash/client_spec.rb @@ -762,90 +762,88 @@ def enabled?(_params, context) ).to be false end - describe "streaming mode" do - before(:context) do - skip "Streaming mode is not supported on JRuby" if RUBY_ENGINE == 'jruby' - end + unless RUBY_ENGINE == 'jruby' + describe "streaming mode" do + it "should process unleash-connected event" do + WebMock.stub_request(:post, "http://test-url/client/register") + .to_return(status: 200, body: "", headers: {}) + + sse_response_body = <<~SSE + event: unleash-connected + data: {"version":1,"features":[{"name":"test-feature","enabled":true,"strategies":[{"name":"default"}]}]} + + SSE + + WebMock.stub_request(:get, "http://test-url/client/streaming") + .with(headers: { 'X-API-KEY' => '123' }) + .to_return( + status: 200, + body: sse_response_body, + headers: { 'Content-Type' => 'text/event-stream' } + ) + + Unleash.configure do |config| + config.url = 'http://test-url/' + config.app_name = 'my-test-app' + config.instance_id = 'rspec/test' + config.disable_metrics = true + config.custom_http_headers = { 'X-API-KEY' => '123' } + config.experimental_mode = { type: 'streaming' } + end - it "should process unleash-connected event" do - WebMock.stub_request(:post, "http://test-url/client/register") - .to_return(status: 200, body: "", headers: {}) + unleash_client = Unleash::Client.new - sse_response_body = <<~SSE - event: unleash-connected - data: {"version":1,"features":[{"name":"test-feature","enabled":true,"strategies":[{"name":"default"}]}]} + sleep(0.1) - SSE + expect(WebMock).to have_requested(:get, "http://test-url/client/streaming") + .with(headers: { 'X-API-KEY' => '123' }) - WebMock.stub_request(:get, "http://test-url/client/streaming") - .with(headers: { 'X-API-KEY' => '123' }) - .to_return( - status: 200, - body: sse_response_body, - headers: { 'Content-Type' => 'text/event-stream' } - ) + expect(unleash_client.is_enabled?('test-feature')).to be true - Unleash.configure do |config| - config.url = 'http://test-url/' - config.app_name = 'my-test-app' - config.instance_id = 'rspec/test' - config.disable_metrics = true - config.custom_http_headers = { 'X-API-KEY' => '123' } - config.experimental_mode = { type: 'streaming' } + unleash_client.shutdown! end - unleash_client = Unleash::Client.new - - sleep(0.1) - - expect(WebMock).to have_requested(:get, "http://test-url/client/streaming") - .with(headers: { 'X-API-KEY' => '123' }) + it "should process unleash-updated event" do + WebMock.stub_request(:post, "http://test-url/client/register") + .to_return(status: 200, body: "", headers: {}) + + sse_response_body = <<~SSE + event: unleash-updated + data: {"version":1,"features":[{"name":"test-feature","enabled":true,"strategies":[{"name":"default"}]}]} + + SSE + + WebMock.stub_request(:get, "http://test-url/client/streaming") + .with(headers: { 'X-API-KEY' => '123' }) + .to_return( + status: 200, + body: sse_response_body, + headers: { 'Content-Type' => 'text/event-stream' } + ) + + Unleash.configure do |config| + config.url = 'http://test-url/' + config.app_name = 'my-test-app' + config.instance_id = 'rspec/test' + config.disable_metrics = true + config.custom_http_headers = { 'X-API-KEY' => '123' } + config.experimental_mode = { type: 'streaming' } + end - expect(unleash_client.is_enabled?('test-feature')).to be true + unleash_client = Unleash::Client.new - unleash_client.shutdown! - end + expect(unleash_client.streaming_client).to be_a(Unleash::StreamingClient) + expect(unleash_client.streaming_client.running?).to be true - it "should process unleash-updated event" do - WebMock.stub_request(:post, "http://test-url/client/register") - .to_return(status: 200, body: "", headers: {}) + sleep(0.1) - sse_response_body = <<~SSE - event: unleash-updated - data: {"version":1,"features":[{"name":"test-feature","enabled":true,"strategies":[{"name":"default"}]}]} + expect(WebMock).to have_requested(:get, "http://test-url/client/streaming") + .with(headers: { 'X-API-KEY' => '123' }) - SSE + expect(unleash_client.is_enabled?('test-feature')).to be true - WebMock.stub_request(:get, "http://test-url/client/streaming") - .with(headers: { 'X-API-KEY' => '123' }) - .to_return( - status: 200, - body: sse_response_body, - headers: { 'Content-Type' => 'text/event-stream' } - ) - - Unleash.configure do |config| - config.url = 'http://test-url/' - config.app_name = 'my-test-app' - config.instance_id = 'rspec/test' - config.disable_metrics = true - config.custom_http_headers = { 'X-API-KEY' => '123' } - config.experimental_mode = { type: 'streaming' } + unleash_client.shutdown! end - - unleash_client = Unleash::Client.new - - expect(unleash_client.streaming_client).to be_a(Unleash::StreamingClient) - expect(unleash_client.streaming_client.running?).to be true - - sleep(0.1) - - expect(WebMock).to have_requested(:get, "http://test-url/client/streaming") - .with(headers: { 'X-API-KEY' => '123' }) - - expect(unleash_client.is_enabled?('test-feature')).to be true - - unleash_client.shutdown! end end end diff --git a/unleash-client.gemspec b/unleash-client.gemspec index 1cfc95ac..7bc8aa0e 100644 --- a/unleash-client.gemspec +++ b/unleash-client.gemspec @@ -23,7 +23,7 @@ Gem::Specification.new do |spec| spec.require_paths = ["lib"] spec.required_ruby_version = ">= 2.7" - spec.add_dependency "ld-eventsource", "~> 2.2.0" + spec.add_dependency "ld-eventsource", "2.2.6" unless RUBY_ENGINE == 'jruby' spec.add_dependency "yggdrasil-engine", "~> 1.0.4" spec.add_dependency "base64", "~> 0.3.0" From c92cd91f835a4b03211408576c2abfee170ea365 Mon Sep 17 00:00:00 2001 From: kwasniew Date: Tue, 5 Aug 2025 15:51:15 +0200 Subject: [PATCH 07/18] feat: jruby falls back to polling --- lib/unleash/util/event_source_wrapper.rb | 17 +++++++++++++++++ unleash-client.gemspec | 2 +- 2 files changed, 18 insertions(+), 1 deletion(-) create mode 100644 lib/unleash/util/event_source_wrapper.rb diff --git a/lib/unleash/util/event_source_wrapper.rb b/lib/unleash/util/event_source_wrapper.rb new file mode 100644 index 00000000..f08d65e9 --- /dev/null +++ b/lib/unleash/util/event_source_wrapper.rb @@ -0,0 +1,17 @@ +module Unleash + module Util + module EventSourceWrapper + def self.client + return nil if RUBY_ENGINE == 'jruby' + + begin + require 'ld-eventsource' + SSE::Client + rescue LoadError => e + Unleash.logger.error "Failed to load ld-eventsource: #{e.message}" + nil + end + end + end + end +end diff --git a/unleash-client.gemspec b/unleash-client.gemspec index 7bc8aa0e..31cf71b9 100644 --- a/unleash-client.gemspec +++ b/unleash-client.gemspec @@ -23,7 +23,7 @@ Gem::Specification.new do |spec| spec.require_paths = ["lib"] spec.required_ruby_version = ">= 2.7" - spec.add_dependency "ld-eventsource", "2.2.6" unless RUBY_ENGINE == 'jruby' + spec.add_dependency "ld-eventsource", "2.2.4" unless RUBY_ENGINE == 'jruby' spec.add_dependency "yggdrasil-engine", "~> 1.0.4" spec.add_dependency "base64", "~> 0.3.0" From 0ca130f8c86c05945fc0cf6fb66d0f3ad7cf6ec9 Mon Sep 17 00:00:00 2001 From: kwasniew Date: Tue, 5 Aug 2025 15:55:03 +0200 Subject: [PATCH 08/18] feat: jruby falls back to polling --- spec/unleash/configuration_spec.rb | 6 ------ 1 file changed, 6 deletions(-) diff --git a/spec/unleash/configuration_spec.rb b/spec/unleash/configuration_spec.rb index 59d1705c..e8dc83d3 100644 --- a/spec/unleash/configuration_spec.rb +++ b/spec/unleash/configuration_spec.rb @@ -105,12 +105,6 @@ expect(config.fetch_toggles_uri.to_s).to eq('https://testurl/api/client/features?project=test-project') end - it "should build the correct unleash endpoint when streaming mode is enabled" do - config = Unleash::Configuration.new(url: 'https://testurl/api', app_name: 'test-app') - config.experimental_mode = { type: 'streaming' } - expect(config.fetch_toggles_uri.to_s).to eq('https://testurl/api/client/streaming') - end - it "should allow hashes for custom_http_headers via yield" do Unleash.configure do |config| config.url = 'http://test-url/' From 4bc41eed5a257cc1a9a1d69184794d01ce7f519c Mon Sep 17 00:00:00 2001 From: kwasniew Date: Tue, 5 Aug 2025 16:00:50 +0200 Subject: [PATCH 09/18] refactor: remove mutable state --- lib/unleash/streaming_client.rb | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/lib/unleash/streaming_client.rb b/lib/unleash/streaming_client.rb index b80dacb8..0a3d3861 100644 --- a/lib/unleash/streaming_client.rb +++ b/lib/unleash/streaming_client.rb @@ -17,7 +17,7 @@ def start Unleash.logger.debug "Starting streaming from URL: #{Unleash.configuration.fetch_toggles_uri}" - create_event_source + self.event_source = create_event_source return if self.event_source.nil? setup_event_handlers @@ -79,13 +79,12 @@ def create_event_source sse_client = Unleash::Util::EventSourceWrapper.client if sse_client.nil? Unleash.logger.warn "Streaming mode is not available. Falling back to polling." - self.event_source = nil - return + return nil end headers = (Unleash.configuration.http_headers || {}).dup - self.event_source = sse_client.new( + sse_client.new( Unleash.configuration.fetch_toggles_uri.to_s, headers: headers, read_timeout: 60, # start a new SSE connection when no heartbeat received in 1 minute From 84ce97ea51dc511da1353a476cce67d1ce791b76 Mon Sep 17 00:00:00 2001 From: kwasniew Date: Tue, 5 Aug 2025 16:07:45 +0200 Subject: [PATCH 10/18] refactor: simplify mode selection --- lib/unleash/client.rb | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/lib/unleash/client.rb b/lib/unleash/client.rb index e8c2941c..0ac50f1b 100644 --- a/lib/unleash/client.rb +++ b/lib/unleash/client.rb @@ -189,12 +189,7 @@ def first_fetch_is_eager def initialize_client_mode if Unleash.configuration.streaming_mode? - if RUBY_ENGINE == 'jruby' - Unleash.logger.warn "Streaming mode is disabled on JRuby. Falling back to polling." - start_toggle_fetcher - else - start_streaming_client - end + start_streaming_client else start_toggle_fetcher end From 4886cb7e76b1802c7bb60c86e5b3f88f1f23d9f3 Mon Sep 17 00:00:00 2001 From: kwasniew Date: Thu, 7 Aug 2025 11:42:07 +0200 Subject: [PATCH 11/18] fix: event format in spec --- spec/unleash/client_spec.rb | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/spec/unleash/client_spec.rb b/spec/unleash/client_spec.rb index c9f990d2..c1578978 100644 --- a/spec/unleash/client_spec.rb +++ b/spec/unleash/client_spec.rb @@ -770,7 +770,7 @@ def enabled?(_params, context) sse_response_body = <<~SSE event: unleash-connected - data: {"version":1,"features":[{"name":"test-feature","enabled":true,"strategies":[{"name":"default"}]}]} + data: {"events":[{"type":"hydration","eventId":1,"features":[{"name":"test-feature","enabled":true,"strategies":[{"name":"default"}]}],"segments":[]}]} SSE @@ -809,7 +809,7 @@ def enabled?(_params, context) sse_response_body = <<~SSE event: unleash-updated - data: {"version":1,"features":[{"name":"test-feature","enabled":true,"strategies":[{"name":"default"}]}]} + data: {"events":[{"type":"feature-updated","eventId":2,"feature":{"name":"test-feature","enabled":true,"strategies":[{"name":"default"}]}}]} SSE From 019ddec5a3ce2f6c0e949189a115749839e336ef Mon Sep 17 00:00:00 2001 From: kwasniew Date: Mon, 11 Aug 2025 13:58:02 +0200 Subject: [PATCH 12/18] refactor: apply PR feedback about duck typed client code --- lib/unleash/client.rb | 14 ++- lib/unleash/streaming_client.rb | 107 ----------------------- lib/unleash/streaming_client_executor.rb | 89 +++++++++++++++++++ lib/unleash/streaming_event_processor.rb | 50 +++++++++++ spec/unleash/client_spec.rb | 4 +- 5 files changed, 146 insertions(+), 118 deletions(-) delete mode 100644 lib/unleash/streaming_client.rb create mode 100644 lib/unleash/streaming_client_executor.rb create mode 100644 lib/unleash/streaming_event_processor.rb diff --git a/lib/unleash/client.rb b/lib/unleash/client.rb index 0ac50f1b..9b0c6e69 100644 --- a/lib/unleash/client.rb +++ b/lib/unleash/client.rb @@ -2,7 +2,7 @@ require 'unleash/toggle_fetcher' require 'unleash/metrics_reporter' require 'unleash/scheduled_executor' -require 'unleash/streaming_client' +require 'unleash/streaming_client_executor' require 'unleash/variant' require 'unleash/util/http' require 'unleash/util/event_source_wrapper' @@ -11,7 +11,7 @@ module Unleash class Client - attr_accessor :fetcher_scheduled_executor, :metrics_scheduled_executor, :streaming_client + attr_accessor :fetcher_scheduled_executor, :metrics_scheduled_executor # rubocop:disable Metrics/AbcSize def initialize(*opts) @@ -110,11 +110,7 @@ def shutdown # quick shutdown: just kill running threads def shutdown! unless Unleash.configuration.disable_client - if Unleash.configuration.streaming_mode? - self.streaming_client.stop - else - self.fetcher_scheduled_executor.exit - end + self.fetcher_scheduled_executor&.exit self.metrics_scheduled_executor.exit unless Unleash.configuration.disable_metrics end end @@ -150,8 +146,8 @@ def start_toggle_fetcher end def start_streaming_client - self.streaming_client = Unleash::StreamingClient.new Unleash.engine - self.streaming_client.start + self.fetcher_scheduled_executor = Unleash::StreamingClientExecutor.new('StreamingExecutor', Unleash.engine) + self.fetcher_scheduled_executor.run end def start_metrics diff --git a/lib/unleash/streaming_client.rb b/lib/unleash/streaming_client.rb deleted file mode 100644 index 0a3d3861..00000000 --- a/lib/unleash/streaming_client.rb +++ /dev/null @@ -1,107 +0,0 @@ -require 'json' - -module Unleash - class StreamingClient - attr_accessor :event_source, :toggle_engine, :running, :mutex - - def initialize(toggle_engine) - self.toggle_engine = toggle_engine - self.event_source = nil - self.running = false - self.mutex = Mutex.new - end - - def start - self.mutex.synchronize do - return if self.running || Unleash.configuration.disable_client - - Unleash.logger.debug "Starting streaming from URL: #{Unleash.configuration.fetch_toggles_uri}" - - self.event_source = create_event_source - return if self.event_source.nil? - - setup_event_handlers - - self.running = true - end - end - - def stop - self.mutex.synchronize do - return unless self.running - - Unleash.logger.info "Stopping streaming client" - self.running = false - self.event_source&.close - self.event_source = nil - end - end - - def running? - self.mutex.synchronize{ self.running } - end - - private - - def handle_event(event) - case event.type.to_s - when 'unleash-connected' - Unleash.logger.debug "Streaming client connected" - handle_connected_event(event) - when 'unleash-updated' - Unleash.logger.debug "Received streaming update" - handle_updated_event(event) - else - Unleash.logger.debug "Received unknown event type: #{event.type}" - end - rescue StandardError => e - Unleash.logger.error "Error handling streaming event: #{e.message}" - end - - def handle_connected_event(event) - Unleash.logger.debug "Processing initial hydration data" - handle_updated_event(event) - end - - def handle_updated_event(event) - self.mutex.synchronize do - self.toggle_engine.take_state(event.data) - end - - # TODO: update backup file - rescue JSON::ParserError => e - Unleash.logger.error "Failed to parse streaming event data: #{e.message}" - rescue StandardError => e - Unleash.logger.error "Error processing delta update: #{e.message}" - end - - def create_event_source - sse_client = Unleash::Util::EventSourceWrapper.client - if sse_client.nil? - Unleash.logger.warn "Streaming mode is not available. Falling back to polling." - return nil - end - - headers = (Unleash.configuration.http_headers || {}).dup - - sse_client.new( - Unleash.configuration.fetch_toggles_uri.to_s, - headers: headers, - read_timeout: 60, # start a new SSE connection when no heartbeat received in 1 minute - reconnect_time: 2, - connect_timeout: 10, - logger: Unleash.logger - ) - end - - def setup_event_handlers - self.event_source.on_event do |event| - handle_event(event) - end - - self.event_source.on_error do |error| - Unleash.logger.warn "Streaming error: #{error}" - end - end - end -end diff --git a/lib/unleash/streaming_client_executor.rb b/lib/unleash/streaming_client_executor.rb new file mode 100644 index 00000000..25a3cfdc --- /dev/null +++ b/lib/unleash/streaming_client_executor.rb @@ -0,0 +1,89 @@ +require 'unleash/streaming_event_processor' +require 'unleash/util/event_source_wrapper' + +module Unleash + class StreamingClientExecutor + attr_accessor :name, :event_source, :event_processor, :running, :mutex + + def initialize(name, engine) + self.name = name || 'StreamingClientExecutor' + self.event_source = nil + self.event_processor = Unleash::StreamingEventProcessor.new(engine) + self.running = false + self.mutex = Mutex.new + end + + def run(&block) + start + end + + def start + self.mutex.synchronize do + return if self.running || Unleash.configuration.disable_client + + Unleash.logger.debug "Starting streaming executor from URL: #{Unleash.configuration.fetch_toggles_uri}" + + self.event_source = create_event_source + return if self.event_source.nil? + + setup_event_handlers + + self.running = true + end + end + + def stop + self.mutex.synchronize do + return unless self.running + + Unleash.logger.info "Stopping streaming executor" + self.running = false + self.event_source&.close + self.event_source = nil + end + end + + alias exit stop + + def running? + self.mutex.synchronize { self.running } + end + + private + + def create_event_source + sse_client = Unleash::Util::EventSourceWrapper.client + if sse_client.nil? + Unleash.logger.warn "Streaming mode is not available. Falling back to polling." + return nil + end + + headers = (Unleash.configuration.http_headers || {}).dup + + sse_client.new( + Unleash.configuration.fetch_toggles_uri.to_s, + headers: headers, + read_timeout: 60, + reconnect_time: 2, + connect_timeout: 10, + logger: Unleash.logger + ) + end + + def setup_event_handlers + self.event_source.on_event do |event| + handle_event(event) + end + + self.event_source.on_error do |error| + Unleash.logger.warn "Streaming error: #{error}" + end + end + + def handle_event(event) + self.event_processor.process_event(event) + rescue StandardError => e + Unleash.logger.error "Error in streaming executor event handling: #{e.message}" + end + end +end \ No newline at end of file diff --git a/lib/unleash/streaming_event_processor.rb b/lib/unleash/streaming_event_processor.rb new file mode 100644 index 00000000..41d352cd --- /dev/null +++ b/lib/unleash/streaming_event_processor.rb @@ -0,0 +1,50 @@ +require 'json' + +module Unleash + class StreamingEventProcessor + attr_accessor :toggle_engine, :mutex + + def initialize(toggle_engine) + self.toggle_engine = toggle_engine + self.mutex = Mutex.new + end + + def process_event(event) + case event.type.to_s + when 'unleash-connected' + Unleash.logger.debug "Streaming client connected" + handle_connected_event(event) + when 'unleash-updated' + Unleash.logger.debug "Received streaming update" + handle_updated_event(event) + else + Unleash.logger.debug "Received unknown event type: #{event.type}" + end + rescue StandardError => e + Unleash.logger.error "Error handling streaming event: #{e.message}" + end + + def handle_delta_event(event_data) + self.mutex.synchronize do + self.toggle_engine.take_state(event_data) + end + end + + private + + def handle_connected_event(event) + Unleash.logger.debug "Processing initial hydration data" + handle_updated_event(event) + end + + def handle_updated_event(event) + handle_delta_event(event.data) + + # TODO: update backup file + rescue JSON::ParserError => e + Unleash.logger.error "Failed to parse streaming event data: #{e.message}" + rescue StandardError => e + Unleash.logger.error "Error processing delta update: #{e.message}" + end + end +end \ No newline at end of file diff --git a/spec/unleash/client_spec.rb b/spec/unleash/client_spec.rb index c1578978..fd7c94a0 100644 --- a/spec/unleash/client_spec.rb +++ b/spec/unleash/client_spec.rb @@ -832,8 +832,8 @@ def enabled?(_params, context) unleash_client = Unleash::Client.new - expect(unleash_client.streaming_client).to be_a(Unleash::StreamingClient) - expect(unleash_client.streaming_client.running?).to be true + expect(unleash_client.fetcher_scheduled_executor).to be_a(Unleash::StreamingClientExecutor) + expect(unleash_client.fetcher_scheduled_executor.running?).to be true sleep(0.1) From c85d9f1a4736af03bef369ae8cdccbefd4de63fa Mon Sep 17 00:00:00 2001 From: kwasniew Date: Mon, 11 Aug 2025 14:02:40 +0200 Subject: [PATCH 13/18] feat: fail hard on no sse client --- lib/unleash/streaming_client_executor.rb | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/lib/unleash/streaming_client_executor.rb b/lib/unleash/streaming_client_executor.rb index 25a3cfdc..691d0de6 100644 --- a/lib/unleash/streaming_client_executor.rb +++ b/lib/unleash/streaming_client_executor.rb @@ -24,8 +24,6 @@ def start Unleash.logger.debug "Starting streaming executor from URL: #{Unleash.configuration.fetch_toggles_uri}" self.event_source = create_event_source - return if self.event_source.nil? - setup_event_handlers self.running = true @@ -54,8 +52,7 @@ def running? def create_event_source sse_client = Unleash::Util::EventSourceWrapper.client if sse_client.nil? - Unleash.logger.warn "Streaming mode is not available. Falling back to polling." - return nil + raise "Streaming mode is configured but EventSource client is not available. Please install the 'ld-eventsource' gem or switch to polling mode." end headers = (Unleash.configuration.http_headers || {}).dup From 7b8b8d6a5ad3af41ba439a83f2859610e552da39 Mon Sep 17 00:00:00 2001 From: kwasniew Date: Mon, 11 Aug 2025 14:07:32 +0200 Subject: [PATCH 14/18] feat: fail hard on sse with jruby --- lib/unleash/configuration.rb | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/lib/unleash/configuration.rb b/lib/unleash/configuration.rb index 5cdb4532..cdf48e92 100644 --- a/lib/unleash/configuration.rb +++ b/lib/unleash/configuration.rb @@ -97,9 +97,13 @@ def use_bootstrap? end def streaming_mode? - return false if RUBY_ENGINE == 'jruby' - - self.experimental_mode.is_a?(Hash) && self.experimental_mode[:type] == 'streaming' + is_streaming_configured = self.experimental_mode.is_a?(Hash) && self.experimental_mode[:type] == 'streaming' + + if is_streaming_configured && RUBY_ENGINE == 'jruby' + raise "Streaming mode is not supported on JRuby. Please use polling mode instead." + end + + is_streaming_configured end def polling_with_delta? From 623b9ddbc5bf5e0ed08e1856dbccad7370028b01 Mon Sep 17 00:00:00 2001 From: kwasniew Date: Mon, 11 Aug 2025 14:17:09 +0200 Subject: [PATCH 15/18] refactor: remove unnecessary mutex --- lib/unleash/streaming_client_executor.rb | 29 ++++++++++-------------- 1 file changed, 12 insertions(+), 17 deletions(-) diff --git a/lib/unleash/streaming_client_executor.rb b/lib/unleash/streaming_client_executor.rb index 691d0de6..7bee23d5 100644 --- a/lib/unleash/streaming_client_executor.rb +++ b/lib/unleash/streaming_client_executor.rb @@ -3,14 +3,13 @@ module Unleash class StreamingClientExecutor - attr_accessor :name, :event_source, :event_processor, :running, :mutex + attr_accessor :name, :event_source, :event_processor, :running def initialize(name, engine) self.name = name || 'StreamingClientExecutor' self.event_source = nil self.event_processor = Unleash::StreamingEventProcessor.new(engine) self.running = false - self.mutex = Mutex.new end def run(&block) @@ -18,33 +17,29 @@ def run(&block) end def start - self.mutex.synchronize do - return if self.running || Unleash.configuration.disable_client + return if self.running || Unleash.configuration.disable_client - Unleash.logger.debug "Starting streaming executor from URL: #{Unleash.configuration.fetch_toggles_uri}" + Unleash.logger.debug "Starting streaming executor from URL: #{Unleash.configuration.fetch_toggles_uri}" - self.event_source = create_event_source - setup_event_handlers + self.event_source = create_event_source + setup_event_handlers - self.running = true - end + self.running = true end def stop - self.mutex.synchronize do - return unless self.running + return unless self.running - Unleash.logger.info "Stopping streaming executor" - self.running = false - self.event_source&.close - self.event_source = nil - end + Unleash.logger.info "Stopping streaming executor" + self.running = false + self.event_source&.close + self.event_source = nil end alias exit stop def running? - self.mutex.synchronize { self.running } + self.running end private From 0fdba090e852dd388f0cbc43837423f0497c4a2a Mon Sep 17 00:00:00 2001 From: kwasniew Date: Mon, 11 Aug 2025 14:19:03 +0200 Subject: [PATCH 16/18] chore: fix linting errors --- lib/unleash/configuration.rb | 4 ++-- lib/unleash/streaming_client_executor.rb | 5 +++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/lib/unleash/configuration.rb b/lib/unleash/configuration.rb index cdf48e92..b0dc479e 100644 --- a/lib/unleash/configuration.rb +++ b/lib/unleash/configuration.rb @@ -98,11 +98,11 @@ def use_bootstrap? def streaming_mode? is_streaming_configured = self.experimental_mode.is_a?(Hash) && self.experimental_mode[:type] == 'streaming' - + if is_streaming_configured && RUBY_ENGINE == 'jruby' raise "Streaming mode is not supported on JRuby. Please use polling mode instead." end - + is_streaming_configured end diff --git a/lib/unleash/streaming_client_executor.rb b/lib/unleash/streaming_client_executor.rb index 7bee23d5..b08b6a89 100644 --- a/lib/unleash/streaming_client_executor.rb +++ b/lib/unleash/streaming_client_executor.rb @@ -12,7 +12,7 @@ def initialize(name, engine) self.running = false end - def run(&block) + def run(&_block) start end @@ -47,7 +47,8 @@ def running? def create_event_source sse_client = Unleash::Util::EventSourceWrapper.client if sse_client.nil? - raise "Streaming mode is configured but EventSource client is not available. Please install the 'ld-eventsource' gem or switch to polling mode." + raise "Streaming mode is configured but EventSource client is not available. " \ + "Please install the 'ld-eventsource' gem or switch to polling mode." end headers = (Unleash.configuration.http_headers || {}).dup From 0a0ab3551c032b0d2ffd2e8bf88dd47ead100815 Mon Sep 17 00:00:00 2001 From: kwasniew Date: Mon, 11 Aug 2025 14:20:44 +0200 Subject: [PATCH 17/18] chore: fix linting errors --- lib/unleash/streaming_client_executor.rb | 2 +- lib/unleash/streaming_event_processor.rb | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/unleash/streaming_client_executor.rb b/lib/unleash/streaming_client_executor.rb index b08b6a89..01499fb6 100644 --- a/lib/unleash/streaming_client_executor.rb +++ b/lib/unleash/streaming_client_executor.rb @@ -79,4 +79,4 @@ def handle_event(event) Unleash.logger.error "Error in streaming executor event handling: #{e.message}" end end -end \ No newline at end of file +end diff --git a/lib/unleash/streaming_event_processor.rb b/lib/unleash/streaming_event_processor.rb index 41d352cd..a46e6b82 100644 --- a/lib/unleash/streaming_event_processor.rb +++ b/lib/unleash/streaming_event_processor.rb @@ -47,4 +47,4 @@ def handle_updated_event(event) Unleash.logger.error "Error processing delta update: #{e.message}" end end -end \ No newline at end of file +end From da360d131a26f98e079c16a7eab044bc56c9a53d Mon Sep 17 00:00:00 2001 From: kwasniew Date: Mon, 11 Aug 2025 14:39:01 +0200 Subject: [PATCH 18/18] chore: consistent logging --- lib/unleash/configuration.rb | 19 ++++++++++++------- lib/unleash/streaming_client_executor.rb | 11 +++++++---- lib/unleash/streaming_event_processor.rb | 9 ++++++--- 3 files changed, 25 insertions(+), 14 deletions(-) diff --git a/lib/unleash/configuration.rb b/lib/unleash/configuration.rb index b0dc479e..0980055b 100644 --- a/lib/unleash/configuration.rb +++ b/lib/unleash/configuration.rb @@ -97,13 +97,8 @@ def use_bootstrap? end def streaming_mode? - is_streaming_configured = self.experimental_mode.is_a?(Hash) && self.experimental_mode[:type] == 'streaming' - - if is_streaming_configured && RUBY_ENGINE == 'jruby' - raise "Streaming mode is not supported on JRuby. Please use polling mode instead." - end - - is_streaming_configured + validate_streaming_support! if streaming_configured? + streaming_configured? end def polling_with_delta? @@ -169,5 +164,15 @@ def set_option(opt, val) rescue NoMethodError raise ArgumentError, "unknown configuration parameter '#{val}'" end + + def streaming_configured? + self.experimental_mode.is_a?(Hash) && self.experimental_mode[:type] == 'streaming' + end + + def validate_streaming_support! + return unless RUBY_ENGINE == 'jruby' + + raise "Streaming mode is not supported on JRuby. Please use polling mode instead or switch to MRI/CRuby." + end end end diff --git a/lib/unleash/streaming_client_executor.rb b/lib/unleash/streaming_client_executor.rb index 01499fb6..a0c40857 100644 --- a/lib/unleash/streaming_client_executor.rb +++ b/lib/unleash/streaming_client_executor.rb @@ -19,21 +19,23 @@ def run(&_block) def start return if self.running || Unleash.configuration.disable_client - Unleash.logger.debug "Starting streaming executor from URL: #{Unleash.configuration.fetch_toggles_uri}" + Unleash.logger.debug "Streaming client #{self.name} starting connection to: #{Unleash.configuration.fetch_toggles_uri}" self.event_source = create_event_source setup_event_handlers self.running = true + Unleash.logger.debug "Streaming client #{self.name} connection established" end def stop return unless self.running - Unleash.logger.info "Stopping streaming executor" + Unleash.logger.debug "Streaming client #{self.name} stopping connection" self.running = false self.event_source&.close self.event_source = nil + Unleash.logger.debug "Streaming client #{self.name} connection closed" end alias exit stop @@ -69,14 +71,15 @@ def setup_event_handlers end self.event_source.on_error do |error| - Unleash.logger.warn "Streaming error: #{error}" + Unleash.logger.warn "Streaming client #{self.name} error: #{error}" end end def handle_event(event) self.event_processor.process_event(event) rescue StandardError => e - Unleash.logger.error "Error in streaming executor event handling: #{e.message}" + Unleash.logger.error "Streaming client #{self.name} threw exception #{e.class}: '#{e}'" + Unleash.logger.debug "stacktrace: #{e.backtrace}" end end end diff --git a/lib/unleash/streaming_event_processor.rb b/lib/unleash/streaming_event_processor.rb index a46e6b82..e07c2c14 100644 --- a/lib/unleash/streaming_event_processor.rb +++ b/lib/unleash/streaming_event_processor.rb @@ -21,7 +21,8 @@ def process_event(event) Unleash.logger.debug "Received unknown event type: #{event.type}" end rescue StandardError => e - Unleash.logger.error "Error handling streaming event: #{e.message}" + Unleash.logger.error "Error handling streaming event threw exception #{e.class}: '#{e}'" + Unleash.logger.debug "stacktrace: #{e.backtrace}" end def handle_delta_event(event_data) @@ -42,9 +43,11 @@ def handle_updated_event(event) # TODO: update backup file rescue JSON::ParserError => e - Unleash.logger.error "Failed to parse streaming event data: #{e.message}" + Unleash.logger.error "Unable to parse JSON from streaming event data. Exception thrown #{e.class}: '#{e}'" + Unleash.logger.debug "stacktrace: #{e.backtrace}" rescue StandardError => e - Unleash.logger.error "Error processing delta update: #{e.message}" + Unleash.logger.error "Error processing delta update threw exception #{e.class}: '#{e}'" + Unleash.logger.debug "stacktrace: #{e.backtrace}" end end end