diff --git a/.gitignore b/.gitignore
index dccea96..3f1d26f 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,2 +1,2 @@
+# binaries
 /target
-/uploads
diff --git a/.vscode/launch.json b/.vscode/launch.json
deleted file mode 100644
index b0c74bc..0000000
--- a/.vscode/launch.json
+++ /dev/null
@@ -1,45 +0,0 @@
-{
-    // Use IntelliSense to learn about possible attributes.
-    // Hover to view descriptions of existing attributes.
-    // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
-    "version": "0.2.0",
-    "configurations": [
-        {
-            "type": "lldb",
-            "request": "launch",
-            "name": "Debug executable 'breeze'",
-            "cargo": {
-                "args": [
-                    "build",
-                    "--bin=breeze",
-                    "--package=breeze"
-                ],
-                "filter": {
-                    "name": "breeze",
-                    "kind": "bin"
-                }
-            },
-            "args": [],
-            "cwd": "${workspaceFolder}"
-        },
-        {
-            "type": "lldb",
-            "request": "launch",
-            "name": "Debug unit tests in executable 'breeze'",
-            "cargo": {
-                "args": [
-                    "test",
-                    "--no-run",
-                    "--bin=breeze",
-                    "--package=breeze"
-                ],
-                "filter": {
-                    "name": "breeze",
-                    "kind": "bin"
-                }
-            },
-            "args": [],
-            "cwd": "${workspaceFolder}"
-        }
-    ]
-}
\ No newline at end of file
diff --git a/Cargo.lock b/Cargo.lock
index 4742c74..9c959de 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -13,6 +13,17 @@ dependencies = [
  "version_check",
 ]
 
+[[package]]
+name = "async-recursion"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2cda8f4bcc10624c4e85bc66b3f452cca98cfa5ca002dc83a16aad2367641bea"
+dependencies = [
+ "proc-macro2",
+ "quote",
+ "syn",
+]
+
 [[package]]
 name = "async-trait"
 version = "0.1.59"
@@ -103,6 +114,7 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a"
 name = "breeze"
 version = "0.1.0"
 dependencies = [
+ "async-recursion",
  "axum",
  "bytes",
  "hyper",
@@ -115,6 +127,7 @@ dependencies = [
  "tokio-stream",
  "tokio-util",
  "tower",
+ "walkdir",
 ]
 
 [[package]]
@@ -571,6 +584,15 @@ version = "1.0.11"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "4501abdff3ae82a1c1b477a17252eb69cee9e66eb915c1abaa4f44d873df9f09"
 
+[[package]]
+name = "same-file"
+version = "1.0.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502"
+dependencies = [
+ "winapi-util",
+]
+
 [[package]]
 name = "scopeguard"
 version = "1.1.0"
@@ -870,6 +892,17 @@ version = "0.9.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f"
 
+[[package]]
+name = "walkdir"
+version = "2.3.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "808cf2735cd4b6866113f648b791c6adc5714537bc222d9347bb203386ffda56"
+dependencies = [
+ "same-file",
+ "winapi",
+ "winapi-util",
+]
+
 [[package]]
 name = "want"
 version = "0.3.0"
diff --git a/Cargo.toml b/Cargo.toml
index ce57bca..34d418f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -14,6 +14,8 @@ tokio-stream = "0.1"
 tower = "0.4.13"
 bytes = "1"
 rand = "0.8.5"
+async-recursion = "1.0.0"
+walkdir = "2"
 log = "0.4"
 simplelog = "^0.12.0"
 mime_guess = "2.0.4"
diff --git a/src/cache.rs b/src/cache.rs
deleted file mode 100644
index 0b6ab49..0000000
--- a/src/cache.rs
+++ /dev/null
@@ -1,5 +0,0 @@
-use std::time::Duration;
-
-pub const MAX_LENGTH: usize = 80_000_000;
-pub const DURATION: Duration = Duration::from_secs(8);
-pub const FULL_SCAN_FREQ: Duration = Duration::from_secs(1);
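The two dependency additions above are what carry the engine refactor below: walkdir counts existing uploads when the Engine starts, and async-recursion lets the new async gen_path method call itself. As a minimal sketch of why the macro is needed (the countdown function here is invented for illustration, not part of this codebase): a plain recursive async fn is rejected with E0733 because its future type would contain itself, so the recursive call has to go through a boxed, dynamically-sized future, which is exactly the indirection #[async_recursion] generates.

    use std::future::Future;
    use std::pin::Pin;

    // hand-rolled version of what #[async_recursion] expands to:
    // return a boxed future so the type no longer contains itself
    fn countdown(n: u32) -> Pin<Box<dyn Future<Output = ()> + Send>> {
        Box::pin(async move {
            if n > 0 {
                countdown(n - 1).await; // boxing breaks the infinite type cycle
            }
        })
    }

    #[tokio::main]
    async fn main() {
        countdown(3).await;
    }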
diff --git a/src/engine.rs b/src/engine.rs
new file mode 100644
index 0000000..626a57e
--- /dev/null
+++ b/src/engine.rs
@@ -0,0 +1,258 @@
+use std::{
+    ffi::OsStr,
+    path::PathBuf,
+    sync::atomic::{AtomicUsize, Ordering},
+    time::Duration,
+};
+
+use axum::extract::BodyStream;
+use bytes::{BufMut, Bytes, BytesMut};
+use hyper::StatusCode;
+use memory_cache::MemoryCache;
+use rand::Rng;
+use tokio::{
+    fs::File,
+    io::{AsyncReadExt, AsyncWriteExt},
+    sync::{
+        mpsc::{self, Receiver, Sender},
+        Mutex,
+    },
+};
+use tokio_stream::StreamExt;
+use walkdir::WalkDir;
+
+use crate::view::ViewResponse;
+
+pub struct Engine {
+    // state
+    cache: Mutex<MemoryCache<String, Bytes>>, // in-memory cache. note: only the cache is locked when needed, rather than the whole struct
+    pub upl_count: AtomicUsize, // cached count of uploaded files
+
+    // config
+    pub base_url: String, // base url for formatting upload urls
+    save_path: PathBuf,   // where uploads are saved to disk
+
+    cache_max_length: usize, // if an upload is bigger than this size, it won't be cached
+    cache_keep_alive: Duration, // amount of time a file should last in cache
+}
+
+impl Engine {
+    // create a new engine
+    pub fn new(
+        base_url: String,
+        save_path: PathBuf,
+        cache_max_length: usize,
+        cache_keep_alive: Duration,
+        cache_full_scan_freq: Duration, // how often the cache will be scanned for expired items
+    ) -> Self {
+        Self {
+            cache: Mutex::new(MemoryCache::with_full_scan(cache_full_scan_freq)),
+            // count the files in the save path and initialise our cached count with it
+            // (min_depth(1) keeps the save directory itself out of the count)
+            upl_count: AtomicUsize::new(
+                WalkDir::new(&save_path)
+                    .min_depth(1)
+                    .into_iter()
+                    .filter_map(|e| e.ok())
+                    .filter(|e| e.file_type().is_file())
+                    .count(),
+            ),
+
+            base_url,
+            save_path,
+
+            cache_max_length,
+            cache_keep_alive,
+        }
+    }
+
+    fn will_use_cache(&self, length: usize) -> bool {
+        length <= self.cache_max_length
+    }
+
+    // checks in cache or on disk for an upload using a pathbuf
+    pub async fn upload_exists(&self, path: &PathBuf) -> bool {
+        let cache = self.cache.lock().await;
+
+        // check if the upload is in the cache
+        let name = path
+            .file_name()
+            .and_then(OsStr::to_str)
+            .unwrap_or_default()
+            .to_string();
+
+        if cache.contains_key(&name) {
+            return true;
+        }
+
+        // otherwise, check if the upload is on disk
+        path.exists()
+    }
+
+    // generate a new save path for an upload
+    #[async_recursion::async_recursion]
+    pub async fn gen_path(&self, original_path: &PathBuf) -> PathBuf {
+        // generate a 6-character alphanumeric string
+        let id: String = rand::thread_rng()
+            .sample_iter(&rand::distributions::Alphanumeric)
+            .take(6)
+            .map(char::from)
+            .collect();
+
+        // extract the extension from the original path
+        let original_extension = original_path
+            .extension()
+            .and_then(OsStr::to_str)
+            .unwrap_or_default()
+            .to_string();
+
+        let mut path = self.save_path.clone();
+        path.push(&id);
+        path.set_extension(original_extension);
+
+        if !self.upload_exists(&path).await {
+            path
+        } else {
+            // we had a name collision! try again..
+            self.gen_path(original_path).await
+        }
+    }
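A note on the collision handling just above: with 6 alphanumeric characters the ID space is 62^6 = 56,800,235,584 names, so the retry branch should almost never run until the store is enormous. If the recursion (and with it the async-recursion dependency) ever feels heavyweight, the same logic also fits a plain loop. This is only a sketch of an alternative, not what the patch does:

    // hypothetical loop-based variant of Engine::gen_path
    pub async fn gen_path(&self, original_path: &PathBuf) -> PathBuf {
        loop {
            // generate a 6-character alphanumeric string
            let id: String = rand::thread_rng()
                .sample_iter(&rand::distributions::Alphanumeric)
                .take(6)
                .map(char::from)
                .collect();

            let mut path = self.save_path.clone();
            path.push(&id);
            path.set_extension(
                original_path
                    .extension()
                    .and_then(OsStr::to_str)
                    .unwrap_or_default(),
            );

            // loop again on a name collision instead of recursing
            if !self.upload_exists(&path).await {
                return path;
            }
        }
    }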
+
+    // process an upload. this is called by the new route
+    pub async fn process_upload(
+        &self,
+        path: PathBuf,
+        name: String, // we already extract it in the route handler, and it'd be wasteful to do it again here
+        content_length: usize,
+        mut stream: BodyStream,
+    ) {
+        // if the upload size is smaller than the specified maximum, we use the cache!
+        let mut use_cache = self.will_use_cache(content_length);
+
+        // create file to save upload to
+        let mut file = File::create(path)
+            .await
+            .expect("could not open file! make sure your upload path exists");
+
+        // if we're using cache, make some space to store the upload in
+        let mut data = if use_cache {
+            BytesMut::with_capacity(content_length)
+        } else {
+            BytesMut::new()
+        };
+
+        // start a task that handles saving files to disk (we can save to cache/disk in parallel that way)
+        let (tx, mut rx): (Sender<Bytes>, Receiver<Bytes>) = mpsc::channel(1);
+
+        tokio::spawn(async move {
+            // receive chunks and save them to file
+            while let Some(chunk) = rx.recv().await {
+                debug!(target: "process_upload", "writing chunk to disk (length: {})", chunk.len());
+                file.write_all(&chunk)
+                    .await
+                    .expect("error while writing file to disk");
+            }
+        });
+
+        // read and save upload
+        while let Some(chunk) = stream.next().await {
+            let chunk = chunk.unwrap();
+
+            // send chunk to io task
+            debug!(target: "process_upload", "sending data to io task");
+            tx.send(chunk.clone())
+                .await
+                .expect("failed to send data to io task");
+
+            if use_cache {
+                debug!(target: "process_upload", "receiving data into buffer");
+                if data.len() + chunk.len() > data.capacity() {
+                    error!(target: "process_upload", "the amount of data sent exceeds the content-length provided by the client! caching will be cancelled for this upload.");
+
+                    // if we receive too much data, drop the buffer and stop using cache (it is still okay to use disk, probably)
+                    data = BytesMut::new();
+                    use_cache = false;
+                } else {
+                    data.put(chunk);
+                }
+            }
+        }
+
+        // insert upload into cache if necessary
+        if use_cache {
+            let mut cache = self.cache.lock().await;
+
+            info!(target: "process_upload", "caching upload!");
+            cache.insert(name, data.freeze(), Some(self.cache_keep_alive));
+        }
+
+        // if all goes well, increment the cached upload counter
+        self.upl_count.fetch_add(1, Ordering::Relaxed);
+    }
+
+    async fn read_cached_upload(&self, name: &String) -> Option<Bytes> {
+        let mut cache = self.cache.lock().await;
+
+        if !cache.contains_key(name) {
+            return None;
+        }
+
+        let data = cache
+            .get(name)
+            .expect("failed to get upload data from cache")
+            .to_owned();
+
+        // re-insert the data so its expiry timer resets on every read
+        cache.insert(name.to_string(), data.clone(), Some(self.cache_keep_alive));
+
+        Some(data)
+    }
+
+    pub async fn get_upload(&self, original_path: &PathBuf) -> Result<ViewResponse, StatusCode> {
+        let name = original_path
+            .file_name()
+            .and_then(OsStr::to_str)
+            .unwrap_or_default()
+            .to_string();
+
+        let mut path = self.save_path.clone();
+        path.push(&name);
+
+        // check if the upload exists
+        if !self.upload_exists(&path).await {
+            return Err(StatusCode::NOT_FOUND);
+        }
+
+        let cached_data = self.read_cached_upload(&name).await;
+
+        match cached_data {
+            Some(data) => {
+                info!(target: "get_upload", "got upload from cache!!");
+
+                Ok(ViewResponse::FromCache(path, data))
+            }
+            None => {
+                let mut file = File::open(&path).await.unwrap();
+
+                let length = file
+                    .metadata()
+                    .await
+                    .expect("failed to read upload file metadata")
+                    .len() as usize;
+
+                debug!(target: "get_upload", "found upload on disk, size = {}", length);
+
+                // if the upload is small enough, read it into memory and recache it
+                if self.will_use_cache(length) {
+                    let mut data = BytesMut::with_capacity(length);
+                    while file.read_buf(&mut data).await.unwrap() != 0 {}
+                    let data = data.freeze();
+
+                    let mut cache = self.cache.lock().await;
+                    cache.insert(name, data.clone(), Some(self.cache_keep_alive));
+
+                    info!(target: "get_upload", "recached upload from disk!");
+
+                    return Ok(ViewResponse::FromCache(path, data));
+                }
+
+                info!(target: "get_upload", "got upload from disk!");
+
+                Ok(ViewResponse::FromDisk(file))
+            }
+        }
+    }
+}
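process_upload clones every chunk to hand one copy to the disk-writer task while the other goes into the cache buffer. That clone is cheap: Bytes is a reference-counted view, so cloning bumps a counter and shares the underlying buffer instead of copying it. A standalone illustration (not part of the patch):

    use bytes::Bytes;

    fn main() {
        let a = Bytes::from(vec![1u8, 2, 3]);
        let b = a.clone(); // bumps a refcount; no byte copy

        // both handles point at the same allocation
        assert_eq!(a.as_ptr(), b.as_ptr());
        println!("shared buffer at {:p}", a.as_ptr());
    }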
diff --git a/src/index.rs b/src/index.rs
new file mode 100644
index 0000000..48091cd
--- /dev/null
+++ b/src/index.rs
@@ -0,0 +1,10 @@
+use std::sync::{atomic::Ordering, Arc};
+
+use axum::extract::State;
+
+// show index status page
+pub async fn index(State(engine): State<Arc<crate::engine::Engine>>) -> String {
+    let count = engine.upl_count.load(Ordering::Relaxed);
+
+    format!("minish's image host, currently hosting {} files", count)
+}
diff --git a/src/main.rs b/src/main.rs
index f7a85e9..2a852a5 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,4 +1,4 @@
-use std::sync::Arc;
+use std::{sync::Arc, path::PathBuf, time::Duration, process::exit};
 
 extern crate axum;
 
@@ -7,54 +7,54 @@
 extern crate log;
 extern crate simplelog;
 
-use simplelog::*;
+use engine::Engine;
 use axum::{
     routing::{get, post},
     Router,
 };
-use bytes::Bytes;
-use memory_cache::MemoryCache;
-use tokio::sync::Mutex;
+use simplelog::*;
 
-mod cache;
+mod engine;
+mod index;
 mod new;
-mod state;
 mod view;
 
 #[tokio::main]
 async fn main() {
-    // initialise logger
+    // Initialise logger
     TermLogger::init(
-        LevelFilter::Debug,
+        LevelFilter::Warn,
         Config::default(),
        TerminalMode::Mixed,
         ColorChoice::Auto,
     )
     .unwrap();
 
-    // create cache
-    let cache: MemoryCache<String, Bytes> = MemoryCache::with_full_scan(cache::FULL_SCAN_FREQ);
+    // Create engine
+    let engine = Engine::new( // TODO: Read config from env vars
+        "http://127.0.0.1:8000".to_string(),
+        PathBuf::from("./uploads/"),
+        80_000_000,             // Main instance is going to use this
+        Duration::from_secs(8), // CHANGE THIS!!!!!!!
+        Duration::from_secs(1), // THIS TOO!!!!!!!!!!!!!!!
+    );
 
-    // create appstate
-    let state = state::AppState {
-        cache: Mutex::new(cache),
-    };
-
-    // build main router
+    // Build main router
     let app = Router::new()
         .route("/new", post(new::new))
         .route("/p/:name", get(view::view))
-        .route("/", get(index))
-        .with_state(Arc::new(state));
+        .route("/", get(index::index))
+        .route("/exit", get(exit_abc))
+        .with_state(Arc::new(engine));
 
-    // start web server
+    // Start web server
     axum::Server::bind(&"127.0.0.1:8000".parse().unwrap()) // don't forget to change this! it's local for now
         .serve(app.into_make_service())
         .await
         .unwrap();
 }
 
-async fn index() -> &'static str {
-    "hi world!"
-}
+async fn exit_abc() {
+    exit(123);
+}
\ No newline at end of file
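On the `TODO: Read config from env vars` above, a small helper along these lines would do it. Every variable name here (the BRZ_* set) is invented for illustration; only Engine::new and its parameter order come from the patch.

    use std::{env, path::PathBuf, time::Duration};

    use crate::engine::Engine;

    // hypothetical helper for main(): fall back to the current
    // hard-coded values when an env var is missing or unparseable
    fn engine_from_env() -> Engine {
        let base_url = env::var("BRZ_BASE_URL")
            .unwrap_or_else(|_| "http://127.0.0.1:8000".to_string());

        let save_path = env::var("BRZ_SAVE_PATH")
            .map(PathBuf::from)
            .unwrap_or_else(|_| PathBuf::from("./uploads/"));

        let cache_max_length = env::var("BRZ_CACHE_MAX_LENGTH")
            .ok()
            .and_then(|s| s.parse().ok())
            .unwrap_or(80_000_000);

        Engine::new(
            base_url,
            save_path,
            cache_max_length,
            Duration::from_secs(8), // cache keep-alive
            Duration::from_secs(1), // cache full-scan frequency
        )
    }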
diff --git a/src/new.rs b/src/new.rs
index 7a9faac..0245535 100644
--- a/src/new.rs
+++ b/src/new.rs
@@ -4,77 +4,31 @@ use axum::{
     extract::{BodyStream, Query, State},
     http::HeaderValue,
 };
-use bytes::{BufMut, Bytes, BytesMut};
-use hyper::{header, HeaderMap, StatusCode};
-use rand::Rng;
-use tokio::{
-    fs::File,
-    io::AsyncWriteExt,
-    sync::mpsc::{self, Receiver, Sender},
-};
-use tokio_stream::StreamExt;
-
-use crate::cache;
-
-// create an upload name from an original file name
-fn gen_path(original_name: &String) -> PathBuf {
-    // extract extension from original name
-    let extension = PathBuf::from(original_name)
-        .extension()
-        .and_then(OsStr::to_str)
-        .unwrap_or_default()
-        .to_string();
-
-    // generate a 6-character alphanumeric string
-    let id: String = rand::thread_rng()
-        .sample_iter(&rand::distributions::Alphanumeric)
-        .take(6)
-        .map(char::from)
-        .collect();
-
-    // create the path
-    let mut path = PathBuf::new();
-    path.push("uploads/");
-    path.push(id);
-    path.set_extension(extension);
-
-    // if we're already using it, try again
-    if path.exists() {
-        gen_path(original_name)
-    } else {
-        path
-    }
-}
+use hyper::{HeaderMap, StatusCode, header};
 
 #[axum::debug_handler]
 pub async fn new(
-    State(state): State<Arc<crate::state::AppState>>,
+    State(engine): State<Arc<crate::engine::Engine>>,
     headers: HeaderMap,
     Query(params): Query<HashMap<String, String>>,
-    mut stream: BodyStream,
+    stream: BodyStream,
 ) -> Result<String, StatusCode> {
-    // require name parameter, it's used for determining the file extension
     if !params.contains_key("name") {
         return Err(StatusCode::BAD_REQUEST);
     }
 
-    // generate a path, take the name, format a url
-    let path = gen_path(params.get("name").unwrap());
+    let original_name = params.get("name").unwrap();
+    let original_path = PathBuf::from(original_name);
+    let path = engine.gen_path(&original_path).await;
     let name = path
         .file_name()
         .and_then(OsStr::to_str)
         .unwrap_or_default()
         .to_string();
 
-    // if we fail generating a name, stop now
-    if name.is_empty() {
-        return Err(StatusCode::INTERNAL_SERVER_ERROR);
-    }
+    let url = format!("{}/p/{}", engine.base_url, name);
 
-    let url = format!("http://127.0.0.1:8000/p/{}", name);
-
-    // get the content length, and if parsing it fails, assume it's really big so it doesn't cache
     let content_length = headers
         .get(header::CONTENT_LENGTH)
         .unwrap_or(&HeaderValue::from_static(""))
@@ -83,74 +37,9 @@ pub async fn new(
         .unwrap()
         .unwrap_or(usize::MAX);
 
-    // if the upload size exceeds 80 MB, we skip caching!
-    // previously, i was going to use redis with a 500 MB max (redis's is 512MiB)
-    // with or without redis, 500 MB is still a bit much..
-    // it could probably be read from disk before anyone could fully download it
-    let mut use_cache = content_length < cache::MAX_LENGTH;
-
-    info!(
-        target: "new",
-        "received an upload! content length: {}, using cache: {}",
-        content_length, use_cache
-    );
-
-    // create file to save upload to
-    let mut file = File::create(path)
-        .await
-        .expect("could not open file! make sure your upload path exists");
-
-    // if we're using cache, make some space to store the upload in
-    let mut data = if use_cache {
-        BytesMut::with_capacity(content_length)
-    } else {
-        BytesMut::new()
-    };
-
-    // start a task that handles saving files to disk (we can save to cache/disk in parallel that way)
-    let (tx, mut rx): (Sender<Bytes>, Receiver<Bytes>) = mpsc::channel(1);
-
-    tokio::spawn(async move {
-        // receive chunks and save them to file
-        while let Some(chunk) = rx.recv().await {
-            debug!(target: "new", "writing chunk to disk (length: {})", chunk.len());
-            file.write_all(&chunk)
-                .await
-                .expect("error while writing file to disk");
-        }
-    });
-
-    // read and save upload
-    while let Some(chunk) = stream.next().await {
-        let chunk = chunk.unwrap();
-
-        // send chunk to io task
-        debug!(target: "new", "sending data to io task");
-        tx.send(chunk.clone())
-            .await
-            .expect("failed to send data to io task");
-
-        if use_cache {
-            debug!(target: "new", "receiving data into buffer");
-            if data.len() + chunk.len() > data.capacity() {
-                error!(target: "new", "too much data! the client had an invalid content-length!");
-
-                // if we receive too much data, drop the buffer and stop using cache (it is still okay to use disk, probably)
-                data = BytesMut::new();
-                use_cache = false;
-            } else {
-                data.put(chunk);
-            }
-        }
-    }
-
-    // insert upload into cache if necessary
-    if use_cache {
-        let mut cache = state.cache.lock().await;
-
-        info!(target: "new", "caching upload!");
-        cache.insert(name, data.freeze(), Some(cache::DURATION));
-    }
+    engine
+        .process_upload(path, name, content_length, stream)
+        .await;
 
     Ok(url)
 }
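A note on the content-length handling this hunk keeps: any missing or malformed header collapses to usize::MAX, which disables caching for that upload without rejecting it. The two hunks above elide the middle of the parse chain (unchanged lines between them); assuming that middle is a to_str/parse pair, a tidier equivalent would be:

    // hypothetical tidier equivalent of the surviving content-length parse
    let content_length = headers
        .get(header::CONTENT_LENGTH)
        .and_then(|v| v.to_str().ok())
        .and_then(|s| s.parse::<usize>().ok())
        .unwrap_or(usize::MAX); // unparseable => assume huge => skip the cache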
diff --git a/src/state.rs b/src/state.rs
deleted file mode 100644
index 2d05b02..0000000
--- a/src/state.rs
+++ /dev/null
@@ -1,11 +0,0 @@
-use std::sync::atomic::AtomicUsize;
-
-use bytes::Bytes;
-use memory_cache::MemoryCache;
-use tokio::sync::Mutex;
-
-pub struct AppState {
-    pub cache: Mutex<MemoryCache<String, Bytes>>,
-
-    /* pub up_count: AtomicUsize, */
-}
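In the view.rs changes below, cached responses guess their content-type from the original path because a raw Bytes response would otherwise default to application/octet-stream. For reference, this is how the mime_guess chain used there behaves in isolation (a standalone snippet, not part of the patch):

    use std::path::Path;

    use mime_guess::mime;

    // same chain as ViewResponse::FromCache uses below
    fn content_type_for(path: &Path) -> String {
        mime_guess::from_path(path)
            .first()
            .unwrap_or(mime::APPLICATION_OCTET_STREAM)
            .to_string()
    }

    fn main() {
        assert_eq!(content_type_for(Path::new("cat.png")), "image/png");
        // no extension => no guess => fall back to octet-stream
        assert_eq!(content_type_for(Path::new("upload")), "application/octet-stream");
    }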
diff --git a/src/view.rs b/src/view.rs
index e44fb69..fb7bbdf 100644
--- a/src/view.rs
+++ b/src/view.rs
@@ -1,5 +1,4 @@
 use std::{
-    ffi::OsStr,
     path::{Component, PathBuf},
     sync::Arc,
 };
@@ -11,16 +10,56 @@ use axum::{
     body::StreamBody,
     extract::{Path, State},
     http::HeaderValue,
     response::{IntoResponse, Response},
 };
 
+use bytes::Bytes;
 use hyper::StatusCode;
 use mime_guess::mime;
 use tokio::fs::File;
 use tokio_util::io::ReaderStream;
 
+pub enum ViewResponse {
+    FromDisk(File),
+    FromCache(PathBuf, Bytes),
+}
+
+impl IntoResponse for ViewResponse {
+    fn into_response(self) -> Response {
+        match self {
+            ViewResponse::FromDisk(file) => {
+                let reader = ReaderStream::new(file);
+                let stream = StreamBody::new(reader);
+
+                stream.into_response()
+            }
+            ViewResponse::FromCache(original_path, data) => {
+                // guess the content-type using the original path
+                // (axum handles this w/ streamed file responses but caches are octet-stream by default)
+                let content_type = mime_guess::from_path(original_path)
+                    .first()
+                    .unwrap_or(mime::APPLICATION_OCTET_STREAM)
+                    .to_string();
+
+                // extract mutable headers from the response
+                let mut res = data.into_response();
+                let headers = res.headers_mut();
+
+                // clear the headers and add our content-type
+                headers.clear();
+                headers.insert(
+                    "content-type",
+                    HeaderValue::from_str(content_type.as_str()).unwrap(),
+                );
+
+                res
+            }
+        }
+    }
+}
+
 #[axum::debug_handler]
 pub async fn view(
-    State(state): State<Arc<crate::state::AppState>>,
+    State(engine): State<Arc<crate::engine::Engine>>,
     Path(original_path): Path<PathBuf>,
-) -> Response {
+) -> Result<ViewResponse, StatusCode> {
     // (hopefully) prevent path traversal, just check for any non-file components
     if original_path
         .components()
@@ -28,55 +67,8 @@ pub async fn view(
         .any(|x| !matches!(x, Component::Normal(_)))
     {
         error!(target: "view", "a request attempted path traversal");
-        return StatusCode::NOT_FOUND.into_response();
+        return Err(StatusCode::NOT_FOUND);
     }
-
-    let name = original_path
-        .file_name()
-        .and_then(OsStr::to_str)
-        .unwrap_or_default()
-        .to_string();
-
-    let cache = state.cache.lock().await;
-
-    let cache_item = cache.get(&name);
-
-    if cache_item.is_none() {
-        let mut path = PathBuf::new();
-        path.push("uploads/");
-        path.push(name);
-
-        if !path.exists() || !path.is_file() {
-            return StatusCode::NOT_FOUND.into_response();
-        }
-
-        let file = File::open(path).await.unwrap();
-
-        let reader = ReaderStream::new(file);
-        let stream = StreamBody::new(reader);
-
-        info!(target: "view", "reading upload from disk");
-
-        return stream.into_response();
-    }
-
-    info!(target: "view", "reading upload from cache");
-
-    let data = cache_item.unwrap().clone();
-
-    let content_type = mime_guess::from_path(original_path)
-        .first()
-        .unwrap_or(mime::APPLICATION_OCTET_STREAM)
-        .to_string();
-
-    let mut res = data.into_response();
-    let headers = res.headers_mut();
-
-    headers.clear();
-    headers.insert(
-        "content-type",
-        HeaderValue::from_str(content_type.as_str()).unwrap(),
-    );
-
-    return res;
+
+    engine.get_upload(&original_path).await
 }
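The path traversal guard kept at the end is worth spelling out: any component that is not Component::Normal (so ParentDir, RootDir, a Windows prefix, or CurDir) makes the handler return 404 before the path ever reaches the engine. A standalone illustration of the same predicate (not part of the patch):

    use std::path::{Component, Path};

    // the inverse of the check view() applies to the requested path
    fn is_safe(path: &Path) -> bool {
        path.components().all(|c| matches!(c, Component::Normal(_)))
    }

    fn main() {
        assert!(is_safe(Path::new("abc123.png")));
        assert!(!is_safe(Path::new("../secret.txt"))); // ParentDir component
        assert!(!is_safe(Path::new("/etc/passwd")));   // RootDir component
    }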