Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,14 @@ Set `stateless: true` in `MCP::Server::Transports::StreamableHTTPTransport.new`
transport = MCP::Server::Transports::StreamableHTTPTransport.new(server, stateless: true)
```

By default, sessions do not expire. To mitigate session hijacking risks, you can set a `session_idle_timeout` (in seconds).
When configured, sessions that receive no HTTP requests for this duration are automatically expired and cleaned up:

```ruby
# Session timeout of 30 minutes
transport = MCP::Server::Transports::StreamableHTTPTransport.new(server, session_idle_timeout: 1800)
```

### Unsupported Features (to be implemented in future versions)

- Resource subscriptions
Expand Down
94 changes: 89 additions & 5 deletions lib/mcp/server/transports/streamable_http_transport.rb
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,30 @@ module MCP
class Server
module Transports
class StreamableHTTPTransport < Transport
def initialize(server, stateless: false)
def initialize(server, stateless: false, session_idle_timeout: nil)
super(server)
# { session_id => { stream: stream_object }
# Session data structure: `{ session_id => { stream: stream_object, last_active_at: float_from_monotonic_clock } }`.
@sessions = {}
@mutex = Mutex.new

@stateless = stateless
@session_idle_timeout = session_idle_timeout

if @session_idle_timeout
if @stateless
raise ArgumentError, "session_idle_timeout is not supported in stateless mode."
elsif @session_idle_timeout <= 0
raise ArgumentError, "session_idle_timeout must be a positive number."
end
end

start_reaper_thread if @session_idle_timeout
end

REQUIRED_POST_ACCEPT_TYPES = ["application/json", "text/event-stream"].freeze
REQUIRED_GET_ACCEPT_TYPES = ["text/event-stream"].freeze
STREAM_WRITE_ERRORS = [IOError, Errno::EPIPE, Errno::ECONNRESET].freeze
SESSION_REAP_INTERVAL = 60

def handle_request(request)
case request.env["REQUEST_METHOD"]
Expand All @@ -35,6 +47,9 @@ def handle_request(request)
end

def close
@reaper_thread&.kill
@reaper_thread = nil

@mutex.synchronize do
@sessions.each_key { |session_id| cleanup_session_unsafe(session_id) }
end
Expand All @@ -56,6 +71,11 @@ def send_notification(method, params = nil, session_id: nil)
session = @sessions[session_id]
return false unless session && session[:stream]

if session_expired?(session)
cleanup_session_unsafe(session_id)
return false
end

begin
send_to_stream(session[:stream], notification)
true
Expand All @@ -75,6 +95,11 @@ def send_notification(method, params = nil, session_id: nil)
@sessions.each do |sid, session|
next unless session[:stream]

if session_expired?(session)
failed_sessions << sid
next
end

begin
send_to_stream(session[:stream], notification)
sent_count += 1
Expand All @@ -97,6 +122,39 @@ def send_notification(method, params = nil, session_id: nil)

private

def start_reaper_thread
@reaper_thread = Thread.new do
loop do
sleep(SESSION_REAP_INTERVAL)
reap_expired_sessions
rescue StandardError => e
MCP.configuration.exception_reporter.call(e, error: "Session reaper error")
end
end
end

def reap_expired_sessions
return unless @session_idle_timeout

expired_streams = @mutex.synchronize do
@sessions.each_with_object([]) do |(session_id, session), streams|
next unless session_expired?(session)

streams << session[:stream] if session[:stream]
@sessions.delete(session_id)
end
end

expired_streams.each do |stream|
# Closing outside the mutex is safe because expired sessions are already
# removed from `@sessions` above, so other threads will not find them
# and will not attempt to close the same stream.
stream.close
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is stream.close idempotent on closed sessions? Can there a be race condition where a session closes by natural means after we collect it for reaping?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

reap_expired_sessions deletes the session from @sessions inside the mutex before calling stream.close outside it, so there should be no issue. Added a code comment to clarify this.

rescue
nil
end
end

def send_to_stream(stream, data)
message = data.is_a?(String) ? data : data.to_json
stream.write("data: #{message}\n\n")
Expand Down Expand Up @@ -141,7 +199,9 @@ def handle_get(request)
session_id = extract_session_id(request)

return missing_session_id_response unless session_id
return session_not_found_response unless session_exists?(session_id)

error_response = validate_and_touch_session(session_id)
return error_response if error_response
return session_already_connected_response if get_session_stream(session_id)

setup_sse_stream(session_id)
Expand Down Expand Up @@ -235,6 +295,7 @@ def handle_initialization(body_string, body)
@mutex.synchronize do
@sessions[session_id] = {
stream: nil,
last_active_at: Process.clock_gettime(Process::CLOCK_MONOTONIC),
}
end
end
Expand All @@ -256,8 +317,9 @@ def handle_accepted

def handle_regular_request(body_string, session_id)
unless @stateless
if session_id && !session_exists?(session_id)
return session_not_found_response
if session_id
error_response = validate_and_touch_session(session_id)
return error_response if error_response
end
end

Expand All @@ -273,6 +335,22 @@ def handle_regular_request(body_string, session_id)
end
end

def validate_and_touch_session(session_id)
@mutex.synchronize do
return session_not_found_response unless (session = @sessions[session_id])
return unless @session_idle_timeout

if session_expired?(session)
cleanup_session_unsafe(session_id)
return session_not_found_response
end

session[:last_active_at] = Process.clock_gettime(Process::CLOCK_MONOTONIC)
end

nil
end

def get_session_stream(session_id)
@mutex.synchronize { @sessions[session_id]&.fetch(:stream, nil) }
end
Expand Down Expand Up @@ -378,6 +456,12 @@ def send_keepalive_ping(session_id)
)
raise # Re-raise to exit the keepalive loop
end

def session_expired?(session)
return false unless @session_idle_timeout

Process.clock_gettime(Process::CLOCK_MONOTONIC) - session[:last_active_at] > @session_idle_timeout
end
end
end
end
Expand Down
Loading