🚀. Socket Launch Week Day 3:Socket Firewall Now Blocks Malicious VS Code and Open VSX Extensions.Learn more
Sign In

rmcp

Package Overview
Dependencies
Maintainers
1
Versions
46
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

rmcp - cargo Package Compare versions

Comparing version
1.4.0
to
1.5.0
+122
tests/test_streamable_http_connection_reuse.rs
#![cfg(not(feature = "local"))]
use std::time::Instant;
use rmcp::{
ServerHandler, ServiceExt,
handler::server::{router::tool::ToolRouter, wrapper::Parameters},
model::{CallToolRequestParams, ClientInfo, ServerCapabilities, ServerInfo},
schemars, tool, tool_handler, tool_router,
transport::{
StreamableHttpClientTransport,
streamable_http_client::StreamableHttpClientTransportConfig,
streamable_http_server::{
StreamableHttpServerConfig, StreamableHttpService, session::local::LocalSessionManager,
},
},
};
use tokio_util::sync::CancellationToken;
#[derive(Debug, serde::Deserialize, schemars::JsonSchema)]
struct SumRequest {
a: i32,
b: i32,
}
#[derive(Debug, Clone)]
struct SumServer {
tool_router: ToolRouter<Self>,
}
impl SumServer {
fn new() -> Self {
Self {
tool_router: Self::tool_router(),
}
}
}
#[tool_router]
impl SumServer {
#[tool(description = "Sum two numbers")]
fn sum(&self, Parameters(SumRequest { a, b }): Parameters<SumRequest>) -> String {
(a + b).to_string()
}
}
#[tool_handler(router = self.tool_router)]
impl ServerHandler for SumServer {
fn get_info(&self) -> ServerInfo {
ServerInfo::new(ServerCapabilities::builder().enable_tools().build())
}
}
/// Verify that subsequent tool calls do not regress in latency due to
/// HTTP/1.1 connection pool exhaustion. Before the fix, each POST SSE
/// response was dropped without fully consuming the body, preventing
/// connection reuse and forcing a new TCP connection (~40 ms) per call.
#[tokio::test]
async fn test_subsequent_tool_calls_reuse_connections() -> anyhow::Result<()> {
let ct = CancellationToken::new();
let service: StreamableHttpService<SumServer, LocalSessionManager> = StreamableHttpService::new(
|| Ok(SumServer::new()),
Default::default(),
StreamableHttpServerConfig::default()
.with_sse_keep_alive(None)
.with_cancellation_token(ct.child_token()),
);
let router = axum::Router::new().nest_service("/mcp", service);
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
let addr = listener.local_addr()?;
let server_handle = tokio::spawn({
let ct = ct.clone();
async move {
let _ = axum::serve(listener, router)
.with_graceful_shutdown(async move { ct.cancelled_owned().await })
.await;
}
});
let transport = StreamableHttpClientTransport::from_config(
StreamableHttpClientTransportConfig::with_uri(format!("http://{addr}/mcp")),
);
let client = ClientInfo::default().serve(transport).await?;
// Warm up: first call may include one-time setup costs.
let args: serde_json::Map<String, serde_json::Value> =
serde_json::from_value(serde_json::json!({"a": 1, "b": 2}))?;
let _ = client
.call_tool(CallToolRequestParams::new("sum").with_arguments(args))
.await?;
// Measure subsequent calls.
let mut durations = Vec::new();
for i in 0..5i32 {
let args: serde_json::Map<String, serde_json::Value> =
serde_json::from_value(serde_json::json!({"a": i, "b": i + 1}))?;
let start = Instant::now();
let result = client
.call_tool(CallToolRequestParams::new("sum").with_arguments(args))
.await?;
let elapsed = start.elapsed();
durations.push(elapsed);
assert!(result.is_error != Some(true));
}
let _ = client.cancel().await;
ct.cancel();
server_handle.await?;
// With connection reuse, localhost calls should complete well under 20 ms.
// Before the fix, they consistently took ~42 ms due to new TCP connections.
let max_allowed = std::time::Duration::from_millis(20);
for d in &durations {
assert!(*d < max_allowed);
}
Ok(())
}
+1
-1
{
"git": {
"sha1": "4628720f89d27a01d4a126ea9f82f0775df9ed52"
"sha1": "020a38b6ad3d0f26487c464250a484fad2a06b0e"
},
"path_in_vcs": "crates/rmcp"
}

@@ -15,3 +15,3 @@ # THIS FILE IS AUTOMATICALLY GENERATED BY CARGO

name = "rmcp"
version = "1.4.0"
version = "1.5.0"
build = "build.rs"

@@ -341,2 +341,15 @@ autolib = false

