From 03de6f428383c683f23935df145a6dae8b163ad7 Mon Sep 17 00:00:00 2001 From: minish Date: Mon, 30 Jan 2023 18:11:30 -0500 Subject: [PATCH] release! --- .dockerignore | 5 + Cargo.lock | 217 ++++++++++------- Cargo.toml | 7 +- Dockerfile | 10 + archived/.gitignore | 2 + archived/Cargo.lock | 23 ++ archived/Cargo.toml | 9 + archived/src/entry.rs | 26 ++ archived/src/lib.rs | 173 ++++++++++++++ src/engine.rs | 540 ++++++++++++++++++++++-------------------- src/index.rs | 20 +- src/main.rs | 84 +++++-- src/new.rs | 90 +++---- src/view.rs | 136 +++++------ 14 files changed, 844 insertions(+), 498 deletions(-) create mode 100644 .dockerignore create mode 100644 Dockerfile create mode 100644 archived/.gitignore create mode 100644 archived/Cargo.lock create mode 100644 archived/Cargo.toml create mode 100644 archived/src/entry.rs create mode 100644 archived/src/lib.rs diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..27a3f9a --- /dev/null +++ b/.dockerignore @@ -0,0 +1,5 @@ +# git repository +/.git + +# binaries +/target diff --git a/Cargo.lock b/Cargo.lock index 9c959de..20e5ecb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -13,6 +13,23 @@ dependencies = [ "version_check", ] +[[package]] +name = "aho-corasick" +version = "0.7.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc936419f96fa211c1b9166887b38e5e40b19958e5b895be7c1f93adec7071ac" +dependencies = [ + "memchr", +] + +[[package]] +name = "archived" +version = "0.2.0" +dependencies = [ + "bytes", + "once_cell", +] + [[package]] name = "async-recursion" version = "1.0.0" @@ -112,17 +129,16 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "breeze" -version = "0.1.0" +version = "0.1.1" dependencies = [ + "archived", "async-recursion", "axum", "bytes", + "env_logger", "hyper", "log", - "memory-cache-rs", - "mime_guess", "rand", - "simplelog", "tokio", "tokio-stream", "tokio-util", @@ -136,12 +152,52 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dfb24e866b15a1af2a1b663f10c6b6b8f397a84aadb828f12e5b289ec23a3a3c" +[[package]] +name = "cc" +version = "1.0.79" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50d30906286121d95be3d479533b458f87493b30a4b5f79a607db8f5d11aa91f" + [[package]] name = "cfg-if" version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "env_logger" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "85cdab6a89accf66733ad5a1693a4dcced6aeff64602b634530dd73c1f3ee9f0" +dependencies = [ + "humantime", + "is-terminal", + "log", + "regex", + "termcolor", +] + +[[package]] +name = "errno" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f639046355ee4f37944e44f60642c6f3a7efa3cf6b78c78a0d989a8ce6c396a1" +dependencies = [ + "errno-dragonfly", + "libc", + "winapi", +] + +[[package]] +name = "errno-dragonfly" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa68f1b12764fab894d2755d2518754e71b4fd80ecfb822714a1206c2aab39bf" +dependencies = [ + "cc", + "libc", +] + [[package]] name = "fnv" version = "1.0.7" @@ -269,6 +325,15 @@ dependencies = [ "libc", ] +[[package]] +name = "hermit-abi" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"ee512640fe35acbfb4bb779db6f0d80704c2cacfa2e39b601ef3e3f47d1ae4c7" +dependencies = [ + "libc", +] + [[package]] name = "http" version = "0.2.8" @@ -309,6 +374,12 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421" +[[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + [[package]] name = "hyper" version = "0.14.23" @@ -343,6 +414,28 @@ dependencies = [ "hashbrown", ] +[[package]] +name = "io-lifetimes" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7d6c6f8c91b4b9ed43484ad1a938e393caf35960fce7f82a040497207bd8e9e" +dependencies = [ + "libc", + "windows-sys", +] + +[[package]] +name = "is-terminal" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "28dfb6c8100ccc63462345b67d1bbc3679177c75ee4bf59bf29c8b1d110b8189" +dependencies = [ + "hermit-abi 0.2.6", + "io-lifetimes", + "rustix", + "windows-sys", +] + [[package]] name = "itoa" version = "1.0.4" @@ -355,6 +448,12 @@ version = "0.2.137" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc7fcc620a3bff7cdd7a365be3376c97191aeaccc2a603e600951e452615bf89" +[[package]] +name = "linux-raw-sys" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f051f77a7c8e6957c0696eac88f26b0117e54f52d3fc682ab19397a8812846a4" + [[package]] name = "lock_api" version = "0.4.9" @@ -386,31 +485,12 @@ version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" -[[package]] -name = "memory-cache-rs" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a4098b4f50a8fe57ad06b9a125812c9b1e04b3f226f29642b7219e75ba33b1d" -dependencies = [ - "once_cell", -] - [[package]] name = "mime" version = "0.3.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d" -[[package]] -name = "mime_guess" -version = "2.0.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4192263c238a5f0d0c6bfd21f336a313a4ce1c450542449ca191bb657b4642ef" -dependencies = [ - "mime", - "unicase", -] - [[package]] name = "mio" version = "0.8.5" @@ -429,16 +509,7 @@ version = "1.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f6058e64324c71e02bc2b150e4f3bc8286db6c83092132ffa3f6b1eab0f9def5" dependencies = [ - "hermit-abi", - "libc", -] - -[[package]] -name = "num_threads" -version = "0.1.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2819ce041d2ee131036f4fc9d6ae7ae125a3a40e97ba64d04fe799ad9dabbb44" -dependencies = [ + "hermit-abi 0.1.19", "libc", ] @@ -572,6 +643,37 @@ dependencies = [ "bitflags", ] +[[package]] +name = "regex" +version = "1.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "48aaa5748ba571fb95cd2c85c09f629215d3a6ece942baa100950af03a34f733" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.6.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "456c603be3e8d448b072f410900c09faf164fbce2d480456f50eea6e25f9c848" + 
+[[package]] +name = "rustix" +version = "0.36.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4fdebc4b395b7fbb9ab11e462e20ed9051e7b16e42d24042c776eca0ac81b03" +dependencies = [ + "bitflags", + "errno", + "io-lifetimes", + "libc", + "linux-raw-sys", + "windows-sys", +] + [[package]] name = "rustversion" version = "1.0.9" @@ -646,17 +748,6 @@ dependencies = [ "libc", ] -[[package]] -name = "simplelog" -version = "0.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "48dfff04aade74dd495b007c831cd6f4e0cee19c344dd9dc0884c0289b70a786" -dependencies = [ - "log", - "termcolor", - "time", -] - [[package]] name = "slab" version = "0.4.7" @@ -708,35 +799,6 @@ dependencies = [ "winapi-util", ] -[[package]] -name = "time" -version = "0.3.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a561bf4617eebd33bca6434b988f39ed798e527f51a1e797d0ee4f61c0a38376" -dependencies = [ - "itoa", - "libc", - "num_threads", - "serde", - "time-core", - "time-macros", -] - -[[package]] -name = "time-core" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e153e1f1acaef8acc537e68b44906d2db6436e2b35ac2c6b42640fff91f00fd" - -[[package]] -name = "time-macros" -version = "0.2.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d967f99f534ca7e495c575c62638eebc2898a8c84c119b89e250477bc4ba16b2" -dependencies = [ - "time-core", -] - [[package]] name = "tokio" version = "1.22.0" @@ -871,15 +933,6 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642" -[[package]] -name = "unicase" -version = "2.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "50f37be617794602aabbeee0be4f259dc1778fabe05e2d67ee8f79326d5cb4f6" -dependencies = [ - "version_check", -] - [[package]] name = "unicode-ident" version = "1.0.5" diff --git a/Cargo.toml b/Cargo.toml index 34d418f..932fb46 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "breeze" -version = "0.1.0" +version = "0.1.1" edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html @@ -17,6 +17,5 @@ rand = "0.8.5" async-recursion = "1.0.0" walkdir = "2" log = "0.4" -simplelog = "^0.12.0" -mime_guess = "2.0.4" -memory-cache-rs = "0.2.0" +env_logger = "0.10.0" +archived = { path = "./archived" } diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..37f358f --- /dev/null +++ b/Dockerfile @@ -0,0 +1,10 @@ +FROM rust:1.67.0 as builder + +WORKDIR /usr/src/breeze +COPY . . +RUN [ "cargo", "install", "--path", "." ] + +FROM debian:bullseye-slim +COPY --from=builder /usr/local/cargo/bin/breeze /usr/local/bin/breeze + +ENTRYPOINT [ "breeze" ] \ No newline at end of file diff --git a/archived/.gitignore b/archived/.gitignore new file mode 100644 index 0000000..928b102 --- /dev/null +++ b/archived/.gitignore @@ -0,0 +1,2 @@ +.idea +target diff --git a/archived/Cargo.lock b/archived/Cargo.lock new file mode 100644 index 0000000..9fe28b6 --- /dev/null +++ b/archived/Cargo.lock @@ -0,0 +1,23 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. 
+version = 3
+
+[[package]]
+name = "archived"
+version = "0.2.0"
+dependencies = [
+ "bytes",
+ "once_cell",
+]
+
+[[package]]
+name = "bytes"
+version = "1.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dfb24e866b15a1af2a1b663f10c6b6b8f397a84aadb828f12e5b289ec23a3a3c"
+
+[[package]]
+name = "once_cell"
+version = "1.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b1c601810575c99596d4afc46f78a678c80105117c379eb3650cf99b8a21ce5b"
diff --git a/archived/Cargo.toml b/archived/Cargo.toml
new file mode 100644
index 0000000..18621e5
--- /dev/null
+++ b/archived/Cargo.toml
@@ -0,0 +1,9 @@
+[package]
+name = "archived"
+version = "0.2.0"
+edition = "2018"
+license = "MIT"
+
+[dependencies]
+bytes = "1.3.0"
+once_cell = "1.3.1"
diff --git a/archived/src/entry.rs b/archived/src/entry.rs
new file mode 100644
index 0000000..0b825b9
--- /dev/null
+++ b/archived/src/entry.rs
@@ -0,0 +1,26 @@
+use std::time::{Duration, SystemTime};
+
+/// Represents a set of eviction and expiration details for a specific cache entry.
+pub(crate) struct CacheEntry<B> {
+    /// Entry value.
+    pub(crate) value: B,
+
+    /// Expiration time.
+    pub(crate) expiration_time: SystemTime,
+}
+
+impl<B> CacheEntry<B> {
+    pub(crate) fn new(value: B, lifetime: Duration) -> Self {
+        Self {
+            expiration_time: SystemTime::now() + lifetime,
+            value,
+        }
+    }
+
+    /// Check if an entry is expired.
+    pub(crate) fn is_expired(&self, current_time: SystemTime) -> bool {
+        current_time >= self.expiration_time
+    }
+}
diff --git a/archived/src/lib.rs b/archived/src/lib.rs
new file mode 100644
index 0000000..384e806
--- /dev/null
+++ b/archived/src/lib.rs
@@ -0,0 +1,173 @@
+mod entry;
+
+use bytes::Bytes;
+
+use crate::entry::*;
+use std::collections::hash_map::Entry;
+use std::collections::HashMap;
+use std::time::{Duration, SystemTime};
+
+pub struct Archive {
+    cache_table: HashMap<String, CacheEntry<Bytes>>,
+    full_scan_frequency: Option<Duration>,
+    created_time: SystemTime,
+    last_scan_time: Option<SystemTime>,
+    entry_lifetime: Duration,
+    capacity: usize,
+    length: usize,
+}
+
+impl Archive {
+    /* pub fn new(capacity: usize) -> Self {
+        Self {
+            cache_table: HashMap::new(),
+            full_scan_frequency: None,
+            created_time: SystemTime::now(),
+            last_scan_time: None,
+            capacity,
+            length: 0,
+        }
+    } */
+
+    pub fn with_full_scan(full_scan_frequency: Duration, entry_lifetime: Duration, capacity: usize) -> Self {
+        Self {
+            cache_table: HashMap::with_capacity(256),
+            full_scan_frequency: Some(full_scan_frequency),
+            created_time: SystemTime::now(),
+            last_scan_time: None,
+            entry_lifetime,
+            capacity,
+            length: 0,
+        }
+    }
+
+    pub fn contains_key(&self, key: &String) -> bool {
+        let now = SystemTime::now();
+
+        self.cache_table
+            .get(key)
+            .filter(|cache_entry| !cache_entry.is_expired(now))
+            .is_some()
+    }
+
+    pub fn get_last_scan_time(&self) -> Option<SystemTime> {
+        self.last_scan_time
+    }
+
+    pub fn get_full_scan_frequency(&self) -> Option<Duration> {
+        self.full_scan_frequency
+    }
+
+    pub fn get(&self, key: &String) -> Option<&Bytes> {
+        let now = SystemTime::now();
+
+        self.cache_table
+            .get(key)
+            .filter(|cache_entry| !cache_entry.is_expired(now))
+            .map(|cache_entry| &cache_entry.value)
+    }
+
+    pub fn get_or_insert<F>(
+        &mut self,
+        key: String,
+        factory: F,
+    ) -> &Bytes
+    where
+        F: Fn() -> Bytes,
+    {
+        let now = SystemTime::now();
+
+        self.try_full_scan_expired_items(now);
+
+        match self.cache_table.entry(key) {
+            Entry::Occupied(mut occupied) => {
+                if occupied.get().is_expired(now) {
+                    occupied.insert(CacheEntry::new(factory(), self.entry_lifetime));
+                }
+
+                &occupied.into_mut().value
+            }
+            Entry::Vacant(vacant) => &vacant.insert(CacheEntry::new(factory(), self.entry_lifetime)).value,
+        }
+    }
+
+    pub fn insert(
+        &mut self,
+        key: String,
+        value: Bytes,
+    ) -> Option<Bytes> {
+        let now = SystemTime::now();
+
+        self.try_full_scan_expired_items(now);
+
+        // respect the memory capacity: values that won't fit are simply not cached
+        if value.len() + self.length > self.capacity {
+            return None;
+        }
+
+        self.length += value.len();
+
+        self.cache_table
+            .insert(key, CacheEntry::new(value, self.entry_lifetime))
+            .filter(|cache_entry| !cache_entry.is_expired(now))
+            .map(|cache_entry| cache_entry.value)
+    }
+
+    pub fn remove(&mut self, key: &String) -> Option<Bytes> {
+        let now = SystemTime::now();
+
+        self.try_full_scan_expired_items(now);
+
+        let mut removed_len: usize = 0;
+        let result = self
+            .cache_table
+            .remove(key)
+            .filter(|cache_entry| !cache_entry.is_expired(now))
+            .map(|cache_entry| {
+                removed_len += cache_entry.value.len();
+                cache_entry.value
+            });
+        self.length -= removed_len;
+
+        result
+    }
+
+    pub fn renew(&mut self, key: &String) -> Option<()> {
+        let now = SystemTime::now();
+
+        self.try_full_scan_expired_items(now);
+
+        if let Some(entry) = self.cache_table.get_mut(key) {
+            entry.expiration_time = now + self.entry_lifetime;
+            Some(())
+        } else {
+            None
+        }
+    }
+
+    fn try_full_scan_expired_items(&mut self, current_time: SystemTime) {
+        if let Some(full_scan_frequency) = self.full_scan_frequency {
+            let since = current_time
+                .duration_since(self.last_scan_time.unwrap_or(self.created_time))
+                .unwrap();
+
+            if since >= full_scan_frequency {
+                let mut removed_len = 0;
+                self.cache_table.retain(|_, cache_entry| {
+                    if cache_entry.is_expired(current_time) {
+                        removed_len += cache_entry.value.len();
+                        return false;
+                    }
+                    true
+                });
+                self.length -= removed_len;
+
+                self.last_scan_time = Some(current_time);
+            }
+        }
+    }
+}
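aside: a quick standalone sketch of how the archived cache is meant to be used (the scan frequency, lifetime and capacity numbers here are made up for illustration, not defaults):

    use std::time::Duration;

    use archived::Archive;
    use bytes::Bytes;

    fn main() {
        // scan for expired entries every 60s, keep entries 30 minutes,
        // hold at most ~80 MB of values in memory
        let mut cache = Archive::with_full_scan(
            Duration::from_secs(60),
            Duration::from_secs(1800),
            80_000_000,
        );

        // values that would blow the capacity are rejected (insert returns None)
        cache.insert("abcdef.png".to_string(), Bytes::from_static(b"fake image data"));

        if let Some(data) = cache.get(&"abcdef.png".to_string()) {
            println!("cached {} bytes", data.len());
        }
    }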
diff --git a/src/engine.rs b/src/engine.rs
index 626a57e..39c82d2 100644
--- a/src/engine.rs
+++ b/src/engine.rs
@@ -1,258 +1,282 @@
-use std::{
-    ffi::OsStr,
-    path::PathBuf,
-    sync::atomic::{AtomicUsize, Ordering},
-    time::Duration,
-};
-
-use axum::extract::BodyStream;
-use bytes::{BufMut, Bytes, BytesMut};
-use hyper::StatusCode;
-use memory_cache::MemoryCache;
-use rand::Rng;
-use tokio::{
-    fs::File,
-    io::{AsyncReadExt, AsyncWriteExt},
-    sync::{
-        mpsc::{self, Receiver, Sender},
-        Mutex,
-    },
-};
-use tokio_stream::StreamExt;
-use walkdir::WalkDir;
-
-use crate::view::ViewResponse;
-
-pub struct Engine {
-    // state
-    cache: Mutex<MemoryCache<String, Bytes>>, // in-memory cache. note: i plan to lock the cache specifically only when needed rather than locking the whole struct
-    pub upl_count: AtomicUsize, // cached count of uploaded files
-
-    // config
-    pub base_url: String, // base url for formatting upload urls
-    save_path: PathBuf, // where uploads are saved to disk
-
-    cache_max_length: usize, // if an upload is bigger than this size, it won't be cached
-    cache_keep_alive: Duration, // amount of time a file should last in cache
-}
-
-impl Engine {
-    // create a new engine
-    pub fn new(
-        base_url: String,
-        save_path: PathBuf,
-        cache_max_length: usize,
-        cache_keep_alive: Duration,
-        cache_full_scan_freq: Duration, // how often the cache will be scanned for expired items
-    ) -> Self {
-        Self {
-            cache: Mutex::new(MemoryCache::with_full_scan(cache_full_scan_freq)),
-            upl_count: AtomicUsize::new(WalkDir::new(&save_path).into_iter().count()), // count the amount of files in the save path and initialise our cached count with it
-
-            base_url,
-            save_path,
-
-            cache_max_length,
-            cache_keep_alive,
-        }
-    }
-
-    fn will_use_cache(&self, length: usize) -> bool {
-        length <= self.cache_max_length
-    }
-
-    // checks in cache or disk for an upload using a pathbuf
-    pub async fn upload_exists(&self, path: &PathBuf) -> bool {
-        let cache = self.cache.lock().await;
-
-        // Check if upload is in cache
-        let name = path
-            .file_name()
-            .and_then(OsStr::to_str)
-            .unwrap_or_default()
-            .to_string();
-
-        if cache.contains_key(&name) {
-            return true;
-        }
-
-        // Check if upload is on disk
-        if path.exists() {
-            return true;
-        }
-
-        return false;
-    }
-
-    // generate a new save path for an upload
-    #[async_recursion::async_recursion]
-    pub async fn gen_path(&self, original_path: &PathBuf) -> PathBuf {
-        // generate a 6-character alphanumeric string
-        let id: String = rand::thread_rng()
-            .sample_iter(&rand::distributions::Alphanumeric)
-            .take(6)
-            .map(char::from)
-            .collect();
-
-        // extract the extension from the original path
-        let original_extension = original_path
-            .extension()
-            .and_then(OsStr::to_str)
-            .unwrap_or_default()
-            .to_string();
-
-        let mut path = self.save_path.clone();
-        path.push(&id);
-        path.set_extension(original_extension);
-
-        if !self.upload_exists(&path).await {
-            path
-        } else {
-            // we had a name collision! try again..
-            self.gen_path(original_path).await
-        }
-    }
-
-    // process an upload. this is called by the new route
-    pub async fn process_upload(
-        &self,
-        path: PathBuf,
-        name: String, // we already extract it in the route handler, and it'd be wasteful to do it in gen_path
-        content_length: usize,
-        mut stream: BodyStream,
-    ) {
-        // if the upload size is smaller than the specified maximum, we use the cache!
-        let mut use_cache = self.will_use_cache(content_length);
-
-        // create file to save upload to
-        let mut file = File::create(path)
-            .await
-            .expect("could not open file! make sure your upload path exists");
-
-        // if we're using cache, make some space to store the upload in
-        let mut data = if use_cache {
-            BytesMut::with_capacity(content_length)
-        } else {
-            BytesMut::new()
-        };
-
-        // start a task that handles saving files to disk (we can save to cache/disk in parallel that way)
-        let (tx, mut rx): (Sender<Bytes>, Receiver<Bytes>) = mpsc::channel(1);
-
-        tokio::spawn(async move {
-            // receive chunks and save them to file
-            while let Some(chunk) = rx.recv().await {
-                debug!(target: "process_upload", "writing chunk to disk (length: {})", chunk.len());
-                file.write_all(&chunk)
-                    .await
-                    .expect("error while writing file to disk");
-            }
-        });
-
-        // read and save upload
-        while let Some(chunk) = stream.next().await {
-            let chunk = chunk.unwrap();
-
-            // send chunk to io task
-            debug!(target: "process_upload", "sending data to io task");
-            tx.send(chunk.clone())
-                .await
-                .expect("failed to send data to io task");
-
-            if use_cache {
-                debug!(target: "process_upload", "receiving data into buffer");
-                if data.len() + chunk.len() > data.capacity() {
-                    error!(target: "process_upload", "the amount of data sent exceeds the content-length provided by the client! caching will be cancelled for this upload.");
-
-                    // if we receive too much data, drop the buffer and stop using cache (it is still okay to use disk, probably)
-                    data = BytesMut::new();
-                    use_cache = false;
-                } else {
-                    data.put(chunk);
-                }
-            }
-        }
-
-        // insert upload into cache if necessary
-        if use_cache {
-            let mut cache = self.cache.lock().await;
-
-            info!(target: "process_upload", "caching upload!");
-            cache.insert(name, data.freeze(), Some(self.cache_keep_alive));
-        }
-
-        // if all goes well, increment the cached upload counter
-        self.upl_count.fetch_add(1, Ordering::Relaxed);
-    }
-
-    async fn read_cached_upload(&self, name: &String) -> Option<Bytes> {
-        let mut cache = self.cache.lock().await;
-
-        if !cache.contains_key(&name) {
-            return None;
-        }
-
-        let data = cache
-            .get(&name)
-            .expect("failed to get upload data from cache")
-            .to_owned();
-
-        cache.insert(name.to_string(), data.clone(), Some(self.cache_keep_alive));
-
-        Some(data)
-    }
-
-    pub async fn get_upload(&self, original_path: &PathBuf) -> Result<ViewResponse, StatusCode> {
-        let name = original_path
-            .file_name()
-            .and_then(OsStr::to_str)
-            .unwrap_or_default()
-            .to_string();
-
-        let mut path = self.save_path.clone();
-        path.push(&name);
-
-        // check if the upload exists
-        if !self.upload_exists(&path).await {
-            return Err(StatusCode::NOT_FOUND);
-        }
-
-        let cached_data = self.read_cached_upload(&name).await;
-
-        match cached_data {
-            Some(data) => {
-                info!(target: "get_upload", "got upload from cache!!");
-
-                return Ok(ViewResponse::FromCache(path, data));
-            }
-            None => {
-                let mut file = File::open(&path).await.unwrap();
-
-                let length = file
-                    .metadata()
-                    .await
-                    .expect("failed to read upload file metadata")
-                    .len() as usize;
-
-                debug!(target: "get_upload", "read upload from disk, size = {}", length);
-
-                if self.will_use_cache(length) {
-                    let mut data = BytesMut::with_capacity(length);
-                    while file.read_buf(&mut data).await.unwrap() != 0 {}
-                    let data = data.freeze();
-
-                    let mut cache = self.cache.lock().await;
-                    cache.insert(name, data.clone(), Some(self.cache_keep_alive));
-
-                    info!(target: "get_upload", "recached upload from disk!");
-
-                    return Ok(ViewResponse::FromCache(path, data));
-                }
-
-                info!(target: "get_upload", "got upload from disk!");
-
-                return Ok(ViewResponse::FromDisk(file));
-            }
-        }
-    }
-}
+use std::{
+    ffi::OsStr,
+    path::PathBuf,
+    sync::atomic::{AtomicUsize, Ordering},
+    time::Duration,
+};
+
+use archived::Archive;
+use axum::extract::BodyStream;
+use bytes::{BufMut, Bytes, BytesMut};
+use hyper::StatusCode;
+use rand::Rng;
+use tokio::{
+    fs::File,
+    io::{AsyncReadExt, AsyncWriteExt},
+    sync::{
+        mpsc::{self, Receiver, Sender},
+        RwLock,
+    },
+};
+use tokio_stream::StreamExt;
+use walkdir::WalkDir;
+
+use crate::view::ViewResponse;
+
+pub struct Engine {
+    // state
+    cache: RwLock<Archive>, // in-memory cache. note: i plan to lock the cache specifically only when needed rather than locking the whole struct
+    pub upl_count: AtomicUsize, // cached count of uploaded files
+
+    // config
+    pub base_url: String, // base url for formatting upload urls
+    save_path: PathBuf, // where uploads are saved to disk
+
+    cache_max_length: usize, // if an upload is bigger than this size, it won't be cached
+}
+
+impl Engine {
+    // create a new engine
+    pub fn new(
+        base_url: String,
+        save_path: PathBuf,
+        cache_max_length: usize,
+        cache_lifetime: Duration,
+        cache_full_scan_freq: Duration, // how often the cache will be scanned for expired items
+        cache_mem_capacity: usize,
+    ) -> Self {
+        Self {
+            cache: RwLock::new(Archive::with_full_scan(
+                cache_full_scan_freq,
+                cache_lifetime,
+                cache_mem_capacity,
+            )),
+            upl_count: AtomicUsize::new(WalkDir::new(&save_path).into_iter().count()), // count the amount of files in the save path and initialise our cached count with it
+
+            base_url,
+            save_path,
+
+            cache_max_length,
+        }
+    }
+
+    fn will_use_cache(&self, length: usize) -> bool {
+        length <= self.cache_max_length
+    }
+
+    // checks in cache or disk for an upload using a pathbuf
+    pub async fn upload_exists(&self, path: &PathBuf) -> bool {
+        let cache = self.cache.read().await;
+
+        // check if upload is in cache
+        let name = path
+            .file_name()
+            .and_then(OsStr::to_str)
+            .unwrap_or_default()
+            .to_string();
+
+        if cache.contains_key(&name) {
+            return true;
+        }
+
+        // check if upload is on disk
+        if path.exists() {
+            return true;
+        }
+
+        return false;
+    }
+
+    // generate a new save path for an upload
+    #[async_recursion::async_recursion]
+    pub async fn gen_path(&self, original_path: &PathBuf) -> PathBuf {
+        // generate a 6-character alphanumeric string
+        let id: String = rand::thread_rng()
+            .sample_iter(&rand::distributions::Alphanumeric)
+            .take(6)
+            .map(char::from)
+            .collect();
+
+        // extract the extension from the original path
+        let original_extension = original_path
+            .extension()
+            .and_then(OsStr::to_str)
+            .unwrap_or_default()
+            .to_string();
+
+        // path on disk
+        let mut path = self.save_path.clone();
+        path.push(&id);
+        path.set_extension(original_extension);
+
+        if !self.upload_exists(&path).await {
+            path
+        } else {
+            // we had a name collision! try again..
+            self.gen_path(original_path).await
+        }
+    }
+
+    // process an upload. this is called by the new route
+    pub async fn process_upload(
+        &self,
+        path: PathBuf,
+        name: String, // we already extract it in the route handler, and it'd be wasteful to do it in gen_path
+        content_length: usize,
+        mut stream: BodyStream,
+    ) {
+        // if the upload size is smaller than the specified maximum, we use the cache!
+        let mut use_cache = self.will_use_cache(content_length);
+
+        // if we're using cache, make some space to store the upload in
+        let mut data = if use_cache {
+            BytesMut::with_capacity(content_length)
+        } else {
+            BytesMut::new()
+        };
+
+        // start a task that handles saving files to disk (we can save to cache/disk in parallel that way)
+        let (tx, mut rx): (Sender<Bytes>, Receiver<Bytes>) = mpsc::channel(1);
+
+        tokio::spawn(async move {
+            // create file to save upload to
+            let mut file = File::create(path)
+                .await
+                .expect("could not open file! make sure your upload path exists");
+
+            // receive chunks and save them to file
+            while let Some(chunk) = rx.recv().await {
+                debug!(target: "process_upload", "writing chunk to disk (length: {})", chunk.len());
+                file.write_all(&chunk)
+                    .await
+                    .expect("error while writing file to disk");
+            }
+        });
+
+        // read and save upload
+        while let Some(chunk) = stream.next().await {
+            let chunk = chunk.unwrap();
+
+            // send chunk to io task
+            debug!(target: "process_upload", "sending data to io task");
+            tx.send(chunk.clone())
+                .await
+                .expect("failed to send data to io task");
+
+            if use_cache {
+                debug!(target: "process_upload", "receiving data into buffer");
+                if data.len() + chunk.len() > data.capacity() {
+                    error!(target: "process_upload", "the amount of data sent exceeds the content-length provided by the client! caching will be cancelled for this upload.");
+
+                    // if we receive too much data, drop the buffer and stop using cache (it is still okay to use disk, probably)
+                    data = BytesMut::new();
+                    use_cache = false;
+                } else {
+                    data.put(chunk);
+                }
+            }
+        }
+
+        // insert upload into cache if necessary
+        if use_cache {
+            let mut cache = self.cache.write().await;
+
+            info!(target: "process_upload", "caching upload!");
+            cache.insert(name, data.freeze());
+        }
+
+        // if all goes well, increment the cached upload counter
+        self.upl_count.fetch_add(1, Ordering::Relaxed);
+    }
+
+    // read an upload from cache, if it exists
+    // previously, this would lock the cache as writable to renew the upload's cache lifespan;
+    // locking the cache as readable allows multiple concurrent readers, which lets me handle multiple views concurrently
+    async fn read_cached_upload(&self, name: &String) -> Option<Bytes> {
+        let cache = self.cache.read().await;
+
+        if !cache.contains_key(&name) {
+            return None;
+        }
+
+        // fetch upload data from cache
+        let data = cache
+            .get(&name)
+            .expect("failed to get upload data from cache")
+            .to_owned();
+
+        Some(data)
+    }
+
+    pub async fn get_upload(&self, original_path: &PathBuf) -> Result<ViewResponse, StatusCode> {
+        // extract upload file name
+        let name = original_path
+            .file_name()
+            .and_then(OsStr::to_str)
+            .unwrap_or_default()
+            .to_string();
+
+        // path on disk
+        let mut path = self.save_path.clone();
+        path.push(&name);
+
+        // check if the upload exists, if not then 404
+        if !self.upload_exists(&path).await {
+            return Err(StatusCode::NOT_FOUND);
+        }
+
+        // attempt to read upload from cache
+        let cached_data = self.read_cached_upload(&name).await;
+
+        if let Some(data) = cached_data {
+            info!(target: "get_upload", "got upload from cache!!");
+
+            Ok(ViewResponse::FromCache(data))
+        } else {
+            let mut file = File::open(&path).await.unwrap();
+
+            // read upload length from disk
+            let length = file
+                .metadata()
+                .await
+                .expect("failed to read upload file metadata")
+                .len() as usize;
+
+            debug!(target: "get_upload", "read upload from disk, size = {}", length);
+
+            // if the upload is okay to cache, recache it and send a FromCache response
+            if self.will_use_cache(length) {
+                // read file from disk into a buffer; if that fails at any point, return 500
+                let mut data = BytesMut::with_capacity(length);
+
+                loop {
+                    match file.read_buf(&mut data).await {
+                        Ok(0) => break,
+                        Ok(_) => {}
+                        Err(_) => return Err(StatusCode::INTERNAL_SERVER_ERROR),
+                    }
+                }
+
+                let data = data.freeze();
+
+                // re-insert it into cache
+                let mut cache = self.cache.write().await;
+                cache.insert(name, data.clone());
+
+                info!(target: "get_upload", "recached upload from disk!");
+
+                return Ok(ViewResponse::FromCache(data));
+            }
+
+            info!(target: "get_upload", "got upload from disk!");
+
+            Ok(ViewResponse::FromDisk(file))
+        }
+    }
+}
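aside: engine construction now takes all the cache tuning knobs explicitly instead of hard-coded values; inside the crate it looks roughly like this (the numbers are example values mirroring the env vars main.rs reads, not defaults):

    use std::{path::PathBuf, time::Duration};

    let engine = crate::engine::Engine::new(
        "http://127.0.0.1:8000".to_string(), // BRZ_BASE_URL
        PathBuf::from("/srv/uploads"),       // BRZ_SAVE_PATH
        80_000_000,                          // BRZ_CACHE_UPL_MAX_LENGTH (bytes)
        Duration::from_secs(1800),           // BRZ_CACHE_UPL_LIFETIME
        Duration::from_secs(60),             // BRZ_CACHE_SCAN_FREQ
        200_000_000,                         // BRZ_CACHE_MEM_CAPACITY (bytes)
    );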
diff --git a/src/index.rs b/src/index.rs
index 48091cd..a34f335 100644
--- a/src/index.rs
+++ b/src/index.rs
@@ -1,10 +1,10 @@
-use std::sync::{atomic::Ordering, Arc};
-
-use axum::extract::State;
-
-// show index status page
-pub async fn index(State(engine): State<Arc<crate::engine::Engine>>) -> String {
-    let count = engine.upl_count.load(Ordering::Relaxed);
-
-    format!("minish's image host, currently hosting {} files", count)
-}
+use std::sync::{atomic::Ordering, Arc};
+
+use axum::extract::State;
+
+// show index status page with amount of uploaded files
+pub async fn index(State(engine): State<Arc<crate::engine::Engine>>) -> String {
+    let count = engine.upl_count.load(Ordering::Relaxed);
+
+    format!("minish's image host, currently hosting {} files", count)
+}
diff --git a/src/main.rs b/src/main.rs
index 2a852a5..ebf0463 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,19 +1,17 @@
-use std::{sync::Arc, path::PathBuf, time::Duration, process::exit};
+use std::{env, path::PathBuf, sync::Arc, time::Duration};
 
 extern crate axum;
 
 #[macro_use]
 extern crate log;
 
-extern crate simplelog;
-
 use engine::Engine;
 
 use axum::{
     routing::{get, post},
     Router,
 };
-use simplelog::*;
+use tokio::signal;
 
 mod engine;
 mod index;
@@ -22,39 +20,75 @@
 mod view;
 
 #[tokio::main]
 async fn main() {
-    // Initialise logger
-    TermLogger::init(
-        LevelFilter::Warn,
-        Config::default(),
-        TerminalMode::Mixed,
-        ColorChoice::Auto,
-    )
-    .unwrap();
+    // initialise logger
+    env_logger::init();
 
-    // Create engine
-    let engine = Engine::new( // TODO: Read config from env vars
-        "http://127.0.0.1:8000".to_string(),
-        PathBuf::from("./uploads/"),
-        80_000_000, // Main instance is going to use this
-        Duration::from_secs(8), // CHANGE THIS!!!!!!!
-        Duration::from_secs(1), // THIS TOO!!!!!!!!!!!!!!!
+    // read env vars
+    let base_url = env::var("BRZ_BASE_URL").expect("missing BRZ_BASE_URL! base url for upload urls (ex: http://127.0.0.1:8000 for http://127.0.0.1:8000/p/abcdef.png, http://picture.wtf for http://picture.wtf/p/abcdef.png)");
+    let save_path = env::var("BRZ_SAVE_PATH").expect("missing BRZ_SAVE_PATH! this should be a path where uploads are saved to disk (ex: /srv/uploads, C:\\brzuploads)");
+    let cache_max_length = env::var("BRZ_CACHE_UPL_MAX_LENGTH").expect("missing BRZ_CACHE_UPL_MAX_LENGTH! this is the max length an upload can be in bytes before it won't be cached (ex: 80000000 for 80MB)");
+    let cache_upl_lifetime = env::var("BRZ_CACHE_UPL_LIFETIME").expect("missing BRZ_CACHE_UPL_LIFETIME! this indicates how long an upload will stay in cache (ex: 1800 for 30 minutes, 60 for 1 minute)");
+    let cache_scan_freq = env::var("BRZ_CACHE_SCAN_FREQ").expect("missing BRZ_CACHE_SCAN_FREQ! this is the frequency of full cache scans, which scan for and remove expired uploads (ex: 60 for 1 minute)");
+    let cache_mem_capacity = env::var("BRZ_CACHE_MEM_CAPACITY").expect("missing BRZ_CACHE_MEM_CAPACITY! this is the amount of memory the cache will hold before dropping entries");
+
+    // parse env vars
+    let save_path = PathBuf::from(save_path);
+    let cache_max_length = usize::from_str_radix(&cache_max_length, 10).expect("failed parsing BRZ_CACHE_UPL_MAX_LENGTH! it should be a positive number without any separators");
+    let cache_upl_lifetime = Duration::from_secs(u64::from_str_radix(&cache_upl_lifetime, 10).expect("failed parsing BRZ_CACHE_UPL_LIFETIME! it should be a positive number without any separators"));
+    let cache_scan_freq = Duration::from_secs(u64::from_str_radix(&cache_scan_freq, 10).expect("failed parsing BRZ_CACHE_SCAN_FREQ! it should be a positive number without any separators"));
+    let cache_mem_capacity = usize::from_str_radix(&cache_mem_capacity, 10).expect("failed parsing BRZ_CACHE_MEM_CAPACITY! it should be a positive number without any separators");
+
+    if !save_path.exists() || !save_path.is_dir() {
+        panic!("the save path does not exist or is not a directory. this is invalid");
+    }
+
+    // create engine
+    let engine = Engine::new(
+        base_url,
+        save_path,
+        cache_max_length,
+        cache_upl_lifetime,
+        cache_scan_freq,
+        cache_mem_capacity,
     );
 
-    // Build main router
+    // build main router
     let app = Router::new()
         .route("/new", post(new::new))
         .route("/p/:name", get(view::view))
         .route("/", get(index::index))
-        .route("/exit", get(exit_abc))
         .with_state(Arc::new(engine));
 
-    // Start web server
-    axum::Server::bind(&"127.0.0.1:8000".parse().unwrap()) // don't forget to change this! it's local for now
+    // start web server
+    axum::Server::bind(&"0.0.0.0:8000".parse().unwrap())
         .serve(app.into_make_service())
+        .with_graceful_shutdown(shutdown_signal())
         .await
         .unwrap();
 }
 
-async fn exit_abc() {
-    exit(123);
+async fn shutdown_signal() {
+    let ctrl_c = async {
+        signal::ctrl_c()
+            .await
+            .expect("failed to add ctrl-c handler");
+    };
+
+    #[cfg(unix)]
+    let terminate = async {
+        signal::unix::signal(signal::unix::SignalKind::terminate())
+            .expect("failed to add SIGTERM handler")
+            .recv()
+            .await;
+    };
+
+    #[cfg(not(unix))]
+    let terminate = std::future::pending::<()>();
+
+    tokio::select! {
+        _ = ctrl_c => {},
+        _ = terminate => {},
+    }
+
+    info!("shutting down!");
 }
\ No newline at end of file
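aside: with the hard-coded config gone, every BRZ_* env var must be set or the binary will refuse to start; a hypothetical local run (example values, env_logger reads the standard RUST_LOG variable):

    BRZ_BASE_URL=http://127.0.0.1:8000 \
    BRZ_SAVE_PATH=./uploads \
    BRZ_CACHE_UPL_MAX_LENGTH=80000000 \
    BRZ_CACHE_UPL_LIFETIME=1800 \
    BRZ_CACHE_SCAN_FREQ=60 \
    BRZ_CACHE_MEM_CAPACITY=200000000 \
    RUST_LOG=info cargo run --release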
diff --git a/src/new.rs b/src/new.rs
index 0245535..579dc43 100644
--- a/src/new.rs
+++ b/src/new.rs
@@ -1,45 +1,45 @@
-use std::{collections::HashMap, ffi::OsStr, path::PathBuf, sync::Arc};
-
-use axum::{
-    extract::{BodyStream, Query, State},
-    http::HeaderValue,
-};
-use hyper::{HeaderMap, StatusCode, header};
-
-#[axum::debug_handler]
-pub async fn new(
-    State(engine): State<Arc<crate::engine::Engine>>,
-    headers: HeaderMap,
-    Query(params): Query<HashMap<String, String>>,
-    stream: BodyStream,
-) -> Result<String, StatusCode> {
-    if !params.contains_key("name") {
-        return Err(StatusCode::BAD_REQUEST);
-    }
-
-    let original_name = params.get("name").unwrap();
-    let original_path = PathBuf::from(original_name);
-
-    let path = engine.gen_path(&original_path).await;
-    let name = path
-        .file_name()
-        .and_then(OsStr::to_str)
-        .unwrap_or_default()
-        .to_string();
-
-    let url = format!("{}/p/{}", engine.base_url, name);
-
-    let content_length = headers
-        .get(header::CONTENT_LENGTH)
-        .unwrap_or(&HeaderValue::from_static(""))
-        .to_str()
-        .and_then(|s| Ok(usize::from_str_radix(s, 10)))
-        .unwrap()
-        .unwrap_or(usize::MAX);
-
-    engine
-        .process_upload(path, name, content_length, stream)
-        .await;
-
-    Ok(url)
-}
+use std::{collections::HashMap, ffi::OsStr, path::PathBuf, sync::Arc};
+
+use axum::{
+    extract::{BodyStream, Query, State},
+    http::HeaderValue,
+};
+use hyper::{HeaderMap, StatusCode, header};
+
+#[axum::debug_handler]
+pub async fn new(
+    State(engine): State<Arc<crate::engine::Engine>>,
+    headers: HeaderMap,
+    Query(params): Query<HashMap<String, String>>,
+    stream: BodyStream,
+) -> Result<String, StatusCode> {
+    if !params.contains_key("name") {
+        return Err(StatusCode::BAD_REQUEST);
+    }
+
+    let original_name = params.get("name").unwrap();
+    let original_path = PathBuf::from(original_name);
+
+    let path = engine.gen_path(&original_path).await;
+    let name = path
+        .file_name()
+        .and_then(OsStr::to_str)
+        .unwrap_or_default()
+        .to_string();
+
+    let url = format!("{}/p/{}", engine.base_url, name);
+
+    let content_length = headers
+        .get(header::CONTENT_LENGTH)
+        .unwrap_or(&HeaderValue::from_static(""))
+        .to_str()
+        .and_then(|s| Ok(usize::from_str_radix(s, 10)))
+        .unwrap()
+        .unwrap_or(usize::MAX);
+
+    engine
+        .process_upload(path, name, content_length, stream)
+        .await;
+
+    Ok(url)
+}
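aside: from the client side an upload is a POST to /new with the original filename in the query string and the raw bytes as the body; a hypothetical curl session (the host and filename are examples, the 6-character id in the response is random):

    curl --data-binary @screenshot.png "http://127.0.0.1:8000/new?name=screenshot.png"
    # prints something like: http://127.0.0.1:8000/p/aB3dEf.png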
diff --git a/src/view.rs b/src/view.rs
index fb7bbdf..8de516e 100644
--- a/src/view.rs
+++ b/src/view.rs
@@ -1,74 +1,62 @@
-use std::{
-    path::{Component, PathBuf},
-    sync::Arc,
-};
-
-use axum::{
-    body::StreamBody,
-    extract::{Path, State},
-    http::HeaderValue,
-    response::{IntoResponse, Response},
-};
-
-use bytes::Bytes;
-use hyper::StatusCode;
-use mime_guess::mime;
-use tokio::fs::File;
-use tokio_util::io::ReaderStream;
-
-pub enum ViewResponse {
-    FromDisk(File),
-    FromCache(PathBuf, Bytes),
-}
-
-impl IntoResponse for ViewResponse {
-    fn into_response(self) -> Response {
-        match self {
-            ViewResponse::FromDisk(file) => {
-                let reader = ReaderStream::new(file);
-                let stream = StreamBody::new(reader);
-
-                stream.into_response()
-            }
-            ViewResponse::FromCache(original_path, data) => {
-                // guess the content-type using the original path
-                // (axum handles this w/ streamed file responses but caches are octet-stream by default)
-                let content_type = mime_guess::from_path(original_path)
-                    .first()
-                    .unwrap_or(mime::APPLICATION_OCTET_STREAM)
-                    .to_string();
-
-                // extract mutable headers from the response
-                let mut res = data.into_response();
-                let headers = res.headers_mut();
-
-                // clear the headers and add our content-type
-                headers.clear();
-                headers.insert(
-                    "content-type",
-                    HeaderValue::from_str(content_type.as_str()).unwrap(),
-                );
-
-                res
-            }
-        }
-    }
-}
-
-#[axum::debug_handler]
-pub async fn view(
-    State(engine): State<Arc<crate::engine::Engine>>,
-    Path(original_path): Path<PathBuf>,
-) -> Result<ViewResponse, StatusCode> {
-    // (hopefully) prevent path traversal, just check for any non-file components
-    if original_path
-        .components()
-        .into_iter()
-        .any(|x| !matches!(x, Component::Normal(_)))
-    {
-        error!(target: "view", "a request attempted path traversal");
-        return Err(StatusCode::NOT_FOUND);
-    }
-
-    engine.get_upload(&original_path).await
-}
+use std::{
+    path::{Component, PathBuf},
+    sync::Arc,
+};
+
+use axum::{
+    body::StreamBody,
+    extract::{Path, State},
+    response::{IntoResponse, Response},
+};
+
+use bytes::Bytes;
+use hyper::StatusCode;
+use tokio::fs::File;
+use tokio_util::io::ReaderStream;
+
+pub enum ViewResponse {
+    FromDisk(File),
+    FromCache(Bytes),
+}
+
+impl IntoResponse for ViewResponse {
+    fn into_response(self) -> Response {
+        match self {
+            ViewResponse::FromDisk(file) => {
+                // create a streamed body response (we want to stream larger files)
+                let reader = ReaderStream::new(file);
+                let stream = StreamBody::new(reader);
+
+                stream.into_response()
+            }
+            ViewResponse::FromCache(data) => {
+                // extract mutable headers from the response
+                let mut res = data.into_response();
+                let headers = res.headers_mut();
+
+                // clear the headers and let the client infer the content type
+                headers.clear();
+
+                res
+            }
+        }
+    }
+}
+
+#[axum::debug_handler]
+pub async fn view(
+    State(engine): State<Arc<crate::engine::Engine>>,
+    Path(original_path): Path<PathBuf>,
+) -> Result<ViewResponse, StatusCode> {
+    // (hopefully) prevent path traversal, just check for any non-file components
+    if original_path
+        .components()
+        .into_iter()
+        .any(|x| !matches!(x, Component::Normal(_)))
+    {
+        warn!(target: "view", "a request attempted path traversal");
+        return Err(StatusCode::NOT_FOUND);
+    }
+
+    engine.get_upload(&original_path).await
+}
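aside: the traversal guard in view works because `..` and absolute paths parse into non-Normal components; a standalone illustration (is_plain_file_name is a made-up helper for this sketch, not part of the patch):

    use std::path::{Component, Path};

    fn is_plain_file_name(path: &Path) -> bool {
        // a safe request path consists only of Component::Normal parts
        path.components().all(|c| matches!(c, Component::Normal(_)))
    }

    fn main() {
        assert!(is_plain_file_name(Path::new("abcdef.png")));
        assert!(!is_plain_file_name(Path::new("../secret.txt"))); // Component::ParentDir
        assert!(!is_plain_file_name(Path::new("/etc/passwd"))); // Component::RootDir
    }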