🚀. Socket Launch Week Day 3:Socket Firewall Now Blocks Malicious VS Code and Open VSX Extensions.Learn more
Sign In

rmcp

Package Overview
Dependencies
Maintainers
1
Versions
46
Alerts
File Explorer

Advanced tools

Socket logo

Install Socket

Detect and block malicious and high-risk dependencies

Install

rmcp - cargo Package Compare versions

Comparing version
1.6.0
to
1.7.0
+252
tests/test_streamable_http_idle_timeout_log.rs
#![cfg(all(
feature = "transport-streamable-http-server",
feature = "transport-streamable-http-client-reqwest",
not(feature = "local")
))]
use std::{
sync::{Arc, Mutex},
time::Duration,
};
use rmcp::transport::streamable_http_server::{
StreamableHttpServerConfig, StreamableHttpService,
session::{SessionManager, local::LocalSessionManager},
};
use tokio_util::sync::CancellationToken;
use tracing_subscriber::layer::SubscriberExt;
mod common;
use common::calculator::Calculator;
struct CapturedEvent {
level: tracing::Level,
target: String,
message: String,
}
struct CapturingLayer {
events: Arc<Mutex<Vec<CapturedEvent>>>,
}
impl<S: tracing::Subscriber> tracing_subscriber::Layer<S> for CapturingLayer {
fn on_event(
&self,
event: &tracing::Event<'_>,
_ctx: tracing_subscriber::layer::Context<'_, S>,
) {
let mut visitor = MessageVisitor(String::new());
event.record(&mut visitor);
self.events.lock().unwrap().push(CapturedEvent {
level: *event.metadata().level(),
target: event.metadata().target().to_string(),
message: visitor.0,
});
}
}
struct MessageVisitor(String);
impl tracing::field::Visit for MessageVisitor {
fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
if field.name() == "message" {
self.0 = format!("{:?}", value);
}
}
}
#[tokio::test(flavor = "current_thread")]
async fn test_keep_alive_timeout_does_not_emit_error_log() {
let events = Arc::new(Mutex::new(Vec::<CapturedEvent>::new()));
let subscriber = tracing_subscriber::registry().with(CapturingLayer {
events: events.clone(),
});
let _guard = tracing::subscriber::set_default(subscriber);
let ct = CancellationToken::new();
let mut session_manager = LocalSessionManager::default();
session_manager.session_config.keep_alive = Some(Duration::from_millis(200));
let session_manager = Arc::new(session_manager);
let service = StreamableHttpService::new(
|| Ok(Calculator::new()),
session_manager.clone(),
StreamableHttpServerConfig::default()
.with_sse_keep_alive(None)
.with_cancellation_token(ct.child_token()),
);
let router = axum::Router::new().nest_service("/mcp", service);
let tcp_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = tcp_listener.local_addr().unwrap();
tokio::spawn({
let ct = ct.clone();
async move {
let _ = axum::serve(tcp_listener, router)
.with_graceful_shutdown(async move { ct.cancelled_owned().await })
.await;
}
});
let client = reqwest::Client::new();
let response = client
.post(format!("http://{addr}/mcp"))
.header("Content-Type", "application/json")
.header("Accept", "application/json, text/event-stream")
.body(r#"{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion":"2025-06-18","capabilities":{},"clientInfo":{"name":"test","version":"1.0"}}}"#)
.send()
.await
.unwrap();
assert_eq!(response.status(), 200);
let session_id = response.headers()["mcp-session-id"]
.to_str()
.unwrap()
.to_string();
client
.post(format!("http://{addr}/mcp"))
.header("Content-Type", "application/json")
.header("Accept", "application/json, text/event-stream")
.header("mcp-session-id", &session_id)
.header("Mcp-Protocol-Version", "2025-06-18")
.body(r#"{"jsonrpc":"2.0","method":"notifications/initialized"}"#)
.send()
.await
.unwrap();
tokio::time::sleep(Duration::from_millis(400)).await;
// Wait until close_session() has completed so all logs are captured.
let session_id_parsed: Arc<str> = Arc::from(session_id.as_str());
for _ in 0..20 {
if !session_manager
.has_session(&session_id_parsed)
.await
.unwrap()
{
break;
}
tokio::time::sleep(Duration::from_millis(50)).await;
}
assert!(
!session_manager
.has_session(&session_id_parsed)
.await
.unwrap(),
"session should have been removed after idle reap"
);
let captured = events.lock().unwrap();
let error_events: Vec<_> = captured
.iter()
.filter(|e| e.level == tracing::Level::ERROR && e.target.starts_with("rmcp"))
.collect();
assert!(
error_events.is_empty(),
"idle reap should not produce any ERROR logs, found {}: {:?}",
error_events.len(),
error_events.iter().map(|e| &e.message).collect::<Vec<_>>()
);
let debug_events: Vec<_> = captured
.iter()
.filter(|e| {
e.level == tracing::Level::DEBUG
&& e.target.starts_with("rmcp")
&& e.message.contains("IdleTimeout")
})
.collect();
assert!(
!debug_events.is_empty(),
"expected a DEBUG log with IdleTimeout, but found none"
);
ct.cancel();
}
#[tokio::test(flavor = "current_thread")]
async fn test_explicit_close_on_live_session_succeeds() {
let ct = CancellationToken::new();
let mut session_manager = LocalSessionManager::default();
session_manager.session_config.keep_alive = Some(Duration::from_secs(60));
let session_manager = Arc::new(session_manager);
let service = StreamableHttpService::new(
|| Ok(Calculator::new()),
session_manager.clone(),
StreamableHttpServerConfig::default()
.with_sse_keep_alive(None)
.with_cancellation_token(ct.child_token()),
);
let router = axum::Router::new().nest_service("/mcp", service);
let tcp_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = tcp_listener.local_addr().unwrap();
tokio::spawn({
let ct = ct.clone();
async move {
let _ = axum::serve(tcp_listener, router)
.with_graceful_shutdown(async move { ct.cancelled_owned().await })
.await;
}
});
let client = reqwest::Client::new();
let response = client
.post(format!("http://{addr}/mcp"))
.header("Content-Type", "application/json")
.header("Accept", "application/json, text/event-stream")
.body(r#"{"jsonrpc":"2.0","id":1,"method":"initialize","params":{"protocolVersion":"2025-06-18","capabilities":{},"clientInfo":{"name":"test","version":"1.0"}}}"#)
.send()
.await
.unwrap();
assert_eq!(response.status(), 200);
let session_id = response.headers()["mcp-session-id"]
.to_str()
.unwrap()
.to_string();
client
.post(format!("http://{addr}/mcp"))
.header("Content-Type", "application/json")
.header("Accept", "application/json, text/event-stream")
.header("mcp-session-id", &session_id)
.header("Mcp-Protocol-Version", "2025-06-18")
.body(r#"{"jsonrpc":"2.0","method":"notifications/initialized"}"#)
.send()
.await
.unwrap();
let session_id_parsed: Arc<str> = Arc::from(session_id.as_str());
assert!(
session_manager
.has_session(&session_id_parsed)
.await
.unwrap(),
"session should exist before explicit close"
);
let result = session_manager.close_session(&session_id_parsed).await;
assert!(
result.is_ok(),
"close_session on a live worker should succeed: {result:?}"
);
assert!(
!session_manager
.has_session(&session_id_parsed)
.await
.unwrap(),
"session should not exist after explicit close"
);
ct.cancel();
}
+1
-1
{
"git": {
"sha1": "014fb2e6cd9faddbe86ae30b5cc9adf84a62edb9"
"sha1": "3529c3675ff64db805bd947ca6ece6090809e43d"
},
"path_in_vcs": "crates/rmcp"
}

@@ -15,3 +15,3 @@ # THIS FILE IS AUTOMATICALLY GENERATED BY CARGO

name = "rmcp"
version = "1.6.0"
version = "1.7.0"
build = "build.rs"

@@ -354,2 +354,6 @@ autolib = false

[[test]]
name = "test_streamable_http_idle_timeout_log"
path = "tests/test_streamable_http_idle_timeout_log.rs"
[[test]]
name = "test_streamable_http_init_timeout"

@@ -553,3 +557,3 @@ path = "tests/test_streamable_http_init_timeout.rs"

[dependencies.rmcp-macros]
version = "1.6.0"
version = "1.7.0"
optional = true

@@ -674,3 +678,7 @@

version = "0.4.38"
features = ["serde"]
features = [
"serde",
"now",
]
default-features = false

@@ -677,0 +685,0 @@ [lints.clippy]

@@ -10,2 +10,18 @@ # Changelog

## [1.7.0](https://github.com/modelcontextprotocol/rust-sdk/compare/rmcp-v1.6.0...rmcp-v1.7.0) - 2026-05-13
### Added
- add task-based stdio examples ([#839](https://github.com/modelcontextprotocol/rust-sdk/pull/839))
### Fixed
- *(rmcp)* flatten Resource variant of PromptMessageContent ([#843](https://github.com/modelcontextprotocol/rust-sdk/pull/843))
- reply -32700 on stdio parse errors instead of closing ([#833](https://github.com/modelcontextprotocol/rust-sdk/pull/833))
### Other
- *(rmcp)* remove dependency on chrono default features ([#829](https://github.com/modelcontextprotocol/rust-sdk/pull/829))
- Fix/issue 817 idle timeout log level ([#824](https://github.com/modelcontextprotocol/rust-sdk/pull/824))
## [1.6.0](https://github.com/modelcontextprotocol/rust-sdk/compare/rmcp-v1.5.0...rmcp-v1.6.0) - 2026-05-01

@@ -12,0 +28,0 @@

@@ -161,3 +161,6 @@ use serde::{Deserialize, Serialize};

/// Embedded server-side resource
Resource { resource: EmbeddedResource },
Resource {
#[serde(flatten)]
resource: EmbeddedResource,
},
/// A link to a resource that can be fetched separately

@@ -326,2 +329,53 @@ ResourceLink {

#[test]
fn test_prompt_message_resource_serialization_is_flat() {
// Regression test: PromptMessageContent::Resource must serialize to
// the spec-compliant flat shape `{ "type": "resource", "resource": { "uri", "mimeType", "text" } }`
// and NOT the double-nested shape `{ "type": "resource", "resource": { "resource": {...} } }`.
// See: https://modelcontextprotocol.io/specification/2025-06-18/server/prompts
let message = PromptMessage::new_resource(
PromptMessageRole::User,
"alc://packages/sc/narrative".to_string(),
Some("text/markdown".to_string()),
Some("# Hello".to_string()),
None,
None,
None,
);
let value: serde_json::Value = serde_json::to_value(&message).unwrap();
// Drill into content
let content = value.get("content").expect("content present");
assert_eq!(
content.get("type").and_then(|v| v.as_str()),
Some("resource")
);
let resource = content
.get("resource")
.expect("resource field present at content level");
// Spec-compliant: resource.uri / resource.mimeType / resource.text MUST be flat
assert_eq!(
resource.get("uri").and_then(|v| v.as_str()),
Some("alc://packages/sc/narrative"),
"expected flat resource.uri, got: {resource:#?}"
);
assert_eq!(
resource.get("mimeType").and_then(|v| v.as_str()),
Some("text/markdown")
);
assert_eq!(
resource.get("text").and_then(|v| v.as_str()),
Some("# Hello")
);
// Regression guard: content.resource MUST NOT contain a nested `resource` key.
assert!(
resource.get("resource").is_none(),
"double-nested resource detected (regression): {resource:#?}"
);
}
#[test]
fn test_prompt_message_content_resource_link_deserialization() {

@@ -328,0 +382,0 @@ let json = r#"{

@@ -884,3 +884,3 @@ use futures::FutureExt;

JsonRpcMessage::Response(response) => Some(&response.id),
JsonRpcMessage::Error(error) => Some(&error.id),
JsonRpcMessage::Error(error) => error.id.as_ref(),
_ => None,

@@ -975,3 +975,3 @@ } {

tracing::warn!(%id, ?error, "response error");
JsonRpcMessage::error(error, id)
JsonRpcMessage::error(error, Some(id))
}

@@ -1033,2 +1033,8 @@ };

Event::PeerMessage(JsonRpcMessage::Error(JsonRpcError { error, id, .. })) => {
let Some(id) = id else {
// MCP error responses without an id (e.g. Parse error / Invalid Request)
// can't be routed back to a pending request — log and drop.
tracing::debug!(?error, "received id-less peer error");
continue;
};
if let Some(responder) = local_responder_pool.remove(&id) {

@@ -1035,0 +1041,0 @@ let _response_result = responder.send(Err(ServiceError::McpError(error)));

@@ -222,3 +222,3 @@ use std::borrow::Cow;

transport
.send(ServerJsonRpcMessage::error(e.clone(), id))
.send(ServerJsonRpcMessage::error(e.clone(), Some(id)))
.await

@@ -225,0 +225,0 @@ .map_err(|error| {

use std::{marker::PhantomData, sync::Arc};
// use crate::schema::*;
use futures::{SinkExt, StreamExt};
use futures::SinkExt;
use serde::{Serialize, de::DeserializeOwned};
use thiserror::Error;
use tokio::{
io::{AsyncRead, AsyncWrite},
io::{AsyncBufReadExt, AsyncRead, AsyncWrite, BufReader},
sync::Mutex,

@@ -13,7 +12,10 @@ };

bytes::{Buf, BufMut, BytesMut},
codec::{Decoder, Encoder, FramedRead, FramedWrite},
codec::{Decoder, Encoder, FramedWrite},
};
use super::{IntoTransport, Transport};
use crate::service::{RxJsonRpcMessage, ServiceRole, TxJsonRpcMessage};
use crate::{
model::ErrorData,
service::{RxJsonRpcMessage, ServiceRole, TxJsonRpcMessage},
};

@@ -51,4 +53,6 @@ #[non_exhaustive]

pub struct AsyncRwTransport<Role: ServiceRole, R: AsyncRead, W: AsyncWrite> {
read: FramedRead<R, JsonRpcMessageCodec<RxJsonRpcMessage<Role>>>,
read: BufReader<R>,
line_buf: Vec<u8>,
write: Arc<Mutex<Option<TransportWriter<Role, W>>>>,
_role: PhantomData<fn() -> Role>,
}

@@ -62,6 +66,3 @@

pub fn new(read: R, write: W) -> Self {
let read = FramedRead::new(
read,
JsonRpcMessageCodec::<RxJsonRpcMessage<Role>>::default(),
);
let read = BufReader::new(read);
let write = Arc::new(Mutex::new(Some(FramedWrite::new(

@@ -71,3 +72,8 @@ write,

))));
Self { read, write }
Self {
read,
line_buf: Vec::new(),
write,
_role: PhantomData,
}
}

@@ -123,11 +129,39 @@ }

fn receive(&mut self) -> impl Future<Output = Option<RxJsonRpcMessage<Role>>> {
let next = self.read.next();
async {
next.await.and_then(|e| {
e.inspect_err(|e| {
async fn receive(&mut self) -> Option<RxJsonRpcMessage<Role>> {
loop {
self.line_buf.clear();
match self.read.read_until(b'\n', &mut self.line_buf).await {
Ok(0) => return None,
Ok(_) => {}
Err(e) => {
tracing::error!("Error reading from stream: {}", e);
})
.ok()
})
return None;
}
}
let line = without_carriage_return(
self.line_buf.strip_suffix(b"\n").unwrap_or(&self.line_buf),
);
if line.is_empty() {
continue;
}
match try_parse_with_compatibility::<RxJsonRpcMessage<Role>>(line, "receive") {
Ok(Some(msg)) => return Some(msg),
Ok(None) => continue,
Err(JsonRpcMessageCodecError::Serde(e)) => {
tracing::debug!("Parse error on incoming message: {e}");
let mut write = self.write.lock().await;
let framed = write.as_mut()?;
let response = TxJsonRpcMessage::<Role>::error(
ErrorData::parse_error("Parse error", None),
None,
);
if framed.send(response).await.is_err() {
return None;
}
}
Err(e) => {
tracing::error!("Error reading from stream: {}", e);
return None;
}
}
}

@@ -180,9 +214,8 @@ }

fn without_carriage_return(s: &[u8]) -> &[u8] {
if let Some(&b'\r') = s.last() {
&s[..s.len() - 1]
} else {
s
}
s.strip_suffix(b"\r").unwrap_or(s)
}
/// UTF-8 byte order mark. RFC 8259 §8.1 allows JSON parsers to ignore a leading BOM.
const UTF8_BOM: &[u8; 3] = b"\xEF\xBB\xBF";
/// Check if a method is a standard MCP method (request, response, or notification).

@@ -256,2 +289,3 @@ /// This includes both requests and notifications defined in the MCP specification.

) -> Result<Option<T>, JsonRpcMessageCodecError> {
let line = line.strip_prefix(UTF8_BOM.as_slice()).unwrap_or(line);
if let Ok(line_str) = std::str::from_utf8(line) {

@@ -416,3 +450,4 @@ match serde_json::from_slice(line) {

mod test {
use futures::{Sink, Stream};
use futures::{Sink, Stream, StreamExt};
use tokio_util::codec::FramedRead;

@@ -566,2 +601,74 @@ use super::*;

}
#[tokio::test]
async fn test_decode_strips_utf8_bom() {
use futures::StreamExt;
use tokio::io::BufReader;
// Valid JSON-RPC message preceded by a UTF-8 BOM (EF BB BF). Some Windows
// tooling and editors prepend this; the codec should ignore it per RFC 8259 §8.1.
let mut data = Vec::new();
data.extend_from_slice(UTF8_BOM);
data.extend_from_slice(br#"{"jsonrpc":"2.0","method":"ping","id":1}"#);
data.push(b'\n');
let mut cursor = BufReader::new(&data[..]);
let mut stream = from_async_read::<serde_json::Value, _>(&mut cursor);
let item = stream
.next()
.await
.expect("should decode BOM-prefixed line");
assert_eq!(
item,
serde_json::json!({"jsonrpc": "2.0", "method": "ping", "id": 1})
);
}
#[cfg(feature = "server")]
#[tokio::test]
async fn receive_recovers_from_parse_error() {
use tokio::io::AsyncWriteExt;
use crate::{RoleServer, transport::Transport};
// Two paired streams: `server_io` is wrapped by the transport; the test
// drives `client_io` to act as the peer.
let (server_io, client_io) = tokio::io::duplex(4096);
let (server_r, server_w) = tokio::io::split(server_io);
let (mut client_r, mut client_w) = tokio::io::split(client_io);
let mut transport = AsyncRwTransport::<RoleServer, _, _>::new(server_r, server_w);
client_w
.write_all(
b"not json\n{\"jsonrpc\":\"2.0\",\"method\":\"notifications/initialized\"}\n",
)
.await
.unwrap();
let received = transport
.receive()
.await
.expect("transport should recover and yield the next valid message");
// Read one line back from the peer side and parse as JSON.
let mut reply_buf = Vec::new();
let mut peer = tokio::io::BufReader::new(&mut client_r);
peer.read_until(b'\n', &mut reply_buf).await.unwrap();
let reply: serde_json::Value = serde_json::from_slice(&reply_buf).unwrap();
// Per MCP 2025-11-25: id is omitted when the server can't read the request id.
assert_eq!(
reply,
serde_json::json!({
"jsonrpc": "2.0",
"error": {"code": -32700, "message": "Parse error"},
})
);
assert_eq!(
serde_json::to_value(&received).unwrap()["method"],
"notifications/initialized",
);
}
}

@@ -69,5 +69,12 @@ use std::{

async fn close_session(&self, id: &SessionId) -> Result<(), Self::Error> {
let mut sessions = self.sessions.write().await;
if let Some(handle) = sessions.remove(id) {
handle.close().await?;
let handle = {
let mut sessions = self.sessions.write().await;
sessions.remove(id)
};
if let Some(handle) = handle {
match handle.close().await {
// Worker already exited — nothing left to clean up.
Ok(()) | Err(SessionError::SessionServiceTerminated) => {}
Err(e) => return Err(e.into()),
}
}

@@ -520,10 +527,8 @@ Ok(())

ServerJsonRpcMessage::Error(json_rpc_error) => {
if let Some(id) = self
.resource_router
.get(&ResourceKey::McpRequestId(json_rpc_error.id.clone()))
{
OutboundChannel::RequestWise {
id: *id,
close: true,
}
if let Some(id) = json_rpc_error.id.clone().and_then(|rid| {
self.resource_router
.get(&ResourceKey::McpRequestId(rid))
.copied()
}) {
OutboundChannel::RequestWise { id, close: true }
} else {

@@ -933,2 +938,3 @@ OutboundChannel::Common

FailToHandleMessage(SessionError),
#[deprecated(note = "idle timeout now surfaces as WorkerQuitReason::IdleTimeout")]
#[error("keep alive timeout after {}ms", _0.as_millis())]

@@ -1027,3 +1033,3 @@ KeepAliveTimeout(Duration),

_ = keep_alive_timeout => {
return Err(WorkerQuitReason::fatal(LocalSessionWorkerError::KeepAliveTimeout(keep_alive), "poll next session event"))
return Err(WorkerQuitReason::IdleTimeout(keep_alive))
}

@@ -1040,4 +1046,3 @@ };

crate::model::JsonRpcMessage::Error(json_rpc_error) => {
let request_id = json_rpc_error.id.clone();
Some(ResourceKey::McpRequestId(request_id))
json_rpc_error.id.clone().map(ResourceKey::McpRequestId)
}

@@ -1044,0 +1049,0 @@ _ => {

@@ -1,2 +0,2 @@

use std::borrow::Cow;
use std::{borrow::Cow, time::Duration};

@@ -25,2 +25,4 @@ use tokio_util::sync::CancellationToken;

HandlerTerminated,
#[error("Worker idle timeout after {}ms", _0.as_millis())]
IdleTimeout(Duration),
}

@@ -126,3 +128,4 @@

| WorkerQuitReason::TransportClosed
| WorkerQuitReason::HandlerTerminated => {
| WorkerQuitReason::HandlerTerminated
| WorkerQuitReason::IdleTimeout(_) => {
tracing::debug!("worker quit with reason: {:?}", e);

@@ -129,0 +132,0 @@ }

@@ -33,3 +33,3 @@ // cargo test --features "server client" --package rmcp test_client_initialization

jsonrpc: JsonRpcVersion2_0,
id: RequestId::Number(1),
id: Some(RequestId::Number(1)),
error: ErrorData {

@@ -36,0 +36,0 @@ code: ErrorCode(-32600),

@@ -865,3 +865,10 @@ {

"id": {
"$ref": "#/definitions/NumberOrString"
"anyOf": [
{
"$ref": "#/definitions/NumberOrString"
},
{
"type": "null"
}
]
},

@@ -874,3 +881,2 @@ "jsonrpc": {

"jsonrpc",
"id",
"error"

@@ -877,0 +883,0 @@ ]

@@ -865,3 +865,10 @@ {

"id": {
"$ref": "#/definitions/NumberOrString"
"anyOf": [
{
"$ref": "#/definitions/NumberOrString"
},
{
"type": "null"
}
]
},

@@ -874,3 +881,2 @@ "jsonrpc": {

"jsonrpc",
"id",
"error"

@@ -877,0 +883,0 @@ ]

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is not supported yet

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is too big to display

Sorry, the diff of this file is too big to display