Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions docs/guides/transports.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ client = RubyLLM::MCP.client(
transport_type: :sse,
config: {
url: "https://api.example.com/mcp/sse",
version: :http2, # You can force HTTP/1.1 by setting this to :http1, default with try to setup HTTP/2 connection
headers: { "Authorization" => "Bearer token" }
}
)
Expand All @@ -83,6 +84,7 @@ client = RubyLLM::MCP.client(
transport_type: :streamable,
config: {
url: "https://api.example.com/mcp",
version: :http2, # You can force HTTP/1.1 by setting this to :http1, default with try to setup HTTP/2 connection
headers: { "Content-Type" => "application/json" }
}
)
Expand Down
2 changes: 1 addition & 1 deletion lib/ruby_llm/mcp/coordinator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ def name
def request(body, **options)
transport.request(body, **options)
rescue RubyLLM::MCP::Errors::TimeoutError => e
if transport&.alive?
if transport&.alive? && !e.request_id.nil?
cancelled_notification(reason: "Request timed out", request_id: e.request_id)
end
raise e
Expand Down
92 changes: 68 additions & 24 deletions lib/ruby_llm/mcp/transports/sse.rb
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ class SSE

attr_reader :headers, :id, :coordinator

def initialize(url:, coordinator:, request_timeout:, headers: {})
def initialize(url:, coordinator:, request_timeout:, version: :http2, headers: {})
@event_url = url
@messages_url = nil
@coordinator = coordinator
@request_timeout = request_timeout
@version = version

uri = URI.parse(url)
@root_url = "#{uri.scheme}://#{uri.host}"
Expand All @@ -44,7 +45,7 @@ def initialize(url:, coordinator:, request_timeout:, headers: {})
RubyLLM::MCP.logger.info "Initializing SSE transport to #{@event_url} with client ID #{@client_id}"
end

def request(body, add_id: true, wait_for_response: true) # rubocop:disable Metrics/MethodLength
def request(body, add_id: true, wait_for_response: true)
if add_id
@id_mutex.synchronize { @id_counter += 1 }
request_id = @id_counter
Expand All @@ -59,34 +60,20 @@ def request(body, add_id: true, wait_for_response: true) # rubocop:disable Metri
end

begin
http_client = HTTPClient.connection.with(timeout: { request_timeout: @request_timeout / 1000 },
headers: @headers)
response = http_client.post(@messages_url, body: JSON.generate(body))

unless response.status == 200
@pending_mutex.synchronize { @pending_requests.delete(request_id.to_s) }
RubyLLM::MCP.logger.error "SSE request failed: #{response.status} - #{response.body}"
raise Errors::TransportError.new(
message: "Failed to request #{@messages_url}: #{response.status} - #{response.body}",
code: response.status
)
end
rescue StandardError => e
send_request(body, request_id)
rescue Errors::TransportError, Errors::TimeoutError => e
@pending_mutex.synchronize { @pending_requests.delete(request_id.to_s) }
RubyLLM::MCP.logger.error "SSE request error (ID: #{request_id}): #{e.message}"
raise RubyLLM::MCP::Errors::TransportError.new(
message: e.message,
code: -1,
error: e
)
RubyLLM::MCP.logger.error "Request error (ID: #{request_id}): #{e.message}"
raise e
end

return unless wait_for_response

begin
with_timeout(@request_timeout / 1000, request_id: request_id) do
response_queue.pop
end
rescue RubyLLM::MCP::Errors::TimeoutError => e
rescue Errors::TimeoutError => e
@pending_mutex.synchronize { @pending_requests.delete(request_id.to_s) }
RubyLLM::MCP.logger.error "SSE request timeout (ID: #{request_id}) after #{@request_timeout / 1000} seconds"
raise e
Expand Down Expand Up @@ -117,6 +104,23 @@ def set_protocol_version(version)

private

def send_request(body, request_id)
http_client = HTTPX.with(timeout: { request_timeout: @request_timeout / 1000 },
headers: @headers)
response = http_client.post(@messages_url, body: JSON.generate(body))
handle_httpx_error_response!(response,
context: { location: "message endpoint request", request_id: request_id })

unless [200, 202].include?(response.status)
message = "Failed to have a successful request to #{@messages_url}: #{response.status} - #{response.body}"
RubyLLM::MCP.logger.error(message)
raise Errors::TransportError.new(
message: message,
code: response.status
)
end
end

