Skip to content

refactor(proxy/http): a concrete orig_proto error #3901

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions linkerd/proxy/http/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ where
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<()>> {
match self {
Self::H2(ref mut svc) => svc.poll_ready(cx).map_err(Into::into),
Self::OrigProtoUpgrade(ref mut svc) => svc.poll_ready(cx),
Self::OrigProtoUpgrade(ref mut svc) => svc.poll_ready(cx).map_err(Into::into),
Self::Http1(ref mut svc) => svc.poll_ready(cx),
}
}
Expand All @@ -156,7 +156,7 @@ where

match self {
Self::Http1(ref mut svc) => svc.call(req),
Self::OrigProtoUpgrade(ref mut svc) => svc.call(req),
Self::OrigProtoUpgrade(ref mut svc) => svc.call(req).map_err(Into::into).boxed(),
Self::H2(ref mut svc) => Box::pin(
svc.call(req)
.err_into::<Error>()
Expand Down
134 changes: 86 additions & 48 deletions linkerd/proxy/http/src/orig_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ use super::{h1, h2, Body};
use futures::prelude::*;
use http::header::{HeaderValue, TRANSFER_ENCODING};
use http_body::Frame;
use linkerd_error::{Error, Result};
use linkerd_error::Result;
use linkerd_http_box::BoxBody;
use linkerd_stack::{layer, MakeConnection, Service};
use std::{
Expand Down Expand Up @@ -43,6 +43,20 @@ pub struct Downgrade<S> {
#[derive(Clone, Debug)]
pub struct WasUpgrade(());

/// An error returned by the [`Upgrade`] client.
///
/// This can represent an error presented by either of the underlying HTTP/1 or HTTP/2 clients,
/// or a "downgraded" HTTP/2 error.
#[derive(Debug, Error)]
pub enum Error {
#[error("{0}")]
Downgraded(#[from] DowngradedH2Error),
#[error(transparent)]
H1(linkerd_error::Error),
#[error(transparent)]
H2(hyper::Error),
}

// === impl Upgrade ===

impl<C, T, B> Upgrade<C, T, B> {
Expand All @@ -59,22 +73,24 @@ where
C::Future: Unpin + Send + 'static,
B: crate::Body + Send + Unpin + 'static,
B::Data: Send,
B::Error: Into<Error> + Send + Sync,
B::Error: Into<linkerd_error::Error> + Send + Sync,
{
type Response = http::Response<BoxBody>;
type Error = Error;
type Future = Pin<Box<dyn Future<Output = Result<http::Response<BoxBody>>> + Send + 'static>>;
type Future = Pin<
Box<dyn Future<Output = Result<http::Response<BoxBody>, Self::Error>> + Send + 'static>,
>;

#[inline]
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
let Self { http1, h2 } = self;

match http1.poll_ready(cx) {
match http1.poll_ready(cx).map_err(Error::H1) {
Poll::Ready(Ok(())) => {}
poll => return poll,
}

h2.poll_ready(cx).map_err(downgrade_h2_error)
h2.poll_ready(cx).map_err(Error::h2)
}

fn call(&mut self, mut req: http::Request<B>) -> Self::Future {
Expand All @@ -85,7 +101,12 @@ where
.is_some()
{
debug!("Skipping orig-proto upgrade due to HTTP/1.1 upgrade");
return Box::pin(self.http1.call(req).map_ok(|rsp| rsp.map(BoxBody::new)));
return Box::pin(
self.http1
.call(req)
.map_ok(|rsp| rsp.map(BoxBody::new))
.map_err(Error::H1),
);
}

let orig_version = req.version();
Expand Down Expand Up @@ -113,97 +134,113 @@ where

*req.version_mut() = http::Version::HTTP_2;

Box::pin(
self.h2
.call(req)
.map_err(downgrade_h2_error)
.map_ok(move |mut rsp| {
let version = rsp
.headers_mut()
.remove(L5D_ORIG_PROTO)
.and_then(|orig_proto| {
if orig_proto == "HTTP/1.1" {
Some(http::Version::HTTP_11)
} else if orig_proto == "HTTP/1.0" {
Some(http::Version::HTTP_10)
} else {
None
}
})
.unwrap_or(orig_version);
trace!(?version, "Downgrading response");
*rsp.version_mut() = version;
rsp.map(|inner| BoxBody::new(UpgradeResponseBody { inner }))
}),
)
Box::pin(self.h2.call(req).map_err(Error::h2).map_ok(move |mut rsp| {
let version = rsp
.headers_mut()
.remove(L5D_ORIG_PROTO)
.and_then(|orig_proto| {
if orig_proto == "HTTP/1.1" {
Some(http::Version::HTTP_11)
} else if orig_proto == "HTTP/1.0" {
Some(http::Version::HTTP_10)
} else {
None
}
})
.unwrap_or(orig_version);
trace!(?version, "Downgrading response");
*rsp.version_mut() = version;
rsp.map(|inner| BoxBody::new(UpgradeResponseBody { inner }))
}))
}
}

// === impl Error ===

impl Error {
fn h2(err: hyper::Error) -> Self {
if let Some(downgraded) = downgrade_h2_error(&err) {
return Self::Downgraded(downgraded);
}

Self::H2(err)
}
}

/// Handles HTTP/2 client errors for HTTP/1.1 requests by wrapping the error type. This
/// simplifies error handling elsewhere so that HTTP/2 errors can only be encountered when the
/// original request was HTTP/2.
fn downgrade_h2_error<E: std::error::Error + Send + Sync + 'static>(orig: E) -> Error {
fn downgrade_h2_error<E: std::error::Error + Send + Sync + 'static>(
orig: &E,
) -> Option<DowngradedH2Error> {
#[inline]
fn reason(e: &(dyn std::error::Error + 'static)) -> Option<h2::Reason> {
e.downcast_ref::<h2::H2Error>()?.reason()
}

// If the provided error was an H2 error, wrap it as a downgraded error.
if let Some(reason) = reason(&orig) {
return DowngradedH2Error(reason).into();
if let Some(reason) = reason(orig) {
return Some(DowngradedH2Error(reason));
}

// Otherwise, check the source chain to see if its original error was an H2 error.
let mut cause = orig.source();
while let Some(error) = cause {
if let Some(reason) = reason(error) {
return DowngradedH2Error(reason).into();
return Some(DowngradedH2Error(reason));
}

cause = error.source();
}

// If the error was not an H2 error, return the original error (boxed).
orig.into()
// If the error was not an H2 error, return None.
None
}

#[cfg(test)]
#[test]
fn test_downgrade_h2_error() {
assert!(
downgrade_h2_error(h2::H2Error::from(h2::Reason::PROTOCOL_ERROR)).is::<DowngradedH2Error>(),
downgrade_h2_error(&h2::H2Error::from(h2::Reason::PROTOCOL_ERROR)).is_some(),
"h2 errors must be downgraded"
);

#[derive(Debug, Error)]
#[error("wrapped h2 error: {0}")]
struct WrapError(#[source] Error);
struct WrapError(#[source] linkerd_error::Error);
assert!(
downgrade_h2_error(WrapError(
downgrade_h2_error(&WrapError(
h2::H2Error::from(h2::Reason::PROTOCOL_ERROR).into()
))
.is::<DowngradedH2Error>(),
.is_some(),
"wrapped h2 errors must be downgraded"
);

assert!(
downgrade_h2_error(WrapError(
downgrade_h2_error(&WrapError(
WrapError(h2::H2Error::from(h2::Reason::PROTOCOL_ERROR).into()).into()
))
.is::<DowngradedH2Error>(),
.is_some(),
"double-wrapped h2 errors must be downgraded"
);

assert!(
!downgrade_h2_error(std::io::Error::new(
downgrade_h2_error(&std::io::Error::new(
std::io::ErrorKind::Other,
"non h2 error"
))
.is::<DowngradedH2Error>(),
.is_none(),
"other h2 errors must not be downgraded"
);
}

#[cfg(test)]
#[test]
fn test_downgrade_error_source() {
let err = Error::Downgraded(DowngradedH2Error(h2::Reason::PROTOCOL_ERROR));
assert!(linkerd_error::is_caused_by::<DowngradedH2Error>(&err));
}

// === impl UpgradeResponseBody ===

impl<B> Body for UpgradeResponseBody<B>
Expand All @@ -212,7 +249,7 @@ where
B::Error: std::error::Error + Send + Sync + 'static,
{
type Data = B::Data;
type Error = Error;
type Error = linkerd_error::Error;

#[inline]
fn is_end_stream(&self) -> bool {
Expand All @@ -223,10 +260,11 @@ where
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
self.project()
.inner
.poll_frame(cx)
.map_err(downgrade_h2_error)
self.project().inner.poll_frame(cx).map_err(|err| {
downgrade_h2_error(&err)
.map(Into::into)
.unwrap_or_else(|| err.into())
})
}

#[inline]
Expand Down
Loading