//! breeze/src/engine.rs — the upload engine: in-memory cache plus on-disk storage.
use std::{
ffi::OsStr,
path::PathBuf,
sync::atomic::{AtomicUsize, Ordering},
time::Duration,
};
use axum::extract::BodyStream;
use bytes::{BufMut, Bytes, BytesMut};
use hyper::StatusCode;
use memory_cache::MemoryCache;
use rand::Rng;
use tokio::{
fs::File,
io::{AsyncReadExt, AsyncWriteExt},
sync::{
mpsc::{self, Receiver, Sender},
Mutex,
},
};
use tokio_stream::StreamExt;
use walkdir::WalkDir;
use crate::view::ViewResponse;
/// Core upload engine: owns the in-memory upload cache, the on-disk save
/// location, and a cached counter of stored uploads.
pub struct Engine {
    // state
    cache: Mutex<MemoryCache<String, Bytes>>, // in-memory cache, keyed by file name. the cache is locked on its own (not the whole struct) so unrelated operations don't contend
    pub upl_count: AtomicUsize, // cached count of uploaded files, kept in sync as uploads are processed
    // config
    pub base_url: String, // base url for formatting upload urls
    save_path: PathBuf, // where uploads are saved to disk
    cache_max_length: usize, // if an upload is bigger than this size (bytes), it won't be cached
    cache_keep_alive: Duration, // amount of time a file should last in cache before expiring
}
impl Engine {
// create a new engine
pub fn new(
base_url: String,
save_path: PathBuf,
cache_max_length: usize,
cache_keep_alive: Duration,
cache_full_scan_freq: Duration, // how often the cache will be scanned for expired items
) -> Self {
Self {
cache: Mutex::new(MemoryCache::with_full_scan(cache_full_scan_freq)),
upl_count: AtomicUsize::new(WalkDir::new(&save_path).into_iter().count()), // count the amount of files in the save path and initialise our cached count with it
base_url,
save_path,
cache_max_length,
cache_keep_alive,
}
}
fn will_use_cache(&self, length: usize) -> bool {
length <= self.cache_max_length
}
// checks in cache or disk for an upload using a pathbuf
pub async fn upload_exists(&self, path: &PathBuf) -> bool {
let cache = self.cache.lock().await;
// Check if upload is in cache
let name = path
.file_name()
.and_then(OsStr::to_str)
.unwrap_or_default()
.to_string();
if cache.contains_key(&name) {
return true;
}
// Check if upload is on disk
if path.exists() {
return true;
}
return false;
}
// generate a new save path for an upload
#[async_recursion::async_recursion]
pub async fn gen_path(&self, original_path: &PathBuf) -> PathBuf {
// generate a 6-character alphanumeric string
let id: String = rand::thread_rng()
.sample_iter(&rand::distributions::Alphanumeric)
.take(6)
.map(char::from)
.collect();
// extract the extension from the original path
let original_extension = original_path
.extension()
.and_then(OsStr::to_str)
.unwrap_or_default()
.to_string();
let mut path = self.save_path.clone();
path.push(&id);
path.set_extension(original_extension);
if !self.upload_exists(&path).await {
path
} else {
// we had a name collision! try again..
self.gen_path(original_path).await
}
}
// process an upload. this is called by the new route
pub async fn process_upload(
&self,
path: PathBuf,
name: String, // we already extract it in the route handler, and it'd be wasteful to do it in gen_path
content_length: usize,
mut stream: BodyStream,
) {
// if the upload size is smaller than the specified maximum, we use the cache!
let mut use_cache = self.will_use_cache(content_length);
// create file to save upload to
let mut file = File::create(path)
.await
.expect("could not open file! make sure your upload path exists");
// if we're using cache, make some space to store the upload in
let mut data = if use_cache {
BytesMut::with_capacity(content_length)
} else {
BytesMut::new()
};
// start a task that handles saving files to disk (we can save to cache/disk in parallel that way)
let (tx, mut rx): (Sender<Bytes>, Receiver<Bytes>) = mpsc::channel(1);
tokio::spawn(async move {
// receive chunks and save them to file
while let Some(chunk) = rx.recv().await {
debug!(target: "process_upload", "writing chunk to disk (length: {})", chunk.len());
file.write_all(&chunk)
.await
.expect("error while writing file to disk");
}
});
// read and save upload
while let Some(chunk) = stream.next().await {
let chunk = chunk.unwrap();
// send chunk to io task
debug!(target: "process_upload", "sending data to io task");
tx.send(chunk.clone())
.await
.expect("failed to send data to io task");
if use_cache {
debug!(target: "process_upload", "receiving data into buffer");
if data.len() + chunk.len() > data.capacity() {
error!(target: "process_upload", "the amount of data sent exceeds the content-length provided by the client! caching will be cancelled for this upload.");
// if we receive too much data, drop the buffer and stop using cache (it is still okay to use disk, probably)
data = BytesMut::new();
use_cache = false;
} else {
data.put(chunk);
}
}
}
// insert upload into cache if necessary
if use_cache {
let mut cache = self.cache.lock().await;
info!(target: "process_upload", "caching upload!");
cache.insert(name, data.freeze(), Some(self.cache_keep_alive));
}
// if all goes well, increment the cached upload counter
self.upl_count.fetch_add(1, Ordering::Relaxed);
}
async fn read_cached_upload(&self, name: &String) -> Option<Bytes> {
let mut cache = self.cache.lock().await;
if !cache.contains_key(&name) {
return None;
}
let data = cache
.get(&name)
.expect("failed to read get upload data from cache")
.to_owned();
cache.insert(name.to_string(), data.clone(), Some(self.cache_keep_alive));
Some(data)
}
pub async fn get_upload(&self, original_path: &PathBuf) -> Result<ViewResponse, StatusCode> {
let name = original_path
.file_name()
.and_then(OsStr::to_str)
.unwrap_or_default()
.to_string();
let mut path = self.save_path.clone();
path.push(&name);
// check if the upload exists
if !self.upload_exists(&path).await {
return Err(StatusCode::NOT_FOUND);
}
let cached_data = self.read_cached_upload(&name).await;
match cached_data {
Some(data) => {
info!(target: "get_upload", "got upload from cache!!");
return Ok(ViewResponse::FromCache(path, data));
}
None => {
let mut file = File::open(&path).await.unwrap();
let length = file
.metadata()
.await
.expect("failed to read upload file metadata")
.len() as usize;
debug!(target: "get_upload", "read upload from disk, size = {}", length);
if self.will_use_cache(length) {
let mut data = BytesMut::with_capacity(length);
while file.read_buf(&mut data).await.unwrap() != 0 {}
let data = data.freeze();
let mut cache = self.cache.lock().await;
cache.insert(name, data.clone(), Some(self.cache_keep_alive));
info!(target: "get_upload", "recached upload from disk!");
return Ok(ViewResponse::FromCache(path, data));
}
info!(target: "get_upload", "got upload from disk!");
return Ok(ViewResponse::FromDisk(file));
}
}
}
}