def start_sse_listener
@connection_mutex.synchronize do
return if sse_thread_running?
Expand Down Expand Up @@ -167,15 +171,36 @@ def stream_events_from_server
sse_client = sse_client.with(
headers: @headers
)

if @version == :http1
sse_client = sse_client.with(
ssl: { alpn_protocols: ["http/1.1"] }
)
end

response = sse_client.get(@event_url, stream: true)

event_buffer = []
response.each_line do |event_line|
unless @running
response.body.close
next
end

event = parse_event(event_line)
process_event(event)
# Strip the line and check if it's empty (indicates end of event)
line = event_line.strip

if line.empty?
# End of event - process the accumulated buffer
if event_buffer.any?
event = parse_event(event_buffer.join("\n"))
process_event(event)
event_buffer.clear
end
else
# Accumulate the line for the current event
event_buffer << line
end
end
end

Expand All @@ -187,6 +212,25 @@ def handle_connection_error(message, error)
sleep 1
end

def handle_httpx_error_response!(response, context:)
return false unless response.is_a?(HTTPX::ErrorResponse)

error = response.error

if error.is_a?(HTTPX::ReadTimeoutError)
raise Errors::TimeoutError.new(
message: "Request timed out after #{@request_timeout / 1000} seconds"
)
end

error_message = response.error&.message || "Request failed"

raise Errors::TransportError.new(
code: nil,
message: "Request Error #{context}: #{error_message}"
)
end

def process_event(raw_event)
# Return if we believe that are getting a partial event
return if raw_event[:data].nil?
Expand Down
48 changes: 31 additions & 17 deletions lib/ruby_llm/mcp/transports/streamable_http.rb
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,19 @@ def initialize( # rubocop:disable Metrics/ParameterLists
request_timeout:,
coordinator:,
headers: {},
version: :http2,
reconnection_options: nil,
session_id: nil
)
@url = URI(url)
@coordinator = coordinator
@request_timeout = request_timeout
@headers = headers || {}
@session_id = session_id
@version = version
@reconnection_options = reconnection_options || ReconnectionOptions.new
@protocol_version = nil
@session_id = session_id

@resource_metadata_url = nil
@client_id = SecureRandom.uuid

Expand Down Expand Up @@ -422,8 +425,8 @@ def start_sse(options) # rubocop:disable Metrics/MethodLength
end

# Set up SSE streaming connection with callbacks
connection = create_connection_with_sse_callbacks(options)
response = connection.get(@url, headers: headers)
connection = create_connection_with_sse_callbacks(options, headers)
response = connection.get(@url)

# Handle HTTPX error responses first
error_result = handle_httpx_error_response!(response, context: { location: "SSE connection" },
Expand Down Expand Up @@ -463,12 +466,32 @@ def start_sse(options) # rubocop:disable Metrics/MethodLength
end
end

def create_connection_with_sse_callbacks(options)
buffer = +""
def create_connection_with_sse_callbacks(options, headers)
client = HTTPX.plugin(:callbacks)
client = add_on_response_body_chunk_callback(client, options)

client = client.with(
timeout: {
connect_timeout: 10,
read_timeout: @request_timeout / 1000,
write_timeout: @request_timeout / 1000,
operation_timeout: @request_timeout / 1000
},
headers: headers
)

if @version == :http1
client = client.with(
ssl: { alpn_protocols: ["http/1.1"] }
)
end

register_client(client)
end

client = HTTPX
.plugin(:callbacks)
.on_response_body_chunk do |request, response, chunk|
def add_on_response_body_chunk_callback(client, options)
buffer = +""
client.on_response_body_chunk do |request, response, chunk|
# Only process chunks for text/event-stream and if still running
next unless @running && !@abort_controller

Expand All @@ -495,15 +518,6 @@ def create_connection_with_sse_callbacks(options)
end
end
end
.with(
timeout: {
connect_timeout: 10,
read_timeout: @request_timeout / 1000,
write_timeout: @request_timeout / 1000,
operation_timeout: @request_timeout / 1000
}
)
register_client(client)
end

def calculate_reconnection_delay(attempt)
Expand Down
45 changes: 0 additions & 45 deletions scripts/fire_local.rb

This file was deleted.

32 changes: 0 additions & 32 deletions scripts/sse_test.rb

This file was deleted.

1 change: 1 addition & 0 deletions spec/ruby_llm/mcp/transports/sse_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ def client
transport_type: :sse,
config: {
url: "http://localhost:#{TestServerManager::PORTS[:sse]}/mcp/sse",
version: :http1,
request_timeout: 100
}
)
Expand Down