From c01ddea8043eaf8a814abb091e92479656a4747a Mon Sep 17 00:00:00 2001 From: Patrick Vice Date: Wed, 9 Jul 2025 15:57:10 -0400 Subject: [PATCH 1/6] force http/1.1 to get better SSE support --- lib/ruby_llm/mcp/transports/sse.rb | 1 + lib/ruby_llm/mcp/transports/streamable_http.rb | 10 ++++++---- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/lib/ruby_llm/mcp/transports/sse.rb b/lib/ruby_llm/mcp/transports/sse.rb index e79e5a3..1391c79 100644 --- a/lib/ruby_llm/mcp/transports/sse.rb +++ b/lib/ruby_llm/mcp/transports/sse.rb @@ -165,6 +165,7 @@ def listen_for_events def stream_events_from_server sse_client = HTTPX.plugin(:stream) sse_client = sse_client.with( + ssl: { alpn_protocols: ["http/1.1"] }, headers: @headers ) response = sse_client.get(@event_url, stream: true) diff --git a/lib/ruby_llm/mcp/transports/streamable_http.rb b/lib/ruby_llm/mcp/transports/streamable_http.rb index 03ccdbf..b852981 100644 --- a/lib/ruby_llm/mcp/transports/streamable_http.rb +++ b/lib/ruby_llm/mcp/transports/streamable_http.rb @@ -422,8 +422,8 @@ def start_sse(options) # rubocop:disable Metrics/MethodLength end # Set up SSE streaming connection with callbacks - connection = create_connection_with_sse_callbacks(options) - response = connection.get(@url, headers: headers) + connection = create_connection_with_sse_callbacks(options, headers) + response = connection.get(@url) # Handle HTTPX error responses first error_result = handle_httpx_error_response!(response, context: { location: "SSE connection" }, @@ -463,7 +463,7 @@ def start_sse(options) # rubocop:disable Metrics/MethodLength end end - def create_connection_with_sse_callbacks(options) + def create_connection_with_sse_callbacks(options, headers) buffer = +"" client = HTTPX @@ -501,7 +501,9 @@ def create_connection_with_sse_callbacks(options) read_timeout: @request_timeout / 1000, write_timeout: @request_timeout / 1000, operation_timeout: @request_timeout / 1000 - } + }, + headers: headers, + ssl: { alpn_protocols: ["http/1.1"] } ) register_client(client) end From 39bc05b6ecec721ff791abdd2c8bd913d7ba60b9 Mon Sep 17 00:00:00 2001 From: Patrick Vice Date: Thu, 10 Jul 2025 10:00:15 -0400 Subject: [PATCH 2/6] support events type and data coming in different response lines --- lib/ruby_llm/mcp/coordinator.rb | 2 +- lib/ruby_llm/mcp/transports/sse.rb | 82 +++++++++++++++++++++--------- scripts/sse_test.rb | 6 +-- 3 files changed, 63 insertions(+), 27 deletions(-) diff --git a/lib/ruby_llm/mcp/coordinator.rb b/lib/ruby_llm/mcp/coordinator.rb index 293010f..76e0f64 100644 --- a/lib/ruby_llm/mcp/coordinator.rb +++ b/lib/ruby_llm/mcp/coordinator.rb @@ -28,7 +28,7 @@ def name def request(body, **options) transport.request(body, **options) rescue RubyLLM::MCP::Errors::TimeoutError => e - if transport&.alive? + if transport&.alive? && !e.request_id.nil? cancelled_notification(reason: "Request timed out", request_id: e.request_id) end raise e diff --git a/lib/ruby_llm/mcp/transports/sse.rb b/lib/ruby_llm/mcp/transports/sse.rb index 1391c79..f7440ec 100644 --- a/lib/ruby_llm/mcp/transports/sse.rb +++ b/lib/ruby_llm/mcp/transports/sse.rb @@ -44,7 +44,7 @@ def initialize(url:, coordinator:, request_timeout:, headers: {}) RubyLLM::MCP.logger.info "Initializing SSE transport to #{@event_url} with client ID #{@client_id}" end - def request(body, add_id: true, wait_for_response: true) # rubocop:disable Metrics/MethodLength + def request(body, add_id: true, wait_for_response: true) if add_id @id_mutex.synchronize { @id_counter += 1 } request_id = @id_counter @@ -59,34 +59,20 @@ def request(body, add_id: true, wait_for_response: true) # rubocop:disable Metri end begin - http_client = HTTPClient.connection.with(timeout: { request_timeout: @request_timeout / 1000 }, - headers: @headers) - response = http_client.post(@messages_url, body: JSON.generate(body)) - - unless response.status == 200 - @pending_mutex.synchronize { @pending_requests.delete(request_id.to_s) } - RubyLLM::MCP.logger.error "SSE request failed: #{response.status} - #{response.body}" - raise Errors::TransportError.new( - message: "Failed to request #{@messages_url}: #{response.status} - #{response.body}", - code: response.status - ) - end - rescue StandardError => e + send_request(body, request_id) + rescue Errors::TransportError, Errors::TimeoutError => e @pending_mutex.synchronize { @pending_requests.delete(request_id.to_s) } - RubyLLM::MCP.logger.error "SSE request error (ID: #{request_id}): #{e.message}" - raise RubyLLM::MCP::Errors::TransportError.new( - message: e.message, - code: -1, - error: e - ) + RubyLLM::MCP.logger.error "Request error (ID: #{request_id}): #{e.message}" + raise e end + return unless wait_for_response begin with_timeout(@request_timeout / 1000, request_id: request_id) do response_queue.pop end - rescue RubyLLM::MCP::Errors::TimeoutError => e + rescue Errors::TimeoutError => e @pending_mutex.synchronize { @pending_requests.delete(request_id.to_s) } RubyLLM::MCP.logger.error "SSE request timeout (ID: #{request_id}) after #{@request_timeout / 1000} seconds" raise e @@ -117,6 +103,23 @@ def set_protocol_version(version) private + def send_request(body, request_id) + http_client = HTTPX.with(timeout: { request_timeout: @request_timeout / 1000 }, + headers: @headers) + response = http_client.post(@messages_url, body: JSON.generate(body)) + handle_httpx_error_response!(response, + context: { location: "message endpoint request", request_id: request_id }) + + unless [200, 202].include?(response.status) + message = "Failed to have a successful request to #{@messages_url}: #{response.status} - #{response.body}" + RubyLLM::MCP.logger.error(message) + raise Errors::TransportError.new( + message: message, + code: response.status + ) + end + end + def start_sse_listener @connection_mutex.synchronize do return if sse_thread_running? @@ -169,14 +172,28 @@ def stream_events_from_server headers: @headers ) response = sse_client.get(@event_url, stream: true) + + event_buffer = [] response.each_line do |event_line| unless @running response.body.close next end - event = parse_event(event_line) - process_event(event) + # Strip the line and check if it's empty (indicates end of event) + line = event_line.strip + + if line.empty? + # End of event - process the accumulated buffer + if event_buffer.any? + event = parse_event(event_buffer.join("\n")) + process_event(event) + event_buffer.clear + end + else + # Accumulate the line for the current event + event_buffer << line + end end end @@ -188,6 +205,25 @@ def handle_connection_error(message, error) sleep 1 end + def handle_httpx_error_response!(response, context:) + return false unless response.is_a?(HTTPX::ErrorResponse) + + error = response.error + + if error.is_a?(HTTPX::ReadTimeoutError) + raise Errors::TimeoutError.new( + message: "Request timed out after #{@request_timeout / 1000} seconds" + ) + end + + error_message = response.error&.message || "Request failed" + + raise Errors::TransportError.new( + code: nil, + message: "Request Error #{context}: #{error_message}" + ) + end + def process_event(raw_event) # Return if we believe that are getting a partial event return if raw_event[:data].nil? diff --git a/scripts/sse_test.rb b/scripts/sse_test.rb index 6c0dbe9..180d7a3 100644 --- a/scripts/sse_test.rb +++ b/scripts/sse_test.rb @@ -12,7 +12,7 @@ end RubyLLM::MCP.configure do |config| - config.log_level = Logger::DEBUG + config.log_level = Logger::ERROR config.support_complex_parameters! end @@ -20,13 +20,13 @@ name: "test-server", transport_type: :sse, config: { - url: "http://localhost:3006/mcp/sse" + url: "https://remote-mcp-server-authless.patrickgvice.workers.dev/sse" } ) mcp.tools.each do |tool| puts "Tool: #{tool.name}" puts "Description: #{tool.description}" - puts "Parameters: #{tool.parameters}" + puts "Parameters: #{tool.parameters.map { |name, param| "#{name} (#{param.inspect})" }.join(', ')}" puts "---" end From 1d4c28f9649ca8aed39be4fd4adf0f8daafac6d4 Mon Sep 17 00:00:00 2001 From: Patrick Vice Date: Thu, 10 Jul 2025 11:20:38 -0400 Subject: [PATCH 3/6] deleted test script --- scripts/sse_test.rb | 32 -------------------------------- 1 file changed, 32 deletions(-) delete mode 100644 scripts/sse_test.rb diff --git a/scripts/sse_test.rb b/scripts/sse_test.rb deleted file mode 100644 index 180d7a3..0000000 --- a/scripts/sse_test.rb +++ /dev/null @@ -1,32 +0,0 @@ -# frozen_string_literal: true - -require "bundler/setup" -require "ruby_llm/mcp" -require "debug" -require "dotenv" - -Dotenv.load - -RubyLLM.configure do |config| - config.openai_api_key = ENV.fetch("OPENAI_API_KEY", nil) -end - -RubyLLM::MCP.configure do |config| - config.log_level = Logger::ERROR - config.support_complex_parameters! -end - -mcp = RubyLLM::MCP.client( - name: "test-server", - transport_type: :sse, - config: { - url: "https://remote-mcp-server-authless.patrickgvice.workers.dev/sse" - } -) - -mcp.tools.each do |tool| - puts "Tool: #{tool.name}" - puts "Description: #{tool.description}" - puts "Parameters: #{tool.parameters.map { |name, param| "#{name} (#{param.inspect})" }.join(', ')}" - puts "---" -end From a30dc53f2c96917f4d45342e5bc4ecf2906c6342 Mon Sep 17 00:00:00 2001 From: Patrick Vice Date: Thu, 10 Jul 2025 11:21:27 -0400 Subject: [PATCH 4/6] removed all scripts from git --- scripts/fire_local.rb | 45 ------------------------------------------- 1 file changed, 45 deletions(-) delete mode 100644 scripts/fire_local.rb diff --git a/scripts/fire_local.rb b/scripts/fire_local.rb deleted file mode 100644 index 84d2f5a..0000000 --- a/scripts/fire_local.rb +++ /dev/null @@ -1,45 +0,0 @@ -# frozen_string_literal: true - -require "bundler/setup" -require "ruby_llm/mcp" -require "debug" -require "dotenv" - -Dotenv.load - -RubyLLM.configure do |config| - config.openai_api_key = ENV.fetch("OPENAI_API_KEY", nil) -end - -RubyLLM::MCP.support_complex_parameters! - -firecrawl_mcp = RubyLLM::MCP.client( - name: "firecrawl-server", - transport_type: :stdio, - config: { - command: "npx", - args: ["-y", "firecrawl-mcp"], - env: { - "FIRECRAWL_API_KEY" => ENV.fetch("FIRECRAWL_API_KEY", nil) - } - } -) - -chat = RubyLLM.chat(model: "gpt-4.1") -chat.with_tool(firecrawl_mcp.tool("firecrawl_scrape")) - -message = "Can you scrape the website https://discord.com/blog and tell me what the purpose of this site is?" -message2 = "Can you return this in a markdown format?" - -chat.ask([message, message2].join("\n")) do |chunk| - if chunk.tool_call? - chunk.tool_calls.each do |key, tool_call| - next if tool_call.name.nil? - - puts "\n🔧 Tool call(#{key}) - #{tool_call.name}" - end - else - print chunk.content - end -end -puts "\n" From a29a2be3264e57f2a2a6c64b21a874f8852163e4 Mon Sep 17 00:00:00 2001 From: Patrick Vice Date: Thu, 10 Jul 2025 11:35:26 -0400 Subject: [PATCH 5/6] still try and defer to http2, give a config option for force http1.1 if needed --- docs/guides/transports.md | 2 ++ lib/ruby_llm/mcp/transports/sse.rb | 11 ++++-- .../mcp/transports/streamable_http.rb | 34 ++++++++++++------- spec/ruby_llm/mcp/transports/sse_spec.rb | 1 + 4 files changed, 34 insertions(+), 14 deletions(-) diff --git a/docs/guides/transports.md b/docs/guides/transports.md index c8abef9..79e8d42 100644 --- a/docs/guides/transports.md +++ b/docs/guides/transports.md @@ -62,6 +62,7 @@ client = RubyLLM::MCP.client( transport_type: :sse, config: { url: "https://api.example.com/mcp/sse", + version: :http2, # You can force HTTP/1.1 by setting this to :http1, default with try to setup HTTP/2 connection headers: { "Authorization" => "Bearer token" } } ) @@ -83,6 +84,7 @@ client = RubyLLM::MCP.client( transport_type: :streamable, config: { url: "https://api.example.com/mcp", + version: :http2, # You can force HTTP/1.1 by setting this to :http1, default with try to setup HTTP/2 connection headers: { "Content-Type" => "application/json" } } ) diff --git a/lib/ruby_llm/mcp/transports/sse.rb b/lib/ruby_llm/mcp/transports/sse.rb index f7440ec..6193461 100644 --- a/lib/ruby_llm/mcp/transports/sse.rb +++ b/lib/ruby_llm/mcp/transports/sse.rb @@ -14,11 +14,12 @@ class SSE attr_reader :headers, :id, :coordinator - def initialize(url:, coordinator:, request_timeout:, headers: {}) + def initialize(url:, coordinator:, request_timeout:, version: :http2, headers: {}) @event_url = url @messages_url = nil @coordinator = coordinator @request_timeout = request_timeout + @version = version uri = URI.parse(url) @root_url = "#{uri.scheme}://#{uri.host}" @@ -168,9 +169,15 @@ def listen_for_events def stream_events_from_server sse_client = HTTPX.plugin(:stream) sse_client = sse_client.with( - ssl: { alpn_protocols: ["http/1.1"] }, headers: @headers ) + + if @version == :http1 + sse_client = sse_client.with( + ssl: { alpn_protocols: ["http/1.1"] } + ) + end + response = sse_client.get(@event_url, stream: true) event_buffer = [] diff --git a/lib/ruby_llm/mcp/transports/streamable_http.rb b/lib/ruby_llm/mcp/transports/streamable_http.rb index b852981..a8383f2 100644 --- a/lib/ruby_llm/mcp/transports/streamable_http.rb +++ b/lib/ruby_llm/mcp/transports/streamable_http.rb @@ -49,6 +49,7 @@ def initialize( # rubocop:disable Metrics/ParameterLists request_timeout:, coordinator:, headers: {}, + version: :http2, reconnection_options: nil, session_id: nil ) @@ -56,9 +57,11 @@ def initialize( # rubocop:disable Metrics/ParameterLists @coordinator = coordinator @request_timeout = request_timeout @headers = headers || {} - @session_id = session_id + @version = version @reconnection_options = reconnection_options || ReconnectionOptions.new @protocol_version = nil + @session_id = session_id + @resource_metadata_url = nil @client_id = SecureRandom.uuid @@ -466,9 +469,9 @@ def start_sse(options) # rubocop:disable Metrics/MethodLength def create_connection_with_sse_callbacks(options, headers) buffer = +"" - client = HTTPX - .plugin(:callbacks) - .on_response_body_chunk do |request, response, chunk| + client = HTTPX.plugin(:callbacks) + + client = client.on_response_body_chunk do |request, response, chunk| # Only process chunks for text/event-stream and if still running next unless @running && !@abort_controller @@ -495,16 +498,23 @@ def create_connection_with_sse_callbacks(options, headers) end end end - .with( - timeout: { - connect_timeout: 10, - read_timeout: @request_timeout / 1000, - write_timeout: @request_timeout / 1000, - operation_timeout: @request_timeout / 1000 - }, - headers: headers, + + client = client.with( + timeout: { + connect_timeout: 10, + read_timeout: @request_timeout / 1000, + write_timeout: @request_timeout / 1000, + operation_timeout: @request_timeout / 1000 + }, + headers: headers + ) + + if @version == :http1 + client = client.with( ssl: { alpn_protocols: ["http/1.1"] } ) + end + register_client(client) end diff --git a/spec/ruby_llm/mcp/transports/sse_spec.rb b/spec/ruby_llm/mcp/transports/sse_spec.rb index 637bdb7..47f5b0a 100644 --- a/spec/ruby_llm/mcp/transports/sse_spec.rb +++ b/spec/ruby_llm/mcp/transports/sse_spec.rb @@ -21,6 +21,7 @@ def client transport_type: :sse, config: { url: "http://localhost:#{TestServerManager::PORTS[:sse]}/mcp/sse", + version: :http1, request_timeout: 100 } ) From f0d9149d186bcf87eb4837d1f7f1ae57855a31b5 Mon Sep 17 00:00:00 2001 From: Patrick Vice Date: Thu, 10 Jul 2025 11:40:21 -0400 Subject: [PATCH 6/6] little refactor to pass lint --- .../mcp/transports/streamable_http.rb | 44 ++++++++++--------- 1 file changed, 23 insertions(+), 21 deletions(-) diff --git a/lib/ruby_llm/mcp/transports/streamable_http.rb b/lib/ruby_llm/mcp/transports/streamable_http.rb index a8383f2..3248dff 100644 --- a/lib/ruby_llm/mcp/transports/streamable_http.rb +++ b/lib/ruby_llm/mcp/transports/streamable_http.rb @@ -467,11 +467,31 @@ def start_sse(options) # rubocop:disable Metrics/MethodLength end def create_connection_with_sse_callbacks(options, headers) - buffer = +"" - client = HTTPX.plugin(:callbacks) + client = add_on_response_body_chunk_callback(client, options) + + client = client.with( + timeout: { + connect_timeout: 10, + read_timeout: @request_timeout / 1000, + write_timeout: @request_timeout / 1000, + operation_timeout: @request_timeout / 1000 + }, + headers: headers + ) + + if @version == :http1 + client = client.with( + ssl: { alpn_protocols: ["http/1.1"] } + ) + end + + register_client(client) + end - client = client.on_response_body_chunk do |request, response, chunk| + def add_on_response_body_chunk_callback(client, options) + buffer = +"" + client.on_response_body_chunk do |request, response, chunk| # Only process chunks for text/event-stream and if still running next unless @running && !@abort_controller @@ -498,24 +518,6 @@ def create_connection_with_sse_callbacks(options, headers) end end end - - client = client.with( - timeout: { - connect_timeout: 10, - read_timeout: @request_timeout / 1000, - write_timeout: @request_timeout / 1000, - operation_timeout: @request_timeout / 1000 - }, - headers: headers - ) - - if @version == :http1 - client = client.with( - ssl: { alpn_protocols: ["http/1.1"] } - ) - end - - register_client(client) end def calculate_reconnection_delay(attempt)