Skip to content

Commit 536ad9f

Browse files
authored
Merge pull request #73 from Flagsmith/feat/enable_realtime_mode
feat: Enable realtime mode (SSE)
2 parents 3e0d42c + b8eadbb commit 536ad9f

File tree

4 files changed

+202
-1
lines changed

4 files changed

+202
-1
lines changed

lib/flagsmith.rb

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
require 'flagsmith/sdk/models/flags'
1717
require 'flagsmith/sdk/models/segments'
1818
require 'flagsmith/sdk/offline_handlers'
19+
require 'flagsmith/sdk/realtime_client'
1920

2021
require 'flagsmith/engine/core'
2122

@@ -46,6 +47,7 @@ class Client # rubocop:disable Metrics/ClassLength
4647
# :environment_key, :api_url, :custom_headers, :request_timeout_seconds, :enable_local_evaluation,
4748
# :environment_refresh_interval_seconds, :retries, :enable_analytics, :default_flag_handler,
4849
# :offline_mode, :offline_handler, :polling_manager_failure_limit
50+
# :realtime_api_url, :enable_realtime_updates, :logger
4951
#
5052
# You can see full description in the Flagsmith::Config
5153

@@ -59,6 +61,7 @@ def initialize(config)
5961
@identity_overrides_by_identifier = {}
6062

6163
validate_offline_mode!
64+
validate_realtime_mode!
6265

6366
api_client
6467
analytics_processor
@@ -78,10 +81,21 @@ def validate_offline_mode!
7881
'Cannot use offline_handler and default_flag_handler at the same time.'
7982
end
8083

84+
def validate_realtime_mode!
85+
return unless @config.realtime_mode? && !@config.local_evaluation?
86+
87+
raise Flagsmith::ClientError,
88+
'The enable_realtime_updates config param requires a matching enable_local_evaluation param.'
89+
end
90+
8191
def api_client
8292
@api_client ||= Flagsmith::ApiClient.new(@config)
8393
end
8494

95+
def realtime_client
96+
@realtime_client ||= Flagsmith::RealtimeClient.new(@config)
97+
end
98+
8599
def engine
86100
@engine ||= Flagsmith::Engine::Engine.new
87101
end
@@ -104,6 +118,14 @@ def load_offline_handler
104118
def environment_data_polling_manager
105119
return nil unless @config.local_evaluation?
106120

121+
# Bypass the environment data polling manager if realtime
122+
# is present in the configuration.
123+
if @config.realtime_mode?
124+
update_environment
125+
realtime_client.listen self unless realtime_client.running
126+
return
127+
end
128+
107129
update_environment if @environment_data_polling_manager.nil?
108130

