breeze/src/disk.rs

343 lines
10 KiB
Rust

use std::{
collections::HashSet,
path::{Path, PathBuf},
sync::{Arc, Weak},
};
use bytes::Bytes;
use color_eyre::eyre::{self, bail};
use dashmap::DashMap;
use tokio::{
fs::File,
io::{self, AsyncWriteExt},
sync::{Mutex, mpsc},
};
use crate::config;
/// An array of disk file stores with
/// a similar API to the cache.
///
/// Uploads are addressed by their saved name. An in-memory index
/// records which disks hold a copy of each upload.
pub struct DiskArray {
/// Master set of disks.
///
/// These are the strong references; the location
/// index below only refers to disks weakly.
disks: Vec<Arc<Disk>>,
/// In-memory index of upload locations.
///
/// Maps an upload's saved name to the disks it was recorded on.
///
/// [Weak] is used to make it easier to
/// drop disks if a future update does that.
locations: DashMap<String, Vec<Weak<Disk>>>,
}
impl DiskArray {
/// Builds a disk array from a list of disk configurations.
///
/// Every disk is initialised in config order and the files already
/// present in its save path are added to the location index.
///
/// # Errors
///
/// Fails if two configs share the same save path, if a disk cannot
/// be initialised, or if listing a disk's files fails.
pub fn with_configs(cfgs: Vec<config::DiskConfig>) -> eyre::Result<Self> {
    let locations: DashMap<_, Vec<_>> = DashMap::new();
    let mut disks = Vec::new();
    let mut used_paths = HashSet::new();

    for cfg in cfgs {
        // two disks sharing one save path would write new uploads to the
        // very same file and also fight over deletes, so reject that early
        if !used_paths.insert(cfg.save_path.clone()) {
            bail!("disk has duplicate save path: {:?}", cfg.save_path);
        }

        let disk = Arc::new(Disk::with_config(cfg)?);

        // record every file already on this disk in the index
        for name in disk.files()? {
            locations
                .entry(name?)
                .or_default()
                .push(Arc::downgrade(&disk));
        }

        disks.push(disk);
    }

    Ok(Self { disks, locations })
}
/// Number of uploads tracked by the location index.
///
/// An upload that is stored on several disks is counted once.
pub fn count(&self) -> usize {
    let index = &self.locations;
    index.len()
}
/// Tells whether at least one disk accepts
/// an upload of `length` bytes.
pub fn will_use(&self, length: u64) -> bool {
    for disk in &self.disks {
        if disk.will_use(length) {
            return true;
        }
    }
    false
}
/// Checks the location index for `saved_name`
/// without touching any disk.
pub fn has(&self, saved_name: &str) -> bool {
    self.locations.get(saved_name).is_some()
}
/// Size in bytes of an already opened upload file,
/// as reported by its metadata.
pub async fn len(&self, f: &File) -> io::Result<u64> {
    f.metadata().await.map(|meta| meta.len())
}
/// Remove an upload from all disks.
///
/// The upload is taken out of the location index first, then its file
/// is deleted from every disk it was recorded on. Disks that no longer
/// exist are skipped.
///
/// # Errors
///
/// Returns [`io::ErrorKind::NotFound`] if `saved_name` is not in the
/// location index. If deleting from one or more disks fails, the other
/// disks are still attempted and the first error is returned.
pub async fn remove(&self, saved_name: &str) -> io::Result<()> {
    // find what disks the upload is stored on
    // (removing from location index). this hands us an owned list,
    // so no map lock is held across the awaits below
    let Some((_, on_disks)) = self.locations.remove(saved_name) else {
        // that's not an upload
        return Err(io::Error::new(
            io::ErrorKind::NotFound,
            "file to remove wasn't found",
        ));
    };

    // the index entry is already gone, so bailing out on the first
    // failure would strand the copies on the remaining disks.
    // try every disk and report the first error afterwards.
    let mut first_err = None;
    for disk in on_disks.iter().filter_map(Weak::upgrade) {
        if let Err(err) = disk.remove(saved_name).await {
            if first_err.is_none() {
                first_err = Some(err);
            }
        }
    }

    match first_err {
        Some(err) => Err(err),
        None => Ok(()),
    }
}
/// Start a save I/O task that directs
/// to all disks
pub fn start_save<
Fut: Future + Send + 'static,
F: FnOnce(eyre::Error) -> Fut + Send + 'static,
>(
&self,
saved_name: &str,
length: u64,
fail_callback: F,
) -> mpsc::Sender<Bytes> {
let (tx, mut rx) = mpsc::channel::<Bytes>(1000);
// setup oneshot fail callback
let fail_callback = Arc::new(Mutex::new(Some(fail_callback)));
// add to location index
let mut on_disks = self.locations.entry(saved_name.to_string()).or_default();
// start save tasks
let mut txs = Vec::new();
for disk in &self.disks {
if !disk.will_use(length) {
// we don't want that really
continue;
}
// update location index
{
let disk = Arc::downgrade(disk);
on_disks.push(disk);
}
// start task
let fail_callback = fail_callback.clone();
let tx = disk.start_save(saved_name, async move |err| {
// run callback if we can
if let Some(fail_callback) = fail_callback.lock().await.take() {
fail_callback(err.into()).await;
}
// also so i remember- fail_callback is how late errors
// get handled. by the time it is called we don't need
// to care about channels
});
txs.push(tx);
}
// start our bg task
tokio::spawn(async move {
while let Some(chunk) = rx.recv().await {
// send to all disk tasks
for tx in &txs {
// handle error.
if let Err(err) = tx.send(chunk.clone()).await {
// try to report that
if let Some(fail_callback) = fail_callback.lock().await.take() {
fail_callback(err.into()).await;
}
// we dont want to talk
// with dead channels
return;
}
}
}
});
tx
}
/// Opens an upload on the first disk
/// that works
/// (in order of definition in config)
pub async fn open(&self, saved_name: &str) -> io::Result<Option<File>> {
// get location entry
let Some(on_disks) = self.locations.get(saved_name) else {
// that's not found.....
return Ok(None);
};
// start trying disks
for disk in on_disks.iter() {
let Some(disk) = disk.upgrade() else {
// no more that disk :(
// it would be nice to remove it from list
continue;
};
// try to open
if let Some(f) = disk.open(saved_name).await? {
return Ok(Some(f));
}
}
// none worked....
// it would be nice to delete the entry
Ok(None)
}
}
/// Provides an API to access the disk file store
/// like we access the cache.
///
/// Each `Disk` wraps one configured save directory.
struct Disk {
/// Configuration for this disk: `save_path` is the directory uploads
/// are stored in, `max_save_len` optionally limits the accepted
/// upload length.
cfg: config::DiskConfig,
}
impl Disk {
/// Creates a disk from its config after validating the save path.
///
/// # Errors
///
/// Returns [`io::ErrorKind::NotFound`] if the save path does not exist
/// and [`io::ErrorKind::NotADirectory`] if it is not a directory.
fn with_config(cfg: config::DiskConfig) -> io::Result<Self> {
    let save_path = &cfg.save_path;

    // figure out what (if anything) is wrong with the path
    let problem = if !save_path.exists() {
        Some((io::ErrorKind::NotFound, "the save path does not exist"))
    } else if !save_path.is_dir() {
        Some((
            io::ErrorKind::NotADirectory,
            "the save path is not a directory",
        ))
    } else {
        None
    };

    match problem {
        Some((kind, message)) => Err(io::Error::new(kind, message)),
        None => Ok(Self { cfg }),
    }
}
/// Lists the names of the regular files in the save path.
///
/// Entries that are not files, and files whose name is not valid
/// Unicode, are skipped. Errors hit while reading an entry or its
/// file type are yielded as items.
fn files(&self) -> io::Result<impl Iterator<Item = io::Result<String>>> {
    let entries = std::fs::read_dir(&self.cfg.save_path)?;
    Ok(entries.filter_map(|entry| {
        let entry = match entry {
            Ok(entry) => entry,
            Err(err) => return Some(Err(err)),
        };
        match entry.file_type() {
            // keep files, as long as the name converts to a String
            Ok(kind) if kind.is_file() => entry.file_name().into_string().ok().map(Ok),
            // directories, symlinks and the like are not uploads
            Ok(_) => None,
            Err(err) => Some(Err(err)),
        }
    }))
}
/// Tells whether this disk accepts an upload of `length` bytes.
///
/// A disk without `max_save_len` accepts every length; otherwise the
/// length must not exceed that limit.
#[inline]
pub fn will_use(&self, length: u64) -> bool {
    match self.cfg.max_save_len {
        None => true,
        Some(limit) => length <= limit,
    }
}
/// Builds the on-disk path for a `saved_name`.
///
/// Only the final file name component of `saved_name` is used, as an
/// attempt to prevent path traversal out of the save path.
fn path_for(&self, saved_name: &str) -> PathBuf {
    let file_name = Path::new(saved_name).file_name().unwrap_or_default();
    self.cfg.save_path.join(file_name)
}
/// Opens the file for `saved_name` on this disk.
///
/// A missing file is reported as `Ok(None)`;
/// every other I/O error is passed back to the caller.
async fn open(&self, saved_name: &str) -> io::Result<Option<File>> {
    match File::open(self.path_for(saved_name)).await {
        Ok(file) => Ok(Some(file)),
        Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(None),
        Err(err) => Err(err),
    }
}
/// Deletes the file for `saved_name` from this disk.
async fn remove(&self, saved_name: &str) -> io::Result<()> {
    tokio::fs::remove_file(self.path_for(saved_name)).await
}
/// Create a background I/O task that saves an upload to this disk.
///
/// Chunks sent into the returned channel are written to the file for
/// `saved_name` in order. Once every sender is dropped the file is
/// flushed and synced.
///
/// `fail_callback` is called with the error if creating, writing,
/// flushing or syncing the file fails; the task stops at the first
/// failure.
fn start_save<Fut: Future + Send + 'static, F: FnOnce(io::Error) -> Fut + Send + 'static>(
    &self,
    saved_name: &str,
    fail_callback: F,
) -> mpsc::Sender<Bytes> {
    // start a task that handles saving files to disk (we can save to cache/disk in parallel that way)
    // a large buffer size is chosen so uploads can be received quickly,
    // but with less possibility of running out of memory.
    // (thats probably only possible w very high link speed tho......)
    let (tx, mut rx) = mpsc::channel::<Bytes>(1000);
    let p = self.path_for(saved_name);

    tokio::spawn(async move {
        // create file to save upload to
        let mut file = match File::create(p).await {
            Ok(f) => f,
            Err(err) => {
                tracing::error!(%err, "could not open file! make sure your upload path is valid");
                // report it like every other failure. a short upload can
                // be fully buffered in the channel before we get here, in
                // which case nobody would ever notice the save failed
                drop(rx);
                fail_callback(err).await;
                return;
            }
        };

        // receive chunks and save them to file
        while let Some(chunk) = rx.recv().await {
            tracing::debug!(length = chunk.len(), "writing chunk to disk");
            if let Err(err) = file.write_all(&chunk).await {
                // close the channel first so senders stop feeding us
                drop(rx);
                fail_callback(err).await;
                return;
            }
        }

        // flush to disk
        if let Err(err) = file.flush().await {
            fail_callback(err).await;
            return;
        }

        // sync data+metadata to disk
        if let Err(err) = file.sync_all().await {
            fail_callback(err).await;
        }
    });

    tx
}
}