Compare commits
No commits in common. "main" and "0.1.5-p1" have entirely different histories.
.envrc (4 changes)
@@ -1,4 +0,0 @@
-if ! has nix_direnv_version || ! nix_direnv_version 3.0.6; then
-  source_url "https://raw.githubusercontent.com/nix-community/nix-direnv/3.0.6/direnvrc" "sha256-RYcUJaRMf8oF5LznDrlCXbkOQrywm0HDv1VjYGaJGdM="
-fi
-use flake
.gitignore
@@ -1,5 +1,2 @@
 # binaries
 /target
-
-# nix-direnv
-/.direnv
Cargo.lock (file diff suppressed because it is too large)
Cargo.toml (30 changes)
@@ -1,27 +1,25 @@
 [package]
 name = "breeze"
-version = "0.2.6"
+version = "0.1.5"
 edition = "2021"
 
 [dependencies]
-axum = { version = "0.7.5", features = ["macros", "http2"] }
-tower = "0.4.13"
-http = "1.1.0"
+axum = { version = "0.6.1", features = ["macros"] }
+hyper = { version = "0.14", features = ["full"] }
 tokio = { version = "1", features = ["full"] }
 tokio-util = { version = "0.7.4", features = ["full"] }
 tokio-stream = "0.1"
+tower = "0.4.13"
+bytes = "1"
+rand = "0.8.5"
+async-recursion = "1.0.0"
+walkdir = "2"
+futures = "0.3"
 tracing = "0.1"
 tracing-subscriber = "0.3"
-bytes = "1"
-async-recursion = "1.0.0"
-rand = "0.8.5"
-walkdir = "2"
-anyhow = "1.0"
-serde = { version = "1.0", features = ["derive"] }
-serde_with = "3.4.0"
+archived = { path = "./archived" }
+xxhash-rust = { version = "0.8.7", features = ["xxh3"] }
+serde = { version = "1.0.189", features = ["derive"] }
 toml = "0.8.2"
-argh = "0.1.12"
-dashmap = { version = "5.5.3", features = ["rayon", "inline"] }
-rayon = "1.8"
-atomic-time = "0.1.4"
-img-parts = "0.3.0"
+clap = { version = "4.4.6", features = ["derive"] }
+serde_with = "3.4.0"
Dockerfile (10 changes)
@@ -1,15 +1,19 @@
 # builder
-FROM rust:1.74 as builder
+FROM rust:1.73 as builder
 
 WORKDIR /usr/src/breeze
 COPY . .
 RUN cargo install --path .
 
 # runner
-FROM debian:bookworm-slim
+FROM debian:bullseye-slim
 
 RUN apt-get update && rm -rf /var/lib/apt/lists/*
 
 COPY --from=builder /usr/local/cargo/bin/breeze /usr/local/bin/breeze
 
-CMD [ "breeze", "--config", "/etc/breeze.toml" ]
+RUN useradd -m runner
+USER runner
+
+EXPOSE 8000
+CMD [ "breeze" ]
README.md (113 changes)
@@ -1,16 +1,12 @@
 # breeze
 breeze is a simple, performant file upload server.
 
-The primary instance is https://picture.wtf.
-
 ## Features
 Compared to the old Express.js backend, breeze has
 - Streamed uploading
 - Streamed downloading (on larger files)
 - Upload caching
 - Generally faster speeds overall
-- Temporary uploads
-- Automatic exif data removal
 
 At this time, breeze does not support encrypted uploads on disk.
 
@@ -19,10 +15,10 @@ I wrote breeze with the intention of running it in a container, but it runs just
 
 Either way, you need to start off by cloning the Git repository.
 ```bash
-git clone https://git.min.rip/min/breeze.git
+git clone https://git.min.rip/minish/breeze.git
 ```
 
-To run it in Docker, I recommend using Docker Compose. An example `docker-compose.yaml` configuration is below. You can start it using `docker compose up -d`.
+To run it in Docker, I recommend using Docker Compose. An example `docker-compose.yaml` configuration is below.
 ```
 version: '3.6'
 
@@ -33,19 +29,20 @@ services:
 
     volumes:
       - /srv/uploads:/data
-      - ./breeze.toml:/etc/breeze.toml
-
-    user: 1000:1000
 
     ports:
-      - 8383:8000
+      - 8000:8000
+
+    environment:
+      - BRZ_BASE_URL=http://127.0.0.1:8000
+      - BRZ_SAVE_PATH=/data
+      - BRZ_UPLOAD_KEY=hiiiiiiii
+      - BRZ_CACHE_UPL_MAX_LENGTH=134217728 # allow files up to ~134 MiB to be cached
+      - BRZ_CACHE_UPL_LIFETIME=1800 # let uploads stay in cache for 30 minutes
+      - BRZ_CACHE_SCAN_FREQ=60 # scan the cache for expired files if more than 60 seconds have passed since the last scan
+      - BRZ_CACHE_MEM_CAPACITY=4294967296 # allow 4 GiB of data to be in the cache at once
 ```
-For this configuration, it is expected that:
-* there is a clone of the Git repository in the `./breeze` folder
-* there is a `breeze.toml` config file in current directory
-* there is a directory at `/srv/uploads` for storing uploads
-* port 8383 will be made accessible to the Internet somehow (either forwarding the port through your firewall directly, or passing it through a reverse proxy)
-* you want the uploads to be owned by the user on your system with id 1000. (this is usually your user)
+For this configuration, it is expected that there is a clone of the Git repository in the `./breeze` folder. You can start it using `docker compose up -d`.
 
 It can also be installed directly if you have the Rust toolchain installed:
 ```bash
@@ -54,86 +51,22 @@ cargo install --path .
 
 ## Usage
 ### Hosting
-Configuration is read through a toml file.
-
-The config file path is specified using the `-c`/`--config` command line switch.
-
-Here is an example config file:
-```toml
-[engine]
-# The base URL that the HTTP server will be accessible on.
-# This is used for formatting upload URLs.
-# Setting it to "https://picture.wtf" would result in
-# upload urls of "https://picture.wtf/p/abcdef.png", etc.
-base_url = "http://127.0.0.1:8000"
-
-# OPTIONAL - If set, the static key specified will be required to upload new files.
-# If it is not set, no key will be required.
-upload_key = "hiiiiiiii"
-
-# OPTIONAL - specifies what to show when the site is visited on http
-# It is sent with text/plain content type.
-# There are two variables you can use:
-# %uplcount% - total number of uploads present on the server
-# %version% - current breeze version (e.g. 0.1.5)
-motd = "my image host, currently hosting %uplcount% files"
-
-# The maximum lifetime a temporary upload may be given, in seconds.
-# It's okay to leave this somewhat high because large temporary uploads
-# will just be bumped out of the cache when a new upload needs to be
-# cached anyways.
-max_temp_lifetime = 43200
-
-# OPTIONAL - the maximum length (in bytes) a file being uploaded may be.
-# A word of warning about this: the error shown to ShareX users who
-# hit the limit is *not* very clear. ("connection closed" or similar)
-max_upload_len = 2_147_483_648
-
-# The maximum length (in bytes) an image file may be before the server
-# will skip removing its EXIF data.
-# The performance impact of breeze's EXIF data removal is not
-# very high in everyday usage, so something like 16MiB is reasonable.
-max_strip_len = 16_777_216
-
-[engine.disk]
-# The location that uploads will be saved to.
-# It should be a path to a directory on disk that you can write to.
-save_path = "/data"
-
-[engine.cache]
-# The file size (in bytes) that a file must be under
-# to get cached.
-max_length = 134_217_728
-
-# How long a cached upload will remain cached. (in seconds)
-upload_lifetime = 1800
-
-# How often the cache will be checked for expired uploads.
-# It is not a continuous scan, and only is triggered upon a cache operation.
-scan_freq = 60
-
-# How much memory (in bytes) the cache is allowed to consume.
-mem_capacity = 4_294_967_296
-
-[http]
-# The address that the HTTP server will listen on. (ip:port)
-# Use 0.0.0.0 as the IP to listen publicly, 127.0.0.1 only lets your
-# computer access it
-listen_on = "127.0.0.1:8000"
-
-[logger]
-# OPTIONAL - the current log level.
-# Default level is warn.
-level = "warn"
+Configuration is read through environment variables, because I wanted to run this using Docker Compose.
+```
+BRZ_BASE_URL - base url for upload urls (ex: http://127.0.0.1:8000 for http://127.0.0.1:8000/p/abcdef.png, http://picture.wtf for http://picture.wtf/p/abcdef.png)
+BRZ_SAVE_PATH - this should be a path where uploads are saved to disk (ex: /srv/uploads, C:\brzuploads)
+BRZ_UPLOAD_KEY (optional) - if not empty, the key you specify will be required to upload new files.
+BRZ_CACHE_UPL_MAX_LENGTH - this is the max length an upload can be in bytes before it won't be cached (ex: 80000000 for 80MB)
+BRZ_CACHE_UPL_LIFETIME - this indicates how long an upload will stay in cache (ex: 1800 for 30 minutes, 60 for 1 minute)
+BRZ_CACHE_SCAN_FREQ - this is the frequency of full cache scans, which scan for and remove expired uploads (ex: 60 for 1 minute)
+BRZ_CACHE_MEM_CAPACITY - this is the amount of memory the cache will hold before dropping entries
 ```
 
 ### Uploading
-The HTTP API is pretty simple, and it's easy to make a ShareX configuration for it.
+The HTTP API is fairly simple, and it's pretty easy to make a ShareX configuration for it.
 
 Uploads should be sent to `/new?name={original filename}` as a POST request. If the server uses upload keys, it should be sent to `/new?name={original filename}&key={upload key}`. The uploaded file's content should be sent as raw binary in the request body.
 
-Additionally, you may specify `&lastfor={time in seconds}` to make your upload temporary, or `&keepexif=true` to tell the server not to clear EXIF data on image uploads. (if you don't know what EXIF data is, just leave it as default. you'll know if you need it)
-
 Here's an example ShareX configuration for it (with a key):
 ```json
 {
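The POST API described in the Uploading section is easy to exercise outside ShareX as well. A minimal client sketch, assuming `reqwest` and `tokio` as client-side dependencies (neither is part of breeze itself) and a hypothetical `cat.png` in the working directory; on success, the server replies with the upload's URL.

```rust
use std::error::Error;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
    // the file's content is sent as raw binary in the request body, per the README
    let bytes = std::fs::read("cat.png")?;

    let resp = reqwest::Client::new()
        // `name` is the original filename; `key` is only needed when the
        // server has an upload key configured
        .post("http://127.0.0.1:8000/new?name=cat.png&key=hiiiiiiii")
        .body(bytes)
        .send()
        .await?;

    // the response body is the uploaded file's URL
    println!("{}", resp.text().await?);
    Ok(())
}
```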
archived/.gitignore (added)
@@ -0,0 +1,2 @@
+.idea
+target
archived/Cargo.lock (added)
@@ -0,0 +1,23 @@
+# This file is automatically @generated by Cargo.
+# It is not intended for manual editing.
+version = 3
+
+[[package]]
+name = "archived"
+version = "0.2.0"
+dependencies = [
+ "bytes",
+ "once_cell",
+]
+
+[[package]]
+name = "bytes"
+version = "1.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dfb24e866b15a1af2a1b663f10c6b6b8f397a84aadb828f12e5b289ec23a3a3c"
+
+[[package]]
+name = "once_cell"
+version = "1.3.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b1c601810575c99596d4afc46f78a678c80105117c379eb3650cf99b8a21ce5b"
archived/Cargo.toml (added)
@@ -0,0 +1,9 @@
+[package]
+name = "archived"
+version = "0.2.0"
+edition = "2018"
+license = "MIT"
+
+[dependencies]
+bytes = "1.3.0"
+once_cell = "1.3.1"
archived/LICENSE (added)
@@ -0,0 +1,22 @@
+MIT License
+
+Copyright (c) 2020 aikidos
+Copyright (c) 2023 ot2t7, minish
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
archived/src/entry.rs (added)
@@ -0,0 +1,26 @@
+use std::time::{Duration, SystemTime};
+
+/// Represents a set of eviction and expiration details for a specific cache entry.
+pub(crate) struct CacheEntry<B> {
+    /// Entry value.
+    pub(crate) value: B,
+
+    /// Expiration time.
+    ///
+    /// - [`None`] if the value must be kept forever.
+    pub(crate) expiration_time: SystemTime,
+}
+
+impl<B> CacheEntry<B> {
+    pub(crate) fn new(value: B, lifetime: Duration) -> Self {
+        Self {
+            expiration_time: SystemTime::now() + lifetime,
+            value,
+        }
+    }
+
+    /// Check if a entry is expired.
+    pub(crate) fn is_expired(&self, current_time: SystemTime) -> bool {
+        current_time >= self.expiration_time
+    }
+}
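The expiry rule here is absolute: `new` stamps `expiration_time = now + lifetime`, and `is_expired` compares a caller-supplied timestamp against it, so reads never extend an entry's life. A minimal sketch of those semantics (crate-internal, since the type is `pub(crate)`):

```rust
use std::time::{Duration, SystemTime};

use bytes::Bytes;

fn demo() {
    // entry created now, with a 30-minute lifetime
    let entry = CacheEntry::new(Bytes::from_static(b"hello"), Duration::from_secs(1800));

    // a fresh entry is not expired
    assert!(!entry.is_expired(SystemTime::now()));

    // a timestamp an hour from now is past the 30-minute expiration point
    assert!(entry.is_expired(SystemTime::now() + Duration::from_secs(3600)));
}
```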
archived/src/lib.rs (added)
@@ -0,0 +1,172 @@
+mod entry;
+
+use bytes::Bytes;
+
+use crate::entry::*;
+use std::collections::hash_map::Entry;
+use std::collections::HashMap;
+use std::time::{Duration, SystemTime};
+
+pub struct Archive {
+    cache_table: HashMap<String, CacheEntry<Bytes>>,
+    full_scan_frequency: Option<Duration>,
+    created_time: SystemTime,
+    last_scan_time: Option<SystemTime>,
+    entry_lifetime: Duration,
+    capacity: usize,
+    length: usize,
+}
+
+impl Archive {
+    /* pub fn new(capacity: usize) -> Self {
+        Self {
+            cache_table: HashMap::new(),
+            full_scan_frequency: None,
+            created_time: SystemTime::now(),
+            last_scan_time: None,
+            capacity,
+            length: 0,
+        }
+    } */
+
+    pub fn with_full_scan(full_scan_frequency: Duration, entry_lifetime: Duration, capacity: usize) -> Self {
+        Self {
+            cache_table: HashMap::with_capacity(256),
+            full_scan_frequency: Some(full_scan_frequency),
+            created_time: SystemTime::now(),
+            last_scan_time: None,
+            entry_lifetime,
+            capacity,
+            length: 0,
+        }
+    }
+
+    pub fn contains_key(&self, key: &String) -> bool {
+        let now = SystemTime::now();
+
+        self.cache_table
+            .get(key)
+            .filter(|cache_entry| !cache_entry.is_expired(now))
+            .is_some()
+    }
+
+    pub fn get_last_scan_time(&self) -> Option<SystemTime> {
+        self.last_scan_time
+    }
+
+    pub fn get_full_scan_frequency(&self) -> Option<Duration> {
+        self.full_scan_frequency
+    }
+
+    pub fn get(&self, key: &String) -> Option<&Bytes> {
+        let now = SystemTime::now();
+
+        self.cache_table
+            .get(key)
+            .filter(|cache_entry| !cache_entry.is_expired(now))
+            .map(|cache_entry| &cache_entry.value)
+    }
+
+    pub fn get_or_insert<F>(
+        &mut self,
+        key: String,
+        factory: F,
+    ) -> &Bytes
+    where
+        F: Fn() -> Bytes,
+    {
+        let now = SystemTime::now();
+
+        self.try_full_scan_expired_items(now);
+
+        match self.cache_table.entry(key) {
+            Entry::Occupied(mut occupied) => {
+                if occupied.get().is_expired(now) {
+                    occupied.insert(CacheEntry::new(factory(), self.entry_lifetime));
+                }
+
+                &occupied.into_mut().value
+            }
+            Entry::Vacant(vacant) => &vacant.insert(CacheEntry::new(factory(), self.entry_lifetime)).value,
+        }
+    }
+
+    pub fn insert(
+        &mut self,
+        key: String,
+        value: Bytes,
+    ) -> Option<Bytes> {
+        let now = SystemTime::now();
+
+        self.try_full_scan_expired_items(now);
+
+        if value.len() + self.length > self.capacity {
+            return None;
+        }
+
+        self.length += value.len();
+
+        self.cache_table
+            .insert(key, CacheEntry::new(value, self.entry_lifetime))
+            .filter(|cache_entry| !cache_entry.is_expired(now))
+            .map(|cache_entry| cache_entry.value)
+    }
+
+    pub fn remove(&mut self, key: &String) -> Option<Bytes> {
+        let now = SystemTime::now();
+
+        self.try_full_scan_expired_items(now);
+
+        let mut removed_len: usize = 0;
+        let result = self
+            .cache_table
+            .remove(key)
+            .filter(|cache_entry| !cache_entry.is_expired(now))
+            .and_then(|o| {
+                removed_len += o.value.len();
+                return Some(o);
+            })
+            .map(|cache_entry| cache_entry.value);
+        self.length -= removed_len;
+        return result;
+    }
+
+    pub fn renew(&mut self, key: &String) -> Option<()> {
+        let now = SystemTime::now();
+
+        self.try_full_scan_expired_items(now);
+
+        let entry = self.cache_table.get_mut(key);
+
+        match entry {
+            Some(entry) => {
+                entry.expiration_time = now + self.entry_lifetime;
+
+                Some(())
+            }
+            None => None
+        }
+    }
+
+    fn try_full_scan_expired_items(&mut self, current_time: SystemTime) {
+        if let Some(full_scan_frequency) = self.full_scan_frequency {
+            let since = current_time
+                .duration_since(self.last_scan_time.unwrap_or(self.created_time))
+                .unwrap();
+
+            if since >= full_scan_frequency {
+                let mut removed_len = 0;
+                self.cache_table.retain(|_, cache_entry| {
+                    if cache_entry.is_expired(current_time) {
+                        removed_len += cache_entry.value.len();
+                        return false;
+                    }
+                    return true;
+                });
+                self.length -= removed_len;
+
+                self.last_scan_time = Some(current_time);
+            }
+        }
+    }
+}
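Taken together, `archived` is a plain `HashMap` cache whose expired entries are only swept when a mutating call arrives at least `full_scan_frequency` after the previous sweep. A minimal usage sketch; the numbers are illustrative, not breeze's real configuration (those come from the `BRZ_CACHE_*` environment variables):

```rust
use std::time::Duration;

use bytes::Bytes;

fn demo() {
    let mut cache = Archive::with_full_scan(
        Duration::from_secs(60),   // sweep expired entries at most this often
        Duration::from_secs(1800), // each entry lives 30 minutes
        1024 * 1024,               // total capacity in bytes
    );

    // insert rejects a value (returns None without storing it) when it would
    // push the total stored length past capacity
    cache.insert("abcdef.png".to_string(), Bytes::from_static(b"..."));

    // get takes &String, matching how the server passes file names around
    if let Some(data) = cache.get(&"abcdef.png".to_string()) {
        println!("cached: {} bytes", data.len());
    }
}
```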
flake.lock (77 changes)
@@ -1,77 +0,0 @@
-{
-  "nodes": {
-    "crane": {
-      "locked": {
-        "lastModified": 1725409566,
-        "narHash": "sha256-PrtLmqhM6UtJP7v7IGyzjBFhbG4eOAHT6LPYOFmYfbk=",
-        "owner": "ipetkov",
-        "repo": "crane",
-        "rev": "7e4586bad4e3f8f97a9271def747cf58c4b68f3c",
-        "type": "github"
-      },
-      "original": {
-        "owner": "ipetkov",
-        "repo": "crane",
-        "type": "github"
-      }
-    },
-    "flake-utils": {
-      "inputs": {
-        "systems": "systems"
-      },
-      "locked": {
-        "lastModified": 1726560853,
-        "narHash": "sha256-X6rJYSESBVr3hBoH0WbKE5KvhPU5bloyZ2L4K60/fPQ=",
-        "owner": "numtide",
-        "repo": "flake-utils",
-        "rev": "c1dfcf08411b08f6b8615f7d8971a2bfa81d5e8a",
-        "type": "github"
-      },
-      "original": {
-        "owner": "numtide",
-        "repo": "flake-utils",
-        "type": "github"
-      }
-    },
-    "nixpkgs": {
-      "locked": {
-        "lastModified": 1726871744,
-        "narHash": "sha256-V5LpfdHyQkUF7RfOaDPrZDP+oqz88lTJrMT1+stXNwo=",
-        "owner": "NixOS",
-        "repo": "nixpkgs",
-        "rev": "a1d92660c6b3b7c26fb883500a80ea9d33321be2",
-        "type": "github"
-      },
-      "original": {
-        "owner": "NixOS",
-        "ref": "nixpkgs-unstable",
-        "repo": "nixpkgs",
-        "type": "github"
-      }
-    },
-    "root": {
-      "inputs": {
-        "crane": "crane",
-        "flake-utils": "flake-utils",
-        "nixpkgs": "nixpkgs"
-      }
-    },
-    "systems": {
-      "locked": {
-        "lastModified": 1681028828,
-        "narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
-        "owner": "nix-systems",
-        "repo": "default",
-        "rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
-        "type": "github"
-      },
-      "original": {
-        "owner": "nix-systems",
-        "repo": "default",
-        "type": "github"
-      }
-    }
-  },
-  "root": "root",
-  "version": 7
-}
flake.nix (225 changes)
@@ -1,225 +0,0 @@
-{
-  description = "breeze file server";
-
-  inputs = {
-    nixpkgs.url = "github:NixOS/nixpkgs/nixpkgs-unstable";
-    crane.url = "github:ipetkov/crane";
-    flake-utils.url = "github:numtide/flake-utils";
-  };
-
-  outputs = {
-    self,
-    nixpkgs,
-    crane,
-    flake-utils,
-    ...
-  }:
-    flake-utils.lib.eachDefaultSystem (system: let
-      pkgs = nixpkgs.legacyPackages.${system};
-
-      craneLib = crane.mkLib pkgs;
-
-      # Common arguments can be set here to avoid repeating them later
-      # Note: changes here will rebuild all dependency crates
-      commonArgs = {
-        src = craneLib.cleanCargoSource ./.;
-        strictDeps = true;
-
-        buildInputs =
-          [
-            pkgs.openssl
-          ]
-          ++ pkgs.lib.optionals pkgs.stdenv.isDarwin [
-            # Additional darwin specific inputs can be set here
-            pkgs.libiconv
-          ];
-      };
-
-      breeze = craneLib.buildPackage (commonArgs
-        // {
-          cargoArtifacts = craneLib.buildDepsOnly commonArgs;
-
-          # Additional environment variables or build phases/hooks can be set
-          # here *without* rebuilding all dependency crates
-          # MY_CUSTOM_VAR = "some value";
-        });
-    in {
-      checks = {
-        inherit breeze;
-      };
-
-      packages.default = breeze;
-
-      apps.default = flake-utils.lib.mkApp {
-        drv = breeze;
-      };
-
-      devShells.default = craneLib.devShell {
-        # Inherit inputs from checks.
-        checks = self.checks.${system};
-
-        # Additional dev-shell environment variables can be set directly
-        # MY_CUSTOM_DEVELOPMENT_VAR = "something else";
-
-        # Extra inputs can be added here; cargo and rustc are provided by default.
-        packages = with pkgs; [
-          alejandra
-          rewrk
-        ];
-      };
-
-      nixosModules.breeze = {
-        config,
-        pkgs,
-        lib,
-        ...
-      }:
-        with lib; let
-          cfg = config.services.breeze;
-          settingsFormat = pkgs.formats.toml {};
-        in {
-          options = {
-            services.breeze = {
-              enable = mkEnableOption "breeze file server";
-
-              package = mkOption {
-                type = types.package;
-                default = breeze;
-                description = "Package for `breeze` to use";
-              };
-
-              user = mkOption {
-                type = types.str;
-                default = "breeze";
-                description = "User that `breeze` will run under";
-              };
-
-              group = mkOption {
-                type = types.str;
-                default = "breeze";
-                description = "Group that `breeze` will run under";
-              };
-
-              extraGroups = mkOption {
-                type = types.listOf types.str;
-                default = [];
-                description = "Any supplementary groups for `breeze` to run under";
-              };
-
-              settings = mkOption {
-                type = settingsFormat.type;
-                default = {};
-                description = ''
-                  The *.toml configuration to run `breeze` with.
-                  There is no formal documentation, but there is an example in the [readme](https://git.min.rip/min/breeze/src/branch/main/README.md).
-                '';
-              };
-
-              configDir = mkOption {
-                type = types.path;
-                default = "/etc/breeze";
-                description = ''
-                  The directory on disk to store the `breeze` config file in.
-                  This does not load pre-existing config files, it only defines where the generated config is saved.
-                '';
-              };
-
-              uploadKeyFile = mkOption {
-                type = types.nullOr types.path;
-                default = null;
-                description = ''
-                  File to load the `engine.upload_key` from, if desired.
-                  This is useful for loading it from a secret management system.
-                '';
-              };
-            };
-          };
-
-          config = mkIf cfg.enable {
-            users.users.${cfg.user} = {
-              isSystemUser = true;
-              inherit (cfg) group;
-            };
-
-            users.groups.${cfg.group} = {};
-
-            systemd.tmpfiles.rules = [
-              "d '${cfg.configDir}' 0750 ${cfg.user} ${cfg.group} - -"
-            ];
-
-            services.breeze.settings = mkMerge [
-              (mkIf (cfg.uploadKeyFile != null) {engine.upload_key = "@UPLOAD_KEY@";})
-            ];
-
-            systemd.services.breeze = let
-              cfgFile = "${cfg.configDir}/breeze.toml";
-            in {
-              description = "breeze file server";
-              after = ["local-fs.target" "network.target"];
-              wantedBy = ["multi-user.target"];
-
-              preStart =
-                ''
-                  install -m 660 ${settingsFormat.generate "breeze.toml" cfg.settings} ${cfgFile}
-                ''
-                + lib.optionalString (cfg.uploadKeyFile != null) ''
-                  ${pkgs.replace-secret}/bin/replace-secret '@UPLOAD_KEY@' "${cfg.uploadKeyFile}" ${cfgFile}
-                '';
-
-              serviceConfig = rec {
-                User = cfg.user;
-                Group = cfg.group;
-                DynamicUser = false; # we write files, so don't do that
-                SupplementaryGroups = cfg.extraGroups;
-                StateDirectory = "breeze";
-                CacheDirectory = "breeze";
-                ExecStart = escapeShellArgs [
-                  "${cfg.package}/bin/breeze"
-                  "--config"
-                  cfgFile
-                ];
-                Restart = "on-failure";
-
-                # Security Options #
-
-                NoNewPrivileges = true; # implied by DynamicUser
-                RemoveIPC = true; # implied by DynamicUser
-
-                AmbientCapabilities = "";
-                CapabilityBoundingSet = "";
-
-                DeviceAllow = "";
-
-                LockPersonality = true;
-
-                PrivateTmp = true; # implied by DynamicUser
-                PrivateDevices = true;
-                PrivateUsers = true;
-
-                ProtectClock = true;
-                ProtectControlGroups = true;
-                ProtectHostname = true;
-                ProtectKernelLogs = true;
-                ProtectKernelModules = true;
-                ProtectKernelTunables = true;
-
-                RestrictNamespaces = true;
-                RestrictAddressFamilies = ["AF_INET" "AF_INET6" "AF_UNIX"];
-                RestrictRealtime = true;
-                RestrictSUIDSGID = true; # implied by DynamicUser
-
-                SystemCallArchitectures = "native";
-                SystemCallErrorNumber = "EPERM";
-                SystemCallFilter = [
-                  "@system-service"
-                  "~@keyring"
-                  "~@memlock"
-                  "~@privileged"
-                  "~@setuid"
-                ];
-              };
-            };
-          };
-        };
-    });
-}
src/cache.rs (259 changes)
@@ -1,259 +0,0 @@
-use std::{
-    sync::atomic::{AtomicUsize, Ordering},
-    time::{Duration, SystemTime},
-};
-
-use atomic_time::AtomicSystemTime;
-use bytes::Bytes;
-use dashmap::{mapref::one::Ref, DashMap};
-use rayon::prelude::*;
-use tokio::time;
-
-use crate::config;
-
-/// An entry stored in the cache.
-///
-/// It contains basic metadata and the actual value.
-pub struct Entry {
-    /// The data held
-    value: Bytes,
-
-    /// The last time this entry was read/written
-    last_used: AtomicSystemTime,
-
-    /// Whether or not `last_used` should be updated
-    update_used: bool,
-
-    /// How long the entry should last
-    lifetime: Duration,
-}
-
-impl Entry {
-    fn new(value: Bytes, lifetime: Duration, update_used: bool) -> Self {
-        let now = AtomicSystemTime::now();
-
-        Self {
-            value,
-            last_used: now,
-            update_used,
-            lifetime,
-        }
-    }
-
-    fn last_used(&self) -> SystemTime {
-        self.last_used.load(Ordering::Relaxed)
-    }
-
-    fn is_expired(&self) -> bool {
-        match self.last_used().elapsed() {
-            Ok(d) => d >= self.lifetime,
-            Err(_) => false, // now > last_used
-        }
-    }
-}
-
-/// A concurrent cache with a maximum memory size (w/ LRU) and expiration.
-///
-/// It is designed to keep memory usage low.
-pub struct Cache {
-    /// Where elements are stored
-    map: DashMap<String, Entry>,
-
-    /// Total length of data stored in cache currently
-    length: AtomicUsize,
-
-    /// How should it behave
-    cfg: config::CacheConfig,
-}
-
-impl Cache {
-    pub fn from_config(cfg: config::CacheConfig) -> Self {
-        Self {
-            map: DashMap::with_capacity(256),
-            length: AtomicUsize::new(0),
-
-            cfg,
-        }
-    }
-
-    /// Figure out who should be bumped out of cache next
-    fn next_out(&self, length: usize) -> Vec<String> {
-        let mut sorted: Vec<_> = self.map.iter().collect();
-
-        // Sort by least recently used
-        sorted.par_sort_unstable_by(|e1, e2| e1.last_used().cmp(&e2.last_used()));
-
-        // Total bytes we would be removing
-        let mut total = 0;
-
-        // Pull entries until we have enough free space
-        sorted
-            .iter()
-            .take_while(|e| {
-                let need_more = total < length;
-
-                if need_more {
-                    total += e.value.len();
-                }
-
-                need_more
-            })
-            .map(|e| e.key().clone())
-            .collect()
-    }
-
-    /// Remove an element from the cache
-    ///
-    /// Returns: [`Some`] if successful, [`None`] if element not found
-    pub fn remove(&self, key: &str) -> Option<()> {
-        // Skip expiry checks, we are removing it anyways
-        // And also that could cause an infinite loop which would be pretty stupid.
-        let e = self.map.get(key)?;
-
-        // Atomically subtract from the total cache length
-        self.length.fetch_sub(e.value.len(), Ordering::Relaxed);
-
-        // Drop the entry lock so we can actually remove it
-        drop(e);
-
-        // Remove from map
-        self.map.remove(key);
-
-        Some(())
-    }
-
-    /// Add a new element to the cache with a specified lifetime.
-    ///
-    /// Returns: `true` if no value is replaced, `false` if a value was replaced
-    pub fn add_with_lifetime(
-        &self,
-        key: &str,
-        value: Bytes,
-        lifetime: Duration,
-        is_renewable: bool,
-    ) -> bool {
-        let e = Entry::new(value, lifetime, is_renewable);
-
-        let len = e.value.len();
-        let cur_total = self.length.load(Ordering::Relaxed);
-        let new_total = cur_total + len;
-
-        if new_total > self.cfg.mem_capacity {
-            // How far we went above the limit
-            let needed = new_total - self.cfg.mem_capacity;
-
-            self.next_out(needed).par_iter().for_each(|k| {
-                // Remove the element, and ignore the result
-                // The only reason it should be failing is if it couldn't find it,
-                // in which case it was already removed
-                self.remove(k);
-            })
-        }
-
-        // Atomically add to total cached data length
-        self.length.fetch_add(len, Ordering::Relaxed);
-
-        // Add to the map, return true if we didn't replace anything
-        self.map.insert(key.to_string(), e).is_none()
-    }
-
-    /// Add a new element to the cache with the default lifetime.
-    ///
-    /// Returns: `true` if no value is replaced, `false` if a value was replaced
-    pub fn add(&self, key: &str, value: Bytes) -> bool {
-        self.add_with_lifetime(key, value, self.cfg.upload_lifetime, true)
-    }
-
-    /// Internal function for retrieving entries.
-    ///
-    /// Returns: same as [`DashMap::get`], for our purposes
-    ///
-    /// It exists so we can run the expiry check before
-    /// actually working with any entries, so no weird bugs happen
-    fn _get(&self, key: &str) -> Option<Ref<String, Entry>> {
-        let e = self.map.get(key)?;
-
-        // if the entry is expired get rid of it now
-        if e.is_expired() {
-            // drop the reference so we don't deadlock
-            drop(e);
-
-            // remove it
-            self.remove(key);
-
-            // and say we never had it
-            return None;
-        }
-
-        Some(e)
-    }
-
-    /// Get an item from the cache, if it exists.
-    pub fn get(&self, key: &str) -> Option<Bytes> {
-        let e = self._get(key)?;
-
-        if e.update_used {
-            e.last_used.store(SystemTime::now(), Ordering::Relaxed);
-        }
-
-        Some(e.value.clone())
-    }
-
-    /// Check if we have an item in cache.
-    ///
-    /// Returns: `true` if key exists, `false` if it doesn't
-    ///
-    /// We don't use [`DashMap::contains_key`] here because it would just do
-    /// the exact same thing I do here, but without running the expiry check logic
-    pub fn has(&self, key: &str) -> bool {
-        self._get(key).is_some()
-    }
-
-    /// Returns if an upload is able to be cached
-    /// with the current caching rules
-    #[inline(always)]
-    pub fn will_use(&self, length: usize) -> bool {
-        length <= self.cfg.max_length
-    }
-
-    /// The background job that scans through the cache and removes inactive elements.
-    ///
-    /// TODO: see if this is actually less expensive than
-    /// letting each entry keep track of expiry with its own task
-    pub async fn scanner(&self) {
-        let mut interval = time::interval(self.cfg.scan_freq);
-
-        loop {
-            // We put this first so that it doesn't scan the instant the server starts
-            interval.tick().await;
-
-            // Save current timestamp so we aren't retrieving it constantly
-            // If we don't do this it'll be a LOT of system api calls
-            let now = SystemTime::now();
-
-            // Collect a list of all the expired keys
-            // If we fail to compare the times, it gets added to the list anyways
-            let expired: Vec<_> = self
-                .map
-                .par_iter()
-                .filter_map(|e| {
-                    let elapsed = now.duration_since(e.last_used()).unwrap_or(Duration::MAX);
-                    let is_expired = elapsed >= e.lifetime;
-
-                    if is_expired {
-                        Some(e.key().clone())
-                    } else {
-                        None
-                    }
-                })
-                .collect();
-
-            // If we have any, lock the map and drop all of them
-            if !expired.is_empty() {
-                // Use a retain call, should be less locks that way
-                // (instead of many remove calls)
-                self.map.retain(|k, _| !expired.contains(k))
-            }
-        }
-    }
-}
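On main, this `dashmap`-based replacement is shared behind an `Arc` and swept by a dedicated background task rather than on mutation, which is why every method takes `&self`. A sketch of how it is driven, mirroring `Engine::from_config` in `src/engine.rs` further down:

```rust
use std::sync::Arc;

use bytes::Bytes;

async fn demo(cfg: crate::config::CacheConfig) {
    let cache = Arc::new(Cache::from_config(cfg));

    // background sweep that evicts expired entries every cfg.scan_freq
    let scanner = cache.clone();
    tokio::spawn(async move { scanner.scanner().await });

    // add/get/has take &self; DashMap handles locking internally, so the
    // cache can be used concurrently from many request handlers
    cache.add("abcdef.png", Bytes::from_static(b"..."));
    assert!(cache.has("abcdef.png"));
}
```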
src/config.rs
@@ -7,15 +7,10 @@ use tracing_subscriber::filter::LevelFilter;
 #[derive(Deserialize)]
 pub struct Config {
     pub engine: EngineConfig,
-    pub http: HttpConfig,
+    pub cache: CacheConfig,
     pub logger: LoggerConfig,
 }
 
-fn default_motd() -> String {
-    "breeze file server (v%version%) - currently hosting %uplcount% files".to_string()
-}
-
-#[serde_as]
 #[derive(Deserialize)]
 pub struct EngineConfig {
     /// The url that the instance of breeze is meant to be accessed from.
@@ -23,44 +18,15 @@ pub struct EngineConfig {
     /// ex: https://picture.wtf would generate links like https://picture.wtf/p/abcdef.png
     pub base_url: String,
 
-    /// Authentication key for new uploads, will be required if this is specified. (optional)
-    #[serde(default)]
-    pub upload_key: String,
-
-    /// Configuration for disk system
-    pub disk: DiskConfig,
-
-    /// Configuration for cache system
-    pub cache: CacheConfig,
-
-    /// Maximum size of an upload that will be accepted.
-    /// Files above this size can not be uploaded.
-    pub max_upload_len: Option<usize>,
-
-    /// Maximum lifetime of a temporary upload
-    #[serde_as(as = "DurationSeconds")]
-    pub max_temp_lifetime: Duration,
-
-    /// Maximum length (in bytes) a file can be before the server will
-    /// decide not to remove its EXIF data.
-    pub max_strip_len: usize,
-
-    /// Motd displayed when the server's index page is visited.
-    ///
-    /// This isn't explicitly engine-related but the engine is what gets passed to routes,
-    /// so it is here for now.
-    #[serde(default = "default_motd")]
-    pub motd: String,
-}
-
-#[derive(Deserialize, Clone)]
-pub struct DiskConfig {
     /// Location on disk the uploads are to be saved to
     pub save_path: PathBuf,
+
+    /// Authentication key for new uploads, will be required if this is specified. (optional)
+    pub upload_key: Option<String>,
 }
 
 #[serde_as]
-#[derive(Deserialize, Clone)]
+#[derive(Deserialize)]
 pub struct CacheConfig {
     /// The maximum length in bytes that a file can be
     /// before it skips cache (in seconds)
@@ -79,23 +45,11 @@ pub struct CacheConfig {
     pub mem_capacity: usize,
 }
 
-#[derive(Deserialize)]
-pub struct HttpConfig {
-    /// The IP address the HTTP server should listen on
-    pub listen_on: String,
-}
-
-fn default_level_filter() -> LevelFilter {
-    LevelFilter::WARN
-}
-
 #[serde_as]
 #[derive(Deserialize)]
 pub struct LoggerConfig {
     /// Minimum level a log must be for it to be shown.
     /// This defaults to "warn" if not specified.
-    #[serde_as(as = "DisplayFromStr")]
-    // yes... kind of a hack but serde doesn't have anything better
-    #[serde(default = "default_level_filter")]
-    pub level: LevelFilter,
+    #[serde_as(as = "Option<DisplayFromStr>")]
+    pub level: Option<LevelFilter>,
 }
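On main, this `Config` arrives from the TOML file named by `-c`/`--config` (see the README section above), not from environment variables. A minimal sketch of loading it; `load_config` is a hypothetical helper, but `toml` and `serde` are real dependencies on both sides of this comparison:

```rust
use std::path::Path;

fn load_config(path: &Path) -> Config {
    // read the whole file, then let serde deserialize it into the structs above
    let raw = std::fs::read_to_string(path).expect("failed to read config file");
    toml::from_str(&raw).expect("invalid config file")
}
```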
src/disk.rs (84 changes)
@@ -1,84 +0,0 @@
-use std::path::PathBuf;
-
-use bytes::Bytes;
-use tokio::{
-    fs::File,
-    io::{self, AsyncWriteExt},
-    sync::mpsc::{self, Receiver, Sender},
-};
-use tracing::debug;
-use walkdir::WalkDir;
-
-use crate::config;
-
-/// Provides an API to access the disk file store
-/// like we access the cache.
-pub struct Disk {
-    cfg: config::DiskConfig,
-}
-
-impl Disk {
-    pub fn from_config(cfg: config::DiskConfig) -> Self {
-        Self { cfg }
-    }
-
-    /// Counts the number of files saved to disk we have
-    pub fn count(&self) -> usize {
-        WalkDir::new(&self.cfg.save_path)
-            .min_depth(1)
-            .into_iter()
-            .count()
-    }
-
-    /// Formats the path on disk for a `saved_name`.
-    fn path_for(&self, saved_name: &str) -> PathBuf {
-        let mut p = self.cfg.save_path.clone();
-        p.push(saved_name);
-
-        p
-    }
-
-    /// Try to open a file on disk, and if we didn't find it,
-    /// then return [`None`].
-    pub async fn open(&self, saved_name: &str) -> Result<Option<File>, io::Error> {
-        let p = self.path_for(saved_name);
-
-        match File::open(p).await {
-            Ok(f) => Ok(Some(f)),
-            Err(e) => match e.kind() {
-                io::ErrorKind::NotFound => Ok(None),
-                _ => Err(e)?, // some other error, send it back
-            },
-        }
-    }
-
-    /// Get the size of an upload's file
-    pub async fn len(&self, f: &File) -> Result<usize, io::Error> {
-        Ok(f.metadata().await?.len() as usize)
-    }
-
-    /// Create a background I/O task
-    pub async fn start_save(&self, saved_name: &str) -> Sender<Bytes> {
-        // start a task that handles saving files to disk (we can save to cache/disk in parallel that way)
-        let (tx, mut rx): (Sender<Bytes>, Receiver<Bytes>) = mpsc::channel(256);
-
-        let p = self.path_for(saved_name);
-
-        tokio::spawn(async move {
-            // create file to save upload to
-            let mut file = File::create(p)
-                .await
-                .expect("could not open file! make sure your upload path is valid");
-
-            // receive chunks and save them to file
-            while let Some(chunk) = rx.recv().await {
-                debug!("writing chunk to disk (length: {})", chunk.len());
-                file.write_all(&chunk)
-                    .await
-                    .expect("error while writing file to disk");
-            }
-        });
-
-        tx
-    }
-}
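`start_save` is the interesting part of this design: it returns only the `Sender` half of an `mpsc` channel while a spawned task owns the `File`, so the HTTP handler never blocks on disk I/O. A usage sketch, with a literal chunk standing in for real request body data:

```rust
use bytes::Bytes;

async fn demo(disk: &Disk) {
    // spawns the background writer and hands back the channel's send half
    let tx = disk.start_save("abcdef.png").await;

    // each Bytes chunk sent here is appended to the file by the writer task
    tx.send(Bytes::from_static(b"file contents"))
        .await
        .expect("i/o task stopped");

    // dropping tx closes the channel, which lets the writer task finish
    drop(tx);
}
```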
462
src/engine.rs
462
src/engine.rs
|
@ -1,228 +1,196 @@
|
||||||
use std::{
|
use std::{
|
||||||
sync::{
|
ffi::OsStr,
|
||||||
atomic::{AtomicUsize, Ordering},
|
path::{Path, PathBuf},
|
||||||
Arc,
|
sync::atomic::{AtomicUsize, Ordering},
|
||||||
},
|
|
||||||
time::Duration,
|
time::Duration,
|
||||||
};
|
};
|
||||||
|
|
||||||
use axum::body::BodyDataStream;
|
use archived::Archive;
|
||||||
|
use axum::extract::BodyStream;
|
||||||
use bytes::{BufMut, Bytes, BytesMut};
|
use bytes::{BufMut, Bytes, BytesMut};
|
||||||
use img_parts::{DynImage, ImageEXIF};
|
use rand::Rng;
|
||||||
use rand::distributions::{Alphanumeric, DistString};
|
use tokio::{
|
||||||
use tokio::{fs::File, io::AsyncReadExt};
|
fs::File,
|
||||||
|
io::{AsyncReadExt, AsyncWriteExt},
|
||||||
|
sync::{
|
||||||
|
mpsc::{self, Receiver, Sender},
|
||||||
|
RwLock,
|
||||||
|
},
|
||||||
|
};
|
||||||
use tokio_stream::StreamExt;
|
use tokio_stream::StreamExt;
|
||||||
use tracing::{debug, info};
|
use tracing::{debug, error, info};
|
||||||
|
use walkdir::WalkDir;
|
||||||
|
|
||||||
use crate::{cache, config, disk};
|
use crate::view::{ViewError, ViewSuccess};
|
||||||
|
|
||||||
/// Various forms of upload data that can be sent to the client
|
|
||||||
pub enum UploadData {
|
|
||||||
/// Send back the data from memory
|
|
||||||
Cache(Bytes),
|
|
||||||
|
|
||||||
/// Stream the file from disk to the client
|
|
||||||
Disk(File, usize),
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Rejection outcomes of an [`Engine::process`] call
|
|
||||||
pub enum ProcessOutcome {
|
|
||||||
/// The upload was successful.
|
|
||||||
/// We give the user their file's URL
|
|
||||||
Success(String),
|
|
||||||
|
|
||||||
/// Occurs when an upload exceeds the chosen maximum file size.
|
|
||||||
UploadTooLarge,
|
|
||||||
|
|
||||||
/// Occurs when a temporary upload is too big to fit in the cache.
|
|
||||||
TemporaryUploadTooLarge,
|
|
||||||
|
|
||||||
/// Occurs when the user-given lifetime is longer than we will allow
|
|
||||||
TemporaryUploadLifetimeTooLong,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// breeze engine! this is the core of everything
|
/// breeze engine! this is the core of everything
|
||||||
pub struct Engine {
|
pub struct Engine {
|
||||||
|
// ------ STATE ------ //
|
||||||
|
/// The in-memory cache that cached uploads are stored in.
|
||||||
|
cache: RwLock<Archive>,
|
||||||
|
|
||||||
/// Cached count of uploaded files.
|
/// Cached count of uploaded files.
|
||||||
pub upl_count: AtomicUsize,
|
pub upl_count: AtomicUsize,
|
||||||
|
|
||||||
/// Engine configuration
|
// ------ CONFIG ------ //
|
||||||
pub cfg: config::EngineConfig,
|
/// The base URL that the server will be accessed from.
|
||||||
|
/// It is only used for formatting returned upload URLs.
|
||||||
|
pub base_url: String,
|
||||||
|
|
||||||
/// The in-memory cache that cached uploads are stored in.
|
/// The path on disk that uploads are saved to.
|
||||||
cache: Arc<cache::Cache>,
|
save_path: PathBuf,
|
||||||
|
|
||||||
/// An interface to the on-disk upload store
|
/// The authorisation key required for uploading new files.
|
||||||
disk: disk::Disk,
|
/// If it is empty, no key will be required.
|
||||||
|
pub upload_key: String,
|
||||||
|
|
||||||
|
/// The maximum size for an upload to be stored in cache.
|
||||||
|
/// Anything bigger skips cache and is read/written to
|
||||||
|
/// directly from disk.
|
||||||
|
cache_max_length: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Engine {
|
impl Engine {
|
||||||
/// Creates a new instance of the breeze engine.
|
/// Creates a new instance of the breeze engine.
|
||||||
pub fn from_config(cfg: config::EngineConfig) -> Self {
|
pub fn new(
|
||||||
let cache = cache::Cache::from_config(cfg.cache.clone());
|
base_url: String,
|
||||||
let disk = disk::Disk::from_config(cfg.disk.clone());
|
save_path: PathBuf,
|
||||||
|
upload_key: String,
|
||||||
let cache = Arc::new(cache);
|
cache_max_length: usize,
|
||||||
|
cache_lifetime: Duration,
|
||||||
let cache_scanner = cache.clone();
|
cache_full_scan_freq: Duration, // how often the cache will be scanned for expired items
|
||||||
tokio::spawn(async move { cache_scanner.scanner().await });
|
cache_mem_capacity: usize,
|
||||||
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
// initialise our cached upload count. this doesn't include temp uploads!
|
cache: RwLock::new(Archive::with_full_scan(
|
||||||
upl_count: AtomicUsize::new(disk.count()),
|
cache_full_scan_freq,
|
||||||
|
cache_lifetime,
|
||||||
|
cache_mem_capacity,
|
||||||
|
)),
|
||||||
|
upl_count: AtomicUsize::new(WalkDir::new(&save_path).min_depth(1).into_iter().count()), // count the amount of files in the save path and initialise our cached count with it
|
||||||
|
|
||||||
cfg,
|
base_url,
|
||||||
|
save_path,
|
||||||
|
upload_key,
|
||||||
|
|
||||||
cache,
|
cache_max_length,
|
||||||
disk,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Fetch an upload
|
/// Returns if an upload would be able to be cached
|
||||||
///
|
#[inline(always)]
|
||||||
/// This will first try to read from cache, and then disk after.
|
fn will_use_cache(&self, length: usize) -> bool {
|
||||||
/// If an upload is eligible to be cached, it will be cached and
|
length <= self.cache_max_length
|
||||||
/// sent back as a cache response instead of a disk response.
|
|
||||||
pub async fn get(&self, saved_name: &str) -> anyhow::Result<Option<UploadData>> {
|
|
||||||
// check the cache first
|
|
||||||
if let Some(u) = self.cache.get(saved_name) {
|
|
||||||
return Ok(Some(UploadData::Cache(u)));
|
|
||||||
}
|
|
||||||
|
|
||||||
// now, check if we have it on disk
|
|
||||||
let mut f = if let Some(f) = self.disk.open(saved_name).await? {
|
|
||||||
f
|
|
||||||
} else {
|
|
||||||
// file didn't exist
|
|
||||||
return Ok(None);
|
|
||||||
};
|
|
||||||
|
|
||||||
let len = self.disk.len(&f).await?;
|
|
||||||
|
|
||||||
// can this be recached?
|
|
||||||
if self.cache.will_use(len) {
|
|
||||||
// read file from disk
|
|
||||||
let mut full = BytesMut::with_capacity(len);
|
|
||||||
|
|
||||||
// read file from disk and if it fails at any point, return 500
|
|
||||||
loop {
|
|
||||||
match f.read_buf(&mut full).await {
|
|
||||||
Ok(n) => {
|
|
||||||
if n == 0 {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(e) => Err(e)?,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let full = full.freeze();
|
|
||||||
|
|
||||||
// re-insert it into cache
|
|
||||||
self.cache.add(saved_name, full.clone());
|
|
||||||
|
|
||||||
return Ok(Some(UploadData::Cache(full)));
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(Some(UploadData::Disk(f, len)))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn has(&self, saved_name: &str) -> bool {
|
/// Check if an upload exists in cache or on disk
|
||||||
if self.cache.has(saved_name) {
|
pub async fn upload_exists(&self, path: &Path) -> bool {
|
||||||
|
let cache = self.cache.read().await;
|
||||||
|
|
||||||
|
// extract file name, since that's what cache uses
|
||||||
|
let name = path
|
||||||
|
.file_name()
|
||||||
|
.and_then(OsStr::to_str)
|
||||||
|
.unwrap_or_default()
|
||||||
|
.to_string();
|
||||||
|
|
||||||
|
// check in cache
|
||||||
|
if cache.contains_key(&name) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
// sidestep handling the error properly
|
// check on disk
|
||||||
// that way we can call this in gen_saved_name easier
|
if path.exists() {
|
||||||
if self.disk.open(saved_name).await.is_ok_and(|f| f.is_some()) {
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
false
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Generate a new saved name for an upload.
|
/// Generate a new save path for an upload.
|
||||||
///
|
///
|
||||||
/// This will call itself recursively if it picks
|
/// This will call itself recursively if it picks
|
||||||
/// a name that's already used. (it is rare)
|
/// a name that's already used. (it is rare)
|
||||||
#[async_recursion::async_recursion]
|
#[async_recursion::async_recursion]
|
||||||
pub async fn gen_saved_name(&self, ext: &str) -> String {
|
pub async fn gen_path(&self, original_path: &PathBuf) -> PathBuf {
|
||||||
// generate a 6-character alphanumeric string
|
// generate a 6-character alphanumeric string
|
||||||
let mut saved_name: String = Alphanumeric.sample_string(&mut rand::thread_rng(), 6);
|
let id: String = rand::thread_rng()
|
||||||
|
.sample_iter(&rand::distributions::Alphanumeric)
|
||||||
|
.take(6)
|
||||||
|
.map(char::from)
|
||||||
|
.collect();
|
||||||
|
|
||||||
// if we have an extension, add it now
|
// extract the extension from the original path
|
||||||
if !ext.is_empty() {
|
let original_extension = original_path
|
||||||
saved_name.push('.');
|
.extension()
|
||||||
saved_name.push_str(ext);
|
.and_then(OsStr::to_str)
|
||||||
}
|
.unwrap_or_default()
|
||||||
|
.to_string();
|
||||||
|
|
||||||
if !self.has(&saved_name).await {
|
// path on disk
|
||||||
saved_name
|
let mut path = self.save_path.clone();
|
||||||
|
path.push(&id);
|
||||||
|
path.set_extension(original_extension);
|
||||||
|
|
||||||
|
if !self.upload_exists(&path).await {
|
||||||
|
path
|
||||||
} else {
|
} else {
|
||||||
// we had a name collision! try again..
|
// we had a name collision! try again..
|
||||||
info!("name collision! saved_name= {}", saved_name);
|
self.gen_path(original_path).await
|
||||||
self.gen_saved_name(ext).await
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
-    /// Save a file to disk, and optionally cache.
-    ///
-    /// This also handles custom file lifetimes and EXIF data removal.
-    pub async fn save(
+    /// Process an upload.
+    /// This is called by the /new route.
+    pub async fn process_upload(
         &self,
-        saved_name: &str,
-        provided_len: usize,
-        mut use_cache: bool,
-        mut stream: BodyDataStream,
-        lifetime: Option<Duration>,
-        keep_exif: bool,
-    ) -> Result<(), anyhow::Error> {
+        path: PathBuf,
+        name: String, // we already extract it in the route handler, and it'd be wasteful to do it in gen_path
+        content_length: usize,
+        mut stream: BodyStream,
+    ) {
+        // if the upload size is smaller than the specified maximum, we use the cache!
+        let mut use_cache = self.will_use_cache(content_length);
+
         // if we're using cache, make some space to store the upload in
         let mut data = if use_cache {
-            BytesMut::with_capacity(provided_len)
+            BytesMut::with_capacity(content_length)
         } else {
             BytesMut::new()
         };
 
-        // don't begin a disk save if we're using temporary lifetimes
-        let tx = if lifetime.is_none() {
-            Some(self.disk.start_save(saved_name).await)
-        } else {
-            None
-        };
+        // start a task that handles saving files to disk (we can save to cache/disk in parallel that way)
+        let (tx, mut rx): (Sender<Bytes>, Receiver<Bytes>) = mpsc::channel(1);
 
-        // whether or not we're gonna coalesce the data
-        // in order to strip the exif data at the end,
-        // instead of just sending it off to the i/o task
-        let coalesce_and_strip = use_cache
-            && matches!(
-                std::path::Path::new(saved_name)
-                    .extension()
-                    .map(|s| s.to_str()),
-                Some(Some("png" | "jpg" | "jpeg" | "webp" | "tiff"))
-            )
-            && !keep_exif
-            && provided_len <= self.cfg.max_strip_len;
+        tokio::spawn(async move {
+            // create file to save upload to
+            let mut file = File::create(path)
+                .await
+                .expect("could not open file! make sure your upload path is valid");
+
+            // receive chunks and save them to file
+            while let Some(chunk) = rx.recv().await {
+                debug!("writing chunk to disk (length: {})", chunk.len());
+                file.write_all(&chunk)
+                    .await
+                    .expect("error while writing file to disk");
+            }
+        });
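Note: the 0.1.5-p1 side above decouples disk writes from the request body by pushing chunks through a bounded mpsc channel to a spawned writer task, so the cache fill and the disk write can proceed in parallel. A minimal self-contained sketch of that pattern (the file name and chunks here are invented):

    use bytes::Bytes;
    use tokio::{fs::File, io::AsyncWriteExt, sync::mpsc};

    #[tokio::main]
    async fn main() {
        // capacity 1 keeps backpressure on the producer, as in the original
        let (tx, mut rx) = mpsc::channel::<Bytes>(1);

        // i/o task: receive chunks and append them to the file
        let writer = tokio::spawn(async move {
            let mut file = File::create("upload.bin").await.expect("create failed");
            while let Some(chunk) = rx.recv().await {
                file.write_all(&chunk).await.expect("write failed");
            }
        });

        for part in ["hello ", "world"] {
            tx.send(Bytes::from(part)).await.expect("i/o task died");
        }
        drop(tx); // closing the channel lets the writer task finish

        writer.await.unwrap();
    }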
         // read and save upload
         while let Some(chunk) = stream.next().await {
-            // if we error on a chunk, fail out
-            let chunk = chunk?;
+            let chunk = chunk.unwrap();
 
-            // if we have an i/o task, send it off
-            // also cloning this is okay because it's a Bytes
-            if !coalesce_and_strip {
-                if let Some(ref tx) = tx {
-                    debug!("sending chunk to i/o task");
-                    tx.send(chunk.clone()).await?;
-                }
-            }
+            // send chunk to io task
+            debug!("sending data to io task");
+            tx.send(chunk.clone())
+                .await
+                .expect("failed to send data to io task");
 
             if use_cache {
                 debug!("receiving data into buffer");
 
                 if data.len() + chunk.len() > data.capacity() {
-                    info!("the amount of data sent exceeds the content-length provided by the client! caching will be cancelled for this upload.");
+                    error!("the amount of data sent exceeds the content-length provided by the client! caching will be cancelled for this upload.");
 
                     // if we receive too much data, drop the buffer and stop using cache (it is still okay to use disk, probably)
                     data = BytesMut::new();
 
@@ -233,99 +201,109 @@ impl Engine {
             }
         }
 
-        let data = data.freeze();
-
-        // we coalesced the data instead of streaming to disk,
-        // strip the exif data and send it off now
-        let data = if coalesce_and_strip {
-            // strip the exif if we can
-            // if we can't, then oh well
-            let data = if let Ok(Some(data)) = DynImage::from_bytes(data.clone()).map(|o| {
-                o.map(|mut img| {
-                    img.set_exif(None);
-                    img.encoder().bytes()
-                })
-            }) {
-                info!("stripped exif data");
-                data
-            } else {
-                info!("failed to strip exif data");
-                data
-            };
-
-            // send what we did over to the i/o task, all in one chunk
-            if let Some(ref tx) = tx {
-                debug!("sending filled buffer to i/o task");
-                tx.send(data.clone()).await?;
-            }
-
-            data
-        } else {
-            // or, we didn't do that
-            // keep the data as it is
-            data
-        };
-
-        // insert upload into cache if we're using it
+        // insert upload into cache if necessary
         if use_cache {
+            let mut cache = self.cache.write().await;
 
             info!("caching upload!");
-            match lifetime {
-                Some(lt) => self.cache.add_with_lifetime(saved_name, data, lt, false),
-                None => self.cache.add(saved_name, data),
-            };
+            cache.insert(name, data.freeze());
         }
 
         info!("finished processing upload!!");
 
         // if all goes well, increment the cached upload counter
         self.upl_count.fetch_add(1, Ordering::Relaxed);
 
-        Ok(())
     }
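Note: main's coalesce path strips EXIF with img-parts, using exactly the calls shown above. Pulled out into a standalone helper (assuming img-parts' ImageEXIF trait provides set_exif, which the calls above imply; on any parse failure the original bytes are kept):

    use bytes::Bytes;
    use img_parts::{DynImage, ImageEXIF};

    // parse the image, drop its EXIF metadata, and re-encode;
    // fall back to the untouched bytes if the format isn't recognized
    fn strip_exif(data: Bytes) -> Bytes {
        match DynImage::from_bytes(data.clone()) {
            Ok(Some(mut img)) => {
                img.set_exif(None);
                img.encoder().bytes()
            }
            _ => data,
        }
    }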
-    pub async fn process(
-        &self,
-        ext: &str,
-        provided_len: usize,
-        stream: BodyDataStream,
-        lifetime: Option<Duration>,
-        keep_exif: bool,
-    ) -> Result<ProcessOutcome, anyhow::Error> {
-        // if the upload size is greater than our max file size, deny it now
-        if self.cfg.max_upload_len.is_some_and(|l| provided_len > l) {
-            return Ok(ProcessOutcome::UploadTooLarge);
+    /// Read an upload from cache, if it exists.
+    ///
+    /// Previously, this would lock the cache as
+    /// writable to renew the upload's cache lifespan.
+    /// Locking the cache as readable allows multiple concurrent
+    /// readers though, which allows me to handle multiple views concurrently.
+    async fn read_cached_upload(&self, name: &String) -> Option<Bytes> {
+        let cache = self.cache.read().await;
+
+        // fetch upload data from cache
+        cache.get(name).map(ToOwned::to_owned)
+    }
+
+    /// Reads an upload, from cache or on disk.
+    pub async fn get_upload(&self, original_path: &Path) -> Result<ViewSuccess, ViewError> {
+        // extract upload file name
+        let name = original_path
+            .file_name()
+            .and_then(OsStr::to_str)
+            .unwrap_or_default()
+            .to_string();
+
+        // path on disk
+        let mut path = self.save_path.clone();
+        path.push(&name);
+
+        // check if the upload exists, if not then 404
+        if !self.upload_exists(&path).await {
+            return Err(ViewError::NotFound);
         }
 
-        // if the upload size is smaller than the specified maximum, we use the cache!
-        let use_cache: bool = self.cache.will_use(provided_len);
+        // attempt to read upload from cache
+        let cached_data = self.read_cached_upload(&name).await;
 
-        // if a temp file is too big for cache, reject it now
-        if lifetime.is_some() && !use_cache {
-            return Ok(ProcessOutcome::TemporaryUploadTooLarge);
+        if let Some(data) = cached_data {
+            info!("got upload from cache!");
+
+            Ok(ViewSuccess::FromCache(data))
+        } else {
+            // we already know the upload exists by now so this is okay
+            let mut file = File::open(&path).await.unwrap();
+
+            // read upload length from disk
+            let metadata = file.metadata().await;
+
+            if metadata.is_err() {
+                error!("failed to get upload file metadata!");
+                return Err(ViewError::InternalServerError);
+            }
+
+            let metadata = metadata.unwrap();
+
+            let length = metadata.len() as usize;
+
+            debug!("read upload from disk, size = {}", length);
+
+            // if the upload is okay to cache, recache it and send a fromcache response
+            if self.will_use_cache(length) {
+                // read file from disk
+                let mut data = BytesMut::with_capacity(length);
+
+                // read file from disk and if it fails at any point, return 500
+                loop {
+                    match file.read_buf(&mut data).await {
+                        Ok(n) => {
+                            if n == 0 {
+                                break;
+                            }
+                        }
+                        Err(_) => {
+                            return Err(ViewError::InternalServerError);
+                        }
+                    }
+                }
+
+                let data = data.freeze();
+
+                // re-insert it into cache
+                let mut cache = self.cache.write().await;
+                cache.insert(name, data.clone());
+
+                info!("recached upload from disk!");
+
+                return Ok(ViewSuccess::FromCache(data));
+            }
+
+            info!("got upload from disk!");
+
+            Ok(ViewSuccess::FromDisk(file))
         }
 
-        // if a temp file's lifetime is too long, reject it now
-        if lifetime.is_some_and(|lt| lt > self.cfg.max_temp_lifetime) {
-            return Ok(ProcessOutcome::TemporaryUploadLifetimeTooLong);
-        }
-
-        // generate the file name
-        let saved_name = self.gen_saved_name(ext).await;
-
-        // save it
-        self.save(
-            &saved_name,
-            provided_len,
-            use_cache,
-            stream,
-            lifetime,
-            keep_exif,
-        )
-        .await?;
-
-        // format and send back the url
-        let url = format!("{}/p/{}", self.cfg.base_url, saved_name);
-
-        Ok(ProcessOutcome::Success(url))
     }
 }
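Note: 0.1.5-p1's recache path reads the whole file into a pre-sized BytesMut with read_buf, stopping at EOF. The same loop as a standalone function (read_buf grows the BytesMut as needed and returns Ok(0) only once the file is exhausted):

    use bytes::{Bytes, BytesMut};
    use tokio::{fs::File, io::AsyncReadExt};

    // read a whole file into a buffer sized from its metadata length
    async fn read_all(path: &str, len: usize) -> std::io::Result<Bytes> {
        let mut file = File::open(path).await?;
        let mut data = BytesMut::with_capacity(len);
        loop {
            if file.read_buf(&mut data).await? == 0 {
                break; // EOF
            }
        }
        Ok(data.freeze())
    }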
11  src/index.rs

@@ -6,19 +6,12 @@ use axum::extract::State;
 pub async fn index(State(engine): State<Arc<crate::engine::Engine>>) -> String {
     let count = engine.upl_count.load(Ordering::Relaxed);
 
-    let motd = engine.cfg.motd.clone();
+    format!("minish's image host, currently hosting {} files", count)
 
-    motd.replace("%version%", env!("CARGO_PKG_VERSION"))
-        .replace("%uplcount%", &count.to_string())
 }
 
 pub async fn robots_txt() -> &'static str {
     /// robots.txt that tells web crawlers not to list uploads
-    const ROBOTS_TXT: &str = concat!(
-        "User-Agent: *\n",
-        "Disallow: /p/*\n",
-        "Allow: /\n"
-    );
+    const ROBOTS_TXT: &str = concat!("User-Agent: *\n", "Disallow: /p/*\n", "Allow: /\n");
 
     ROBOTS_TXT
 }
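Note: on main the index body is a configurable motd with two placeholders, while 0.1.5-p1 hardcodes the string. The substitution main performs, restated as a free function (render_motd is a hypothetical name for illustration):

    // expand the two placeholders the main-branch index route supports
    fn render_motd(motd: &str, upl_count: usize) -> String {
        motd.replace("%version%", env!("CARGO_PKG_VERSION"))
            .replace("%uplcount%", &upl_count.to_string())
    }

    fn main() {
        println!("{}", render_motd("breeze %version%, hosting %uplcount% files", 42));
    }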
62  src/main.rs

@@ -1,64 +1,64 @@
 use std::{path::PathBuf, sync::Arc};
 
-use argh::FromArgs;
+extern crate axum;
 
+use clap::Parser;
 use engine::Engine;
 
 use axum::{
     routing::{get, post},
     Router,
 };
-use tokio::{fs, net::TcpListener, signal};
+use tokio::{fs, signal};
 use tracing::{info, warn};
+use tracing_subscriber::filter::LevelFilter;
 
-mod cache;
 mod config;
-mod disk;
 mod engine;
 mod index;
 mod new;
 mod view;
 
-/// breeze file server.
-#[derive(FromArgs, Debug)]
+#[derive(Parser, Debug)]
 struct Args {
-    /// the path to *.toml configuration file
-    #[argh(option, short = 'c', arg_name = "file")]
-    config: PathBuf,
+    /// The path to configuration file
+    config: Option<PathBuf>,
 }
 
 #[tokio::main]
 async fn main() {
     // read & parse args
-    let args: Args = argh::from_env();
+    let args = Args::parse();
 
     // read & parse config
-    let cfg: config::Config = {
-        let config_str = fs::read_to_string(args.config).await.expect(
-            "failed to read config file! make sure it exists and you have read permissions",
-        );
-
-        toml::from_str(&config_str).unwrap_or_else(|e| {
-            panic!("invalid config! ensure proper fields and structure. reference config is in readme.\n{e}");
-        })
-    };
+    let config_str = fs::read_to_string(args.config.unwrap_or("./breeze.toml".into()))
+        .await
+        .expect("failed to read config file! make sure it exists and you have read permissions");
+
+    let c: config::Config = toml::from_str(&config_str).expect("invalid config! check that you have included all required options and structured it properly (no config options expecting a number getting a string, etc.)");
 
     tracing_subscriber::fmt()
-        .with_max_level(cfg.logger.level)
+        .with_max_level(c.logger.level.unwrap_or(LevelFilter::WARN))
        .init();
 
-    {
-        let save_path = cfg.engine.disk.save_path.clone();
-        if !save_path.exists() || !save_path.is_dir() {
-            panic!("the save path does not exist or is not a directory! this is invalid");
-        }
+    if !c.engine.save_path.exists() || !c.engine.save_path.is_dir() {
+        panic!("the save path does not exist or is not a directory! this is invalid");
     }
 
-    if cfg.engine.upload_key.is_empty() {
+    if c.engine.upload_key.is_none() {
         warn!("engine upload_key is empty! no key will be required for uploading new files");
     }
 
     // create engine
-    let engine = Engine::from_config(cfg.engine);
+    let engine = Engine::new(
+        c.engine.base_url,
+        c.engine.save_path,
+        c.engine.upload_key.unwrap_or_default(),
+        c.cache.max_length,
+        c.cache.upload_lifetime,
+        c.cache.scan_freq,
+        c.cache.mem_capacity,
+    );
 
     // build main router
     let app = Router::new()
@@ -69,15 +69,11 @@ async fn main() {
         .with_state(Arc::new(engine));
 
     // start web server
-    let listener = TcpListener::bind(&cfg.http.listen_on)
-        .await
-        .expect("failed to bind to given `http.listen_on` address! make sure it's valid, and the port isn't already bound");
-
-    info!("starting server.");
-
-    axum::serve(listener, app)
+    axum::Server::bind(&"0.0.0.0:8000".parse().unwrap())
+        .serve(app.into_make_service())
         .with_graceful_shutdown(shutdown_signal())
         .await
-        .expect("failed to start server");
+        .unwrap();
 }
 
 async fn shutdown_signal() {
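Note: the two sides expect differently shaped breeze.toml files. The key paths below are read off the code above; every value is an invented example, and options not visible in this diff are omitted. A minimal main-branch config might look like:

    [engine]
    base_url = "http://127.0.0.1:8000"
    upload_key = ""

    [engine.disk]
    save_path = "/data/uploads"

    [http]
    listen_on = "0.0.0.0:8000"

    [logger]
    level = "warn"

On 0.1.5-p1, save_path and upload_key sit directly under [engine], the [cache] table must provide max_length, upload_lifetime, scan_freq and mem_capacity (they are passed positionally to Engine::new), there is no [http] table (the bind address is hardcoded to 0.0.0.0:8000), and logger.level is optional, defaulting to WARN.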
85  src/new.rs

@@ -1,58 +1,47 @@
-use std::{ffi::OsStr, path::PathBuf, sync::Arc, time::Duration};
+use std::{collections::HashMap, ffi::OsStr, path::PathBuf, sync::Arc};
 
 use axum::{
-    body::Body,
-    extract::{Query, State},
+    extract::{BodyStream, Query, State},
+    http::HeaderValue,
 };
-use http::{header, HeaderMap, HeaderValue, StatusCode};
-use serde::Deserialize;
-use serde_with::{serde_as, DurationSeconds};
-
-use crate::engine::ProcessOutcome;
-
-fn default_keep_exif() -> bool {
-    false
-}
-
-#[serde_as]
-#[derive(Deserialize)]
-pub struct NewRequest {
-    name: String,
-    key: Option<String>,
-
-    #[serde(rename = "lastfor")]
-    #[serde_as(as = "Option<DurationSeconds>")]
-    last_for: Option<Duration>,
-
-    #[serde(rename = "keepexif", default = "default_keep_exif")]
-    keep_exif: bool,
-}
+use hyper::{header, HeaderMap, StatusCode};
 
 /// The request handler for the /new path.
 /// This handles all new uploads.
 #[axum::debug_handler]
 pub async fn new(
     State(engine): State<Arc<crate::engine::Engine>>,
-    Query(req): Query<NewRequest>,
+    Query(params): Query<HashMap<String, String>>,
     headers: HeaderMap,
-    body: Body,
+    stream: BodyStream,
 ) -> Result<String, StatusCode> {
+    let key = params.get("key");
+
+    const EMPTY_STRING: &String = &String::new();
+
     // check upload key, if i need to
-    if !engine.cfg.upload_key.is_empty() && req.key.unwrap_or_default() != engine.cfg.upload_key {
+    if !engine.upload_key.is_empty() && key.unwrap_or(EMPTY_STRING) != &engine.upload_key {
         return Err(StatusCode::FORBIDDEN);
     }
 
+    let original_name = params.get("name");
+
     // the original file name wasn't given, so i can't work out what the extension should be
-    if req.name.is_empty() {
+    if original_name.is_none() {
         return Err(StatusCode::BAD_REQUEST);
     }
 
-    let extension = PathBuf::from(req.name)
-        .extension()
+    let original_path = PathBuf::from(original_name.unwrap());
+
+    let path = engine.gen_path(&original_path).await;
+    let name = path
+        .file_name()
         .and_then(OsStr::to_str)
         .unwrap_or_default()
         .to_string();
 
+    let url = format!("{}/p/{}", engine.base_url, name);
+
     // read and parse content-length, and if it fails just assume it's really high so it doesn't cache
     let content_length = headers
         .get(header::CONTENT_LENGTH)
@@ -62,34 +51,10 @@ pub async fn new(
         .unwrap()
         .unwrap_or(usize::MAX);
 
-    // turn body into stream
-    let stream = Body::into_data_stream(body);
-
     // pass it off to the engine to be processed!
-    match engine
-        .process(
-            &extension,
-            content_length,
-            stream,
-            req.last_for,
-            req.keep_exif,
-        )
-        .await
-    {
-        Ok(outcome) => match outcome {
-            // 200 OK
-            ProcessOutcome::Success(url) => Ok(url),
-
-            // 413 Payload Too Large
-            ProcessOutcome::UploadTooLarge | ProcessOutcome::TemporaryUploadTooLarge => {
-                Err(StatusCode::PAYLOAD_TOO_LARGE)
-            }
-
-            // 400 Bad Request
-            ProcessOutcome::TemporaryUploadLifetimeTooLong => Err(StatusCode::BAD_REQUEST),
-        },
-
-        // 500 Internal Server Error
-        Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR),
-    }
+    engine
+        .process_upload(path, name, content_length, stream)
+        .await;
+
+    Ok(url)
 }
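Note: on both sides /new takes its parameters in the query string and the raw file as the request body; main adds lastfor (a lifetime in seconds) and keepexif on top of name and key via the NewRequest struct. Against main, a request would look roughly like this (host, key and the returned id are placeholders):

    POST /new?name=photo.jpg&key=hunter2&lastfor=1800&keepexif=false
    Content-Length: <file size in bytes>

    <raw file bytes>

The handler replies with the upload's URL in the response body, e.g. {base_url}/p/AbC123.jpg, and Content-Length feeds the cache decision, falling back to usize::MAX (never cache) when it is missing or unparsable.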
92  src/view.rs

@@ -1,39 +1,86 @@
-use std::{ffi::OsStr, path::PathBuf, sync::Arc};
+use std::{
+    path::{Component, PathBuf},
+    sync::Arc,
+};
 
 use axum::{
-    body::Body,
+    body::StreamBody,
     extract::{Path, State},
     response::{IntoResponse, Response},
 };
 
-use http::{HeaderValue, StatusCode};
+use bytes::Bytes;
+use hyper::{http::HeaderValue, StatusCode};
+use tokio::{fs::File, runtime::Handle};
 use tokio_util::io::ReaderStream;
+use tracing::{error, debug, warn};
 
-use crate::engine::UploadData;
+/// Responses for a successful view operation
+pub enum ViewSuccess {
+    /// A file read from disk, suitable for larger files.
+    ///
+    /// The file provided will be streamed from disk and
+    /// back to the viewer.
+    ///
+    /// This is only ever used if a file exceeds the
+    /// cache's maximum file size.
+    FromDisk(File),
+
+    /// A file read from in-memory cache, best for smaller files.
+    ///
+    /// The file is taken from the cache in its entirety
+    /// and sent back to the viewer.
+    ///
+    /// If a file can be fit into cache, this will be
+    /// used even if it's read from disk.
+    FromCache(Bytes),
+}
 
 /// Responses for a failed view operation
 pub enum ViewError {
-    /// Will send status code 404 with a plaintext "not found" message.
+    /// Will send status code 404 witha plaintext "not found" message.
     NotFound,
 
     /// Will send status code 500 with a plaintext "internal server error" message.
     InternalServerError,
 }
 
-impl IntoResponse for UploadData {
+impl IntoResponse for ViewSuccess {
     fn into_response(self) -> Response {
         match self {
-            UploadData::Disk(file, len) => {
-                // create our content-length header
-                let len_str = len.to_string();
+            ViewSuccess::FromDisk(file) => {
+                // get handle to current tokio runtime
+                // i use this to block on futures here (not async)
+                let handle = Handle::current();
+                let _ = handle.enter();
+
+                // read the metadata of the file on disk
+                // this function isn't async
+                // .. so we have to use handle.block_on() to get the metadata
+                let metadata = futures::executor::block_on(file.metadata());
+
+                // if we error then return 500
+                if metadata.is_err() {
+                    error!("failed to read metadata from disk");
+                    return ViewError::InternalServerError.into_response();
+                }
+
+                // unwrap (which we know is safe) and read the file size as a string
+                let metadata = metadata.unwrap();
+                let len_str = metadata.len().to_string();
+
+                debug!("file is {} bytes on disk", &len_str);
+
+                // HeaderValue::from_str will never error if only visible ASCII characters are passed (32-127)
+                // .. so unwrapping this should be fine
                 let content_length = HeaderValue::from_str(&len_str).unwrap();
 
                 // create a streamed body response (we want to stream larger files)
-                let stream = ReaderStream::new(file);
-                let body = Body::from_stream(stream);
+                let reader = ReaderStream::new(file);
+                let stream = StreamBody::new(reader);
 
                 // extract mutable headers from the response
-                let mut res = body.into_response();
+                let mut res = stream.into_response();
                 let headers = res.headers_mut();
 
                 // clear headers, browser can imply content type
@@ -45,7 +92,7 @@ impl IntoResponse for UploadData {
 
                 res
             }
-            UploadData::Cache(data) => {
+            ViewSuccess::FromCache(data) => {
                 // extract mutable headers from the response
                 let mut res = data.into_response();
                 let headers = res.headers_mut();
@@ -81,17 +128,16 @@ impl IntoResponse for ViewError {
 pub async fn view(
     State(engine): State<Arc<crate::engine::Engine>>,
     Path(original_path): Path<PathBuf>,
-) -> Result<UploadData, ViewError> {
-    let saved_name = if let Some(Some(n)) = original_path.file_name().map(OsStr::to_str) {
-        n
-    } else {
+) -> Result<ViewSuccess, ViewError> {
+    // (hopefully) prevent path traversal, just check for any non-file components
+    if original_path
+        .components()
+        .any(|x| !matches!(x, Component::Normal(_)))
+    {
+        warn!("a request attempted path traversal");
         return Err(ViewError::NotFound);
-    };
+    }
 
     // get result from the engine!
-    match engine.get(saved_name).await {
-        Ok(Some(u)) => Ok(u),
-        Ok(None) => Err(ViewError::NotFound),
-        Err(_) => Err(ViewError::InternalServerError),
-    }
+    engine.get_upload(&original_path).await
 }
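Note: the two sides guard against path traversal differently: 0.1.5-p1 rejects any request path containing a non-Normal component, while main sidesteps the issue by only ever using file_name(). The 0.1.5-p1 check as a standalone predicate (is_safe is a hypothetical name):

    use std::path::{Component, Path};

    // true only when every component is a plain file/dir name;
    // "..", a leading "/", etc. are not Component::Normal
    fn is_safe(path: &Path) -> bool {
        path.components().all(|c| matches!(c, Component::Normal(_)))
    }

    fn main() {
        assert!(is_safe(Path::new("AbC123.png")));
        assert!(!is_safe(Path::new("../breeze.toml")));
        assert!(!is_safe(Path::new("/etc/passwd")));
    }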