Compare commits

..

No commits in common. "main" and "jemalloc" have entirely different histories.

14 changed files with 836 additions and 1463 deletions

1074
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -1,55 +1,37 @@
[package]
name = "breeze"
version = "0.3.3"
edition = "2024"
version = "0.2.8"
edition = "2021"
[profile.dev.package]
tikv-jemalloc-sys = { opt-level = 3 }
backtrace = { opt-level = 3 }
[profile.release]
lto = true
codegen-units = 1
debug = "line-tables-only"
[dependencies]
argh = "0.1.12"
atomic-time = "0.2"
axum = { version = "0.8.9", features = ["macros"] }
axum-extra = { version = "0.12.6", default-features = false, features = [
axum-extra = { version = "0.10.0", default-features = false, features = [
"tracing",
"typed-header",
] }
base64 = "0.22"
bytes = "1"
color-eyre = "0.6"
dashmap = { version = "6.2.1", features = ["inline"] }
headers = "0.4"
hmac = "0.12.1"
axum = { version = "0.8.1", features = ["macros", "http2"] }
tower = "0.5"
http = "1.2"
img-parts = "0.3"
rand = "0.9"
serde = { version = "1.0", features = ["derive"] }
serde_with = "3.19"
sha2 = "0.10.9"
tokio = { version = "1", features = [
"rt-multi-thread",
"macros",
"net",
"fs",
"io-util",
"signal",
] }
headers = "0.4"
tokio = { version = "1", features = ["full"] }
tokio-util = { version = "0.7", features = ["full"] }
tokio-stream = "0.1"
tokio-util = { version = "0.7", features = ["io"] }
toml = { version = "0.9", default-features = false, features = [
"std",
"parse",
"serde",
] }
tracing = "0.1"
tracing-subscriber = "0.3"
xxhash-rust = { version = "0.8", features = ["xxh3"] }
bytes = "1"
rand = "0.8.5"
walkdir = "2"
anyhow = "1.0"
serde = { version = "1.0", features = ["derive"] }
serde_with = "3.12"
toml = "0.8.2"
argh = "0.1.12"
dashmap = { version = "6.1.0", features = ["rayon", "inline"] }
rayon = "1.8"
atomic-time = "0.1.4"
img-parts = "0.3"
[target.'cfg(not(target_env = "msvc"))'.dependencies]
tikv-jemallocator = "0.6"

View File

@ -1,34 +1,27 @@
# breeze
breeze is a simple, performant file upload server.
The primary instance is https://picture.wtf.
## Features
- Basic upload API tailored towards ShareX
- Streamed uploading
- Streamed downloading (on larger files)
- Pause/continue download support with `Range` header
- Upload caching in memory
- Support for ShareX file deletion URLs
- Temporary uploads
- Automatic exif data removal
## Installation
On picture.wtf, breeze's primary instance, it is ran using a NixOS module. If you would like to do that too, it is provided by the Nix flake in this repository.
On picture.wtf, breeze is ran with the NixOS module provided by `flake.nix`. [Take a look at the config](https://git.min.rip/min/infra/src/branch/main/nixos/hosts/silver/services/breeze.nix) if you want!
Containerised and bare-metal deployments are also supported. Instructions for those are below.
It is very much possible to run and deploy breeze without doing that, though. Containerised and bare-metal deployments are also supported. Instructions for those are below.
To begin, clone the Git repository:
```bash
git clone https://git.min.rip/min/breeze.git
```
If you want to run it as a Docker container, here is an example `docker-compose.yaml` that may be useful for reference.
If you would like to run it as a Docker container, here is an example `docker-compose.yaml` that may be useful for reference.
```
version: '3.6'
@ -46,51 +39,25 @@ services:
ports:
- 8383:8000
```
With this configuration, it is expected that:
- there is a clone of the Git repository in the `./breeze` folder
- there is a `breeze.toml` config file in current directory
- there is a directory at `/srv/uploads` for storing uploads
- port 8383 will be made accessible to the Internet somehow (either forwarding the port through your firewall directly, or passing it through a reverse proxy)
- you want the uploads to be owned by the user on your system with id 1000. (this is usually your user)
* there is a clone of the Git repository in the `./breeze` folder
* there is a `breeze.toml` config file in current directory
* there is a directory at `/srv/uploads` for storing uploads
* port 8383 will be made accessible to the Internet somehow (either forwarding the port through your firewall directly, or passing it through a reverse proxy)
* you want the uploads to be owned by the user on your system with id 1000. (this is usually your user)
It can also be installed directly if the Rust toolchain is installed:
```bash
cd breeze
cargo install --path .
# then, you can run w/ a path to your `breeze.toml` config file
breeze --config /path/to/breeze.toml
```
### Exposing publicly
If you want to expose a breeze server to the internet, I highly recommend using a reverse proxy instead of just forwarding its HTTP port.
Caddy is probably the easiest to set up if you are new to reverse proxies. Here is an example `Caddyfile` for the Docker Compose file above (assuming `yourdomain.com` is a domain that points to your server's IP).
```
yourdomain.com {
# enable compression
encode
# forward request to breeze
reverse_proxy 127.0.0.1:8383
}
```
## Usage
### Hosting
Configuration is read through a toml file.
The config file path is specified using the `-c`/`--config` command line switch.
Here is an example config file:
```toml
[engine]
# The base URL that the HTTP server will be accessible on.
@ -103,12 +70,6 @@ base_url = "http://127.0.0.1:8000"
# If it is not set, no key will be required.
upload_key = "hiiiiiiii"
# OPTIONAL - If set, the secret key used to verify ShareX deletion URLs.
# If it is not set, deletion URLs will not be created or made usable.
# WARNING: Do not share this!! If somebody else obtains it, they can
# generate deletion URLs for any upload!!
deletion_secret = "asdfhjkasdhjfashjlfhjkaskdfjkhdjkh"
# OPTIONAL - specifies what to show when the site is visited on http
# It is sent with text/plain content type.
# There are two variables you can use:
@ -125,13 +86,13 @@ max_temp_lifetime = 43200
# OPTIONAL - the maximum length (in bytes) a file being uploaded may be.
# A word of warning about this: the error shown to ShareX users who
# hit the limit is *not* very clear. ("connection closed" or similar)
max_upload_len = 2_147_483_647
max_upload_len = 2_147_483_648
# The maximum length (in bytes) an image file may be before the server
# will skip removing its EXIF data.
# The performance impact of breeze's EXIF data removal is not
# very high in everyday usage, so something like 16MiB is reasonable.
max_strip_len = 16_777_215
max_strip_len = 16_777_216
[engine.disk]
# The location that uploads will be saved to.
@ -141,17 +102,17 @@ save_path = "/data"
[engine.cache]
# The file size (in bytes) that a file must be under
# to get cached.
max_length = 134_217_727
max_length = 134_217_728
# How long a cached upload will remain cached. (in seconds)
upload_lifetime = 1800
# How often the cache will be checked for expired uploads
# in the background.
# How often the cache will be checked for expired uploads.
# It is not a continuous scan, and only is triggered upon a cache operation.
scan_freq = 60
# How much memory (in bytes) the cache is allowed to consume.
mem_capacity = 4_294_967_295
mem_capacity = 4_294_967_296
[http]
# The address that the HTTP server will listen on. (ip:port)
@ -166,17 +127,13 @@ level = "warn"
```
### Uploading
The HTTP API is pretty simple, and it's easy to make a ShareX configuration for it.
Uploads should be sent to `/new?name={original filename}` as a POST request. If the server uses upload keys, it should be sent to `/new?name={original filename}&key={upload key}`. The uploaded file's content should be sent as raw binary in the request body.
Also you can specify `&lastfor={time in seconds}` to make your upload temporary, or `&keepexif=true` to tell the server not to clear EXIF data on image uploads. (if you don't know what EXIF data is, you can leave it as default. you'll know if you need it)
The endpoint's response will just be the URL of the upload in plain text, and the deletion URL will be sent in the `Breeze-Deletion-Url` header (if it's enabled).
Additionally, you may specify `&lastfor={time in seconds}` to make your upload temporary, or `&keepexif=true` to tell the server not to clear EXIF data on image uploads. (if you don't know what EXIF data is, just leave it as default. you'll know if you need it)
Here's an example ShareX configuration for it (with a key):
```json
{
"Version": "14.1.0",
@ -188,7 +145,6 @@ Here's an example ShareX configuration for it (with a key):
"name": "{filename}",
"key": "hiiiiiiii"
},
"Body": "Binary",
"DeletionURL": "{header:Breeze-Deletion-Url}"
"Body": "Binary"
}
```

View File

@ -2,11 +2,11 @@
"nodes": {
"crane": {
"locked": {
"lastModified": 1748047550,
"narHash": "sha256-t0qLLqb4C1rdtiY8IFRH5KIapTY/n3Lqt57AmxEv9mk=",
"lastModified": 1734808813,
"narHash": "sha256-3aH/0Y6ajIlfy7j52FGZ+s4icVX0oHhqBzRdlOeztqg=",
"owner": "ipetkov",
"repo": "crane",
"rev": "b718a78696060df6280196a6f992d04c87a16aef",
"rev": "72e2d02dbac80c8c86bf6bf3e785536acf8ee926",
"type": "github"
},
"original": {
@ -35,11 +35,11 @@
},
"nixpkgs": {
"locked": {
"lastModified": 1747958103,
"narHash": "sha256-qmmFCrfBwSHoWw7cVK4Aj+fns+c54EBP8cGqp/yK410=",
"lastModified": 1735821806,
"narHash": "sha256-cuNapx/uQeCgeuhUhdck3JKbgpsml259sjUQnWM7zW8=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "fe51d34885f7b5e3e7b59572796e1bcb427eccb1",
"rev": "d6973081434f88088e5321f83ebafe9a1167c367",
"type": "github"
},
"original": {

View File

@ -77,8 +77,6 @@
with lib; let
cfg = config.services.breeze;
settingsFormat = pkgs.formats.toml {};
defaultUser = "breeze";
defaultGroup = "breeze";
in {
options = {
services.breeze = {
@ -92,13 +90,13 @@
user = mkOption {
type = types.str;
default = defaultUser;
default = "breeze";
description = "User that `breeze` will run under";
};
group = mkOption {
type = types.str;
default = defaultGroup;
default = "breeze";
description = "Group that `breeze` will run under";
};
@ -113,7 +111,7 @@
default = {};
description = ''
The *.toml configuration to run `breeze` with.
The options aren't formally documented, but the [readme](https://git.min.rip/min/breeze/src/branch/main/README.md) provides examples.
There is no formal documentation, but there is an example in the [readme](https://git.min.rip/min/breeze/src/branch/main/README.md).
'';
};
@ -134,29 +132,16 @@
This is useful for loading it from a secret management system.
'';
};
deletionSecretFile = mkOption {
type = types.nullOr types.path;
default = null;
description = ''
File to load the `engine.deletion_secret` from, if desired.
This is useful for loading it from a secret management system.
'';
};
};
};
config = mkIf cfg.enable {
users.users = mkIf (cfg.user == defaultUser) {
${cfg.user} = {
isSystemUser = true;
inherit (cfg) group;
};
users.users.${cfg.user} = {
isSystemUser = true;
inherit (cfg) group;
};
users.groups = mkIf (cfg.group == defaultGroup) {
${cfg.group} = {};
};
users.groups.${cfg.group} = {};
systemd.tmpfiles.rules = [
"d '${cfg.configDir}' 0750 ${cfg.user} ${cfg.group} - -"
@ -164,7 +149,6 @@
services.breeze.settings = mkMerge [
(mkIf (cfg.uploadKeyFile != null) {engine.upload_key = "@UPLOAD_KEY@";})
(mkIf (cfg.deletionSecretFile != null) {engine.deletion_secret = "@DELETION_SECRET@";})
];
systemd.services.breeze = let
@ -180,9 +164,6 @@
''
+ lib.optionalString (cfg.uploadKeyFile != null) ''
${pkgs.replace-secret}/bin/replace-secret '@UPLOAD_KEY@' "${cfg.uploadKeyFile}" ${cfgFile}
''
+ lib.optionalString (cfg.deletionSecretFile != null) ''
${pkgs.replace-secret}/bin/replace-secret '@DELETION_SECRET@' "${cfg.deletionSecretFile}" ${cfgFile}
'';
serviceConfig = rec {

View File

@ -1,15 +1,12 @@
use std::{
sync::{
Arc,
atomic::{AtomicUsize, Ordering},
},
time::{Duration, Instant},
sync::atomic::{AtomicUsize, Ordering},
time::{Duration, SystemTime},
};
use atomic_time::AtomicInstant;
use atomic_time::AtomicSystemTime;
use bytes::Bytes;
use color_eyre::eyre::{self, bail};
use dashmap::{DashMap, mapref::one::Ref};
use dashmap::{mapref::one::Ref, DashMap};
use rayon::prelude::*;
use tokio::time;
use crate::config;
@ -17,12 +14,12 @@ use crate::config;
/// An entry stored in the cache.
///
/// It contains basic metadata and the actual value.
struct Entry {
pub struct Entry {
/// The data held
value: Bytes,
/// The last time this entry was read/written
last_used: AtomicInstant,
last_used: AtomicSystemTime,
/// Whether or not `last_used` should be updated
update_used: bool,
@ -33,25 +30,31 @@ struct Entry {
impl Entry {
fn new(value: Bytes, lifetime: Duration, update_used: bool) -> Self {
let now = AtomicSystemTime::now();
Self {
value,
last_used: AtomicInstant::now(),
last_used: now,
update_used,
lifetime,
}
}
fn last_used(&self) -> Instant {
fn last_used(&self) -> SystemTime {
self.last_used.load(Ordering::Relaxed)
}
fn is_expired(&self) -> bool {
let since_last_used = self.last_used().elapsed();
since_last_used >= self.lifetime
match self.last_used().elapsed() {
Ok(d) => d >= self.lifetime,
Err(_) => false, // now > last_used
}
}
}
/// A concurrent cache with a maximum memory size (w/ LRU) and expiration.
///
/// It is designed to keep memory usage low.
pub struct Cache {
/// Where elements are stored
map: DashMap<String, Entry>,
@ -64,28 +67,13 @@ pub struct Cache {
}
impl Cache {
pub fn with_config(cfg: config::CacheConfig) -> eyre::Result<Arc<Self>> {
// Sanity check chosen limits
if cfg.mem_capacity < cfg.max_length {
bail!("`max_length` should not exceed `mem_capacity`");
}
// Create
let me = Arc::new(Self {
pub fn with_config(cfg: config::CacheConfig) -> Self {
Self {
map: DashMap::with_capacity(64),
length: AtomicUsize::new(0),
cfg,
});
// Start scanner
tokio::spawn({
let me = me.clone();
async move { me.scanner().await }
});
// Return
Ok(me)
}
}
/// Figure out who should be bumped out of cache next
@ -93,7 +81,7 @@ impl Cache {
let mut sorted: Vec<_> = self.map.iter().collect();
// Sort by least recently used
sorted.sort_unstable_by_key(|e| e.last_used());
sorted.par_sort_unstable_by(|e1, e2| e1.last_used().cmp(&e2.last_used()));
// Total bytes we would be removing
let mut total = 0;
@ -154,28 +142,19 @@ impl Cache {
// How far we went above the limit
let needed = new_total - self.cfg.mem_capacity;
for k in self.next_out(needed) {
self.next_out(needed).par_iter().for_each(|k| {
// Remove the element, and ignore the result
// The only reason it should be failing is if it couldn't find it,
// in which case it was already removed
self.remove(&k);
}
self.remove(k);
})
}
// Atomically add to total cached data length
self.length.fetch_add(len, Ordering::Relaxed);
// Add to the map, return true if we didn't replace anything
if let Some(old) = self.map.insert(key.to_string(), e) {
// We replaced something so fix the length
// I think that shouldn't happen anyway probably?
self.length.fetch_sub(old.value.len(), Ordering::Relaxed);
// An entry was replaced
false
} else {
// No entry was replaced....
true
}
self.map.insert(key.to_string(), e).is_none()
}
/// Add a new element to the cache with the default lifetime.
@ -191,7 +170,7 @@ impl Cache {
///
/// It exists so we can run the expiry check before
/// actually working with any entries, so no weird bugs happen
fn get_(&self, key: &str) -> Option<Ref<'_, String, Entry>> {
fn _get(&self, key: &str) -> Option<Ref<String, Entry>> {
let e = self.map.get(key)?;
// if the entry is expired get rid of it now
@ -211,10 +190,10 @@ impl Cache {
/// Get an item from the cache, if it exists.
pub fn get(&self, key: &str) -> Option<Bytes> {
let e = self.get_(key)?;
let e = self._get(key)?;
if e.update_used {
e.last_used.store(Instant::now(), Ordering::Relaxed);
e.last_used.store(SystemTime::now(), Ordering::Relaxed);
}
Some(e.value.clone())
@ -227,22 +206,22 @@ impl Cache {
/// We don't use [`DashMap::contains_key`] here because it would just do
/// the exact same thing I do here, but without running the expiry check logic
pub fn has(&self, key: &str) -> bool {
self.get_(key).is_some()
self._get(key).is_some()
}
/// Returns if an upload is able to be cached
/// with the current caching rules
#[inline]
#[inline(always)]
pub fn will_use(&self, length: u64) -> bool {
length <= (self.cfg.max_length as u64)
length <= self.cfg.max_length
}
/// This background job waits for entries to reach
/// their expiry timestamps and removes them proactively.
async fn scanner(&self) {
/// The background job that scans through the cache and removes inactive elements.
///
/// TODO: see if this is actually less expensive than
/// letting each entry keep track of expiry with its own task
pub async fn scanner(&self) {
let mut interval = time::interval(self.cfg.scan_freq);
interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
interval.tick().await; // Skip first tick
loop {
// We put this first so that it doesn't scan the instant the server starts
@ -250,15 +229,15 @@ impl Cache {
// Save current timestamp so we aren't retrieving it constantly
// If we don't do this it'll be a LOT of system api calls
let now = Instant::now();
let now = SystemTime::now();
// Collect a list of all the expired keys
// If we fail to compare the times, it gets added to the list anyways
let expired: Vec<_> = self
.map
.iter()
.par_iter()
.filter_map(|e| {
let elapsed = now.duration_since(e.last_used());
let elapsed = now.duration_since(e.last_used()).unwrap_or(Duration::MAX);
let is_expired = elapsed >= e.lifetime;
if is_expired {
@ -273,17 +252,7 @@ impl Cache {
if !expired.is_empty() {
// Use a retain call, should be less locks that way
// (instead of many remove calls)
self.map.retain(|k, e| {
let is_expired = expired.contains(k);
// Handle length update
// if expired
if is_expired {
self.length.fetch_sub(e.value.len(), Ordering::Relaxed);
}
// If it isn't expired
// it will stay
!is_expired
});
self.map.retain(|k, _| !expired.contains(k))
}
}
}

View File

@ -1,14 +1,12 @@
use std::{path::PathBuf, time::Duration};
use serde::Deserialize;
use serde_with::{DisplayFromStr, DurationSeconds, serde_as};
use serde_with::{serde_as, DisplayFromStr, DurationSeconds};
use tracing_subscriber::filter::LevelFilter;
#[derive(Deserialize)]
pub struct Config {
pub engine: EngineConfig,
pub cache: CacheConfig,
pub disk: Vec<DiskConfig>,
pub http: HttpConfig,
pub logger: LoggerConfig,
}
@ -22,18 +20,22 @@ fn default_motd() -> String {
pub struct EngineConfig {
/// The url that the instance of breeze is meant to be accessed from.
///
/// ex: `https://picture.wtf` would generate links like `https://picture.wtf/p/abcdef.png`
/// ex: https://picture.wtf would generate links like https://picture.wtf/p/abcdef.png
pub base_url: String,
/// Authentication key for new uploads, will be required if this is specified. (optional)
#[serde(default)]
pub upload_key: String,
/// Secret key to use when generating or verifying deletion tokens.
/// Leave blank to disable.
///
/// If this secret is leaked, anyone can delete any file. Be careful!!!
pub deletion_secret: Option<String>,
/// Configuration for disk system
pub disk: DiskConfig,
/// Configuration for cache system
pub cache: CacheConfig,
/// Maximum size of an upload that will be accepted.
/// Files above this size can not be uploaded.
pub max_upload_len: Option<u64>,
/// Maximum lifetime of a temporary upload
#[serde_as(as = "DurationSeconds")]
@ -55,20 +57,14 @@ pub struct EngineConfig {
pub struct DiskConfig {
/// Location on disk the uploads are to be saved to
pub save_path: PathBuf,
/// Maximum size (in bytes) of an upload that will be
/// saved on this disk. Anything higher will
/// skip this disk. If no disks are suitable,
/// the upload will be rejected. (status 413)
pub max_save_len: Option<u64>,
}
#[serde_as]
#[derive(Deserialize, Clone)]
pub struct CacheConfig {
/// The maximum length in bytes that a file can be
/// before it skips cache (in bytes)
pub max_length: usize,
/// before it skips cache (in seconds)
pub max_length: u64,
/// The amount of time a file can last inside the cache (in seconds)
#[serde_as(as = "DurationSeconds")]

View File

@ -1,86 +0,0 @@
use std::sync::Arc;
use axum::extract::{Query, State};
use base64::{Engine as _, prelude::BASE64_URL_SAFE_NO_PAD};
use bytes::{Buf, BytesMut};
use hmac::Mac;
use http::StatusCode;
use serde::Deserialize;
use crate::engine::{Engine, update_hmac};
#[derive(Deserialize)]
pub struct DeleteRequest {
name: String,
hash: String,
hmac: String,
}
pub async fn delete(
State(engine): State<Arc<Engine>>,
Query(req): Query<DeleteRequest>,
) -> (StatusCode, &'static str) {
let Some(mut hmac) = engine.deletion_hmac.clone() else {
return (StatusCode::CONFLICT, "Deletion is not enabled");
};
// -- decode provided data
// decode user-given hmac
let Ok(provided_hmac) = BASE64_URL_SAFE_NO_PAD.decode(req.hmac) else {
return (StatusCode::BAD_REQUEST, "Could not decode hmac");
};
// decode hash from base64
let Ok(mut provided_hash_data) = BASE64_URL_SAFE_NO_PAD
.decode(req.hash)
.map(|v| BytesMut::from(&v[..]))
else {
return (StatusCode::BAD_REQUEST, "Could not decode partial hash");
};
// read hash
if provided_hash_data.len() != 16 {
return (StatusCode::BAD_REQUEST, "Partial hash length is invalid");
}
let provided_hash = provided_hash_data.get_u128();
// -- verify it
// check if info is valid
let is_hmac_valid = {
// update hmad
update_hmac(&mut hmac, &req.name, provided_hash);
// verify..
hmac.verify_slice(&provided_hmac).is_ok()
};
if !is_hmac_valid {
return (StatusCode::BAD_REQUEST, "Hmac is invalid");
}
// -- ensure hash matches
// okay, now check if we compute the same hash as the req
// this makes sure it's (probably) the same file
let actual_hash = match engine.get_hash(&req.name).await {
Ok(Some(h)) => h,
Ok(None) => return (StatusCode::NOT_FOUND, "File not found"),
Err(err) => {
tracing::error!(%err, "failed to get hash");
return (StatusCode::INTERNAL_SERVER_ERROR, "Internal server error!!");
}
};
// compare
if provided_hash != actual_hash {
return (StatusCode::BAD_REQUEST, "Partial hash did not match");
}
// -- delete file
// everything seems okay so try to delete
if let Err(err) = engine.remove(&req.name).await {
tracing::error!(%err, "failed to delete upload");
return (StatusCode::INTERNAL_SERVER_ERROR, "Delete failed");
}
(StatusCode::OK, "Deleted successfully!")
}

View File

@ -1,278 +1,46 @@
use std::{
collections::HashSet,
path::{Path, PathBuf},
sync::{Arc, Weak},
};
use std::path::PathBuf;
use bytes::Bytes;
use color_eyre::eyre::{self, bail};
use dashmap::DashMap;
use tokio::{
fs::File,
io::{self, AsyncWriteExt},
sync::{Mutex, mpsc},
sync::mpsc,
};
use tracing::debug;
use walkdir::WalkDir;
use crate::config;
/// An array of disk file stores with
/// a similar API to the cache.
pub struct DiskArray {
/// Master set of disks.
disks: Vec<Arc<Disk>>,
/// In-memory index of upload locations.
///
/// [Weak] is used to make it easier to
/// drop disks if a future update does that
locations: DashMap<String, Vec<Weak<Disk>>>,
}
impl DiskArray {
pub fn with_configs(cfgs: Vec<config::DiskConfig>) -> eyre::Result<Self> {
// create all
let mut seen_save_paths = HashSet::new();
let mut disks = Vec::new();
let locations: DashMap<_, Vec<_>> = DashMap::new();
for cfg in cfgs {
// make sure save paths are unique
// if two disks have the same save path,
// they will both try to save new uploads
// to the exact same spot which probably
// causes a lot of problems. also deletes
// will try to delete the same file etcetc
if !seen_save_paths.insert(cfg.save_path.clone()) {
bail!("disk has duplicate save path: {:?}", cfg.save_path);
}
// init disk
let disk = Arc::new(Disk::with_config(cfg)?);
// index files
for saved_name in disk.files()? {
let saved_name = saved_name?;
// add disk reference
let disk = Arc::downgrade(&disk);
let mut on_disks = locations.entry(saved_name).or_default();
on_disks.push(disk);
}
// add to disks
disks.push(disk);
}
// return
Ok(Self { disks, locations })
}
/// Returns the amount of uploads stored
/// across all disks
pub fn count(&self) -> usize {
self.locations.len()
}
/// Returns whether or not an upload
/// can be stored on any disk
pub fn will_use(&self, length: u64) -> bool {
self.disks.iter().any(|d| d.will_use(length))
}
/// Fast-path way to check if we have
/// an upload using location index
pub fn has(&self, saved_name: &str) -> bool {
self.locations.contains_key(saved_name)
}
/// Get the size of an upload's file
pub async fn len(&self, f: &File) -> io::Result<u64> {
Ok(f.metadata().await?.len())
}
/// Remove an upload from all disks
pub async fn remove(&self, saved_name: &str) -> io::Result<()> {
// find what disks the upload is stored on
// (removing from location index)
println!("get 1");
let Some((_, on_disks)) = self.locations.remove(saved_name) else {
// that's not an upload
return Err(io::Error::new(
io::ErrorKind::NotFound,
"file to remove wasn't found",
));
};
println!("get 2");
// delete from all disks its stored on
for disk in &on_disks {
let Some(disk) = disk.upgrade() else {
// dead disk so whatever
continue;
};
// try to delete file
disk.remove(saved_name).await?;
}
// return
Ok(())
}
/// Start a save I/O task that directs
/// to all disks
pub fn start_save<
Fut: Future + Send + 'static,
F: FnOnce(eyre::Error) -> Fut + Send + 'static,
>(
&self,
saved_name: &str,
length: u64,
fail_callback: F,
) -> mpsc::Sender<Bytes> {
let (tx, mut rx) = mpsc::channel::<Bytes>(1000);
// setup oneshot fail callback
let fail_callback = Arc::new(Mutex::new(Some(fail_callback)));
// add to location index
let mut on_disks = self.locations.entry(saved_name.to_string()).or_default();
// start save tasks
let mut txs = Vec::new();
for disk in &self.disks {
if !disk.will_use(length) {
// we don't want that really
continue;
}
// update location index
{
let disk = Arc::downgrade(disk);
on_disks.push(disk);
}
// start task
let fail_callback = fail_callback.clone();
let tx = disk.start_save(saved_name, async move |err| {
// run callback if we can
if let Some(fail_callback) = fail_callback.lock().await.take() {
fail_callback(err.into()).await;
}
// also so i remember- fail_callback is how late errors
// get handled. by the time it is called we don't need
// to care about channels
});
txs.push(tx);
}
// start our bg task
tokio::spawn(async move {
while let Some(chunk) = rx.recv().await {
// send to all disk tasks
for tx in &txs {
// handle error.
if let Err(err) = tx.send(chunk.clone()).await {
// try to report that
if let Some(fail_callback) = fail_callback.lock().await.take() {
fail_callback(err.into()).await;
}
// we dont want to talk
// with dead channels
return;
}
}
}
});
tx
}
/// Opens an upload on the first disk
/// that works
/// (in order of definition in config)
pub async fn open(&self, saved_name: &str) -> io::Result<Option<File>> {
// get location entry
let Some(on_disks) = self.locations.get(saved_name) else {
// that's not found.....
return Ok(None);
};
// start trying disks
for disk in on_disks.iter() {
let Some(disk) = disk.upgrade() else {
// no more that disk :(
// it would be nice to remove it from list
continue;
};
// try to open
if let Some(f) = disk.open(saved_name).await? {
return Ok(Some(f));
}
}
// none worked....
// it would be nice to delete the entry
Ok(None)
}
}
/// Provides an API to access the disk file store
/// like we access the cache.
struct Disk {
pub struct Disk {
cfg: config::DiskConfig,
}
impl Disk {
fn with_config(cfg: config::DiskConfig) -> io::Result<Self> {
// check path
if !cfg.save_path.exists() {
return Err(io::Error::new(
io::ErrorKind::NotFound,
"the save path does not exist",
));
}
if !cfg.save_path.is_dir() {
return Err(io::Error::new(
io::ErrorKind::NotADirectory,
"the save path is not a directory",
));
}
// return
Ok(Self { cfg })
pub fn with_config(cfg: config::DiskConfig) -> Self {
Self { cfg }
}
/// Returns an iterator of stored file names.
fn files(&self) -> io::Result<impl Iterator<Item = io::Result<String>>> {
Ok(std::fs::read_dir(&self.cfg.save_path)?.filter_map(|e| {
// todo: refactor when try blocks are out^^
(|| {
let e = e?;
Ok(e.file_type()?
.is_file()
.then_some(e.file_name().into_string().ok())
.flatten())
})()
.transpose()
}))
}
/// Returns whether or not an upload
/// is allowed to be stored with this disk
#[inline]
pub fn will_use(&self, length: u64) -> bool {
self.cfg.max_save_len.is_none_or(|l| length <= l)
/// Counts the number of files saved to disk we have
pub fn count(&self) -> usize {
WalkDir::new(&self.cfg.save_path)
.min_depth(1)
.into_iter()
.count()
}
/// Formats the path on disk for a `saved_name`.
fn path_for(&self, saved_name: &str) -> PathBuf {
// try to prevent path traversal by ignoring everything except the file name
let name = Path::new(saved_name).file_name().unwrap_or_default();
let mut p: PathBuf = self.cfg.save_path.clone();
p.push(name);
let mut p = self.cfg.save_path.clone();
p.push(saved_name);
p
}
/// Try to open a file on disk, and if we didn't find it,
/// then return [`None`].
async fn open(&self, saved_name: &str) -> io::Result<Option<File>> {
pub async fn open(&self, saved_name: &str) -> io::Result<Option<File>> {
let p = self.path_for(saved_name);
match File::open(p).await {
@ -284,56 +52,38 @@ impl Disk {
}
}
/// Get the size of an upload's file
pub async fn len(&self, f: &File) -> io::Result<u64> {
Ok(f.metadata().await?.len())
}
/// Remove an upload from disk.
async fn remove(&self, saved_name: &str) -> io::Result<()> {
pub async fn remove(&self, saved_name: &str) -> io::Result<()> {
let p = self.path_for(saved_name);
tokio::fs::remove_file(p).await
}
/// Create a background I/O task
fn start_save<Fut: Future + Send + 'static, F: FnOnce(io::Error) -> Fut + Send + 'static>(
&self,
saved_name: &str,
fail_callback: F,
) -> mpsc::Sender<Bytes> {
pub async fn start_save(&self, saved_name: &str) -> mpsc::UnboundedSender<Bytes> {
// start a task that handles saving files to disk (we can save to cache/disk in parallel that way)
// a large buffer size is chosen so uploads can be received quickly,
// but with less possibility of running out of memory.
// (thats probably only possible w very high link speed tho......)
let (tx, mut rx) = mpsc::channel::<Bytes>(1000);
let (tx, mut rx): (mpsc::UnboundedSender<Bytes>, mpsc::UnboundedReceiver<Bytes>) =
mpsc::unbounded_channel();
let p = self.path_for(saved_name);
tokio::spawn(async move {
// create file to save upload to
let mut file = match File::create(p).await {
Ok(f) => f,
Err(err) => {
tracing::error!(%err, "could not open file! make sure your upload path is valid");
return;
}
};
let mut file = File::create(p)
.await
.expect("could not open file! make sure your upload path is valid");
// receive chunks and save them to file
while let Some(chunk) = rx.recv().await {
tracing::debug!(length = chunk.len(), "writing chunk to disk");
if let Err(err) = file.write_all(&chunk).await {
drop(rx);
fail_callback(err).await;
return;
}
}
// flush to disk
if let Err(err) = file.flush().await {
fail_callback(err).await;
return;
}
// sync data+metadata to disk
if let Err(err) = file.sync_all().await {
fail_callback(err).await;
debug!("writing chunk to disk (length: {})", chunk.len());
file.write_all(&chunk)
.await
.expect("error while writing file to disk");
}
});

View File

@ -1,24 +1,22 @@
use std::{
io::SeekFrom,
ops::{Bound, RangeBounds},
sync::Arc,
ops::Bound,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::Duration,
};
use axum::body::BodyDataStream;
use base64::{Engine as _, prelude::BASE64_URL_SAFE_NO_PAD};
use bytes::{BufMut, Bytes, BytesMut};
use color_eyre::eyre::{self, WrapErr};
use hmac::Mac;
use img_parts::{DynImage, ImageEXIF};
use rand::distr::{Alphanumeric, SampleString};
use rand::distributions::{Alphanumeric, DistString};
use tokio::{
fs::File,
io::{AsyncReadExt, AsyncSeekExt},
};
use tokio_stream::StreamExt;
use tracing::{debug, error, info};
use xxhash_rust::xxh3;
use crate::{cache, config, disk};
@ -30,7 +28,6 @@ pub enum UploadData {
Disk(tokio::io::Take<File>),
}
/// Upload data and metadata needed to build a view response
pub struct UploadResponse {
pub full_len: u64,
pub range: (u64, u64),
@ -41,11 +38,8 @@ pub struct UploadResponse {
/// Some are rejections.
pub enum ProcessOutcome {
/// The upload was successful.
/// We give the user their file's URL (and deletion URL if one was created)
Success {
url: String,
deletion_url: Option<String>,
},
/// We give the user their file's URL
Success(String),
/// Occurs when an upload exceeds the chosen maximum file size.
UploadTooLarge,
@ -69,112 +63,65 @@ pub enum GetOutcome {
RangeNotSatisfiable,
}
/// Type alias to make using HMAC SHA256 easier
type HmacSha256 = hmac::Hmac<sha2::Sha256>;
/// breeze engine
pub struct Engine {
/// Cached count of uploaded files
pub upl_count: AtomicUsize,
/// Engine configuration
pub cfg: config::EngineConfig,
/// HMAC state initialised with the deletion secret (if present)
pub deletion_hmac: Option<HmacSha256>,
/// The in-memory cache that cached uploads are stored in
cache: Arc<cache::Cache>,
/// An interface to the on-disk upload store
disk: disk::DiskArray,
disk: disk::Disk,
}
/// Try to parse a `Range` header into an easier format to work with
fn resolve_range(range: Option<headers::Range>, full_len: u64) -> Option<(u64, u64)> {
// Prepare default range
let default = Some((0, full_len));
let last_byte = full_len - 1;
// Take range, otherwise return
let Some(range) = range else {
return default; // unspecified; use default
};
let (start, end) =
if let Some((start, end)) = range.and_then(|r| r.satisfiable_ranges(full_len).next()) {
// satisfiable_ranges will never return Excluded so this is ok
let start = if let Bound::Included(start_incl) = start {
start_incl
} else {
0
};
let end = if let Bound::Included(end_incl) = end {
end_incl
} else {
last_byte
};
// Get iterator of satisfiable ranges
let mut ranges = range.satisfiable_ranges(full_len);
(start, end)
} else {
(0, last_byte)
};
// Take first range
let Some(range) = ranges.next() else {
return default; // empty; use default
};
// If there are multiple ranges, we will
// not process the request
if ranges.next().is_some() {
// catch ranges we can't satisfy
if end > last_byte || start > end {
return None;
}
// Convert into a..b range
let start = match range.start_bound() {
Bound::Included(&x) => x,
Bound::Excluded(&x) => x.checked_add(1)?,
Bound::Unbounded => 0,
};
let end = match range.end_bound() {
Bound::Included(&x) => x.checked_add(1)?,
Bound::Excluded(&x) => x,
Bound::Unbounded => full_len,
};
// We can't handle bounds
// out of order
if start > end {
return None;
}
// We can't return more bytes
// than we have
if end > full_len {
return None;
}
// Return
Some((start, end))
}
/// Calculate HMAC of field values.
pub fn update_hmac(hmac: &mut HmacSha256, saved_name: &str, hash: u128) {
// mix deletion req fields into one buf
let mut field_bytes = BytesMut::new();
field_bytes.put(saved_name.as_bytes());
field_bytes.put_u128(hash);
// take the hmac
hmac.update(&field_bytes);
}
/// How many bytes of a file should be used for hash calculation.
const SAMPLE_WANTED_BYTES: usize = 32768;
/// Format some info about an upload and hash it
///
/// This should not change between versions!!
/// That would break deletion urls
fn calculate_hash(len: u64, data_sample: Bytes) -> u128 {
let mut buf = BytesMut::new();
buf.put_u64(len);
buf.put(data_sample);
xxh3::xxh3_128(&buf)
}
impl Engine {
/// Creates a new instance of the engine
pub fn new(cfg: config::EngineConfig, cache: Arc<cache::Cache>, disk: disk::DiskArray) -> Self {
let deletion_hmac = cfg
.deletion_secret
.as_ref()
.map(|s| HmacSha256::new_from_slice(s.as_bytes()).unwrap());
pub fn with_config(cfg: config::EngineConfig) -> Self {
let cache = cache::Cache::with_config(cfg.cache.clone());
let disk = disk::Disk::with_config(cfg.disk.clone());
let cache = Arc::new(cache);
let cache_scanner = cache.clone();
tokio::spawn(async move { cache_scanner.scanner().await });
Self {
deletion_hmac,
// initialise our cached upload count. this doesn't include temp uploads!
upl_count: AtomicUsize::new(disk.count()),
cfg,
@ -194,17 +141,18 @@ impl Engine {
&self,
saved_name: &str,
range: Option<headers::Range>,
) -> eyre::Result<GetOutcome> {
) -> anyhow::Result<GetOutcome> {
let data = if let Some(u) = self.cache.get(saved_name) {
u
} else {
// now, check if we have it on disk
let Some(mut f) = self.disk.open(saved_name).await? else {
let mut f = if let Some(f) = self.disk.open(saved_name).await? {
f
} else {
// file didn't exist
return Ok(GetOutcome::NotFound);
};
// read length from disk
let full_len = self.disk.len(&f).await?;
// if possible, recache and send a cache response
@ -232,15 +180,17 @@ impl Engine {
data
} else {
let Some((start, end)) = resolve_range(range, full_len) else {
let (start, end) = if let Some(range) = resolve_range(range, full_len) {
range
} else {
return Ok(GetOutcome::RangeNotSatisfiable);
};
// Set up file handle
f.seek(SeekFrom::Start(start)).await?;
let f = f.take(end - start);
let range_len = (end - start) + 1;
f.seek(std::io::SeekFrom::Start(start)).await?;
let f = f.take(range_len);
// Return
let res = UploadResponse {
full_len,
range: (start, end),
@ -250,27 +200,17 @@ impl Engine {
}
};
// Resolve a..b range
let full_len = data.len() as u64;
let Some((start, end)) = resolve_range(range, full_len) else {
let (start, end) = if let Some(range) = resolve_range(range, full_len) {
range
} else {
return Ok(GetOutcome::RangeNotSatisfiable);
};
// Cut down to range
let data = {
// Convert types.
// These should never be greater than usize::MAX
// if I recall, because max cache length is a usize.
let (start, end): (usize, usize) = (
start.try_into().expect("start bound"),
end.try_into().expect("end bound"),
);
// cut down to range
let data = data.slice((start as usize)..=(end as usize));
// Slice bytes
data.slice(start..end)
};
// Build response
// build response
let res = UploadResponse {
full_len,
range: (start, end),
@ -279,101 +219,55 @@ impl Engine {
Ok(GetOutcome::Success(res))
}
/// Returns the amount of uploads we have stored.
pub fn count(&self) -> usize {
self.disk.count()
}
/// Check if we have an upload stored anywhere.
///
/// This is only used to prevent `saved_name` collisions!!
/// It is not used to deliver "not found" errors.
pub fn has(&self, saved_name: &str) -> bool {
// look in cache
pub async fn has(&self, saved_name: &str) -> bool {
if self.cache.has(saved_name) {
return true;
}
// look in disk
if self.disk.has(saved_name) {
// sidestep handling the error properly
// that way we can call this in gen_saved_name easier
if self.disk.open(saved_name).await.is_ok_and(|f| f.is_some()) {
return true;
}
false
}
/// Try to read a file and calculate a hash for it.
pub async fn get_hash(&self, saved_name: &str) -> eyre::Result<Option<u128>> {
// readout sample data and full len
let (data_sample, len) = if let Some(full_data) = self.cache.get(saved_name) {
// we found it in cache! take as many bytes as we can
let taking = full_data.len().min(SAMPLE_WANTED_BYTES);
let data = full_data.slice(0..taking);
// get len
let len = full_data.len() as u64;
// return
(data, len)
} else {
// not in cache, so try disk
let Some(mut f) = self.disk.open(saved_name).await? else {
// not found there either so we just dont have it
return Ok(None);
};
// find len..
let len = f.seek(SeekFrom::End(0)).await?;
f.rewind().await?;
// only take wanted # of bytes for read
let mut f = f.take(SAMPLE_WANTED_BYTES as u64);
// try to read
let mut data = Vec::with_capacity(SAMPLE_WANTED_BYTES);
f.read_to_end(&mut data).await?;
let data = Bytes::from(data);
(data, len)
};
// calculate hash
Ok(Some(calculate_hash(len, data_sample)))
}
/// Generate a new saved name for an upload.
///
/// If it picks a name that already exists, it will try again.
pub fn gen_saved_name(&self, ext: Option<&str>) -> String {
pub async fn gen_saved_name(&self, ext: Option<String>) -> String {
loop {
// generate a 6-character alphanumeric string
let mut saved_name: String = Alphanumeric.sample_string(&mut rand::rng(), 6);
let mut saved_name: String = Alphanumeric.sample_string(&mut rand::thread_rng(), 6);
// if we have an extension, add it now
if let Some(ext) = ext {
if let Some(ref ext) = ext {
saved_name.push('.');
saved_name.push_str(ext);
}
if !self.has(&saved_name) {
if !self.has(&saved_name).await {
break saved_name;
} else {
// there was a name collision. loop and try again
info!("name collision! saved_name= {}", saved_name);
}
// there was a name collision. loop and try again
info!("name collision! saved_name= {}", saved_name);
}
}
/// Wipe out an upload from all storage.
/// * Intended for deletion URLs and failed uploads
pub async fn remove(&self, saved_name: &str) -> eyre::Result<()> {
info!(saved_name, "!! removing upload");
///
/// This is for deleting failed uploads only!!
pub async fn remove(&self, saved_name: &str) -> anyhow::Result<()> {
info!("!! removing upload: {saved_name}");
// removals
self.cache.remove(saved_name);
self.disk
.remove(saved_name)
.await
.wrap_err("failed to remove file from disk")?;
self.disk.remove(saved_name).await?;
info!("!! successfully removed upload");
@ -384,14 +278,14 @@ impl Engine {
///
/// This also handles custom file lifetimes and EXIF data removal.
pub async fn save(
self: &Arc<Self>,
&self,
saved_name: &str,
provided_len: u64,
mut use_cache: bool,
mut stream: BodyDataStream,
lifetime: Option<Duration>,
keep_exif: bool,
) -> eyre::Result<(Bytes, u64)> {
) -> anyhow::Result<()> {
// if we're using cache, make some space to store the upload in
let mut data = if use_cache {
BytesMut::with_capacity(provided_len.try_into()?)
@ -401,24 +295,12 @@ impl Engine {
// don't begin a disk save if we're using temporary lifetimes
let tx = if lifetime.is_none() {
// todo: can you lie about len??
Some(self.disk.start_save(saved_name, provided_len, {
let me = self.clone();
let saved_name = saved_name.to_string();
async move |err| {
// try to delete the failed upload
error!(%saved_name, %err, "error while saving file to disk");
if let Err(err) = me.remove(&saved_name).await {
error!(%saved_name, %err, "IO error callback failed to remove upload");
}
}
}))
Some(self.disk.start_save(saved_name).await)
} else {
None
};
// whether or not we are going to coalesce the data
// whether or not we're gonna coalesce the data
// in order to strip the exif data at the end,
// instead of just sending it off to the i/o task
let coalesce_and_strip = use_cache
@ -431,11 +313,6 @@ impl Engine {
&& !keep_exif
&& provided_len <= self.cfg.max_strip_len;
// buffer of sampled data for the deletion hash
let mut hash_sample = BytesMut::with_capacity(SAMPLE_WANTED_BYTES);
// actual number of bytes processed
let mut observed_len = 0;
// read and save upload
while let Some(chunk) = stream.next().await {
// if we error on a chunk, fail out
@ -443,30 +320,18 @@ impl Engine {
// if we have an i/o task, send it off
// also cloning this is okay because it's a Bytes
if !coalesce_and_strip && let Some(ref tx) = tx {
debug!("sending chunk to i/o task");
tx.send(chunk.clone())
.await
.wrap_err("failed to send chunk to i/o task!")?;
if !coalesce_and_strip {
if let Some(ref tx) = tx {
debug!("sending chunk to i/o task");
tx.send(chunk.clone())?;
}
}
// add to sample if we need to
let wanted = SAMPLE_WANTED_BYTES - hash_sample.len();
if wanted != 0 {
// take as many bytes as we can ...
let taking = chunk.len().min(wanted);
hash_sample.extend_from_slice(&chunk[0..taking]);
}
// record new len
observed_len += chunk.len() as u64;
if use_cache {
debug!("receiving data into buffer");
if data.len() + chunk.len() > data.capacity() {
info!(
"the amount of data sent exceeds the content-length provided by the client! caching will be cancelled for this upload."
);
info!("the amount of data sent exceeds the content-length provided by the client! caching will be cancelled for this upload.");
// if we receive too much data, drop the buffer and stop using cache (it is still okay to use disk, probably)
data = BytesMut::new();
@ -500,9 +365,7 @@ impl Engine {
// send what we did over to the i/o task, all in one chunk
if let Some(ref tx) = tx {
debug!("sending filled buffer to i/o task");
tx.send(data.clone())
.await
.wrap_err("failed to send coalesced buffer to i/o task!")?;
tx.send(data.clone())?;
}
data
@ -521,21 +384,19 @@ impl Engine {
};
}
// return w/ info for hash calculation
Ok((hash_sample.freeze(), observed_len))
Ok(())
}
pub async fn process(
self: &Arc<Self>,
&self,
ext: Option<String>,
provided_len: u64,
stream: BodyDataStream,
lifetime: Option<Duration>,
keep_exif: bool,
) -> eyre::Result<ProcessOutcome> {
) -> anyhow::Result<ProcessOutcome> {
// if the upload size is greater than our max file size, deny it now
// temp uploads get to skip it because they don't use the disk
if lifetime.is_none() && !self.disk.will_use(provided_len) {
if self.cfg.max_upload_len.is_some_and(|l| provided_len > l) {
return Ok(ProcessOutcome::UploadTooLarge);
}
@ -553,7 +414,7 @@ impl Engine {
}
// generate the file name
let saved_name = self.gen_saved_name(ext.as_deref());
let saved_name = self.gen_saved_name(ext).await;
// save it
let save_result = self
@ -567,49 +428,22 @@ impl Engine {
)
.await;
// handle result
let (hash_sample, len) = match save_result {
// Okay so just extract metadata
Ok(m) => m,
// If anything fails, delete the upload and return the error
Err(err) => {
error!(?err, "failed processing upload!");
// If anything fails, delete the upload and return the error
if save_result.is_err() {
error!("failed processing upload!");
// if the disk is messed up or something
// lets not propagate the remove error
// before the save error
if let Err(err) = self.remove(&saved_name).await {
error!(?err, "failed to cleanup failed upload!");
}
return Err(err);
}
};
// if deletion urls are enabled, create one
let deletion_url = self.deletion_hmac.clone().map(|mut hmac| {
// calculate hash of file metadata
let hash = calculate_hash(len, hash_sample);
let mut hash_bytes = BytesMut::new();
hash_bytes.put_u128(hash);
let hash_b64 = BASE64_URL_SAFE_NO_PAD.encode(&hash_bytes);
// take hmac
update_hmac(&mut hmac, &saved_name, hash);
let out = hmac.finalize().into_bytes();
let out_b64 = BASE64_URL_SAFE_NO_PAD.encode(out);
// format deletion url
format!(
"{}/del?name={saved_name}&hash={hash_b64}&hmac={out_b64}",
self.cfg.base_url
)
});
self.remove(&saved_name).await?;
save_result?;
}
// format and send back the url
let url = format!("{}/p/{saved_name}", self.cfg.base_url);
let url = format!("{}/p/{}", self.cfg.base_url, saved_name);
// if all goes well, increment the cached upload counter
self.upl_count.fetch_add(1, Ordering::Relaxed);
info!("finished processing upload!");
Ok(ProcessOutcome::Success { url, deletion_url })
Ok(ProcessOutcome::Success(url))
}
}

View File

@ -1,12 +1,10 @@
use std::sync::Arc;
use std::sync::{atomic::Ordering, Arc};
use axum::extract::State;
use crate::engine::Engine;
/// Show index status page with amount of uploaded files
pub async fn index(State(engine): State<Arc<Engine>>) -> String {
let count = engine.count();
pub async fn index(State(engine): State<Arc<crate::engine::Engine>>) -> String {
let count = engine.upl_count.load(Ordering::Relaxed);
let motd = engine.cfg.motd.clone();
@ -14,7 +12,6 @@ pub async fn index(State(engine): State<Arc<Engine>>) -> String {
.replace("%uplcount%", &count.to_string())
}
#[rustfmt::skip]
pub async fn robots_txt() -> &'static str {
/// robots.txt that tells web crawlers not to list uploads
const ROBOTS_TXT: &str = concat!(

View File

@ -1,18 +1,17 @@
use std::{path::PathBuf, sync::Arc};
use argh::FromArgs;
use color_eyre::eyre::{self, Context};
use engine::Engine;
use axum::{
Router,
routing::{get, post},
Router,
};
use tokio::{fs, net::TcpListener, signal};
use tracing::{info, warn};
mod cache;
mod config;
mod delete;
mod disk;
mod engine;
mod index;
@ -22,8 +21,6 @@ mod view;
#[cfg(not(target_env = "msvc"))]
use tikv_jemallocator::Jemalloc;
use crate::{cache::Cache, disk::DiskArray, engine::Engine};
#[cfg(not(target_env = "msvc"))]
#[global_allocator]
static GLOBAL: Jemalloc = Jemalloc;
@ -36,34 +33,20 @@ struct Args {
config: PathBuf,
}
/// Instantiates router.
fn router(engine: Engine) -> Router {
Router::new()
.route("/new", post(new::new))
.route("/p/{saved_name}", get(view::view))
.route("/del", get(delete::delete))
.route("/", get(index::index))
.route("/robots.txt", get(index::robots_txt))
.with_state(Arc::new(engine))
}
#[tokio::main]
async fn main() -> eyre::Result<()> {
// Install color-eyre
color_eyre::install()?;
async fn main() {
// Read & parse args
let args: Args = argh::from_env();
// Read & parse config
let cfg: config::Config = {
let config_str = fs::read_to_string(args.config).await.wrap_err(
let config_str = fs::read_to_string(args.config).await.expect(
"failed to read config file! make sure it exists and you have read permissions",
)?;
);
toml::from_str(&config_str).wrap_err(
"invalid config! ensure proper fields and structure. reference config is in readme",
)?
toml::from_str(&config_str).unwrap_or_else(|e| {
panic!("invalid config! ensure proper fields and structure. reference config is in readme.\n{e}");
})
};
// Set up tracing
@ -72,31 +55,36 @@ async fn main() -> eyre::Result<()> {
.init();
// Check config
{
let save_path = cfg.engine.disk.save_path.clone();
if !save_path.exists() || !save_path.is_dir() {
panic!("the save path does not exist or is not a directory! this is invalid");
}
}
if cfg.engine.upload_key.is_empty() {
warn!("engine upload_key is empty! no key will be required for uploading new files");
}
// Create backends
let cache = Cache::with_config(cfg.cache)?;
let disk_array = DiskArray::with_configs(cfg.disk)?;
// Create engine
let engine = Engine::new(cfg.engine, cache, disk_array);
let engine = Engine::with_config(cfg.engine);
// Build main router
let app = router(engine);
let app = Router::new()
.route("/new", post(new::new))
.route("/p/{saved_name}", get(view::view))
.route("/", get(index::index))
.route("/robots.txt", get(index::robots_txt))
.with_state(Arc::new(engine));
// Start web server
info!("starting server.");
let listener = TcpListener::bind(&cfg.http.listen_on)
.await
.wrap_err("failed to bind to given `http.listen_on` address! make sure it's valid, and the port isn't already bound")?;
.expect("failed to bind to given `http.listen_on` address! make sure it's valid, and the port isn't already bound");
axum::serve(listener, app)
.with_graceful_shutdown(shutdown_signal())
.await
.wrap_err("failed to start server")?;
Ok(())
.expect("failed to start server");
}
async fn shutdown_signal() {
@ -118,8 +106,8 @@ async fn shutdown_signal() {
let terminate = std::future::pending::<()>();
tokio::select! {
() = ctrl_c => {},
() = terminate => {},
_ = ctrl_c => {},
_ = terminate => {},
}
info!("shutting down!");

View File

@ -8,16 +8,14 @@ use std::{
use axum::{
body::Body,
extract::{Query, State},
response::{IntoResponse as _, Response},
};
use axum_extra::TypedHeader;
use headers::ContentLength;
use http::{HeaderValue, StatusCode};
use http::StatusCode;
use serde::Deserialize;
use serde_with::{DurationSeconds, serde_as};
use tracing::error;
use serde_with::{serde_as, DurationSeconds};
use crate::engine::{Engine, ProcessOutcome};
use crate::engine::ProcessOutcome;
fn default_keep_exif() -> bool {
false
@ -40,11 +38,11 @@ pub struct NewRequest {
/// The request handler for the /new path.
/// This handles all new uploads.
pub async fn new(
State(engine): State<Arc<Engine>>,
State(engine): State<Arc<crate::engine::Engine>>,
Query(req): Query<NewRequest>,
TypedHeader(ContentLength(content_length)): TypedHeader<ContentLength>,
body: Body,
) -> Result<Response, StatusCode> {
) -> Result<String, StatusCode> {
// check upload key, if i need to
if !engine.cfg.upload_key.is_empty() && req.key.unwrap_or_default() != engine.cfg.upload_key {
return Err(StatusCode::FORBIDDEN);
@ -92,7 +90,7 @@ pub async fn new(
// pass it off to the engine to be processed
// --
// also, error responses here don't get presented properly in ShareX most of the time
// also, error responses here don't get represented properly in ShareX most of the time
// they don't expect the connection to close before they're done uploading, i think
// so it will just present the user with a "connection closed" error
match engine
@ -101,20 +99,7 @@ pub async fn new(
{
Ok(outcome) => match outcome {
// 200 OK
ProcessOutcome::Success { url, deletion_url } => {
let mut res = url.into_response();
// insert deletion url header if needed
if let Some(deletion_url) = deletion_url {
let deletion_url = HeaderValue::from_str(&deletion_url)
.expect("deletion url contains invalid chars");
let headers = res.headers_mut();
headers.insert("Breeze-Deletion-Url", deletion_url);
}
Ok(res)
}
ProcessOutcome::Success(url) => Ok(url),
// 413 Payload Too Large
ProcessOutcome::UploadTooLarge | ProcessOutcome::TemporaryUploadTooLarge => {
@ -126,9 +111,6 @@ pub async fn new(
},
// 500 Internal Server Error
Err(err) => {
error!("failed to process upload!! {err:#}");
Err(StatusCode::INTERNAL_SERVER_ERROR)
}
Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR),
}
}

View File

@ -10,9 +10,8 @@ use axum_extra::TypedHeader;
use headers::Range;
use http::{HeaderValue, StatusCode};
use tokio_util::io::ReaderStream;
use tracing::error;
use crate::engine::{Engine, GetOutcome, UploadData, UploadResponse};
use crate::engine::{GetOutcome, UploadData, UploadResponse};
/// Responses for a failed view operation
pub enum ViewError {
@ -22,7 +21,7 @@ pub enum ViewError {
/// Will send status code 500 with a plaintext "internal server error" message.
InternalServerError,
/// Sends status code 416 with a plaintext "range not satisfiable" message.
/// Sends status code 206 with a plaintext "range not satisfiable" message.
RangeNotSatisfiable,
}
@ -45,7 +44,7 @@ impl IntoResponse for ViewError {
impl IntoResponse for UploadResponse {
fn into_response(self) -> Response {
let (start, end) = self.range;
let range_len = end - start;
let range_len = (end - start) + 1;
let mut res = match self.data {
UploadData::Cache(data) => data.into_response(),
@ -73,11 +72,8 @@ impl IntoResponse for UploadResponse {
// if it is not the full size, add relevant headers/status for range request
if range_len != self.full_len {
// the spec says its meant to be an inclusive range
// so we do that.........
let end_incl = end - 1;
let content_range =
HeaderValue::from_str(&format!("bytes {}-{}/{}", start, end_incl, self.full_len))
HeaderValue::from_str(&format!("bytes {}-{}/{}", start, end, self.full_len))
.expect("construct content-range header failed");
headers.insert("Content-Range", content_range);
@ -91,27 +87,23 @@ impl IntoResponse for UploadResponse {
/// GET request handler for /p/* path.
/// All file views are handled here.
pub async fn view(
State(engine): State<Arc<Engine>>,
State(engine): State<Arc<crate::engine::Engine>>,
Path(original_path): Path<PathBuf>,
range: Option<TypedHeader<Range>>,
) -> Result<UploadResponse, ViewError> {
// try to extract the file name (if it's the only component)
// this makes paths like `asdf%2fabcdef.png` invalid
let saved_name = match original_path.file_name().map(OsStr::to_str) {
Some(Some(n)) if original_path.components().count() == 1 => n,
_ => return Err(ViewError::NotFound),
let saved_name = if let Some(Some(n)) = original_path.file_name().map(OsStr::to_str) {
n
} else {
return Err(ViewError::NotFound);
};
let range = range.map(|TypedHeader(range)| range);
let range = range.map(|th| th.0);
// get result from the engine
match engine.get(saved_name, range).await {
Ok(GetOutcome::Success(res)) => Ok(res),
Ok(GetOutcome::NotFound) => Err(ViewError::NotFound),
Ok(GetOutcome::RangeNotSatisfiable) => Err(ViewError::RangeNotSatisfiable),
Err(err) => {
error!("failed to get upload!! {err:#}");
Err(ViewError::InternalServerError)
}
Err(_) => Err(ViewError::InternalServerError),
}
}