109131
@environment_data_polling_manager ||= Flagsmith::EnvironmentDataPollingManager.new(

lib/flagsmith/sdk/config.rb

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,13 @@ module Flagsmith
44
# Config options shared around Engine
55
class Config
66
DEFAULT_API_URL = 'https://edge.api.flagsmith.com/api/v1/'
7+
DEFAULT_REALTIME_API_URL = 'https://realtime.flagsmith.com/'
8+
79
OPTIONS = %i[
810
environment_key api_url custom_headers request_timeout_seconds enable_local_evaluation
911
environment_refresh_interval_seconds retries enable_analytics default_flag_handler
10-
offline_mode offline_handler polling_manager_failure_limit logger
12+
offline_mode offline_handler polling_manager_failure_limit
13+
realtime_api_url enable_realtime_updates logger
1114
].freeze
1215

1316
# Available Configs
@@ -40,6 +43,9 @@ class Config
4043
# the entire environment, project, flags, etc.
4144
# +polling_manager_failure_limit+ - An integer to control how long to suppress errors in
4245
# the polling manager for local evaluation mode.
46+
# +realtime_api_url+ - Override the realtime api URL to communicate with a
47+
# non-standard realtime endpoint.
48+
# +enable_realtime_updates+ - A boolean to enable realtime updates.
4349
# +logger+ - Pass your logger, default is Logger.new($stdout)
4450
#
4551
attr_reader(*OPTIONS)
@@ -62,6 +68,10 @@ def offline_mode?
6268
@offline_mode
6369
end
6470

71+
def realtime_mode?
72+
@enable_realtime_updates
73+
end
74+
6575
def environment_flags_url
6676
'flags/'
6777
end
@@ -92,6 +102,9 @@ def build_config(options)
92102
@offline_mode = opts.fetch(:offline_mode, false)
93103
@offline_handler = opts[:offline_handler]
94104
@polling_manager_failure_limit = opts.fetch(:polling_manager_failure_limit, 10)
105+
@realtime_api_url = opts.fetch(:realtime_api_url, Flagsmith::Config::DEFAULT_REALTIME_API_URL)
106+
@realtime_api_url << '/' unless @realtime_api_url.end_with? '/'
107+
@enable_realtime_updates = opts.fetch(:enable_realtime_updates, false)
95108
@logger = options.fetch(:logger, Logger.new($stdout).tap { |l| l.level = :debug })
96109
end
97110
# rubocop:enable Metrics/AbcSize, Metrics/MethodLength
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
# frozen_string_literal: true
2+
3+
require 'logger'
4+
require 'faraday'
5+
require 'json'
6+
7+
module Flagsmith
8+
# Ruby client for realtime access to flagsmith.com
9+
class RealtimeClient
10+
attr_accessor :running
11+
12+
def initialize(config)
13+
@config = config
14+
@thread = nil
15+
@running = false
16+
@main = nil
17+
end
18+
19+
def endpoint
20+
"#{@config.realtime_api_url}sse/environments/#{@main.environment.api_key}/stream"
21+
end
22+
23+
def listen(main, remaining_attempts: Float::INFINITY, retry_interval: 0.5) # rubocop:disable Metrics/CyclomaticComplexity, Metrics/AbcSize, Metrics/MethodLength
24+
last_updated_at = 0
25+
@main = main
26+
@running = true
27+
@thread = Thread.new do
28+
while @running && remaining_attempts.positive?
29+
remaining_attempts -= 1
30+
@config.logger.warn 'Beginning to pull down realtime endpoint'
31+
begin
32+
sleep retry_interval
33+
# Open connection to SSE endpoint
34+
Faraday.new(url: endpoint).get do |req|
35+
req.options.timeout = nil # Keep connection alive indefinitely
36+
req.options.open_timeout = 10
37+
end.body.each_line do |line| # rubocop:disable Style/MultilineBlockChain
38+
# SSE protocol: Skip non-event lines
39+
next if line.strip.empty? || line.start_with?(':')
40+
41+
# Parse SSE fields
42+
next unless line.start_with?('data: ')
43+
44+
data = JSON.parse(line[6..].strip)
45+
updated_at = data['updated_at']
46+
next unless updated_at > last_updated_at
47+
48+
@config.logger.info "Realtime updating environment from #{last_updated_at} to #{updated_at}"
49+
@main.update_environment
50+
last_updated_at = updated_at
51+
end
52+
rescue Faraday::ConnectionFailed, Faraday::TimeoutError => e
53+
@config.logger.warn "Connection failed: #{e.message}. Retrying in #{retry_interval} seconds..."
54+
rescue StandardError => e
55+
@config.logger.error "Error: #{e.message}. Retrying in #{retry_interval} seconds..."
56+
end
57+
end
58+
end
59+
60+
@running = false
61+
end
62+
end
63+
end

spec/sdk/realtime_client_spec.rb

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
require 'spec_helper'
2+
require 'faraday'
3+
4+
RSpec.describe Flagsmith::RealtimeClient do
5+
let(:mock_logger) { double('Logger', warn: nil, info: nil, error: nil) }
6+
let(:mock_config) do
7+
double('Config',
8+
realtime_api_url: 'https://example.com/',
9+
environment_key: 'test-environment',
10+
logger: mock_logger)
11+
end
12+
let(:mock_environment) { double('Environment',
13+
api_key: 'some_api_key' )}
14+
let(:mock_main) { double('Main',
15+
update_environment: nil,
16+
environment: mock_environment,
17+
) }
18+
let(:realtime_client) { described_class.new(mock_config) }
19+
let(:sse_response) do
20+
<<~SSE
21+
data: {"updated_at": 1}
22+
23+
data: {"updated_at": 2}
24+
SSE
25+
end
26+
let(:retry_interval) { 0.01 }
27+
28+
before(:each) do
29+
allow(Faraday).to receive(:new).and_return(double('Faraday::Connection', get: double('Response', body: sse_response)))
30+
allow(Thread).to receive(:new).and_yield
31+
end
32+
33+
describe '#listen' do
34+
after { realtime_client.running = false }
35+
36+
it 'parses SSE data and calls update_environment when updated_at increases' do
37+
expect(mock_main).to receive(:update_environment).twice
38+
realtime_client.listen(mock_main, retry_interval: retry_interval, remaining_attempts: 3)
39+
end
40+
41+
it 'logs retries and continues on connection failure' do
42+
allow(Faraday).to receive(:new).and_raise(Faraday::ConnectionFailed.new('Connection failed'))
43+
44+
expect(mock_logger).to receive(:warn).with(/Connection failed/).at_least(:once)
45+
realtime_client.listen(mock_main, retry_interval: retry_interval, remaining_attempts: 3)
46+
end
47+
48+
it 'handles and logs unexpected errors gracefully' do
49+
allow(Faraday).to receive(:new).and_raise(StandardError.new('Unexpected error'))
50+
51+
expect(mock_logger).to receive(:error).with(/Unexpected error/).at_least(:once)
52+
realtime_client.listen(mock_main, retry_interval: retry_interval, remaining_attempts: 3)
53+
end
54+
55+
end
56+
end
57+
58+
RSpec.describe Flagsmith::Client do
59+
describe '#initialize' do
60+
before do
61+
# Mock the methods to avoid initialization interferring.
62+
allow_any_instance_of(Flagsmith::Client).to receive(:api_client)
63+
allow_any_instance_of(Flagsmith::Client).to receive(:analytics_processor)
64+
allow_any_instance_of(Flagsmith::Client).to receive(:environment_data_polling_manager)
65+
allow_any_instance_of(Flagsmith::Client).to receive(:engine)
66+
allow_any_instance_of(Flagsmith::Client).to receive(:load_offline_handler)
67+
end
68+
69+
context 'when realtime_mode is true and local_evaluation is false' do
70+
it 'raises a Flagsmith::ClientError' do
71+
config = double(
72+
'Config',
73+
realtime_mode?: true,
74+
local_evaluation?: false,
75+
offline_mode?: false,
76+
offline_handler: nil,
77+
)
78+
allow(Flagsmith::Config).to receive(:new).and_return(config)
79+
80+
expect {
81+
Flagsmith::Client.new(config)
82+
}.to raise_error(Flagsmith::ClientError, 'The enable_realtime_updates config param requires a matching enable_local_evaluation param.')
83+
end
84+
end
85+
86+
context 'when realtime_mode is false or local_evaluation is true' do
87+
it 'does not raise an exception' do
88+
config = double(
89+
'Config',
90+
realtime_mode?: false,
91+
local_evaluation?: true,
92+
offline_mode?: false,
93+
offline_handler: nil,
94+
)
95+
allow(Flagsmith::Config).to receive(:new).and_return(config)
96+
97+
expect {
98+
Flagsmith::Client.new(config)
99+
}.not_to raise_error
100+
end
101+
end
102+
end
103+
end

0 commit comments

Comments
 (0)