From ebd04d7bb51fe19be2d2723a90e3afb9b4d9ca7b Mon Sep 17 00:00:00 2001 From: min Date: Sun, 31 May 2026 01:36:26 -0400 Subject: [PATCH] unfinished stuff --- Cargo.lock | 68 +----------- Cargo.toml | 8 +- src/cache.rs | 298 ++++++-------------------------------------------- src/config.rs | 9 +- src/disk.rs | 7 +- src/main.rs | 8 +- 6 files changed, 52 insertions(+), 346 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 86e254e..ca15833 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,15 +17,6 @@ version = "2.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa" -[[package]] -name = "aho-corasick" -version = "1.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ddd31a130427c27518df266943a5308ed92d4b226cc639f5a8f1002816174301" -dependencies = [ - "memchr", -] - [[package]] name = "android_system_properties" version = "0.1.5" @@ -68,11 +59,10 @@ dependencies = [ [[package]] name = "atomic-time" -version = "0.1.5" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9622f5c6fb50377516c70f65159e70b25465409760c6bd6d4e581318bf704e83" +checksum = "75821c8282c0e622f3892087c1eeb8d4e3964b92467a263a44afa7d79dec7f3c" dependencies = [ - "once_cell", "portable-atomic", ] @@ -225,7 +215,6 @@ dependencies = [ "headers", "hmac", "http", - "http-body-util", "img-parts", "rand", "serde", @@ -236,10 +225,8 @@ dependencies = [ "tokio-stream", "tokio-util", "toml", - "tower", "tracing", "tracing-subscriber", - "tracing-test", "xxhash-rust", ] @@ -785,15 +772,6 @@ version = "0.4.29" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" -[[package]] -name = "matchers" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9" -dependencies = [ - "regex-automata", -] - [[package]] name = "matchit" version = "0.8.4" @@ -1005,23 +983,6 @@ dependencies = [ "syn", ] -[[package]] -name = "regex-automata" -version = "0.4.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6e1dd4122fc1595e8162618945476892eefca7b88c52820e74af6262213cae8f" -dependencies = [ - "aho-corasick", - "memchr", - "regex-syntax", -] - -[[package]] -name = "regex-syntax" -version = "0.8.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dc897dd8d9e8bd1ed8cdad82b5966c3e0ecae09fb1907d58efaa013543185d0a" - [[package]] name = "rustc-demangle" version = "0.1.27" @@ -1512,39 +1473,14 @@ version = "0.3.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cb7f578e5945fb242538965c2d0b04418d38ec25c79d160cd279bf0731c8d319" dependencies = [ - "matchers", "nu-ansi-term", - "once_cell", - "regex-automata", "sharded-slab", "smallvec", "thread_local", - "tracing", "tracing-core", "tracing-log", ] -[[package]] -name = "tracing-test" -version = "0.2.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19a4c448db514d4f24c5ddb9f73f2ee71bfb24c526cf0c570ba142d1119e0051" -dependencies = [ - "tracing-core", - "tracing-subscriber", - "tracing-test-macro", -] - -[[package]] -name = "tracing-test-macro" -version = "0.2.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ad06847b7afb65c7866a36664b75c40b895e318cea4f71299f013fb22965329d" -dependencies = [ - "quote", - "syn", -] - [[package]] name = "typenum" version = "1.20.0" diff --git a/Cargo.toml b/Cargo.toml index dd88990..c161d1f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,7 +14,7 @@ debug = "line-tables-only" [dependencies] argh = "0.1.12" -atomic-time = "0.1.4" +atomic-time = "0.2" axum = { version = "0.8.9", features = ["macros"] } axum-extra = { version = "0.12.6", default-features = false, features = [ "tracing", @@ -39,7 +39,6 @@ tokio = { version = "1", features = [ "fs", "io-util", "signal", - "test-util", ] } tokio-stream = "0.1" tokio-util = { version = "0.7", features = ["io"] } @@ -52,10 +51,5 @@ tracing = "0.1" tracing-subscriber = "0.3" xxhash-rust = { version = "0.8", features = ["xxh3"] } -[dev-dependencies] -http-body-util = "0.1" -tower = "0.5" -tracing-test = "0.2" - [target.'cfg(not(target_env = "msvc"))'.dependencies] tikv-jemallocator = "0.6" diff --git a/src/cache.rs b/src/cache.rs index 45aaf60..7fc6a8f 100644 --- a/src/cache.rs +++ b/src/cache.rs @@ -1,8 +1,12 @@ use std::{ - sync::atomic::{AtomicU64, AtomicUsize, Ordering}, - time::Duration, + sync::{ + Arc, + atomic::{AtomicUsize, Ordering}, + }, + time::{Duration, Instant}, }; +use atomic_time::AtomicInstant; use bytes::Bytes; use color_eyre::eyre::{self, bail}; use dashmap::{DashMap, mapref::one::Ref}; @@ -10,22 +14,15 @@ use tokio::time; use crate::config; -#[cfg(not(test))] -use atomic_time::AtomicSystemTime; -#[cfg(not(test))] -use std::time::SystemTime; -#[cfg(test)] -use tests::{MockAtomicSystemTime as AtomicSystemTime, MockSystemTime as SystemTime}; - /// An entry stored in the cache. /// /// It contains basic metadata and the actual value. -pub struct Entry { +struct Entry { /// The data held value: Bytes, /// The last time this entry was read/written - last_used: AtomicSystemTime, + last_used: AtomicInstant, /// Whether or not `last_used` should be updated update_used: bool, @@ -36,31 +33,25 @@ pub struct Entry { impl Entry { fn new(value: Bytes, lifetime: Duration, update_used: bool) -> Self { - let now = AtomicSystemTime::now(); - Self { value, - last_used: now, + last_used: AtomicInstant::now(), update_used, lifetime, } } - fn last_used(&self) -> SystemTime { + fn last_used(&self) -> Instant { self.last_used.load(Ordering::Relaxed) } fn is_expired(&self) -> bool { - match self.last_used().elapsed() { - Ok(d) => d >= self.lifetime, - Err(_) => false, // now > last_used - } + let since_last_used = self.last_used().elapsed(); + since_last_used >= self.lifetime } } /// A concurrent cache with a maximum memory size (w/ LRU) and expiration. -/// -/// It is designed to keep memory usage low. pub struct Cache { /// Where elements are stored map: DashMap, @@ -68,29 +59,33 @@ pub struct Cache { /// Total length of data stored in cache currently length: AtomicUsize, - /// How many times the scanner has ran, - /// for testing purposes - scan_count: AtomicU64, - /// How should it behave cfg: config::CacheConfig, } impl Cache { - pub fn with_config(cfg: config::CacheConfig) -> eyre::Result { + pub fn with_config(cfg: config::CacheConfig) -> eyre::Result> { // Sanity check chosen limits if cfg.mem_capacity < cfg.max_length { bail!("`max_length` should not exceed `mem_capacity`"); } - // Return - Ok(Self { + // Create + let me = Arc::new(Self { map: DashMap::with_capacity(64), length: AtomicUsize::new(0), - scan_count: AtomicU64::new(0), cfg, - }) + }); + + // Start scanner + tokio::spawn({ + let me = me.clone(); + async move { me.scanner().await } + }); + + // Return + Ok(me) } /// Figure out who should be bumped out of cache next @@ -159,12 +154,12 @@ impl Cache { // How far we went above the limit let needed = new_total - self.cfg.mem_capacity; - self.next_out(needed).iter().for_each(|k| { + for k in self.next_out(needed) { // Remove the element, and ignore the result // The only reason it should be failing is if it couldn't find it, // in which case it was already removed - self.remove(k); - }); + self.remove(&k); + } } // Atomically add to total cached data length @@ -210,7 +205,7 @@ impl Cache { let e = self.get_(key)?; if e.update_used { - e.last_used.store(SystemTime::now(), Ordering::Relaxed); + e.last_used.store(Instant::now(), Ordering::Relaxed); } Some(e.value.clone()) @@ -233,11 +228,9 @@ impl Cache { length <= (self.cfg.max_length as u64) } - /// The background job that scans through the cache and removes inactive elements. - /// - /// TODO: see if this is actually less expensive than - /// letting each entry keep track of expiry with its own task - pub async fn scanner(&self) { + /// This background job waits for entries to reach + /// their expiry timestamps and removes them proactively. + async fn scanner(&self) { let mut interval = time::interval(self.cfg.scan_freq); interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip); interval.tick().await; // Skip first tick @@ -245,11 +238,10 @@ impl Cache { loop { // We put this first so that it doesn't scan the instant the server starts interval.tick().await; - self.scan_count.fetch_add(1, Ordering::Relaxed); // Save current timestamp so we aren't retrieving it constantly // If we don't do this it'll be a LOT of system api calls - let now = SystemTime::now(); + let now = Instant::now(); // Collect a list of all the expired keys // If we fail to compare the times, it gets added to the list anyways @@ -257,7 +249,7 @@ impl Cache { .map .iter() .filter_map(|e| { - let elapsed = now.duration_since(e.last_used()).unwrap_or(Duration::MAX); + let elapsed = now.duration_since(e.last_used()); let is_expired = elapsed >= e.lifetime; if is_expired { @@ -277,225 +269,3 @@ impl Cache { } } } - -#[cfg(test)] -mod tests { - use std::{ - sync::{ - Arc, - atomic::{AtomicU64, Ordering}, - }, - time::Duration, - }; - - use bytes::Bytes; - - use crate::{cache::Cache, config::CacheConfig}; - - thread_local! { - static MOCK_CLOCK: AtomicU64 = AtomicU64::new(0); - } - fn get_clock() -> u64 { - MOCK_CLOCK.with(|mc| mc.load(Ordering::Relaxed)) - } - fn advance_clock(ms: u64) { - MOCK_CLOCK.with(|mc| mc.fetch_add(ms, Ordering::Relaxed)); - } - async fn advance_clock_async(ms: u64) { - advance_clock(ms); - tokio::time::advance(Duration::from_millis(ms)).await; - tokio::task::yield_now().await; // make sure scanner tick runs - } - - pub struct MockSystemTimeError; - - #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug)] - pub(super) struct MockSystemTime(u64); - impl MockSystemTime { - pub fn now() -> Self { - Self(get_clock()) - } - - pub fn duration_since( - &self, - earlier: MockSystemTime, - ) -> Result { - if self.0 >= earlier.0 { - Ok(Duration::from_millis(self.0 - earlier.0)) - } else { - Err(MockSystemTimeError) - } - } - - pub fn elapsed(&self) -> Result { - Self::now().duration_since(*self) - } - } - - pub(super) struct MockAtomicSystemTime(AtomicU64); - impl MockAtomicSystemTime { - pub fn now() -> Self { - Self(AtomicU64::new(get_clock())) - } - - pub fn load(&self, order: Ordering) -> MockSystemTime { - MockSystemTime(self.0.load(order)) - } - pub fn store(&self, system_time: MockSystemTime, order: Ordering) { - self.0.store(system_time.0, order); - } - } - - const KEY: &str = "abcdef.png"; - const VALUE: Bytes = Bytes::from_static(&[0, 1, 2, 3, 4, 5, 6, 7]); - - fn simple() -> Cache { - return Cache::with_config(CacheConfig { - max_length: 10_000_000, - mem_capacity: 100_000_000, - scan_freq: Duration::from_secs(5), - upload_lifetime: Duration::from_secs(15), - }) - .unwrap(); - } - - async fn scanning() -> Arc { - let cache = Arc::new(simple()); - - tokio::spawn({ - let cache = cache.clone(); - async move { cache.scanner().await } - }); - // allow 0ms scanner tick to run - tokio::task::yield_now().await; - - cache - } - - /// Make sure that cache use check - /// decides properly for multiple lengths - #[test] - fn will_use() { - let cache = simple(); - - // use something - assert!(cache.will_use(4_000_000)); - - // don't use something - assert!(!cache.will_use(12_000_001)); - - // use something edge - assert!(cache.will_use(10_000_000)); - - // use something mini - assert!(cache.will_use(0)); - } - - /// Make sure that [`Cache::add`]'s return value - /// is `false` when an entry was replaced - #[test] - fn store_replacement() { - let cache = simple(); - - // store - assert!(cache.add(KEY, VALUE)); - - // store w replace - assert!(!cache.add(KEY, VALUE)); - } - - /// Make sure that the scanner ticks at - /// the right times, and removes entries - /// when expected. - #[tokio::test(start_paused = true)] - async fn store_expire_on_hit_with_scanner() { - let cache = scanning().await; - - // store - assert!(cache.add(KEY, VALUE)); - - // get again so that scanner timing - // doesn't align w expiration - advance_clock_async(4999).await; - assert_eq!(cache.scan_count.load(Ordering::Relaxed), 0); - assert_eq!(cache.get(KEY), Some(VALUE)); - - // next scanner tick - advance_clock_async(1).await; - assert_eq!(cache.scan_count.load(Ordering::Relaxed), 1); - - // advance a bit more - // make sure we don't expire early - advance_clock_async(7000).await; - assert_eq!(cache.scan_count.load(Ordering::Relaxed), 2); - assert!(cache.map.get(KEY).is_some()); - - // advance to next scanner tick - advance_clock_async(3000).await; - assert_eq!(cache.scan_count.load(Ordering::Relaxed), 3); - - // advance to after expiry - advance_clock_async(4999).await; - assert_eq!(cache.scan_count.load(Ordering::Relaxed), 3); - - // it should be there because we - // offset ourselves by 1ms - assert!(cache.map.get(KEY).is_some()); - assert_eq!(cache.get(KEY), None); - } - - /// Make sure that the scanner removes - /// expired entries. - #[tokio::test(start_paused = true)] - async fn store_expire_by_scanner() { - let cache = scanning().await; - - // store - assert!(cache.add(KEY, VALUE)); - - // make sure we don't expire early - advance_clock_async(6500).await; - assert!(cache.map.get(KEY).is_some()); - - // advance to after expiry - advance_clock_async(8500).await; - - // it should get hit by scanner - assert!(cache.map.get(KEY).is_none()); - } - - /// Make sure that entries expire on hit, - /// even when there is no scanner - #[test] - fn store_get_expire_on_hit() { - let cache = simple(); - - // store, get - let added_at = MockSystemTime::now(); - assert!(cache.add(KEY, VALUE)); - assert_eq!(cache.get(KEY), Some(VALUE)); - - // get after delay - // (upload gets used) - advance_clock(2000); - assert_eq!(cache.map.get(KEY).unwrap().last_used(), added_at); - assert_eq!(cache.get(KEY), Some(VALUE)); - assert_eq!( - cache.map.get(KEY).unwrap().last_used(), - MockSystemTime::now() - ); - - // get after longer delay - // (upload should have been used so no expire) - advance_clock(14000); - assert_eq!(cache.get(KEY), Some(VALUE)); - assert_eq!( - cache.map.get(KEY).unwrap().last_used(), - MockSystemTime::now() - ); - - // get after expiration - advance_clock(15000); - assert!(cache.get(KEY).is_none()); - } -} diff --git a/src/config.rs b/src/config.rs index c627cca..91825bf 100644 --- a/src/config.rs +++ b/src/config.rs @@ -72,12 +72,19 @@ pub struct DiskConfig { /// Location on disk the uploads are to be saved to pub save_path: PathBuf, - /// Maximum size of an upload that will be + /// Maximum size (in bytes) of an upload that will be /// saved on this disk. Anything higher will /// skip this disk. If no disks are suitable, /// the upload will be rejected. (status 413) pub max_save_len: Option, + /// Maximum total size (in bytes) of stored uploads + /// for this disk, so like a disk space cap. + /// It is useful to avoid "out of disk space" errors. + /// When a new upload cannot be fit on any disk, + /// the status code 507 will be returned + pub max_stored_len: Option, + /// When this "AND" condition is satisfied /// for an upload, it will be deleted. #[serde(default)] diff --git a/src/disk.rs b/src/disk.rs index 07250bc..824cfa7 100644 --- a/src/disk.rs +++ b/src/disk.rs @@ -128,9 +128,14 @@ impl Disk { } // flush to disk - // this should catch "no space left on device" i hope... if let Err(err) = file.flush().await { fail_callback(err).await; + return; + } + + // sync data+metadata to disk + if let Err(err) = file.sync_all().await { + fail_callback(err).await; } }); diff --git a/src/main.rs b/src/main.rs index 3a4a285..b97892d 100644 --- a/src/main.rs +++ b/src/main.rs @@ -78,15 +78,9 @@ async fn main() -> eyre::Result<()> { } // Create backends - let cache = Arc::new(Cache::with_config(cfg.cache)?); + let cache = Cache::with_config(cfg.cache)?; let disk = Disk::with_config(cfg.disk)?; - // Start cache scanner - tokio::spawn({ - let cache = cache.clone(); - async move { cache.scanner().await } - }); - // Create engine let engine = Engine::new(cfg.engine, cache, disk)?;