minish 2025-01-03 03:28:03 -05:00
parent 2e65f3744b
commit ea4f2a828c
Signed by: min
GPG Key ID: FEECFF24EF0CE9E9
10 changed files with 738 additions and 606 deletions

Cargo.lock (generated): diff suppressed because it is too large.

@@ -1,27 +1,31 @@
 [package]
 name = "breeze"
-version = "0.2.6"
+version = "0.2.7"
 edition = "2021"
 
 [dependencies]
-axum = { version = "0.7.5", features = ["macros", "http2"] }
-tower = "0.4.13"
-http = "1.1.0"
+axum-extra = { version = "0.10.0", default-features = false, features = [
+    "tracing",
+    "typed-header",
+] }
+axum = { version = "0.8.1", features = ["macros", "http2"] }
+tower = "0.5"
+http = "1.2"
+headers = "0.4"
 tokio = { version = "1", features = ["full"] }
-tokio-util = { version = "0.7.4", features = ["full"] }
+tokio-util = { version = "0.7", features = ["full"] }
 tokio-stream = "0.1"
 tracing = "0.1"
 tracing-subscriber = "0.3"
 bytes = "1"
-async-recursion = "1.0.0"
 rand = "0.8.5"
 walkdir = "2"
 anyhow = "1.0"
 serde = { version = "1.0", features = ["derive"] }
-serde_with = "3.4.0"
+serde_with = "3.12"
 toml = "0.8.2"
 argh = "0.1.12"
-dashmap = { version = "5.5.3", features = ["rayon", "inline"] }
+dashmap = { version = "6.1.0", features = ["rayon", "inline"] }
 rayon = "1.8"
 atomic-time = "0.1.4"
-img-parts = "0.3.0"
+img-parts = "0.3"


@@ -4,25 +4,24 @@ breeze is a simple, performant file upload server.
 The primary instance is https://picture.wtf.
 
 ## Features
-Compared to the old Express.js backend, breeze has
+- Basic upload API tailored towards ShareX
 - Streamed uploading
 - Streamed downloading (on larger files)
-- Upload caching
-- Generally faster speeds overall
+- Upload caching in memory
 - Temporary uploads
 - Automatic exif data removal
 
+At this time, breeze does not support encrypted uploads on disk.
+
 ## Installation
-I wrote breeze with the intention of running it in a container, but it runs just fine outside of one.
+On picture.wtf, breeze's primary instance, it is run using a NixOS module. If you would like to do that too, it is provided by the Nix flake in this repository.
 
-Either way, you need to start off by cloning the Git repository.
+It is very much possible to run and deploy breeze without doing that, though. Containerised and bare-metal deployments are also supported. Instructions for those are below.
+
+To begin, clone the Git repository:
 ```bash
 git clone https://git.min.rip/min/breeze.git
 ```
 
-To run it in Docker, I recommend using Docker Compose. An example `docker-compose.yaml` configuration is below. You can start it using `docker compose up -d`.
+If you would like to run it as a Docker container, here is an example `docker-compose.yaml` that may be useful for reference.
 ```
 version: '3.6'
@@ -40,14 +39,14 @@ services:
     ports:
       - 8383:8000
 ```
-For this configuration, it is expected that:
+With this configuration, it is expected that:
 * there is a clone of the Git repository in the `./breeze` folder
 * there is a `breeze.toml` config file in the current directory
 * there is a directory at `/srv/uploads` for storing uploads
 * port 8383 will be made accessible to the Internet somehow (either forwarding the port through your firewall directly, or passing it through a reverse proxy)
 * you want the uploads to be owned by the user on your system with id 1000 (this is usually your user)
 
-It can also be installed directly if you have the Rust toolchain installed:
+It can also be installed directly if the Rust toolchain is installed:
 ```bash
 cargo install --path .
 ```
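The README expects a `breeze.toml` next to the compose file but never shows one. For reference, here is a hypothetical minimal config, with key names inferred from the config structs touched later in this commit (`EngineConfig`, `DiskConfig`, `CacheConfig`, plus the `http` and `logger` sections read in `main`); treat every name and value as an assumption to verify against the real config module.

```toml
# hypothetical example; field names inferred from this commit's config structs
[engine]
# empty key = no key is required for uploading
upload_key = ""
# base URL used when formatting upload links, i.e. {base_url}/p/{name}
base_url = "http://127.0.0.1:8000"
# maximum accepted upload size, in bytes (optional)
max_upload_len = 536_870_912
# files larger than this many bytes keep their EXIF data
max_strip_len = 16_777_216

[engine.disk]
# directory where uploads are written
save_path = "/data"

[engine.cache]
# files larger than this many bytes skip the in-memory cache
max_length = 134_217_728

[http]
listen_on = "0.0.0.0:8000"

[logger]
level = "info"
```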


@@ -67,9 +67,9 @@ pub struct Cache {
 }
 
 impl Cache {
-    pub fn from_config(cfg: config::CacheConfig) -> Self {
+    pub fn with_config(cfg: config::CacheConfig) -> Self {
         Self {
-            map: DashMap::with_capacity(256),
+            map: DashMap::with_capacity(64),
             length: AtomicUsize::new(0),
             cfg,
@@ -212,7 +212,7 @@ impl Cache {
     /// Returns if an upload is able to be cached
     /// with the current caching rules
     #[inline(always)]
-    pub fn will_use(&self, length: usize) -> bool {
+    pub fn will_use(&self, length: u64) -> bool {
         length <= self.cfg.max_length
     }


@@ -35,7 +35,7 @@ pub struct EngineConfig {
     /// Maximum size of an upload that will be accepted.
     /// Files above this size can not be uploaded.
-    pub max_upload_len: Option<usize>,
+    pub max_upload_len: Option<u64>,
 
     /// Maximum lifetime of a temporary upload
     #[serde_as(as = "DurationSeconds")]
@@ -43,7 +43,7 @@ pub struct EngineConfig {
     /// Maximum length (in bytes) a file can be before the server will
     /// decide not to remove its EXIF data.
-    pub max_strip_len: usize,
+    pub max_strip_len: u64,
 
     /// Motd displayed when the server's index page is visited.
     ///
@@ -64,7 +64,7 @@ pub struct DiskConfig {
 pub struct CacheConfig {
     /// The maximum length in bytes that a file can be
     /// before it skips cache
-    pub max_length: usize,
+    pub max_length: u64,
 
     /// The amount of time a file can last inside the cache (in seconds)
     #[serde_as(as = "DurationSeconds")]


@@ -4,7 +4,7 @@ use bytes::Bytes;
 use tokio::{
     fs::File,
     io::{self, AsyncWriteExt},
-    sync::mpsc::{self, Receiver, Sender},
+    sync::mpsc,
 };
 use tracing::debug;
 use walkdir::WalkDir;
@@ -18,7 +18,7 @@ pub struct Disk {
 }
 
 impl Disk {
-    pub fn from_config(cfg: config::DiskConfig) -> Self {
+    pub fn with_config(cfg: config::DiskConfig) -> Self {
         Self { cfg }
     }
@@ -40,7 +40,7 @@ impl Disk {
     /// Try to open a file on disk, and if we didn't find it,
     /// then return [`None`].
-    pub async fn open(&self, saved_name: &str) -> Result<Option<File>, io::Error> {
+    pub async fn open(&self, saved_name: &str) -> io::Result<Option<File>> {
         let p = self.path_for(saved_name);
 
         match File::open(p).await {
@@ -53,14 +53,22 @@ impl Disk {
     }
 
     /// Get the size of an upload's file
-    pub async fn len(&self, f: &File) -> Result<usize, io::Error> {
-        Ok(f.metadata().await?.len() as usize)
+    pub async fn len(&self, f: &File) -> io::Result<u64> {
+        Ok(f.metadata().await?.len())
+    }
+
+    /// Remove an upload from disk.
+    pub async fn remove(&self, saved_name: &str) -> io::Result<()> {
+        let p = self.path_for(saved_name);
+
+        tokio::fs::remove_file(p).await
     }
 
     /// Create a background I/O task
-    pub async fn start_save(&self, saved_name: &str) -> Sender<Bytes> {
+    pub async fn start_save(&self, saved_name: &str) -> mpsc::UnboundedSender<Bytes> {
         // start a task that handles saving files to disk (we can save to cache/disk in parallel that way)
-        let (tx, mut rx): (Sender<Bytes>, Receiver<Bytes>) = mpsc::channel(256);
+        let (tx, mut rx): (mpsc::UnboundedSender<Bytes>, mpsc::UnboundedReceiver<Bytes>) =
+            mpsc::unbounded_channel();
 
         let p = self.path_for(saved_name);
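For context on the channel change above: `start_save` now hands back an `mpsc::UnboundedSender<Bytes>`, and unbounded senders make `send` synchronous, which is why the `.await` disappears from the `tx.send(...)` calls in the engine below. A minimal sketch of what the receiving half of such an I/O task could look like follows; the real task body is outside this hunk, so this is an assumption, not the repository's code.

```rust
use bytes::Bytes;
use tokio::{fs::File, io::AsyncWriteExt, sync::mpsc};

// Hypothetical receiver loop for the background save task.
async fn run_io_task(
    mut rx: mpsc::UnboundedReceiver<Bytes>,
    mut file: File,
) -> std::io::Result<()> {
    // drain chunks until every sender is dropped by the upload handler
    while let Some(chunk) = rx.recv().await {
        file.write_all(&chunk).await?;
    }

    // make sure buffered bytes actually reach the disk
    file.flush().await
}
```

The trade-off is the usual one: an unbounded channel can never block the upload path, but a slow disk lets queued chunks pile up in memory.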


@@ -1,4 +1,5 @@
 use std::{
+    ops::Bound,
     sync::{
         atomic::{AtomicUsize, Ordering},
         Arc,
@@ -10,9 +11,12 @@ use axum::body::BodyDataStream;
 use bytes::{BufMut, Bytes, BytesMut};
 use img_parts::{DynImage, ImageEXIF};
 use rand::distributions::{Alphanumeric, DistString};
-use tokio::{fs::File, io::AsyncReadExt};
+use tokio::{
+    fs::File,
+    io::{AsyncReadExt, AsyncSeekExt},
+};
 use tokio_stream::StreamExt;
-use tracing::{debug, info};
+use tracing::{debug, error, info};
 
 use crate::{cache, config, disk};
@@ -20,12 +24,18 @@ use crate::{cache, config, disk};
 pub enum UploadData {
     /// Send back the data from memory
     Cache(Bytes),
 
     /// Stream the file from disk to the client
-    Disk(File, usize),
+    Disk(tokio::io::Take<File>),
 }
 
-/// Rejection outcomes of an [`Engine::process`] call
+pub struct UploadResponse {
+    pub full_len: u64,
+    pub range: (u64, u64),
+    pub data: UploadData,
+}
+
+/// Non-error outcomes of an [`Engine::process`] call.
+/// Some are rejections.
 pub enum ProcessOutcome {
     /// The upload was successful.
     /// We give the user their file's URL
@@ -41,26 +51,68 @@ pub enum ProcessOutcome {
     TemporaryUploadLifetimeTooLong,
 }
 
-/// breeze engine! this is the core of everything
+/// Non-error outcomes of an [`Engine::get`] call.
+pub enum GetOutcome {
+    /// Successfully read upload.
+    Success(UploadResponse),
+    /// The upload was not found anywhere
+    NotFound,
+    /// A range was requested that exceeds an upload's bounds
+    RangeNotSatisfiable,
+}
+
+/// breeze engine
 pub struct Engine {
-    /// Cached count of uploaded files.
+    /// Cached count of uploaded files
     pub upl_count: AtomicUsize,
 
     /// Engine configuration
     pub cfg: config::EngineConfig,
 
-    /// The in-memory cache that cached uploads are stored in.
+    /// The in-memory cache that cached uploads are stored in
     cache: Arc<cache::Cache>,
 
     /// An interface to the on-disk upload store
     disk: disk::Disk,
 }
 
+fn resolve_range(range: Option<headers::Range>, full_len: u64) -> Option<(u64, u64)> {
+    let last_byte = full_len - 1;
+
+    let (start, end) =
+        if let Some((start, end)) = range.and_then(|r| r.satisfiable_ranges(full_len).next()) {
+            // satisfiable_ranges will never return Excluded so this is ok
+            let start = if let Bound::Included(start_incl) = start {
+                start_incl
+            } else {
+                0
+            };
+            let end = if let Bound::Included(end_incl) = end {
+                end_incl
+            } else {
+                last_byte
+            };
+
+            (start, end)
+        } else {
+            (0, last_byte)
+        };
+
+    // catch ranges we can't satisfy
+    if end > last_byte || start > end {
+        return None;
+    }
+
+    Some((start, end))
+}
+
 impl Engine {
-    /// Creates a new instance of the breeze engine.
-    pub fn from_config(cfg: config::EngineConfig) -> Self {
-        let cache = cache::Cache::from_config(cfg.cache.clone());
-        let disk = disk::Disk::from_config(cfg.disk.clone());
+    /// Creates a new instance of the engine
+    pub fn with_config(cfg: config::EngineConfig) -> Self {
+        let cache = cache::Cache::with_config(cfg.cache.clone());
+        let disk = disk::Disk::with_config(cfg.disk.clone());
         let cache = Arc::new(cache);
@@ -78,35 +130,40 @@ impl Engine {
         }
     }
 
-    /// Fetch an upload
+    /// Fetch an upload.
     ///
     /// This will first try to read from cache, and then disk after.
     /// If an upload is eligible to be cached, it will be cached and
     /// sent back as a cache response instead of a disk response.
-    pub async fn get(&self, saved_name: &str) -> anyhow::Result<Option<UploadData>> {
-        // check the cache first
-        if let Some(u) = self.cache.get(saved_name) {
-            return Ok(Some(UploadData::Cache(u)));
-        }
+    ///
+    /// If there is a range, it is applied at the very end.
+    pub async fn get(
+        &self,
+        saved_name: &str,
+        range: Option<headers::Range>,
+    ) -> anyhow::Result<GetOutcome> {
+        let data = if let Some(u) = self.cache.get(saved_name) {
+            u
+        } else {
             // now, check if we have it on disk
             let mut f = if let Some(f) = self.disk.open(saved_name).await? {
                 f
             } else {
                 // file didn't exist
-                return Ok(None);
+                return Ok(GetOutcome::NotFound);
             };
 
-            let len = self.disk.len(&f).await?;
+            let full_len = self.disk.len(&f).await?;
 
-            // can this be recached?
-            if self.cache.will_use(len) {
+            // if possible, recache and send a cache response
+            // else, send a disk response
+            if self.cache.will_use(full_len) {
                 // read file from disk
-                let mut full = BytesMut::with_capacity(len);
+                let mut data = BytesMut::with_capacity(full_len.try_into()?);
 
                 // read file from disk and if it fails at any point, return 500
                 loop {
-                    match f.read_buf(&mut full).await {
+                    match f.read_buf(&mut data).await {
                         Ok(n) => {
                             if n == 0 {
                                 break;
@@ -116,17 +173,56 @@ impl Engine {
                     }
                 }
 
-                let full = full.freeze();
+                let data = data.freeze();
 
                 // re-insert it into cache
-                self.cache.add(saved_name, full.clone());
+                self.cache.add(saved_name, data.clone());
 
-                return Ok(Some(UploadData::Cache(full)));
-            }
+                data
+            } else {
+                let (start, end) = if let Some(range) = resolve_range(range, full_len) {
+                    range
+                } else {
+                    return Ok(GetOutcome::RangeNotSatisfiable);
+                };
 
-            Ok(Some(UploadData::Disk(f, len)))
+                let range_len = (end - start) + 1;
+
+                f.seek(std::io::SeekFrom::Start(start)).await?;
+                let f = f.take(range_len);
+
+                let res = UploadResponse {
+                    full_len,
+                    range: (start, end),
+                    data: UploadData::Disk(f),
+                };
+                return Ok(GetOutcome::Success(res));
+            }
+        };
+
+        let full_len = data.len() as u64;
+        let (start, end) = if let Some(range) = resolve_range(range, full_len) {
+            range
+        } else {
+            return Ok(GetOutcome::RangeNotSatisfiable);
+        };
+
+        // cut down to range
+        let data = data.slice((start as usize)..=(end as usize));
+
+        // build response
+        let res = UploadResponse {
+            full_len,
+            range: (start, end),
+            data: UploadData::Cache(data),
+        };
+        Ok(GetOutcome::Success(res))
     }
 
+    /// Check if we have an upload stored anywhere.
+    ///
+    /// This is only used to prevent `saved_name` collisions!!
+    /// It is not used to deliver "not found" errors.
     pub async fn has(&self, saved_name: &str) -> bool {
         if self.cache.has(saved_name) {
             return true;
@@ -143,10 +239,9 @@ impl Engine {
     /// Generate a new saved name for an upload.
     ///
-    /// This will call itself recursively if it picks
-    /// a name that's already used. (it is rare)
-    #[async_recursion::async_recursion]
+    /// If it picks a name that already exists, it will try again.
     pub async fn gen_saved_name(&self, ext: &str) -> String {
+        loop {
             // generate a 6-character alphanumeric string
             let mut saved_name: String = Alphanumeric.sample_string(&mut rand::thread_rng(), 6);
@@ -157,13 +252,27 @@ impl Engine {
             }
 
             if !self.has(&saved_name).await {
-                saved_name
+                break saved_name;
             } else {
-                // we had a name collision! try again..
+                // there was a name collision. loop and try again
                 info!("name collision! saved_name= {}", saved_name);
-                self.gen_saved_name(ext).await
             }
         }
+    }
+
+    /// Wipe out an upload from all storage.
+    ///
+    /// This is for deleting failed uploads only!!
+    pub async fn remove(&self, saved_name: &str) -> anyhow::Result<()> {
+        info!("!! removing upload: {saved_name}");
+
+        self.cache.remove(saved_name);
+        self.disk.remove(saved_name).await?;
+
+        info!("!! successfully removed upload");
+
+        Ok(())
+    }
+
     /// Save a file to disk, and optionally cache.
     ///
@@ -171,15 +280,15 @@ impl Engine {
     pub async fn save(
         &self,
         saved_name: &str,
-        provided_len: usize,
+        provided_len: u64,
         mut use_cache: bool,
         mut stream: BodyDataStream,
         lifetime: Option<Duration>,
         keep_exif: bool,
-    ) -> Result<(), anyhow::Error> {
+    ) -> anyhow::Result<()> {
         // if we're using cache, make some space to store the upload in
         let mut data = if use_cache {
-            BytesMut::with_capacity(provided_len)
+            BytesMut::with_capacity(provided_len.try_into()?)
         } else {
             BytesMut::new()
         };
@@ -214,7 +323,7 @@ impl Engine {
             if !coalesce_and_strip {
                 if let Some(ref tx) = tx {
                     debug!("sending chunk to i/o task");
-                    tx.send(chunk.clone()).await?;
+                    tx.send(chunk.clone())?;
                 }
             }
@@ -256,7 +365,7 @@ impl Engine {
                 // send what we did over to the i/o task, all in one chunk
                 if let Some(ref tx) = tx {
                     debug!("sending filled buffer to i/o task");
-                    tx.send(data.clone()).await?;
+                    tx.send(data.clone())?;
                 }
 
                 data
@@ -275,29 +384,24 @@ impl Engine {
             };
         }
 
-        info!("finished processing upload!!");
-
-        // if all goes well, increment the cached upload counter
-        self.upl_count.fetch_add(1, Ordering::Relaxed);
-
         Ok(())
     }
 
     pub async fn process(
         &self,
         ext: &str,
-        provided_len: usize,
+        provided_len: u64,
         stream: BodyDataStream,
         lifetime: Option<Duration>,
         keep_exif: bool,
-    ) -> Result<ProcessOutcome, anyhow::Error> {
+    ) -> anyhow::Result<ProcessOutcome> {
         // if the upload size is greater than our max file size, deny it now
         if self.cfg.max_upload_len.is_some_and(|l| provided_len > l) {
            return Ok(ProcessOutcome::UploadTooLarge);
         }
 
         // if the upload size is smaller than the specified maximum, we use the cache!
-        let use_cache: bool = self.cache.will_use(provided_len);
+        let use_cache = self.cache.will_use(provided_len);
 
         // if a temp file is too big for cache, reject it now
         if lifetime.is_some() && !use_cache {
@@ -313,7 +417,8 @@ impl Engine {
         let saved_name = self.gen_saved_name(ext).await;
 
         // save it
-        self.save(
+        let save_result = self
+            .save(
                 &saved_name,
                 provided_len,
                 use_cache,
@@ -321,11 +426,24 @@ impl Engine {
                 lifetime,
                 keep_exif,
             )
-            .await?;
+            .await;
+
+        // If anything fails, delete the upload and return the error
+        if save_result.is_err() {
+            error!("failed processing upload!");
+            self.remove(&saved_name).await?;
+            save_result?;
+        }
 
         // format and send back the url
         let url = format!("{}/p/{}", self.cfg.base_url, saved_name);
 
+        // if all goes well, increment the cached upload counter
+        self.upl_count.fetch_add(1, Ordering::Relaxed);
+
+        info!("finished processing upload!");
+
         Ok(ProcessOutcome::Success(url))
     }
 }
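To make the new `resolve_range` behaviour concrete, here is a small illustrative check, assuming the `headers` 0.4 `Range::bytes` constructor; this test is not part of the commit.

```rust
#[test]
fn resolve_range_examples() {
    // an in-bounds inclusive range maps straight to (start, end)
    let r = headers::Range::bytes(100..=199).unwrap();
    assert_eq!(resolve_range(Some(r), 1000), Some((100, 199)));

    // no Range header means the whole upload: bytes 0..=999
    assert_eq!(resolve_range(None, 1000), Some((0, 999)));

    // an open-ended range runs from the offset to the last byte
    let r = headers::Range::bytes(500..).unwrap();
    assert_eq!(resolve_range(Some(r), 1000), Some((500, 999)));
}
```

One edge worth noting: with `full_len == 0`, `full_len - 1` underflows (panicking in debug builds), so callers presumably never pass an empty upload.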


@@ -28,10 +28,10 @@ struct Args {
 #[tokio::main]
 async fn main() {
-    // read & parse args
+    // Read & parse args
     let args: Args = argh::from_env();
 
-    // read & parse config
+    // Read & parse config
     let cfg: config::Config = {
         let config_str = fs::read_to_string(args.config).await.expect(
             "failed to read config file! make sure it exists and you have read permissions",
@@ -42,38 +42,38 @@ async fn main() {
         })
     };
 
+    // Set up tracing
     tracing_subscriber::fmt()
         .with_max_level(cfg.logger.level)
         .init();
 
+    // Check config
     {
         let save_path = cfg.engine.disk.save_path.clone();
         if !save_path.exists() || !save_path.is_dir() {
             panic!("the save path does not exist or is not a directory! this is invalid");
         }
     }
     if cfg.engine.upload_key.is_empty() {
         warn!("engine upload_key is empty! no key will be required for uploading new files");
     }
 
-    // create engine
-    let engine = Engine::from_config(cfg.engine);
+    // Create engine
+    let engine = Engine::with_config(cfg.engine);
 
-    // build main router
+    // Build main router
     let app = Router::new()
         .route("/new", post(new::new))
-        .route("/p/:name", get(view::view))
+        .route("/p/{saved_name}", get(view::view))
         .route("/", get(index::index))
         .route("/robots.txt", get(index::robots_txt))
         .with_state(Arc::new(engine));
 
-    // start web server
+    // Start web server
+    info!("starting server.");
     let listener = TcpListener::bind(&cfg.http.listen_on)
         .await
         .expect("failed to bind to given `http.listen_on` address! make sure it's valid, and the port isn't already bound");
 
-    info!("starting server.");
-
     axum::serve(listener, app)
         .with_graceful_shutdown(shutdown_signal())
         .await
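The `/p/:name` to `/p/{saved_name}` change follows axum 0.8, which switched path captures from colon syntax to braces. A standalone sketch of the new syntax (not this repository's router):

```rust
use axum::{extract::Path, routing::get, Router};

// axum 0.8 captures use /{param}; 0.7 used /:param
async fn show(Path(saved_name): Path<String>) -> String {
    format!("requested: {saved_name}")
}

fn router() -> Router {
    Router::new().route("/p/{saved_name}", get(show))
}
```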


@@ -4,7 +4,9 @@ use axum::{
     body::Body,
     extract::{Query, State},
 };
-use http::{header, HeaderMap, HeaderValue, StatusCode};
+use axum_extra::TypedHeader;
+use headers::ContentLength;
+use http::StatusCode;
 use serde::Deserialize;
 use serde_with::{serde_as, DurationSeconds};
@@ -34,7 +36,7 @@ pub struct NewRequest {
 pub async fn new(
     State(engine): State<Arc<crate::engine::Engine>>,
     Query(req): Query<NewRequest>,
-    headers: HeaderMap,
+    TypedHeader(ContentLength(content_length)): TypedHeader<ContentLength>,
     body: Body,
 ) -> Result<String, StatusCode> {
     // check upload key, if i need to
@@ -53,19 +55,14 @@ pub async fn new(
         .unwrap_or_default()
         .to_string();
 
-    // read and parse content-length, and if it fails just assume it's really high so it doesn't cache
-    let content_length = headers
-        .get(header::CONTENT_LENGTH)
-        .unwrap_or(&HeaderValue::from_static(""))
-        .to_str()
-        .map(|s| s.parse::<usize>())
-        .unwrap()
-        .unwrap_or(usize::MAX);
-
     // turn body into stream
     let stream = Body::into_data_stream(body);
 
-    // pass it off to the engine to be processed!
+    // pass it off to the engine to be processed
+    // --
+    // also, error responses here don't get represented properly in ShareX most of the time
+    // they don't expect the connection to close before they're done uploading, i think
+    // so it will just present the user with a "connection closed" error
     match engine
         .process(
             &extension,


@@ -6,10 +6,12 @@ use axum::{
     response::{IntoResponse, Response},
 };
+use axum_extra::TypedHeader;
+use headers::Range;
 use http::{HeaderValue, StatusCode};
 use tokio_util::io::ReaderStream;
 
-use crate::engine::UploadData;
+use crate::engine::{GetOutcome, UploadData, UploadResponse};
 
 /// Responses for a failed view operation
 pub enum ViewError {
@@ -18,80 +20,91 @@ pub enum ViewError {
     /// Will send status code 500 with a plaintext "internal server error" message.
     InternalServerError,
+
+    /// Sends status code 416 with a plaintext "range not satisfiable" message.
+    RangeNotSatisfiable,
 }
 
-impl IntoResponse for UploadData {
-    fn into_response(self) -> Response {
-        match self {
-            UploadData::Disk(file, len) => {
-                // create our content-length header
-                let len_str = len.to_string();
-                let content_length = HeaderValue::from_str(&len_str).unwrap();
-
-                // create a streamed body response (we want to stream larger files)
-                let stream = ReaderStream::new(file);
-                let body = Body::from_stream(stream);
-
-                // extract mutable headers from the response
-                let mut res = body.into_response();
-                let headers = res.headers_mut();
-
-                // clear headers, browser can imply content type
-                headers.clear();
-
-                // insert Content-Length header
-                // that way the browser shows how big a file is when it's being downloaded
-                headers.insert("Content-Length", content_length);
-
-                res
-            }
-            UploadData::Cache(data) => {
-                // extract mutable headers from the response
-                let mut res = data.into_response();
-                let headers = res.headers_mut();
-
-                // clear the headers, let the browser imply it
-                headers.clear();
-
-                res
-            }
-        }
-    }
-}
-
 impl IntoResponse for ViewError {
     fn into_response(self) -> Response {
         match self {
-            ViewError::NotFound => (
-                StatusCode::NOT_FOUND,
-                "not found!"
-            ).into_response(),
-
-            ViewError::InternalServerError => (
-                StatusCode::INTERNAL_SERVER_ERROR,
-                "internal server error!"
-            ).into_response(),
+            ViewError::NotFound => (StatusCode::NOT_FOUND, "Not found!").into_response(),
+
+            ViewError::InternalServerError => {
+                (StatusCode::INTERNAL_SERVER_ERROR, "Internal server error!").into_response()
+            }
+
+            ViewError::RangeNotSatisfiable => {
+                (StatusCode::RANGE_NOT_SATISFIABLE, "Range not satisfiable!").into_response()
+            }
         }
     }
 }
-/// The request handler for /p/* path.
+impl IntoResponse for UploadResponse {
+    fn into_response(self) -> Response {
+        let (start, end) = self.range;
+        let range_len = (end - start) + 1;
+
+        let mut res = match self.data {
+            UploadData::Cache(data) => data.into_response(),
+            UploadData::Disk(file) => {
+                let reader_stream = ReaderStream::new(file);
+                let body = Body::from_stream(reader_stream);
+
+                let mut res = body.into_response();
+                let headers = res.headers_mut();
+
+                // add Content-Length header so the browser shows how big a file is when it's being downloaded
+                let content_length = HeaderValue::from_str(&range_len.to_string())
+                    .expect("construct content-length header failed");
+                headers.insert("Content-Length", content_length);
+
+                res
+            }
+        };
+
+        let headers = res.headers_mut();
+
+        // remove content-type, browser can imply content type
+        headers.remove("Content-Type");
+
+        headers.insert("Accept-Ranges", HeaderValue::from_static("bytes"));
+        // ^-- indicate that byte ranges are supported. maybe unneeded, but probably good
+
+        // if it is not the full size, add relevant headers/status for range request
+        if range_len != self.full_len {
+            let content_range =
+                HeaderValue::from_str(&format!("bytes {}-{}/{}", start, end, self.full_len))
+                    .expect("construct content-range header failed");
+
+            headers.insert("Content-Range", content_range);
+            *res.status_mut() = StatusCode::PARTIAL_CONTENT;
+        }
+
+        res
+    }
+}
+
+/// GET request handler for /p/* path.
 /// All file views are handled here.
 #[axum::debug_handler]
 pub async fn view(
     State(engine): State<Arc<crate::engine::Engine>>,
     Path(original_path): Path<PathBuf>,
-) -> Result<UploadData, ViewError> {
+    range: Option<TypedHeader<Range>>,
+) -> Result<UploadResponse, ViewError> {
     let saved_name = if let Some(Some(n)) = original_path.file_name().map(OsStr::to_str) {
         n
     } else {
         return Err(ViewError::NotFound);
     };
 
-    // get result from the engine!
-    match engine.get(saved_name).await {
-        Ok(Some(u)) => Ok(u),
-        Ok(None) => Err(ViewError::NotFound),
+    let range = range.map(|th| th.0);
+
+    // get result from the engine
+    match engine.get(saved_name, range).await {
+        Ok(GetOutcome::Success(res)) => Ok(res),
+        Ok(GetOutcome::NotFound) => Err(ViewError::NotFound),
+        Ok(GetOutcome::RangeNotSatisfiable) => Err(ViewError::RangeNotSatisfiable),
         Err(_) => Err(ViewError::InternalServerError),
     }
 }
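With `Accept-Ranges` and `Content-Range` now wired up, a partial download can be exercised with curl; the host and upload name below are made up:

```bash
curl -i -H "Range: bytes=0-99" http://127.0.0.1:8000/p/AbC123.png
# expected, per the handler above:
#   HTTP/1.1 206 Partial Content
#   Accept-Ranges: bytes
#   Content-Range: bytes 0-99/4096    (total size varies per file)
#   Content-Length: 100               (set explicitly for disk-backed responses)
```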