Skip to content

Commit 9ca20c6

Browse files
authored
feat: mark boxed http body as sync (#291)
1 parent 1b3db89 commit 9ca20c6

File tree

3 files changed

+30
-42
lines changed

3 files changed

+30
-42
lines changed

crates/rmcp/src/transport/common/server_side_http.rs

Lines changed: 15 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::{convert::Infallible, fmt::Display, sync::Arc, time::Duration};
44
use bytes::{Buf, Bytes};
55
use http::Response;
66
use http_body::Body;
7-
use http_body_util::{BodyExt, Empty, Full, combinators::UnsyncBoxBody};
7+
use http_body_util::{BodyExt, Empty, Full, combinators::BoxBody};
88
use sse_stream::{KeepAlive, Sse, SseBody};
99

1010
use super::http_header::EVENT_STREAM_MIME_TYPE;
@@ -18,12 +18,12 @@ pub fn session_id() -> SessionId {
1818

1919
pub const DEFAULT_AUTO_PING_INTERVAL: Duration = Duration::from_secs(15);
2020

21-
pub(crate) type BoxResponse = Response<UnsyncBoxBody<Bytes, Infallible>>;
21+
pub(crate) type BoxResponse = Response<BoxBody<Bytes, Infallible>>;
2222

23-
pub(crate) fn accepted_response() -> Response<UnsyncBoxBody<Bytes, Infallible>> {
23+
pub(crate) fn accepted_response() -> Response<BoxBody<Bytes, Infallible>> {
2424
Response::builder()
2525
.status(http::StatusCode::ACCEPTED)
26-
.body(Empty::new().boxed_unsync())
26+
.body(Empty::new().boxed())
2727
.expect("valid response")
2828
}
2929
pin_project_lite::pin_project! {
@@ -63,9 +63,9 @@ pub struct ServerSseMessage {
6363
}
6464

6565
pub(crate) fn sse_stream_response(
66-
stream: impl futures::Stream<Item = ServerSseMessage> + Send + 'static,
66+
stream: impl futures::Stream<Item = ServerSseMessage> + Send + Sync + 'static,
6767
keep_alive: Option<Duration>,
68-
) -> Response<UnsyncBoxBody<Bytes, Infallible>> {
68+
) -> Response<BoxBody<Bytes, Infallible>> {
6969
use futures::StreamExt;
7070
let stream = SseBody::new(stream.map(|message| {
7171
let data = serde_json::to_string(&message.message).expect("valid message");
@@ -76,8 +76,8 @@ pub(crate) fn sse_stream_response(
7676
let stream = match keep_alive {
7777
Some(duration) => stream
7878
.with_keep_alive::<TokioTimer>(KeepAlive::new().interval(duration))
79-
.boxed_unsync(),
80-
None => stream.boxed_unsync(),
79+
.boxed(),
80+
None => stream.boxed(),
8181
};
8282
Response::builder()
8383
.status(http::StatusCode::OK)
@@ -89,7 +89,7 @@ pub(crate) fn sse_stream_response(
8989

9090
pub(crate) const fn internal_error_response<E: Display>(
9191
context: &str,
92-
) -> impl FnOnce(E) -> Response<UnsyncBoxBody<Bytes, Infallible>> {
92+
) -> impl FnOnce(E) -> Response<BoxBody<Bytes, Infallible>> {
9393
move |error| {
9494
tracing::error!("Internal server error when {context}: {error}");
9595
Response::builder()
@@ -98,24 +98,22 @@ pub(crate) const fn internal_error_response<E: Display>(
9898
Full::new(Bytes::from(format!(
9999
"Encounter an error when {context}: {error}"
100100
)))
101-
.boxed_unsync(),
101+
.boxed(),
102102
)
103103
.expect("valid response")
104104
}
105105
}
106106

107-
pub(crate) fn unexpected_message_response(
108-
expect: &str,
109-
) -> Response<UnsyncBoxBody<Bytes, Infallible>> {
107+
pub(crate) fn unexpected_message_response(expect: &str) -> Response<BoxBody<Bytes, Infallible>> {
110108
Response::builder()
111109
.status(http::StatusCode::UNPROCESSABLE_ENTITY)
112-
.body(Full::new(Bytes::from(format!("Unexpected message, expect {expect}"))).boxed_unsync())
110+
.body(Full::new(Bytes::from(format!("Unexpected message, expect {expect}"))).boxed())
113111
.expect("valid response")
114112
}
115113

116114
pub(crate) async fn expect_json<B>(
117115
body: B,
118-
) -> Result<ClientJsonRpcMessage, Response<UnsyncBoxBody<Bytes, Infallible>>>
116+
) -> Result<ClientJsonRpcMessage, Response<BoxBody<Bytes, Infallible>>>
119117
where
120118
B: Body + Send + 'static,
121119
B::Error: Display,
@@ -129,7 +127,7 @@ where
129127
.status(http::StatusCode::UNSUPPORTED_MEDIA_TYPE)
130128
.body(
131129
Full::new(Bytes::from(format!("fail to deserialize request body {e}")))
132-
.boxed_unsync(),
130+
.boxed(),
133131
)
134132
.expect("valid response");
135133
Err(response)
@@ -139,10 +137,7 @@ where
139137
Err(e) => {
140138
let response = Response::builder()
141139
.status(http::StatusCode::INTERNAL_SERVER_ERROR)
142-
.body(
143-
Full::new(Bytes::from(format!("Failed to read request body: {e}")))
144-
.boxed_unsync(),
145-
)
140+
.body(Full::new(Bytes::from(format!("Failed to read request body: {e}"))).boxed())
146141
.expect("valid response");
147142
Err(response)
148143
}

crates/rmcp/src/transport/streamable_http_server/session.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ pub trait SessionManager: Send + Sync + 'static {
3131
id: &SessionId,
3232
message: ClientJsonRpcMessage,
3333
) -> impl Future<
34-
Output = Result<impl Stream<Item = ServerSseMessage> + Send + 'static, Self::Error>,
34+
Output = Result<impl Stream<Item = ServerSseMessage> + Send + Sync + 'static, Self::Error>,
3535
> + Send;
3636
fn accept_message(
3737
&self,
@@ -42,13 +42,13 @@ pub trait SessionManager: Send + Sync + 'static {
4242
&self,
4343
id: &SessionId,
4444
) -> impl Future<
45-
Output = Result<impl Stream<Item = ServerSseMessage> + Send + 'static, Self::Error>,
45+
Output = Result<impl Stream<Item = ServerSseMessage> + Send + Sync + 'static, Self::Error>,
4646
> + Send;
4747
fn resume(
4848
&self,
4949
id: &SessionId,
5050
last_event_id: String,
5151
) -> impl Future<
52-
Output = Result<impl Stream<Item = ServerSseMessage> + Send + 'static, Self::Error>,
52+
Output = Result<impl Stream<Item = ServerSseMessage> + Send + Sync + 'static, Self::Error>,
5353
> + Send;
5454
}

crates/rmcp/src/transport/streamable_http_server/tower.rs

Lines changed: 12 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use bytes::Bytes;
44
use futures::{StreamExt, future::BoxFuture};
55
use http::{Method, Request, Response, header::ALLOW};
66
use http_body::Body;
7-
use http_body_util::{BodyExt, Full, combinators::UnsyncBoxBody};
7+
use http_body_util::{BodyExt, Full, combinators::BoxBody};
88
use tokio_stream::wrappers::ReceiverStream;
99

1010
use super::session::SessionManager;
@@ -105,7 +105,7 @@ where
105105
fn get_service(&self) -> Result<S, std::io::Error> {
106106
(self.service_factory)()
107107
}
108-
pub async fn handle<B>(&self, request: Request<B>) -> Response<UnsyncBoxBody<Bytes, Infallible>>
108+
pub async fn handle<B>(&self, request: Request<B>) -> Response<BoxBody<Bytes, Infallible>>
109109
where
110110
B: Body + Send + 'static,
111111
B::Error: Display,
@@ -120,7 +120,7 @@ where
120120
let response = Response::builder()
121121
.status(http::StatusCode::METHOD_NOT_ALLOWED)
122122
.header(ALLOW, "GET, POST, DELETE")
123-
.body(Full::new(Bytes::from("Method Not Allowed")).boxed_unsync())
123+
.body(Full::new(Bytes::from("Method Not Allowed")).boxed())
124124
.expect("valid response");
125125
return response;
126126
}
@@ -148,7 +148,7 @@ where
148148
Full::new(Bytes::from(
149149
"Not Acceptable: Client must accept text/event-stream",
150150
))
151-
.boxed_unsync(),
151+
.boxed(),
152152
)
153153
.expect("valid response"));
154154
}
@@ -162,7 +162,7 @@ where
162162
// unauthorized
163163
return Ok(Response::builder()
164164
.status(http::StatusCode::UNAUTHORIZED)
165-
.body(Full::new(Bytes::from("Unauthorized: Session ID is required")).boxed_unsync())
165+
.body(Full::new(Bytes::from("Unauthorized: Session ID is required")).boxed())
166166
.expect("valid response"));
167167
};
168168
// check if session exists
@@ -175,7 +175,7 @@ where
175175
// unauthorized
176176
return Ok(Response::builder()
177177
.status(http::StatusCode::UNAUTHORIZED)
178-
.body(Full::new(Bytes::from("Unauthorized: Session not found")).boxed_unsync())
178+
.body(Full::new(Bytes::from("Unauthorized: Session not found")).boxed())
179179
.expect("valid response"));
180180
}
181181
// check if last event id is provided
@@ -219,7 +219,7 @@ where
219219
{
220220
return Ok(Response::builder()
221221
.status(http::StatusCode::NOT_ACCEPTABLE)
222-
.body(Full::new(Bytes::from("Not Acceptable: Client must accept both application/json and text/event-stream")).boxed_unsync())
222+
.body(Full::new(Bytes::from("Not Acceptable: Client must accept both application/json and text/event-stream")).boxed())
223223
.expect("valid response"));
224224
}
225225

@@ -236,7 +236,7 @@ where
236236
Full::new(Bytes::from(
237237
"Unsupported Media Type: Content-Type must be application/json",
238238
))
239-
.boxed_unsync(),
239+
.boxed(),
240240
)
241241
.expect("valid response"));
242242
}
@@ -265,10 +265,7 @@ where
265265
// unauthorized
266266
return Ok(Response::builder()
267267
.status(http::StatusCode::UNAUTHORIZED)
268-
.body(
269-
Full::new(Bytes::from("Unauthorized: Session not found"))
270-
.boxed_unsync(),
271-
)
268+
.body(Full::new(Bytes::from("Unauthorized: Session not found")).boxed())
272269
.expect("valid response"));
273270
}
274271

