🚀. Socket Launch Week Day 3:Socket Firewall Now Blocks Malicious VS Code and Open VSX Extensions.Learn more
Sign In

rmcp

Package Overview
Dependencies
Maintainers
1
Versions
46
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

rmcp - cargo Package Compare versions

Comparing version
1.1.1
to
1.2.0
+1
-1
.cargo_vcs_info.json
{
"git": {
"sha1": "1158cfe1b80b97272fd2d1d137e94754d2635e5a"
"sha1": "3bd75220708b2e9f8c74a3fe3277ac5d4f03f478"
},
"path_in_vcs": "crates/rmcp"
}

@@ -15,3 +15,3 @@ # THIS FILE IS AUTOMATICALLY GENERATED BY CARGO

name = "rmcp"
version = "1.1.1"
version = "1.2.0"
build = "build.rs"

@@ -408,3 +408,3 @@ autolib = false

[dependencies.jsonwebtoken]
version = "9"
version = "10"
optional = true

@@ -443,3 +443,3 @@

[dependencies.rmcp-macros]
version = "1.1.1"
version = "1.2.0"
optional = true

@@ -446,0 +446,0 @@

@@ -10,2 +10,18 @@ # Changelog

## [1.2.0](https://github.com/modelcontextprotocol/rust-sdk/compare/rmcp-v1.1.1...rmcp-v1.2.0) - 2026-03-11
### Added
- add missing constructors for non-exhaustive model types ([#739](https://github.com/modelcontextprotocol/rust-sdk/pull/739))
- include granted scopes in OAuth refresh token request ([#731](https://github.com/modelcontextprotocol/rust-sdk/pull/731))
### Fixed
- handle ping requests sent before initialize handshake ([#745](https://github.com/modelcontextprotocol/rust-sdk/pull/745))
- allow deserializing notifications without params field ([#729](https://github.com/modelcontextprotocol/rust-sdk/pull/729))
### Other
- *(deps)* update jsonwebtoken requirement from 9 to 10 ([#737](https://github.com/modelcontextprotocol/rust-sdk/pull/737))
## [1.1.1](https://github.com/modelcontextprotocol/rust-sdk/compare/rmcp-v1.1.0...rmcp-v1.1.1) - 2026-03-09

@@ -12,0 +28,0 @@

@@ -249,4 +249,15 @@ use std::borrow::Cow;

{
let body = Proxy::deserialize(deserializer)?;
let _meta = body.params._meta.map(|m| m.into_owned());
let body = ProxyOptionalParam::<'_, _, R>::deserialize(deserializer)?;
let (_meta, params) = match body.params {
Some(with_meta) => {
let meta = with_meta._meta.map(|m| m.into_owned());
(meta, with_meta._rest)
}
None => {
// JSON-RPC 2.0: params is optional. Treat absent params as {}.
let empty = serde_json::Value::Object(serde_json::Map::new());
let r = R::deserialize(empty).map_err(serde::de::Error::custom)?;
(None, r)
}
};
let mut extensions = Extensions::new();

@@ -259,3 +270,3 @@ if let Some(meta) = _meta {

method: body.method,
params: body.params._rest,
params,
})

@@ -262,0 +273,0 @@ }

@@ -134,18 +134,2 @@ use std::borrow::Cow;

/// Helper function to expect a request from the stream
async fn expect_request<T>(
transport: &mut T,
context: &str,
) -> Result<(ClientRequest, RequestId), ServerInitializeError>
where
T: Transport<RoleServer>,
{
let msg = expect_next_message(transport, context).await?;
let msg_clone = msg.clone();
msg.into_request()
.ok_or(ServerInitializeError::ExpectedInitializeRequest(Some(
msg_clone,
)))
}
pub async fn serve_server_with_ct<S, T, E, A>(

@@ -181,4 +165,31 @@ service: S,

// Get initialize request
let (request, id) = expect_request(&mut transport, "initialized request").await?;
// Get initialize request; the MCP spec permits ping before initialize.
// See: https://modelcontextprotocol.io/specification/2025-11-25/basic/lifecycle#initialization
let (request, id) = loop {
let msg = expect_next_message(&mut transport, "initialize request").await?;
match msg {
ClientJsonRpcMessage::Request(req)
if matches!(req.request, ClientRequest::PingRequest(_)) =>
{
transport
.send(ServerJsonRpcMessage::response(
ServerResult::EmptyResult(EmptyResult {}),
req.id,
))
.await
.map_err(|error| {
ServerInitializeError::transport::<T>(
error,
"sending pre-init ping response",
)
})?;
}
ClientJsonRpcMessage::Request(req) => break (req.request, req.id),
other => {
return Err(ServerInitializeError::ExpectedInitializeRequest(Some(
other,
)));
}
}
};

@@ -185,0 +196,0 @@ let ClientRequest::InitializeRequest(peer_info) = &request else {

@@ -147,2 +147,3 @@ use std::{borrow::Cow, collections::HashMap, sync::Arc};

request = apply_custom_headers(request, custom_headers)?;
let session_was_attached = session_id.is_some();
if let Some(session_id) = session_id {

@@ -190,2 +191,5 @@ request = request.header(HEADER_SESSION_ID, session_id.as_ref());

}
if status == reqwest::StatusCode::NOT_FOUND && session_was_attached {
return Err(StreamableHttpError::SessionExpired);
}
if !status.is_success() {

@@ -192,0 +196,0 @@ let body = response

@@ -14,3 +14,6 @@ use std::{borrow::Cow, collections::HashMap, sync::Arc, time::Duration};

RoleClient,
model::{ClientJsonRpcMessage, ServerJsonRpcMessage, ServerResult},
model::{
ClientJsonRpcMessage, ClientNotification, InitializedNotification, ServerJsonRpcMessage,
ServerResult,
},
transport::{

@@ -83,2 +86,4 @@ common::client_side_sse::SseAutoReconnectStream,

ReservedHeaderConflict(String),
#[error("Session expired (HTTP 404)")]
SessionExpired,
}

@@ -312,2 +317,65 @@

}
/// Performs a transparent re-initialization handshake after a session-expired 404.
///
/// Takes an owned clone of the client (avoiding `&self` across `.await` so the
/// future remains `Send` without requiring `C: Sync`). POSTs the saved
/// initialize request without a session ID, extracts the new session ID and
/// protocol version, sends `notifications/initialized`, and returns the new
/// `(session_id, protocol_headers)` pair. The init result message is **not**
/// forwarded to the handler because the handler already processed the original
/// initialization.
async fn perform_reinitialization(
client: C,
saved_init_request: ClientJsonRpcMessage,
uri: Arc<str>,
auth_header: Option<String>,
custom_headers: HashMap<HeaderName, HeaderValue>,
) -> Result<(Option<Arc<str>>, HashMap<HeaderName, HeaderValue>), StreamableHttpError<C::Error>>
{
let (init_msg, new_session_id_str) = client
.post_message(
uri.clone(),
saved_init_request,
None,
auth_header.clone(),
custom_headers.clone(),
)
.await?
.expect_initialized::<C::Error>()
.await?;
let new_session_id: Option<Arc<str>> = new_session_id_str.map(|s| Arc::from(s.as_str()));
// Start from custom_headers, then inject the negotiated MCP-Protocol-Version
// so all subsequent requests carry the right version (MCP 2025-06-18 spec).
let mut new_protocol_headers = custom_headers;
if let ServerJsonRpcMessage::Response(response) = &init_msg {
if let ServerResult::InitializeResult(init_result) = &response.result {
if let Ok(hv) = HeaderValue::from_str(init_result.protocol_version.as_str()) {
new_protocol_headers
.insert(HeaderName::from_static("mcp-protocol-version"), hv);
}
}
}
let initialized_notification = ClientJsonRpcMessage::notification(
ClientNotification::InitializedNotification(InitializedNotification {
method: Default::default(),
extensions: Default::default(),
}),
);
client
.post_message(
uri,
initialized_notification,
new_session_id.clone(),
auth_header,
new_protocol_headers.clone(),
)
.await?
.expect_accepted_or_json::<C::Error>()?;
Ok((new_session_id, new_protocol_headers))
}
}

@@ -344,2 +412,3 @@

} = context.recv_from_handler().await?;
let saved_init_request = initialize_request.clone();
let (message, session_id) = match self

@@ -351,4 +420,4 @@ .client

None,
self.config.auth_header,
self.config.custom_headers,
config.auth_header.clone(),
config.custom_headers.clone(),
)

@@ -372,3 +441,3 @@ .await

};
let session_id: Option<Arc<str>> = if let Some(session_id) = session_id {
let mut session_id: Option<Arc<str>> = if let Some(session_id) = session_id {
Some(session_id.into())

@@ -387,3 +456,3 @@ } else {

// for all subsequent HTTP requests (per MCP 2025-06-18 spec).
let protocol_headers = {
let mut protocol_headers = {
let mut headers = config.custom_headers.clone();

@@ -402,3 +471,3 @@ if let ServerJsonRpcMessage::Response(response) = &message {

// Store session info for cleanup when run() exits (not spawned, so cleanup completes before close() returns)
let session_cleanup_info = session_id.as_ref().map(|sid| SessionCleanupInfo {
let mut session_cleanup_info = session_id.as_ref().map(|sid| SessionCleanupInfo {
client: self.client.clone(),

@@ -527,2 +596,5 @@ uri: config.uri.clone(),

let WorkerSendRequest { message, responder } = send_request;
// Pass a clone to the first attempt so `message` is retained for a
// potential re-init retry. `post_message` takes ownership and the
// trait cannot be changed, so the clone is unavoidable.
let response = self

@@ -532,3 +604,3 @@ .client

config.uri.clone(),
message,
message.clone(),
session_id.clone(),

@@ -540,2 +612,153 @@ config.auth_header.clone(),

let send_result = match response {
Err(StreamableHttpError::SessionExpired) => {
// The server discarded the session (HTTP 404). Perform a
// fresh handshake once and replay the original message.
tracing::info!(
"session expired (HTTP 404), attempting transparent re-initialization"
);
match Self::perform_reinitialization(
self.client.clone(),
saved_init_request.clone(),
config.uri.clone(),
config.auth_header.clone(),
config.custom_headers.clone(),
)
.await
{
Ok((new_session_id, new_protocol_headers)) => {
// Old streams hold the stale session ID; abort them
// so the new standalone SSE stream takes over.
streams.abort_all();
session_id = new_session_id;
protocol_headers = new_protocol_headers;
session_cleanup_info =
session_id.as_ref().map(|sid| SessionCleanupInfo {
client: self.client.clone(),
uri: config.uri.clone(),
session_id: sid.clone(),
auth_header: config.auth_header.clone(),
protocol_headers: protocol_headers.clone(),
});
if let Some(new_sid) = &session_id {
let client = self.client.clone();
let uri = config.uri.clone();
let new_sid = new_sid.clone();
let auth_header = config.auth_header.clone();
let retry_config = self.config.retry_config.clone();
let sse_tx = sse_worker_tx.clone();
let task_ct = transport_task_ct.clone();
let config_uri = config.uri.clone();
let config_auth = config.auth_header.clone();
let spawn_headers = protocol_headers.clone();
streams.spawn(async move {
match client
.get_stream(
uri,
new_sid.clone(),
None,
auth_header.clone(),
spawn_headers.clone(),
)
.await
{
Ok(stream) => {
let sse_stream = SseAutoReconnectStream::new(
stream,
StreamableHttpClientReconnect {
client: client.clone(),
session_id: new_sid,
uri: config_uri,
auth_header: config_auth,
custom_headers: spawn_headers,
},
retry_config,
);
Self::execute_sse_stream(
sse_stream,
sse_tx,
false,
task_ct.child_token(),
)
.await
}
Err(StreamableHttpError::ServerDoesNotSupportSse) => {
tracing::debug!(
"server doesn't support sse after re-init"
);
Ok(())
}
Err(e) => {
tracing::error!(
"fail to get common stream after re-init: {e}"
);
Err(e)
}
}
});
}
let retry_response = self
.client
.post_message(
config.uri.clone(),
message,
session_id.clone(),
config.auth_header.clone(),
protocol_headers.clone(),
)
.await;
match retry_response {
Err(e) => Err(e),
Ok(StreamableHttpPostResponse::Accepted) => {
tracing::trace!(
"client message accepted after re-init"
);
Ok(())
}
Ok(StreamableHttpPostResponse::Json(msg, ..)) => {
context.send_to_handler(msg).await?;
Ok(())
}
Ok(StreamableHttpPostResponse::Sse(stream, ..)) => {
if let Some(sid) = &session_id {
let sse_stream = SseAutoReconnectStream::new(
stream,
StreamableHttpClientReconnect {
client: self.client.clone(),
session_id: sid.clone(),
uri: config.uri.clone(),
auth_header: config.auth_header.clone(),
custom_headers: protocol_headers.clone(),
},
self.config.retry_config.clone(),
);
streams.spawn(Self::execute_sse_stream(
sse_stream,
sse_worker_tx.clone(),
true,
transport_task_ct.child_token(),
));
} else {
let sse_stream =
SseAutoReconnectStream::never_reconnect(
stream,
StreamableHttpError::<C::Error>::UnexpectedEndOfStream,
);
streams.spawn(Self::execute_sse_stream(
sse_stream,
sse_worker_tx.clone(),
true,
transport_task_ct.child_token(),
));
}
tracing::trace!("got new sse stream after re-init");
Ok(())
}
}
}
Err(reinit_err) => Err(reinit_err),
}
}
Err(e) => Err(e),

@@ -542,0 +765,0 @@ Ok(StreamableHttpPostResponse::Accepted) => {

@@ -99,2 +99,43 @@ // cargo test --features "client" --package rmcp -- server_init

// Server responds with EmptyResult to ping received before initialize request.
#[tokio::test]
async fn server_init_ping_response_is_empty_result_before_initialize() {
let (server_transport, client_transport) = tokio::io::duplex(4096);
let _server = tokio::spawn(async move { TestServer::new().serve(server_transport).await });
let mut client = IntoTransport::<rmcp::RoleClient, _, _>::into_transport(client_transport);
client.send(ping_request(1)).await.unwrap();
let response = client.receive().await.unwrap();
assert!(
matches!(
response,
ServerJsonRpcMessage::Response(ref r)
if matches!(r.result, ServerResult::EmptyResult(_))
),
"expected EmptyResult for pre-initialize ping, got: {response:?}"
);
}
// Server initializes successfully when ping is sent before the initialize request.
#[tokio::test]
async fn server_init_succeeds_after_ping_before_initialize() {
let (server_transport, client_transport) = tokio::io::duplex(4096);
let server_handle =
tokio::spawn(async move { TestServer::new().serve(server_transport).await });
let mut client = IntoTransport::<rmcp::RoleClient, _, _>::into_transport(client_transport);
client.send(ping_request(1)).await.unwrap();
let _pong = client.receive().await.unwrap();
do_initialize(&mut client).await;
client.send(initialized_notification()).await.unwrap();
let result = server_handle.await.unwrap();
assert!(
result.is_ok(),
"server should initialize successfully after pre-initialize ping"
);
result.unwrap().cancel().await.unwrap();
}
// Server responds with EmptyResult to ping received before initialized.

@@ -101,0 +142,0 @@ #[tokio::test]

@@ -10,5 +10,9 @@ #![cfg(all(

use rmcp::{
ServiceExt,
model::{ClientJsonRpcMessage, ClientRequest, PingRequest, RequestId},
transport::{
streamable_http_client::{StreamableHttpClient, StreamableHttpError},
StreamableHttpClientTransport,
streamable_http_client::{
StreamableHttpClient, StreamableHttpClientTransportConfig, StreamableHttpError,
},
streamable_http_server::{

@@ -80,14 +84,6 @@ StreamableHttpServerConfig, StreamableHttpService, session::local::LocalSessionManager,

match result {
Err(StreamableHttpError::UnexpectedServerResponse(message)) => {
let message = message.to_string();
assert!(
message.contains("404"),
"error should include HTTP status code, got: {message}"
);
assert!(
message.to_ascii_lowercase().contains("session not found"),
"error should include session-not-found hint, got: {message}"
);
Err(StreamableHttpError::SessionExpired) => {
// Expected: post_message detects 404 with a session ID and returns SessionExpired
}
other => panic!("expected UnexpectedServerResponse, got: {other:?}"),
other => panic!("expected SessionExpired, got: {other:?}"),
}

@@ -100,1 +96,80 @@

}
/// Verify that when the server loses a session (returns HTTP 404), the client
/// transparently re-initializes and the original request succeeds.
#[tokio::test]
async fn test_transparent_reinitialization_on_session_expiry() -> anyhow::Result<()> {
let ct = CancellationToken::new();
let session_manager = Arc::new(LocalSessionManager::default());
let service = StreamableHttpService::new(
|| Ok(Calculator::new()),
session_manager.clone(),
StreamableHttpServerConfig {
stateful_mode: true,
sse_keep_alive: None,
cancellation_token: ct.child_token(),
..Default::default()
},
);
let router = axum::Router::new().nest_service("/mcp", service);
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await?;
let addr = listener.local_addr()?;
let server_handle = tokio::spawn({
let ct = ct.clone();
async move {
let _ = axum::serve(listener, router)
.with_graceful_shutdown(async move { ct.cancelled_owned().await })
.await;
}
});
// Connect a full client transport (this performs initialize + notifications/initialized)
let transport = StreamableHttpClientTransport::from_config(
StreamableHttpClientTransportConfig::with_uri(format!("http://{addr}/mcp")),
);
let client = ().serve(transport).await?;
// Verify the session is established: list_all_resources() succeeds
let _resources = client.list_all_resources().await?;
// Capture the current session ID from the server
let original_session_id = {
let sessions = session_manager.sessions.read().await;
sessions
.keys()
.next()
.cloned()
.expect("session should exist")
};
// Force session expiry by removing all sessions from the server-side manager
{
let mut sessions = session_manager.sessions.write().await;
sessions.clear();
}
// This call should trigger transparent re-initialization and still succeed
let _resources_after = client.list_all_resources().await?;
// Verify the server created a new session with a different ID
{
let sessions = session_manager.sessions.read().await;
let new_session_id = sessions
.keys()
.next()
.expect("new session should exist after re-initialization");
assert_ne!(
new_session_id, &original_session_id,
"new session ID should differ from the original"
);
}
let _ = client.cancel().await;
ct.cancel();
server_handle.await?;
Ok(())
}

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is too big to display