use crate::cache::{SessionCache, SessionKey}; use crate::{key_index, HttpsLayerSettings, MaybeHttpsStream}; use antidote::Mutex; use boring::error::ErrorStack; use boring::ssl::{ ConnectConfiguration, Ssl, SslConnector, SslConnectorBuilder, SslMethod, SslRef, SslSessionCacheMode, }; use http::uri::Scheme; use http::Uri; use hyper::rt::{Read, ReadBufCursor, Write}; use hyper_util::client::legacy::connect::{Connected, Connection, HttpConnector}; use hyper_util::rt::TokioIo; use std::error::Error; use std::fmt; use std::future::Future; use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; use std::{io, net}; use tokio::io::{AsyncRead, AsyncWrite}; use tower_layer::Layer; use tower_service::Service; /// A Connector using BoringSSL to support `http` and `https` schemes. #[derive(Clone)] pub struct HttpsConnector { http: T, inner: Inner, } impl HttpsConnector { /// Creates a a new `HttpsConnector` using default settings. /// /// The Hyper `HttpConnector` is used to perform the TCP socket connection. ALPN is configured to support both /// HTTP/2 and HTTP/1.1. pub fn new() -> Result, ErrorStack> { let mut http = HttpConnector::new(); http.enforce_http(false); HttpsLayer::new().map(|l| l.layer(http)) } } impl HttpsConnector where S: Service> + Send, S::Error: Into>, S::Future: Unpin + Send + 'static, T: AsyncRead + AsyncWrite + Connection + Unpin + fmt::Debug + Sync + Send + 'static, { /// Creates a new `HttpsConnector`. /// /// The session cache configuration of `ssl` will be overwritten. pub fn with_connector( http: S, ssl: SslConnectorBuilder, ) -> Result, ErrorStack> { HttpsLayer::with_connector(ssl).map(|l| l.layer(http)) } /// Registers a callback which can customize the configuration of each connection. /// /// Unsuitable to change verify hostflags (with `config.param_mut().set_hostflags(…)`), /// as they are reset after the callback is executed. Use [`Self::set_ssl_callback`] /// instead. pub fn set_callback(&mut self, callback: F) where F: Fn(&mut ConnectConfiguration, &Uri) -> Result<(), ErrorStack> + 'static + Sync + Send, { self.inner.callback = Some(Arc::new(callback)); } /// Registers a callback which can customize the `Ssl` of each connection. pub fn set_ssl_callback(&mut self, callback: F) where F: Fn(&mut SslRef, &Uri) -> Result<(), ErrorStack> + 'static + Sync + Send, { self.inner.ssl_callback = Some(Arc::new(callback)); } } /// A layer which wraps services in an `HttpsConnector`. pub struct HttpsLayer { inner: Inner, } #[derive(Clone)] struct Inner { ssl: SslConnector, cache: Arc>, callback: Option, ssl_callback: Option, } type Callback = Arc Result<(), ErrorStack> + Sync + Send>; type SslCallback = Arc Result<(), ErrorStack> + Sync + Send>; impl HttpsLayer { /// Creates a new `HttpsLayer` with default settings. /// /// ALPN is configured to support both HTTP/1 and HTTP/1.1. pub fn new() -> Result { let mut ssl = SslConnector::builder(SslMethod::tls())?; ssl.set_alpn_protos(b"\x02h2\x08http/1.1")?; Self::with_connector(ssl) } /// Creates a new `HttpsLayer`. /// /// The session cache configuration of `ssl` will be overwritten. pub fn with_connector(ssl: SslConnectorBuilder) -> Result { Self::with_connector_and_settings(ssl, Default::default()) } /// Creates a new `HttpsLayer` with settings pub fn with_connector_and_settings( mut ssl: SslConnectorBuilder, settings: HttpsLayerSettings, ) -> Result { let cache = Arc::new(Mutex::new(SessionCache::with_capacity( settings.session_cache_capacity, ))); ssl.set_session_cache_mode(SslSessionCacheMode::CLIENT); ssl.set_new_session_callback({ let cache = cache.clone(); move |ssl, session| { if let Some(key) = key_index().ok().and_then(|idx| ssl.ex_data(idx)) { cache.lock().insert(key.clone(), session); } } }); Ok(HttpsLayer { inner: Inner { ssl: ssl.build(), cache, callback: None, ssl_callback: None, }, }) } /// Registers a callback which can customize the configuration of each connection. /// /// Unsuitable to change verify hostflags (with `config.param_mut().set_hostflags(…)`), /// as they are reset after the callback is executed. Use [`Self::set_ssl_callback`] /// instead. pub fn set_callback(&mut self, callback: F) where F: Fn(&mut ConnectConfiguration, &Uri) -> Result<(), ErrorStack> + 'static + Sync + Send, { self.inner.callback = Some(Arc::new(callback)); } /// Registers a callback which can customize the `Ssl` of each connection. pub fn set_ssl_callback(&mut self, callback: F) where F: Fn(&mut SslRef, &Uri) -> Result<(), ErrorStack> + 'static + Sync + Send, { self.inner.ssl_callback = Some(Arc::new(callback)); } } impl Layer for HttpsLayer { type Service = HttpsConnector; fn layer(&self, inner: S) -> HttpsConnector { HttpsConnector { http: inner, inner: self.inner.clone(), } } } impl Inner { fn setup_ssl(&self, uri: &Uri, host: &str) -> Result { let mut conf = self.ssl.configure()?; if let Some(ref callback) = self.callback { callback(&mut conf, uri)?; } let key = SessionKey { host: host.to_string(), port: uri.port_u16().unwrap_or(443), }; if let Some(session) = self.cache.lock().get(&key) { unsafe { conf.set_session(&session)?; } } let idx = key_index()?; conf.set_ex_data(idx, key); let mut ssl = conf.into_ssl(host)?; if let Some(ref ssl_callback) = self.ssl_callback { ssl_callback(&mut ssl, uri)?; } Ok(ssl) } } impl Service for HttpsConnector where S: Service> + Send, S::Error: Into>, S::Future: Unpin + Send + 'static, T: AsyncRead + AsyncWrite + Connection + Unpin + fmt::Debug + Sync + Send + 'static, { type Response = MaybeHttpsStream; type Error = Box; #[allow(clippy::type_complexity)] type Future = Pin> + Send>>; fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { self.http.poll_ready(cx).map_err(Into::into) } fn call(&mut self, uri: Uri) -> Self::Future { let is_tls_scheme = uri .scheme() .map(|s| s == &Scheme::HTTPS || s.as_str() == "wss") .unwrap_or(false); let tls_setup = if is_tls_scheme { Some((self.inner.clone(), uri.clone())) } else { None }; let connect = self.http.call(uri); let f = async { let conn = connect.await.map_err(Into::into)?.into_inner(); let (inner, uri) = match tls_setup { Some((inner, uri)) => (inner, uri), None => return Ok(MaybeHttpsStream::Http(conn)), }; let mut host = uri.host().ok_or("URI missing host")?; // If `host` is an IPv6 address, we must strip away the square brackets that surround // it (otherwise, boring will fail to parse the host as an IP address, eventually // causing the handshake to fail due a hostname verification error). if !host.is_empty() { let last = host.len() - 1; let mut chars = host.chars(); if let (Some('['), Some(']')) = (chars.next(), chars.last()) { if host[1..last].parse::().is_ok() { host = &host[1..last]; } } } let ssl = inner.setup_ssl(&uri, host)?; let stream = tokio_boring::SslStreamBuilder::new(ssl, conn) .connect() .await?; Ok(MaybeHttpsStream::Https(stream)) }; Box::pin(f) } } impl Connection for MaybeHttpsStream where T: Connection, { fn connected(&self) -> Connected { match self { MaybeHttpsStream::Http(s) => s.connected(), MaybeHttpsStream::Https(s) => { let mut connected = s.get_ref().connected(); if s.ssl().selected_alpn_protocol() == Some(b"h2") { connected = connected.negotiated_h2(); } connected } } } } impl Read for MaybeHttpsStream where T: AsyncRead + AsyncWrite + Unpin, { fn poll_read( mut self: Pin<&mut Self>, cx: &mut Context<'_>, buf: ReadBufCursor<'_>, ) -> Poll> { match &mut *self { MaybeHttpsStream::Http(inner) => Pin::new(&mut TokioIo::new(inner)).poll_read(cx, buf), MaybeHttpsStream::Https(inner) => Pin::new(&mut TokioIo::new(inner)).poll_read(cx, buf), } } } impl Write for MaybeHttpsStream where T: AsyncRead + AsyncWrite + Unpin, { fn poll_write( mut self: Pin<&mut Self>, ctx: &mut Context<'_>, buf: &[u8], ) -> Poll> { match &mut *self { MaybeHttpsStream::Http(inner) => { Pin::new(&mut TokioIo::new(inner)).poll_write(ctx, buf) } MaybeHttpsStream::Https(inner) => { Pin::new(&mut TokioIo::new(inner)).poll_write(ctx, buf) } } } fn poll_flush(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll> { match &mut *self { MaybeHttpsStream::Http(inner) => Pin::new(&mut TokioIo::new(inner)).poll_flush(ctx), MaybeHttpsStream::Https(inner) => Pin::new(&mut TokioIo::new(inner)).poll_flush(ctx), } } fn poll_shutdown(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll> { match &mut *self { MaybeHttpsStream::Http(inner) => Pin::new(&mut TokioIo::new(inner)).poll_shutdown(ctx), MaybeHttpsStream::Https(inner) => Pin::new(&mut TokioIo::new(inner)).poll_shutdown(ctx), } } }