config rework ep1

+ misc refactoring and tweaks
minish 2023-11-09 21:22:02 -05:00
parent 6deddc3014
commit 3fa4caad92
Signed by: min
GPG Key ID: FEECFF24EF0CE9E9
8 changed files with 824 additions and 226 deletions

Cargo.lock (generated): 754 lines changed; diff suppressed because it is too large.

Cargo.toml

@@ -1,6 +1,6 @@
 [package]
 name = "breeze"
-version = "0.1.4"
+version = "0.1.5"
 edition = "2021"

 [dependencies]
@@ -15,6 +15,11 @@ rand = "0.8.5"
 async-recursion = "1.0.0"
 walkdir = "2"
 futures = "0.3"
-log = "0.4"
-pretty_env_logger = "0.5.0"
+tracing = "0.1"
+tracing-subscriber = "0.3"
 archived = { path = "./archived" }
+xxhash-rust = { version = "0.8.7", features = ["xxh3"] }
+serde = { version = "1.0.189", features = ["derive"] }
+toml = "0.8.2"
+clap = { version = "4.4.6", features = ["derive"] }
+serde_with = "3.4.0"

src/config.rs (new file, 55 lines)

@@ -0,0 +1,55 @@
use std::{path::PathBuf, time::Duration};

use serde::Deserialize;
use serde_with::{serde_as, DisplayFromStr, DurationSeconds};
use tracing_subscriber::filter::LevelFilter;

#[derive(Deserialize)]
pub struct Config {
    pub engine: EngineConfig,
    pub cache: CacheConfig,
    pub logger: LoggerConfig,
}

#[derive(Deserialize)]
pub struct EngineConfig {
    /// The URL that this instance of breeze is meant to be accessed from.
    ///
    /// ex: https://picture.wtf would generate links like https://picture.wtf/p/abcdef.png
    pub base_url: String,

    /// Location on disk the uploads are to be saved to
    pub save_path: PathBuf,

    /// Authentication key for new uploads; one will be required if this is specified. (optional)
    pub upload_key: Option<String>,
}

#[serde_as]
#[derive(Deserialize)]
pub struct CacheConfig {
    /// The maximum length (in bytes) that a file can be
    /// before it skips cache
    pub max_length: usize,

    /// The amount of time a file can last inside the cache (in seconds)
    #[serde_as(as = "DurationSeconds")]
    pub upload_lifetime: Duration,

    /// How often the cache is to be scanned for
    /// expired entries (in seconds)
    #[serde_as(as = "DurationSeconds")]
    pub scan_freq: Duration,

    /// How much memory the cache is allowed to use (in bytes)
    pub mem_capacity: usize,
}

#[serde_as]
#[derive(Deserialize)]
pub struct LoggerConfig {
    /// Minimum level a log must be for it to be shown.
    /// This defaults to "warn" if not specified.
    #[serde_as(as = "Option<DisplayFromStr>")]
    pub level: Option<LevelFilter>,
}
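
For reference, a config file matching the structs above would look roughly like the following. This is a hypothetical sketch, not a file from this commit: every value is made up, and it assumes Config and the toml crate are in scope. The DurationSeconds adapters mean the lifetime and scan frequency are written as plain integer seconds.

// Hypothetical example only; values are illustrative.
fn parse_example_config() -> Config {
    let raw = r#"
        [engine]
        base_url = "https://picture.wtf"
        save_path = "/srv/uploads"
        upload_key = "hunter2"

        [cache]
        max_length = 80_000_000
        upload_lifetime = 1800
        scan_freq = 60
        mem_capacity = 500_000_000

        [logger]
        level = "info"
    "#;

    // engine.upload_key and logger.level are optional and may be omitted
    toml::from_str(raw).expect("example config should deserialize")
}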

src/engine.rs

@@ -18,25 +18,40 @@ use tokio::{
     },
 };
 use tokio_stream::StreamExt;
+use tracing::{debug, error, info};
 use walkdir::WalkDir;

 use crate::view::{ViewError, ViewSuccess};

+/// breeze engine! this is the core of everything
 pub struct Engine {
-    // state
-    cache: RwLock<Archive>, // in-memory cache
-    pub upl_count: AtomicUsize, // cached count of uploaded files
-
-    // config
-    pub base_url: String, // base url for formatting upload urls
-    save_path: PathBuf, // where uploads are saved to disk
-    pub upload_key: String, // authorisation key for uploading new files
-    cache_max_length: usize, // if an upload is bigger than this size, it won't be cached
+    // ------ STATE ------ //
+
+    /// The in-memory cache that cached uploads are stored in.
+    cache: RwLock<Archive>,
+
+    /// Cached count of uploaded files.
+    pub upl_count: AtomicUsize,
+
+    // ------ CONFIG ------ //
+
+    /// The base URL that the server will be accessed from.
+    /// It is only used for formatting returned upload URLs.
+    pub base_url: String,
+
+    /// The path on disk that uploads are saved to.
+    save_path: PathBuf,
+
+    /// The authorisation key required for uploading new files.
+    /// If it is empty, no key will be required.
+    pub upload_key: String,
+
+    /// The maximum size for an upload to be stored in cache.
+    /// Anything bigger skips cache and is read/written to
+    /// directly from disk.
+    cache_max_length: usize,
 }

 impl Engine {
-    // create a new engine
+    /// Creates a new instance of the breeze engine.
     pub fn new(
         base_url: String,
         save_path: PathBuf,
@@ -62,26 +77,29 @@ impl Engine {
         }
     }

+    /// Returns if an upload would be able to be cached
+    #[inline(always)]
    fn will_use_cache(&self, length: usize) -> bool {
        length <= self.cache_max_length
    }

-    // checks in cache or disk for an upload using a pathbuf
+    /// Check if an upload exists in cache or on disk
     pub async fn upload_exists(&self, path: &Path) -> bool {
         let cache = self.cache.read().await;

-        // check if upload is in cache
+        // extract file name, since that's what cache uses
         let name = path
             .file_name()
             .and_then(OsStr::to_str)
             .unwrap_or_default()
             .to_string();

+        // check in cache
         if cache.contains_key(&name) {
             return true;
         }

-        // check if upload is on disk
+        // check on disk
         if path.exists() {
             return true;
         }
@@ -89,7 +107,10 @@ impl Engine {
         return false;
     }

-    // generate a new save path for an upload
+    /// Generate a new save path for an upload.
+    ///
+    /// This will call itself recursively if it picks
+    /// a name that's already used. (it is rare)
     #[async_recursion::async_recursion]
     pub async fn gen_path(&self, original_path: &PathBuf) -> PathBuf {
         // generate a 6-character alphanumeric string
@@ -119,7 +140,8 @@ impl Engine {
         }
     }

-    // process an upload. this is called by the new route
+    /// Process an upload.
+    /// This is called by the /new route.
     pub async fn process_upload(
         &self,
         path: PathBuf,
@@ -193,25 +215,20 @@ impl Engine {
         self.upl_count.fetch_add(1, Ordering::Relaxed);
     }

-    // read an upload from cache, if it exists
-    // previously, this would lock the cache as writable to renew the upload's cache lifespan
-    // locking the cache as readable allows multiple concurrent readers, which allows me to handle multiple views concurrently
+    /// Read an upload from cache, if it exists.
+    ///
+    /// Previously, this would lock the cache as
+    /// writable to renew the upload's cache lifespan.
+    /// Locking the cache as readable allows multiple concurrent
+    /// readers though, which allows me to handle multiple views concurrently.
     async fn read_cached_upload(&self, name: &String) -> Option<Bytes> {
         let cache = self.cache.read().await;

-        if !cache.contains_key(name) {
-            return None;
-        }
-
         // fetch upload data from cache
-        let data = cache
-            .get(name)
-            .expect("failed to read get upload data from cache")
-            .to_owned();
-
-        Some(data)
+        cache.get(name).map(ToOwned::to_owned)
     }

+    /// Reads an upload, from cache or on disk.
     pub async fn get_upload(&self, original_path: &Path) -> Result<ViewSuccess, ViewError> {
         // extract upload file name
         let name = original_path
@@ -233,18 +250,24 @@ impl Engine {
         let cached_data = self.read_cached_upload(&name).await;

         if let Some(data) = cached_data {
-            info!("got upload from cache!!");
+            info!("got upload from cache!");

             Ok(ViewSuccess::FromCache(data))
         } else {
+            // we already know the upload exists by now so this is okay
             let mut file = File::open(&path).await.unwrap();

             // read upload length from disk
-            let length = file
-                .metadata()
-                .await
-                .expect("failed to read upload file metadata")
-                .len() as usize;
+            let metadata = file.metadata().await;
+
+            if metadata.is_err() {
+                error!("failed to get upload file metadata!");
+                return Err(ViewError::InternalServerError);
+            }
+
+            let metadata = metadata.unwrap();
+            let length = metadata.len() as usize;

             debug!("read upload from disk, size = {}", length);

src/index.rs

@@ -2,20 +2,16 @@ use std::sync::{atomic::Ordering, Arc};

 use axum::extract::State;

-// show index status page with amount of uploaded files
+/// Show index status page with amount of uploaded files
 pub async fn index(State(engine): State<Arc<crate::engine::Engine>>) -> String {
     let count = engine.upl_count.load(Ordering::Relaxed);

     format!("minish's image host, currently hosting {} files", count)
 }

-// robots.txt that tells web crawlers not to list uploads
-const ROBOTS_TXT: &str = concat!(
-    "User-Agent: *\n",
-    "Disallow: /p/*\n",
-    "Allow: /\n"
-);
-
 pub async fn robots_txt() -> &'static str {
+    /// robots.txt that tells web crawlers not to list uploads
+    const ROBOTS_TXT: &str = concat!("User-Agent: *\n", "Disallow: /p/*\n", "Allow: /\n");
+
     ROBOTS_TXT
 }

src/main.rs

@@ -1,62 +1,63 @@
-use std::{env, path::PathBuf, sync::Arc, time::Duration};
+use std::{path::PathBuf, sync::Arc};

 extern crate axum;

-#[macro_use]
-extern crate log;
+use clap::Parser;
 use engine::Engine;

 use axum::{
     routing::{get, post},
     Router,
 };
-use tokio::signal;
+use tokio::{fs, signal};
+use tracing::{info, warn};
+use tracing_subscriber::filter::LevelFilter;

+mod config;
 mod engine;
 mod index;
 mod new;
 mod view;

+#[derive(Parser, Debug)]
+struct Args {
+    /// The path to configuration file
+    config: Option<PathBuf>,
+}
+
 #[tokio::main]
 async fn main() {
-    // initialise logger
-    pretty_env_logger::init();
+    // read & parse args
+    let args = Args::parse();

-    // read env vars
-    let base_url = env::var("BRZ_BASE_URL").expect("missing BRZ_BASE_URL! base url for upload urls (ex: http://127.0.0.1:8000 for http://127.0.0.1:8000/p/abcdef.png, http://picture.wtf for http://picture.wtf/p/abcdef.png)");
-    let save_path = env::var("BRZ_SAVE_PATH").expect("missing BRZ_SAVE_PATH! this should be a path where uploads are saved to disk (ex: /srv/uploads, C:\\brzuploads)");
-    let upload_key = env::var("BRZ_UPLOAD_KEY").unwrap_or_default();
-    let cache_max_length = env::var("BRZ_CACHE_UPL_MAX_LENGTH").expect("missing BRZ_CACHE_UPL_MAX_LENGTH! this is the max length an upload can be in bytes before it won't be cached (ex: 80000000 for 80MB)");
-    let cache_upl_lifetime = env::var("BRZ_CACHE_UPL_LIFETIME").expect("missing BRZ_CACHE_UPL_LIFETIME! this indicates how long an upload will stay in cache (ex: 1800 for 30 minutes, 60 for 1 minute)");
-    let cache_scan_freq = env::var("BRZ_CACHE_SCAN_FREQ").expect("missing BRZ_CACHE_SCAN_FREQ! this is the frequency of full cache scans, which scan for and remove expired uploads (ex: 60 for 1 minute)");
-    let cache_mem_capacity = env::var("BRZ_CACHE_MEM_CAPACITY").expect("missing BRZ_CACHE_MEM_CAPACITY! this is the amount of memory the cache will hold before dropping entries");
-
-    // parse env vars
-    let save_path = PathBuf::from(save_path);
-    let cache_max_length = cache_max_length.parse::<usize>().expect("failed parsing BRZ_CACHE_UPL_MAX_LENGTH! it should be a positive number without any separators");
-    let cache_upl_lifetime = Duration::from_secs(cache_upl_lifetime.parse::<u64>().expect("failed parsing BRZ_CACHE_UPL_LIFETIME! it should be a positive number without any separators"));
-    let cache_scan_freq = Duration::from_secs(cache_scan_freq.parse::<u64>().expect("failed parsing BRZ_CACHE_SCAN_FREQ! it should be a positive number without any separators"));
-    let cache_mem_capacity = cache_mem_capacity.parse::<usize>().expect("failed parsing BRZ_CACHE_MEM_CAPACITY! it should be a positive number without any separators");
-
-    if !save_path.exists() || !save_path.is_dir() {
+    // read & parse config
+    let config_str = fs::read_to_string(args.config.unwrap_or("./breeze.toml".into()))
+        .await
+        .expect("failed to read config file! make sure it exists and you have read permissions");
+
+    let c: config::Config = toml::from_str(&config_str).expect("invalid config! check that you have included all required options and structured it properly (no config options expecting a number getting a string, etc.)");
+
+    tracing_subscriber::fmt()
+        .with_max_level(c.logger.level.unwrap_or(LevelFilter::WARN))
+        .init();
+
+    if !c.engine.save_path.exists() || !c.engine.save_path.is_dir() {
         panic!("the save path does not exist or is not a directory! this is invalid");
     }

-    if upload_key.is_empty() {
-        // i would prefer this to be a warning but the default log level hides those
-        error!("upload key (BRZ_UPLOAD_KEY) is empty! no key will be required for uploading new files");
+    if c.engine.upload_key.is_none() {
+        warn!("engine upload_key is empty! no key will be required for uploading new files");
     }

     // create engine
     let engine = Engine::new(
-        base_url,
-        save_path,
-        upload_key,
-        cache_max_length,
-        cache_upl_lifetime,
-        cache_scan_freq,
-        cache_mem_capacity,
+        c.engine.base_url,
+        c.engine.save_path,
+        c.engine.upload_key.unwrap_or_default(),
+        c.cache.max_length,
+        c.cache.upload_lifetime,
+        c.cache.scan_freq,
+        c.cache.mem_capacity,
     );

     // build main router
@@ -99,4 +100,4 @@ async fn shutdown_signal() {
     }

     info!("shutting down!");
 }
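
The config path is now an optional positional argument that falls back to ./breeze.toml in the working directory. A small hypothetical sketch of that fallback, separate from the diff:

use std::path::PathBuf;

// Mirrors args.config.unwrap_or("./breeze.toml".into()) in main() above.
fn resolve_config_path(arg: Option<PathBuf>) -> PathBuf {
    arg.unwrap_or_else(|| PathBuf::from("./breeze.toml"))
}

fn main() {
    // running `breeze` with no argument reads ./breeze.toml;
    // running `breeze /etc/breeze.toml` reads the given path instead
    assert_eq!(resolve_config_path(None), PathBuf::from("./breeze.toml"));
    assert_eq!(
        resolve_config_path(Some("/etc/breeze.toml".into())),
        PathBuf::from("/etc/breeze.toml")
    );
}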

src/new.rs

@@ -6,17 +6,21 @@ use axum::{
 };
 use hyper::{header, HeaderMap, StatusCode};

+/// The request handler for the /new path.
+/// This handles all new uploads.
 #[axum::debug_handler]
 pub async fn new(
     State(engine): State<Arc<crate::engine::Engine>>,
-    headers: HeaderMap,
     Query(params): Query<HashMap<String, String>>,
+    headers: HeaderMap,
     stream: BodyStream,
 ) -> Result<String, StatusCode> {
     let key = params.get("key");

+    const EMPTY_STRING: &String = &String::new();
+
     // check upload key, if i need to
-    if !engine.upload_key.is_empty() && key.unwrap_or(&String::new()) != &engine.upload_key {
+    if !engine.upload_key.is_empty() && key.unwrap_or(EMPTY_STRING) != &engine.upload_key {
         return Err(StatusCode::FORBIDDEN);
     }
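
Spelling the check above out in isolation: uploads are only gated when upload_key is non-empty, and a missing ?key= parameter is treated as an empty string, so it can never match a configured key. A hypothetical restatement (the handler itself stays exactly as in the diff):

// Hypothetical helper, not part of this commit.
fn key_allows_upload(upload_key: &str, provided_key: Option<&str>) -> bool {
    // no key configured: everything is allowed;
    // key configured: the query parameter must match exactly
    upload_key.is_empty() || provided_key == Some(upload_key)
}

fn main() {
    assert!(key_allows_upload("", None));
    assert!(key_allows_upload("hunter2", Some("hunter2")));
    assert!(!key_allows_upload("hunter2", None));
    assert!(!key_allows_upload("hunter2", Some("wrong")));
}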

src/view.rs

@@ -13,22 +13,43 @@ use bytes::Bytes;
 use hyper::{http::HeaderValue, StatusCode};
 use tokio::{fs::File, runtime::Handle};
 use tokio_util::io::ReaderStream;
+use tracing::{error, debug, warn};

+/// Responses for a successful view operation
 pub enum ViewSuccess {
+    /// A file read from disk, suitable for larger files.
+    ///
+    /// The file provided will be streamed from disk and
+    /// back to the viewer.
+    ///
+    /// This is only ever used if a file exceeds the
+    /// cache's maximum file size.
     FromDisk(File),
+
+    /// A file read from in-memory cache, best for smaller files.
+    ///
+    /// The file is taken from the cache in its entirety
+    /// and sent back to the viewer.
+    ///
+    /// If a file can be fit into cache, this will be
+    /// used even if it's read from disk.
     FromCache(Bytes),
 }

+/// Responses for a failed view operation
 pub enum ViewError {
-    NotFound,            // 404
-    InternalServerError, // 500
+    /// Will send status code 404 with a plaintext "not found" message.
+    NotFound,
+
+    /// Will send status code 500 with a plaintext "internal server error" message.
+    InternalServerError,
 }

 impl IntoResponse for ViewSuccess {
     fn into_response(self) -> Response {
         match self {
             ViewSuccess::FromDisk(file) => {
-                // get handle to current runtime
+                // get handle to current tokio runtime
                 // i use this to block on futures here (not async)
                 let handle = Handle::current();
                 let _ = handle.enter();
@@ -88,24 +109,21 @@ impl IntoResponse for ViewSuccess {
 impl IntoResponse for ViewError {
     fn into_response(self) -> Response {
         match self {
-            ViewError::NotFound => {
-                // convert string into response, change status code
-                let mut res = "not found!".into_response();
-                *res.status_mut() = StatusCode::NOT_FOUND;
-                res
-            }
-            ViewError::InternalServerError => {
-                // convert string into response, change status code
-                let mut res = "internal server error!".into_response();
-                *res.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
-                res
-            }
+            ViewError::NotFound => (
+                StatusCode::NOT_FOUND,
+                "not found!"
+            ).into_response(),
+            ViewError::InternalServerError => (
+                StatusCode::INTERNAL_SERVER_ERROR,
+                "internal server error!"
+            ).into_response(),
         }
     }
 }

+/// The request handler for /p/* path.
+/// All file views are handled here.
 #[axum::debug_handler]
 pub async fn view(
     State(engine): State<Arc<crate::engine::Engine>>,