v0.2.1 - migrate to axum v0.7

This commit is contained in:
minish 2024-05-27 14:28:14 -04:00
parent 2e92ab4bf0
commit 3a853649ee
Signed by: min
GPG Key ID: FEECFF24EF0CE9E9
7 changed files with 501 additions and 390 deletions

822
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -4,8 +4,8 @@ version = "0.2.0"
edition = "2021" edition = "2021"
[dependencies] [dependencies]
axum = { version = "0.6.1", features = ["macros"] } axum = { version = "0.7.5", features = ["macros", "http2"] }
hyper = { version = "0.14", features = ["full"] } http = "1.1.0"
tokio = { version = "1", features = ["full"] } tokio = { version = "1", features = ["full"] }
tokio-util = { version = "0.7.4", features = ["full"] } tokio-util = { version = "0.7.4", features = ["full"] }
tokio-stream = "0.1" tokio-stream = "0.1"
@ -16,11 +16,11 @@ async-recursion = "1.0.0"
walkdir = "2" walkdir = "2"
tracing = "0.1" tracing = "0.1"
tracing-subscriber = "0.3" tracing-subscriber = "0.3"
serde = { version = "1.0.189", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_with = "3.4.0"
toml = "0.8.2" toml = "0.8.2"
clap = { version = "4.4.6", features = ["derive"] } clap = { version = "4.4.6", features = ["derive"] }
serde_with = "3.4.0" anyhow = "1.0"
anyhow = "1.0.79"
dashmap = { version = "5.5.3", features = ["rayon", "inline"] } dashmap = { version = "5.5.3", features = ["rayon", "inline"] }
rayon = "1.8" rayon = "1.8"
atomic-time = "0.1.4" atomic-time = "0.1.4"

View File

@ -33,7 +33,7 @@ impl Disk {
/// Formats the path on disk for a `saved_name`. /// Formats the path on disk for a `saved_name`.
fn path_for(&self, saved_name: &str) -> PathBuf { fn path_for(&self, saved_name: &str) -> PathBuf {
let mut p = self.cfg.save_path.clone(); let mut p = self.cfg.save_path.clone();
p.push(&saved_name); p.push(saved_name);
p p
} }

View File

@ -6,7 +6,7 @@ use std::{
time::Duration, time::Duration,
}; };
use axum::extract::BodyStream; use axum::body::BodyDataStream;
use bytes::{BufMut, Bytes, BytesMut}; use bytes::{BufMut, Bytes, BytesMut};
use img_parts::{DynImage, ImageEXIF}; use img_parts::{DynImage, ImageEXIF};
use rand::distributions::{Alphanumeric, DistString}; use rand::distributions::{Alphanumeric, DistString};
@ -167,7 +167,7 @@ impl Engine {
saved_name: &str, saved_name: &str,
provided_len: usize, provided_len: usize,
mut use_cache: bool, mut use_cache: bool,
mut stream: BodyStream, mut stream: BodyDataStream,
lifetime: Option<Duration>, lifetime: Option<Duration>,
keep_exif: bool, keep_exif: bool,
) -> Result<(), axum::Error> { ) -> Result<(), axum::Error> {
@ -206,8 +206,8 @@ impl Engine {
// if we have an i/o task, send it off // if we have an i/o task, send it off
// also cloning this is okay because it's a Bytes // also cloning this is okay because it's a Bytes
if !coalesce_and_strip { if !coalesce_and_strip {
debug!("sending chunk to i/o task");
if let Some(ref tx) = tx { if let Some(ref tx) = tx {
debug!("sending chunk to i/o task");
let _ = tx.send(chunk.clone()).await; let _ = tx.send(chunk.clone()).await;
} }
} }
@ -248,8 +248,8 @@ impl Engine {
}; };
// send what we did over to the i/o task, all in one chunk // send what we did over to the i/o task, all in one chunk
debug!("sending filled buffer to i/o task");
if let Some(ref tx) = tx { if let Some(ref tx) = tx {
debug!("sending filled buffer to i/o task");
let _ = tx.send(data.clone()).await; let _ = tx.send(data.clone()).await;
} }
@ -281,7 +281,7 @@ impl Engine {
&self, &self,
ext: &str, ext: &str,
provided_len: usize, provided_len: usize,
stream: BodyStream, stream: BodyDataStream,
lifetime: Option<Duration>, lifetime: Option<Duration>,
keep_exif: bool, keep_exif: bool,
) -> Result<ProcessOutcome, axum::Error> { ) -> Result<ProcessOutcome, axum::Error> {

View File

@ -7,7 +7,7 @@ use axum::{
routing::{get, post}, routing::{get, post},
Router, Router,
}; };
use tokio::{fs, signal}; use tokio::{fs, net::TcpListener, signal};
use tracing::{info, warn}; use tracing::{info, warn};
mod cache; mod cache;
@ -64,16 +64,14 @@ async fn main() {
.with_state(Arc::new(engine)); .with_state(Arc::new(engine));
// start web server // start web server
axum::Server::bind( let listener = TcpListener::bind(&cfg.http.listen_on)
&cfg.http .await
.listen_on .expect("failed to bind to given `http.listen_on` address! make sure it's valid, and the port isn't already bound");
.parse()
.expect("failed to parse listen_on address"), axum::serve(listener, app)
) .with_graceful_shutdown(shutdown_signal())
.serve(app.into_make_service()) .await
.with_graceful_shutdown(shutdown_signal()) .expect("failed to start server");
.await
.expect("failed to start server");
} }
async fn shutdown_signal() { async fn shutdown_signal() {

View File

@ -1,10 +1,10 @@
use std::{ffi::OsStr, path::PathBuf, sync::Arc, time::Duration}; use std::{ffi::OsStr, path::PathBuf, sync::Arc, time::Duration};
use axum::{ use axum::{
extract::{BodyStream, Query, State}, body::Body,
http::HeaderValue, extract::{Query, State},
}; };
use hyper::{header, HeaderMap, StatusCode}; use http::{header, HeaderMap, HeaderValue, StatusCode};
use serde::Deserialize; use serde::Deserialize;
use serde_with::{serde_as, DurationSeconds}; use serde_with::{serde_as, DurationSeconds};
@ -35,7 +35,7 @@ pub async fn new(
State(engine): State<Arc<crate::engine::Engine>>, State(engine): State<Arc<crate::engine::Engine>>,
Query(req): Query<NewRequest>, Query(req): Query<NewRequest>,
headers: HeaderMap, headers: HeaderMap,
stream: BodyStream, body: Body,
) -> Result<String, StatusCode> { ) -> Result<String, StatusCode> {
// check upload key, if i need to // check upload key, if i need to
if !engine.cfg.upload_key.is_empty() && req.key.unwrap_or_default() != engine.cfg.upload_key { if !engine.cfg.upload_key.is_empty() && req.key.unwrap_or_default() != engine.cfg.upload_key {
@ -62,6 +62,9 @@ pub async fn new(
.unwrap() .unwrap()
.unwrap_or(usize::MAX); .unwrap_or(usize::MAX);
// turn body into stream
let stream = Body::into_data_stream(body);
// pass it off to the engine to be processed! // pass it off to the engine to be processed!
match engine match engine
.process( .process(
@ -78,14 +81,12 @@ pub async fn new(
ProcessOutcome::Success(url) => Ok(url), ProcessOutcome::Success(url) => Ok(url),
// 413 Payload Too Large // 413 Payload Too Large
ProcessOutcome::TemporaryUploadTooLarge => { ProcessOutcome::TemporaryUploadTooLarge => Err(StatusCode::PAYLOAD_TOO_LARGE),
Err(StatusCode::PAYLOAD_TOO_LARGE)
}
// 400 Bad Request // 400 Bad Request
ProcessOutcome::TemporaryUploadLifetimeTooLong => Err(StatusCode::BAD_REQUEST), ProcessOutcome::TemporaryUploadLifetimeTooLong => Err(StatusCode::BAD_REQUEST),
}, },
// 500 Internal Server Error // 500 Internal Server Error
Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR), Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR),
} }

View File

@ -3,12 +3,12 @@ use std::{
}; };
use axum::{ use axum::{
body::StreamBody, body::Body,
extract::{Path, State}, extract::{Path, State},
response::{IntoResponse, Response}, response::{IntoResponse, Response},
}; };
use hyper::{http::HeaderValue, StatusCode}; use http::{HeaderValue, StatusCode};
use tokio_util::io::ReaderStream; use tokio_util::io::ReaderStream;
use crate::engine::UploadData; use crate::engine::UploadData;
@ -31,11 +31,11 @@ impl IntoResponse for UploadData {
let content_length = HeaderValue::from_str(&len_str).unwrap(); let content_length = HeaderValue::from_str(&len_str).unwrap();
// create a streamed body response (we want to stream larger files) // create a streamed body response (we want to stream larger files)
let reader = ReaderStream::new(file); let stream = ReaderStream::new(file);
let stream = StreamBody::new(reader); let body = Body::from_stream(stream);
// extract mutable headers from the response // extract mutable headers from the response
let mut res = stream.into_response(); let mut res = body.into_response();
let headers = res.headers_mut(); let headers = res.headers_mut();
// clear headers, browser can imply content type // clear headers, browser can imply content type