unfinished stuff

This commit is contained in:
minish 2026-05-31 01:36:26 -04:00
parent b65b6ca002
commit ebd04d7bb5
Signed by: min
SSH Key Fingerprint: SHA256:mf+pUTmK92Y57BuCjlkBdd82LqztTfDCQIUp0fCKABc
6 changed files with 52 additions and 346 deletions

68
Cargo.lock generated
View File

@ -17,15 +17,6 @@ version = "2.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa" checksum = "320119579fcad9c21884f5c4861d16174d0e06250625266f50fe6898340abefa"
[[package]]
name = "aho-corasick"
version = "1.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ddd31a130427c27518df266943a5308ed92d4b226cc639f5a8f1002816174301"
dependencies = [
"memchr",
]
[[package]] [[package]]
name = "android_system_properties" name = "android_system_properties"
version = "0.1.5" version = "0.1.5"
@ -68,11 +59,10 @@ dependencies = [
[[package]] [[package]]
name = "atomic-time" name = "atomic-time"
version = "0.1.5" version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9622f5c6fb50377516c70f65159e70b25465409760c6bd6d4e581318bf704e83" checksum = "75821c8282c0e622f3892087c1eeb8d4e3964b92467a263a44afa7d79dec7f3c"
dependencies = [ dependencies = [
"once_cell",
"portable-atomic", "portable-atomic",
] ]
@ -225,7 +215,6 @@ dependencies = [
"headers", "headers",
"hmac", "hmac",
"http", "http",
"http-body-util",
"img-parts", "img-parts",
"rand", "rand",
"serde", "serde",
@ -236,10 +225,8 @@ dependencies = [
"tokio-stream", "tokio-stream",
"tokio-util", "tokio-util",
"toml", "toml",
"tower",
"tracing", "tracing",
"tracing-subscriber", "tracing-subscriber",
"tracing-test",
"xxhash-rust", "xxhash-rust",
] ]
@ -785,15 +772,6 @@ version = "0.4.29"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897" checksum = "5e5032e24019045c762d3c0f28f5b6b8bbf38563a65908389bf7978758920897"
[[package]]
name = "matchers"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9"
dependencies = [
"regex-automata",
]
[[package]] [[package]]
name = "matchit" name = "matchit"
version = "0.8.4" version = "0.8.4"
@ -1005,23 +983,6 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "regex-automata"
version = "0.4.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e1dd4122fc1595e8162618945476892eefca7b88c52820e74af6262213cae8f"
dependencies = [
"aho-corasick",
"memchr",
"regex-syntax",
]
[[package]]
name = "regex-syntax"
version = "0.8.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc897dd8d9e8bd1ed8cdad82b5966c3e0ecae09fb1907d58efaa013543185d0a"
[[package]] [[package]]
name = "rustc-demangle" name = "rustc-demangle"
version = "0.1.27" version = "0.1.27"
@ -1512,39 +1473,14 @@ version = "0.3.23"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb7f578e5945fb242538965c2d0b04418d38ec25c79d160cd279bf0731c8d319" checksum = "cb7f578e5945fb242538965c2d0b04418d38ec25c79d160cd279bf0731c8d319"
dependencies = [ dependencies = [
"matchers",
"nu-ansi-term", "nu-ansi-term",
"once_cell",
"regex-automata",
"sharded-slab", "sharded-slab",
"smallvec", "smallvec",
"thread_local", "thread_local",
"tracing",
"tracing-core", "tracing-core",
"tracing-log", "tracing-log",
] ]
[[package]]
name = "tracing-test"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19a4c448db514d4f24c5ddb9f73f2ee71bfb24c526cf0c570ba142d1119e0051"
dependencies = [
"tracing-core",
"tracing-subscriber",
"tracing-test-macro",
]
[[package]]
name = "tracing-test-macro"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ad06847b7afb65c7866a36664b75c40b895e318cea4f71299f013fb22965329d"
dependencies = [
"quote",
"syn",
]
[[package]] [[package]]
name = "typenum" name = "typenum"
version = "1.20.0" version = "1.20.0"

View File

@ -14,7 +14,7 @@ debug = "line-tables-only"
[dependencies] [dependencies]
argh = "0.1.12" argh = "0.1.12"
atomic-time = "0.1.4" atomic-time = "0.2"
axum = { version = "0.8.9", features = ["macros"] } axum = { version = "0.8.9", features = ["macros"] }
axum-extra = { version = "0.12.6", default-features = false, features = [ axum-extra = { version = "0.12.6", default-features = false, features = [
"tracing", "tracing",
@ -39,7 +39,6 @@ tokio = { version = "1", features = [
"fs", "fs",
"io-util", "io-util",
"signal", "signal",
"test-util",
] } ] }
tokio-stream = "0.1" tokio-stream = "0.1"
tokio-util = { version = "0.7", features = ["io"] } tokio-util = { version = "0.7", features = ["io"] }
@ -52,10 +51,5 @@ tracing = "0.1"
tracing-subscriber = "0.3" tracing-subscriber = "0.3"
xxhash-rust = { version = "0.8", features = ["xxh3"] } xxhash-rust = { version = "0.8", features = ["xxh3"] }
[dev-dependencies]
http-body-util = "0.1"
tower = "0.5"
tracing-test = "0.2"
[target.'cfg(not(target_env = "msvc"))'.dependencies] [target.'cfg(not(target_env = "msvc"))'.dependencies]
tikv-jemallocator = "0.6" tikv-jemallocator = "0.6"

View File

@ -1,8 +1,12 @@
use std::{ use std::{
sync::atomic::{AtomicU64, AtomicUsize, Ordering}, sync::{
time::Duration, Arc,
atomic::{AtomicUsize, Ordering},
},
time::{Duration, Instant},
}; };
use atomic_time::AtomicInstant;
use bytes::Bytes; use bytes::Bytes;
use color_eyre::eyre::{self, bail}; use color_eyre::eyre::{self, bail};
use dashmap::{DashMap, mapref::one::Ref}; use dashmap::{DashMap, mapref::one::Ref};
@ -10,22 +14,15 @@ use tokio::time;
use crate::config; use crate::config;
#[cfg(not(test))]
use atomic_time::AtomicSystemTime;
#[cfg(not(test))]
use std::time::SystemTime;
#[cfg(test)]
use tests::{MockAtomicSystemTime as AtomicSystemTime, MockSystemTime as SystemTime};
/// An entry stored in the cache. /// An entry stored in the cache.
/// ///
/// It contains basic metadata and the actual value. /// It contains basic metadata and the actual value.
pub struct Entry { struct Entry {
/// The data held /// The data held
value: Bytes, value: Bytes,
/// The last time this entry was read/written /// The last time this entry was read/written
last_used: AtomicSystemTime, last_used: AtomicInstant,
/// Whether or not `last_used` should be updated /// Whether or not `last_used` should be updated
update_used: bool, update_used: bool,
@ -36,31 +33,25 @@ pub struct Entry {
impl Entry { impl Entry {
fn new(value: Bytes, lifetime: Duration, update_used: bool) -> Self { fn new(value: Bytes, lifetime: Duration, update_used: bool) -> Self {
let now = AtomicSystemTime::now();
Self { Self {
value, value,
last_used: now, last_used: AtomicInstant::now(),
update_used, update_used,
lifetime, lifetime,
} }
} }
fn last_used(&self) -> SystemTime { fn last_used(&self) -> Instant {
self.last_used.load(Ordering::Relaxed) self.last_used.load(Ordering::Relaxed)
} }
fn is_expired(&self) -> bool { fn is_expired(&self) -> bool {
match self.last_used().elapsed() { let since_last_used = self.last_used().elapsed();
Ok(d) => d >= self.lifetime, since_last_used >= self.lifetime
Err(_) => false, // now > last_used
}
} }
} }
/// A concurrent cache with a maximum memory size (w/ LRU) and expiration. /// A concurrent cache with a maximum memory size (w/ LRU) and expiration.
///
/// It is designed to keep memory usage low.
pub struct Cache { pub struct Cache {
/// Where elements are stored /// Where elements are stored
map: DashMap<String, Entry>, map: DashMap<String, Entry>,
@ -68,29 +59,33 @@ pub struct Cache {
/// Total length of data stored in cache currently /// Total length of data stored in cache currently
length: AtomicUsize, length: AtomicUsize,
/// How many times the scanner has ran,
/// for testing purposes
scan_count: AtomicU64,
/// How should it behave /// How should it behave
cfg: config::CacheConfig, cfg: config::CacheConfig,
} }
impl Cache { impl Cache {
pub fn with_config(cfg: config::CacheConfig) -> eyre::Result<Self> { pub fn with_config(cfg: config::CacheConfig) -> eyre::Result<Arc<Self>> {
// Sanity check chosen limits // Sanity check chosen limits
if cfg.mem_capacity < cfg.max_length { if cfg.mem_capacity < cfg.max_length {
bail!("`max_length` should not exceed `mem_capacity`"); bail!("`max_length` should not exceed `mem_capacity`");
} }
// Return // Create
Ok(Self { let me = Arc::new(Self {
map: DashMap::with_capacity(64), map: DashMap::with_capacity(64),
length: AtomicUsize::new(0), length: AtomicUsize::new(0),
scan_count: AtomicU64::new(0),
cfg, cfg,
}) });
// Start scanner
tokio::spawn({
let me = me.clone();
async move { me.scanner().await }
});
// Return
Ok(me)
} }
/// Figure out who should be bumped out of cache next /// Figure out who should be bumped out of cache next
@ -159,12 +154,12 @@ impl Cache {
// How far we went above the limit // How far we went above the limit
let needed = new_total - self.cfg.mem_capacity; let needed = new_total - self.cfg.mem_capacity;
self.next_out(needed).iter().for_each(|k| { for k in self.next_out(needed) {
// Remove the element, and ignore the result // Remove the element, and ignore the result
// The only reason it should be failing is if it couldn't find it, // The only reason it should be failing is if it couldn't find it,
// in which case it was already removed // in which case it was already removed
self.remove(k); self.remove(&k);
}); }
} }
// Atomically add to total cached data length // Atomically add to total cached data length
@ -210,7 +205,7 @@ impl Cache {
let e = self.get_(key)?; let e = self.get_(key)?;
if e.update_used { if e.update_used {
e.last_used.store(SystemTime::now(), Ordering::Relaxed); e.last_used.store(Instant::now(), Ordering::Relaxed);
} }
Some(e.value.clone()) Some(e.value.clone())
@ -233,11 +228,9 @@ impl Cache {
length <= (self.cfg.max_length as u64) length <= (self.cfg.max_length as u64)
} }
/// The background job that scans through the cache and removes inactive elements. /// This background job waits for entries to reach
/// /// their expiry timestamps and removes them proactively.
/// TODO: see if this is actually less expensive than async fn scanner(&self) {
/// letting each entry keep track of expiry with its own task
pub async fn scanner(&self) {
let mut interval = time::interval(self.cfg.scan_freq); let mut interval = time::interval(self.cfg.scan_freq);
interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip); interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
interval.tick().await; // Skip first tick interval.tick().await; // Skip first tick
@ -245,11 +238,10 @@ impl Cache {
loop { loop {
// We put this first so that it doesn't scan the instant the server starts // We put this first so that it doesn't scan the instant the server starts
interval.tick().await; interval.tick().await;
self.scan_count.fetch_add(1, Ordering::Relaxed);
// Save current timestamp so we aren't retrieving it constantly // Save current timestamp so we aren't retrieving it constantly
// If we don't do this it'll be a LOT of system api calls // If we don't do this it'll be a LOT of system api calls
let now = SystemTime::now(); let now = Instant::now();
// Collect a list of all the expired keys // Collect a list of all the expired keys
// If we fail to compare the times, it gets added to the list anyways // If we fail to compare the times, it gets added to the list anyways
@ -257,7 +249,7 @@ impl Cache {
.map .map
.iter() .iter()
.filter_map(|e| { .filter_map(|e| {
let elapsed = now.duration_since(e.last_used()).unwrap_or(Duration::MAX); let elapsed = now.duration_since(e.last_used());
let is_expired = elapsed >= e.lifetime; let is_expired = elapsed >= e.lifetime;
if is_expired { if is_expired {
@ -277,225 +269,3 @@ impl Cache {
} }
} }
} }
#[cfg(test)]
mod tests {
use std::{
sync::{
Arc,
atomic::{AtomicU64, Ordering},
},
time::Duration,
};
use bytes::Bytes;
use crate::{cache::Cache, config::CacheConfig};
thread_local! {
static MOCK_CLOCK: AtomicU64 = AtomicU64::new(0);
}
fn get_clock() -> u64 {
MOCK_CLOCK.with(|mc| mc.load(Ordering::Relaxed))
}
fn advance_clock(ms: u64) {
MOCK_CLOCK.with(|mc| mc.fetch_add(ms, Ordering::Relaxed));
}
async fn advance_clock_async(ms: u64) {
advance_clock(ms);
tokio::time::advance(Duration::from_millis(ms)).await;
tokio::task::yield_now().await; // make sure scanner tick runs
}
pub struct MockSystemTimeError;
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug)]
pub(super) struct MockSystemTime(u64);
impl MockSystemTime {
pub fn now() -> Self {
Self(get_clock())
}
pub fn duration_since(
&self,
earlier: MockSystemTime,
) -> Result<Duration, MockSystemTimeError> {
if self.0 >= earlier.0 {
Ok(Duration::from_millis(self.0 - earlier.0))
} else {
Err(MockSystemTimeError)
}
}
pub fn elapsed(&self) -> Result<Duration, MockSystemTimeError> {
Self::now().duration_since(*self)
}
}
pub(super) struct MockAtomicSystemTime(AtomicU64);
impl MockAtomicSystemTime {
pub fn now() -> Self {
Self(AtomicU64::new(get_clock()))
}
pub fn load(&self, order: Ordering) -> MockSystemTime {
MockSystemTime(self.0.load(order))
}
pub fn store(&self, system_time: MockSystemTime, order: Ordering) {
self.0.store(system_time.0, order);
}
}
const KEY: &str = "abcdef.png";
const VALUE: Bytes = Bytes::from_static(&[0, 1, 2, 3, 4, 5, 6, 7]);
fn simple() -> Cache {
return Cache::with_config(CacheConfig {
max_length: 10_000_000,
mem_capacity: 100_000_000,
scan_freq: Duration::from_secs(5),
upload_lifetime: Duration::from_secs(15),
})
.unwrap();
}
async fn scanning() -> Arc<Cache> {
let cache = Arc::new(simple());
tokio::spawn({
let cache = cache.clone();
async move { cache.scanner().await }
});
// allow 0ms scanner tick to run
tokio::task::yield_now().await;
cache
}
/// Make sure that cache use check
/// decides properly for multiple lengths
#[test]
fn will_use() {
let cache = simple();
// use something
assert!(cache.will_use(4_000_000));
// don't use something
assert!(!cache.will_use(12_000_001));
// use something edge
assert!(cache.will_use(10_000_000));
// use something mini
assert!(cache.will_use(0));
}
/// Make sure that [`Cache::add`]'s return value
/// is `false` when an entry was replaced
#[test]
fn store_replacement() {
let cache = simple();
// store
assert!(cache.add(KEY, VALUE));
// store w replace
assert!(!cache.add(KEY, VALUE));
}
/// Make sure that the scanner ticks at
/// the right times, and removes entries
/// when expected.
#[tokio::test(start_paused = true)]
async fn store_expire_on_hit_with_scanner() {
let cache = scanning().await;
// store
assert!(cache.add(KEY, VALUE));
// get again so that scanner timing
// doesn't align w expiration
advance_clock_async(4999).await;
assert_eq!(cache.scan_count.load(Ordering::Relaxed), 0);
assert_eq!(cache.get(KEY), Some(VALUE));
// next scanner tick
advance_clock_async(1).await;
assert_eq!(cache.scan_count.load(Ordering::Relaxed), 1);
// advance a bit more
// make sure we don't expire early
advance_clock_async(7000).await;
assert_eq!(cache.scan_count.load(Ordering::Relaxed), 2);
assert!(cache.map.get(KEY).is_some());
// advance to next scanner tick
advance_clock_async(3000).await;
assert_eq!(cache.scan_count.load(Ordering::Relaxed), 3);
// advance to after expiry
advance_clock_async(4999).await;
assert_eq!(cache.scan_count.load(Ordering::Relaxed), 3);
// it should be there because we
// offset ourselves by 1ms
assert!(cache.map.get(KEY).is_some());
assert_eq!(cache.get(KEY), None);
}
/// Make sure that the scanner removes
/// expired entries.
#[tokio::test(start_paused = true)]
async fn store_expire_by_scanner() {
let cache = scanning().await;
// store
assert!(cache.add(KEY, VALUE));
// make sure we don't expire early
advance_clock_async(6500).await;
assert!(cache.map.get(KEY).is_some());
// advance to after expiry
advance_clock_async(8500).await;
// it should get hit by scanner
assert!(cache.map.get(KEY).is_none());
}
/// Make sure that entries expire on hit,
/// even when there is no scanner
#[test]
fn store_get_expire_on_hit() {
let cache = simple();
// store, get
let added_at = MockSystemTime::now();
assert!(cache.add(KEY, VALUE));
assert_eq!(cache.get(KEY), Some(VALUE));
// get after delay
// (upload gets used)
advance_clock(2000);
assert_eq!(cache.map.get(KEY).unwrap().last_used(), added_at);
assert_eq!(cache.get(KEY), Some(VALUE));
assert_eq!(
cache.map.get(KEY).unwrap().last_used(),
MockSystemTime::now()
);
// get after longer delay
// (upload should have been used so no expire)
advance_clock(14000);
assert_eq!(cache.get(KEY), Some(VALUE));
assert_eq!(
cache.map.get(KEY).unwrap().last_used(),
MockSystemTime::now()
);
// get after expiration
advance_clock(15000);
assert!(cache.get(KEY).is_none());
}
}

View File

@ -72,12 +72,19 @@ pub struct DiskConfig {
/// Location on disk the uploads are to be saved to /// Location on disk the uploads are to be saved to
pub save_path: PathBuf, pub save_path: PathBuf,
/// Maximum size of an upload that will be /// Maximum size (in bytes) of an upload that will be
/// saved on this disk. Anything higher will /// saved on this disk. Anything higher will
/// skip this disk. If no disks are suitable, /// skip this disk. If no disks are suitable,
/// the upload will be rejected. (status 413) /// the upload will be rejected. (status 413)
pub max_save_len: Option<u64>, pub max_save_len: Option<u64>,
/// Maximum total size (in bytes) of stored uploads
/// for this disk, so like a disk space cap.
/// It is useful to avoid "out of disk space" errors.
/// When a new upload cannot be fit on any disk,
/// the status code 507 will be returned
pub max_stored_len: Option<u64>,
/// When this "AND" condition is satisfied /// When this "AND" condition is satisfied
/// for an upload, it will be deleted. /// for an upload, it will be deleted.
#[serde(default)] #[serde(default)]

View File

@ -128,9 +128,14 @@ impl Disk {
} }
// flush to disk // flush to disk
// this should catch "no space left on device" i hope...
if let Err(err) = file.flush().await { if let Err(err) = file.flush().await {
fail_callback(err).await; fail_callback(err).await;
return;
}
// sync data+metadata to disk
if let Err(err) = file.sync_all().await {
fail_callback(err).await;
} }
}); });

View File

@ -78,15 +78,9 @@ async fn main() -> eyre::Result<()> {
} }
// Create backends // Create backends
let cache = Arc::new(Cache::with_config(cfg.cache)?); let cache = Cache::with_config(cfg.cache)?;
let disk = Disk::with_config(cfg.disk)?; let disk = Disk::with_config(cfg.disk)?;
// Start cache scanner
tokio::spawn({
let cache = cache.clone();
async move { cache.scanner().await }
});
// Create engine // Create engine
let engine = Engine::new(cfg.engine, cache, disk)?; let engine = Engine::new(cfg.engine, cache, disk)?;