Implement optional Hyper 1 support in hyper-boring (#246)
This commit is contained in:
parent
6d3639f173
commit
193bf3b9d7
|
|
@ -361,3 +361,5 @@ jobs:
|
|||
name: Run `rpk,underscore-wildcards` tests
|
||||
- run: cargo test --features pq-experimental,rpk,underscore-wildcards
|
||||
name: Run `pq-experimental,rpk,underscore-wildcards` tests
|
||||
- run: cargo test -p hyper-boring --features hyper1
|
||||
name: Run hyper 1.0 tests for hyper-boring
|
||||
|
|
|
|||
10
Cargo.toml
10
Cargo.toml
|
|
@ -24,6 +24,7 @@ boring = { version = "4.9.1", path = "./boring" }
|
|||
tokio-boring = { version = "4.9.1", path = "./tokio-boring" }
|
||||
|
||||
bindgen = { version = "0.70.1", default-features = false, features = ["runtime"] }
|
||||
bytes = "1"
|
||||
cmake = "0.1.18"
|
||||
fs_extra = "1.3.0"
|
||||
fslock = "0.2"
|
||||
|
|
@ -36,10 +37,15 @@ futures = "0.3"
|
|||
tokio = "1"
|
||||
anyhow = "1"
|
||||
antidote = "1.0.0"
|
||||
http = "0.2"
|
||||
hyper = { version = "0.14", default-features = false }
|
||||
http = "1"
|
||||
http-body-util = "0.1.2"
|
||||
http_old = { package = "http", version = "0.2" }
|
||||
hyper = "1"
|
||||
hyper-util = "0.1.6"
|
||||
hyper_old = { package = "hyper", version = "0.14", default-features = false }
|
||||
linked_hash_set = "0.1"
|
||||
once_cell = "1.0"
|
||||
openssl-macros = "0.1.1"
|
||||
tower = "0.4"
|
||||
tower-layer = "0.3"
|
||||
tower-service = "0.3"
|
||||
|
|
|
|||
|
|
@ -81,3 +81,6 @@ bindgen = { workspace = true }
|
|||
cmake = { workspace = true }
|
||||
fs_extra = { workspace = true }
|
||||
fslock = { workspace = true }
|
||||
|
||||
[lints.rust]
|
||||
unexpected_cfgs = { level = "allow", check-cfg = ['cfg(const_fn)'] }
|
||||
|
|
|
|||
|
|
@ -17,7 +17,7 @@ rustdoc-args = ["--cfg", "docsrs"]
|
|||
[features]
|
||||
default = ["runtime"]
|
||||
|
||||
runtime = ["hyper/runtime"]
|
||||
runtime = ["hyper_old/runtime"]
|
||||
|
||||
# Use a FIPS-validated version of boringssl.
|
||||
fips = ["tokio-boring/fips"]
|
||||
|
|
@ -28,19 +28,30 @@ fips-link-precompiled = ["tokio-boring/fips-link-precompiled"]
|
|||
# Enables experimental post-quantum crypto (https://blog.cloudflare.com/post-quantum-for-all/)
|
||||
pq-experimental = ["tokio-boring/pq-experimental"]
|
||||
|
||||
# Enable Hyper 1 support
|
||||
hyper1 = ["dep:http", "dep:hyper", "dep:hyper-util", "dep:tower-service"]
|
||||
|
||||
[dependencies]
|
||||
antidote = { workspace = true }
|
||||
http = { workspace = true }
|
||||
hyper = { workspace = true, features = ["client"] }
|
||||
http = { workspace = true, optional = true }
|
||||
http_old = { workspace = true }
|
||||
hyper = { workspace = true, optional = true }
|
||||
hyper-util = { workspace = true, optional = true, features = ["client", "client-legacy"] }
|
||||
hyper_old = { workspace = true, features = ["client"] }
|
||||
linked_hash_set = { workspace = true }
|
||||
once_cell = { workspace = true }
|
||||
boring = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
tokio-boring = { workspace = true }
|
||||
tower-layer = { workspace = true }
|
||||
tower-service = { workspace = true, optional = true }
|
||||
|
||||
[dev-dependencies]
|
||||
hyper = { workspace = true, features = [ "full" ] }
|
||||
bytes = { workspace = true }
|
||||
http-body-util = { workspace = true }
|
||||
hyper-util = { workspace = true, features = ["http1", "http2", "service", "tokio"] }
|
||||
hyper = { workspace = true, features = ["server"] }
|
||||
hyper_old = { workspace = true, features = [ "full" ] }
|
||||
tokio = { workspace = true, features = [ "full" ] }
|
||||
tower = { workspace = true, features = ["util"] }
|
||||
futures = { workspace = true }
|
||||
|
|
|
|||
|
|
@ -2,91 +2,27 @@
|
|||
#![warn(missing_docs)]
|
||||
#![cfg_attr(docsrs, feature(doc_auto_cfg))]
|
||||
|
||||
use crate::cache::{SessionCache, SessionKey};
|
||||
use antidote::Mutex;
|
||||
use crate::cache::SessionKey;
|
||||
use boring::error::ErrorStack;
|
||||
use boring::ex_data::Index;
|
||||
use boring::ssl::{
|
||||
ConnectConfiguration, Ssl, SslConnector, SslConnectorBuilder, SslMethod, SslRef,
|
||||
SslSessionCacheMode,
|
||||
};
|
||||
use http::uri::Scheme;
|
||||
use hyper::client::connect::{Connected, Connection};
|
||||
#[cfg(feature = "runtime")]
|
||||
use hyper::client::HttpConnector;
|
||||
use hyper::service::Service;
|
||||
use hyper::Uri;
|
||||
use boring::ssl::Ssl;
|
||||
use once_cell::sync::OnceCell;
|
||||
use std::fmt::Debug;
|
||||
use std::future::Future;
|
||||
use std::io;
|
||||
use std::net;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::task::{Context, Poll};
|
||||
use std::{error::Error, fmt};
|
||||
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
|
||||
use std::fmt;
|
||||
use tokio_boring::SslStream;
|
||||
use tower_layer::Layer;
|
||||
|
||||
mod cache;
|
||||
#[cfg(test)]
|
||||
mod test;
|
||||
mod v0;
|
||||
/// Hyper 1 support.
|
||||
#[cfg(feature = "hyper1")]
|
||||
pub mod v1;
|
||||
|
||||
pub use self::v0::*;
|
||||
|
||||
fn key_index() -> Result<Index<Ssl, SessionKey>, ErrorStack> {
|
||||
static IDX: OnceCell<Index<Ssl, SessionKey>> = OnceCell::new();
|
||||
IDX.get_or_try_init(Ssl::new_ex_index).copied()
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct Inner {
|
||||
ssl: SslConnector,
|
||||
cache: Arc<Mutex<SessionCache>>,
|
||||
callback: Option<Callback>,
|
||||
ssl_callback: Option<SslCallback>,
|
||||
}
|
||||
|
||||
type Callback =
|
||||
Arc<dyn Fn(&mut ConnectConfiguration, &Uri) -> Result<(), ErrorStack> + Sync + Send>;
|
||||
type SslCallback = Arc<dyn Fn(&mut SslRef, &Uri) -> Result<(), ErrorStack> + Sync + Send>;
|
||||
|
||||
impl Inner {
|
||||
fn setup_ssl(&self, uri: &Uri, host: &str) -> Result<Ssl, ErrorStack> {
|
||||
let mut conf = self.ssl.configure()?;
|
||||
|
||||
if let Some(ref callback) = self.callback {
|
||||
callback(&mut conf, uri)?;
|
||||
}
|
||||
|
||||
let key = SessionKey {
|
||||
host: host.to_string(),
|
||||
port: uri.port_u16().unwrap_or(443),
|
||||
};
|
||||
|
||||
if let Some(session) = self.cache.lock().get(&key) {
|
||||
unsafe {
|
||||
conf.set_session(&session)?;
|
||||
}
|
||||
}
|
||||
|
||||
let idx = key_index()?;
|
||||
conf.set_ex_data(idx, key);
|
||||
|
||||
let mut ssl = conf.into_ssl(host)?;
|
||||
|
||||
if let Some(ref ssl_callback) = self.ssl_callback {
|
||||
ssl_callback(&mut ssl, uri)?;
|
||||
}
|
||||
|
||||
Ok(ssl)
|
||||
}
|
||||
}
|
||||
|
||||
/// A layer which wraps services in an `HttpsConnector`.
|
||||
pub struct HttpsLayer {
|
||||
inner: Inner,
|
||||
}
|
||||
|
||||
/// Settings for [`HttpsLayer`]
|
||||
pub struct HttpsLayerSettings {
|
||||
session_cache_capacity: usize,
|
||||
|
|
@ -123,214 +59,6 @@ impl HttpsLayerSettingsBuilder {
|
|||
}
|
||||
}
|
||||
|
||||
impl HttpsLayer {
|
||||
/// Creates a new `HttpsLayer` with default settings.
|
||||
///
|
||||
/// ALPN is configured to support both HTTP/1 and HTTP/1.1.
|
||||
pub fn new() -> Result<HttpsLayer, ErrorStack> {
|
||||
let mut ssl = SslConnector::builder(SslMethod::tls())?;
|
||||
|
||||
ssl.set_alpn_protos(b"\x02h2\x08http/1.1")?;
|
||||
|
||||
Self::with_connector(ssl)
|
||||
}
|
||||
|
||||
/// Creates a new `HttpsLayer`.
|
||||
///
|
||||
/// The session cache configuration of `ssl` will be overwritten.
|
||||
pub fn with_connector(ssl: SslConnectorBuilder) -> Result<HttpsLayer, ErrorStack> {
|
||||
Self::with_connector_and_settings(ssl, Default::default())
|
||||
}
|
||||
|
||||
/// Creates a new `HttpsLayer` with settings
|
||||
pub fn with_connector_and_settings(
|
||||
mut ssl: SslConnectorBuilder,
|
||||
settings: HttpsLayerSettings,
|
||||
) -> Result<HttpsLayer, ErrorStack> {
|
||||
let cache = Arc::new(Mutex::new(SessionCache::with_capacity(
|
||||
settings.session_cache_capacity,
|
||||
)));
|
||||
|
||||
ssl.set_session_cache_mode(SslSessionCacheMode::CLIENT);
|
||||
|
||||
ssl.set_new_session_callback({
|
||||
let cache = cache.clone();
|
||||
move |ssl, session| {
|
||||
if let Some(key) = key_index().ok().and_then(|idx| ssl.ex_data(idx)) {
|
||||
cache.lock().insert(key.clone(), session);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Ok(HttpsLayer {
|
||||
inner: Inner {
|
||||
ssl: ssl.build(),
|
||||
cache,
|
||||
callback: None,
|
||||
ssl_callback: None,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
/// Registers a callback which can customize the configuration of each connection.
|
||||
///
|
||||
/// Unsuitable to change verify hostflags (with `config.param_mut().set_hostflags(…)`),
|
||||
/// as they are reset after the callback is executed. Use [`Self::set_ssl_callback`]
|
||||
/// instead.
|
||||
pub fn set_callback<F>(&mut self, callback: F)
|
||||
where
|
||||
F: Fn(&mut ConnectConfiguration, &Uri) -> Result<(), ErrorStack> + 'static + Sync + Send,
|
||||
{
|
||||
self.inner.callback = Some(Arc::new(callback));
|
||||
}
|
||||
|
||||
/// Registers a callback which can customize the `Ssl` of each connection.
|
||||
pub fn set_ssl_callback<F>(&mut self, callback: F)
|
||||
where
|
||||
F: Fn(&mut SslRef, &Uri) -> Result<(), ErrorStack> + 'static + Sync + Send,
|
||||
{
|
||||
self.inner.ssl_callback = Some(Arc::new(callback));
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> Layer<S> for HttpsLayer {
|
||||
type Service = HttpsConnector<S>;
|
||||
|
||||
fn layer(&self, inner: S) -> HttpsConnector<S> {
|
||||
HttpsConnector {
|
||||
http: inner,
|
||||
inner: self.inner.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A Connector using OpenSSL to support `http` and `https` schemes.
|
||||
#[derive(Clone)]
|
||||
pub struct HttpsConnector<T> {
|
||||
http: T,
|
||||
inner: Inner,
|
||||
}
|
||||
|
||||
#[cfg(feature = "runtime")]
|
||||
impl HttpsConnector<HttpConnector> {
|
||||
/// Creates a a new `HttpsConnector` using default settings.
|
||||
///
|
||||
/// The Hyper `HttpConnector` is used to perform the TCP socket connection. ALPN is configured to support both
|
||||
/// HTTP/2 and HTTP/1.1.
|
||||
///
|
||||
/// Requires the `runtime` Cargo feature.
|
||||
pub fn new() -> Result<HttpsConnector<HttpConnector>, ErrorStack> {
|
||||
let mut http = HttpConnector::new();
|
||||
http.enforce_http(false);
|
||||
|
||||
HttpsLayer::new().map(|l| l.layer(http))
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, T> HttpsConnector<S>
|
||||
where
|
||||
S: Service<Uri, Response = T> + Send,
|
||||
S::Error: Into<Box<dyn Error + Send + Sync>>,
|
||||
S::Future: Unpin + Send + 'static,
|
||||
T: AsyncRead + AsyncWrite + Connection + Unpin + Debug + Sync + Send + 'static,
|
||||
{
|
||||
/// Creates a new `HttpsConnector`.
|
||||
///
|
||||
/// The session cache configuration of `ssl` will be overwritten.
|
||||
pub fn with_connector(
|
||||
http: S,
|
||||
ssl: SslConnectorBuilder,
|
||||
) -> Result<HttpsConnector<S>, ErrorStack> {
|
||||
HttpsLayer::with_connector(ssl).map(|l| l.layer(http))
|
||||
}
|
||||
|
||||
/// Registers a callback which can customize the configuration of each connection.
|
||||
///
|
||||
/// Unsuitable to change verify hostflags (with `config.param_mut().set_hostflags(…)`),
|
||||
/// as they are reset after the callback is executed. Use [`Self::set_ssl_callback`]
|
||||
/// instead.
|
||||
pub fn set_callback<F>(&mut self, callback: F)
|
||||
where
|
||||
F: Fn(&mut ConnectConfiguration, &Uri) -> Result<(), ErrorStack> + 'static + Sync + Send,
|
||||
{
|
||||
self.inner.callback = Some(Arc::new(callback));
|
||||
}
|
||||
|
||||
/// Registers a callback which can customize the `Ssl` of each connection.
|
||||
pub fn set_ssl_callback<F>(&mut self, callback: F)
|
||||
where
|
||||
F: Fn(&mut SslRef, &Uri) -> Result<(), ErrorStack> + 'static + Sync + Send,
|
||||
{
|
||||
self.inner.ssl_callback = Some(Arc::new(callback));
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> Service<Uri> for HttpsConnector<S>
|
||||
where
|
||||
S: Service<Uri> + Send,
|
||||
S::Error: Into<Box<dyn Error + Send + Sync>>,
|
||||
S::Future: Unpin + Send + 'static,
|
||||
S::Response: AsyncRead + AsyncWrite + Connection + Unpin + Debug + Sync + Send + 'static,
|
||||
{
|
||||
type Response = MaybeHttpsStream<S::Response>;
|
||||
type Error = Box<dyn Error + Sync + Send>;
|
||||
#[allow(clippy::type_complexity)]
|
||||
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
|
||||
|
||||
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
self.http.poll_ready(cx).map_err(Into::into)
|
||||
}
|
||||
|
||||
fn call(&mut self, uri: Uri) -> Self::Future {
|
||||
let is_tls_scheme = uri
|
||||
.scheme()
|
||||
.map(|s| s == &Scheme::HTTPS || s.as_str() == "wss")
|
||||
.unwrap_or(false);
|
||||
|
||||
let tls_setup = if is_tls_scheme {
|
||||
Some((self.inner.clone(), uri.clone()))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let connect = self.http.call(uri);
|
||||
|
||||
let f = async {
|
||||
let conn = connect.await.map_err(Into::into)?;
|
||||
|
||||
let (inner, uri) = match tls_setup {
|
||||
Some((inner, uri)) => (inner, uri),
|
||||
None => return Ok(MaybeHttpsStream::Http(conn)),
|
||||
};
|
||||
|
||||
let mut host = uri.host().ok_or("URI missing host")?;
|
||||
|
||||
// If `host` is an IPv6 address, we must strip away the square brackets that surround
|
||||
// it (otherwise, boring will fail to parse the host as an IP address, eventually
|
||||
// causing the handshake to fail due a hostname verification error).
|
||||
if !host.is_empty() {
|
||||
let last = host.len() - 1;
|
||||
let mut chars = host.chars();
|
||||
|
||||
if let (Some('['), Some(']')) = (chars.next(), chars.last()) {
|
||||
if host[1..last].parse::<net::Ipv6Addr>().is_ok() {
|
||||
host = &host[1..last];
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let ssl = inner.setup_ssl(&uri, host)?;
|
||||
let stream = tokio_boring::SslStreamBuilder::new(ssl, conn)
|
||||
.connect()
|
||||
.await?;
|
||||
|
||||
Ok(MaybeHttpsStream::Https(stream))
|
||||
};
|
||||
|
||||
Box::pin(f)
|
||||
}
|
||||
}
|
||||
|
||||
/// A stream which may be wrapped with TLS.
|
||||
pub enum MaybeHttpsStream<T> {
|
||||
/// A raw HTTP stream.
|
||||
|
|
@ -339,72 +67,6 @@ pub enum MaybeHttpsStream<T> {
|
|||
Https(SslStream<T>),
|
||||
}
|
||||
|
||||
impl<T> AsyncRead for MaybeHttpsStream<T>
|
||||
where
|
||||
T: AsyncRead + AsyncWrite + Unpin,
|
||||
{
|
||||
fn poll_read(
|
||||
mut self: Pin<&mut Self>,
|
||||
ctx: &mut Context<'_>,
|
||||
buf: &mut ReadBuf,
|
||||
) -> Poll<io::Result<()>> {
|
||||
match &mut *self {
|
||||
MaybeHttpsStream::Http(s) => Pin::new(s).poll_read(ctx, buf),
|
||||
MaybeHttpsStream::Https(s) => Pin::new(s).poll_read(ctx, buf),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> AsyncWrite for MaybeHttpsStream<T>
|
||||
where
|
||||
T: AsyncRead + AsyncWrite + Unpin,
|
||||
{
|
||||
fn poll_write(
|
||||
mut self: Pin<&mut Self>,
|
||||
ctx: &mut Context<'_>,
|
||||
buf: &[u8],
|
||||
) -> Poll<io::Result<usize>> {
|
||||
match &mut *self {
|
||||
MaybeHttpsStream::Http(s) => Pin::new(s).poll_write(ctx, buf),
|
||||
MaybeHttpsStream::Https(s) => Pin::new(s).poll_write(ctx, buf),
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_flush(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||
match &mut *self {
|
||||
MaybeHttpsStream::Http(s) => Pin::new(s).poll_flush(ctx),
|
||||
MaybeHttpsStream::Https(s) => Pin::new(s).poll_flush(ctx),
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_shutdown(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||
match &mut *self {
|
||||
MaybeHttpsStream::Http(s) => Pin::new(s).poll_shutdown(ctx),
|
||||
MaybeHttpsStream::Https(s) => Pin::new(s).poll_shutdown(ctx),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Connection for MaybeHttpsStream<T>
|
||||
where
|
||||
T: Connection,
|
||||
{
|
||||
fn connected(&self) -> Connected {
|
||||
match self {
|
||||
MaybeHttpsStream::Http(s) => s.connected(),
|
||||
MaybeHttpsStream::Https(s) => {
|
||||
let mut connected = s.get_ref().connected();
|
||||
|
||||
if s.ssl().selected_alpn_protocol() == Some(b"h2") {
|
||||
connected = connected.negotiated_h2();
|
||||
}
|
||||
|
||||
connected
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> fmt::Debug for MaybeHttpsStream<T> {
|
||||
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
|
||||
match *self {
|
||||
|
|
|
|||
|
|
@ -0,0 +1,345 @@
|
|||
use crate::cache::{SessionCache, SessionKey};
|
||||
use crate::{key_index, HttpsLayerSettings, MaybeHttpsStream};
|
||||
use antidote::Mutex;
|
||||
use boring::error::ErrorStack;
|
||||
use boring::ssl::{
|
||||
ConnectConfiguration, Ssl, SslConnector, SslConnectorBuilder, SslMethod, SslRef,
|
||||
SslSessionCacheMode,
|
||||
};
|
||||
use http_old::uri::Scheme;
|
||||
use hyper_old::client::connect::{Connected, Connection};
|
||||
use hyper_old::client::HttpConnector;
|
||||
use hyper_old::service::Service;
|
||||
use hyper_old::Uri;
|
||||
use std::error::Error;
|
||||
use std::future::Future;
|
||||
use std::net;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::task::{Context, Poll};
|
||||
use std::{fmt, io};
|
||||
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
|
||||
use tower_layer::Layer;
|
||||
|
||||
/// A Connector using OpenSSL to support `http` and `https` schemes.
|
||||
#[derive(Clone)]
|
||||
pub struct HttpsConnector<T> {
|
||||
http: T,
|
||||
inner: Inner,
|
||||
}
|
||||
|
||||
#[cfg(feature = "runtime")]
|
||||
impl HttpsConnector<HttpConnector> {
|
||||
/// Creates a a new `HttpsConnector` using default settings.
|
||||
///
|
||||
/// The Hyper `HttpConnector` is used to perform the TCP socket connection. ALPN is configured to support both
|
||||
/// HTTP/2 and HTTP/1.1.
|
||||
///
|
||||
/// Requires the `runtime` Cargo feature.
|
||||
pub fn new() -> Result<HttpsConnector<HttpConnector>, ErrorStack> {
|
||||
let mut http = HttpConnector::new();
|
||||
http.enforce_http(false);
|
||||
|
||||
HttpsLayer::new().map(|l| l.layer(http))
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, T> HttpsConnector<S>
|
||||
where
|
||||
S: Service<Uri, Response = T> + Send,
|
||||
S::Error: Into<Box<dyn Error + Send + Sync>>,
|
||||
S::Future: Unpin + Send + 'static,
|
||||
T: AsyncRead + AsyncWrite + Connection + Unpin + fmt::Debug + Sync + Send + 'static,
|
||||
{
|
||||
/// Creates a new `HttpsConnector`.
|
||||
///
|
||||
/// The session cache configuration of `ssl` will be overwritten.
|
||||
pub fn with_connector(
|
||||
http: S,
|
||||
ssl: SslConnectorBuilder,
|
||||
) -> Result<HttpsConnector<S>, ErrorStack> {
|
||||
HttpsLayer::with_connector(ssl).map(|l| l.layer(http))
|
||||
}
|
||||
|
||||
/// Registers a callback which can customize the configuration of each connection.
|
||||
///
|
||||
/// Unsuitable to change verify hostflags (with `config.param_mut().set_hostflags(…)`),
|
||||
/// as they are reset after the callback is executed. Use [`Self::set_ssl_callback`]
|
||||
/// instead.
|
||||
pub fn set_callback<F>(&mut self, callback: F)
|
||||
where
|
||||
F: Fn(&mut ConnectConfiguration, &Uri) -> Result<(), ErrorStack> + 'static + Sync + Send,
|
||||
{
|
||||
self.inner.callback = Some(Arc::new(callback));
|
||||
}
|
||||
|
||||
/// Registers a callback which can customize the `Ssl` of each connection.
|
||||
pub fn set_ssl_callback<F>(&mut self, callback: F)
|
||||
where
|
||||
F: Fn(&mut SslRef, &Uri) -> Result<(), ErrorStack> + 'static + Sync + Send,
|
||||
{
|
||||
self.inner.ssl_callback = Some(Arc::new(callback));
|
||||
}
|
||||
}
|
||||
|
||||
/// A layer which wraps services in an `HttpsConnector`.
|
||||
pub struct HttpsLayer {
|
||||
inner: Inner,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct Inner {
|
||||
ssl: SslConnector,
|
||||
cache: Arc<Mutex<SessionCache>>,
|
||||
callback: Option<Callback>,
|
||||
ssl_callback: Option<SslCallback>,
|
||||
}
|
||||
|
||||
type Callback =
|
||||
Arc<dyn Fn(&mut ConnectConfiguration, &Uri) -> Result<(), ErrorStack> + Sync + Send>;
|
||||
type SslCallback = Arc<dyn Fn(&mut SslRef, &Uri) -> Result<(), ErrorStack> + Sync + Send>;
|
||||
|
||||
impl HttpsLayer {
|
||||
/// Creates a new `HttpsLayer` with default settings.
|
||||
///
|
||||
/// ALPN is configured to support both HTTP/1 and HTTP/1.1.
|
||||
pub fn new() -> Result<HttpsLayer, ErrorStack> {
|
||||
let mut ssl = SslConnector::builder(SslMethod::tls())?;
|
||||
|
||||
ssl.set_alpn_protos(b"\x02h2\x08http/1.1")?;
|
||||
|
||||
Self::with_connector(ssl)
|
||||
}
|
||||
|
||||
/// Creates a new `HttpsLayer`.
|
||||
///
|
||||
/// The session cache configuration of `ssl` will be overwritten.
|
||||
pub fn with_connector(ssl: SslConnectorBuilder) -> Result<HttpsLayer, ErrorStack> {
|
||||
Self::with_connector_and_settings(ssl, Default::default())
|
||||
}
|
||||
|
||||
/// Creates a new `HttpsLayer` with settings
|
||||
pub fn with_connector_and_settings(
|
||||
mut ssl: SslConnectorBuilder,
|
||||
settings: HttpsLayerSettings,
|
||||
) -> Result<HttpsLayer, ErrorStack> {
|
||||
let cache = Arc::new(Mutex::new(SessionCache::with_capacity(
|
||||
settings.session_cache_capacity,
|
||||
)));
|
||||
|
||||
ssl.set_session_cache_mode(SslSessionCacheMode::CLIENT);
|
||||
|
||||
ssl.set_new_session_callback({
|
||||
let cache = cache.clone();
|
||||
move |ssl, session| {
|
||||
if let Some(key) = key_index().ok().and_then(|idx| ssl.ex_data(idx)) {
|
||||
cache.lock().insert(key.clone(), session);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Ok(HttpsLayer {
|
||||
inner: Inner {
|
||||
ssl: ssl.build(),
|
||||
cache,
|
||||
callback: None,
|
||||
ssl_callback: None,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
/// Registers a callback which can customize the configuration of each connection.
|
||||
///
|
||||
/// Unsuitable to change verify hostflags (with `config.param_mut().set_hostflags(…)`),
|
||||
/// as they are reset after the callback is executed. Use [`Self::set_ssl_callback`]
|
||||
/// instead.
|
||||
pub fn set_callback<F>(&mut self, callback: F)
|
||||
where
|
||||
F: Fn(&mut ConnectConfiguration, &Uri) -> Result<(), ErrorStack> + 'static + Sync + Send,
|
||||
{
|
||||
self.inner.callback = Some(Arc::new(callback));
|
||||
}
|
||||
|
||||
/// Registers a callback which can customize the `Ssl` of each connection.
|
||||
pub fn set_ssl_callback<F>(&mut self, callback: F)
|
||||
where
|
||||
F: Fn(&mut SslRef, &Uri) -> Result<(), ErrorStack> + 'static + Sync + Send,
|
||||
{
|
||||
self.inner.ssl_callback = Some(Arc::new(callback));
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> Layer<S> for HttpsLayer {
|
||||
type Service = HttpsConnector<S>;
|
||||
|
||||
fn layer(&self, inner: S) -> HttpsConnector<S> {
|
||||
HttpsConnector {
|
||||
http: inner,
|
||||
inner: self.inner.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Inner {
|
||||
fn setup_ssl(&self, uri: &Uri, host: &str) -> Result<Ssl, ErrorStack> {
|
||||
let mut conf = self.ssl.configure()?;
|
||||
|
||||
if let Some(ref callback) = self.callback {
|
||||
callback(&mut conf, uri)?;
|
||||
}
|
||||
|
||||
let key = SessionKey {
|
||||
host: host.to_string(),
|
||||
port: uri.port_u16().unwrap_or(443),
|
||||
};
|
||||
|
||||
if let Some(session) = self.cache.lock().get(&key) {
|
||||
unsafe {
|
||||
conf.set_session(&session)?;
|
||||
}
|
||||
}
|
||||
|
||||
let idx = key_index()?;
|
||||
conf.set_ex_data(idx, key);
|
||||
|
||||
let mut ssl = conf.into_ssl(host)?;
|
||||
|
||||
if let Some(ref ssl_callback) = self.ssl_callback {
|
||||
ssl_callback(&mut ssl, uri)?;
|
||||
}
|
||||
|
||||
Ok(ssl)
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> Service<Uri> for HttpsConnector<S>
|
||||
where
|
||||
S: Service<Uri> + Send,
|
||||
S::Error: Into<Box<dyn Error + Send + Sync>>,
|
||||
S::Future: Unpin + Send + 'static,
|
||||
S::Response: AsyncRead + AsyncWrite + Connection + Unpin + fmt::Debug + Sync + Send + 'static,
|
||||
{
|
||||
type Response = MaybeHttpsStream<S::Response>;
|
||||
type Error = Box<dyn Error + Sync + Send>;
|
||||
#[allow(clippy::type_complexity)]
|
||||
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
|
||||
|
||||
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
self.http.poll_ready(cx).map_err(Into::into)
|
||||
}
|
||||
|
||||
fn call(&mut self, uri: Uri) -> Self::Future {
|
||||
let is_tls_scheme = uri
|
||||
.scheme()
|
||||
.map(|s| s == &Scheme::HTTPS || s.as_str() == "wss")
|
||||
.unwrap_or(false);
|
||||
|
||||
let tls_setup = if is_tls_scheme {
|
||||
Some((self.inner.clone(), uri.clone()))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let connect = self.http.call(uri);
|
||||
|
||||
let f = async {
|
||||
let conn = connect.await.map_err(Into::into)?;
|
||||
|
||||
let (inner, uri) = match tls_setup {
|
||||
Some((inner, uri)) => (inner, uri),
|
||||
None => return Ok(MaybeHttpsStream::Http(conn)),
|
||||
};
|
||||
|
||||
let mut host = uri.host().ok_or("URI missing host")?;
|
||||
|
||||
// If `host` is an IPv6 address, we must strip away the square brackets that surround
|
||||
// it (otherwise, boring will fail to parse the host as an IP address, eventually
|
||||
// causing the handshake to fail due a hostname verification error).
|
||||
if !host.is_empty() {
|
||||
let last = host.len() - 1;
|
||||
let mut chars = host.chars();
|
||||
|
||||
if let (Some('['), Some(']')) = (chars.next(), chars.last()) {
|
||||
if host[1..last].parse::<net::Ipv6Addr>().is_ok() {
|
||||
host = &host[1..last];
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let ssl = inner.setup_ssl(&uri, host)?;
|
||||
let stream = tokio_boring::SslStreamBuilder::new(ssl, conn)
|
||||
.connect()
|
||||
.await?;
|
||||
|
||||
Ok(MaybeHttpsStream::Https(stream))
|
||||
};
|
||||
|
||||
Box::pin(f)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Connection for MaybeHttpsStream<T>
|
||||
where
|
||||
T: Connection,
|
||||
{
|
||||
fn connected(&self) -> Connected {
|
||||
match self {
|
||||
MaybeHttpsStream::Http(s) => s.connected(),
|
||||
MaybeHttpsStream::Https(s) => {
|
||||
let mut connected = s.get_ref().connected();
|
||||
|
||||
if s.ssl().selected_alpn_protocol() == Some(b"h2") {
|
||||
connected = connected.negotiated_h2();
|
||||
}
|
||||
|
||||
connected
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> AsyncRead for MaybeHttpsStream<T>
|
||||
where
|
||||
T: AsyncRead + AsyncWrite + Unpin,
|
||||
{
|
||||
fn poll_read(
|
||||
mut self: Pin<&mut Self>,
|
||||
ctx: &mut Context<'_>,
|
||||
buf: &mut ReadBuf<'_>,
|
||||
) -> Poll<io::Result<()>> {
|
||||
match &mut *self {
|
||||
MaybeHttpsStream::Http(s) => Pin::new(s).poll_read(ctx, buf),
|
||||
MaybeHttpsStream::Https(s) => Pin::new(s).poll_read(ctx, buf),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> AsyncWrite for MaybeHttpsStream<T>
|
||||
where
|
||||
T: AsyncRead + AsyncWrite + Unpin,
|
||||
{
|
||||
fn poll_write(
|
||||
mut self: Pin<&mut Self>,
|
||||
ctx: &mut Context<'_>,
|
||||
buf: &[u8],
|
||||
) -> Poll<io::Result<usize>> {
|
||||
match &mut *self {
|
||||
MaybeHttpsStream::Http(s) => Pin::new(s).poll_write(ctx, buf),
|
||||
MaybeHttpsStream::Https(s) => Pin::new(s).poll_write(ctx, buf),
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_flush(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||
match &mut *self {
|
||||
MaybeHttpsStream::Http(s) => Pin::new(s).poll_flush(ctx),
|
||||
MaybeHttpsStream::Https(s) => Pin::new(s).poll_flush(ctx),
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_shutdown(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||
match &mut *self {
|
||||
MaybeHttpsStream::Http(s) => Pin::new(s).poll_shutdown(ctx),
|
||||
MaybeHttpsStream::Https(s) => Pin::new(s).poll_shutdown(ctx),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,350 @@
|
|||
use crate::cache::{SessionCache, SessionKey};
|
||||
use crate::{key_index, HttpsLayerSettings, MaybeHttpsStream};
|
||||
use antidote::Mutex;
|
||||
use boring::error::ErrorStack;
|
||||
use boring::ssl::{
|
||||
ConnectConfiguration, Ssl, SslConnector, SslConnectorBuilder, SslMethod, SslRef,
|
||||
SslSessionCacheMode,
|
||||
};
|
||||
use http::uri::Scheme;
|
||||
use http::Uri;
|
||||
use hyper::rt::{Read, ReadBufCursor, Write};
|
||||
use hyper_util::client::legacy::connect::{Connected, Connection, HttpConnector};
|
||||
use hyper_util::rt::TokioIo;
|
||||
use std::error::Error;
|
||||
use std::fmt;
|
||||
use std::future::Future;
|
||||
use std::pin::Pin;
|
||||
use std::sync::Arc;
|
||||
use std::task::{Context, Poll};
|
||||
use std::{io, net};
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tower_layer::Layer;
|
||||
use tower_service::Service;
|
||||
|
||||
/// A Connector using BoringSSL to support `http` and `https` schemes.
|
||||
#[derive(Clone)]
|
||||
pub struct HttpsConnector<T> {
|
||||
http: T,
|
||||
inner: Inner,
|
||||
}
|
||||
|
||||
#[cfg(feature = "runtime")]
|
||||
impl HttpsConnector<HttpConnector> {
|
||||
/// Creates a a new `HttpsConnector` using default settings.
|
||||
///
|
||||
/// The Hyper `HttpConnector` is used to perform the TCP socket connection. ALPN is configured to support both
|
||||
/// HTTP/2 and HTTP/1.1.
|
||||
///
|
||||
/// Requires the `runtime` Cargo feature.
|
||||
pub fn new() -> Result<HttpsConnector<HttpConnector>, ErrorStack> {
|
||||
let mut http = HttpConnector::new();
|
||||
http.enforce_http(false);
|
||||
|
||||
HttpsLayer::new().map(|l| l.layer(http))
|
||||
}
|
||||
}
|
||||
|
||||
impl<S, T> HttpsConnector<S>
|
||||
where
|
||||
S: Service<Uri, Response = TokioIo<T>> + Send,
|
||||
S::Error: Into<Box<dyn Error + Send + Sync>>,
|
||||
S::Future: Unpin + Send + 'static,
|
||||
T: AsyncRead + AsyncWrite + Connection + Unpin + fmt::Debug + Sync + Send + 'static,
|
||||
{
|
||||
/// Creates a new `HttpsConnector`.
|
||||
///
|
||||
/// The session cache configuration of `ssl` will be overwritten.
|
||||
pub fn with_connector(
|
||||
http: S,
|
||||
ssl: SslConnectorBuilder,
|
||||
) -> Result<HttpsConnector<S>, ErrorStack> {
|
||||
HttpsLayer::with_connector(ssl).map(|l| l.layer(http))
|
||||
}
|
||||
|
||||
/// Registers a callback which can customize the configuration of each connection.
|
||||
///
|
||||
/// Unsuitable to change verify hostflags (with `config.param_mut().set_hostflags(…)`),
|
||||
/// as they are reset after the callback is executed. Use [`Self::set_ssl_callback`]
|
||||
/// instead.
|
||||
pub fn set_callback<F>(&mut self, callback: F)
|
||||
where
|
||||
F: Fn(&mut ConnectConfiguration, &Uri) -> Result<(), ErrorStack> + 'static + Sync + Send,
|
||||
{
|
||||
self.inner.callback = Some(Arc::new(callback));
|
||||
}
|
||||
|
||||
/// Registers a callback which can customize the `Ssl` of each connection.
|
||||
pub fn set_ssl_callback<F>(&mut self, callback: F)
|
||||
where
|
||||
F: Fn(&mut SslRef, &Uri) -> Result<(), ErrorStack> + 'static + Sync + Send,
|
||||
{
|
||||
self.inner.ssl_callback = Some(Arc::new(callback));
|
||||
}
|
||||
}
|
||||
|
||||
/// A layer which wraps services in an `HttpsConnector`.
|
||||
pub struct HttpsLayer {
|
||||
inner: Inner,
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
struct Inner {
|
||||
ssl: SslConnector,
|
||||
cache: Arc<Mutex<SessionCache>>,
|
||||
callback: Option<Callback>,
|
||||
ssl_callback: Option<SslCallback>,
|
||||
}
|
||||
|
||||
type Callback =
|
||||
Arc<dyn Fn(&mut ConnectConfiguration, &Uri) -> Result<(), ErrorStack> + Sync + Send>;
|
||||
type SslCallback = Arc<dyn Fn(&mut SslRef, &Uri) -> Result<(), ErrorStack> + Sync + Send>;
|
||||
|
||||
impl HttpsLayer {
|
||||
/// Creates a new `HttpsLayer` with default settings.
|
||||
///
|
||||
/// ALPN is configured to support both HTTP/1 and HTTP/1.1.
|
||||
pub fn new() -> Result<HttpsLayer, ErrorStack> {
|
||||
let mut ssl = SslConnector::builder(SslMethod::tls())?;
|
||||
|
||||
ssl.set_alpn_protos(b"\x02h2\x08http/1.1")?;
|
||||
|
||||
Self::with_connector(ssl)
|
||||
}
|
||||
|
||||
/// Creates a new `HttpsLayer`.
|
||||
///
|
||||
/// The session cache configuration of `ssl` will be overwritten.
|
||||
pub fn with_connector(ssl: SslConnectorBuilder) -> Result<HttpsLayer, ErrorStack> {
|
||||
Self::with_connector_and_settings(ssl, Default::default())
|
||||
}
|
||||
|
||||
/// Creates a new `HttpsLayer` with settings
|
||||
pub fn with_connector_and_settings(
|
||||
mut ssl: SslConnectorBuilder,
|
||||
settings: HttpsLayerSettings,
|
||||
) -> Result<HttpsLayer, ErrorStack> {
|
||||
let cache = Arc::new(Mutex::new(SessionCache::with_capacity(
|
||||
settings.session_cache_capacity,
|
||||
)));
|
||||
|
||||
ssl.set_session_cache_mode(SslSessionCacheMode::CLIENT);
|
||||
|
||||
ssl.set_new_session_callback({
|
||||
let cache = cache.clone();
|
||||
move |ssl, session| {
|
||||
if let Some(key) = key_index().ok().and_then(|idx| ssl.ex_data(idx)) {
|
||||
cache.lock().insert(key.clone(), session);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
Ok(HttpsLayer {
|
||||
inner: Inner {
|
||||
ssl: ssl.build(),
|
||||
cache,
|
||||
callback: None,
|
||||
ssl_callback: None,
|
||||
},
|
||||
})
|
||||
}
|
||||
|
||||
/// Registers a callback which can customize the configuration of each connection.
|
||||
///
|
||||
/// Unsuitable to change verify hostflags (with `config.param_mut().set_hostflags(…)`),
|
||||
/// as they are reset after the callback is executed. Use [`Self::set_ssl_callback`]
|
||||
/// instead.
|
||||
pub fn set_callback<F>(&mut self, callback: F)
|
||||
where
|
||||
F: Fn(&mut ConnectConfiguration, &Uri) -> Result<(), ErrorStack> + 'static + Sync + Send,
|
||||
{
|
||||
self.inner.callback = Some(Arc::new(callback));
|
||||
}
|
||||
|
||||
/// Registers a callback which can customize the `Ssl` of each connection.
|
||||
pub fn set_ssl_callback<F>(&mut self, callback: F)
|
||||
where
|
||||
F: Fn(&mut SslRef, &Uri) -> Result<(), ErrorStack> + 'static + Sync + Send,
|
||||
{
|
||||
self.inner.ssl_callback = Some(Arc::new(callback));
|
||||
}
|
||||
}
|
||||
|
||||
impl<S> Layer<S> for HttpsLayer {
|
||||
type Service = HttpsConnector<S>;
|
||||
|
||||
fn layer(&self, inner: S) -> HttpsConnector<S> {
|
||||
HttpsConnector {
|
||||
http: inner,
|
||||
inner: self.inner.clone(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Inner {
|
||||
fn setup_ssl(&self, uri: &Uri, host: &str) -> Result<Ssl, ErrorStack> {
|
||||
let mut conf = self.ssl.configure()?;
|
||||
|
||||
if let Some(ref callback) = self.callback {
|
||||
callback(&mut conf, uri)?;
|
||||
}
|
||||
|
||||
let key = SessionKey {
|
||||
host: host.to_string(),
|
||||
port: uri.port_u16().unwrap_or(443),
|
||||
};
|
||||
|
||||
if let Some(session) = self.cache.lock().get(&key) {
|
||||
unsafe {
|
||||
conf.set_session(&session)?;
|
||||
}
|
||||
}
|
||||
|
||||
let idx = key_index()?;
|
||||
conf.set_ex_data(idx, key);
|
||||
|
||||
let mut ssl = conf.into_ssl(host)?;
|
||||
|
||||
if let Some(ref ssl_callback) = self.ssl_callback {
|
||||
ssl_callback(&mut ssl, uri)?;
|
||||
}
|
||||
|
||||
Ok(ssl)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T, S> Service<Uri> for HttpsConnector<S>
|
||||
where
|
||||
S: Service<Uri, Response = TokioIo<T>> + Send,
|
||||
S::Error: Into<Box<dyn Error + Send + Sync>>,
|
||||
S::Future: Unpin + Send + 'static,
|
||||
T: AsyncRead + AsyncWrite + Connection + Unpin + fmt::Debug + Sync + Send + 'static,
|
||||
{
|
||||
type Response = MaybeHttpsStream<T>;
|
||||
type Error = Box<dyn Error + Sync + Send>;
|
||||
#[allow(clippy::type_complexity)]
|
||||
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
|
||||
|
||||
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||
self.http.poll_ready(cx).map_err(Into::into)
|
||||
}
|
||||
|
||||
fn call(&mut self, uri: Uri) -> Self::Future {
|
||||
let is_tls_scheme = uri
|
||||
.scheme()
|
||||
.map(|s| s == &Scheme::HTTPS || s.as_str() == "wss")
|
||||
.unwrap_or(false);
|
||||
|
||||
let tls_setup = if is_tls_scheme {
|
||||
Some((self.inner.clone(), uri.clone()))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
let connect = self.http.call(uri);
|
||||
|
||||
let f = async {
|
||||
let conn = connect.await.map_err(Into::into)?.into_inner();
|
||||
|
||||
let (inner, uri) = match tls_setup {
|
||||
Some((inner, uri)) => (inner, uri),
|
||||
None => return Ok(MaybeHttpsStream::Http(conn)),
|
||||
};
|
||||
|
||||
let mut host = uri.host().ok_or("URI missing host")?;
|
||||
|
||||
// If `host` is an IPv6 address, we must strip away the square brackets that surround
|
||||
// it (otherwise, boring will fail to parse the host as an IP address, eventually
|
||||
// causing the handshake to fail due a hostname verification error).
|
||||
if !host.is_empty() {
|
||||
let last = host.len() - 1;
|
||||
let mut chars = host.chars();
|
||||
|
||||
if let (Some('['), Some(']')) = (chars.next(), chars.last()) {
|
||||
if host[1..last].parse::<net::Ipv6Addr>().is_ok() {
|
||||
host = &host[1..last];
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let ssl = inner.setup_ssl(&uri, host)?;
|
||||
let stream = tokio_boring::SslStreamBuilder::new(ssl, conn)
|
||||
.connect()
|
||||
.await?;
|
||||
|
||||
Ok(MaybeHttpsStream::Https(stream))
|
||||
};
|
||||
|
||||
Box::pin(f)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Connection for MaybeHttpsStream<T>
|
||||
where
|
||||
T: Connection,
|
||||
{
|
||||
fn connected(&self) -> Connected {
|
||||
match self {
|
||||
MaybeHttpsStream::Http(s) => s.connected(),
|
||||
MaybeHttpsStream::Https(s) => {
|
||||
let mut connected = s.get_ref().connected();
|
||||
|
||||
if s.ssl().selected_alpn_protocol() == Some(b"h2") {
|
||||
connected = connected.negotiated_h2();
|
||||
}
|
||||
|
||||
connected
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Read for MaybeHttpsStream<T>
|
||||
where
|
||||
T: AsyncRead + AsyncWrite + Unpin,
|
||||
{
|
||||
fn poll_read(
|
||||
mut self: Pin<&mut Self>,
|
||||
cx: &mut Context<'_>,
|
||||
buf: ReadBufCursor<'_>,
|
||||
) -> Poll<Result<(), std::io::Error>> {
|
||||
match &mut *self {
|
||||
MaybeHttpsStream::Http(inner) => Pin::new(&mut TokioIo::new(inner)).poll_read(cx, buf),
|
||||
MaybeHttpsStream::Https(inner) => Pin::new(&mut TokioIo::new(inner)).poll_read(cx, buf),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> Write for MaybeHttpsStream<T>
|
||||
where
|
||||
T: AsyncRead + AsyncWrite + Unpin,
|
||||
{
|
||||
fn poll_write(
|
||||
mut self: Pin<&mut Self>,
|
||||
ctx: &mut Context<'_>,
|
||||
buf: &[u8],
|
||||
) -> Poll<io::Result<usize>> {
|
||||
match &mut *self {
|
||||
MaybeHttpsStream::Http(inner) => {
|
||||
Pin::new(&mut TokioIo::new(inner)).poll_write(ctx, buf)
|
||||
}
|
||||
MaybeHttpsStream::Https(inner) => {
|
||||
Pin::new(&mut TokioIo::new(inner)).poll_write(ctx, buf)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_flush(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||
match &mut *self {
|
||||
MaybeHttpsStream::Http(inner) => Pin::new(&mut TokioIo::new(inner)).poll_flush(ctx),
|
||||
MaybeHttpsStream::Https(inner) => Pin::new(&mut TokioIo::new(inner)).poll_flush(ctx),
|
||||
}
|
||||
}
|
||||
|
||||
fn poll_shutdown(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<io::Result<()>> {
|
||||
match &mut *self {
|
||||
MaybeHttpsStream::Http(inner) => Pin::new(&mut TokioIo::new(inner)).poll_shutdown(ctx),
|
||||
MaybeHttpsStream::Https(inner) => Pin::new(&mut TokioIo::new(inner)).poll_shutdown(ctx),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -1,12 +1,12 @@
|
|||
use super::*;
|
||||
use boring::ssl::{SslAcceptor, SslFiletype, SslMethod};
|
||||
use boring::ssl::{SslAcceptor, SslConnector, SslFiletype, SslMethod};
|
||||
use futures::StreamExt;
|
||||
use hyper::client::HttpConnector;
|
||||
use hyper::server::conn::Http;
|
||||
use hyper::{service, Response};
|
||||
use hyper::{Body, Client};
|
||||
use hyper_boring::HttpsConnector;
|
||||
use hyper_old::client::HttpConnector;
|
||||
use hyper_old::server::conn::Http;
|
||||
use hyper_old::{service, Response};
|
||||
use hyper_old::{Body, Client};
|
||||
use std::convert::Infallible;
|
||||
use std::iter;
|
||||
use std::{io, iter};
|
||||
use tokio::net::TcpListener;
|
||||
|
||||
#[tokio::test]
|
||||
|
|
@ -37,10 +37,10 @@ async fn localhost() {
|
|||
let mut acceptor = SslAcceptor::mozilla_intermediate(SslMethod::tls()).unwrap();
|
||||
acceptor.set_session_id_context(b"test").unwrap();
|
||||
acceptor
|
||||
.set_private_key_file("test/key.pem", SslFiletype::PEM)
|
||||
.set_private_key_file("tests/test/key.pem", SslFiletype::PEM)
|
||||
.unwrap();
|
||||
acceptor
|
||||
.set_certificate_chain_file("test/cert.pem")
|
||||
.set_certificate_chain_file("tests/test/cert.pem")
|
||||
.unwrap();
|
||||
let acceptor = acceptor.build();
|
||||
|
||||
|
|
@ -69,7 +69,7 @@ async fn localhost() {
|
|||
|
||||
let mut ssl = SslConnector::builder(SslMethod::tls()).unwrap();
|
||||
|
||||
ssl.set_ca_file("test/root-ca.pem").unwrap();
|
||||
ssl.set_ca_file("tests/test/root-ca.pem").unwrap();
|
||||
|
||||
use std::fs::File;
|
||||
use std::io::Write;
|
||||
|
|
@ -104,10 +104,10 @@ async fn alpn_h2() {
|
|||
let server = async move {
|
||||
let mut acceptor = SslAcceptor::mozilla_modern(SslMethod::tls()).unwrap();
|
||||
acceptor
|
||||
.set_certificate_chain_file("test/cert.pem")
|
||||
.set_certificate_chain_file("tests/test/cert.pem")
|
||||
.unwrap();
|
||||
acceptor
|
||||
.set_private_key_file("test/key.pem", SslFiletype::PEM)
|
||||
.set_private_key_file("tests/test/key.pem", SslFiletype::PEM)
|
||||
.unwrap();
|
||||
acceptor.set_alpn_select_callback(|_, client| {
|
||||
ssl::select_next_proto(b"\x02h2", client).ok_or(AlpnError::NOACK)
|
||||
|
|
@ -138,7 +138,7 @@ async fn alpn_h2() {
|
|||
|
||||
let mut ssl = SslConnector::builder(SslMethod::tls()).unwrap();
|
||||
|
||||
ssl.set_ca_file("test/root-ca.pem").unwrap();
|
||||
ssl.set_ca_file("tests/test/root-ca.pem").unwrap();
|
||||
|
||||
let mut ssl = HttpsConnector::with_connector(connector, ssl).unwrap();
|
||||
|
||||
|
|
@ -0,0 +1,160 @@
|
|||
#![cfg(feature = "hyper1")]
|
||||
|
||||
use boring::ssl::{SslAcceptor, SslConnector, SslFiletype, SslMethod};
|
||||
use bytes::Bytes;
|
||||
use futures::StreamExt;
|
||||
use http_body_util::{BodyStream, Empty};
|
||||
use hyper::{service, Response};
|
||||
use hyper_boring::v1::HttpsConnector;
|
||||
use hyper_util::client::legacy::connect::HttpConnector;
|
||||
use hyper_util::client::legacy::Client;
|
||||
use hyper_util::rt::{TokioExecutor, TokioIo};
|
||||
use std::convert::Infallible;
|
||||
use std::{io, iter};
|
||||
use tokio::net::TcpListener;
|
||||
|
||||
#[tokio::test]
|
||||
async fn google() {
|
||||
let ssl = HttpsConnector::new().unwrap();
|
||||
let client = Client::builder(TokioExecutor::new())
|
||||
.pool_max_idle_per_host(0)
|
||||
.build::<_, Empty<Bytes>>(ssl);
|
||||
|
||||
for _ in 0..3 {
|
||||
let resp = client
|
||||
.get("https://www.google.com".parse().unwrap())
|
||||
.await
|
||||
.expect("connection should succeed");
|
||||
let mut body = BodyStream::new(resp.into_body());
|
||||
while body.next().await.transpose().unwrap().is_some() {}
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn localhost() {
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let addr = listener.local_addr().unwrap();
|
||||
let port = addr.port();
|
||||
|
||||
let server = async move {
|
||||
let mut acceptor = SslAcceptor::mozilla_intermediate(SslMethod::tls()).unwrap();
|
||||
acceptor.set_session_id_context(b"test").unwrap();
|
||||
acceptor
|
||||
.set_private_key_file("tests/test/key.pem", SslFiletype::PEM)
|
||||
.unwrap();
|
||||
acceptor
|
||||
.set_certificate_chain_file("tests/test/cert.pem")
|
||||
.unwrap();
|
||||
let acceptor = acceptor.build();
|
||||
|
||||
for _ in 0..3 {
|
||||
let stream = listener.accept().await.unwrap().0;
|
||||
let stream = tokio_boring::accept(&acceptor, stream).await.unwrap();
|
||||
|
||||
let service = service::service_fn(|_| async {
|
||||
Ok::<_, io::Error>(Response::new(<Empty<Bytes>>::new()))
|
||||
});
|
||||
|
||||
hyper::server::conn::http1::Builder::new()
|
||||
.keep_alive(false)
|
||||
.serve_connection(TokioIo::new(stream), service)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
};
|
||||
tokio::spawn(server);
|
||||
|
||||
let resolver =
|
||||
tower::service_fn(move |_name| async move { Ok::<_, Infallible>(iter::once(addr)) });
|
||||
|
||||
let mut connector = HttpConnector::new_with_resolver(resolver);
|
||||
|
||||
connector.enforce_http(false);
|
||||
|
||||
let mut ssl = SslConnector::builder(SslMethod::tls()).unwrap();
|
||||
|
||||
ssl.set_ca_file("tests/test/root-ca.pem").unwrap();
|
||||
|
||||
use std::fs::File;
|
||||
use std::io::Write;
|
||||
|
||||
let file = File::create("../target/keyfile.log").unwrap();
|
||||
ssl.set_keylog_callback(move |_, line| {
|
||||
let _ = writeln!(&file, "{}", line);
|
||||
});
|
||||
|
||||
let ssl = HttpsConnector::with_connector(connector, ssl).unwrap();
|
||||
let client = Client::builder(TokioExecutor::new()).build::<_, Empty<Bytes>>(ssl);
|
||||
|
||||
for _ in 0..3 {
|
||||
let resp = client
|
||||
.get(format!("https://foobar.com:{}", port).parse().unwrap())
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(resp.status().is_success(), "{}", resp.status());
|
||||
let mut body = BodyStream::new(resp.into_body());
|
||||
while body.next().await.transpose().unwrap().is_some() {}
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn alpn_h2() {
|
||||
use boring::ssl::{self, AlpnError};
|
||||
|
||||
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let addr = listener.local_addr().unwrap();
|
||||
let port = addr.port();
|
||||
|
||||
let server = async move {
|
||||
let mut acceptor = SslAcceptor::mozilla_modern(SslMethod::tls()).unwrap();
|
||||
acceptor
|
||||
.set_certificate_chain_file("tests/test/cert.pem")
|
||||
.unwrap();
|
||||
acceptor
|
||||
.set_private_key_file("tests/test/key.pem", SslFiletype::PEM)
|
||||
.unwrap();
|
||||
acceptor.set_alpn_select_callback(|_, client| {
|
||||
ssl::select_next_proto(b"\x02h2", client).ok_or(AlpnError::NOACK)
|
||||
});
|
||||
let acceptor = acceptor.build();
|
||||
|
||||
let stream = listener.accept().await.unwrap().0;
|
||||
let stream = tokio_boring::accept(&acceptor, stream).await.unwrap();
|
||||
assert_eq!(stream.ssl().selected_alpn_protocol().unwrap(), b"h2");
|
||||
|
||||
let service = service::service_fn(|_| async {
|
||||
Ok::<_, io::Error>(Response::new(<Empty<Bytes>>::new()))
|
||||
});
|
||||
|
||||
hyper::server::conn::http2::Builder::new(TokioExecutor::new())
|
||||
.serve_connection(TokioIo::new(stream), service)
|
||||
.await
|
||||
.unwrap();
|
||||
};
|
||||
tokio::spawn(server);
|
||||
|
||||
let resolver =
|
||||
tower::service_fn(move |_name| async move { Ok::<_, Infallible>(iter::once(addr)) });
|
||||
|
||||
let mut connector = HttpConnector::new_with_resolver(resolver);
|
||||
|
||||
connector.enforce_http(false);
|
||||
|
||||
let mut ssl = SslConnector::builder(SslMethod::tls()).unwrap();
|
||||
|
||||
ssl.set_ca_file("tests/test/root-ca.pem").unwrap();
|
||||
|
||||
let mut ssl = HttpsConnector::with_connector(connector, ssl).unwrap();
|
||||
|
||||
ssl.set_ssl_callback(|ssl, _| ssl.set_alpn_protos(b"\x02h2\x08http/1.1"));
|
||||
|
||||
let client = Client::builder(TokioExecutor::new()).build::<_, Empty<Bytes>>(ssl);
|
||||
|
||||
let resp = client
|
||||
.get(format!("https://foobar.com:{}", port).parse().unwrap())
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(resp.status().is_success(), "{}", resp.status());
|
||||
let mut body = BodyStream::new(resp.into_body());
|
||||
while body.next().await.transpose().unwrap().is_some() {}
|
||||
}
|
||||
Loading…
Reference in New Issue