@@ -307,8 +304,7 @@ where
307304
_ => Ok(Response::builder()
308305
.status(http::StatusCode::NOT_IMPLEMENTED)
309306
.body(
310-
Full::new(Bytes::from("Batch requests are not supported yet"))
311-
.boxed_unsync(),
307+
Full::new(Bytes::from("Batch requests are not supported yet")).boxed(),
312308
)
313309
.expect("valid response")),
314310
}
@@ -415,10 +411,7 @@ where
415411
ClientJsonRpcMessage::Error(_json_rpc_error) => Ok(accepted_response()),
416412
_ => Ok(Response::builder()
417413
.status(http::StatusCode::NOT_IMPLEMENTED)
418-
.body(
419-
Full::new(Bytes::from("Batch requests are not supported yet"))
420-
.boxed_unsync(),
421-
)
414+
.body(Full::new(Bytes::from("Batch requests are not supported yet")).boxed())
422415
.expect("valid response")),
423416
}
424417
}
@@ -439,7 +432,7 @@ where
439432
// unauthorized
440433
return Ok(Response::builder()
441434
.status(http::StatusCode::UNAUTHORIZED)
442-
.body(Full::new(Bytes::from("Unauthorized: Session ID is required")).boxed_unsync())
435+
.body(Full::new(Bytes::from("Unauthorized: Session ID is required")).boxed())
443436
.expect("valid response"));
444437
};
445438
// close session

0 commit comments

Comments
 (0)