diff --git a/Cargo.lock b/Cargo.lock index ca15833..2292c50 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -373,9 +373,9 @@ dependencies = [ [[package]] name = "dashmap" -version = "6.1.0" +version = "6.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +checksum = "e6361d5c062261c78a176addb82d4c821ae42bed6089de0e12603cd25de2059c" dependencies = [ "cfg-if", "crossbeam-utils", diff --git a/Cargo.toml b/Cargo.toml index c161d1f..f3b7fc6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,7 +23,7 @@ axum-extra = { version = "0.12.6", default-features = false, features = [ base64 = "0.22" bytes = "1" color-eyre = "0.6" -dashmap = { version = "6.1.0", features = ["inline"] } +dashmap = { version = "6.2.1", features = ["inline"] } headers = "0.4" hmac = "0.12.1" http = "1.2" diff --git a/src/cache.rs b/src/cache.rs index 7fc6a8f..7cc56e1 100644 --- a/src/cache.rs +++ b/src/cache.rs @@ -166,7 +166,16 @@ impl Cache { self.length.fetch_add(len, Ordering::Relaxed); // Add to the map, return true if we didn't replace anything - self.map.insert(key.to_string(), e).is_none() + if let Some(old) = self.map.insert(key.to_string(), e) { + // We replaced something so fix the length + // I think that shouldn't happen anyway probably? + self.length.fetch_sub(old.value.len(), Ordering::Relaxed); + // An entry was replaced + false + } else { + // No entry was replaced.... + true + } } /// Add a new element to the cache with the default lifetime. @@ -264,7 +273,17 @@ impl Cache { if !expired.is_empty() { // Use a retain call, should be less locks that way // (instead of many remove calls) - self.map.retain(|k, _| !expired.contains(k)); + self.map.retain(|k, e| { + let is_expired = expired.contains(k); + // Handle length update + // if expired + if is_expired { + self.length.fetch_sub(e.value.len(), Ordering::Relaxed); + } + // If it isn't expired + // it will stay + !is_expired + }); } } } diff --git a/src/config.rs b/src/config.rs index 91825bf..fb1db82 100644 --- a/src/config.rs +++ b/src/config.rs @@ -8,7 +8,7 @@ use tracing_subscriber::filter::LevelFilter; pub struct Config { pub engine: EngineConfig, pub cache: CacheConfig, - pub disk: DiskConfig, + pub disk: Vec, pub http: HttpConfig, pub logger: LoggerConfig, } @@ -51,22 +51,6 @@ pub struct EngineConfig { pub motd: String, } -#[serde_as] -#[derive(Deserialize, Default, Clone)] -pub struct DeleteWhenConfig { - /// Condition that is satisfied when - /// an upload reaches the specified age. - /// (in seconds) - #[serde_as(as = "DurationSeconds")] - pub older_than: Duration, - - /// Condition that is satisfied when - /// an upload has not been accessed - /// for the specified duration. (in seconds) - #[serde_as(as = "DurationSeconds")] - pub not_accessed_for: Duration, -} - #[derive(Deserialize, Clone)] pub struct DiskConfig { /// Location on disk the uploads are to be saved to @@ -77,18 +61,6 @@ pub struct DiskConfig { /// skip this disk. If no disks are suitable, /// the upload will be rejected. (status 413) pub max_save_len: Option, - - /// Maximum total size (in bytes) of stored uploads - /// for this disk, so like a disk space cap. - /// It is useful to avoid "out of disk space" errors. - /// When a new upload cannot be fit on any disk, - /// the status code 507 will be returned - pub max_stored_len: Option, - - /// When this "AND" condition is satisfied - /// for an upload, it will be deleted. - #[serde(default)] - pub delete_when: DeleteWhenConfig, } #[serde_as] diff --git a/src/disk.rs b/src/disk.rs index 824cfa7..b790d1e 100644 --- a/src/disk.rs +++ b/src/disk.rs @@ -1,22 +1,224 @@ -use std::path::{Path, PathBuf}; +use std::{ + collections::HashSet, + path::{Path, PathBuf}, + sync::{Arc, Weak}, +}; use bytes::Bytes; +use color_eyre::eyre::{self, bail}; +use dashmap::DashMap; use tokio::{ fs::File, io::{self, AsyncWriteExt}, - sync::mpsc, + sync::{Mutex, mpsc}, }; use crate::config; +/// An array of disk file stores with +/// a similar API to the cache. +pub struct DiskArray { + /// Master set of disks. + disks: Vec>, + + /// In-memory index of upload locations. + /// + /// [Weak] is used to make it easier to + /// drop disks if a future update does that + locations: DashMap>>, +} + +impl DiskArray { + pub fn with_configs(cfgs: Vec) -> eyre::Result { + // create all + let mut seen_save_paths = HashSet::new(); + let mut disks = Vec::new(); + let locations: DashMap<_, Vec<_>> = DashMap::new(); + + for cfg in cfgs { + // make sure save paths are unique + // if two disks have the same save path, + // they will both try to save new uploads + // to the exact same spot which probably + // causes a lot of problems. also deletes + // will try to delete the same file etcetc + if !seen_save_paths.insert(cfg.save_path.clone()) { + bail!("disk has duplicate save path: {:?}", cfg.save_path); + } + + // init disk + let disk = Arc::new(Disk::with_config(cfg)?); + + // index files + for saved_name in disk.files()? { + let saved_name = saved_name?; + // add disk reference + let disk = Arc::downgrade(&disk); + let mut on_disks = locations.entry(saved_name).or_default(); + on_disks.push(disk); + } + + // add to disks + disks.push(disk); + } + + // return + Ok(Self { disks, locations }) + } + + /// Returns the amount of uploads stored + /// across all disks + pub fn count(&self) -> usize { + self.locations.len() + } + + /// Returns whether or not an upload + /// can be stored on any disk + pub fn will_use(&self, length: u64) -> bool { + self.disks.iter().any(|d| d.will_use(length)) + } + + /// Fast-path way to check if we have + /// an upload using location index + pub fn has(&self, saved_name: &str) -> bool { + self.locations.contains_key(saved_name) + } + + /// Get the size of an upload's file + pub async fn len(&self, f: &File) -> io::Result { + Ok(f.metadata().await?.len()) + } + + /// Remove an upload from all disks + pub async fn remove(&self, saved_name: &str) -> io::Result<()> { + // find what disks the upload is stored on + // (removing from location index) + println!("get 1"); + let Some((_, on_disks)) = self.locations.remove(saved_name) else { + // that's not an upload + return Err(io::Error::new( + io::ErrorKind::NotFound, + "file to remove wasn't found", + )); + }; + println!("get 2"); + // delete from all disks its stored on + for disk in &on_disks { + let Some(disk) = disk.upgrade() else { + // dead disk so whatever + continue; + }; + // try to delete file + disk.remove(saved_name).await?; + } + // return + Ok(()) + } + + /// Start a save I/O task that directs + /// to all disks + pub fn start_save< + Fut: Future + Send + 'static, + F: FnOnce(eyre::Error) -> Fut + Send + 'static, + >( + &self, + saved_name: &str, + length: u64, + fail_callback: F, + ) -> mpsc::Sender { + let (tx, mut rx) = mpsc::channel::(1000); + + // setup oneshot fail callback + let fail_callback = Arc::new(Mutex::new(Some(fail_callback))); + + // add to location index + let mut on_disks = self.locations.entry(saved_name.to_string()).or_default(); + + // start save tasks + let mut txs = Vec::new(); + for disk in &self.disks { + if !disk.will_use(length) { + // we don't want that really + continue; + } + // update location index + { + let disk = Arc::downgrade(disk); + on_disks.push(disk); + } + // start task + let fail_callback = fail_callback.clone(); + let tx = disk.start_save(saved_name, async move |err| { + // run callback if we can + if let Some(fail_callback) = fail_callback.lock().await.take() { + fail_callback(err.into()).await; + } + // also so i remember- fail_callback is how late errors + // get handled. by the time it is called we don't need + // to care about channels + }); + txs.push(tx); + } + + // start our bg task + tokio::spawn(async move { + while let Some(chunk) = rx.recv().await { + // send to all disk tasks + for tx in &txs { + // handle error. + if let Err(err) = tx.send(chunk.clone()).await { + // try to report that + if let Some(fail_callback) = fail_callback.lock().await.take() { + fail_callback(err.into()).await; + } + // we dont want to talk + // with dead channels + return; + } + } + } + }); + + tx + } + + /// Opens an upload on the first disk + /// that works + /// (in order of definition in config) + pub async fn open(&self, saved_name: &str) -> io::Result> { + // get location entry + let Some(on_disks) = self.locations.get(saved_name) else { + // that's not found..... + return Ok(None); + }; + + // start trying disks + for disk in on_disks.iter() { + let Some(disk) = disk.upgrade() else { + // no more that disk :( + // it would be nice to remove it from list + continue; + }; + // try to open + if let Some(f) = disk.open(saved_name).await? { + return Ok(Some(f)); + } + } + + // none worked.... + // it would be nice to delete the entry + Ok(None) + } +} + /// Provides an API to access the disk file store /// like we access the cache. -pub struct Disk { +struct Disk { cfg: config::DiskConfig, } impl Disk { - pub fn with_config(cfg: config::DiskConfig) -> io::Result { + fn with_config(cfg: config::DiskConfig) -> io::Result { // check path if !cfg.save_path.exists() { return Err(io::Error::new( @@ -35,15 +237,19 @@ impl Disk { Ok(Self { cfg }) } - /// Counts the number of files saved to disk we have - pub fn count(&self) -> io::Result { - std::fs::read_dir(&self.cfg.save_path)?.try_fold(0, |acc, x| { - Ok(if x?.file_type()?.is_file() { - acc + 1 - } else { - acc - }) - }) + /// Returns an iterator of stored file names. + fn files(&self) -> io::Result>> { + Ok(std::fs::read_dir(&self.cfg.save_path)?.filter_map(|e| { + // todo: refactor when try blocks are out^^ + (|| { + let e = e?; + Ok(e.file_type()? + .is_file() + .then_some(e.file_name().into_string().ok()) + .flatten()) + })() + .transpose() + })) } /// Returns whether or not an upload @@ -66,7 +272,7 @@ impl Disk { /// Try to open a file on disk, and if we didn't find it, /// then return [`None`]. - pub async fn open(&self, saved_name: &str) -> io::Result> { + async fn open(&self, saved_name: &str) -> io::Result> { let p = self.path_for(saved_name); match File::open(p).await { @@ -78,23 +284,15 @@ impl Disk { } } - /// Get the size of an upload's file - pub async fn len(&self, f: &File) -> io::Result { - Ok(f.metadata().await?.len()) - } - /// Remove an upload from disk. - pub async fn remove(&self, saved_name: &str) -> io::Result<()> { + async fn remove(&self, saved_name: &str) -> io::Result<()> { let p = self.path_for(saved_name); tokio::fs::remove_file(p).await } /// Create a background I/O task - pub fn start_save< - Fut: Future + Send + 'static, - F: FnOnce(io::Error) -> Fut + Send + 'static, - >( + fn start_save Fut + Send + 'static>( &self, saved_name: &str, fail_callback: F, @@ -103,7 +301,7 @@ impl Disk { // a large buffer size is chosen so uploads can be received quickly, // but with less possibility of running out of memory. // (thats probably only possible w very high link speed tho......) - let (tx, mut rx): (mpsc::Sender, mpsc::Receiver) = mpsc::channel(30000); + let (tx, mut rx) = mpsc::channel::(1000); let p = self.path_for(saved_name); diff --git a/src/engine.rs b/src/engine.rs index 9b1d29e..93a429e 100644 --- a/src/engine.rs +++ b/src/engine.rs @@ -1,10 +1,7 @@ use std::{ io::SeekFrom, ops::{Bound, RangeBounds}, - sync::{ - Arc, - atomic::{AtomicUsize, Ordering}, - }, + sync::Arc, time::Duration, }; @@ -77,9 +74,6 @@ type HmacSha256 = hmac::Hmac; /// breeze engine pub struct Engine { - /// Cached count of uploaded files - pub upl_count: AtomicUsize, - /// Engine configuration pub cfg: config::EngineConfig, @@ -90,7 +84,7 @@ pub struct Engine { cache: Arc, /// An interface to the on-disk upload store - disk: disk::Disk, + disk: disk::DiskArray, } /// Try to parse a `Range` header into an easier format to work with @@ -173,26 +167,20 @@ fn calculate_hash(len: u64, data_sample: Bytes) -> u128 { impl Engine { /// Creates a new instance of the engine - pub fn new( - cfg: config::EngineConfig, - cache: Arc, - disk: disk::Disk, - ) -> std::io::Result { + pub fn new(cfg: config::EngineConfig, cache: Arc, disk: disk::DiskArray) -> Self { let deletion_hmac = cfg .deletion_secret .as_ref() .map(|s| HmacSha256::new_from_slice(s.as_bytes()).unwrap()); - Ok(Self { - // initialise our cached upload count. this doesn't include temp uploads! - upl_count: AtomicUsize::new(disk.count()?), + Self { deletion_hmac, cfg, cache, disk, - }) + } } /// Fetch an upload. @@ -291,18 +279,23 @@ impl Engine { Ok(GetOutcome::Success(res)) } + /// Returns the amount of uploads we have stored. + pub fn count(&self) -> usize { + self.disk.count() + } + /// Check if we have an upload stored anywhere. /// /// This is only used to prevent `saved_name` collisions!! /// It is not used to deliver "not found" errors. - pub async fn has(&self, saved_name: &str) -> bool { + pub fn has(&self, saved_name: &str) -> bool { + // look in cache if self.cache.has(saved_name) { return true; } - // sidestep handling the error properly - // that way we can call this in gen_saved_name easier - if self.disk.open(saved_name).await.is_ok_and(|f| f.is_some()) { + // look in disk + if self.disk.has(saved_name) { return true; } @@ -350,18 +343,18 @@ impl Engine { /// Generate a new saved name for an upload. /// /// If it picks a name that already exists, it will try again. - pub async fn gen_saved_name(&self, ext: Option) -> String { + pub fn gen_saved_name(&self, ext: Option<&str>) -> String { loop { // generate a 6-character alphanumeric string let mut saved_name: String = Alphanumeric.sample_string(&mut rand::rng(), 6); // if we have an extension, add it now - if let Some(ref ext) = ext { + if let Some(ext) = ext { saved_name.push('.'); saved_name.push_str(ext); } - if !self.has(&saved_name).await { + if !self.has(&saved_name) { break saved_name; } @@ -384,9 +377,6 @@ impl Engine { info!("!! successfully removed upload"); - // decrement upload count - self.upl_count.fetch_sub(1, Ordering::Relaxed); - Ok(()) } @@ -411,7 +401,8 @@ impl Engine { // don't begin a disk save if we're using temporary lifetimes let tx = if lifetime.is_none() { - Some(self.disk.start_save(saved_name, { + // todo: can you lie about len?? + Some(self.disk.start_save(saved_name, provided_len, { let me = self.clone(); let saved_name = saved_name.to_string(); @@ -543,7 +534,8 @@ impl Engine { keep_exif: bool, ) -> eyre::Result { // if the upload size is greater than our max file size, deny it now - if !self.disk.will_use(provided_len) { + // temp uploads get to skip it because they don't use the disk + if lifetime.is_none() && !self.disk.will_use(provided_len) { return Ok(ProcessOutcome::UploadTooLarge); } @@ -561,7 +553,7 @@ impl Engine { } // generate the file name - let saved_name = self.gen_saved_name(ext).await; + let saved_name = self.gen_saved_name(ext.as_deref()); // save it let save_result = self @@ -583,7 +575,12 @@ impl Engine { Err(err) => { error!(?err, "failed processing upload!"); - self.remove(&saved_name).await?; + // if the disk is messed up or something + // lets not propagate the remove error + // before the save error + if let Err(err) = self.remove(&saved_name).await { + error!(?err, "failed to cleanup failed upload!"); + } return Err(err); } }; @@ -611,9 +608,6 @@ impl Engine { // format and send back the url let url = format!("{}/p/{saved_name}", self.cfg.base_url); - // if all goes well, increment the cached upload counter - self.upl_count.fetch_add(1, Ordering::Relaxed); - info!("finished processing upload!"); Ok(ProcessOutcome::Success { url, deletion_url }) diff --git a/src/index.rs b/src/index.rs index 852f6ff..db9984f 100644 --- a/src/index.rs +++ b/src/index.rs @@ -1,4 +1,4 @@ -use std::sync::{Arc, atomic::Ordering}; +use std::sync::Arc; use axum::extract::State; @@ -6,7 +6,7 @@ use crate::engine::Engine; /// Show index status page with amount of uploaded files pub async fn index(State(engine): State>) -> String { - let count = engine.upl_count.load(Ordering::Relaxed); + let count = engine.count(); let motd = engine.cfg.motd.clone(); diff --git a/src/main.rs b/src/main.rs index b97892d..016aeee 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,7 +2,6 @@ use std::{path::PathBuf, sync::Arc}; use argh::FromArgs; use color_eyre::eyre::{self, Context}; -use engine::Engine; use axum::{ Router, @@ -23,7 +22,7 @@ mod view; #[cfg(not(target_env = "msvc"))] use tikv_jemallocator::Jemalloc; -use crate::{cache::Cache, disk::Disk}; +use crate::{cache::Cache, disk::DiskArray, engine::Engine}; #[cfg(not(target_env = "msvc"))] #[global_allocator] @@ -79,10 +78,10 @@ async fn main() -> eyre::Result<()> { // Create backends let cache = Cache::with_config(cfg.cache)?; - let disk = Disk::with_config(cfg.disk)?; + let disk_array = DiskArray::with_configs(cfg.disk)?; // Create engine - let engine = Engine::new(cfg.engine, cache, disk)?; + let engine = Engine::new(cfg.engine, cache, disk_array); // Build main router let app = router(engine); diff --git a/src/view.rs b/src/view.rs index cb46015..b1015d5 100644 --- a/src/view.rs +++ b/src/view.rs @@ -73,8 +73,11 @@ impl IntoResponse for UploadResponse { // if it is not the full size, add relevant headers/status for range request if range_len != self.full_len { + // the spec says its meant to be an inclusive range + // so we do that......... + let end_incl = end - 1; let content_range = - HeaderValue::from_str(&format!("bytes {}-{}/{}", start, end, self.full_len)) + HeaderValue::from_str(&format!("bytes {}-{}/{}", start, end_incl, self.full_len)) .expect("construct content-range header failed"); headers.insert("Content-Range", content_range);