Compare commits
No commits in common. "main" and "0.1.5-p2" have entirely different histories.
.envrc (4 lines removed)

@@ -1,4 +0,0 @@
-if ! has nix_direnv_version || ! nix_direnv_version 3.0.6; then
-source_url "https://raw.githubusercontent.com/nix-community/nix-direnv/3.0.6/direnvrc" "sha256-RYcUJaRMf8oF5LznDrlCXbkOQrywm0HDv1VjYGaJGdM="
-fi
-use flake
@@ -1,5 +1,2 @@
 # binaries
 /target
-
-# nix-direnv
-/.direnv

File diff suppressed because it is too large.
Cargo.toml (56 changed lines)

@@ -1,45 +1,25 @@
 [package]
 name = "breeze"
-version = "0.3.1"
-edition = "2024"
+version = "0.1.5"
+edition = "2021"
 
-[profile.dev.package]
-tikv-jemalloc-sys = { opt-level = 3 }
-
 [dependencies]
-argh = "0.1.12"
-atomic-time = "0.1.4"
-axum = { version = "0.8.1", features = ["macros", "http2"] }
-axum-extra = { version = "0.10.0", default-features = false, features = [
-    "tracing",
-    "typed-header",
-] }
-base64 = "0.21"
-bytes = "1"
-color-eyre = "0.6"
-dashmap = { version = "6.1.0", features = ["inline"] }
-headers = "0.4"
-hmac = "0.12.1"
-http = "1.2"
-img-parts = "0.3"
-rand = "0.9"
-serde = { version = "1.0", features = ["derive"] }
-serde_with = "3.12"
-sha2 = "0.10.9"
-tokio = { version = "1", features = [
-    "rt-multi-thread",
-    "macros",
-    "net",
-    "fs",
-    "signal",
-] }
+axum = { version = "0.6.1", features = ["macros"] }
+hyper = { version = "0.14", features = ["full"] }
+tokio = { version = "1", features = ["full"] }
+tokio-util = { version = "0.7.4", features = ["full"] }
 tokio-stream = "0.1"
-tokio-util = { version = "0.7", features = ["io"] }
-toml = "0.8.2"
+tower = "0.4.13"
+bytes = "1"
+rand = "0.8.5"
+async-recursion = "1.0.0"
+walkdir = "2"
+futures = "0.3"
 tracing = "0.1"
 tracing-subscriber = "0.3"
-twox-hash = "2"
-walkdir = "2"
-
-[target.'cfg(not(target_env = "msvc"))'.dependencies]
-tikv-jemallocator = "0.6"
+archived = { path = "./archived" }
+xxhash-rust = { version = "0.8.7", features = ["xxh3"] }
+serde = { version = "1.0.189", features = ["derive"] }
+toml = "0.8.2"
+clap = { version = "4.4.6", features = ["derive"] }
+serde_with = "3.4.0"
@@ -12,4 +12,8 @@ RUN apt-get update && rm -rf /var/lib/apt/lists/*
 
 COPY --from=builder /usr/local/cargo/bin/breeze /usr/local/bin/breeze
 
+RUN useradd -m runner
+USER runner
+
+EXPOSE 8000
 CMD [ "breeze", "--config", "/etc/breeze.toml" ]
README.md (113 changed lines)

@@ -1,34 +1,26 @@
 # breeze
 
 breeze is a simple, performant file upload server.
 
 The primary instance is https://picture.wtf.
 
 ## Features
-- Basic upload API tailored towards ShareX
+Compared to the old Express.js backend, breeze has
 - Streamed uploading
 - Streamed downloading (on larger files)
-- Pause/continue download support with `Range` header
-- Upload caching in memory
-- Support for ShareX file deletion URLs
-- Temporary uploads
-- Automatic exif data removal
+- Upload caching
+- Generally faster speeds overall
+
+At this time, breeze does not support encrypted uploads on disk.
 
 ## Installation
-On picture.wtf, breeze is ran with the NixOS module provided by `flake.nix`. [Take a look at the config](https://git.min.rip/min/infra/src/branch/main/nixos/hosts/silver/services/breeze.nix) if you want!
-
-Containerised and bare-metal deployments are also supported. Instructions for those are below.
-
-To begin, clone the Git repository:
+I wrote breeze with the intention of running it in a container, but it runs just fine outside of one.
+
+Either way, you need to start off by cloning the Git repository.
 
 ```bash
 git clone https://git.min.rip/min/breeze.git
 ```
 
-If you want to run it as a Docker container, here is an example `docker-compose.yaml` that may be useful for reference.
+To run it in Docker, I recommend using Docker Compose. An example `docker-compose.yaml` configuration is below. You can start it using `docker compose up -d`.
 
 ```
 version: '3.6'
 

@@ -41,56 +33,26 @@ services:
 - /srv/uploads:/data
 - ./breeze.toml:/etc/breeze.toml
 
-user: 1000:1000
-
 ports:
-- 8383:8000
+- 8000:8000
 ```
 
-With this configuration, it is expected that:
-
-- there is a clone of the Git repository in the `./breeze` folder
-- there is a `breeze.toml` config file in current directory
-- there is a directory at `/srv/uploads` for storing uploads
-- port 8383 will be made accessible to the Internet somehow (either forwarding the port through your firewall directly, or passing it through a reverse proxy)
-- you want the uploads to be owned by the user on your system with id 1000. (this is usually your user)
-
-It can also be installed directly if the Rust toolchain is installed:
+For this configuration, it is expected that:
+* there is a clone of the Git repository in the `./breeze` folder.
+* there is a `breeze.toml` config file in current directory
+* there is a directory at `/srv/uploads` for storing uploads
+
+It can also be installed directly if you have the Rust toolchain installed:
 
 ```bash
-cd breeze
 cargo install --path .
-
-# then, you can run w/ a path to your `breeze.toml` config file
-breeze --config /path/to/breeze.toml
-```
-
-### Exposing publicly
-
-If you want to expose a breeze server to the internet, I highly recommend using a reverse proxy instead of just forwarding its HTTP port.
-
-Caddy is probably the easiest to set up if you are new to reverse proxies. Here is an example `Caddyfile` for the Docker Compose file above (assuming `yourdomain.com` is a domain that points to your server's IP).
-
-```
-yourdomain.com {
-# enable compression
-encode
-
-# forward request to breeze
-reverse_proxy 127.0.0.1:8383
-}
 ```
 
 ## Usage
 
 ### Hosting
 
 Configuration is read through a toml file.
 
-The config file path is specified using the `-c`/`--config` command line switch.
+By default it'll try to read `./breeze.toml`, but you can specify a different path using the `-c`/`--config` command line switch.
 
 Here is an example config file:
 
 ```toml
 [engine]
 # The base URL that the HTTP server will be accessible on.

@@ -99,16 +61,14 @@ Here is an example config file:
 # upload urls of "https://picture.wtf/p/abcdef.png", etc.
 base_url = "http://127.0.0.1:8000"
 
+# The location that uploads will be saved to.
+# It should be a path to a directory on disk that you can write to.
+save_path = "/data"
+
 # OPTIONAL - If set, the static key specified will be required to upload new files.
 # If it is not set, no key will be required.
 upload_key = "hiiiiiiii"
 
-# OPTIONAL - If set, the secret key used to verify ShareX deletion URLs.
-# If it is not set, deletion URLs will not be created or made usable.
-# WARNING: Do not share this!! If somebody else obtains it, they can
-# generate deletion URLs for any upload!!
-deletion_secret = "asdfhjkasdhjfashjlfhjkaskdfjkhdjkh"
-
 # OPTIONAL - specifies what to show when the site is visited on http
 # It is sent with text/plain content type.
 # There are two variables you can use:

@@ -116,28 +76,6 @@ deletion_secret = "asdfhjkasdhjfashjlfhjkaskdfjkhdjkh"
 # %version% - current breeze version (e.g. 0.1.5)
 motd = "my image host, currently hosting %uplcount% files"
 
-# The maximum lifetime a temporary upload may be given, in seconds.
-# It's okay to leave this somewhat high because large temporary uploads
-# will just be bumped out of the cache when a new upload needs to be
-# cached anyways.
-max_temp_lifetime = 43200
-
-# OPTIONAL - the maximum length (in bytes) a file being uploaded may be.
-# A word of warning about this: the error shown to ShareX users who
-# hit the limit is *not* very clear. ("connection closed" or similar)
-max_upload_len = 2_147_483_648
-
-# The maximum length (in bytes) an image file may be before the server
-# will skip removing its EXIF data.
-# The performance impact of breeze's EXIF data removal is not
-# very high in everyday usage, so something like 16MiB is reasonable.
-max_strip_len = 16_777_216
-
-[engine.disk]
-# The location that uploads will be saved to.
-# It should be a path to a directory on disk that you can write to.
-save_path = "/data"
-
 [engine.cache]
 # The file size (in bytes) that a file must be under
 # to get cached.

@@ -151,7 +89,7 @@ upload_lifetime = 1800
 scan_freq = 60
 
 # How much memory (in bytes) the cache is allowed to consume.
-mem_capacity = 4_294_967_296
+mem_capacity = 4_294_967_295
 
 [http]
 # The address that the HTTP server will listen on. (ip:port)

@@ -166,17 +104,11 @@ level = "warn"
 ```
 
 ### Uploading
-The HTTP API is pretty simple, and it's easy to make a ShareX configuration for it.
+The HTTP API is fairly simple, and it's pretty easy to make a ShareX configuration for it.
 
 Uploads should be sent to `/new?name={original filename}` as a POST request. If the server uses upload keys, it should be sent to `/new?name={original filename}&key={upload key}`. The uploaded file's content should be sent as raw binary in the request body.
 
-Also you can specify `&lastfor={time in seconds}` to make your upload temporary, or `&keepexif=true` to tell the server not to clear EXIF data on image uploads. (if you don't know what EXIF data is, you can leave it as default. you'll know if you need it)
-
-The endpoint's response will just be the URL of the upload in plain text, and the deletion URL will be sent in the `Breeze-Deletion-Url` header (if it's enabled).
-
 Here's an example ShareX configuration for it (with a key):
 
 ```json
 {
 "Version": "14.1.0",

@@ -188,7 +120,6 @@ Here's an example ShareX configuration for it (with a key):
 "name": "{filename}",
 "key": "hiiiiiiii"
 },
-"Body": "Binary",
-"DeletionURL": "{header:Breeze-Deletion-Url}"
+"Body": "Binary"
 }
 ```
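The upload flow described in the README above can also be exercised without ShareX. Below is a stand-alone illustrative client, not part of the repository: it assumes the third-party `reqwest` crate (with its `blocking` feature), a locally running instance on the example address, and the example upload key and a placeholder file name.

```rust
use std::fs;

// Illustrative client for the POST /new?name=...&key=... endpoint described
// in the README. Host, key, and file name are placeholders taken from the
// example config, not values the server mandates.
fn main() -> Result<(), Box<dyn std::error::Error>> {
    let body = fs::read("screenshot.png")?;

    let client = reqwest::blocking::Client::new();
    let resp = client
        .post("http://127.0.0.1:8000/new?name=screenshot.png&key=hiiiiiiii")
        .body(body)
        .send()?;

    // the response body is the plain-text URL of the new upload
    println!("{}", resp.text()?);
    Ok(())
}
```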
@@ -0,0 +1,2 @@
+.idea
+target
@@ -0,0 +1,30 @@
+# This file is automatically @generated by Cargo.
+# It is not intended for manual editing.
+version = 3
+
+[[package]]
+name = "archived"
+version = "0.2.0"
+dependencies = [
+"bytes",
+"once_cell",
+"rustc-hash",
+]
+
+[[package]]
+name = "bytes"
+version = "1.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dfb24e866b15a1af2a1b663f10c6b6b8f397a84aadb828f12e5b289ec23a3a3c"
+
+[[package]]
+name = "once_cell"
+version = "1.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b1c601810575c99596d4afc46f78a678c80105117c379eb3650cf99b8a21ce5b"
+
+[[package]]
+name = "rustc-hash"
+version = "1.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
@@ -0,0 +1,9 @@
+[package]
+name = "archived"
+version = "0.2.0"
+edition = "2018"
+license = "MIT"
+
+[dependencies]
+bytes = "1.3.0"
+once_cell = "1.3.1"
@@ -0,0 +1,22 @@
+MIT License
+
+Copyright (c) 2020 aikidos
+Copyright (c) 2023 ot2t7, minish
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
@@ -0,0 +1,26 @@
+use std::time::{Duration, SystemTime};
+
+/// Represents a set of eviction and expiration details for a specific cache entry.
+pub(crate) struct CacheEntry<B> {
+    /// Entry value.
+    pub(crate) value: B,
+
+    /// Expiration time.
+    ///
+    /// - [`None`] if the value must be kept forever.
+    pub(crate) expiration_time: SystemTime,
+}
+
+impl<B> CacheEntry<B> {
+    pub(crate) fn new(value: B, lifetime: Duration) -> Self {
+        Self {
+            expiration_time: SystemTime::now() + lifetime,
+            value,
+        }
+    }
+
+    /// Check if a entry is expired.
+    pub(crate) fn is_expired(&self, current_time: SystemTime) -> bool {
+        current_time >= self.expiration_time
+    }
+}
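The expiry rule used by `CacheEntry::is_expired` above can be exercised in isolation. This is a stand-alone sketch with a hypothetical `expired_after` helper, not code from the crate:

```rust
use std::time::{Duration, SystemTime};

// Mirrors CacheEntry::is_expired: an entry is expired once the current time
// reaches its creation time plus the configured lifetime.
fn expired_after(created: SystemTime, lifetime: Duration, now: SystemTime) -> bool {
    now >= created + lifetime
}

fn main() {
    let created = SystemTime::now();
    let lifetime = Duration::from_secs(1800);

    assert!(!expired_after(created, lifetime, created + Duration::from_secs(10)));
    assert!(expired_after(created, lifetime, created + Duration::from_secs(1800)));
}
```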
@ -0,0 +1,172 @@
|
||||||
|
mod entry;
|
||||||
|
|
||||||
|
use bytes::Bytes;
|
||||||
|
|
||||||
|
use crate::entry::*;
|
||||||
|
use std::collections::hash_map::Entry;
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::time::{Duration, SystemTime};
|
||||||
|
|
||||||
|
pub struct Archive {
|
||||||
|
cache_table: HashMap<String, CacheEntry<Bytes>>,
|
||||||
|
full_scan_frequency: Option<Duration>,
|
||||||
|
created_time: SystemTime,
|
||||||
|
last_scan_time: Option<SystemTime>,
|
||||||
|
entry_lifetime: Duration,
|
||||||
|
capacity: usize,
|
||||||
|
length: usize,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Archive {
|
||||||
|
/* pub fn new(capacity: usize) -> Self {
|
||||||
|
Self {
|
||||||
|
cache_table: HashMap::new(),
|
||||||
|
full_scan_frequency: None,
|
||||||
|
created_time: SystemTime::now(),
|
||||||
|
last_scan_time: None,
|
||||||
|
capacity,
|
||||||
|
length: 0,
|
||||||
|
}
|
||||||
|
} */
|
||||||
|
|
||||||
|
pub fn with_full_scan(
|
||||||
|
full_scan_frequency: Duration,
|
||||||
|
entry_lifetime: Duration,
|
||||||
|
capacity: usize,
|
||||||
|
) -> Self {
|
||||||
|
Self {
|
||||||
|
cache_table: HashMap::with_capacity(256),
|
||||||
|
full_scan_frequency: Some(full_scan_frequency),
|
||||||
|
created_time: SystemTime::now(),
|
||||||
|
last_scan_time: None,
|
||||||
|
entry_lifetime,
|
||||||
|
capacity,
|
||||||
|
length: 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn contains_key(&self, key: &String) -> bool {
|
||||||
|
let now = SystemTime::now();
|
||||||
|
|
||||||
|
self.cache_table
|
||||||
|
.get(key)
|
||||||
|
.filter(|cache_entry| !cache_entry.is_expired(now))
|
||||||
|
.is_some()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_last_scan_time(&self) -> Option<SystemTime> {
|
||||||
|
self.last_scan_time
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_full_scan_frequency(&self) -> Option<Duration> {
|
||||||
|
self.full_scan_frequency
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get(&self, key: &String) -> Option<&Bytes> {
|
||||||
|
let now = SystemTime::now();
|
||||||
|
|
||||||
|
self.cache_table
|
||||||
|
.get(key)
|
||||||
|
.filter(|cache_entry| !cache_entry.is_expired(now))
|
||||||
|
.map(|cache_entry| &cache_entry.value)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_or_insert<F>(&mut self, key: String, factory: F) -> &Bytes
|
||||||
|
where
|
||||||
|
F: Fn() -> Bytes,
|
||||||
|
{
|
||||||
|
let now = SystemTime::now();
|
||||||
|
|
||||||
|
self.try_full_scan_expired_items(now);
|
||||||
|
|
||||||
|
match self.cache_table.entry(key) {
|
||||||
|
Entry::Occupied(mut occupied) => {
|
||||||
|
if occupied.get().is_expired(now) {
|
||||||
|
occupied.insert(CacheEntry::new(factory(), self.entry_lifetime));
|
||||||
|
}
|
||||||
|
|
||||||
|
&occupied.into_mut().value
|
||||||
|
}
|
||||||
|
Entry::Vacant(vacant) => {
|
||||||
|
&vacant
|
||||||
|
.insert(CacheEntry::new(factory(), self.entry_lifetime))
|
||||||
|
.value
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn insert(&mut self, key: String, value: Bytes) -> Option<Bytes> {
|
||||||
|
let now = SystemTime::now();
|
||||||
|
|
||||||
|
self.try_full_scan_expired_items(now);
|
||||||
|
|
||||||
|
if value.len() + self.length > self.capacity {
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
self.length += value.len();
|
||||||
|
|
||||||
|
self.cache_table
|
||||||
|
.insert(key, CacheEntry::new(value, self.entry_lifetime))
|
||||||
|
.filter(|cache_entry| !cache_entry.is_expired(now))
|
||||||
|
.map(|cache_entry| cache_entry.value)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn remove(&mut self, key: &String) -> Option<Bytes> {
|
||||||
|
let now = SystemTime::now();
|
||||||
|
|
||||||
|
self.try_full_scan_expired_items(now);
|
||||||
|
|
||||||
|
let mut removed_len: usize = 0;
|
||||||
|
let result = self
|
||||||
|
.cache_table
|
||||||
|
.remove(key)
|
||||||
|
.filter(|cache_entry| !cache_entry.is_expired(now))
|
||||||
|
.and_then(|o| {
|
||||||
|
removed_len += o.value.len();
|
||||||
|
return Some(o);
|
||||||
|
})
|
||||||
|
.map(|cache_entry| cache_entry.value);
|
||||||
|
self.length -= removed_len;
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn renew(&mut self, key: &String) -> Option<()> {
|
||||||
|
let now = SystemTime::now();
|
||||||
|
|
||||||
|
self.try_full_scan_expired_items(now);
|
||||||
|
|
||||||
|
let entry = self.cache_table.get_mut(key);
|
||||||
|
|
||||||
|
match entry {
|
||||||
|
Some(entry) => {
|
||||||
|
entry.expiration_time = now + self.entry_lifetime;
|
||||||
|
|
||||||
|
Some(())
|
||||||
|
}
|
||||||
|
None => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn try_full_scan_expired_items(&mut self, current_time: SystemTime) {
|
||||||
|
if let Some(full_scan_frequency) = self.full_scan_frequency {
|
||||||
|
let since = current_time
|
||||||
|
.duration_since(self.last_scan_time.unwrap_or(self.created_time))
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
if since >= full_scan_frequency {
|
||||||
|
let mut removed_len = 0;
|
||||||
|
self.cache_table.retain(|_, cache_entry| {
|
||||||
|
if cache_entry.is_expired(current_time) {
|
||||||
|
removed_len += cache_entry.value.len();
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
});
|
||||||
|
self.length -= removed_len;
|
||||||
|
|
||||||
|
self.last_scan_time = Some(current_time);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
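The `archived` crate added above provides the in-memory cache this release uses. Here is a minimal usage sketch based on the API shown in the hunk; the key, payload, and sizes are made-up values, and it assumes the `archived` path dependency plus the `bytes` crate from Cargo.toml.

```rust
use std::time::Duration;

use archived::Archive;
use bytes::Bytes;

fn main() {
    // scan for expired entries at most every 60 s, keep entries for 30 minutes,
    // and cap the cache at roughly 4 GiB of stored bytes
    let mut cache = Archive::with_full_scan(
        Duration::from_secs(60),
        Duration::from_secs(1800),
        4_294_967_295,
    );

    // insert refuses (returns None) when the value would push the cache past its
    // capacity; otherwise it returns any previous non-expired value for the key
    let _prev = cache.insert("abcdef.png".to_string(), Bytes::from_static(b"fake image data"));

    // get returns None once the entry has expired or was never inserted
    if let Some(data) = cache.get(&"abcdef.png".to_string()) {
        println!("cached {} bytes", data.len());
    }
}
```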
flake.lock (77 lines removed)

@@ -1,77 +0,0 @@
|
||||||
{
|
|
||||||
"nodes": {
|
|
||||||
"crane": {
|
|
||||||
"locked": {
|
|
||||||
"lastModified": 1748047550,
|
|
||||||
"narHash": "sha256-t0qLLqb4C1rdtiY8IFRH5KIapTY/n3Lqt57AmxEv9mk=",
|
|
||||||
"owner": "ipetkov",
|
|
||||||
"repo": "crane",
|
|
||||||
"rev": "b718a78696060df6280196a6f992d04c87a16aef",
|
|
||||||
"type": "github"
|
|
||||||
},
|
|
||||||
"original": {
|
|
||||||
"owner": "ipetkov",
|
|
||||||
"repo": "crane",
|
|
||||||
"type": "github"
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"flake-utils": {
|
|
||||||
"inputs": {
|
|
||||||
"systems": "systems"
|
|
||||||
},
|
|
||||||
"locked": {
|
|
||||||
"lastModified": 1731533236,
|
|
||||||
"narHash": "sha256-l0KFg5HjrsfsO/JpG+r7fRrqm12kzFHyUHqHCVpMMbI=",
|
|
||||||
"owner": "numtide",
|
|
||||||
"repo": "flake-utils",
|
|
||||||
"rev": "11707dc2f618dd54ca8739b309ec4fc024de578b",
|
|
||||||
"type": "github"
|
|
||||||
},
|
|
||||||
"original": {
|
|
||||||
"owner": "numtide",
|
|
||||||
"repo": "flake-utils",
|
|
||||||
"type": "github"
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"nixpkgs": {
|
|
||||||
"locked": {
|
|
||||||
"lastModified": 1747958103,
|
|
||||||
"narHash": "sha256-qmmFCrfBwSHoWw7cVK4Aj+fns+c54EBP8cGqp/yK410=",
|
|
||||||
"owner": "NixOS",
|
|
||||||
"repo": "nixpkgs",
|
|
||||||
"rev": "fe51d34885f7b5e3e7b59572796e1bcb427eccb1",
|
|
||||||
"type": "github"
|
|
||||||
},
|
|
||||||
"original": {
|
|
||||||
"owner": "NixOS",
|
|
||||||
"ref": "nixpkgs-unstable",
|
|
||||||
"repo": "nixpkgs",
|
|
||||||
"type": "github"
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"root": {
|
|
||||||
"inputs": {
|
|
||||||
"crane": "crane",
|
|
||||||
"flake-utils": "flake-utils",
|
|
||||||
"nixpkgs": "nixpkgs"
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"systems": {
|
|
||||||
"locked": {
|
|
||||||
"lastModified": 1681028828,
|
|
||||||
"narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
|
|
||||||
"owner": "nix-systems",
|
|
||||||
"repo": "default",
|
|
||||||
"rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
|
|
||||||
"type": "github"
|
|
||||||
},
|
|
||||||
"original": {
|
|
||||||
"owner": "nix-systems",
|
|
||||||
"repo": "default",
|
|
||||||
"type": "github"
|
|
||||||
}
|
|
||||||
}
|
|
||||||
},
|
|
||||||
"root": "root",
|
|
||||||
"version": 7
|
|
||||||
}
|
|
flake.nix (244 lines removed)

@@ -1,244 +0,0 @@
|
||||||
{
|
|
||||||
description = "breeze file server";
|
|
||||||
|
|
||||||
inputs = {
|
|
||||||
nixpkgs.url = "github:NixOS/nixpkgs/nixpkgs-unstable";
|
|
||||||
crane.url = "github:ipetkov/crane";
|
|
||||||
flake-utils.url = "github:numtide/flake-utils";
|
|
||||||
};
|
|
||||||
|
|
||||||
outputs = {
|
|
||||||
self,
|
|
||||||
nixpkgs,
|
|
||||||
crane,
|
|
||||||
flake-utils,
|
|
||||||
...
|
|
||||||
}:
|
|
||||||
flake-utils.lib.eachDefaultSystem (system: let
|
|
||||||
pkgs = nixpkgs.legacyPackages.${system};
|
|
||||||
|
|
||||||
craneLib = crane.mkLib pkgs;
|
|
||||||
|
|
||||||
# Common arguments can be set here to avoid repeating them later
|
|
||||||
# Note: changes here will rebuild all dependency crates
|
|
||||||
commonArgs = {
|
|
||||||
src = craneLib.cleanCargoSource ./.;
|
|
||||||
strictDeps = true;
|
|
||||||
|
|
||||||
buildInputs =
|
|
||||||
[
|
|
||||||
pkgs.openssl
|
|
||||||
]
|
|
||||||
++ pkgs.lib.optionals pkgs.stdenv.isDarwin [
|
|
||||||
# Additional darwin specific inputs can be set here
|
|
||||||
pkgs.libiconv
|
|
||||||
];
|
|
||||||
};
|
|
||||||
|
|
||||||
breeze = craneLib.buildPackage (commonArgs
|
|
||||||
// {
|
|
||||||
cargoArtifacts = craneLib.buildDepsOnly commonArgs;
|
|
||||||
|
|
||||||
# Additional environment variables or build phases/hooks can be set
|
|
||||||
# here *without* rebuilding all dependency crates
|
|
||||||
# MY_CUSTOM_VAR = "some value";
|
|
||||||
});
|
|
||||||
in {
|
|
||||||
checks = {
|
|
||||||
inherit breeze;
|
|
||||||
};
|
|
||||||
|
|
||||||
packages.default = breeze;
|
|
||||||
|
|
||||||
apps.default = flake-utils.lib.mkApp {
|
|
||||||
drv = breeze;
|
|
||||||
};
|
|
||||||
|
|
||||||
devShells.default = craneLib.devShell {
|
|
||||||
# Inherit inputs from checks.
|
|
||||||
checks = self.checks.${system};
|
|
||||||
|
|
||||||
# Additional dev-shell environment variables can be set directly
|
|
||||||
# MY_CUSTOM_DEVELOPMENT_VAR = "something else";
|
|
||||||
|
|
||||||
# Extra inputs can be added here; cargo and rustc are provided by default.
|
|
||||||
packages = with pkgs; [
|
|
||||||
alejandra
|
|
||||||
rewrk
|
|
||||||
];
|
|
||||||
};
|
|
||||||
|
|
||||||
nixosModules.breeze = {
|
|
||||||
config,
|
|
||||||
pkgs,
|
|
||||||
lib,
|
|
||||||
...
|
|
||||||
}:
|
|
||||||
with lib; let
|
|
||||||
cfg = config.services.breeze;
|
|
||||||
settingsFormat = pkgs.formats.toml {};
|
|
||||||
defaultUser = "breeze";
|
|
||||||
defaultGroup = "breeze";
|
|
||||||
in {
|
|
||||||
options = {
|
|
||||||
services.breeze = {
|
|
||||||
enable = mkEnableOption "breeze file server";
|
|
||||||
|
|
||||||
package = mkOption {
|
|
||||||
type = types.package;
|
|
||||||
default = breeze;
|
|
||||||
description = "Package for `breeze` to use";
|
|
||||||
};
|
|
||||||
|
|
||||||
user = mkOption {
|
|
||||||
type = types.str;
|
|
||||||
default = defaultUser;
|
|
||||||
description = "User that `breeze` will run under";
|
|
||||||
};
|
|
||||||
|
|
||||||
group = mkOption {
|
|
||||||
type = types.str;
|
|
||||||
default = defaultGroup;
|
|
||||||
description = "Group that `breeze` will run under";
|
|
||||||
};
|
|
||||||
|
|
||||||
extraGroups = mkOption {
|
|
||||||
type = types.listOf types.str;
|
|
||||||
default = [];
|
|
||||||
description = "Any supplementary groups for `breeze` to run under";
|
|
||||||
};
|
|
||||||
|
|
||||||
settings = mkOption {
|
|
||||||
type = settingsFormat.type;
|
|
||||||
default = {};
|
|
||||||
description = ''
|
|
||||||
The *.toml configuration to run `breeze` with.
|
|
||||||
The options aren't formally documented, but the [readme](https://git.min.rip/min/breeze/src/branch/main/README.md) provides examples.
|
|
||||||
'';
|
|
||||||
};
|
|
||||||
|
|
||||||
configDir = mkOption {
|
|
||||||
type = types.path;
|
|
||||||
default = "/etc/breeze";
|
|
||||||
description = ''
|
|
||||||
The directory on disk to store the `breeze` config file in.
|
|
||||||
This does not load pre-existing config files, it only defines where the generated config is saved.
|
|
||||||
'';
|
|
||||||
};
|
|
||||||
|
|
||||||
uploadKeyFile = mkOption {
|
|
||||||
type = types.nullOr types.path;
|
|
||||||
default = null;
|
|
||||||
description = ''
|
|
||||||
File to load the `engine.upload_key` from, if desired.
|
|
||||||
This is useful for loading it from a secret management system.
|
|
||||||
'';
|
|
||||||
};
|
|
||||||
|
|
||||||
deletionSecretFile = mkOption {
|
|
||||||
type = types.nullOr types.path;
|
|
||||||
default = null;
|
|
||||||
description = ''
|
|
||||||
File to load the `engine.deletion_secret` from, if desired.
|
|
||||||
This is useful for loading it from a secret management system.
|
|
||||||
'';
|
|
||||||
};
|
|
||||||
};
|
|
||||||
};
|
|
||||||
|
|
||||||
config = mkIf cfg.enable {
|
|
||||||
users.users = mkIf (cfg.user == defaultUser) {
|
|
||||||
${cfg.user} = {
|
|
||||||
isSystemUser = true;
|
|
||||||
inherit (cfg) group;
|
|
||||||
};
|
|
||||||
};
|
|
||||||
|
|
||||||
users.groups = mkIf (cfg.group == defaultGroup) {
|
|
||||||
${cfg.group} = {};
|
|
||||||
};
|
|
||||||
|
|
||||||
systemd.tmpfiles.rules = [
|
|
||||||
"d '${cfg.configDir}' 0750 ${cfg.user} ${cfg.group} - -"
|
|
||||||
];
|
|
||||||
|
|
||||||
services.breeze.settings = mkMerge [
|
|
||||||
(mkIf (cfg.uploadKeyFile != null) {engine.upload_key = "@UPLOAD_KEY@";})
|
|
||||||
(mkIf (cfg.deletionSecretFile != null) {engine.deletion_secret = "@DELETION_SECRET@";})
|
|
||||||
];
|
|
||||||
|
|
||||||
systemd.services.breeze = let
|
|
||||||
cfgFile = "${cfg.configDir}/breeze.toml";
|
|
||||||
in {
|
|
||||||
description = "breeze file server";
|
|
||||||
after = ["local-fs.target" "network.target"];
|
|
||||||
wantedBy = ["multi-user.target"];
|
|
||||||
|
|
||||||
preStart =
|
|
||||||
''
|
|
||||||
install -m 660 ${settingsFormat.generate "breeze.toml" cfg.settings} ${cfgFile}
|
|
||||||
''
|
|
||||||
+ lib.optionalString (cfg.uploadKeyFile != null) ''
|
|
||||||
${pkgs.replace-secret}/bin/replace-secret '@UPLOAD_KEY@' "${cfg.uploadKeyFile}" ${cfgFile}
|
|
||||||
''
|
|
||||||
+ lib.optionalString (cfg.deletionSecretFile != null) ''
|
|
||||||
${pkgs.replace-secret}/bin/replace-secret '@DELETION_SECRET@' "${cfg.deletionSecretFile}" ${cfgFile}
|
|
||||||
'';
|
|
||||||
|
|
||||||
serviceConfig = rec {
|
|
||||||
User = cfg.user;
|
|
||||||
Group = cfg.group;
|
|
||||||
DynamicUser = false; # we write files, so don't do that
|
|
||||||
SupplementaryGroups = cfg.extraGroups;
|
|
||||||
StateDirectory = "breeze";
|
|
||||||
CacheDirectory = "breeze";
|
|
||||||
ExecStart = escapeShellArgs [
|
|
||||||
"${cfg.package}/bin/breeze"
|
|
||||||
"--config"
|
|
||||||
cfgFile
|
|
||||||
];
|
|
||||||
Restart = "on-failure";
|
|
||||||
|
|
||||||
# Security Options #
|
|
||||||
|
|
||||||
NoNewPrivileges = true; # implied by DynamicUser
|
|
||||||
RemoveIPC = true; # implied by DynamicUser
|
|
||||||
|
|
||||||
AmbientCapabilities = "";
|
|
||||||
CapabilityBoundingSet = "";
|
|
||||||
|
|
||||||
DeviceAllow = "";
|
|
||||||
|
|
||||||
LockPersonality = true;
|
|
||||||
|
|
||||||
PrivateTmp = true; # implied by DynamicUser
|
|
||||||
PrivateDevices = true;
|
|
||||||
PrivateUsers = true;
|
|
||||||
|
|
||||||
ProtectClock = true;
|
|
||||||
ProtectControlGroups = true;
|
|
||||||
ProtectHostname = true;
|
|
||||||
ProtectKernelLogs = true;
|
|
||||||
ProtectKernelModules = true;
|
|
||||||
ProtectKernelTunables = true;
|
|
||||||
|
|
||||||
RestrictNamespaces = true;
|
|
||||||
RestrictAddressFamilies = ["AF_INET" "AF_INET6" "AF_UNIX"];
|
|
||||||
RestrictRealtime = true;
|
|
||||||
RestrictSUIDSGID = true; # implied by DynamicUser
|
|
||||||
|
|
||||||
SystemCallArchitectures = "native";
|
|
||||||
SystemCallErrorNumber = "EPERM";
|
|
||||||
SystemCallFilter = [
|
|
||||||
"@system-service"
|
|
||||||
"~@keyring"
|
|
||||||
"~@memlock"
|
|
||||||
"~@privileged"
|
|
||||||
"~@setuid"
|
|
||||||
];
|
|
||||||
};
|
|
||||||
};
|
|
||||||
};
|
|
||||||
};
|
|
||||||
});
|
|
||||||
}
|
|
src/cache.rs (258 lines removed)

@@ -1,258 +0,0 @@
|
||||||
use std::{
|
|
||||||
sync::atomic::{AtomicUsize, Ordering},
|
|
||||||
time::{Duration, SystemTime},
|
|
||||||
};
|
|
||||||
|
|
||||||
use atomic_time::AtomicSystemTime;
|
|
||||||
use bytes::Bytes;
|
|
||||||
use dashmap::{mapref::one::Ref, DashMap};
|
|
||||||
use tokio::time;
|
|
||||||
|
|
||||||
use crate::config;
|
|
||||||
|
|
||||||
/// An entry stored in the cache.
|
|
||||||
///
|
|
||||||
/// It contains basic metadata and the actual value.
|
|
||||||
pub struct Entry {
|
|
||||||
/// The data held
|
|
||||||
value: Bytes,
|
|
||||||
|
|
||||||
/// The last time this entry was read/written
|
|
||||||
last_used: AtomicSystemTime,
|
|
||||||
|
|
||||||
/// Whether or not `last_used` should be updated
|
|
||||||
update_used: bool,
|
|
||||||
|
|
||||||
/// How long the entry should last
|
|
||||||
lifetime: Duration,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Entry {
|
|
||||||
fn new(value: Bytes, lifetime: Duration, update_used: bool) -> Self {
|
|
||||||
let now = AtomicSystemTime::now();
|
|
||||||
|
|
||||||
Self {
|
|
||||||
value,
|
|
||||||
last_used: now,
|
|
||||||
update_used,
|
|
||||||
lifetime,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn last_used(&self) -> SystemTime {
|
|
||||||
self.last_used.load(Ordering::Relaxed)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn is_expired(&self) -> bool {
|
|
||||||
match self.last_used().elapsed() {
|
|
||||||
Ok(d) => d >= self.lifetime,
|
|
||||||
Err(_) => false, // now > last_used
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A concurrent cache with a maximum memory size (w/ LRU) and expiration.
|
|
||||||
///
|
|
||||||
/// It is designed to keep memory usage low.
|
|
||||||
pub struct Cache {
|
|
||||||
/// Where elements are stored
|
|
||||||
map: DashMap<String, Entry>,
|
|
||||||
|
|
||||||
/// Total length of data stored in cache currently
|
|
||||||
length: AtomicUsize,
|
|
||||||
|
|
||||||
/// How should it behave
|
|
||||||
cfg: config::CacheConfig,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Cache {
|
|
||||||
pub fn with_config(cfg: config::CacheConfig) -> Self {
|
|
||||||
Self {
|
|
||||||
map: DashMap::with_capacity(64),
|
|
||||||
length: AtomicUsize::new(0),
|
|
||||||
|
|
||||||
cfg,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Figure out who should be bumped out of cache next
|
|
||||||
fn next_out(&self, length: usize) -> Vec<String> {
|
|
||||||
let mut sorted: Vec<_> = self.map.iter().collect();
|
|
||||||
|
|
||||||
// Sort by least recently used
|
|
||||||
sorted.sort_unstable_by_key(|e| e.last_used());
|
|
||||||
|
|
||||||
// Total bytes we would be removing
|
|
||||||
let mut total = 0;
|
|
||||||
|
|
||||||
// Pull entries until we have enough free space
|
|
||||||
sorted
|
|
||||||
.iter()
|
|
||||||
.take_while(|e| {
|
|
||||||
let need_more = total < length;
|
|
||||||
|
|
||||||
if need_more {
|
|
||||||
total += e.value.len();
|
|
||||||
}
|
|
||||||
|
|
||||||
need_more
|
|
||||||
})
|
|
||||||
.map(|e| e.key().clone())
|
|
||||||
.collect()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Remove an element from the cache
|
|
||||||
///
|
|
||||||
/// Returns: [`Some`] if successful, [`None`] if element not found
|
|
||||||
pub fn remove(&self, key: &str) -> Option<()> {
|
|
||||||
// Skip expiry checks, we are removing it anyways
|
|
||||||
// And also that could cause an infinite loop which would be pretty stupid.
|
|
||||||
let e = self.map.get(key)?;
|
|
||||||
|
|
||||||
// Atomically subtract from the total cache length
|
|
||||||
self.length.fetch_sub(e.value.len(), Ordering::Relaxed);
|
|
||||||
|
|
||||||
// Drop the entry lock so we can actually remove it
|
|
||||||
drop(e);
|
|
||||||
|
|
||||||
// Remove from map
|
|
||||||
self.map.remove(key);
|
|
||||||
|
|
||||||
Some(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Add a new element to the cache with a specified lifetime.
|
|
||||||
///
|
|
||||||
/// Returns: `true` if no value is replaced, `false` if a value was replaced
|
|
||||||
pub fn add_with_lifetime(
|
|
||||||
&self,
|
|
||||||
key: &str,
|
|
||||||
value: Bytes,
|
|
||||||
lifetime: Duration,
|
|
||||||
is_renewable: bool,
|
|
||||||
) -> bool {
|
|
||||||
let e = Entry::new(value, lifetime, is_renewable);
|
|
||||||
|
|
||||||
let len = e.value.len();
|
|
||||||
let cur_total = self.length.load(Ordering::Relaxed);
|
|
||||||
let new_total = cur_total + len;
|
|
||||||
|
|
||||||
if new_total > self.cfg.mem_capacity {
|
|
||||||
// How far we went above the limit
|
|
||||||
let needed = new_total - self.cfg.mem_capacity;
|
|
||||||
|
|
||||||
self.next_out(needed).iter().for_each(|k| {
|
|
||||||
// Remove the element, and ignore the result
|
|
||||||
// The only reason it should be failing is if it couldn't find it,
|
|
||||||
// in which case it was already removed
|
|
||||||
self.remove(k);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
// Atomically add to total cached data length
|
|
||||||
self.length.fetch_add(len, Ordering::Relaxed);
|
|
||||||
|
|
||||||
// Add to the map, return true if we didn't replace anything
|
|
||||||
self.map.insert(key.to_string(), e).is_none()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Add a new element to the cache with the default lifetime.
|
|
||||||
///
|
|
||||||
/// Returns: `true` if no value is replaced, `false` if a value was replaced
|
|
||||||
pub fn add(&self, key: &str, value: Bytes) -> bool {
|
|
||||||
self.add_with_lifetime(key, value, self.cfg.upload_lifetime, true)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Internal function for retrieving entries.
|
|
||||||
///
|
|
||||||
/// Returns: same as [`DashMap::get`], for our purposes
|
|
||||||
///
|
|
||||||
/// It exists so we can run the expiry check before
|
|
||||||
/// actually working with any entries, so no weird bugs happen
|
|
||||||
fn get_(&self, key: &str) -> Option<Ref<String, Entry>> {
|
|
||||||
let e = self.map.get(key)?;
|
|
||||||
|
|
||||||
// if the entry is expired get rid of it now
|
|
||||||
if e.is_expired() {
|
|
||||||
// drop the reference so we don't deadlock
|
|
||||||
drop(e);
|
|
||||||
|
|
||||||
// remove it
|
|
||||||
self.remove(key);
|
|
||||||
|
|
||||||
// and say we never had it
|
|
||||||
return None;
|
|
||||||
}
|
|
||||||
|
|
||||||
Some(e)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Get an item from the cache, if it exists.
|
|
||||||
pub fn get(&self, key: &str) -> Option<Bytes> {
|
|
||||||
let e = self.get_(key)?;
|
|
||||||
|
|
||||||
if e.update_used {
|
|
||||||
e.last_used.store(SystemTime::now(), Ordering::Relaxed);
|
|
||||||
}
|
|
||||||
|
|
||||||
Some(e.value.clone())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Check if we have an item in cache.
|
|
||||||
///
|
|
||||||
/// Returns: `true` if key exists, `false` if it doesn't
|
|
||||||
///
|
|
||||||
/// We don't use [`DashMap::contains_key`] here because it would just do
|
|
||||||
/// the exact same thing I do here, but without running the expiry check logic
|
|
||||||
pub fn has(&self, key: &str) -> bool {
|
|
||||||
self.get_(key).is_some()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Returns if an upload is able to be cached
|
|
||||||
/// with the current caching rules
|
|
||||||
#[inline(always)]
|
|
||||||
pub fn will_use(&self, length: u64) -> bool {
|
|
||||||
length <= self.cfg.max_length
|
|
||||||
}
|
|
||||||
|
|
||||||
/// The background job that scans through the cache and removes inactive elements.
|
|
||||||
///
|
|
||||||
/// TODO: see if this is actually less expensive than
|
|
||||||
/// letting each entry keep track of expiry with its own task
|
|
||||||
pub async fn scanner(&self) {
|
|
||||||
let mut interval = time::interval(self.cfg.scan_freq);
|
|
||||||
|
|
||||||
loop {
|
|
||||||
// We put this first so that it doesn't scan the instant the server starts
|
|
||||||
interval.tick().await;
|
|
||||||
|
|
||||||
// Save current timestamp so we aren't retrieving it constantly
|
|
||||||
// If we don't do this it'll be a LOT of system api calls
|
|
||||||
let now = SystemTime::now();
|
|
||||||
|
|
||||||
// Collect a list of all the expired keys
|
|
||||||
// If we fail to compare the times, it gets added to the list anyways
|
|
||||||
let expired: Vec<_> = self
|
|
||||||
.map
|
|
||||||
.iter()
|
|
||||||
.filter_map(|e| {
|
|
||||||
let elapsed = now.duration_since(e.last_used()).unwrap_or(Duration::MAX);
|
|
||||||
let is_expired = elapsed >= e.lifetime;
|
|
||||||
|
|
||||||
if is_expired {
|
|
||||||
Some(e.key().clone())
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
// If we have any, lock the map and drop all of them
|
|
||||||
if !expired.is_empty() {
|
|
||||||
// Use a retain call, should be less locks that way
|
|
||||||
// (instead of many remove calls)
|
|
||||||
self.map.retain(|k, _| !expired.contains(k));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@@ -15,7 +15,6 @@ fn default_motd() -> String {
     "breeze file server (v%version%) - currently hosting %uplcount% files".to_string()
 }
 
-#[serde_as]
 #[derive(Deserialize)]
 pub struct EngineConfig {
     /// The url that the instance of breeze is meant to be accessed from.

@@ -23,54 +22,30 @@ pub struct EngineConfig {
     /// ex: https://picture.wtf would generate links like https://picture.wtf/p/abcdef.png
     pub base_url: String,
 
+    /// Location on disk the uploads are to be saved to
+    pub save_path: PathBuf,
+
     /// Authentication key for new uploads, will be required if this is specified. (optional)
     #[serde(default)]
     pub upload_key: String,
 
-    /// Secret key to use when generating or verifying deletion tokens.
-    /// Leave blank to disable.
-    ///
-    /// If this secret is leaked, anyone can delete any file. Be careful!!!
-    pub deletion_secret: Option<String>,
-
-    /// Configuration for disk system
-    pub disk: DiskConfig,
-
     /// Configuration for cache system
     pub cache: CacheConfig,
 
-    /// Maximum size of an upload that will be accepted.
-    /// Files above this size can not be uploaded.
-    pub max_upload_len: Option<u64>,
-
-    /// Maximum lifetime of a temporary upload
-    #[serde_as(as = "DurationSeconds")]
-    pub max_temp_lifetime: Duration,
-
-    /// Maximum length (in bytes) a file can be before the server will
-    /// decide not to remove its EXIF data.
-    pub max_strip_len: u64,
-
     /// Motd displayed when the server's index page is visited.
     ///
     /// This isn't explicitly engine-related but the engine is what gets passed to routes,
     /// so it is here for now.
     #[serde(default = "default_motd")]
     pub motd: String,
 }
 
-#[derive(Deserialize, Clone)]
-pub struct DiskConfig {
-    /// Location on disk the uploads are to be saved to
-    pub save_path: PathBuf,
-}
-
 #[serde_as]
-#[derive(Deserialize, Clone)]
+#[derive(Deserialize)]
 pub struct CacheConfig {
     /// The maximum length in bytes that a file can be
     /// before it skips cache (in seconds)
-    pub max_length: u64,
+    pub max_length: usize,
 
     /// The amount of time a file can last inside the cache (in seconds)
     #[serde_as(as = "DurationSeconds")]

@@ -87,7 +62,6 @@ pub struct CacheConfig {
 
 #[derive(Deserialize)]
 pub struct HttpConfig {
-    /// The IP address the HTTP server should listen on
     pub listen_on: String,
 }
 

@@ -101,7 +75,7 @@ pub struct LoggerConfig {
     /// Minimum level a log must be for it to be shown.
     /// This defaults to "warn" if not specified.
     #[serde_as(as = "DisplayFromStr")]
-    // yes... kind of a hack but serde doesn't have anything better
     #[serde(default = "default_level_filter")]
+    // yes... kind of a hack but serde doesn't have anything better
     pub level: LevelFilter,
 }
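The `#[serde_as(as = "DurationSeconds")]` attributes above are what let the TOML config express lifetimes as plain integer seconds. Below is a stand-alone sketch of that mapping; the `CacheSection` struct and its values are illustrative only, not the crate's actual type, and it assumes the `serde`, `serde_with`, and `toml` dependencies already listed in Cargo.toml.

```rust
use std::time::Duration;

use serde::Deserialize;
use serde_with::{serde_as, DurationSeconds};

// Illustrative stand-in for the `[engine.cache]` table: integer seconds in
// the TOML file deserialize into `Duration` fields.
#[serde_as]
#[derive(Deserialize)]
struct CacheSection {
    max_length: u64,
    #[serde_as(as = "DurationSeconds")]
    upload_lifetime: Duration,
    #[serde_as(as = "DurationSeconds")]
    scan_freq: Duration,
    mem_capacity: u64,
}

fn main() {
    let raw = r#"
        max_length = 134_217_728
        upload_lifetime = 1800
        scan_freq = 60
        mem_capacity = 4_294_967_295
    "#;

    let cfg: CacheSection = toml::from_str(raw).unwrap();
    assert_eq!(cfg.upload_lifetime, Duration::from_secs(1800));
    assert_eq!(cfg.scan_freq, Duration::from_secs(60));
}
```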
@@ -1,89 +0,0 @@
|
||||||
use std::sync::{Arc, atomic::Ordering};
|
|
||||||
|
|
||||||
use axum::extract::{Query, State};
|
|
||||||
use base64::{Engine, prelude::BASE64_URL_SAFE_NO_PAD};
|
|
||||||
use bytes::{Buf, BytesMut};
|
|
||||||
use hmac::Mac;
|
|
||||||
use http::StatusCode;
|
|
||||||
use serde::Deserialize;
|
|
||||||
|
|
||||||
use crate::engine::update_hmac;
|
|
||||||
|
|
||||||
#[derive(Deserialize)]
|
|
||||||
pub struct DeleteRequest {
|
|
||||||
name: String,
|
|
||||||
hash: String,
|
|
||||||
hmac: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn delete(
|
|
||||||
State(engine): State<Arc<crate::engine::Engine>>,
|
|
||||||
Query(req): Query<DeleteRequest>,
|
|
||||||
) -> (StatusCode, &'static str) {
|
|
||||||
let Some(mut hmac) = engine.deletion_hmac.clone() else {
|
|
||||||
return (StatusCode::CONFLICT, "Deletion is not enabled");
|
|
||||||
};
|
|
||||||
|
|
||||||
// -- decode provided data
|
|
||||||
|
|
||||||
// decode user-given hmac
|
|
||||||
let Ok(provided_hmac) = BASE64_URL_SAFE_NO_PAD.decode(req.hmac) else {
|
|
||||||
return (StatusCode::BAD_REQUEST, "Could not decode hmac");
|
|
||||||
};
|
|
||||||
|
|
||||||
// decode hash from base64
|
|
||||||
let Ok(mut provided_hash_data) = BASE64_URL_SAFE_NO_PAD
|
|
||||||
.decode(req.hash)
|
|
||||||
.map(|v| BytesMut::from(&v[..]))
|
|
||||||
else {
|
|
||||||
return (StatusCode::BAD_REQUEST, "Could not decode partial hash");
|
|
||||||
};
|
|
||||||
// read hash
|
|
||||||
if provided_hash_data.len() != 16 {
|
|
||||||
return (StatusCode::BAD_REQUEST, "Partial hash length is invalid");
|
|
||||||
}
|
|
||||||
let provided_hash = provided_hash_data.get_u128();
|
|
||||||
|
|
||||||
// -- verify it
|
|
||||||
|
|
||||||
// check if info is valid
|
|
||||||
let is_hmac_valid = {
|
|
||||||
// update hmad
|
|
||||||
update_hmac(&mut hmac, &req.name, provided_hash);
|
|
||||||
// verify..
|
|
||||||
hmac.verify_slice(&provided_hmac).is_ok()
|
|
||||||
};
|
|
||||||
if !is_hmac_valid {
|
|
||||||
return (StatusCode::BAD_REQUEST, "Hmac is invalid");
|
|
||||||
}
|
|
||||||
|
|
||||||
// -- ensure hash matches
|
|
||||||
|
|
||||||
// okay, now check if we compute the same hash as the req
|
|
||||||
// this makes sure it's (probably) the same file
|
|
||||||
let actual_hash = match engine.get_hash(&req.name).await {
|
|
||||||
Ok(Some(h)) => h,
|
|
||||||
Ok(None) => return (StatusCode::NOT_FOUND, "File not found"),
|
|
||||||
Err(err) => {
|
|
||||||
tracing::error!(%err, "failed to get hash");
|
|
||||||
return (StatusCode::INTERNAL_SERVER_ERROR, "Internal server error!!");
|
|
||||||
}
|
|
||||||
};
|
|
||||||
// compare
|
|
||||||
if provided_hash != actual_hash {
|
|
||||||
return (StatusCode::BAD_REQUEST, "Partial hash did not match");
|
|
||||||
}
|
|
||||||
|
|
||||||
// -- delete file
|
|
||||||
|
|
||||||
// everything seems okay so try to delete
|
|
||||||
if let Err(err) = engine.remove(&req.name).await {
|
|
||||||
tracing::error!(%err, "failed to delete upload");
|
|
||||||
return (StatusCode::INTERNAL_SERVER_ERROR, "Delete failed");
|
|
||||||
}
|
|
||||||
|
|
||||||
// decrement upload count
|
|
||||||
engine.upl_count.fetch_sub(1, Ordering::Relaxed);
|
|
||||||
|
|
||||||
(StatusCode::OK, "Deleted successfully!")
|
|
||||||
}
|
|
src/disk.rs (99 lines removed)

@@ -1,99 +0,0 @@
|
||||||
use std::path::{Path, PathBuf};
|
|
||||||
|
|
||||||
use bytes::Bytes;
|
|
||||||
use tokio::{
|
|
||||||
fs::File,
|
|
||||||
io::{self, AsyncWriteExt},
|
|
||||||
sync::mpsc,
|
|
||||||
};
|
|
||||||
use tracing::debug;
|
|
||||||
use walkdir::WalkDir;
|
|
||||||
|
|
||||||
use crate::config;
|
|
||||||
|
|
||||||
/// Provides an API to access the disk file store
|
|
||||||
/// like we access the cache.
|
|
||||||
pub struct Disk {
|
|
||||||
cfg: config::DiskConfig,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Disk {
|
|
||||||
pub fn with_config(cfg: config::DiskConfig) -> Self {
|
|
||||||
Self { cfg }
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Counts the number of files saved to disk we have
|
|
||||||
pub fn count(&self) -> usize {
|
|
||||||
WalkDir::new(&self.cfg.save_path)
|
|
||||||
.min_depth(1)
|
|
||||||
.into_iter()
|
|
||||||
.count()
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Formats the path on disk for a `saved_name`.
|
|
||||||
fn path_for(&self, saved_name: &str) -> PathBuf {
|
|
||||||
// try to prevent path traversal by ignoring everything except the file name
|
|
||||||
let name = Path::new(saved_name).file_name().unwrap_or_default();
|
|
||||||
|
|
||||||
let mut p: PathBuf = self.cfg.save_path.clone();
|
|
||||||
p.push(name);
|
|
||||||
|
|
||||||
p
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Try to open a file on disk, and if we didn't find it,
|
|
||||||
/// then return [`None`].
|
|
||||||
pub async fn open(&self, saved_name: &str) -> io::Result<Option<File>> {
|
|
||||||
let p = self.path_for(saved_name);
|
|
||||||
|
|
||||||
match File::open(p).await {
|
|
||||||
Ok(f) => Ok(Some(f)),
|
|
||||||
Err(e) => match e.kind() {
|
|
||||||
io::ErrorKind::NotFound => Ok(None),
|
|
||||||
_ => Err(e)?, // some other error, send it back
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Get the size of an upload's file
|
|
||||||
pub async fn len(&self, f: &File) -> io::Result<u64> {
|
|
||||||
Ok(f.metadata().await?.len())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Remove an upload from disk.
|
|
||||||
pub async fn remove(&self, saved_name: &str) -> io::Result<()> {
|
|
||||||
let p = self.path_for(saved_name);
|
|
||||||
|
|
||||||
tokio::fs::remove_file(p).await
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Create a background I/O task
|
|
||||||
pub fn start_save(&self, saved_name: &str) -> mpsc::UnboundedSender<Bytes> {
|
|
||||||
// start a task that handles saving files to disk (we can save to cache/disk in parallel that way)
|
|
||||||
let (tx, mut rx): (mpsc::UnboundedSender<Bytes>, mpsc::UnboundedReceiver<Bytes>) =
|
|
||||||
mpsc::unbounded_channel();
|
|
||||||
|
|
||||||
let p = self.path_for(saved_name);
|
|
||||||
|
|
||||||
tokio::spawn(async move {
|
|
||||||
// create file to save upload to
|
|
||||||
let file = File::create(p).await;
|
|
||||||
|
|
||||||
if let Err(err) = file {
|
|
||||||
tracing::error!(%err, "could not open file! make sure your upload path is valid");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
let mut file = file.unwrap();
|
|
||||||
|
|
||||||
// receive chunks and save them to file
|
|
||||||
while let Some(chunk) = rx.recv().await {
|
|
||||||
debug!("writing chunk to disk (length: {})", chunk.len());
|
|
||||||
if let Err(err) = file.write_all(&chunk).await {
|
|
||||||
tracing::error!(%err, "error while writing file to disk");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
tx
|
|
||||||
}
|
|
||||||
}
|
|
src/engine.rs (708 changed lines)

@@ -1,439 +1,176 @@
|
||||||
use std::{
|
use std::{
|
||||||
io::SeekFrom,
|
ffi::OsStr,
|
||||||
ops::Bound,
|
path::{Path, PathBuf},
|
||||||
sync::{
|
sync::atomic::{AtomicUsize, Ordering},
|
||||||
Arc,
|
|
||||||
atomic::{AtomicUsize, Ordering},
|
|
||||||
},
|
|
||||||
time::Duration,
|
|
||||||
};
|
};
|
||||||
|
|
||||||
use axum::body::BodyDataStream;
|
use archived::Archive;
|
||||||
use base64::{Engine as _, prelude::BASE64_URL_SAFE_NO_PAD};
|
use axum::extract::BodyStream;
|
||||||
use bytes::{BufMut, Bytes, BytesMut};
|
use bytes::{BufMut, Bytes, BytesMut};
|
||||||
use color_eyre::eyre::{self, WrapErr};
|
use rand::Rng;
|
||||||
use hmac::Mac;
|
|
||||||
use img_parts::{DynImage, ImageEXIF};
|
|
||||||
use rand::distr::{Alphanumeric, SampleString};
|
|
||||||
use tokio::{
|
use tokio::{
|
||||||
fs::File,
|
fs::File,
|
||||||
io::{AsyncReadExt, AsyncSeekExt},
|
io::{AsyncReadExt, AsyncWriteExt},
|
||||||
|
sync::{
|
||||||
|
mpsc::{self, Receiver, Sender},
|
||||||
|
RwLock,
|
||||||
|
},
|
||||||
};
|
};
|
||||||
use tokio_stream::StreamExt;
|
use tokio_stream::StreamExt;
|
||||||
use tracing::{debug, error, info};
|
use tracing::{debug, error, info};
|
||||||
use twox_hash::XxHash3_128;
|
use walkdir::WalkDir;
|
||||||
|
|
||||||
use crate::{cache, config, disk};
|
use crate::{
|
||||||
|
config,
|
||||||
|
view::{ViewError, ViewSuccess},
|
||||||
|
};
|
||||||
|
|
||||||
/// Various forms of upload data that can be sent to the client
|
/// breeze engine! this is the core of everything
|
||||||
pub enum UploadData {
|
|
||||||
/// Send back the data from memory
|
|
||||||
Cache(Bytes),
|
|
||||||
/// Stream the file from disk to the client
|
|
||||||
Disk(tokio::io::Take<File>),
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Upload data and metadata needed to build a view response
|
|
||||||
pub struct UploadResponse {
|
|
||||||
pub full_len: u64,
|
|
||||||
pub range: (u64, u64),
|
|
||||||
pub data: UploadData,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Non-error outcomes of an [`Engine::process`] call.
|
|
||||||
/// Some are rejections.
|
|
||||||
pub enum ProcessOutcome {
|
|
||||||
/// The upload was successful.
|
|
||||||
/// We give the user their file's URL (and deletion URL if one was created)
|
|
||||||
Success {
|
|
||||||
url: String,
|
|
||||||
deletion_url: Option<String>,
|
|
||||||
},
|
|
||||||
|
|
||||||
/// Occurs when an upload exceeds the chosen maximum file size.
|
|
||||||
UploadTooLarge,
|
|
||||||
|
|
||||||
/// Occurs when a temporary upload is too big to fit in the cache.
|
|
||||||
TemporaryUploadTooLarge,
|
|
||||||
|
|
||||||
/// Occurs when the user-given lifetime is longer than we will allow
|
|
||||||
TemporaryUploadLifetimeTooLong,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Non-error outcomes of an [`Engine::get`] call.
|
|
||||||
pub enum GetOutcome {
|
|
||||||
/// Successfully read upload.
|
|
||||||
Success(UploadResponse),
|
|
||||||
|
|
||||||
/// The upload was not found anywhere
|
|
||||||
NotFound,
|
|
||||||
|
|
||||||
/// A range was requested that exceeds an upload's bounds
|
|
||||||
RangeNotSatisfiable,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Type alias to make using HMAC SHA256 easier
|
|
||||||
type HmacSha256 = hmac::Hmac<sha2::Sha256>;
|
|
||||||
|
|
||||||
/// breeze engine
|
|
||||||
pub struct Engine {
|
pub struct Engine {
|
||||||
/// Cached count of uploaded files
|
/// The in-memory cache that cached uploads are stored in.
|
||||||
|
cache: RwLock<Archive>,
|
||||||
|
|
||||||
|
/// Cached count of uploaded files.
|
||||||
pub upl_count: AtomicUsize,
|
pub upl_count: AtomicUsize,
|
||||||
|
|
||||||
/// Engine configuration
|
/// Engine configuration
|
||||||
pub cfg: config::EngineConfig,
|
pub cfg: config::EngineConfig,
|
||||||
|
|
||||||
/// HMAC state initialised with the deletion secret (if present)
|
|
||||||
pub deletion_hmac: Option<HmacSha256>,
|
|
||||||
|
|
||||||
/// The in-memory cache that cached uploads are stored in
|
|
||||||
cache: Arc<cache::Cache>,
|
|
||||||
|
|
||||||
/// An interface to the on-disk upload store
|
|
||||||
disk: disk::Disk,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Try to parse a `Range` header into an easier format to work with
|
|
||||||
fn resolve_range(range: Option<headers::Range>, full_len: u64) -> Option<(u64, u64)> {
|
|
||||||
let last_byte = full_len - 1;
|
|
||||||
|
|
||||||
let (start, end) =
|
|
||||||
if let Some((start, end)) = range.and_then(|r| r.satisfiable_ranges(full_len).next()) {
|
|
||||||
// satisfiable_ranges will never return Excluded so this is ok
|
|
||||||
let start = if let Bound::Included(start_incl) = start {
|
|
||||||
start_incl
|
|
||||||
} else {
|
|
||||||
0
|
|
||||||
};
|
|
||||||
let end = if let Bound::Included(end_incl) = end {
|
|
||||||
end_incl
|
|
||||||
} else {
|
|
||||||
last_byte
|
|
||||||
};
|
|
||||||
|
|
||||||
(start, end)
|
|
||||||
} else {
|
|
||||||
(0, last_byte)
|
|
||||||
};
|
|
||||||
|
|
||||||
// catch ranges we can't satisfy
|
|
||||||
if end > last_byte || start > end {
|
|
||||||
return None;
|
|
||||||
}
|
|
||||||
|
|
||||||
Some((start, end))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Calculate HMAC of field values.
|
|
||||||
pub fn update_hmac(hmac: &mut HmacSha256, saved_name: &str, hash: u128) {
|
|
||||||
// mix deletion req fields into one buf
|
|
||||||
let mut field_bytes = BytesMut::new();
|
|
||||||
field_bytes.put(saved_name.as_bytes());
|
|
||||||
field_bytes.put_u128(hash);
|
|
||||||
|
|
||||||
// take the hmac
|
|
||||||
hmac.update(&field_bytes);
|
|
||||||
}
|
|
||||||
|
|
||||||
/// How many bytes of a file should be used for hash calculation.
|
|
||||||
const SAMPLE_WANTED_BYTES: usize = 32768;
|
|
||||||
|
|
||||||
/// Format some info about an upload and hash it
|
|
||||||
///
|
|
||||||
/// This should not change between versions!!
|
|
||||||
/// That would break deletion urls
|
|
||||||
fn calculate_hash(len: u64, data_sample: Bytes) -> u128 {
|
|
||||||
let mut buf = BytesMut::new();
|
|
||||||
buf.put_u64(len);
|
|
||||||
buf.put(data_sample);
|
|
||||||
|
|
||||||
XxHash3_128::oneshot(&buf)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Engine {
|
impl Engine {
|
||||||
/// Creates a new instance of the engine
|
/// Creates a new instance of the breeze engine.
|
||||||
pub fn with_config(cfg: config::EngineConfig) -> Self {
|
pub fn new(cfg: config::EngineConfig) -> Self {
|
||||||
let deletion_hmac = cfg
|
|
||||||
.deletion_secret
|
|
||||||
.as_ref()
|
|
||||||
.map(|s| HmacSha256::new_from_slice(s.as_bytes()).unwrap());
|
|
||||||
|
|
||||||
let cache = cache::Cache::with_config(cfg.cache.clone());
|
|
||||||
let disk = disk::Disk::with_config(cfg.disk.clone());
|
|
||||||
|
|
||||||
let cache = Arc::new(cache);
|
|
||||||
|
|
||||||
let cache_scanner = cache.clone();
|
|
||||||
tokio::spawn(async move { cache_scanner.scanner().await });
|
|
||||||
|
|
||||||
Self {
|
Self {
|
||||||
// initialise our cached upload count. this doesn't include temp uploads!
|
cache: RwLock::new(Archive::with_full_scan(
|
||||||
upl_count: AtomicUsize::new(disk.count()),
|
cfg.cache.scan_freq,
|
||||||
deletion_hmac,
|
cfg.cache.upload_lifetime,
|
||||||
|
cfg.cache.mem_capacity,
|
||||||
|
)),
|
||||||
|
upl_count: AtomicUsize::new(
|
||||||
|
WalkDir::new(&cfg.save_path)
|
||||||
|
.min_depth(1)
|
||||||
|
.into_iter()
|
||||||
|
.count(),
|
||||||
|
), // count the amount of files in the save path and initialise our cached count with it
|
||||||
|
|
||||||
cfg,
|
cfg,
|
||||||
|
|
||||||
cache,
|
|
||||||
disk,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Fetch an upload.
|
/// Returns if an upload would be able to be cached
|
||||||
///
|
#[inline(always)]
|
||||||
/// This will first try to read from cache, and then disk after.
|
fn will_use_cache(&self, length: usize) -> bool {
|
||||||
/// If an upload is eligible to be cached, it will be cached and
|
length <= self.cfg.cache.max_length
|
||||||
/// sent back as a cache response instead of a disk response.
|
|
||||||
///
|
|
||||||
/// If there is a range, it is applied at the very end.
|
|
||||||
pub async fn get(
|
|
||||||
&self,
|
|
||||||
saved_name: &str,
|
|
||||||
range: Option<headers::Range>,
|
|
||||||
) -> eyre::Result<GetOutcome> {
|
|
||||||
let data = if let Some(u) = self.cache.get(saved_name) {
|
|
||||||
u
|
|
||||||
} else {
|
|
||||||
// now, check if we have it on disk
|
|
||||||
let Some(mut f) = self.disk.open(saved_name).await? else {
|
|
||||||
// file didn't exist
|
|
||||||
return Ok(GetOutcome::NotFound);
|
|
||||||
};
|
|
||||||
|
|
||||||
let full_len = self.disk.len(&f).await?;
|
|
||||||
|
|
||||||
// if possible, recache and send a cache response
|
|
||||||
// else, send a disk response
|
|
||||||
if self.cache.will_use(full_len) {
|
|
||||||
// read file from disk
|
|
||||||
let mut data = BytesMut::with_capacity(full_len.try_into()?);
|
|
||||||
|
|
||||||
// read file from disk and if it fails at any point, return 500
|
|
||||||
loop {
|
|
||||||
match f.read_buf(&mut data).await {
|
|
||||||
Ok(n) => {
|
|
||||||
if n == 0 {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(e) => Err(e)?,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let data = data.freeze();
|
|
||||||
|
|
||||||
// re-insert it into cache
|
|
||||||
self.cache.add(saved_name, data.clone());
|
|
||||||
|
|
||||||
data
|
|
||||||
} else {
|
|
||||||
let Some((start, end)) = resolve_range(range, full_len) else {
|
|
||||||
return Ok(GetOutcome::RangeNotSatisfiable);
|
|
||||||
};
|
|
||||||
|
|
||||||
let range_len = (end - start) + 1;
|
|
||||||
|
|
||||||
f.seek(SeekFrom::Start(start)).await?;
|
|
||||||
let f = f.take(range_len);
|
|
||||||
|
|
||||||
let res = UploadResponse {
|
|
||||||
full_len,
|
|
||||||
range: (start, end),
|
|
||||||
data: UploadData::Disk(f),
|
|
||||||
};
|
|
||||||
return Ok(GetOutcome::Success(res));
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let full_len = data.len() as u64;
|
|
||||||
let Some((start, end)) = resolve_range(range, full_len) else {
|
|
||||||
return Ok(GetOutcome::RangeNotSatisfiable);
|
|
||||||
};
|
|
||||||
|
|
||||||
// cut down to range
|
|
||||||
let data = data.slice((start as usize)..=(end as usize));
|
|
||||||
|
|
||||||
// build response
|
|
||||||
let res = UploadResponse {
|
|
||||||
full_len,
|
|
||||||
range: (start, end),
|
|
||||||
data: UploadData::Cache(data),
|
|
||||||
};
|
|
||||||
Ok(GetOutcome::Success(res))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Check if we have an upload stored anywhere.
|
/// Check if an upload exists in cache or on disk
|
||||||
///
|
pub async fn upload_exists(&self, path: &Path) -> bool {
|
||||||
/// This is only used to prevent `saved_name` collisions!!
|
let cache = self.cache.read().await;
|
||||||
/// It is not used to deliver "not found" errors.
|
|
||||||
pub async fn has(&self, saved_name: &str) -> bool {
|
// extract file name, since that's what cache uses
|
||||||
if self.cache.has(saved_name) {
|
let name = path
|
||||||
|
.file_name()
|
||||||
|
.and_then(OsStr::to_str)
|
||||||
|
.unwrap_or_default()
|
||||||
|
.to_string();
|
||||||
|
|
||||||
|
// check in cache
|
||||||
|
if cache.contains_key(&name) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
// sidestep handling the error properly
|
// check on disk
|
||||||
// that way we can call this in gen_saved_name easier
|
if path.exists() {
|
||||||
if self.disk.open(saved_name).await.is_ok_and(|f| f.is_some()) {
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
false
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Try to read a file and calculate a hash for it.
|
/// Generate a new save path for an upload.
|
||||||
pub async fn get_hash(&self, saved_name: &str) -> eyre::Result<Option<u128>> {
|
|
||||||
// readout sample data and full len
|
|
||||||
let (data_sample, len) = if let Some(full_data) = self.cache.get(saved_name) {
|
|
||||||
// we found it in cache! take as many bytes as we can
|
|
||||||
let taking = full_data.len().min(SAMPLE_WANTED_BYTES);
|
|
||||||
let data = full_data.slice(0..taking);
|
|
||||||
|
|
||||||
let len = full_data.len() as u64;
|
|
||||||
|
|
||||||
tracing::info!("data len is {}", data.len());
|
|
||||||
|
|
||||||
(data, len)
|
|
||||||
} else {
|
|
||||||
// not in cache, so try disk
|
|
||||||
let Some(mut f) = self.disk.open(saved_name).await? else {
|
|
||||||
// not found there either so we just dont have it
|
|
||||||
return Ok(None);
|
|
||||||
};
|
|
||||||
|
|
||||||
// find len..
|
|
||||||
let len = f.seek(SeekFrom::End(0)).await?;
|
|
||||||
f.rewind().await?;
|
|
||||||
|
|
||||||
// only take wanted # of bytes for read
|
|
||||||
let mut f = f.take(SAMPLE_WANTED_BYTES as u64);
|
|
||||||
|
|
||||||
// try to read
|
|
||||||
let mut data = Vec::with_capacity(SAMPLE_WANTED_BYTES);
|
|
||||||
f.read_to_end(&mut data).await?;
|
|
||||||
let data = Bytes::from(data);
|
|
||||||
|
|
||||||
(data, len)
|
|
||||||
};
|
|
||||||
|
|
||||||
// calculate hash
|
|
||||||
Ok(Some(calculate_hash(len, data_sample)))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Generate a new saved name for an upload.
|
|
||||||
///
|
///
|
||||||
/// If it picks a name that already exists, it will try again.
|
/// This will call itself recursively if it picks
|
||||||
pub async fn gen_saved_name(&self, ext: Option<String>) -> String {
|
/// a name that's already used. (it is rare)
|
||||||
loop {
|
#[async_recursion::async_recursion]
|
||||||
// generate a 6-character alphanumeric string
|
pub async fn gen_path(&self, original_path: &PathBuf) -> PathBuf {
|
||||||
let mut saved_name: String = Alphanumeric.sample_string(&mut rand::rng(), 6);
|
// generate a 6-character alphanumeric string
|
||||||
|
let id: String = rand::thread_rng()
|
||||||
|
.sample_iter(&rand::distributions::Alphanumeric)
|
||||||
|
.take(6)
|
||||||
|
.map(char::from)
|
||||||
|
.collect();
|
||||||
|
|
||||||
// if we have an extension, add it now
|
// extract the extension from the original path
|
||||||
if let Some(ref ext) = ext {
|
let original_extension = original_path
|
||||||
saved_name.push('.');
|
.extension()
|
||||||
saved_name.push_str(ext);
|
.and_then(OsStr::to_str)
|
||||||
}
|
.unwrap_or_default()
|
||||||
|
.to_string();
|
||||||
|
|
||||||
if !self.has(&saved_name).await {
|
// path on disk
|
||||||
break saved_name;
|
let mut path = self.cfg.save_path.clone();
|
||||||
} else {
|
path.push(&id);
|
||||||
// there was a name collision. loop and try again
|
path.set_extension(original_extension);
|
||||||
info!("name collision! saved_name= {}", saved_name);
|
|
||||||
}
|
if !self.upload_exists(&path).await {
|
||||||
|
path
|
||||||
|
} else {
|
||||||
|
// we had a name collision! try again..
|
||||||
|
self.gen_path(original_path).await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Wipe out an upload from all storage.
|
/// Process an upload.
|
||||||
///
|
/// This is called by the /new route.
|
||||||
/// (Intended for deletion URLs and failed uploads)
|
pub async fn process_upload(
|
||||||
pub async fn remove(&self, saved_name: &str) -> eyre::Result<()> {
|
|
||||||
info!(saved_name, "!! removing upload");
|
|
||||||
|
|
||||||
self.cache.remove(saved_name);
|
|
||||||
self.disk
|
|
||||||
.remove(saved_name)
|
|
||||||
.await
|
|
||||||
.wrap_err("failed to remove file from disk")?;
|
|
||||||
|
|
||||||
info!("!! successfully removed upload");
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Save a file to disk, and optionally cache.
|
|
||||||
///
|
|
||||||
/// This also handles custom file lifetimes and EXIF data removal.
|
|
||||||
pub async fn save(
|
|
||||||
&self,
|
&self,
|
||||||
saved_name: &str,
|
path: PathBuf,
|
||||||
provided_len: u64,
|
name: String, // we already extract it in the route handler, and it'd be wasteful to do it in gen_path
|
||||||
mut use_cache: bool,
|
content_length: usize,
|
||||||
mut stream: BodyDataStream,
|
mut stream: BodyStream,
|
||||||
lifetime: Option<Duration>,
|
) {
|
||||||
keep_exif: bool,
|
// if the upload size is smaller than the specified maximum, we use the cache!
|
||||||
) -> eyre::Result<(Bytes, u64)> {
|
let mut use_cache = self.will_use_cache(content_length);
|
||||||
|
|
||||||
// if we're using cache, make some space to store the upload in
|
// if we're using cache, make some space to store the upload in
|
||||||
let mut data = if use_cache {
|
let mut data = if use_cache {
|
||||||
BytesMut::with_capacity(provided_len.try_into()?)
|
BytesMut::with_capacity(content_length)
|
||||||
} else {
|
} else {
|
||||||
BytesMut::new()
|
BytesMut::new()
|
||||||
};
|
};
|
||||||
|
|
||||||
// don't begin a disk save if we're using temporary lifetimes
|
// start a task that handles saving files to disk (we can save to cache/disk in parallel that way)
|
||||||
let tx = if lifetime.is_none() {
|
let (tx, mut rx): (Sender<Bytes>, Receiver<Bytes>) = mpsc::channel(1);
|
||||||
Some(self.disk.start_save(saved_name))
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
};
|
|
||||||
|
|
||||||
// whether or not we are going to coalesce the data
|
tokio::spawn(async move {
|
||||||
// in order to strip the exif data at the end,
|
// create file to save upload to
|
||||||
// instead of just sending it off to the i/o task
|
let mut file = File::create(path)
|
||||||
let coalesce_and_strip = use_cache
|
.await
|
||||||
&& matches!(
|
.expect("could not open file! make sure your upload path is valid");
|
||||||
std::path::Path::new(saved_name)
|
|
||||||
.extension()
|
|
||||||
.map(|s| s.to_str()),
|
|
||||||
Some(Some("png" | "jpg" | "jpeg" | "webp" | "tiff"))
|
|
||||||
)
|
|
||||||
&& !keep_exif
|
|
||||||
&& provided_len <= self.cfg.max_strip_len;
|
|
||||||
|
|
||||||
// buffer of sampled data for the deletion hash
|
// receive chunks and save them to file
|
||||||
let mut hash_sample = BytesMut::with_capacity(SAMPLE_WANTED_BYTES);
|
while let Some(chunk) = rx.recv().await {
|
||||||
// actual number of bytes processed
|
debug!("writing chunk to disk (length: {})", chunk.len());
|
||||||
let mut observed_len = 0;
|
file.write_all(&chunk)
|
||||||
|
.await
|
||||||
|
.expect("error while writing file to disk");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
// read and save upload
|
// read and save upload
|
||||||
while let Some(chunk) = stream.next().await {
|
while let Some(chunk) = stream.next().await {
|
||||||
// if we error on a chunk, fail out
|
let chunk = chunk.unwrap();
|
||||||
let chunk = chunk?;
|
|
||||||
|
|
||||||
// if we have an i/o task, send it off
|
// send chunk to io task
|
||||||
// also cloning this is okay because it's a Bytes
|
debug!("sending data to io task");
|
||||||
if !coalesce_and_strip {
|
tx.send(chunk.clone())
|
||||||
if let Some(ref tx) = tx {
|
.await
|
||||||
debug!("sending chunk to i/o task");
|
.expect("failed to send data to io task");
|
||||||
tx.send(chunk.clone())
|
|
||||||
.wrap_err("failed to send chunk to i/o task!")?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// add to sample if we need to
|
|
||||||
let wanted = SAMPLE_WANTED_BYTES - hash_sample.len();
|
|
||||||
if wanted != 0 {
|
|
||||||
// take as many bytes as we can ...
|
|
||||||
let taking = chunk.len().min(wanted);
|
|
||||||
hash_sample.extend_from_slice(&chunk[0..taking]);
|
|
||||||
}
|
|
||||||
// record new len
|
|
||||||
observed_len += chunk.len() as u64;
|
|
||||||
|
|
||||||
if use_cache {
|
if use_cache {
|
||||||
debug!("receiving data into buffer");
|
debug!("receiving data into buffer");
|
||||||
|
|
||||||
if data.len() + chunk.len() > data.capacity() {
|
if data.len() + chunk.len() > data.capacity() {
|
||||||
info!(
|
error!("the amount of data sent exceeds the content-length provided by the client! caching will be cancelled for this upload.");
|
||||||
"the amount of data sent exceeds the content-length provided by the client! caching will be cancelled for this upload."
|
|
||||||
);
|
|
||||||
|
|
||||||
// if we receive too much data, drop the buffer and stop using cache (it is still okay to use disk, probably)
|
// if we receive too much data, drop the buffer and stop using cache (it is still okay to use disk, probably)
|
||||||
data = BytesMut::new();
|
data = BytesMut::new();
|
||||||
@@ -444,134 +181,109 @@ impl Engine {
             }
         }

-        let data = data.freeze();
-
-        // we coalesced the data instead of streaming to disk,
-        // strip the exif data and send it off now
-        let data = if coalesce_and_strip {
-            // strip the exif if we can
-            // if we can't, then oh well
-            let data = if let Ok(Some(data)) = DynImage::from_bytes(data.clone()).map(|o| {
-                o.map(|mut img| {
-                    img.set_exif(None);
-                    img.encoder().bytes()
-                })
-            }) {
-                info!("stripped exif data");
-                data
-            } else {
-                info!("failed to strip exif data");
-                data
-            };
-
-            // send what we did over to the i/o task, all in one chunk
-            if let Some(ref tx) = tx {
-                debug!("sending filled buffer to i/o task");
-                tx.send(data.clone())
-                    .wrap_err("failed to send coalesced buffer to i/o task!")?;
-            }
-
-            data
-        } else {
-            // or, we didn't do that
-            // keep the data as it is
-            data
-        };
-
-        // insert upload into cache if we're using it
+        // insert upload into cache if necessary
         if use_cache {
+            let mut cache = self.cache.write().await;
+
             info!("caching upload!");
-            match lifetime {
-                Some(lt) => self.cache.add_with_lifetime(saved_name, data, lt, false),
-                None => self.cache.add(saved_name, data),
-            };
+            cache.insert(name, data.freeze());
         }

-        Ok((hash_sample.freeze(), observed_len))
-    }
-
-    pub async fn process(
-        &self,
-        ext: Option<String>,
-        provided_len: u64,
-        stream: BodyDataStream,
-        lifetime: Option<Duration>,
-        keep_exif: bool,
-    ) -> eyre::Result<ProcessOutcome> {
-        // if the upload size is greater than our max file size, deny it now
-        if self.cfg.max_upload_len.is_some_and(|l| provided_len > l) {
-            return Ok(ProcessOutcome::UploadTooLarge);
-        }
-
-        // if the upload size is smaller than the specified maximum, we use the cache!
-        let use_cache = self.cache.will_use(provided_len);
-
-        // if a temp file is too big for cache, reject it now
-        if lifetime.is_some() && !use_cache {
-            return Ok(ProcessOutcome::TemporaryUploadTooLarge);
-        }
-
-        // if a temp file's lifetime is too long, reject it now
-        if lifetime.is_some_and(|lt| lt > self.cfg.max_temp_lifetime) {
-            return Ok(ProcessOutcome::TemporaryUploadLifetimeTooLong);
-        }
-
-        // generate the file name
-        let saved_name = self.gen_saved_name(ext).await;
-
-        // save it
-        let save_result = self
-            .save(
-                &saved_name,
-                provided_len,
-                use_cache,
-                stream,
-                lifetime,
-                keep_exif,
-            )
-            .await;
-
-        // handle result
-        let (hash_sample, len) = match save_result {
-            // Okay so just extract metadata
-            Ok(m) => m,
-            // If anything fails, delete the upload and return the error
-            Err(err) => {
-                error!("failed processing upload!");
-
-                self.remove(&saved_name).await?;
-                return Err(err);
-            }
-        };
-
-        // if deletion urls are enabled, create one
-        let deletion_url = self.deletion_hmac.clone().map(|mut hmac| {
-            // calculate hash of file metadata
-            let hash = calculate_hash(len, hash_sample);
-            let mut hash_bytes = BytesMut::new();
-            hash_bytes.put_u128(hash);
-            let hash_b64 = BASE64_URL_SAFE_NO_PAD.encode(&hash_bytes);
-
-            // take hmac
-            update_hmac(&mut hmac, &saved_name, hash);
-            let out = hmac.finalize().into_bytes();
-            let out_b64 = BASE64_URL_SAFE_NO_PAD.encode(out);
-
-            // format deletion url
-            format!(
-                "{}/del?name={saved_name}&hash={hash_b64}&hmac={out_b64}",
-                self.cfg.base_url
-            )
-        });
-
-        // format and send back the url
-        let url = format!("{}/p/{saved_name}", self.cfg.base_url);
-
         // if all goes well, increment the cached upload counter
         self.upl_count.fetch_add(1, Ordering::Relaxed);
+    }

-        info!("finished processing upload!");
+        info!("finished processing upload!!");
+
+    /// Read an upload from cache, if it exists.
+    ///
+    /// Previously, this would lock the cache as
+    /// writable to renew the upload's cache lifespan.
+    /// Locking the cache as readable allows multiple concurrent
+    /// readers though, which allows me to handle multiple views concurrently.
+    async fn read_cached_upload(&self, name: &String) -> Option<Bytes> {
+        let cache = self.cache.read().await;

-        Ok(ProcessOutcome::Success { url, deletion_url })
+        // fetch upload data from cache
+        cache.get(name).map(ToOwned::to_owned)
+    }
+
+    /// Reads an upload, from cache or on disk.
+    pub async fn get_upload(&self, original_path: &Path) -> Result<ViewSuccess, ViewError> {
+        // extract upload file name
+        let name = original_path
+            .file_name()
+            .and_then(OsStr::to_str)
+            .unwrap_or_default()
+            .to_string();
+
+        // path on disk
+        let mut path = self.cfg.save_path.clone();
+        path.push(&name);
+
+        // check if the upload exists, if not then 404
+        if !self.upload_exists(&path).await {
+            return Err(ViewError::NotFound);
+        }
+
+        // attempt to read upload from cache
+        let cached_data = self.read_cached_upload(&name).await;
+
+        if let Some(data) = cached_data {
+            info!("got upload from cache!");
+
+            Ok(ViewSuccess::FromCache(data))
+        } else {
+            // we already know the upload exists by now so this is okay
+            let mut file = File::open(&path).await.unwrap();
+
+            // read upload length from disk
+            let metadata = file.metadata().await;
+
+            if metadata.is_err() {
+                error!("failed to get upload file metadata!");
+                return Err(ViewError::InternalServerError);
+            }
+
+            let metadata = metadata.unwrap();
+
+            let length = metadata.len() as usize;
+
+            debug!("read upload from disk, size = {}", length);
+
+            // if the upload is okay to cache, recache it and send a fromcache response
+            if self.will_use_cache(length) {
+                // read file from disk
+                let mut data = BytesMut::with_capacity(length);
+
+                // read file from disk and if it fails at any point, return 500
+                loop {
+                    match file.read_buf(&mut data).await {
+                        Ok(n) => {
+                            if n == 0 {
+                                break;
+                            }
+                        }
+                        Err(_) => {
+                            return Err(ViewError::InternalServerError);
+                        }
+                    }
+                }
+
+                let data = data.freeze();
+
+                // re-insert it into cache
+                let mut cache = self.cache.write().await;
+                cache.insert(name, data.clone());
+
+                info!("recached upload from disk!");
+
+                return Ok(ViewSuccess::FromCache(data));
+            }
+
+            info!("got upload from disk!");
+
+            Ok(ViewSuccess::FromDisk(file))
+        }
     }
 }
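The `resolve_range` helper removed above is the heart of main's byte-range support: it turns a parsed `Range` header into inclusive `(start, end)` offsets, or `None` when the request can't be satisfied. The sketch below mirrors that logic with plain `Bound` pairs instead of the `headers::Range` type, so it can be run and tested standalone (the simplified signature is an assumption, not the actual function).

```rust
use std::ops::Bound;

/// Resolve one requested range against an upload of `full_len` bytes,
/// returning inclusive (start, end) offsets or None if unsatisfiable.
fn resolve_range(range: Option<(Bound<u64>, Bound<u64>)>, full_len: u64) -> Option<(u64, u64)> {
    let last_byte = full_len - 1;

    let (start, end) = match range {
        Some((start, end)) => {
            // an open start means "from the beginning"
            let start = match start {
                Bound::Included(s) => s,
                _ => 0,
            };
            // an open end means "to the last byte"
            let end = match end {
                Bound::Included(e) => e,
                _ => last_byte,
            };
            (start, end)
        }
        // no Range header: serve the whole upload
        None => (0, last_byte),
    };

    // catch ranges we can't satisfy
    if end > last_byte || start > end {
        return None;
    }

    Some((start, end))
}

fn main() {
    // "bytes=0-1023" against a 4096-byte upload -> the first kilobyte
    assert_eq!(
        resolve_range(Some((Bound::Included(0), Bound::Included(1023))), 4096),
        Some((0, 1023))
    );
    // a range that starts past the end is not satisfiable
    assert_eq!(
        resolve_range(Some((Bound::Included(5000), Bound::Unbounded)), 4096),
        None
    );
}
```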
src/index.rs

@@ -8,17 +8,14 @@ pub async fn index(State(engine): State<Arc<crate::engine::Engine>>) -> String {
     let motd = engine.cfg.motd.clone();

-    motd.replace("%version%", env!("CARGO_PKG_VERSION"))
+    motd
+        .replace("%version%", env!("CARGO_PKG_VERSION"))
         .replace("%uplcount%", &count.to_string())
 }

 pub async fn robots_txt() -> &'static str {
     /// robots.txt that tells web crawlers not to list uploads
-    const ROBOTS_TXT: &str = concat!(
-        "User-Agent: *\n",
-        "Disallow: /p/*\n",
-        "Allow: /\n"
-    );
+    const ROBOTS_TXT: &str = concat!("User-Agent: *\n", "Disallow: /p/*\n", "Allow: /\n");

     ROBOTS_TXT
 }
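Both sides of this hunk compute the same thing; only the formatting of the replace chain differs. As a quick illustration of what the index route returns, here is the placeholder substitution on its own, with a hypothetical motd string:

```rust
fn render_motd(motd: &str, upl_count: usize) -> String {
    // substitute %version% and %uplcount%, exactly as both versions do
    motd.replace("%version%", env!("CARGO_PKG_VERSION"))
        .replace("%uplcount%", &upl_count.to_string())
}

fn main() {
    let motd = "breeze %version% · serving %uplcount% uploads";
    println!("{}", render_motd(motd, 42));
}
```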
92 src/main.rs

@@ -1,98 +1,76 @@
 use std::{path::PathBuf, sync::Arc};

-use argh::FromArgs;
-use color_eyre::eyre::{self, bail, Context};
+extern crate axum;
+
+use clap::Parser;
 use engine::Engine;

 use axum::{
     routing::{get, post},
     Router,
 };
-use tokio::{fs, net::TcpListener, signal};
+use tokio::{fs, signal};
 use tracing::{info, warn};

-mod cache;
 mod config;
-mod delete;
-mod disk;
 mod engine;
 mod index;
 mod new;
 mod view;

-#[cfg(not(target_env = "msvc"))]
-use tikv_jemallocator::Jemalloc;
-
-#[cfg(not(target_env = "msvc"))]
-#[global_allocator]
-static GLOBAL: Jemalloc = Jemalloc;
-
-/// breeze file server.
-#[derive(FromArgs, Debug)]
+#[derive(Parser, Debug)]
 struct Args {
-    /// the path to *.toml configuration file
-    #[argh(option, short = 'c', arg_name = "file")]
+    /// The path to configuration file
+    #[arg(short, long, value_name = "file")]
     config: PathBuf,
 }

 #[tokio::main]
-async fn main() -> eyre::Result<()> {
-    // Install color-eyre
-    color_eyre::install()?;
+async fn main() {
+    // read & parse args
+    let args = Args::parse();

-    // Read & parse args
-    let args: Args = argh::from_env();
+    // read & parse config
+    let config_str = fs::read_to_string(args.config)
+        .await
+        .expect("failed to read config file! make sure it exists and you have read permissions");

-    // Read & parse config
-    let cfg: config::Config = {
-        let config_str = fs::read_to_string(args.config).await.wrap_err(
-            "failed to read config file! make sure it exists and you have read permissions",
-        )?;
-
-        toml::from_str(&config_str).wrap_err(
-            "invalid config! ensure proper fields and structure. reference config is in readme",
-        )?
-    };
-
-    // Set up tracing
+    let cfg: config::Config = toml::from_str(&config_str).expect("invalid config! check that you have included all required options and structured it properly (no config options expecting a number getting a string, etc.)");

     tracing_subscriber::fmt()
         .with_max_level(cfg.logger.level)
         .init();

-    // Check config
-    {
-        let save_path = cfg.engine.disk.save_path.clone();
-        if !save_path.exists() || !save_path.is_dir() {
-            bail!("the save path does not exist or is not a directory! this is invalid");
-        }
+    if !cfg.engine.save_path.exists() || !cfg.engine.save_path.is_dir() {
+        panic!("the save path does not exist or is not a directory! this is invalid");
     }

     if cfg.engine.upload_key.is_empty() {
         warn!("engine upload_key is empty! no key will be required for uploading new files");
     }

-    // Create engine
-    let engine = Engine::with_config(cfg.engine);
+    // create engine
+    let engine = Engine::new(cfg.engine);

-    // Build main router
+    // build main router
     let app = Router::new()
         .route("/new", post(new::new))
-        .route("/p/{saved_name}", get(view::view))
-        .route("/del", get(delete::delete))
+        .route("/p/:name", get(view::view))
         .route("/", get(index::index))
         .route("/robots.txt", get(index::robots_txt))
         .with_state(Arc::new(engine));

-    // Start web server
-    info!("starting server.");
-    let listener = TcpListener::bind(&cfg.http.listen_on)
-        .await
-        .wrap_err("failed to bind to given `http.listen_on` address! make sure it's valid, and the port isn't already bound")?;
-    axum::serve(listener, app)
-        .with_graceful_shutdown(shutdown_signal())
-        .await
-        .wrap_err("failed to start server")?;
-
-    Ok(())
+    // start web server
+    axum::Server::bind(
+        &cfg.http
+            .listen_on
+            .parse()
+            .expect("failed to parse listen_on address"),
+    )
+    .serve(app.into_make_service())
+    .with_graceful_shutdown(shutdown_signal())
+    .await
+    .expect("failed to start server");
 }

 async fn shutdown_signal() {
@@ -114,8 +92,8 @@ async fn shutdown_signal() {
     let terminate = std::future::pending::<()>();

     tokio::select! {
-        () = ctrl_c => {},
-        () = terminate => {},
+        _ = ctrl_c => {},
+        _ = terminate => {},
     }

     info!("shutting down!");
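Both versions of `main` deserialize a `config::Config` from the TOML file passed on the command line, but `src/config.rs` itself is not part of this comparison. The sketch below shows roughly what the 0.1.5-p2 side would need to parse, assembled only from the fields the code above reads (`http.listen_on`, `logger.level`, `engine.save_path`, `engine.upload_key`, `engine.base_url`, `engine.motd`, `engine.cache.*`). Struct names, field types, and the sample values are simplified guesses, not the project's actual definitions; the reference config lives in the README.

```rust
use serde::Deserialize;
use std::path::PathBuf;

#[derive(Deserialize, Debug)]
struct Config {
    engine: EngineConfig,
    http: HttpConfig,
    logger: LoggerConfig,
}

#[derive(Deserialize, Debug)]
struct EngineConfig {
    base_url: String,   // used when formatting /p/ URLs
    save_path: PathBuf, // must be an existing directory at startup
    upload_key: String, // empty string disables the key check
    motd: String,       // supports %version% and %uplcount%
    cache: CacheConfig,
}

#[derive(Deserialize, Debug)]
struct CacheConfig {
    max_length: usize,    // largest upload the cache will hold
    mem_capacity: usize,  // total cache memory budget
    scan_freq: u64,       // seconds, simplified from a Duration
    upload_lifetime: u64, // seconds, simplified from a Duration
}

#[derive(Deserialize, Debug)]
struct HttpConfig {
    listen_on: String, // parsed into a SocketAddr before binding
}

#[derive(Deserialize, Debug)]
struct LoggerConfig {
    level: String, // simplified; the real field is a tracing level
}

fn main() {
    // hypothetical breeze.toml contents, just to exercise the shape above
    let cfg: Config = toml::from_str(
        r#"
        [engine]
        base_url = "https://picture.wtf"
        save_path = "/data/uploads"
        upload_key = ""
        motd = "breeze %version% · %uplcount% uploads"

        [engine.cache]
        max_length = 134217728
        mem_capacity = 4294967296
        scan_freq = 60
        upload_lifetime = 1800

        [http]
        listen_on = "0.0.0.0:8000"

        [logger]
        level = "info"
        "#,
    )
    .expect("sample config should parse");
    println!("{cfg:#?}");
}
```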
150 src/new.rs

@@ -1,134 +1,60 @@
-use std::{
-    ffi::OsStr,
-    path::{Path, PathBuf},
-    sync::Arc,
-    time::Duration,
-};
+use std::{collections::HashMap, ffi::OsStr, path::PathBuf, sync::Arc};

 use axum::{
-    body::Body,
-    extract::{Query, State},
-    response::{IntoResponse, Response},
+    extract::{BodyStream, Query, State},
+    http::HeaderValue,
 };
-use axum_extra::TypedHeader;
-use headers::ContentLength;
-use http::{HeaderValue, StatusCode};
-use serde::Deserialize;
-use serde_with::{DurationSeconds, serde_as};
-use tracing::error;
-
-use crate::engine::ProcessOutcome;
-
-fn default_keep_exif() -> bool {
-    false
-}
-
-#[serde_as]
-#[derive(Deserialize)]
-pub struct NewRequest {
-    name: String,
-    key: Option<String>,
-
-    #[serde(rename = "lastfor")]
-    #[serde_as(as = "Option<DurationSeconds>")]
-    last_for: Option<Duration>,
-
-    #[serde(rename = "keepexif", default = "default_keep_exif")]
-    keep_exif: bool,
-}
-
+use hyper::{header, HeaderMap, StatusCode};

 /// The request handler for the /new path.
 /// This handles all new uploads.
+#[axum::debug_handler]
 pub async fn new(
     State(engine): State<Arc<crate::engine::Engine>>,
-    Query(req): Query<NewRequest>,
-    TypedHeader(ContentLength(content_length)): TypedHeader<ContentLength>,
-    body: Body,
-) -> Result<Response, StatusCode> {
+    Query(params): Query<HashMap<String, String>>,
+    headers: HeaderMap,
+    stream: BodyStream,
+) -> Result<String, StatusCode> {
+    let key = params.get("key");
+
+    const EMPTY_STRING: &String = &String::new();
+
     // check upload key, if i need to
-    if !engine.cfg.upload_key.is_empty() && req.key.unwrap_or_default() != engine.cfg.upload_key {
+    if !engine.cfg.upload_key.is_empty() && key.unwrap_or(EMPTY_STRING) != &engine.cfg.upload_key {
         return Err(StatusCode::FORBIDDEN);
     }

+    let original_name = params.get("name");
+
     // the original file name wasn't given, so i can't work out what the extension should be
-    if req.name.is_empty() {
+    if original_name.is_none() {
         return Err(StatusCode::BAD_REQUEST);
     }

-    // -- try to figure out a file extension..
+    let original_path = PathBuf::from(original_name.unwrap());

-    fn extension(pb: &Path) -> Option<String> {
-        pb.extension().and_then(OsStr::to_str).map(str::to_string)
-    }
+    let path = engine.gen_path(&original_path).await;
+    let name = path
+        .file_name()
+        .and_then(OsStr::to_str)
+        .unwrap_or_default()
+        .to_string();

-    let pb = PathBuf::from(req.name);
-    let mut ext = extension(&pb);
+    let url = format!("{}/p/{}", engine.cfg.base_url, name);

-    // common extensions that usually have a second extension before themselves
-    const ADDITIVE: &[&str] = &["gz", "xz", "bz2", "lz4", "zst"];
+    // read and parse content-length, and if it fails just assume it's really high so it doesn't cache
+    let content_length = headers
+        .get(header::CONTENT_LENGTH)
+        .unwrap_or(&HeaderValue::from_static(""))
+        .to_str()
+        .map(|s| s.parse::<usize>())
+        .unwrap()
+        .unwrap_or(usize::MAX);

-    // if the extension is one of those, try to find that second extension
-    if ext
-        .as_ref()
-        .is_some_and(|ext| ADDITIVE.contains(&ext.as_str()))
-    {
-        // try to parse out another extension
-        let stem = pb.file_stem().unwrap(); // SAFETY: if extension is Some(), this will also be
-
-        if let Some(second_ext) = extension(&PathBuf::from(stem)) {
-            // there is another extension,
-            // try to make sure it's one we want
-            // 4 is enough for most common file extensions
-            // and not many false positives, hopefully
-            if second_ext.len() <= 4 {
-                // seems ok so combine them
-                ext = ext.as_ref().map(|first_ext| second_ext + "." + first_ext);
-            }
-        }
-    }
-
-    // turn body into stream
-    let stream = Body::into_data_stream(body);
-
-    // pass it off to the engine to be processed
-    // --
-    // also, error responses here don't get presented properly in ShareX most of the time
-    // they don't expect the connection to close before they're done uploading, i think
-    // so it will just present the user with a "connection closed" error
-    match engine
-        .process(ext, content_length, stream, req.last_for, req.keep_exif)
-        .await
-    {
-        Ok(outcome) => match outcome {
-            // 200 OK
-            ProcessOutcome::Success { url, deletion_url } => {
-                let mut res = url.into_response();
-
-                // insert deletion url header if needed
-                if let Some(deletion_url) = deletion_url {
-                    let deletion_url = HeaderValue::from_str(&deletion_url)
-                        .expect("deletion url contains invalid chars");
-
-                    let headers = res.headers_mut();
-                    headers.insert("Breeze-Deletion-Url", deletion_url);
-                }
-
-                Ok(res)
-            }
-
-            // 413 Payload Too Large
-            ProcessOutcome::UploadTooLarge | ProcessOutcome::TemporaryUploadTooLarge => {
-                Err(StatusCode::PAYLOAD_TOO_LARGE)
-            }
-
-            // 400 Bad Request
-            ProcessOutcome::TemporaryUploadLifetimeTooLong => Err(StatusCode::BAD_REQUEST),
-        },
-
-        // 500 Internal Server Error
-        Err(err) => {
-            error!("failed to process upload!! {err:#}");
-            Err(StatusCode::INTERNAL_SERVER_ERROR)
-        }
-    }
+    // pass it off to the engine to be processed!
+    engine
+        .process_upload(path, name, content_length, stream)
+        .await;
+
+    Ok(url)
 }
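The extension handling that main's /new handler removes here is worth seeing in isolation: for compression suffixes in the `ADDITIVE` list it keeps a short second extension, so "backup.tar.gz" stays "tar.gz" rather than collapsing to "gz". The sketch below reproduces that logic as a standalone function with a couple of checks; the function name is made up for illustration.

```rust
use std::{ffi::OsStr, path::Path};

/// Work out the extension to keep for a saved upload, preserving a short
/// second extension in front of common compression suffixes.
fn upload_extension(name: &str) -> Option<String> {
    fn extension(p: &Path) -> Option<String> {
        p.extension().and_then(OsStr::to_str).map(str::to_string)
    }

    // common extensions that usually have a second extension before themselves
    const ADDITIVE: &[&str] = &["gz", "xz", "bz2", "lz4", "zst"];

    let path = Path::new(name);
    let mut ext = extension(path)?;

    if ADDITIVE.contains(&ext.as_str()) {
        if let Some(stem) = path.file_stem() {
            if let Some(second) = extension(Path::new(stem)) {
                // 4 chars is enough for most real extensions without many false positives
                if second.len() <= 4 {
                    ext = format!("{second}.{ext}");
                }
            }
        }
    }

    Some(ext)
}

fn main() {
    assert_eq!(upload_extension("photo.png").as_deref(), Some("png"));
    assert_eq!(upload_extension("backup.tar.gz").as_deref(), Some("tar.gz"));
    assert_eq!(upload_extension("noext"), None);
}
```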
193 src/view.rs

@@ -1,18 +1,40 @@
-use std::{ffi::OsStr, path::PathBuf, sync::Arc};
+use std::{
+    path::{Component, PathBuf},
+    sync::Arc,
+};

 use axum::{
-    body::Body,
+    body::StreamBody,
     extract::{Path, State},
     response::{IntoResponse, Response},
 };

-use axum_extra::TypedHeader;
-use headers::Range;
-use http::{HeaderValue, StatusCode};
+use bytes::Bytes;
+use hyper::{http::HeaderValue, StatusCode};
+use tokio::{fs::File, runtime::Handle};
 use tokio_util::io::ReaderStream;
-use tracing::error;
+use tracing::{error, debug, info};

-use crate::engine::{GetOutcome, UploadData, UploadResponse};
+/// Responses for a successful view operation
+pub enum ViewSuccess {
+    /// A file read from disk, suitable for larger files.
+    ///
+    /// The file provided will be streamed from disk and
+    /// back to the viewer.
+    ///
+    /// This is only ever used if a file exceeds the
+    /// cache's maximum file size.
+    FromDisk(File),
+
+    /// A file read from in-memory cache, best for smaller files.
+    ///
+    /// The file is taken from the cache in its entirety
+    /// and sent back to the viewer.
+    ///
+    /// If a file can be fit into cache, this will be
+    /// used even if it's read from disk.
+    FromCache(Bytes),
+}

 /// Responses for a failed view operation
 pub enum ViewError {
@@ -21,94 +43,101 @@ pub enum ViewError {

     /// Will send status code 500 with a plaintext "internal server error" message.
     InternalServerError,
+}

-    /// Sends status code 206 with a plaintext "range not satisfiable" message.
-    RangeNotSatisfiable,
+impl IntoResponse for ViewSuccess {
+    fn into_response(self) -> Response {
+        match self {
+            ViewSuccess::FromDisk(file) => {
+                // get handle to current tokio runtime
+                // i use this to block on futures here (not async)
+                let handle = Handle::current();
+                let _ = handle.enter();
+
+                // read the metadata of the file on disk
+                // this function isn't async
+                // .. so we have to use handle.block_on() to get the metadata
+                let metadata = futures::executor::block_on(file.metadata());
+
+                // if we error then return 500
+                if metadata.is_err() {
+                    error!("failed to read metadata from disk");
+                    return ViewError::InternalServerError.into_response();
+                }
+
+                // unwrap (which we know is safe) and read the file size as a string
+                let metadata = metadata.unwrap();
+                let len_str = metadata.len().to_string();
+
+                debug!("file is {} bytes on disk", &len_str);
+
+                // HeaderValue::from_str will never error if only visible ASCII characters are passed (32-127)
+                // .. so unwrapping this should be fine
+                let content_length = HeaderValue::from_str(&len_str).unwrap();
+
+                // create a streamed body response (we want to stream larger files)
+                let reader = ReaderStream::new(file);
+                let stream = StreamBody::new(reader);
+
+                // extract mutable headers from the response
+                let mut res = stream.into_response();
+                let headers = res.headers_mut();
+
+                // clear headers, browser can imply content type
+                headers.clear();
+
+                // insert Content-Length header
+                // that way the browser shows how big a file is when it's being downloaded
+                headers.insert("Content-Length", content_length);
+
+                res
+            }
+            ViewSuccess::FromCache(data) => {
+                // extract mutable headers from the response
+                let mut res = data.into_response();
+                let headers = res.headers_mut();
+
+                // clear the headers, let the browser imply it
+                headers.clear();
+
+                res
+            }
+        }
+    }
 }

 impl IntoResponse for ViewError {
     fn into_response(self) -> Response {
         match self {
-            ViewError::NotFound => (StatusCode::NOT_FOUND, "Not found!").into_response(),
-            ViewError::InternalServerError => {
-                (StatusCode::INTERNAL_SERVER_ERROR, "Internal server error!").into_response()
-            }
-            ViewError::RangeNotSatisfiable => {
-                (StatusCode::RANGE_NOT_SATISFIABLE, "Range not satisfiable!").into_response()
-            }
+            ViewError::NotFound => (
+                StatusCode::NOT_FOUND,
+                "not found!"
+            ).into_response(),
+
+            ViewError::InternalServerError => (
+                StatusCode::INTERNAL_SERVER_ERROR,
+                "internal server error!"
+            ).into_response(),
         }
     }
 }

-impl IntoResponse for UploadResponse {
-    fn into_response(self) -> Response {
-        let (start, end) = self.range;
-        let range_len = (end - start) + 1;
-
-        let mut res = match self.data {
-            UploadData::Cache(data) => data.into_response(),
-            UploadData::Disk(file) => {
-                let reader_stream = ReaderStream::new(file);
-                let body = Body::from_stream(reader_stream);
-                let mut res = body.into_response();
-                let headers = res.headers_mut();
-
-                // add Content-Length header so the browser shows how big a file is when it's being downloaded
-                let content_length = HeaderValue::from_str(&range_len.to_string())
-                    .expect("construct content-length header failed");
-                headers.insert("Content-Length", content_length);
-
-                res
-            }
-        };
-
-        let headers = res.headers_mut();
-
-        // remove content-type, browser can imply content type
-        headers.remove("Content-Type");
-        headers.insert("Accept-Ranges", HeaderValue::from_static("bytes"));
-        // ^-- indicate that byte ranges are supported. maybe unneeded, but probably good
-
-        // if it is not the full size, add relevant headers/status for range request
-        if range_len != self.full_len {
-            let content_range =
-                HeaderValue::from_str(&format!("bytes {}-{}/{}", start, end, self.full_len))
-                    .expect("construct content-range header failed");
-
-            headers.insert("Content-Range", content_range);
-            *res.status_mut() = StatusCode::PARTIAL_CONTENT;
-        }
-
-        res
-    }
-}
-
-/// GET request handler for /p/* path.
+/// The request handler for /p/* path.
 /// All file views are handled here.
+#[axum::debug_handler]
 pub async fn view(
     State(engine): State<Arc<crate::engine::Engine>>,
     Path(original_path): Path<PathBuf>,
-    range: Option<TypedHeader<Range>>,
-) -> Result<UploadResponse, ViewError> {
-    // try to extract the file name (if it's the only component)
-    // this makes paths like `asdf%2fabcdef.png` invalid
-    let saved_name = match original_path.file_name().map(OsStr::to_str) {
-        Some(Some(n)) if original_path.components().count() == 1 => n,
-        _ => return Err(ViewError::NotFound),
-    };
+) -> Result<ViewSuccess, ViewError> {
+    // (hopefully) prevent path traversal, just check for any non-file components
+    if original_path
+        .components()
+        .any(|x| !matches!(x, Component::Normal(_)))
+    {
+        info!("a request attempted path traversal");
+        return Err(ViewError::NotFound);

-    let range = range.map(|TypedHeader(range)| range);
-
-    // get result from the engine
-    match engine.get(saved_name, range).await {
-        Ok(GetOutcome::Success(res)) => Ok(res),
-        Ok(GetOutcome::NotFound) => Err(ViewError::NotFound),
-        Ok(GetOutcome::RangeNotSatisfiable) => Err(ViewError::RangeNotSatisfiable),
-        Err(err) => {
-            error!("failed to get upload!! {err:#}");
-            Err(ViewError::InternalServerError)
-        }
-    }
+    }

+    // get result from the engine!
+    engine.get_upload(&original_path).await
 }
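Both versions of the /p/ handler guard against path traversal before touching storage, just with different checks: main accepts only a bare file name (exactly one path component), while 0.1.5-p2 rejects any non-`Normal` component. The standalone sketch below shows both checks side by side; the helper names are made up for illustration.

```rust
use std::{
    ffi::OsStr,
    path::{Component, Path},
};

/// main's check: accept only a bare file name, i.e. exactly one component.
fn saved_name(original: &Path) -> Option<&str> {
    match original.file_name().and_then(OsStr::to_str) {
        Some(n) if original.components().count() == 1 => Some(n),
        _ => None,
    }
}

/// 0.1.5-p2's check: reject any non-Normal component ("..", a root, etc.).
fn is_traversal_free(original: &Path) -> bool {
    original.components().all(|c| matches!(c, Component::Normal(_)))
}

fn main() {
    assert_eq!(saved_name(Path::new("abcdef.png")), Some("abcdef.png"));
    assert_eq!(saved_name(Path::new("../etc/passwd")), None);

    assert!(is_traversal_free(Path::new("abcdef.png")));
    assert!(!is_traversal_free(Path::new("../etc/passwd")));
}
```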