Skip to content

feat(client): add option to HttpConnector to enable or disable #2837

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 87 additions & 12 deletions src/client/connect/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,16 @@ use std::marker::PhantomData;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr};
use std::pin::Pin;
use std::sync::Arc;
use std::task::{self, Poll};
use std::task::{self, Poll, Context};
use std::time::Duration;
use std::ops::{Deref, DerefMut};

use futures_util::future::Either;
use http::uri::{Scheme, Uri};
use pin_project_lite::pin_project;
use tokio::net::{TcpSocket, TcpStream};
use tokio::time::Sleep;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tracing::{debug, trace, warn};

use super::dns::{self, resolve, GaiResolver, Resolve};
Expand All @@ -35,6 +37,12 @@ pub struct HttpConnector<R = GaiResolver> {
resolver: R,
}

/// Connection returned by `HttpConnector`.
pub struct HttpConnection {
inner: TcpStream,
config: Arc<Config>,
}

/// Extra information about the transport when an HttpConnector is used.
///
/// # Example
Expand Down Expand Up @@ -81,6 +89,7 @@ struct Config {
reuse_address: bool,
send_buffer_size: Option<usize>,
recv_buffer_size: Option<usize>,
http_info: bool,
}

// ===== impl HttpConnector =====
Expand Down Expand Up @@ -121,6 +130,7 @@ impl<R> HttpConnector<R> {
reuse_address: false,
send_buffer_size: None,
recv_buffer_size: None,
http_info: true,
}),
resolver,
}
Expand Down Expand Up @@ -164,6 +174,14 @@ impl<R> HttpConnector<R> {
self.config_mut().recv_buffer_size = size;
}

/// Set if `HttpInfo` is enabled or disabled in connection metadata.
///
/// Default is `true`.
#[inline]
pub fn set_httpinfo(&mut self, httpinfo: bool) {
self.config_mut().http_info = httpinfo;
}

/// Set that all sockets are bound to the configured address before connection.
///
/// If `None`, the sockets will not be bound.
Expand Down Expand Up @@ -256,7 +274,7 @@ where
R: Resolve + Clone + Send + Sync + 'static,
R::Future: Send,
{
type Response = TcpStream;
type Response = HttpConnection;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This would be a breaking change. Can we provide the configuration without needing a new type?

type Error = ConnectError;
type Future = HttpConnecting<R>;

Expand Down Expand Up @@ -323,7 +341,7 @@ impl<R> HttpConnector<R>
where
R: Resolve,
{
async fn call_async(&mut self, dst: Uri) -> Result<TcpStream, ConnectError> {
async fn call_async(&mut self, dst: Uri) -> Result<HttpConnection, ConnectError> {
let config = &self.config;

let (host, port) = get_host_port(config, &dst)?;
Expand All @@ -340,7 +358,7 @@ where
let addrs = addrs
.map(|mut addr| {
addr.set_port(port);
addr
addr
})
.collect();
dns::SocketAddrs::new(addrs)
Expand All @@ -354,18 +372,74 @@ where
warn!("tcp set_nodelay error: {}", e);
}

Ok(sock)
Ok(HttpConnection{inner:sock, config: config.clone()})
}
}

impl AsyncWrite for HttpConnection {
fn poll_shutdown(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), io::Error>> {
Pin::new(&mut self.inner).poll_shutdown(cx)
}

fn poll_flush(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<(), io::Error>> {
Pin::new(&mut self.inner).poll_flush(cx)
}

fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<Result<usize, io::Error>> {
Pin::new(&mut self.inner).poll_write(cx, buf)
}
}

impl AsyncRead for HttpConnection {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<io::Result<()>> {
Pin::new(&mut self.inner).poll_read(cx, buf)
}
}

impl fmt::Debug for HttpConnection {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("HttpConnection").finish()
}
}

impl Deref for HttpConnection {
type Target = TcpStream;
fn deref(&self) -> &TcpStream {
&self.inner
}
}

impl Connection for TcpStream {
impl DerefMut for HttpConnection {
fn deref_mut(&mut self) -> &mut TcpStream {
&mut self.inner
}
}

impl Connection for HttpConnection {
fn connected(&self) -> Connected {
let connected = Connected::new();
if let (Ok(remote_addr), Ok(local_addr)) = (self.peer_addr(), self.local_addr()) {
connected.extra(HttpInfo { remote_addr, local_addr })
} else {
connected
let mut connected = Connected::new();

if self.config.http_info {
if let (Ok(remote_addr), Ok(local_addr)) = (self.inner.peer_addr(), self.inner.local_addr()) {
connected = connected.extra(HttpInfo { remote_addr, local_addr });
}
}

connected
}
}

Expand Down Expand Up @@ -396,7 +470,7 @@ pin_project! {
}
}

type ConnectResult = Result<TcpStream, ConnectError>;
type ConnectResult = Result<HttpConnection, ConnectError>;
type BoxConnecting = Pin<Box<dyn Future<Output = ConnectResult> + Send>>;

impl<R: Resolve> Future for HttpConnecting<R> {
Expand Down Expand Up @@ -942,6 +1016,7 @@ mod tests {
enforce_http: false,
send_buffer_size: None,
recv_buffer_size: None,
http_info: true,
};
let connecting_tcp = ConnectingTcp::new(dns::SocketAddrs::new(addrs), &cfg);
let start = Instant::now();
Expand Down
2 changes: 1 addition & 1 deletion src/client/connect/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ use ::http::Extensions;
cfg_feature! {
#![feature = "tcp"]

pub use self::http::{HttpConnector, HttpInfo};
pub use self::http::{HttpConnector, HttpInfo, HttpConnection};

pub mod dns;
mod http;
Expand Down
4 changes: 2 additions & 2 deletions src/client/tests.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::io;

use futures_util::future;
use tokio::net::TcpStream;

use super::Client;
use super::connect::HttpConnection;

#[tokio::test]
async fn client_connect_uri_argument() {
Expand All @@ -13,7 +13,7 @@ async fn client_connect_uri_argument() {
assert_eq!(dst.port(), None);
assert_eq!(dst.path(), "/", "path should be removed");

future::err::<TcpStream, _>(io::Error::new(io::ErrorKind::Other, "expect me"))
future::err::<HttpConnection, _>(io::Error::new(io::ErrorKind::Other, "expect me"))
});

let client = Client::builder().build::<_, crate::Body>(connector);
Expand Down
5 changes: 2 additions & 3 deletions tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1120,11 +1120,10 @@ mod dispatch_impl {
use futures_util::stream::StreamExt;
use http::Uri;
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::net::TcpStream;

use super::support;
use hyper::body::HttpBody;
use hyper::client::connect::{Connected, Connection, HttpConnector};
use hyper::client::connect::{Connected, Connection, HttpConnection, HttpConnector};
use hyper::Client;

#[test]
Expand Down Expand Up @@ -2090,7 +2089,7 @@ mod dispatch_impl {
}

struct DebugStream {
tcp: TcpStream,
tcp: HttpConnection,
on_drop: mpsc::Sender<()>,
is_alpn_h2: bool,
is_proxy: bool,
Expand Down