[[test]]
name = "test_streamable_http_connection_reuse"
path = "tests/test_streamable_http_connection_reuse.rs"
required-features = [
"server",
"client",
"macros",
"schemars",
"transport-streamable-http-server",
"transport-streamable-http-client",
"transport-streamable-http-client-reqwest",
]
[[test]]
name = "test_streamable_http_json_response"

@@ -522,3 +535,3 @@ path = "tests/test_streamable_http_json_response.rs"

[dependencies.rmcp-macros]
version = "1.4.0"
version = "1.5.0"
optional = true

@@ -581,3 +594,3 @@

[dependencies.which]
version = "7"
version = "8"
optional = true

@@ -584,0 +597,0 @@

@@ -10,2 +10,19 @@ # Changelog

## [1.5.0](https://github.com/modelcontextprotocol/rust-sdk/compare/rmcp-v1.4.0...rmcp-v1.5.0) - 2026-04-16
### Added
- *(transport)* add constructors for non_exhaustive error types ([#806](https://github.com/modelcontextprotocol/rust-sdk/pull/806))
- add 2025-11-25 protocol version support ([#802](https://github.com/modelcontextprotocol/rust-sdk/pull/802))
### Fixed
- treat resource metadata JSON parse failure as soft error ([#810](https://github.com/modelcontextprotocol/rust-sdk/pull/810))
- include http_request_id in request-wise priming event IDs ([#799](https://github.com/modelcontextprotocol/rust-sdk/pull/799))
- *(http)* drain SSE stream for connection reuse ([#790](https://github.com/modelcontextprotocol/rust-sdk/pull/790))
### Other
- *(deps)* update which requirement from 7 to 8 ([#807](https://github.com/modelcontextprotocol/rust-sdk/pull/807))
## [1.4.0](https://github.com/modelcontextprotocol/rust-sdk/compare/rmcp-v1.3.0...rmcp-v1.4.0) - 2026-04-09

@@ -12,0 +29,0 @@

@@ -255,2 +255,20 @@ //! # Transport

}
/// Create a `DynamicTransportError` from raw parts.
///
/// Unlike [`new`](Self::new), this does not require a concrete [`Transport`] type,
/// making it usable in test fixtures and other contexts where a real transport
/// implementation is not available.
pub fn from_parts(
transport_name: impl Into<Cow<'static, str>>,
transport_type_id: std::any::TypeId,
error: Box<dyn std::error::Error + Send + Sync>,
) -> Self {
Self {
transport_name: transport_name.into(),
transport_type_id,
error,
}
}
pub fn downcast<T: Transport<R> + 'static, R: ServiceRole>(self) -> Result<T::Error, Self> {

@@ -257,0 +275,0 @@ if !self.is::<T, R>() {

@@ -265,3 +265,3 @@ use std::{borrow::Cow, collections::HashMap, sync::Arc};

StreamableHttpClientTransport::with_client(
reqwest::Client::default(),
Self::default_http_client(),
StreamableHttpClientTransportConfig {

@@ -281,4 +281,16 @@ uri: uri.into(),

pub fn from_config(config: StreamableHttpClientTransportConfig) -> Self {
StreamableHttpClientTransport::with_client(reqwest::Client::default(), config)
StreamableHttpClientTransport::with_client(Self::default_http_client(), config)
}
/// Build the default reqwest client for this transport.
///
/// Disables idle connection pooling to avoid ~40 ms stalls caused by
/// TCP Delayed ACK on Linux when the previous response body was not
/// fully consumed before the pool attempts to reuse the connection.
fn default_http_client() -> reqwest::Client {
reqwest::Client::builder()
.pool_max_idle_per_host(0)
.build()
.expect("failed to build default reqwest client")
}
}

@@ -289,17 +301,24 @@

use super::parse_json_rpc_error;
use crate::{model::JsonRpcMessage, transport::streamable_http_client::InsufficientScopeError};
use crate::{
model::JsonRpcMessage,
transport::streamable_http_client::{AuthRequiredError, InsufficientScopeError},
};
#[test]
fn auth_required_error_new() {
let err = AuthRequiredError::new("Bearer realm=\"test\"".to_string());
assert_eq!(err.www_authenticate_header, "Bearer realm=\"test\"");
}
#[test]
fn insufficient_scope_error_can_upgrade() {
let with_scope = InsufficientScopeError {
www_authenticate_header: "Bearer scope=\"admin\"".to_string(),
required_scope: Some("admin".to_string()),
};
let with_scope = InsufficientScopeError::new(
"Bearer scope=\"admin\"".to_string(),
Some("admin".to_string()),
);
assert!(with_scope.can_upgrade());
assert_eq!(with_scope.get_required_scope(), Some("admin"));
let without_scope = InsufficientScopeError {
www_authenticate_header: "Bearer error=\"insufficient_scope\"".to_string(),
required_scope: None,
};
let without_scope =
InsufficientScopeError::new("Bearer error=\"insufficient_scope\"".to_string(), None);
assert!(!without_scope.can_upgrade());

@@ -306,0 +325,0 @@ assert_eq!(without_scope.get_required_scope(), None);

@@ -32,2 +32,11 @@ use std::{borrow::Cow, collections::HashMap, sync::Arc, time::Duration};

impl AuthRequiredError {
/// Create a new `AuthRequiredError` instance.
pub fn new(www_authenticate_header: String) -> Self {
Self {
www_authenticate_header,
}
}
}
#[derive(Debug)]

@@ -41,2 +50,10 @@ #[non_exhaustive]

impl InsufficientScopeError {
/// Create a new `InsufficientScopeError` instance.
pub fn new(www_authenticate_header: String, required_scope: Option<String>) -> Self {
Self {
www_authenticate_header,
required_scope,
}
}
/// check if scope upgrade is possible (i.e., we know what scope is required)

@@ -286,2 +303,33 @@ pub fn can_upgrade(&self) -> bool {

impl<C: StreamableHttpClient> StreamableHttpClientWorker<C> {
/// Convert a raw SSE stream into a JSON-RPC message stream without
/// reconnection logic.
fn raw_sse_to_jsonrpc(
stream: BoxedSseStream,
) -> impl Stream<Item = Result<ServerJsonRpcMessage, StreamableHttpError<C::Error>>> + Send + 'static
{
stream.filter_map(|event| async {
match event {
Err(e) => Some(Err(StreamableHttpError::Sse(e))),
Ok(sse) => {
let is_message =
matches!(sse.event.as_deref(), None | Some("") | Some("message"));
if !is_message {
return None;
}
let data = sse.data?;
if data.trim().is_empty() {
return None;
}
match serde_json::from_str::<ServerJsonRpcMessage>(&data) {
Ok(msg) => Some(Ok(msg)),
Err(e) => {
tracing::debug!("failed to deserialize server message: {e}");
None
}
}
}
}
})
}
async fn execute_sse_stream(

@@ -309,3 +357,6 @@ sse_stream: impl Stream<Item = Result<ServerJsonRpcMessage, StreamableHttpError<C::Error>>>

};
let is_response = matches!(message, ServerJsonRpcMessage::Response(_));
let is_response = matches!(
message,
ServerJsonRpcMessage::Response(_) | ServerJsonRpcMessage::Error(_)
);
let yield_result = sse_worker_tx.send(message).await;

@@ -317,3 +368,9 @@ if yield_result.is_err() {

if close_on_response && is_response {
tracing::debug!("got response, closing sse stream");
tracing::debug!("got response, draining sse stream for connection reuse");
// Consume the remaining stream so the HTTP/1.1 connection
// returns to the pool cleanly.
let _ = tokio::time::timeout(std::time::Duration::from_millis(50), async {
while sse_stream.next().await.is_some() {}
})
.await;
break;

@@ -726,34 +783,8 @@ }

Ok(StreamableHttpPostResponse::Sse(stream, ..)) => {
if let Some(sid) = &session_id {
let sse_stream = SseAutoReconnectStream::new(
stream,
StreamableHttpClientReconnect {
client: self.client.clone(),
session_id: sid.clone(),
uri: config.uri.clone(),
auth_header: config.auth_header.clone(),
custom_headers: protocol_headers
.clone(),
},
self.config.retry_config.clone(),
);
streams.spawn(Self::execute_sse_stream(
sse_stream,
sse_worker_tx.clone(),
true,
transport_task_ct.child_token(),
));
} else {
let sse_stream =
SseAutoReconnectStream::never_reconnect(
stream,
StreamableHttpError::<C::Error>::UnexpectedEndOfStream,
);
streams.spawn(Self::execute_sse_stream(
sse_stream,
sse_worker_tx.clone(),
true,
transport_task_ct.child_token(),
));
}
streams.spawn(Self::execute_sse_stream(
Self::raw_sse_to_jsonrpc(stream),
sse_worker_tx.clone(),
true,
transport_task_ct.child_token(),
));
tracing::trace!("got new sse stream after re-init");

@@ -778,32 +809,8 @@ Ok(())

Ok(StreamableHttpPostResponse::Sse(stream, ..)) => {
if let Some(session_id) = &session_id {
let sse_stream = SseAutoReconnectStream::new(
stream,
StreamableHttpClientReconnect {
client: self.client.clone(),
session_id: session_id.clone(),
uri: config.uri.clone(),
auth_header: config.auth_header.clone(),
custom_headers: protocol_headers.clone(),
},
self.config.retry_config.clone(),
);
streams.spawn(Self::execute_sse_stream(
sse_stream,
sse_worker_tx.clone(),
true,
transport_task_ct.child_token(),
));
} else {
let sse_stream = SseAutoReconnectStream::never_reconnect(
stream,
StreamableHttpError::<C::Error>::UnexpectedEndOfStream,
);
streams.spawn(Self::execute_sse_stream(
sse_stream,
sse_worker_tx.clone(),
true,
transport_task_ct.child_token(),
));
}
streams.spawn(Self::execute_sse_stream(
Self::raw_sse_to_jsonrpc(stream),
sse_worker_tx.clone(),
true,
transport_task_ct.child_token(),
));
tracing::trace!("got new sse stream");

@@ -810,0 +817,0 @@ Ok(())

use std::{
collections::{HashMap, HashSet, VecDeque},
num::ParseIntError,
time::Duration,
time::{Duration, Instant},
};
use futures::Stream;
use futures::{Stream, StreamExt};
use thiserror::Error;

@@ -89,6 +89,13 @@ use tokio::sync::{

let receiver = handle.establish_request_wise_channel().await?;
handle
.push_message(message, receiver.http_request_id)
.await?;
Ok(ReceiverStream::new(receiver.inner))
let http_request_id = receiver.http_request_id;
handle.push_message(message, http_request_id).await?;
let priming = self.session_config.sse_retry.map(|retry| {
let event_id = match http_request_id {
Some(id) => format!("0/{id}"),
None => "0".into(),
};
ServerSseMessage::priming(event_id, retry)
});
Ok(futures::stream::iter(priming).chain(ReceiverStream::new(receiver.inner)))
}

@@ -192,6 +199,11 @@

capacity: usize,
starting_index: usize,
}
impl CachedTx {
fn new(tx: Sender<ServerSseMessage>, http_request_id: Option<HttpRequestId>) -> Self {
fn new(
tx: Sender<ServerSseMessage>,
http_request_id: Option<HttpRequestId>,
starting_index: usize,
) -> Self {
Self {

@@ -202,10 +214,11 @@ cache: VecDeque::with_capacity(tx.capacity()),

http_request_id,
starting_index,
}
}
fn new_common(tx: Sender<ServerSseMessage>) -> Self {
Self::new(tx, None)
Self::new(tx, None, 0)
}
fn next_event_id(&self) -> EventId {
let index = self.cache.back().map_or(0, |m| {
let index = self.cache.back().map_or(self.starting_index, |m| {
m.event_id

@@ -278,2 +291,3 @@ .as_deref()

tx: CachedTx,
completed_at: Option<Instant>,
}

@@ -349,20 +363,24 @@

fn unregister_resource(&mut self, resource: &ResourceKey) {
if let Some(http_request_id) = self.resource_router.remove(resource) {
tracing::trace!(?resource, http_request_id, "unregister resource");
if let Some(channel) = self.tx_router.get_mut(&http_request_id) {
// It's okey to do so, since we don't handle batch json rpc request anymore
// and this can be refactored after the batch request is removed in the coming version.
if channel.resources.is_empty() || matches!(resource, ResourceKey::McpRequestId(_))
{
tracing::debug!(http_request_id, "close http request wise channel");
if let Some(channel) = self.tx_router.remove(&http_request_id) {
for resource in channel.resources {
self.resource_router.remove(&resource);
}
}
}
} else {
tracing::warn!(http_request_id, "http request wise channel not found");
}
let Some(http_request_id) = self.resource_router.remove(resource) else {
return;
};
tracing::trace!(?resource, http_request_id, "unregister resource");
let Some(channel) = self.tx_router.get_mut(&http_request_id) else {
tracing::warn!(http_request_id, "http request wise channel not found");
return;
};
if !channel.resources.is_empty() && !matches!(resource, ResourceKey::McpRequestId(_)) {
return;
}
tracing::debug!(http_request_id, "close http request wise channel");
let resources: Vec<_> = channel.resources.drain().collect();
channel.completed_at = Some(Instant::now());
// Close the sender so the client's SSE stream ends,
// but keep the entry so the cache is available for
// late resume requests.
let (closed_tx, _) = tokio::sync::mpsc::channel(1);
channel.tx.tx = closed_tx;
for resource in resources {
self.resource_router.remove(&resource);
}
}

@@ -403,2 +421,7 @@ fn register_resource(&mut self, resource: ResourceKey, http_request_id: HttpRequestId) {

}
fn evict_expired_channels(&mut self) {
let ttl = self.session_config.completed_cache_ttl;
self.tx_router
.retain(|_, rw| rw.completed_at.is_none_or(|at| at.elapsed() < ttl));
}
fn next_http_request_id(&mut self) -> HttpRequestId {

@@ -414,2 +437,3 @@ let id = self.next_http_request_id;

let (tx, rx) = tokio::sync::mpsc::channel(self.session_config.channel_capacity);
let starting_index = usize::from(self.session_config.sse_retry.is_some());
self.tx_router.insert(

@@ -419,3 +443,4 @@ http_request_id,

resources: Default::default(),
tx: CachedTx::new(tx, Some(http_request_id)),
tx: CachedTx::new(tx, Some(http_request_id), starting_index),
completed_at: None,
},

@@ -481,3 +506,3 @@ );

id: *id,
close: false,
close: true,
}

@@ -495,3 +520,3 @@ } else {

id: *id,
close: false,
close: true,
}

@@ -514,3 +539,7 @@ } else {

if close {
self.tx_router.remove(&id);
if let Some(channel) = self.tx_router.remove(&id) {
for resource in channel.resources {
self.resource_router.remove(&resource);
}
}
}

@@ -534,24 +563,21 @@ } else {

Some(http_request_id) => {
if let Some(request_wise) = self.tx_router.get_mut(&http_request_id) {
// Resume existing request-wise channel
let channel = tokio::sync::mpsc::channel(self.session_config.channel_capacity);
let (tx, rx) = channel;
request_wise.tx.tx = tx;
let index = last_event_id.index;
// sync messages after index
request_wise.tx.sync(index).await?;
Ok(StreamableHttpMessageReceiver {
http_request_id: Some(http_request_id),
inner: rx,
})
} else {
// Request-wise channel completed (POST response already delivered).
// The client's EventSource is reconnecting after the POST SSE stream
// ended. Fall through to common channel handling below.
tracing::debug!(
http_request_id,
"Request-wise channel completed, falling back to common channel"
);
self.resume_or_shadow_common(last_event_id.index).await
let request_wise = self
.tx_router
.get_mut(&http_request_id)
.ok_or(SessionError::ChannelClosed(Some(http_request_id)))?;
let is_completed = request_wise.completed_at.is_some();
let (tx, rx) = tokio::sync::mpsc::channel(self.session_config.channel_capacity);
request_wise.tx.tx = tx;
let index = last_event_id.index;
request_wise.tx.sync(index).await?;
if is_completed {
// Drop the sender after replaying so the stream ends
// instead of hanging indefinitely.
let (closed_tx, _) = tokio::sync::mpsc::channel(1);
request_wise.tx.tx = closed_tx;
}
Ok(StreamableHttpMessageReceiver {
http_request_id: Some(http_request_id),
inner: rx,
})
}

@@ -966,2 +992,3 @@ None => self.resume_or_shadow_common(last_event_id.index).await,

loop {
self.evict_expired_channels();
let keep_alive_timeout = tokio::time::sleep(keep_alive);

@@ -1088,2 +1115,11 @@ let event = tokio::select! {

pub keep_alive: Option<Duration>,
/// SSE retry interval for priming events on request-wise streams.
/// When set, the session layer prepends a priming event with the correct
/// stream-identifying event ID to each request-wise SSE stream.
/// Default is 3 seconds, matching `StreamableHttpServerConfig::default()`.
pub sse_retry: Option<Duration>,
/// How long to retain completed request-wise channel caches for late
/// resume requests. After this duration, completed entries are evicted
/// and resume will return an error. Default is 60 seconds.
pub completed_cache_ttl: Duration,
}

@@ -1094,2 +1130,4 @@

pub const DEFAULT_KEEP_ALIVE: Duration = Duration::from_secs(300);
pub const DEFAULT_SSE_RETRY: Duration = Duration::from_secs(3);
pub const DEFAULT_COMPLETED_CACHE_TTL: Duration = Duration::from_secs(60);
}

@@ -1102,2 +1140,4 @@

keep_alive: Some(Self::DEFAULT_KEEP_ALIVE),
sse_retry: Some(Self::DEFAULT_SSE_RETRY),
completed_cache_ttl: Self::DEFAULT_COMPLETED_CACHE_TTL,
}

@@ -1104,0 +1144,0 @@ }

@@ -481,36 +481,48 @@ use std::{convert::Infallible, fmt::Display, sync::Arc, time::Duration};

if let Some(last_event_id) = last_event_id {
// check if session has this event id
let stream = self
match self
.session_manager
.resume(&session_id, last_event_id)
.await
.map_err(internal_error_response("resume session"))?;
// Resume doesn't need priming - client already has the event ID
Ok(sse_stream_response(
stream,
self.config.sse_keep_alive,
self.config.cancellation_token.child_token(),
))
{
Ok(stream) => {
return Ok(sse_stream_response(
stream,
self.config.sse_keep_alive,
self.config.cancellation_token.child_token(),
));
}
Err(e) => {
// Return 200 with an immediately-closed empty stream.
// Returning an HTTP error would cause EventSource to retry
// with the same Last-Event-ID in an infinite loop. An empty
// 200 cleanly terminates the EventSource without delivering
// events from a different stream.
tracing::warn!("Resume failed ({e}), returning empty stream");
return Ok(sse_stream_response(
futures::stream::empty(),
None,
self.config.cancellation_token.child_token(),
));
}
}
}
// No Last-Event-ID — create standalone stream
let stream = self
.session_manager
.create_standalone_stream(&session_id)
.await
.map_err(internal_error_response("create standalone stream"))?;
let stream = if let Some(retry) = self.config.sse_retry {
let priming = ServerSseMessage::priming("0", retry);
futures::stream::once(async move { priming })
.chain(stream)
.left_stream()
} else {
// create standalone stream
let stream = self
.session_manager
.create_standalone_stream(&session_id)
.await
.map_err(internal_error_response("create standalone stream"))?;
// Prepend priming event if sse_retry configured
let stream = if let Some(retry) = self.config.sse_retry {
let priming = ServerSseMessage::priming("0", retry);
futures::stream::once(async move { priming })
.chain(stream)
.left_stream()
} else {
stream.right_stream()
};
Ok(sse_stream_response(
stream,
self.config.sse_keep_alive,
self.config.cancellation_token.child_token(),
))
}
stream.right_stream()
};
Ok(sse_stream_response(
stream,
self.config.sse_keep_alive,
self.config.cancellation_token.child_token(),
))
}

@@ -602,2 +614,5 @@

ClientJsonRpcMessage::Request(_) => {
// Priming for request-wise streams is handled by the
// session layer (SessionManager::create_stream) which
// has access to the http_request_id for correct event IDs.
let stream = self

@@ -608,11 +623,2 @@ .session_manager

.map_err(internal_error_response("get session"))?;
// Prepend priming event if sse_retry configured
let stream = if let Some(retry) = self.config.sse_retry {
let priming = ServerSseMessage::priming("0", retry);
futures::stream::once(async move { priming })
.chain(stream)
.left_stream()
} else {
stream.right_stream()
};
Ok(sse_stream_response(

@@ -619,0 +625,0 @@ stream,

@@ -869,2 +869,3 @@ #![cfg(not(feature = "local"))]

assert_eq!(ProtocolVersion::V_2025_11_25.as_str(), "2025-11-25");
assert_eq!(ProtocolVersion::V_2025_06_18.as_str(), "2025-06-18");

@@ -874,6 +875,7 @@ assert_eq!(ProtocolVersion::V_2025_03_26.as_str(), "2025-03-26");

assert_eq!(ProtocolVersion::KNOWN_VERSIONS.len(), 3);
assert_eq!(ProtocolVersion::KNOWN_VERSIONS.len(), 4);
assert!(ProtocolVersion::KNOWN_VERSIONS.contains(&ProtocolVersion::V_2024_11_05));
assert!(ProtocolVersion::KNOWN_VERSIONS.contains(&ProtocolVersion::V_2025_03_26));
assert!(ProtocolVersion::KNOWN_VERSIONS.contains(&ProtocolVersion::V_2025_06_18));
assert!(ProtocolVersion::KNOWN_VERSIONS.contains(&ProtocolVersion::V_2025_11_25));
}

@@ -880,0 +882,0 @@

@@ -5,3 +5,4 @@ #![cfg(not(feature = "local"))]

use rmcp::transport::streamable_http_server::{
StreamableHttpServerConfig, StreamableHttpService, session::local::LocalSessionManager,
StreamableHttpServerConfig, StreamableHttpService,
session::{SessionId, local::LocalSessionManager},
};

@@ -58,3 +59,3 @@ use tokio_util::sync::CancellationToken;

// Verify priming event (first event)
// Verify priming event (first event) — initialize uses "0" (no http_request_id)
let priming_event = events[0];

@@ -77,2 +78,337 @@ assert!(priming_event.contains("id: 0"));

#[tokio::test]
async fn test_request_wise_priming_includes_http_request_id() -> anyhow::Result<()> {
let ct = CancellationToken::new();
let service: StreamableHttpService<Calculator, LocalSessionManager> =
StreamableHttpService::new(
|| Ok(Calculator::new()),
Default::default(),
StreamableHttpServerConfig::default()
.with_sse_keep_alive(None)
.with_cancellation_token(ct.child_token()),
);
let router = axum::Router::new().nest_service("/mcp", service);
let tcp_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
let addr = tcp_listener.local_addr()?;
let handle = tokio::spawn({
let ct = ct.clone();
async move {
let _ = axum::serve(tcp_listener, router)
.with_graceful_shutdown(async move { ct.cancelled_owned().await })
.await;
}
});
let client = reqwest::Client::new();
// Initialize the session
let response = client
.post(format!("http://{addr}/mcp"))
.header("Content-Type", "application/json")
.header("Accept", "application/json, text/event-stream")
.body(r#"{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion":"2025-11-25","capabilities":{},"clientInfo":{"name":"test","version":"1.0"}}}"#)
.send()
.await?;
assert_eq!(response.status(), 200);
let session_id: SessionId = response.headers()["mcp-session-id"].to_str()?.into();
// Send notifications/initialized
let status = client
.post(format!("http://{addr}/mcp"))
.header("Content-Type", "application/json")
.header("Accept", "application/json, text/event-stream")
.header("mcp-session-id", session_id.to_string())
.header("Mcp-Protocol-Version", "2025-06-18")
.body(r#"{"jsonrpc":"2.0","method":"notifications/initialized"}"#)
.send()
.await?
.status();
assert_eq!(status, 202);
// First tool call — should get http_request_id 0
let body = client
.post(format!("http://{addr}/mcp"))
.header("Content-Type", "application/json")
.header("Accept", "application/json, text/event-stream")
.header("mcp-session-id", session_id.to_string())
.header("Mcp-Protocol-Version", "2025-06-18")
.body(r#"{"jsonrpc":"2.0","id":2,"method":"tools/call","params":{"name":"sum","arguments":{"a":1,"b":2}}}"#)
.send()
.await?
.text()
.await?;
let events: Vec<&str> = body.split("\n\n").filter(|e| !e.is_empty()).collect();
assert!(
events.len() >= 2,
"expected priming + response, got: {body}"
);
// Priming event should encode the http_request_id (0)
let priming = events[0];
assert!(
priming.contains("id: 0/0"),
"first request priming should be 0/0, got: {priming}"
);
assert!(priming.contains("retry: 3000"));
// Response event should use index 1 (since priming occupies index 0)
let response_event = events[1];
assert!(
response_event.contains("id: 1/0"),
"first response event id should be 1/0, got: {response_event}"
);
assert!(response_event.contains(r#""id":2"#));
// Second tool call — should get http_request_id 1
let body = client
.post(format!("http://{addr}/mcp"))
.header("Content-Type", "application/json")
.header("Accept", "application/json, text/event-stream")
.header("mcp-session-id", session_id.to_string())
.header("Mcp-Protocol-Version", "2025-06-18")
.body(r#"{"jsonrpc":"2.0","id":3,"method":"tools/call","params":{"name":"sum","arguments":{"a":3,"b":4}}}"#)
.send()
.await?
.text()
.await?;
let events: Vec<&str> = body.split("\n\n").filter(|e| !e.is_empty()).collect();
assert!(
events.len() >= 2,
"expected priming + response, got: {body}"
);
let priming = events[0];
assert!(
priming.contains("id: 0/1"),
"second request priming should be 0/1, got: {priming}"
);
let response_event = events[1];
assert!(
response_event.contains("id: 1/1"),
"second response event id should be 1/1, got: {response_event}"
);
assert!(response_event.contains(r#""id":3"#));
ct.cancel();
handle.await?;
Ok(())
}
#[tokio::test]
async fn test_resume_after_request_wise_channel_completed() -> anyhow::Result<()> {
let ct = CancellationToken::new();
let service: StreamableHttpService<Calculator, LocalSessionManager> =
StreamableHttpService::new(
|| Ok(Calculator::new()),
Default::default(),
StreamableHttpServerConfig::default()
.with_sse_keep_alive(None)
.with_cancellation_token(ct.child_token()),
);
let router = axum::Router::new().nest_service("/mcp", service);
let tcp_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
let addr = tcp_listener.local_addr()?;
let handle = tokio::spawn({
let ct = ct.clone();
async move {
let _ = axum::serve(tcp_listener, router)
.with_graceful_shutdown(async move { ct.cancelled_owned().await })
.await;
}
});
let client = reqwest::Client::new();
// Initialize session
let response = client
.post(format!("http://{addr}/mcp"))
.header("Content-Type", "application/json")
.header("Accept", "application/json, text/event-stream")
.body(r#"{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion":"2025-06-18","capabilities":{},"clientInfo":{"name":"test","version":"1.0"}}}"#)
.send()
.await?;
assert_eq!(response.status(), 200);
let session_id: SessionId = response.headers()["mcp-session-id"].to_str()?.into();
// Complete handshake
let status = client
.post(format!("http://{addr}/mcp"))
.header("Content-Type", "application/json")
.header("Accept", "application/json, text/event-stream")
.header("mcp-session-id", session_id.to_string())
.header("Mcp-Protocol-Version", "2025-06-18")
.body(r#"{"jsonrpc":"2.0","method":"notifications/initialized"}"#)
.send()
.await?
.status();
assert_eq!(status, 202);
// Call a tool and consume the full response (channel completes)
let body = client
.post(format!("http://{addr}/mcp"))
.header("Content-Type", "application/json")
.header("Accept", "application/json, text/event-stream")
.header("mcp-session-id", session_id.to_string())
.header("Mcp-Protocol-Version", "2025-06-18")
.body(r#"{"jsonrpc":"2.0","id":2,"method":"tools/call","params":{"name":"sum","arguments":{"a":1,"b":2}}}"#)
.send()
.await?
.text()
.await?;
let events: Vec<&str> = body.split("\n\n").filter(|e| !e.is_empty()).collect();
assert!(
events.len() >= 2,
"expected priming + response, got: {body}"
);
assert!(events[0].contains("id: 0/0"));
assert!(events[1].contains(r#""id":2"#));
// Resume with Last-Event-ID after the channel has completed.
// The server returns 200 — either with replayed cached events
// (if the channel is still retained) or an empty stream (if the
// session worker hasn't processed the completion yet).
let resume = client
.get(format!("http://{addr}/mcp"))
.header("Accept", "text/event-stream")
.header("mcp-session-id", session_id.to_string())
.header("Mcp-Protocol-Version", "2025-06-18")
.header("last-event-id", "0/0")
.send()
.await?;
assert_eq!(resume.status(), 200);
let resume_body = resume.text().await?;
// The stream should complete (not hang), regardless of whether
// it contains replayed events or is empty.
assert!(
!resume_body.contains("standalone"),
"should not receive events from a different stream"
);
ct.cancel();
handle.await?;
Ok(())
}
#[tokio::test]
async fn test_completed_cache_ttl_eviction() -> anyhow::Result<()> {
use std::sync::Arc;
let ct = CancellationToken::new();
let mut session_manager = LocalSessionManager::default();
session_manager.session_config.completed_cache_ttl = Duration::from_millis(200);
let session_manager = Arc::new(session_manager);
let service = StreamableHttpService::new(
|| Ok(Calculator::new()),
session_manager.clone(),
StreamableHttpServerConfig::default()
.with_sse_keep_alive(None)
.with_cancellation_token(ct.child_token()),
);
let router = axum::Router::new().nest_service("/mcp", service);
let tcp_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
let addr = tcp_listener.local_addr()?;
let handle = tokio::spawn({
let ct = ct.clone();
async move {
let _ = axum::serve(tcp_listener, router)
.with_graceful_shutdown(async move { ct.cancelled_owned().await })
.await;
}
});
let client = reqwest::Client::new();
// Initialize session
let response = client
.post(format!("http://{addr}/mcp"))
.header("Content-Type", "application/json")
.header("Accept", "application/json, text/event-stream")
.body(r#"{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion":"2025-06-18","capabilities":{},"clientInfo":{"name":"test","version":"1.0"}}}"#)
.send()
.await?;
assert_eq!(response.status(), 200);
let session_id: SessionId = response.headers()["mcp-session-id"].to_str()?.into();
// Complete handshake
client
.post(format!("http://{addr}/mcp"))
.header("Content-Type", "application/json")
.header("Accept", "application/json, text/event-stream")
.header("mcp-session-id", session_id.to_string())
.header("Mcp-Protocol-Version", "2025-06-18")
.body(r#"{"jsonrpc":"2.0","method":"notifications/initialized"}"#)
.send()
.await?;
// Call a tool and consume the response (channel completes)
let body = client
.post(format!("http://{addr}/mcp"))
.header("Content-Type", "application/json")
.header("Accept", "application/json, text/event-stream")
.header("mcp-session-id", session_id.to_string())
.header("Mcp-Protocol-Version", "2025-06-18")
.body(r#"{"jsonrpc":"2.0","id":2,"method":"tools/call","params":{"name":"sum","arguments":{"a":1,"b":2}}}"#)
.send()
.await?
.text()
.await?;
assert!(body.contains(r#""id":2"#));
// Wait for TTL to expire (200ms) plus margin
tokio::time::sleep(Duration::from_millis(400)).await;
// Send a notification to trigger an event loop iteration (runs eviction)
client
.post(format!("http://{addr}/mcp"))
.header("Content-Type", "application/json")
.header("Accept", "application/json, text/event-stream")
.header("mcp-session-id", session_id.to_string())
.header("Mcp-Protocol-Version", "2025-06-18")
.body(r#"{"jsonrpc":"2.0","method":"notifications/initialized"}"#)
.send()
.await?;
// Small delay to ensure the eviction ran
tokio::time::sleep(Duration::from_millis(50)).await;
// Resume after TTL — channel should be evicted. The server returns
// 200 with an empty stream (no events from a different stream).
let resume = client
.get(format!("http://{addr}/mcp"))
.header("Accept", "text/event-stream")
.header("mcp-session-id", session_id.to_string())
.header("Mcp-Protocol-Version", "2025-06-18")
.header("last-event-id", "0/0")
.send()
.await?;
assert_eq!(resume.status(), 200);
let body = resume.text().await?;
assert!(
!body.contains(r#""id":2"#),
"should NOT contain the old tool response after eviction, got: {body}"
);
ct.cancel();
handle.await?;
Ok(())
}
#[tokio::test]
async fn test_priming_on_stream_close() -> anyhow::Result<()> {

@@ -79,0 +415,0 @@ use std::sync::Arc;

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is too big to display