diff --git a/examples/simple.rb b/examples/simple.rb index 4d74d0d6..fb1acafe 100755 --- a/examples/simple.rb +++ b/examples/simple.rb @@ -18,8 +18,8 @@ # or: @unleash = Unleash::Client.new( - url: 'https://unleash.herokuapp.com/api', - custom_http_headers: { 'Authorization': '943ca9171e2c884c545c5d82417a655fb77cec970cc3b78a8ff87f4406b495d0' }, + url: 'https://app.unleash-hosted.com/demo/api', + custom_http_headers: { 'Authorization': 'demo-app:dev.9fc74dd72d2b88bea5253c04240b21a54841f08d9918046ed55a06b5' }, app_name: 'simple-test', instance_id: 'local-test-cli', refresh_interval: 2, @@ -27,8 +27,7 @@ retry_limit: 2 ) -# feature_name = "AwesomeFeature" -feature_name = "4343443" +feature_name = "example-flag" unleash_context = Unleash::Context.new unleash_context.user_id = 123 diff --git a/examples/streaming.rb b/examples/streaming.rb new file mode 100755 index 00000000..e43d989c --- /dev/null +++ b/examples/streaming.rb @@ -0,0 +1,50 @@ +#!/usr/bin/env ruby + +require 'unleash' +require 'unleash/context' + +puts ">> START streaming.rb" + +@unleash = Unleash::Client.new( + url: 'https://app.unleash-hosted.com/demo/api', + custom_http_headers: { 'Authorization': 'demo-app:dev.9fc74dd72d2b88bea5253c04240b21a54841f08d9918046ed55a06b5' }, + app_name: 'streaming-test', + instance_id: 'local-streaming-cli', + refresh_interval: 2, + metrics_interval: 2, + retry_limit: 2, + experimental_mode: { type: 'streaming' }, + timeout: 5, + log_level: Logger::DEBUG +) + +feature_name = "example-flag" +unleash_context = Unleash::Context.new +unleash_context.user_id = 123 + +puts "Waiting for client to initialize..." +sleep 2 + +100.times do + if @unleash.is_enabled?(feature_name, unleash_context) + puts "> #{feature_name} is enabled" + else + puts "> #{feature_name} is not enabled" + end + sleep 1 + puts "---" + puts "" + puts "" +end +feature_name = "foobar" +if @unleash.is_enabled?(feature_name, unleash_context, true) + puts "> #{feature_name} is enabled" +else + puts "> #{feature_name} is not enabled" +end + +puts "> shutting down client..." + +@unleash.shutdown + +puts ">> END streaming.rb" diff --git a/lib/unleash/client.rb b/lib/unleash/client.rb index 9538a86c..9b0c6e69 100644 --- a/lib/unleash/client.rb +++ b/lib/unleash/client.rb @@ -2,8 +2,10 @@ require 'unleash/toggle_fetcher' require 'unleash/metrics_reporter' require 'unleash/scheduled_executor' +require 'unleash/streaming_client_executor' require 'unleash/variant' require 'unleash/util/http' +require 'unleash/util/event_source_wrapper' require 'logger' require 'time' @@ -21,7 +23,8 @@ def initialize(*opts) Unleash.engine = YggdrasilEngine.new Unleash.engine.register_custom_strategies(Unleash.configuration.strategies.custom_strategies) - Unleash.toggle_fetcher = Unleash::ToggleFetcher.new Unleash.engine + Unleash.toggle_fetcher = Unleash::ToggleFetcher.new Unleash.engine unless Unleash.configuration.streaming_mode? + if Unleash.configuration.disable_client Unleash.logger.warn "Unleash::Client is disabled! Will only return default (or bootstrapped if available) results!" Unleash.logger.warn "Unleash::Client is disabled! Metrics and MetricsReporter are also disabled!" @@ -30,7 +33,9 @@ def initialize(*opts) end register - start_toggle_fetcher + + initialize_client_mode + start_metrics unless Unleash.configuration.disable_metrics end # rubocop:enable Metrics/AbcSize @@ -105,7 +110,7 @@ def shutdown # quick shutdown: just kill running threads def shutdown! unless Unleash.configuration.disable_client - self.fetcher_scheduled_executor.exit + self.fetcher_scheduled_executor&.exit self.metrics_scheduled_executor.exit unless Unleash.configuration.disable_metrics end end @@ -140,6 +145,11 @@ def start_toggle_fetcher end end + def start_streaming_client + self.fetcher_scheduled_executor = Unleash::StreamingClientExecutor.new('StreamingExecutor', Unleash.engine) + self.fetcher_scheduled_executor.run + end + def start_metrics Unleash.reporter = Unleash::MetricsReporter.new self.metrics_scheduled_executor = Unleash::ScheduledExecutor.new( @@ -172,5 +182,13 @@ def disabled_variant def first_fetch_is_eager Unleash.configuration.use_bootstrap? end + + def initialize_client_mode + if Unleash.configuration.streaming_mode? + start_streaming_client + else + start_toggle_fetcher + end + end end end diff --git a/lib/unleash/configuration.rb b/lib/unleash/configuration.rb index 7a32ad7f..0980055b 100644 --- a/lib/unleash/configuration.rb +++ b/lib/unleash/configuration.rb @@ -22,7 +22,8 @@ class Configuration :log_level, :bootstrap_config, :strategies, - :use_delta_api + :use_delta_api, + :experimental_mode attr_reader :connection_id def initialize(opts = {}) @@ -67,7 +68,9 @@ def fetch_toggles_uri uri = nil ## Personal feeling but Rubocop's suggestion here is too dense to be properly readable # rubocop:disable Style/ConditionalAssignment - if self.use_delta_api + if streaming_mode? + uri = URI("#{self.url_stripped_of_slash}/client/streaming") + elsif self.use_delta_api || polling_with_delta? uri = URI("#{self.url_stripped_of_slash}/client/delta") else uri = URI("#{self.url_stripped_of_slash}/client/features") @@ -93,6 +96,17 @@ def use_bootstrap? self.bootstrap_config&.valid? end + def streaming_mode? + validate_streaming_support! if streaming_configured? + streaming_configured? + end + + def polling_with_delta? + self.experimental_mode.is_a?(Hash) && + self.experimental_mode[:type] == 'polling' && + self.experimental_mode[:format] == 'delta' + end + private def set_defaults @@ -112,6 +126,7 @@ def set_defaults self.bootstrap_config = nil self.strategies = Unleash::Strategies.new self.use_delta_api = false + self.experimental_mode = nil self.custom_http_headers = {} @connection_id = SecureRandom.uuid @@ -149,5 +164,15 @@ def set_option(opt, val) rescue NoMethodError raise ArgumentError, "unknown configuration parameter '#{val}'" end + + def streaming_configured? + self.experimental_mode.is_a?(Hash) && self.experimental_mode[:type] == 'streaming' + end + + def validate_streaming_support! + return unless RUBY_ENGINE == 'jruby' + + raise "Streaming mode is not supported on JRuby. Please use polling mode instead or switch to MRI/CRuby." + end end end diff --git a/lib/unleash/streaming_client_executor.rb b/lib/unleash/streaming_client_executor.rb new file mode 100644 index 00000000..a0c40857 --- /dev/null +++ b/lib/unleash/streaming_client_executor.rb @@ -0,0 +1,85 @@ +require 'unleash/streaming_event_processor' +require 'unleash/util/event_source_wrapper' + +module Unleash + class StreamingClientExecutor + attr_accessor :name, :event_source, :event_processor, :running + + def initialize(name, engine) + self.name = name || 'StreamingClientExecutor' + self.event_source = nil + self.event_processor = Unleash::StreamingEventProcessor.new(engine) + self.running = false + end + + def run(&_block) + start + end + + def start + return if self.running || Unleash.configuration.disable_client + + Unleash.logger.debug "Streaming client #{self.name} starting connection to: #{Unleash.configuration.fetch_toggles_uri}" + + self.event_source = create_event_source + setup_event_handlers + + self.running = true + Unleash.logger.debug "Streaming client #{self.name} connection established" + end + + def stop + return unless self.running + + Unleash.logger.debug "Streaming client #{self.name} stopping connection" + self.running = false + self.event_source&.close + self.event_source = nil + Unleash.logger.debug "Streaming client #{self.name} connection closed" + end + + alias exit stop + + def running? + self.running + end + + private + + def create_event_source + sse_client = Unleash::Util::EventSourceWrapper.client + if sse_client.nil? + raise "Streaming mode is configured but EventSource client is not available. " \ + "Please install the 'ld-eventsource' gem or switch to polling mode." + end + + headers = (Unleash.configuration.http_headers || {}).dup + + sse_client.new( + Unleash.configuration.fetch_toggles_uri.to_s, + headers: headers, + read_timeout: 60, + reconnect_time: 2, + connect_timeout: 10, + logger: Unleash.logger + ) + end + + def setup_event_handlers + self.event_source.on_event do |event| + handle_event(event) + end + + self.event_source.on_error do |error| + Unleash.logger.warn "Streaming client #{self.name} error: #{error}" + end + end + + def handle_event(event) + self.event_processor.process_event(event) + rescue StandardError => e + Unleash.logger.error "Streaming client #{self.name} threw exception #{e.class}: '#{e}'" + Unleash.logger.debug "stacktrace: #{e.backtrace}" + end + end +end diff --git a/lib/unleash/streaming_event_processor.rb b/lib/unleash/streaming_event_processor.rb new file mode 100644 index 00000000..e07c2c14 --- /dev/null +++ b/lib/unleash/streaming_event_processor.rb @@ -0,0 +1,53 @@ +require 'json' + +module Unleash + class StreamingEventProcessor + attr_accessor :toggle_engine, :mutex + + def initialize(toggle_engine) + self.toggle_engine = toggle_engine + self.mutex = Mutex.new + end + + def process_event(event) + case event.type.to_s + when 'unleash-connected' + Unleash.logger.debug "Streaming client connected" + handle_connected_event(event) + when 'unleash-updated' + Unleash.logger.debug "Received streaming update" + handle_updated_event(event) + else + Unleash.logger.debug "Received unknown event type: #{event.type}" + end + rescue StandardError => e + Unleash.logger.error "Error handling streaming event threw exception #{e.class}: '#{e}'" + Unleash.logger.debug "stacktrace: #{e.backtrace}" + end + + def handle_delta_event(event_data) + self.mutex.synchronize do + self.toggle_engine.take_state(event_data) + end + end + + private + + def handle_connected_event(event) + Unleash.logger.debug "Processing initial hydration data" + handle_updated_event(event) + end + + def handle_updated_event(event) + handle_delta_event(event.data) + + # TODO: update backup file + rescue JSON::ParserError => e + Unleash.logger.error "Unable to parse JSON from streaming event data. Exception thrown #{e.class}: '#{e}'" + Unleash.logger.debug "stacktrace: #{e.backtrace}" + rescue StandardError => e + Unleash.logger.error "Error processing delta update threw exception #{e.class}: '#{e}'" + Unleash.logger.debug "stacktrace: #{e.backtrace}" + end + end +end diff --git a/lib/unleash/util/event_source_wrapper.rb b/lib/unleash/util/event_source_wrapper.rb new file mode 100644 index 00000000..f08d65e9 --- /dev/null +++ b/lib/unleash/util/event_source_wrapper.rb @@ -0,0 +1,17 @@ +module Unleash + module Util + module EventSourceWrapper + def self.client + return nil if RUBY_ENGINE == 'jruby' + + begin + require 'ld-eventsource' + SSE::Client + rescue LoadError => e + Unleash.logger.error "Failed to load ld-eventsource: #{e.message}" + nil + end + end + end + end +end diff --git a/spec/unleash/client_spec.rb b/spec/unleash/client_spec.rb index 93292a1a..fd7c94a0 100644 --- a/spec/unleash/client_spec.rb +++ b/spec/unleash/client_spec.rb @@ -761,4 +761,89 @@ def enabled?(_params, context) unleash_client.is_enabled?('featureX', Unleash::Context.new({})) ).to be false end + + unless RUBY_ENGINE == 'jruby' + describe "streaming mode" do + it "should process unleash-connected event" do + WebMock.stub_request(:post, "http://test-url/client/register") + .to_return(status: 200, body: "", headers: {}) + + sse_response_body = <<~SSE + event: unleash-connected + data: {"events":[{"type":"hydration","eventId":1,"features":[{"name":"test-feature","enabled":true,"strategies":[{"name":"default"}]}],"segments":[]}]} + + SSE + + WebMock.stub_request(:get, "http://test-url/client/streaming") + .with(headers: { 'X-API-KEY' => '123' }) + .to_return( + status: 200, + body: sse_response_body, + headers: { 'Content-Type' => 'text/event-stream' } + ) + + Unleash.configure do |config| + config.url = 'http://test-url/' + config.app_name = 'my-test-app' + config.instance_id = 'rspec/test' + config.disable_metrics = true + config.custom_http_headers = { 'X-API-KEY' => '123' } + config.experimental_mode = { type: 'streaming' } + end + + unleash_client = Unleash::Client.new + + sleep(0.1) + + expect(WebMock).to have_requested(:get, "http://test-url/client/streaming") + .with(headers: { 'X-API-KEY' => '123' }) + + expect(unleash_client.is_enabled?('test-feature')).to be true + + unleash_client.shutdown! + end + + it "should process unleash-updated event" do + WebMock.stub_request(:post, "http://test-url/client/register") + .to_return(status: 200, body: "", headers: {}) + + sse_response_body = <<~SSE + event: unleash-updated + data: {"events":[{"type":"feature-updated","eventId":2,"feature":{"name":"test-feature","enabled":true,"strategies":[{"name":"default"}]}}]} + + SSE + + WebMock.stub_request(:get, "http://test-url/client/streaming") + .with(headers: { 'X-API-KEY' => '123' }) + .to_return( + status: 200, + body: sse_response_body, + headers: { 'Content-Type' => 'text/event-stream' } + ) + + Unleash.configure do |config| + config.url = 'http://test-url/' + config.app_name = 'my-test-app' + config.instance_id = 'rspec/test' + config.disable_metrics = true + config.custom_http_headers = { 'X-API-KEY' => '123' } + config.experimental_mode = { type: 'streaming' } + end + + unleash_client = Unleash::Client.new + + expect(unleash_client.fetcher_scheduled_executor).to be_a(Unleash::StreamingClientExecutor) + expect(unleash_client.fetcher_scheduled_executor.running?).to be true + + sleep(0.1) + + expect(WebMock).to have_requested(:get, "http://test-url/client/streaming") + .with(headers: { 'X-API-KEY' => '123' }) + + expect(unleash_client.is_enabled?('test-feature')).to be true + + unleash_client.shutdown! + end + end + end end diff --git a/unleash-client.gemspec b/unleash-client.gemspec index cb23ebed..31cf71b9 100644 --- a/unleash-client.gemspec +++ b/unleash-client.gemspec @@ -23,6 +23,7 @@ Gem::Specification.new do |spec| spec.require_paths = ["lib"] spec.required_ruby_version = ">= 2.7" + spec.add_dependency "ld-eventsource", "2.2.4" unless RUBY_ENGINE == 'jruby' spec.add_dependency "yggdrasil-engine", "~> 1.0.4" spec.add_dependency "base64", "~> 0.3.0"