diff --git a/Cargo.lock b/Cargo.lock index f61015f..4742c74 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -106,8 +106,11 @@ dependencies = [ "axum", "bytes", "hyper", + "log", "memory-cache-rs", + "mime_guess", "rand", + "simplelog", "tokio", "tokio-stream", "tokio-util", @@ -385,6 +388,16 @@ version = "0.3.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" +[[package]] +name = "mime_guess" +version = "2.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4192263c238a5f0d0c6bfd21f336a313a4ce1c450542449ca191bb657b4642ef" +dependencies = [ + "mime", + "unicase", +] + [[package]] name = "mio" version = "0.8.5" @@ -407,6 +420,15 @@ dependencies = [ "libc", ] +[[package]] +name = "num_threads" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2819ce041d2ee131036f4fc9d6ae7ae125a3a40e97ba64d04fe799ad9dabbb44" +dependencies = [ + "libc", +] + [[package]] name = "once_cell" version = "1.16.0" @@ -602,6 +624,17 @@ dependencies = [ "libc", ] +[[package]] +name = "simplelog" +version = "0.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48dfff04aade74dd495b007c831cd6f4e0cee19c344dd9dc0884c0289b70a786" +dependencies = [ + "log", + "termcolor", + "time", +] + [[package]] name = "slab" version = "0.4.7" @@ -644,6 +677,44 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "20518fe4a4c9acf048008599e464deb21beeae3d3578418951a189c235a7a9a8" +[[package]] +name = "termcolor" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bab24d30b911b2376f3a13cc2cd443142f0c81dda04c118693e35b3835757755" +dependencies = [ + "winapi-util", +] + +[[package]] +name = "time" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a561bf4617eebd33bca6434b988f39ed798e527f51a1e797d0ee4f61c0a38376" +dependencies = [ + "itoa", + "libc", + "num_threads", + "serde", + "time-core", + "time-macros", +] + +[[package]] +name = "time-core" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e153e1f1acaef8acc537e68b44906d2db6436e2b35ac2c6b42640fff91f00fd" + +[[package]] +name = "time-macros" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d967f99f534ca7e495c575c62638eebc2898a8c84c119b89e250477bc4ba16b2" +dependencies = [ + "time-core", +] + [[package]] name = "tokio" version = "1.22.0" @@ -778,6 +849,15 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" +[[package]] +name = "unicase" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50f37be617794602aabbeee0be4f259dc1778fabe05e2d67ee8f79326d5cb4f6" +dependencies = [ + "version_check", +] + [[package]] name = "unicode-ident" version = "1.0.5" @@ -822,6 +902,15 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" +[[package]] +name = "winapi-util" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" +dependencies = [ + "winapi", +] + [[package]] name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" diff --git a/Cargo.toml b/Cargo.toml index 350ba99..ce57bca 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,4 +14,7 @@ tokio-stream = "0.1" tower = "0.4.13" bytes = "1" rand = "0.8.5" +log = "0.4" +simplelog = "^0.12.0" +mime_guess = "2.0.4" memory-cache-rs = "0.2.0" diff --git a/src/cache.rs b/src/cache.rs new file mode 100644 index 0000000..6552f7b --- /dev/null +++ b/src/cache.rs @@ -0,0 +1,42 @@ +use std::{ffi::OsStr, path::PathBuf, sync::atomic::AtomicUsize, time::Duration}; + +use axum::{ + extract::BodyStream, + http::HeaderValue, + response::{IntoResponse, Response}, +}; +use bytes::{Bytes, BytesMut}; +use memory_cache::MemoryCache; +use mime_guess::mime; + +pub const MAX_LENGTH: usize = 80_000_000; +pub const DURATION: Duration = Duration::from_secs(8); +pub const FULL_SCAN_FREQ: Duration = Duration::from_secs(1); + +pub fn get_response(cache: &mut MemoryCache, original_path: PathBuf) -> Response { + let name = original_path + .file_name() + .and_then(OsStr::to_str) + .unwrap_or_default() + .to_string(); + + let cache_item = cache.get(&name.clone()); + + let data = cache_item.unwrap().clone(); + + let content_type = mime_guess::from_path(original_path) + .first() + .unwrap_or(mime::APPLICATION_OCTET_STREAM) + .to_string(); + + let mut res = data.into_response(); + let headers = res.headers_mut(); + + headers.clear(); + headers.insert( + "content-type", + HeaderValue::from_str(content_type.as_str()).unwrap(), + ); + + return res; +} diff --git a/src/main.rs b/src/main.rs index fea0105..f7a85e9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,6 +2,13 @@ use std::sync::Arc; extern crate axum; +#[macro_use] +extern crate log; + +extern crate simplelog; + +use simplelog::*; + use axum::{ routing::{get, post}, Router, @@ -10,24 +17,38 @@ use bytes::Bytes; use memory_cache::MemoryCache; use tokio::sync::Mutex; -mod state; +mod cache; mod new; +mod state; mod view; #[tokio::main] async fn main() { - let mut cache: MemoryCache = MemoryCache::new(); + // initialise logger + TermLogger::init( + LevelFilter::Debug, + Config::default(), + TerminalMode::Mixed, + ColorChoice::Auto, + ) + .unwrap(); + // create cache + let cache: MemoryCache = MemoryCache::with_full_scan(cache::FULL_SCAN_FREQ); + + // create appstate let state = state::AppState { - cache: Mutex::new(cache) + cache: Mutex::new(cache), }; + // build main router let app = Router::new() .route("/new", post(new::new)) .route("/p/:name", get(view::view)) .route("/", get(index)) .with_state(Arc::new(state)); + // start web server axum::Server::bind(&"127.0.0.1:8000".parse().unwrap()) // don't forget to change this! it's local for now .serve(app.into_make_service()) .await diff --git a/src/new.rs b/src/new.rs index 44f4162..7a9faac 100644 --- a/src/new.rs +++ b/src/new.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, ffi::OsStr, io::Read, path::PathBuf, sync::Arc, time::Duration}; +use std::{collections::HashMap, ffi::OsStr, path::PathBuf, sync::Arc}; use axum::{ extract::{BodyStream, Query, State}, @@ -14,6 +14,8 @@ use tokio::{ }; use tokio_stream::StreamExt; +use crate::cache; + // create an upload name from an original file name fn gen_path(original_name: &String) -> PathBuf { // extract extension from original name @@ -63,83 +65,92 @@ pub async fn new( .file_name() .and_then(OsStr::to_str) .unwrap_or_default() - .to_string(); // i hope this never happens. that would suck + .to_string(); + + // if we fail generating a name, stop now + if name.is_empty() { + return Err(StatusCode::INTERNAL_SERVER_ERROR); + } let url = format!("http://127.0.0.1:8000/p/{}", name); - // process the upload in the background so i can send the URL back immediately! - // this isn't supported by ShareX (it waits for the request to complete before handling the response) + // get the content length, and if parsing it fails, assume it's really big so it doesn't cache + let content_length = headers + .get(header::CONTENT_LENGTH) + .unwrap_or(&HeaderValue::from_static("")) + .to_str() + .and_then(|s| Ok(usize::from_str_radix(s, 10))) + .unwrap() + .unwrap_or(usize::MAX); + + // if the upload size exceeds 80 MB, we skip caching! + // previously, i was going to use redis with a 500 MB max (redis's is 512MiB) + // with or without redis, 500 MB is still a bit much.. + // it could probably be read from disk before anyone could fully download it + let mut use_cache = content_length < cache::MAX_LENGTH; + + info!( + target: "new", + "received an upload! content length: {}, using cache: {}", + content_length, use_cache + ); + + // create file to save upload to + let mut file = File::create(path) + .await + .expect("could not open file! make sure your upload path exists"); + + // if we're using cache, make some space to store the upload in + let mut data = if use_cache { + BytesMut::with_capacity(content_length) + } else { + BytesMut::new() + }; + + // start a task that handles saving files to disk (we can save to cache/disk in parallel that way) + let (tx, mut rx): (Sender, Receiver) = mpsc::channel(1); + tokio::spawn(async move { - // get the content length, and if parsing it fails, assume it's really big - // it may be better to make it fully content-length not required because this feels kind of redundant - let content_length = headers - .get(header::CONTENT_LENGTH) - .unwrap_or(&HeaderValue::from_static("")) - .to_str() - .and_then(|s| Ok(usize::from_str_radix(s, 10))) - .unwrap() - .unwrap_or(usize::MAX); - - // if the upload size exceeds 80 MB, we skip caching! - // previously, i was going to use redis with a 500 MB max (redis's is 512MiB) - // with or without redis, 500 MB is still a bit much.. - // it could probably be read from disk before anyone could fully download it - let mut use_cache = content_length < 80_000_000; - - println!( - "[upl] content length: {} using cache: {}", - content_length, use_cache - ); - - // create file to save upload to - let mut file = File::create(path) - .await - .expect("could not open file! make sure your upload path exists"); - - let mut data: BytesMut = if use_cache { - BytesMut::with_capacity(content_length) - } else { - BytesMut::new() - }; - - let (tx, mut rx): (Sender, Receiver) = mpsc::channel(1); - - tokio::spawn(async move { - while let Some(chunk) = rx.recv().await { - println!("[io] received new chunk"); - file.write_all(&chunk) - .await - .expect("error while writing file to disk"); - } - }); - - while let Some(chunk) = stream.next().await { - let chunk = chunk.unwrap(); - - println!("[upl] sending data to io task"); - tx.send(chunk.clone()).await.unwrap(); - - if use_cache { - println!("[upl] receiving data into buffer"); - if data.len() + chunk.len() > data.capacity() { - println!("[upl] too much data! the client had an invalid content-length!"); - - // if we receive too much data, drop the buffer and stop using cache (it is still okay to use disk, probably) - data = BytesMut::new(); - use_cache = false; - } else { - data.put(chunk); - } - } - } - - let mut cache = state.cache.lock().await; - - if use_cache { - println!("[upl] caching upload!!"); - cache.insert(name, data.freeze(), Some(Duration::from_secs(120))); + // receive chunks and save them to file + while let Some(chunk) = rx.recv().await { + debug!(target: "new", "writing chunk to disk (length: {})", chunk.len()); + file.write_all(&chunk) + .await + .expect("error while writing file to disk"); } }); + // read and save upload + while let Some(chunk) = stream.next().await { + let chunk = chunk.unwrap(); + + // send chunk to io task + debug!(target: "new", "sending data to io task"); + tx.send(chunk.clone()) + .await + .expect("failed to send data to io task"); + + if use_cache { + debug!(target: "new", "receiving data into buffer"); + if data.len() + chunk.len() > data.capacity() { + error!(target: "new", "too much data! the client had an invalid content-length!"); + + // if we receive too much data, drop the buffer and stop using cache (it is still okay to use disk, probably) + data = BytesMut::new(); + use_cache = false; + } else { + data.put(chunk); + } + } + } + + // insert upload into cache if necessary + if use_cache { + let mut cache = state.cache.lock().await; + + info!(target: "new", "caching upload!"); + cache.insert(name, data.freeze(), Some(cache::DURATION)); + } + Ok(url) } diff --git a/src/state.rs b/src/state.rs index eb9f5c8..2d05b02 100644 --- a/src/state.rs +++ b/src/state.rs @@ -1,7 +1,11 @@ +use std::sync::atomic::AtomicUsize; + use bytes::Bytes; use memory_cache::MemoryCache; use tokio::sync::Mutex; pub struct AppState { - pub cache: Mutex> -} \ No newline at end of file + pub cache: Mutex>, + + /* pub up_count: AtomicUsize, */ +} diff --git a/src/view.rs b/src/view.rs index c6efb00..28b290c 100644 --- a/src/view.rs +++ b/src/view.rs @@ -7,13 +7,18 @@ use std::{ use axum::{ body::StreamBody, extract::{Path, State}, - response::{IntoResponse, Response}, debug_handler, + http::HeaderValue, + response::{IntoResponse, Response}, }; -use bytes::{buf::Reader, Bytes}; + +use bytes::{Bytes, BytesMut}; use hyper::StatusCode; -use tokio::fs::File; +use mime_guess::mime; +use tokio::{fs::File, io::AsyncReadExt}; use tokio_util::io::ReaderStream; +use crate::cache; + /* pub enum ViewResponse { FromDisk(StreamBody>), FromCache(Bytes) @@ -39,7 +44,69 @@ pub async fn view( .into_iter() .any(|x| !matches!(x, Component::Normal(_))) { - println!("lol NOPE"); + error!(target: "view", "a request attempted path traversal"); + return StatusCode::NOT_FOUND.into_response(); + } + + let name = original_path + .file_name() + .and_then(OsStr::to_str) + .unwrap_or_default() + .to_string(); + + let mut cache = state.cache.lock().await; + + let cache_item = cache.get(&name.clone()); + + if cache_item.is_none() { + let mut path = PathBuf::new(); + path.push("uploads/"); + path.push(name.clone()); + + if !path.exists() || !path.is_file() { + return StatusCode::NOT_FOUND.into_response(); + } + + let mut file = File::open(path).await.unwrap(); + let file_len = file.metadata().await.unwrap().len() as usize; + + if file_len < cache::MAX_LENGTH { + info!(target: "view", "recaching upload from disk"); + + let mut data = BytesMut::zeroed(file_len); + file.read_buf(&mut data.as_mut()).await.unwrap(); + let data = data.freeze(); + + cache.insert(name.clone(), data.clone(), Some(cache::DURATION)); + + return cache::get_response(&mut cache, original_path); + } else { + let reader = ReaderStream::new(file); + let stream = StreamBody::new(reader); + + info!(target: "view", "reading upload from disk"); + + return stream.into_response(); + } + } + + info!(target: "view", "reading upload from cache"); + + return cache::get_response(&mut cache, original_path); +} + +/* #[axum::debug_handler] +pub async fn view( + State(state): State>, + Path(original_path): Path, +) -> Response { + // (hopefully) prevent path traversal, just check for any non-file components + if original_path + .components() + .into_iter() + .any(|x| !matches!(x, Component::Normal(_))) + { + error!(target: "view", "a request attempted path traversal"); return StatusCode::NOT_FOUND.into_response(); } @@ -64,71 +131,35 @@ pub async fn view( let file = File::open(path).await.unwrap(); + if file.metadata().await.unwrap().len() < (cache::MAX_LENGTH as u64) { + info!("file can be cached"); + } + let reader = ReaderStream::new(file); let stream = StreamBody::new(reader); - println!("from disk"); + info!(target: "view", "reading upload from disk"); return stream.into_response(); } - println!("from cache! :D"); + info!(target: "view", "reading upload from cache"); let data = cache_item.unwrap().clone(); - return data.into_response(); -} - -/* pub async fn view( - State(mem_cache): State>, - Path(original_path): Path, -) -> Response { - for component in original_path.components() { - println!("{:?}", component); - } - - // (hopefully) prevent path traversal, just check for any non-file components - if original_path - .components() - .into_iter() - .any(|x| !matches!(x, Component::Normal(_))) - { - return StatusCode::NOT_FOUND.into_response() - } - - // this causes an obscure bug where filenames like hiworld%2fnamehere.png will still load namehere.png - // i could limit the path components to 1 and sort of fix this - let name = original_path - .file_name() - .and_then(OsStr::to_str) - .unwrap_or_default() + let content_type = mime_guess::from_path(original_path) + .first() + .unwrap_or(mime::APPLICATION_OCTET_STREAM) .to_string(); - let cache = mem_cache.cache.lock().unwrap(); + let mut res = data.into_response(); + let headers = res.headers_mut(); - let cache_item = cache.get(&name); + headers.clear(); + headers.insert( + "content-type", + HeaderValue::from_str(content_type.as_str()).unwrap(), + ); - if cache_item.is_some() { - println!("they requested something in the cache!"); - - let data = cache_item.unwrap().clone(); - - return data.into_response() - } - - let mut path = PathBuf::new(); - path.push("uploads/"); - path.push(name); - - if !path.exists() || !path.is_file() { - return StatusCode::NOT_FOUND.into_response() - } - - let file = File::open(path).await.unwrap(); - - let reader = ReaderStream::new(file); - let stream = StreamBody::new(reader); - - stream.into_response() -} - */ + return res; +} */