203 lines
5.7 KiB
Rust
203 lines
5.7 KiB
Rust
//! This example demonstrates an HTTP client that requests files from a server.
|
|
//!
|
|
//! Checkout the `README.md` for guidance.
|
|
|
|
use anyhow::{anyhow, Result};
|
|
use boring::x509::X509;
|
|
use bytes::BytesMut;
|
|
use bytes::{Buf, Bytes};
|
|
use clap::Parser;
|
|
use http::Uri;
|
|
use http_body::Body;
|
|
use http_body_util::BodyExt;
|
|
use quinn_boring2::QuicSslContext;
|
|
use std::{
|
|
fs,
|
|
net::{SocketAddr, ToSocketAddrs},
|
|
path::PathBuf,
|
|
pin::Pin,
|
|
sync::Arc,
|
|
task::{Context, Poll},
|
|
time::Instant,
|
|
};
|
|
use tracing::Level;
|
|
use url::Url;
|
|
|
|
/// HTTP/0.9 over QUIC client
|
|
#[derive(Parser, Debug)]
|
|
#[clap(name = "client")]
|
|
struct Opt {
|
|
/// Log level e.g. trace, debug, info, warn, error
|
|
#[clap(long, default_value = "info")]
|
|
log: Level,
|
|
|
|
url: Url,
|
|
|
|
/// Override hostname used for certificate verification
|
|
#[clap(long)]
|
|
host: Option<String>,
|
|
|
|
/// Custom certificate authority to trust, in DER format
|
|
#[clap(long)]
|
|
ca: Option<PathBuf>,
|
|
|
|
/// Simulate NAT rebinding after connecting
|
|
#[clap(long)]
|
|
rebind: bool,
|
|
|
|
/// Address to bind on
|
|
#[clap(long, default_value = "[::]:0")]
|
|
bind: SocketAddr,
|
|
}
|
|
|
|
#[tokio::main]
|
|
async fn main() -> Result<()> {
|
|
tracing_subscriber::fmt()
|
|
.with_max_level(tracing::Level::INFO)
|
|
.init();
|
|
|
|
let options = Opt::parse();
|
|
|
|
let url = options.url;
|
|
let url_host = strip_ipv6_brackets(url.host_str().unwrap());
|
|
let remote = (url_host, url.port().unwrap_or(443))
|
|
.to_socket_addrs()?
|
|
.next()
|
|
.ok_or_else(|| anyhow!("couldn't resolve to an address"))?;
|
|
|
|
let mut client_crypto = quinn_boring2::ClientConfig::new()?;
|
|
if let Some(ca_path) = options.ca {
|
|
client_crypto
|
|
.ctx_mut()
|
|
.cert_store_mut()
|
|
.add_cert(X509::from_der(&fs::read(ca_path)?)?)?;
|
|
} else {
|
|
client_crypto
|
|
.ctx_mut()
|
|
.cert_store_mut()
|
|
.set_default_paths()?;
|
|
}
|
|
|
|
let mut endpoint = quinn_boring2::helpers::client_endpoint(options.bind)?;
|
|
endpoint.set_default_client_config(quinn::ClientConfig::new(Arc::new(client_crypto)));
|
|
|
|
let start = Instant::now();
|
|
let rebind = options.rebind;
|
|
let host = options.host.as_deref().unwrap_or(url_host);
|
|
|
|
tracing::info!("connecting to {host} at {remote}");
|
|
let conn = endpoint
|
|
.connect(remote, host)?
|
|
.await
|
|
.map_err(|e| anyhow!("failed to connect: {}", e))?;
|
|
tracing::info!("connected at {:?}", start.elapsed());
|
|
|
|
if rebind {
|
|
let socket = std::net::UdpSocket::bind("[::]:0").unwrap();
|
|
let addr = socket.local_addr().unwrap();
|
|
tracing::info!("rebinding to {addr}");
|
|
endpoint.rebind(socket).expect("rebind failed");
|
|
}
|
|
|
|
let (mut h3_conn, mut tx) = h3::client::new(h3_quinn::Connection::new(conn)).await?;
|
|
|
|
let req = http::Request::get(Uri::try_from(url.as_str())?).body(())?;
|
|
let (mut send, mut recv) = tx.send_request(req).await?.split();
|
|
if let Err(e) = send.finish().await {
|
|
tracing::error!("failed to send request: {e}");
|
|
}
|
|
|
|
let resp = {
|
|
let resp = recv.recv_response().await?;
|
|
let resp_body = Incoming::new(recv, resp.headers())
|
|
.map_err(Into::<BoxError>::into)
|
|
.boxed();
|
|
resp.map(|_| resp_body)
|
|
};
|
|
tracing::info!("response: {:#?}", resp);
|
|
|
|
let body = BodyExt::collect(resp.into_body())
|
|
.await
|
|
.map(|buf| buf.to_bytes())
|
|
.map_err(|e| anyhow!("failed to collect response body: {e}"))?;
|
|
tracing::info!("response body: {}", String::from_utf8_lossy(&body));
|
|
|
|
h3_conn.shutdown(0).await?;
|
|
Ok(())
|
|
}
|
|
|
|
fn strip_ipv6_brackets(host: &str) -> &str {
|
|
// An ipv6 url looks like eg https://[::1]:4433/Cargo.toml, wherein the host [::1] is the
|
|
// ipv6 address ::1 wrapped in brackets, per RFC 2732. This strips those.
|
|
if host.starts_with('[') && host.ends_with(']') {
|
|
&host[1..host.len() - 1]
|
|
} else {
|
|
host
|
|
}
|
|
}
|
|
|
|
type BoxError = Box<dyn std::error::Error + Send + Sync>;
|
|
|
|
struct Incoming<S, B> {
|
|
inner: h3::client::RequestStream<S, B>,
|
|
content_length: Option<u64>,
|
|
}
|
|
|
|
impl<S, B> Incoming<S, B> {
|
|
fn new(stream: h3::client::RequestStream<S, B>, headers: &http::header::HeaderMap) -> Self {
|
|
Self {
|
|
inner: stream,
|
|
content_length: headers
|
|
.get(http::header::CONTENT_LENGTH)
|
|
.and_then(|h| h.to_str().ok())
|
|
.and_then(|v| v.parse().ok()),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl<S, B> http_body::Body for Incoming<S, B>
|
|
where
|
|
S: h3::quic::RecvStream,
|
|
{
|
|
type Data = Bytes;
|
|
type Error = BoxError;
|
|
|
|
fn poll_frame(
|
|
mut self: Pin<&mut Self>,
|
|
cx: &mut Context,
|
|
) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
|
|
match futures_core::ready!(self.inner.poll_recv_data(cx)) {
|
|
Ok(Some(mut b)) => Poll::Ready(Some(Ok(http_body::Frame::data(
|
|
b.copy_to_bytes(b.remaining()),
|
|
)))),
|
|
Ok(None) => Poll::Ready(None),
|
|
Err(e) => Poll::Ready(Some(Err(e.into()))),
|
|
}
|
|
}
|
|
|
|
fn size_hint(&self) -> http_body::SizeHint {
|
|
if let Some(content_length) = self.content_length {
|
|
http_body::SizeHint::with_exact(content_length)
|
|
} else {
|
|
http_body::SizeHint::default()
|
|
}
|
|
}
|
|
}
|
|
|
|
pub async fn body_to_string<B>(
|
|
mut body: B,
|
|
) -> Result<String, Box<dyn std::error::Error + Send + Sync>>
|
|
where
|
|
B: Body<Data = bytes::Bytes> + Unpin,
|
|
B::Error: std::error::Error + Send + Sync + 'static,
|
|
{
|
|
let mut buf = BytesMut::new();
|
|
while let Some(frame) = body.frame().await {
|
|
let frame = frame?;
|
|
if let Some(data) = frame.data_ref() {
|
|
buf.extend_from_slice(data);
|
|
}
|
|
}
|
|
Ok(String::from_utf8(buf.to_vec())?)
|
|
}
|