Compare commits
30 Commits
Author | SHA1 | Date |
---|---|---|
minish | 2e65f3744b | |
minish | ec050b19a5 | |
minish | 495aee464d | |
minish | 1fc0ecad20 | |
minish | 85369049a5 | |
minish | 80b1727d09 | |
minish | 62e2598e72 | |
minish | 27a71ae862 | |
minish | e8e3302f82 | |
minish | 9109f822eb | |
minish | 030a38ded4 | |
minish | 51c4b72626 | |
minish | 7a3b8a66e2 | |
minish | 76701113c5 | |
minish | 2e92ab4bf0 | |
minish | 7bfbe3b1fe | |
minish | 0a98b6b8ba | |
minish | 38c4447da8 | |
minish | 698643988a | |
minish | 735cbb7428 | |
minish | 622dfe8ae0 | |
minish | 3513337ac7 | |
minish | 661f2f14dd | |
minish | 0d176bca40 | |
minish | a0ffd1ddd1 | |
minish | f5c67c64d7 | |
minish | a315baa258 | |
minish | 2aa97e05b4 | |
minish | 5f8adf023f | |
minish | d9f560677a |
|
@ -0,0 +1,4 @@
|
||||||
|
if ! has nix_direnv_version || ! nix_direnv_version 3.0.6; then
|
||||||
|
source_url "https://raw.githubusercontent.com/nix-community/nix-direnv/3.0.6/direnvrc" "sha256-RYcUJaRMf8oF5LznDrlCXbkOQrywm0HDv1VjYGaJGdM="
|
||||||
|
fi
|
||||||
|
use flake
|
|
@ -1,2 +1,5 @@
|
||||||
# binaries
|
# binaries
|
||||||
/target
|
/target
|
||||||
|
|
||||||
|
# nix-direnv
|
||||||
|
/.direnv
|
||||||
|
|
File diff suppressed because it is too large
Load Diff
30
Cargo.toml
30
Cargo.toml
|
@ -1,25 +1,27 @@
|
||||||
[package]
|
[package]
|
||||||
name = "breeze"
|
name = "breeze"
|
||||||
version = "0.1.5"
|
version = "0.2.6"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
axum = { version = "0.6.1", features = ["macros"] }
|
axum = { version = "0.7.5", features = ["macros", "http2"] }
|
||||||
hyper = { version = "0.14", features = ["full"] }
|
tower = "0.4.13"
|
||||||
|
http = "1.1.0"
|
||||||
tokio = { version = "1", features = ["full"] }
|
tokio = { version = "1", features = ["full"] }
|
||||||
tokio-util = { version = "0.7.4", features = ["full"] }
|
tokio-util = { version = "0.7.4", features = ["full"] }
|
||||||
tokio-stream = "0.1"
|
tokio-stream = "0.1"
|
||||||
tower = "0.4.13"
|
|
||||||
bytes = "1"
|
|
||||||
rand = "0.8.5"
|
|
||||||
async-recursion = "1.0.0"
|
|
||||||
walkdir = "2"
|
|
||||||
futures = "0.3"
|
|
||||||
tracing = "0.1"
|
tracing = "0.1"
|
||||||
tracing-subscriber = "0.3"
|
tracing-subscriber = "0.3"
|
||||||
archived = { path = "./archived" }
|
bytes = "1"
|
||||||
xxhash-rust = { version = "0.8.7", features = ["xxh3"] }
|
async-recursion = "1.0.0"
|
||||||
serde = { version = "1.0.189", features = ["derive"] }
|
rand = "0.8.5"
|
||||||
toml = "0.8.2"
|
walkdir = "2"
|
||||||
clap = { version = "4.4.6", features = ["derive"] }
|
anyhow = "1.0"
|
||||||
|
serde = { version = "1.0", features = ["derive"] }
|
||||||
serde_with = "3.4.0"
|
serde_with = "3.4.0"
|
||||||
|
toml = "0.8.2"
|
||||||
|
argh = "0.1.12"
|
||||||
|
dashmap = { version = "5.5.3", features = ["rayon", "inline"] }
|
||||||
|
rayon = "1.8"
|
||||||
|
atomic-time = "0.1.4"
|
||||||
|
img-parts = "0.3.0"
|
||||||
|
|
10
Dockerfile
10
Dockerfile
|
@ -1,19 +1,15 @@
|
||||||
# builder
|
# builder
|
||||||
FROM rust:1.73 as builder
|
FROM rust:1.74 as builder
|
||||||
|
|
||||||
WORKDIR /usr/src/breeze
|
WORKDIR /usr/src/breeze
|
||||||
COPY . .
|
COPY . .
|
||||||
RUN cargo install --path .
|
RUN cargo install --path .
|
||||||
|
|
||||||
# runner
|
# runner
|
||||||
FROM debian:bullseye-slim
|
FROM debian:bookworm-slim
|
||||||
|
|
||||||
RUN apt-get update && rm -rf /var/lib/apt/lists/*
|
RUN apt-get update && rm -rf /var/lib/apt/lists/*
|
||||||
|
|
||||||
COPY --from=builder /usr/local/cargo/bin/breeze /usr/local/bin/breeze
|
COPY --from=builder /usr/local/cargo/bin/breeze /usr/local/bin/breeze
|
||||||
|
|
||||||
RUN useradd -m runner
|
CMD [ "breeze", "--config", "/etc/breeze.toml" ]
|
||||||
USER runner
|
|
||||||
|
|
||||||
EXPOSE 8000
|
|
||||||
CMD [ "breeze" ]
|
|
||||||
|
|
113
README.md
113
README.md
|
@ -1,12 +1,16 @@
|
||||||
# breeze
|
# breeze
|
||||||
breeze is a simple, performant file upload server.
|
breeze is a simple, performant file upload server.
|
||||||
|
|
||||||
|
The primary instance is https://picture.wtf.
|
||||||
|
|
||||||
## Features
|
## Features
|
||||||
Compared to the old Express.js backend, breeze has
|
Compared to the old Express.js backend, breeze has
|
||||||
- Streamed uploading
|
- Streamed uploading
|
||||||
- Streamed downloading (on larger files)
|
- Streamed downloading (on larger files)
|
||||||
- Upload caching
|
- Upload caching
|
||||||
- Generally faster speeds overall
|
- Generally faster speeds overall
|
||||||
|
- Temporary uploads
|
||||||
|
- Automatic exif data removal
|
||||||
|
|
||||||
At this time, breeze does not support encrypted uploads on disk.
|
At this time, breeze does not support encrypted uploads on disk.
|
||||||
|
|
||||||
|
@ -15,10 +19,10 @@ I wrote breeze with the intention of running it in a container, but it runs just
|
||||||
|
|
||||||
Either way, you need to start off by cloning the Git repository.
|
Either way, you need to start off by cloning the Git repository.
|
||||||
```bash
|
```bash
|
||||||
git clone https://git.min.rip/minish/breeze.git
|
git clone https://git.min.rip/min/breeze.git
|
||||||
```
|
```
|
||||||
|
|
||||||
To run it in Docker, I recommend using Docker Compose. An example `docker-compose.yaml` configuration is below.
|
To run it in Docker, I recommend using Docker Compose. An example `docker-compose.yaml` configuration is below. You can start it using `docker compose up -d`.
|
||||||
```
|
```
|
||||||
version: '3.6'
|
version: '3.6'
|
||||||
|
|
||||||
|
@ -29,20 +33,19 @@ services:
|
||||||
|
|
||||||
volumes:
|
volumes:
|
||||||
- /srv/uploads:/data
|
- /srv/uploads:/data
|
||||||
|
- ./breeze.toml:/etc/breeze.toml
|
||||||
|
|
||||||
|
user: 1000:1000
|
||||||
|
|
||||||
ports:
|
ports:
|
||||||
- 8000:8000
|
- 8383:8000
|
||||||
|
|
||||||
environment:
|
|
||||||
- BRZ_BASE_URL=http://127.0.0.1:8000
|
|
||||||
- BRZ_SAVE_PATH=/data
|
|
||||||
- BRZ_UPLOAD_KEY=hiiiiiiii
|
|
||||||
- BRZ_CACHE_UPL_MAX_LENGTH=134217728 # allow files up to ~134 MiB to be cached
|
|
||||||
- BRZ_CACHE_UPL_LIFETIME=1800 # let uploads stay in cache for 30 minutes
|
|
||||||
- BRZ_CACHE_SCAN_FREQ=60 # scan the cache for expired files if more than 60 seconds have passed since the last scan
|
|
||||||
- BRZ_CACHE_MEM_CAPACITY=4294967296 # allow 4 GiB of data to be in the cache at once
|
|
||||||
```
|
```
|
||||||
For this configuration, it is expected that there is a clone of the Git repository in the `./breeze` folder. You can start it using `docker compose up -d`.
|
For this configuration, it is expected that:
|
||||||
|
* there is a clone of the Git repository in the `./breeze` folder
|
||||||
|
* there is a `breeze.toml` config file in current directory
|
||||||
|
* there is a directory at `/srv/uploads` for storing uploads
|
||||||
|
* port 8383 will be made accessible to the Internet somehow (either forwarding the port through your firewall directly, or passing it through a reverse proxy)
|
||||||
|
* you want the uploads to be owned by the user on your system with id 1000. (this is usually your user)
|
||||||
|
|
||||||
It can also be installed directly if you have the Rust toolchain installed:
|
It can also be installed directly if you have the Rust toolchain installed:
|
||||||
```bash
|
```bash
|
||||||
|
@ -51,22 +54,86 @@ cargo install --path .
|
||||||
|
|
||||||
## Usage
|
## Usage
|
||||||
### Hosting
|
### Hosting
|
||||||
Configuration is read through environment variables, because I wanted to run this using Docker Compose.
|
Configuration is read through a toml file.
|
||||||
```
|
|
||||||
BRZ_BASE_URL - base url for upload urls (ex: http://127.0.0.1:8000 for http://127.0.0.1:8000/p/abcdef.png, http://picture.wtf for http://picture.wtf/p/abcdef.png)
|
The config file path is specified using the `-c`/`--config` command line switch.
|
||||||
BRZ_SAVE_PATH - this should be a path where uploads are saved to disk (ex: /srv/uploads, C:\brzuploads)
|
|
||||||
BRZ_UPLOAD_KEY (optional) - if not empty, the key you specify will be required to upload new files.
|
Here is an example config file:
|
||||||
BRZ_CACHE_UPL_MAX_LENGTH - this is the max length an upload can be in bytes before it won't be cached (ex: 80000000 for 80MB)
|
```toml
|
||||||
BRZ_CACHE_UPL_LIFETIME - this indicates how long an upload will stay in cache (ex: 1800 for 30 minutes, 60 for 1 minute)
|
[engine]
|
||||||
BRZ_CACHE_SCAN_FREQ - this is the frequency of full cache scans, which scan for and remove expired uploads (ex: 60 for 1 minute)
|
# The base URL that the HTTP server will be accessible on.
|
||||||
BRZ_CACHE_MEM_CAPACITY - this is the amount of memory the cache will hold before dropping entries
|
# This is used for formatting upload URLs.
|
||||||
|
# Setting it to "https://picture.wtf" would result in
|
||||||
|
# upload urls of "https://picture.wtf/p/abcdef.png", etc.
|
||||||
|
base_url = "http://127.0.0.1:8000"
|
||||||
|
|
||||||
|
# OPTIONAL - If set, the static key specified will be required to upload new files.
|
||||||
|
# If it is not set, no key will be required.
|
||||||
|
upload_key = "hiiiiiiii"
|
||||||
|
|
||||||
|
# OPTIONAL - specifies what to show when the site is visited on http
|
||||||
|
# It is sent with text/plain content type.
|
||||||
|
# There are two variables you can use:
|
||||||
|
# %uplcount% - total number of uploads present on the server
|
||||||
|
# %version% - current breeze version (e.g. 0.1.5)
|
||||||
|
motd = "my image host, currently hosting %uplcount% files"
|
||||||
|
|
||||||
|
# The maximum lifetime a temporary upload may be given, in seconds.
|
||||||
|
# It's okay to leave this somewhat high because large temporary uploads
|
||||||
|
# will just be bumped out of the cache when a new upload needs to be
|
||||||
|
# cached anyways.
|
||||||
|
max_temp_lifetime = 43200
|
||||||
|
|
||||||
|
# OPTIONAL - the maximum length (in bytes) a file being uploaded may be.
|
||||||
|
# A word of warning about this: the error shown to ShareX users who
|
||||||
|
# hit the limit is *not* very clear. ("connection closed" or similar)
|
||||||
|
max_upload_len = 2_147_483_648
|
||||||
|
|
||||||
|
# The maximum length (in bytes) an image file may be before the server
|
||||||
|
# will skip removing its EXIF data.
|
||||||
|
# The performance impact of breeze's EXIF data removal is not
|
||||||
|
# very high in everyday usage, so something like 16MiB is reasonable.
|
||||||
|
max_strip_len = 16_777_216
|
||||||
|
|
||||||
|
[engine.disk]
|
||||||
|
# The location that uploads will be saved to.
|
||||||
|
# It should be a path to a directory on disk that you can write to.
|
||||||
|
save_path = "/data"
|
||||||
|
|
||||||
|
[engine.cache]
|
||||||
|
# The file size (in bytes) that a file must be under
|
||||||
|
# to get cached.
|
||||||
|
max_length = 134_217_728
|
||||||
|
|
||||||
|
# How long a cached upload will remain cached. (in seconds)
|
||||||
|
upload_lifetime = 1800
|
||||||
|
|
||||||
|
# How often the cache will be checked for expired uploads.
|
||||||
|
# It is not a continuous scan, and only is triggered upon a cache operation.
|
||||||
|
scan_freq = 60
|
||||||
|
|
||||||
|
# How much memory (in bytes) the cache is allowed to consume.
|
||||||
|
mem_capacity = 4_294_967_296
|
||||||
|
|
||||||
|
[http]
|
||||||
|
# The address that the HTTP server will listen on. (ip:port)
|
||||||
|
# Use 0.0.0.0 as the IP to listen publicly, 127.0.0.1 only lets your
|
||||||
|
# computer access it
|
||||||
|
listen_on = "127.0.0.1:8000"
|
||||||
|
|
||||||
|
[logger]
|
||||||
|
# OPTIONAL - the current log level.
|
||||||
|
# Default level is warn.
|
||||||
|
level = "warn"
|
||||||
```
|
```
|
||||||
|
|
||||||
### Uploading
|
### Uploading
|
||||||
The HTTP API is fairly simple, and it's pretty easy to make a ShareX configuration for it.
|
The HTTP API is pretty simple, and it's easy to make a ShareX configuration for it.
|
||||||
|
|
||||||
Uploads should be sent to `/new?name={original filename}` as a POST request. If the server uses upload keys, it should be sent to `/new?name={original filename}&key={upload key}`. The uploaded file's content should be sent as raw binary in the request body.
|
Uploads should be sent to `/new?name={original filename}` as a POST request. If the server uses upload keys, it should be sent to `/new?name={original filename}&key={upload key}`. The uploaded file's content should be sent as raw binary in the request body.
|
||||||
|
|
||||||
|
Additionally, you may specify `&lastfor={time in seconds}` to make your upload temporary, or `&keepexif=true` to tell the server not to clear EXIF data on image uploads. (if you don't know what EXIF data is, just leave it as default. you'll know if you need it)
|
||||||
|
|
||||||
Here's an example ShareX configuration for it (with a key):
|
Here's an example ShareX configuration for it (with a key):
|
||||||
```json
|
```json
|
||||||
{
|
{
|
||||||
|
|
|
@ -1,2 +0,0 @@
|
||||||
.idea
|
|
||||||
target
|
|
|
@ -1,23 +0,0 @@
|
||||||
# This file is automatically @generated by Cargo.
|
|
||||||
# It is not intended for manual editing.
|
|
||||||
version = 3
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "archived"
|
|
||||||
version = "0.2.0"
|
|
||||||
dependencies = [
|
|
||||||
"bytes",
|
|
||||||
"once_cell",
|
|
||||||
]
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "bytes"
|
|
||||||
version = "1.3.0"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "dfb24e866b15a1af2a1b663f10c6b6b8f397a84aadb828f12e5b289ec23a3a3c"
|
|
||||||
|
|
||||||
[[package]]
|
|
||||||
name = "once_cell"
|
|
||||||
version = "1.3.1"
|
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "b1c601810575c99596d4afc46f78a678c80105117c379eb3650cf99b8a21ce5b"
|
|
|
@ -1,9 +0,0 @@
|
||||||
[package]
|
|
||||||
name = "archived"
|
|
||||||
version = "0.2.0"
|
|
||||||
edition = "2018"
|
|
||||||
license = "MIT"
|
|
||||||
|
|
||||||
[dependencies]
|
|
||||||
bytes = "1.3.0"
|
|
||||||
once_cell = "1.3.1"
|
|
|
@ -1,22 +0,0 @@
|
||||||
MIT License
|
|
||||||
|
|
||||||
Copyright (c) 2020 aikidos
|
|
||||||
Copyright (c) 2023 ot2t7, minish
|
|
||||||
|
|
||||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
|
||||||
of this software and associated documentation files (the "Software"), to deal
|
|
||||||
in the Software without restriction, including without limitation the rights
|
|
||||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
|
||||||
copies of the Software, and to permit persons to whom the Software is
|
|
||||||
furnished to do so, subject to the following conditions:
|
|
||||||
|
|
||||||
The above copyright notice and this permission notice shall be included in all
|
|
||||||
copies or substantial portions of the Software.
|
|
||||||
|
|
||||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
|
||||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
|
||||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
|
||||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
|
||||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
|
||||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
|
||||||
SOFTWARE.
|
|
|
@ -1,26 +0,0 @@
|
||||||
use std::time::{Duration, SystemTime};
|
|
||||||
|
|
||||||
/// Represents a set of eviction and expiration details for a specific cache entry.
|
|
||||||
pub(crate) struct CacheEntry<B> {
|
|
||||||
/// Entry value.
|
|
||||||
pub(crate) value: B,
|
|
||||||
|
|
||||||
/// Expiration time.
|
|
||||||
///
|
|
||||||
/// - [`None`] if the value must be kept forever.
|
|
||||||
pub(crate) expiration_time: SystemTime,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<B> CacheEntry<B> {
|
|
||||||
pub(crate) fn new(value: B, lifetime: Duration) -> Self {
|
|
||||||
Self {
|
|
||||||
expiration_time: SystemTime::now() + lifetime,
|
|
||||||
value,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Check if a entry is expired.
|
|
||||||
pub(crate) fn is_expired(&self, current_time: SystemTime) -> bool {
|
|
||||||
current_time >= self.expiration_time
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,172 +0,0 @@
|
||||||
mod entry;
|
|
||||||
|
|
||||||
use bytes::Bytes;
|
|
||||||
|
|
||||||
use crate::entry::*;
|
|
||||||
use std::collections::hash_map::Entry;
|
|
||||||
use std::collections::HashMap;
|
|
||||||
use std::time::{Duration, SystemTime};
|
|
||||||
|
|
||||||
pub struct Archive {
|
|
||||||
cache_table: HashMap<String, CacheEntry<Bytes>>,
|
|
||||||
full_scan_frequency: Option<Duration>,
|
|
||||||
created_time: SystemTime,
|
|
||||||
last_scan_time: Option<SystemTime>,
|
|
||||||
entry_lifetime: Duration,
|
|
||||||
capacity: usize,
|
|
||||||
length: usize,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Archive {
|
|
||||||
/* pub fn new(capacity: usize) -> Self {
|
|
||||||
Self {
|
|
||||||
cache_table: HashMap::new(),
|
|
||||||
full_scan_frequency: None,
|
|
||||||
created_time: SystemTime::now(),
|
|
||||||
last_scan_time: None,
|
|
||||||
capacity,
|
|
||||||
length: 0,
|
|
||||||
}
|
|
||||||
} */
|
|
||||||
|
|
||||||
pub fn with_full_scan(full_scan_frequency: Duration, entry_lifetime: Duration, capacity: usize) -> Self {
|
|
||||||
Self {
|
|
||||||
cache_table: HashMap::with_capacity(256),
|
|
||||||
full_scan_frequency: Some(full_scan_frequency),
|
|
||||||
created_time: SystemTime::now(),
|
|
||||||
last_scan_time: None,
|
|
||||||
entry_lifetime,
|
|
||||||
capacity,
|
|
||||||
length: 0,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn contains_key(&self, key: &String) -> bool {
|
|
||||||
let now = SystemTime::now();
|
|
||||||
|
|
||||||
self.cache_table
|
|
||||||
.get(key)
|
|
||||||
.filter(|cache_entry| !cache_entry.is_expired(now))
|
|
||||||
.is_some()
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn get_last_scan_time(&self) -> Option<SystemTime> {
|
|
||||||
self.last_scan_time
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn get_full_scan_frequency(&self) -> Option<Duration> {
|
|
||||||
self.full_scan_frequency
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn get(&self, key: &String) -> Option<&Bytes> {
|
|
||||||
let now = SystemTime::now();
|
|
||||||
|
|
||||||
self.cache_table
|
|
||||||
.get(key)
|
|
||||||
.filter(|cache_entry| !cache_entry.is_expired(now))
|
|
||||||
.map(|cache_entry| &cache_entry.value)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn get_or_insert<F>(
|
|
||||||
&mut self,
|
|
||||||
key: String,
|
|
||||||
factory: F,
|
|
||||||
) -> &Bytes
|
|
||||||
where
|
|
||||||
F: Fn() -> Bytes,
|
|
||||||
{
|
|
||||||
let now = SystemTime::now();
|
|
||||||
|
|
||||||
self.try_full_scan_expired_items(now);
|
|
||||||
|
|
||||||
match self.cache_table.entry(key) {
|
|
||||||
Entry::Occupied(mut occupied) => {
|
|
||||||
if occupied.get().is_expired(now) {
|
|
||||||
occupied.insert(CacheEntry::new(factory(), self.entry_lifetime));
|
|
||||||
}
|
|
||||||
|
|
||||||
&occupied.into_mut().value
|
|
||||||
}
|
|
||||||
Entry::Vacant(vacant) => &vacant.insert(CacheEntry::new(factory(), self.entry_lifetime)).value,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn insert(
|
|
||||||
&mut self,
|
|
||||||
key: String,
|
|
||||||
value: Bytes,
|
|
||||||
) -> Option<Bytes> {
|
|
||||||
let now = SystemTime::now();
|
|
||||||
|
|
||||||
self.try_full_scan_expired_items(now);
|
|
||||||
|
|
||||||
if value.len() + self.length > self.capacity {
|
|
||||||
return None;
|
|
||||||
}
|
|
||||||
|
|
||||||
self.length += value.len();
|
|
||||||
|
|
||||||
self.cache_table
|
|
||||||
.insert(key, CacheEntry::new(value, self.entry_lifetime))
|
|
||||||
.filter(|cache_entry| !cache_entry.is_expired(now))
|
|
||||||
.map(|cache_entry| cache_entry.value)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn remove(&mut self, key: &String) -> Option<Bytes> {
|
|
||||||
let now = SystemTime::now();
|
|
||||||
|
|
||||||
self.try_full_scan_expired_items(now);
|
|
||||||
|
|
||||||
let mut removed_len: usize = 0;
|
|
||||||
let result = self
|
|
||||||
.cache_table
|
|
||||||
.remove(key)
|
|
||||||
.filter(|cache_entry| !cache_entry.is_expired(now))
|
|
||||||
.and_then(|o| {
|
|
||||||
removed_len += o.value.len();
|
|
||||||
return Some(o);
|
|
||||||
})
|
|
||||||
.map(|cache_entry| cache_entry.value);
|
|
||||||
self.length -= removed_len;
|
|
||||||
return result;
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn renew(&mut self, key: &String) -> Option<()> {
|
|
||||||
let now = SystemTime::now();
|
|
||||||
|
|
||||||
self.try_full_scan_expired_items(now);
|
|
||||||
|
|
||||||
let entry = self.cache_table.get_mut(key);
|
|
||||||
|
|
||||||
match entry {
|
|
||||||
Some(entry) => {
|
|
||||||
entry.expiration_time = now + self.entry_lifetime;
|
|
||||||
|
|
||||||
Some(())
|
|
||||||
}
|
|
||||||
None => None
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn try_full_scan_expired_items(&mut self, current_time: SystemTime) {
|
|
||||||
if let Some(full_scan_frequency) = self.full_scan_frequency {
|
|
||||||
let since = current_time
|
|
||||||
.duration_since(self.last_scan_time.unwrap_or(self.created_time))
|
|
||||||
.unwrap();
|
|
||||||
|
|
||||||
if since >= full_scan_frequency {
|
|
||||||
let mut removed_len = 0;
|
|
||||||
self.cache_table.retain(|_, cache_entry| {
|
|
||||||
if cache_entry.is_expired(current_time) {
|
|
||||||
removed_len += cache_entry.value.len();
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
});
|
|
||||||
self.length -= removed_len;
|
|
||||||
|
|
||||||
self.last_scan_time = Some(current_time);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -0,0 +1,77 @@
|
||||||
|
{
|
||||||
|
"nodes": {
|
||||||
|
"crane": {
|
||||||
|
"locked": {
|
||||||
|
"lastModified": 1725409566,
|
||||||
|
"narHash": "sha256-PrtLmqhM6UtJP7v7IGyzjBFhbG4eOAHT6LPYOFmYfbk=",
|
||||||
|
"owner": "ipetkov",
|
||||||
|
"repo": "crane",
|
||||||
|
"rev": "7e4586bad4e3f8f97a9271def747cf58c4b68f3c",
|
||||||
|
"type": "github"
|
||||||
|
},
|
||||||
|
"original": {
|
||||||
|
"owner": "ipetkov",
|
||||||
|
"repo": "crane",
|
||||||
|
"type": "github"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"flake-utils": {
|
||||||
|
"inputs": {
|
||||||
|
"systems": "systems"
|
||||||
|
},
|
||||||
|
"locked": {
|
||||||
|
"lastModified": 1726560853,
|
||||||
|
"narHash": "sha256-X6rJYSESBVr3hBoH0WbKE5KvhPU5bloyZ2L4K60/fPQ=",
|
||||||
|
"owner": "numtide",
|
||||||
|
"repo": "flake-utils",
|
||||||
|
"rev": "c1dfcf08411b08f6b8615f7d8971a2bfa81d5e8a",
|
||||||
|
"type": "github"
|
||||||
|
},
|
||||||
|
"original": {
|
||||||
|
"owner": "numtide",
|
||||||
|
"repo": "flake-utils",
|
||||||
|
"type": "github"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"nixpkgs": {
|
||||||
|
"locked": {
|
||||||
|
"lastModified": 1726871744,
|
||||||
|
"narHash": "sha256-V5LpfdHyQkUF7RfOaDPrZDP+oqz88lTJrMT1+stXNwo=",
|
||||||
|
"owner": "NixOS",
|
||||||
|
"repo": "nixpkgs",
|
||||||
|
"rev": "a1d92660c6b3b7c26fb883500a80ea9d33321be2",
|
||||||
|
"type": "github"
|
||||||
|
},
|
||||||
|
"original": {
|
||||||
|
"owner": "NixOS",
|
||||||
|
"ref": "nixpkgs-unstable",
|
||||||
|
"repo": "nixpkgs",
|
||||||
|
"type": "github"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"root": {
|
||||||
|
"inputs": {
|
||||||
|
"crane": "crane",
|
||||||
|
"flake-utils": "flake-utils",
|
||||||
|
"nixpkgs": "nixpkgs"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"systems": {
|
||||||
|
"locked": {
|
||||||
|
"lastModified": 1681028828,
|
||||||
|
"narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
|
||||||
|
"owner": "nix-systems",
|
||||||
|
"repo": "default",
|
||||||
|
"rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
|
||||||
|
"type": "github"
|
||||||
|
},
|
||||||
|
"original": {
|
||||||
|
"owner": "nix-systems",
|
||||||
|
"repo": "default",
|
||||||
|
"type": "github"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"root": "root",
|
||||||
|
"version": 7
|
||||||
|
}
|
|
@ -0,0 +1,225 @@
|
||||||
|
{
|
||||||
|
description = "breeze file server";
|
||||||
|
|
||||||
|
inputs = {
|
||||||
|
nixpkgs.url = "github:NixOS/nixpkgs/nixpkgs-unstable";
|
||||||
|
crane.url = "github:ipetkov/crane";
|
||||||
|
flake-utils.url = "github:numtide/flake-utils";
|
||||||
|
};
|
||||||
|
|
||||||
|
outputs = {
|
||||||
|
self,
|
||||||
|
nixpkgs,
|
||||||
|
crane,
|
||||||
|
flake-utils,
|
||||||
|
...
|
||||||
|
}:
|
||||||
|
flake-utils.lib.eachDefaultSystem (system: let
|
||||||
|
pkgs = nixpkgs.legacyPackages.${system};
|
||||||
|
|
||||||
|
craneLib = crane.mkLib pkgs;
|
||||||
|
|
||||||
|
# Common arguments can be set here to avoid repeating them later
|
||||||
|
# Note: changes here will rebuild all dependency crates
|
||||||
|
commonArgs = {
|
||||||
|
src = craneLib.cleanCargoSource ./.;
|
||||||
|
strictDeps = true;
|
||||||
|
|
||||||
|
buildInputs =
|
||||||
|
[
|
||||||
|
pkgs.openssl
|
||||||
|
]
|
||||||
|
++ pkgs.lib.optionals pkgs.stdenv.isDarwin [
|
||||||
|
# Additional darwin specific inputs can be set here
|
||||||
|
pkgs.libiconv
|
||||||
|
];
|
||||||
|
};
|
||||||
|
|
||||||
|
breeze = craneLib.buildPackage (commonArgs
|
||||||
|
// {
|
||||||
|
cargoArtifacts = craneLib.buildDepsOnly commonArgs;
|
||||||
|
|
||||||
|
# Additional environment variables or build phases/hooks can be set
|
||||||
|
# here *without* rebuilding all dependency crates
|
||||||
|
# MY_CUSTOM_VAR = "some value";
|
||||||
|
});
|
||||||
|
in {
|
||||||
|
checks = {
|
||||||
|
inherit breeze;
|
||||||
|
};
|
||||||
|
|
||||||
|
packages.default = breeze;
|
||||||
|
|
||||||
|
apps.default = flake-utils.lib.mkApp {
|
||||||
|
drv = breeze;
|
||||||
|
};
|
||||||
|
|
||||||
|
devShells.default = craneLib.devShell {
|
||||||
|
# Inherit inputs from checks.
|
||||||
|
checks = self.checks.${system};
|
||||||
|
|
||||||
|
# Additional dev-shell environment variables can be set directly
|
||||||
|
# MY_CUSTOM_DEVELOPMENT_VAR = "something else";
|
||||||
|
|
||||||
|
# Extra inputs can be added here; cargo and rustc are provided by default.
|
||||||
|
packages = with pkgs; [
|
||||||
|
alejandra
|
||||||
|
rewrk
|
||||||
|
];
|
||||||
|
};
|
||||||
|
|
||||||
|
nixosModules.breeze = {
|
||||||
|
config,
|
||||||
|
pkgs,
|
||||||
|
lib,
|
||||||
|
...
|
||||||
|
}:
|
||||||
|
with lib; let
|
||||||
|
cfg = config.services.breeze;
|
||||||
|
settingsFormat = pkgs.formats.toml {};
|
||||||
|
in {
|
||||||
|
options = {
|
||||||
|
services.breeze = {
|
||||||
|
enable = mkEnableOption "breeze file server";
|
||||||
|
|
||||||
|
package = mkOption {
|
||||||
|
type = types.package;
|
||||||
|
default = breeze;
|
||||||
|
description = "Package for `breeze` to use";
|
||||||
|
};
|
||||||
|
|
||||||
|
user = mkOption {
|
||||||
|
type = types.str;
|
||||||
|
default = "breeze";
|
||||||
|
description = "User that `breeze` will run under";
|
||||||
|
};
|
||||||
|
|
||||||
|
group = mkOption {
|
||||||
|
type = types.str;
|
||||||
|
default = "breeze";
|
||||||
|
description = "Group that `breeze` will run under";
|
||||||
|
};
|
||||||
|
|
||||||
|
extraGroups = mkOption {
|
||||||
|
type = types.listOf types.str;
|
||||||
|
default = [];
|
||||||
|
description = "Any supplementary groups for `breeze` to run under";
|
||||||
|
};
|
||||||
|
|
||||||
|
settings = mkOption {
|
||||||
|
type = settingsFormat.type;
|
||||||
|
default = {};
|
||||||
|
description = ''
|
||||||
|
The *.toml configuration to run `breeze` with.
|
||||||
|
There is no formal documentation, but there is an example in the [readme](https://git.min.rip/min/breeze/src/branch/main/README.md).
|
||||||
|
'';
|
||||||
|
};
|
||||||
|
|
||||||
|
configDir = mkOption {
|
||||||
|
type = types.path;
|
||||||
|
default = "/etc/breeze";
|
||||||
|
description = ''
|
||||||
|
The directory on disk to store the `breeze` config file in.
|
||||||
|
This does not load pre-existing config files, it only defines where the generated config is saved.
|
||||||
|
'';
|
||||||
|
};
|
||||||
|
|
||||||
|
uploadKeyFile = mkOption {
|
||||||
|
type = types.nullOr types.path;
|
||||||
|
default = null;
|
||||||
|
description = ''
|
||||||
|
File to load the `engine.upload_key` from, if desired.
|
||||||
|
This is useful for loading it from a secret management system.
|
||||||
|
'';
|
||||||
|
};
|
||||||
|
};
|
||||||
|
};
|
||||||
|
|
||||||
|
config = mkIf cfg.enable {
|
||||||
|
users.users.${cfg.user} = {
|
||||||
|
isSystemUser = true;
|
||||||
|
inherit (cfg) group;
|
||||||
|
};
|
||||||
|
|
||||||
|
users.groups.${cfg.group} = {};
|
||||||
|
|
||||||
|
systemd.tmpfiles.rules = [
|
||||||
|
"d '${cfg.configDir}' 0750 ${cfg.user} ${cfg.group} - -"
|
||||||
|
];
|
||||||
|
|
||||||
|
services.breeze.settings = mkMerge [
|
||||||
|
(mkIf (cfg.uploadKeyFile != null) {engine.upload_key = "@UPLOAD_KEY@";})
|
||||||
|
];
|
||||||
|
|
||||||
|
systemd.services.breeze = let
|
||||||
|
cfgFile = "${cfg.configDir}/breeze.toml";
|
||||||
|
in {
|
||||||
|
description = "breeze file server";
|
||||||
|
after = ["local-fs.target" "network.target"];
|
||||||
|
wantedBy = ["multi-user.target"];
|
||||||
|
|
||||||
|
preStart =
|
||||||
|
''
|
||||||
|
install -m 660 ${settingsFormat.generate "breeze.toml" cfg.settings} ${cfgFile}
|
||||||
|
''
|
||||||
|
+ lib.optionalString (cfg.uploadKeyFile != null) ''
|
||||||
|
${pkgs.replace-secret}/bin/replace-secret '@UPLOAD_KEY@' "${cfg.uploadKeyFile}" ${cfgFile}
|
||||||
|
'';
|
||||||
|
|
||||||
|
serviceConfig = rec {
|
||||||
|
User = cfg.user;
|
||||||
|
Group = cfg.group;
|
||||||
|
DynamicUser = false; # we write files, so don't do that
|
||||||
|
SupplementaryGroups = cfg.extraGroups;
|
||||||
|
StateDirectory = "breeze";
|
||||||
|
CacheDirectory = "breeze";
|
||||||
|
ExecStart = escapeShellArgs [
|
||||||
|
"${cfg.package}/bin/breeze"
|
||||||
|
"--config"
|
||||||
|
cfgFile
|
||||||
|
];
|
||||||
|
Restart = "on-failure";
|
||||||
|
|
||||||
|
# Security Options #
|
||||||
|
|
||||||
|
NoNewPrivileges = true; # implied by DynamicUser
|
||||||
|
RemoveIPC = true; # implied by DynamicUser
|
||||||
|
|
||||||
|
AmbientCapabilities = "";
|
||||||
|
CapabilityBoundingSet = "";
|
||||||
|
|
||||||
|
DeviceAllow = "";
|
||||||
|
|
||||||
|
LockPersonality = true;
|
||||||
|
|
||||||
|
PrivateTmp = true; # implied by DynamicUser
|
||||||
|
PrivateDevices = true;
|
||||||
|
PrivateUsers = true;
|
||||||
|
|
||||||
|
ProtectClock = true;
|
||||||
|
ProtectControlGroups = true;
|
||||||
|
ProtectHostname = true;
|
||||||
|
ProtectKernelLogs = true;
|
||||||
|
ProtectKernelModules = true;
|
||||||
|
ProtectKernelTunables = true;
|
||||||
|
|
||||||
|
RestrictNamespaces = true;
|
||||||
|
RestrictAddressFamilies = ["AF_INET" "AF_INET6" "AF_UNIX"];
|
||||||
|
RestrictRealtime = true;
|
||||||
|
RestrictSUIDSGID = true; # implied by DynamicUser
|
||||||
|
|
||||||
|
SystemCallArchitectures = "native";
|
||||||
|
SystemCallErrorNumber = "EPERM";
|
||||||
|
SystemCallFilter = [
|
||||||
|
"@system-service"
|
||||||
|
"~@keyring"
|
||||||
|
"~@memlock"
|
||||||
|
"~@privileged"
|
||||||
|
"~@setuid"
|
||||||
|
];
|
||||||
|
};
|
||||||
|
};
|
||||||
|
};
|
||||||
|
};
|
||||||
|
});
|
||||||
|
}
|
|
@ -0,0 +1,259 @@
|
||||||
|
use std::{
|
||||||
|
sync::atomic::{AtomicUsize, Ordering},
|
||||||
|
time::{Duration, SystemTime},
|
||||||
|
};
|
||||||
|
|
||||||
|
use atomic_time::AtomicSystemTime;
|
||||||
|
use bytes::Bytes;
|
||||||
|
use dashmap::{mapref::one::Ref, DashMap};
|
||||||
|
use rayon::prelude::*;
|
||||||
|
use tokio::time;
|
||||||
|
|
||||||
|
use crate::config;
|
||||||
|
|
||||||
|
/// An entry stored in the cache.
|
||||||
|
///
|
||||||
|
/// It contains basic metadata and the actual value.
|
||||||
|
pub struct Entry {
|
||||||
|
/// The data held
|
||||||
|
value: Bytes,
|
||||||
|
|
||||||
|
/// The last time this entry was read/written
|
||||||
|
last_used: AtomicSystemTime,
|
||||||
|
|
||||||
|
/// Whether or not `last_used` should be updated
|
||||||
|
update_used: bool,
|
||||||
|
|
||||||
|
/// How long the entry should last
|
||||||
|
lifetime: Duration,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Entry {
|
||||||
|
fn new(value: Bytes, lifetime: Duration, update_used: bool) -> Self {
|
||||||
|
let now = AtomicSystemTime::now();
|
||||||
|
|
||||||
|
Self {
|
||||||
|
value,
|
||||||
|
last_used: now,
|
||||||
|
update_used,
|
||||||
|
lifetime,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn last_used(&self) -> SystemTime {
|
||||||
|
self.last_used.load(Ordering::Relaxed)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn is_expired(&self) -> bool {
|
||||||
|
match self.last_used().elapsed() {
|
||||||
|
Ok(d) => d >= self.lifetime,
|
||||||
|
Err(_) => false, // now > last_used
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A concurrent cache with a maximum memory size (w/ LRU) and expiration.
|
||||||
|
///
|
||||||
|
/// It is designed to keep memory usage low.
|
||||||
|
pub struct Cache {
|
||||||
|
/// Where elements are stored
|
||||||
|
map: DashMap<String, Entry>,
|
||||||
|
|
||||||
|
/// Total length of data stored in cache currently
|
||||||
|
length: AtomicUsize,
|
||||||
|
|
||||||
|
/// How should it behave
|
||||||
|
cfg: config::CacheConfig,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Cache {
|
||||||
|
pub fn from_config(cfg: config::CacheConfig) -> Self {
|
||||||
|
Self {
|
||||||
|
map: DashMap::with_capacity(256),
|
||||||
|
length: AtomicUsize::new(0),
|
||||||
|
|
||||||
|
cfg,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Figure out who should be bumped out of cache next
|
||||||
|
fn next_out(&self, length: usize) -> Vec<String> {
|
||||||
|
let mut sorted: Vec<_> = self.map.iter().collect();
|
||||||
|
|
||||||
|
// Sort by least recently used
|
||||||
|
sorted.par_sort_unstable_by(|e1, e2| e1.last_used().cmp(&e2.last_used()));
|
||||||
|
|
||||||
|
// Total bytes we would be removing
|
||||||
|
let mut total = 0;
|
||||||
|
|
||||||
|
// Pull entries until we have enough free space
|
||||||
|
sorted
|
||||||
|
.iter()
|
||||||
|
.take_while(|e| {
|
||||||
|
let need_more = total < length;
|
||||||
|
|
||||||
|
if need_more {
|
||||||
|
total += e.value.len();
|
||||||
|
}
|
||||||
|
|
||||||
|
need_more
|
||||||
|
})
|
||||||
|
.map(|e| e.key().clone())
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Remove an element from the cache
|
||||||
|
///
|
||||||
|
/// Returns: [`Some`] if successful, [`None`] if element not found
|
||||||
|
pub fn remove(&self, key: &str) -> Option<()> {
|
||||||
|
// Skip expiry checks, we are removing it anyways
|
||||||
|
// And also that could cause an infinite loop which would be pretty stupid.
|
||||||
|
let e = self.map.get(key)?;
|
||||||
|
|
||||||
|
// Atomically subtract from the total cache length
|
||||||
|
self.length.fetch_sub(e.value.len(), Ordering::Relaxed);
|
||||||
|
|
||||||
|
// Drop the entry lock so we can actually remove it
|
||||||
|
drop(e);
|
||||||
|
|
||||||
|
// Remove from map
|
||||||
|
self.map.remove(key);
|
||||||
|
|
||||||
|
Some(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Add a new element to the cache with a specified lifetime.
|
||||||
|
///
|
||||||
|
/// Returns: `true` if no value is replaced, `false` if a value was replaced
|
||||||
|
pub fn add_with_lifetime(
|
||||||
|
&self,
|
||||||
|
key: &str,
|
||||||
|
value: Bytes,
|
||||||
|
lifetime: Duration,
|
||||||
|
is_renewable: bool,
|
||||||
|
) -> bool {
|
||||||
|
let e = Entry::new(value, lifetime, is_renewable);
|
||||||
|
|
||||||
|
let len = e.value.len();
|
||||||
|
let cur_total = self.length.load(Ordering::Relaxed);
|
||||||
|
let new_total = cur_total + len;
|
||||||
|
|
||||||
|
if new_total > self.cfg.mem_capacity {
|
||||||
|
// How far we went above the limit
|
||||||
|
let needed = new_total - self.cfg.mem_capacity;
|
||||||
|
|
||||||
|
self.next_out(needed).par_iter().for_each(|k| {
|
||||||
|
// Remove the element, and ignore the result
|
||||||
|
// The only reason it should be failing is if it couldn't find it,
|
||||||
|
// in which case it was already removed
|
||||||
|
self.remove(k);
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Atomically add to total cached data length
|
||||||
|
self.length.fetch_add(len, Ordering::Relaxed);
|
||||||
|
|
||||||
|
// Add to the map, return true if we didn't replace anything
|
||||||
|
self.map.insert(key.to_string(), e).is_none()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Add a new element to the cache with the default lifetime.
|
||||||
|
///
|
||||||
|
/// Returns: `true` if no value is replaced, `false` if a value was replaced
|
||||||
|
pub fn add(&self, key: &str, value: Bytes) -> bool {
|
||||||
|
self.add_with_lifetime(key, value, self.cfg.upload_lifetime, true)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Internal function for retrieving entries.
|
||||||
|
///
|
||||||
|
/// Returns: same as [`DashMap::get`], for our purposes
|
||||||
|
///
|
||||||
|
/// It exists so we can run the expiry check before
|
||||||
|
/// actually working with any entries, so no weird bugs happen
|
||||||
|
fn _get(&self, key: &str) -> Option<Ref<String, Entry>> {
|
||||||
|
let e = self.map.get(key)?;
|
||||||
|
|
||||||
|
// if the entry is expired get rid of it now
|
||||||
|
if e.is_expired() {
|
||||||
|
// drop the reference so we don't deadlock
|
||||||
|
drop(e);
|
||||||
|
|
||||||
|
// remove it
|
||||||
|
self.remove(key);
|
||||||
|
|
||||||
|
// and say we never had it
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
|
||||||
|
Some(e)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get an item from the cache, if it exists.
|
||||||
|
pub fn get(&self, key: &str) -> Option<Bytes> {
|
||||||
|
let e = self._get(key)?;
|
||||||
|
|
||||||
|
if e.update_used {
|
||||||
|
e.last_used.store(SystemTime::now(), Ordering::Relaxed);
|
||||||
|
}
|
||||||
|
|
||||||
|
Some(e.value.clone())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Check if we have an item in cache.
|
||||||
|
///
|
||||||
|
/// Returns: `true` if key exists, `false` if it doesn't
|
||||||
|
///
|
||||||
|
/// We don't use [`DashMap::contains_key`] here because it would just do
|
||||||
|
/// the exact same thing I do here, but without running the expiry check logic
|
||||||
|
pub fn has(&self, key: &str) -> bool {
|
||||||
|
self._get(key).is_some()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns if an upload is able to be cached
|
||||||
|
/// with the current caching rules
|
||||||
|
#[inline(always)]
|
||||||
|
pub fn will_use(&self, length: usize) -> bool {
|
||||||
|
length <= self.cfg.max_length
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The background job that scans through the cache and removes inactive elements.
|
||||||
|
///
|
||||||
|
/// TODO: see if this is actually less expensive than
|
||||||
|
/// letting each entry keep track of expiry with its own task
|
||||||
|
pub async fn scanner(&self) {
|
||||||
|
let mut interval = time::interval(self.cfg.scan_freq);
|
||||||
|
|
||||||
|
loop {
|
||||||
|
// We put this first so that it doesn't scan the instant the server starts
|
||||||
|
interval.tick().await;
|
||||||
|
|
||||||
|
// Save current timestamp so we aren't retrieving it constantly
|
||||||
|
// If we don't do this it'll be a LOT of system api calls
|
||||||
|
let now = SystemTime::now();
|
||||||
|
|
||||||
|
// Collect a list of all the expired keys
|
||||||
|
// If we fail to compare the times, it gets added to the list anyways
|
||||||
|
let expired: Vec<_> = self
|
||||||
|
.map
|
||||||
|
.par_iter()
|
||||||
|
.filter_map(|e| {
|
||||||
|
let elapsed = now.duration_since(e.last_used()).unwrap_or(Duration::MAX);
|
||||||
|
let is_expired = elapsed >= e.lifetime;
|
||||||
|
|
||||||
|
if is_expired {
|
||||||
|
Some(e.key().clone())
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
}
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
// If we have any, lock the map and drop all of them
|
||||||
|
if !expired.is_empty() {
|
||||||
|
// Use a retain call, should be less locks that way
|
||||||
|
// (instead of many remove calls)
|
||||||
|
self.map.retain(|k, _| !expired.contains(k))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -7,10 +7,15 @@ use tracing_subscriber::filter::LevelFilter;
|
||||||
#[derive(Deserialize)]
|
#[derive(Deserialize)]
|
||||||
pub struct Config {
|
pub struct Config {
|
||||||
pub engine: EngineConfig,
|
pub engine: EngineConfig,
|
||||||
pub cache: CacheConfig,
|
pub http: HttpConfig,
|
||||||
pub logger: LoggerConfig,
|
pub logger: LoggerConfig,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Fallback message-of-the-day used when the config file doesn't set one.
fn default_motd() -> String {
    String::from("breeze file server (v%version%) - currently hosting %uplcount% files")
}
|
||||||
|
|
||||||
|
#[serde_as]
|
||||||
#[derive(Deserialize)]
|
#[derive(Deserialize)]
|
||||||
pub struct EngineConfig {
|
pub struct EngineConfig {
|
||||||
/// The url that the instance of breeze is meant to be accessed from.
|
/// The url that the instance of breeze is meant to be accessed from.
|
||||||
|
@ -18,15 +23,44 @@ pub struct EngineConfig {
|
||||||
/// ex: https://picture.wtf would generate links like https://picture.wtf/p/abcdef.png
|
/// ex: https://picture.wtf would generate links like https://picture.wtf/p/abcdef.png
|
||||||
pub base_url: String,
|
pub base_url: String,
|
||||||
|
|
||||||
|
/// Authentication key for new uploads, will be required if this is specified. (optional)
|
||||||
|
#[serde(default)]
|
||||||
|
pub upload_key: String,
|
||||||
|
|
||||||
|
/// Configuration for disk system
|
||||||
|
pub disk: DiskConfig,
|
||||||
|
|
||||||
|
/// Configuration for cache system
|
||||||
|
pub cache: CacheConfig,
|
||||||
|
|
||||||
|
/// Maximum size of an upload that will be accepted.
|
||||||
|
/// Files above this size can not be uploaded.
|
||||||
|
pub max_upload_len: Option<usize>,
|
||||||
|
|
||||||
|
/// Maximum lifetime of a temporary upload
|
||||||
|
#[serde_as(as = "DurationSeconds")]
|
||||||
|
pub max_temp_lifetime: Duration,
|
||||||
|
|
||||||
|
/// Maximum length (in bytes) a file can be before the server will
|
||||||
|
/// decide not to remove its EXIF data.
|
||||||
|
pub max_strip_len: usize,
|
||||||
|
|
||||||
|
/// Motd displayed when the server's index page is visited.
|
||||||
|
///
|
||||||
|
/// This isn't explicitly engine-related but the engine is what gets passed to routes,
|
||||||
|
/// so it is here for now.
|
||||||
|
#[serde(default = "default_motd")]
|
||||||
|
pub motd: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Deserialize, Clone)]
|
||||||
|
pub struct DiskConfig {
|
||||||
/// Location on disk the uploads are to be saved to
|
/// Location on disk the uploads are to be saved to
|
||||||
pub save_path: PathBuf,
|
pub save_path: PathBuf,
|
||||||
|
|
||||||
/// Authentication key for new uploads, will be required if this is specified. (optional)
|
|
||||||
pub upload_key: Option<String>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[serde_as]
|
#[serde_as]
|
||||||
#[derive(Deserialize)]
|
#[derive(Deserialize, Clone)]
|
||||||
pub struct CacheConfig {
|
pub struct CacheConfig {
|
||||||
/// The maximum length in bytes that a file can be
|
/// The maximum length in bytes that a file can be
|
||||||
/// before it skips cache (in seconds)
|
/// before it skips cache (in seconds)
|
||||||
|
@ -45,11 +79,23 @@ pub struct CacheConfig {
|
||||||
pub mem_capacity: usize,
|
pub mem_capacity: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Deserialize)]
|
||||||
|
pub struct HttpConfig {
|
||||||
|
/// The IP address the HTTP server should listen on
|
||||||
|
pub listen_on: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
fn default_level_filter() -> LevelFilter {
|
||||||
|
LevelFilter::WARN
|
||||||
|
}
|
||||||
|
|
||||||
#[serde_as]
|
#[serde_as]
|
||||||
#[derive(Deserialize)]
|
#[derive(Deserialize)]
|
||||||
pub struct LoggerConfig {
|
pub struct LoggerConfig {
|
||||||
/// Minimum level a log must be for it to be shown.
|
/// Minimum level a log must be for it to be shown.
|
||||||
/// This defaults to "warn" if not specified.
|
/// This defaults to "warn" if not specified.
|
||||||
#[serde_as(as = "Option<DisplayFromStr>")]
|
#[serde_as(as = "DisplayFromStr")]
|
||||||
pub level: Option<LevelFilter>,
|
// yes... kind of a hack but serde doesn't have anything better
|
||||||
|
#[serde(default = "default_level_filter")]
|
||||||
|
pub level: LevelFilter,
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,84 @@
|
||||||
|
use std::path::PathBuf;
|
||||||
|
|
||||||
|
use bytes::Bytes;
|
||||||
|
use tokio::{
|
||||||
|
fs::File,
|
||||||
|
io::{self, AsyncWriteExt},
|
||||||
|
sync::mpsc::{self, Receiver, Sender},
|
||||||
|
};
|
||||||
|
use tracing::debug;
|
||||||
|
use walkdir::WalkDir;
|
||||||
|
|
||||||
|
use crate::config;
|
||||||
|
|
||||||
|
/// Provides an API to access the disk file store
|
||||||
|
/// like we access the cache.
|
||||||
|
pub struct Disk {
|
||||||
|
cfg: config::DiskConfig,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Disk {
|
||||||
|
pub fn from_config(cfg: config::DiskConfig) -> Self {
|
||||||
|
Self { cfg }
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Counts the number of files saved to disk we have
|
||||||
|
pub fn count(&self) -> usize {
|
||||||
|
WalkDir::new(&self.cfg.save_path)
|
||||||
|
.min_depth(1)
|
||||||
|
.into_iter()
|
||||||
|
.count()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Formats the path on disk for a `saved_name`.
|
||||||
|
fn path_for(&self, saved_name: &str) -> PathBuf {
|
||||||
|
let mut p = self.cfg.save_path.clone();
|
||||||
|
p.push(saved_name);
|
||||||
|
|
||||||
|
p
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Try to open a file on disk, and if we didn't find it,
|
||||||
|
/// then return [`None`].
|
||||||
|
pub async fn open(&self, saved_name: &str) -> Result<Option<File>, io::Error> {
|
||||||
|
let p = self.path_for(saved_name);
|
||||||
|
|
||||||
|
match File::open(p).await {
|
||||||
|
Ok(f) => Ok(Some(f)),
|
||||||
|
Err(e) => match e.kind() {
|
||||||
|
io::ErrorKind::NotFound => Ok(None),
|
||||||
|
_ => Err(e)?, // some other error, send it back
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Get the size of an upload's file
|
||||||
|
pub async fn len(&self, f: &File) -> Result<usize, io::Error> {
|
||||||
|
Ok(f.metadata().await?.len() as usize)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Create a background I/O task
|
||||||
|
pub async fn start_save(&self, saved_name: &str) -> Sender<Bytes> {
|
||||||
|
// start a task that handles saving files to disk (we can save to cache/disk in parallel that way)
|
||||||
|
let (tx, mut rx): (Sender<Bytes>, Receiver<Bytes>) = mpsc::channel(256);
|
||||||
|
|
||||||
|
let p = self.path_for(saved_name);
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
// create file to save upload to
|
||||||
|
let mut file = File::create(p)
|
||||||
|
.await
|
||||||
|
.expect("could not open file! make sure your upload path is valid");
|
||||||
|
|
||||||
|
// receive chunks and save them to file
|
||||||
|
while let Some(chunk) = rx.recv().await {
|
||||||
|
debug!("writing chunk to disk (length: {})", chunk.len());
|
||||||
|
file.write_all(&chunk)
|
||||||
|
.await
|
||||||
|
.expect("error while writing file to disk");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
tx
|
||||||
|
}
|
||||||
|
}
|
462
src/engine.rs
462
src/engine.rs
|
@ -1,196 +1,228 @@
|
||||||
use std::{
|
use std::{
|
||||||
ffi::OsStr,
|
sync::{
|
||||||
path::{Path, PathBuf},
|
atomic::{AtomicUsize, Ordering},
|
||||||
sync::atomic::{AtomicUsize, Ordering},
|
Arc,
|
||||||
|
},
|
||||||
time::Duration,
|
time::Duration,
|
||||||
};
|
};
|
||||||
|
|
||||||
use archived::Archive;
|
use axum::body::BodyDataStream;
|
||||||
use axum::extract::BodyStream;
|
|
||||||
use bytes::{BufMut, Bytes, BytesMut};
|
use bytes::{BufMut, Bytes, BytesMut};
|
||||||
use rand::Rng;
|
use img_parts::{DynImage, ImageEXIF};
|
||||||
use tokio::{
|
use rand::distributions::{Alphanumeric, DistString};
|
||||||
fs::File,
|
use tokio::{fs::File, io::AsyncReadExt};
|
||||||
io::{AsyncReadExt, AsyncWriteExt},
|
|
||||||
sync::{
|
|
||||||
mpsc::{self, Receiver, Sender},
|
|
||||||
RwLock,
|
|
||||||
},
|
|
||||||
};
|
|
||||||
use tokio_stream::StreamExt;
|
use tokio_stream::StreamExt;
|
||||||
use tracing::{debug, error, info};
|
use tracing::{debug, info};
|
||||||
use walkdir::WalkDir;
|
|
||||||
|
|
||||||
use crate::view::{ViewError, ViewSuccess};
|
use crate::{cache, config, disk};
|
||||||
|
|
||||||
|
/// Various forms of upload data that can be sent to the client
|
||||||
|
pub enum UploadData {
|
||||||
|
/// Send back the data from memory
|
||||||
|
Cache(Bytes),
|
||||||
|
|
||||||
|
/// Stream the file from disk to the client
|
||||||
|
Disk(File, usize),
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Rejection outcomes of an [`Engine::process`] call
pub enum ProcessOutcome {
    /// The upload was successful.
    /// We give the user their file's URL
    Success(String),

    /// Occurs when an upload exceeds the chosen maximum file size.
    UploadTooLarge,

    /// Occurs when a temporary upload is too big to fit in the cache.
    TemporaryUploadTooLarge,

    /// Occurs when the user-given lifetime is longer than we will allow
    TemporaryUploadLifetimeTooLong,
}
|
||||||
|
|
||||||
/// breeze engine! this is the core of everything
|
/// breeze engine! this is the core of everything
|
||||||
pub struct Engine {
|
pub struct Engine {
|
||||||
// ------ STATE ------ //
|
|
||||||
/// The in-memory cache that cached uploads are stored in.
|
|
||||||
cache: RwLock<Archive>,
|
|
||||||
|
|
||||||
/// Cached count of uploaded files.
|
/// Cached count of uploaded files.
|
||||||
pub upl_count: AtomicUsize,
|
pub upl_count: AtomicUsize,
|
||||||
|
|
||||||
// ------ CONFIG ------ //
|
/// Engine configuration
|
||||||
/// The base URL that the server will be accessed from.
|
pub cfg: config::EngineConfig,
|
||||||
/// It is only used for formatting returned upload URLs.
|
|
||||||
pub base_url: String,
|
|
||||||
|
|
||||||
/// The path on disk that uploads are saved to.
|
/// The in-memory cache that cached uploads are stored in.
|
||||||
save_path: PathBuf,
|
cache: Arc<cache::Cache>,
|
||||||
|
|
||||||
/// The authorisation key required for uploading new files.
|
/// An interface to the on-disk upload store
|
||||||
/// If it is empty, no key will be required.
|
disk: disk::Disk,
|
||||||
pub upload_key: String,
|
|
||||||
|
|
||||||
/// The maximum size for an upload to be stored in cache.
|
|
||||||
/// Anything bigger skips cache and is read/written to
|
|
||||||
/// directly from disk.
|
|
||||||
cache_max_length: usize,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Engine {
|
impl Engine {
|
||||||
/// Creates a new instance of the breeze engine.
|
/// Creates a new instance of the breeze engine.
|
||||||
pub fn new(
|
pub fn from_config(cfg: config::EngineConfig) -> Self {
|
||||||
base_url: String,
|
let cache = cache::Cache::from_config(cfg.cache.clone());
|
||||||
save_path: PathBuf,
|
let disk = disk::Disk::from_config(cfg.disk.clone());
|
||||||
upload_key: String,
|
|
||||||
cache_max_length: usize,
|
let cache = Arc::new(cache);
|
||||||
cache_lifetime: Duration,
|
|
||||||
cache_full_scan_freq: Duration, // how often the cache will be scanned for expired items
|
let cache_scanner = cache.clone();
|
||||||
cache_mem_capacity: usize,
|
tokio::spawn(async move { cache_scanner.scanner().await });
|
||||||
) -> Self {
|
|
||||||
Self {
|
Self {
|
||||||
cache: RwLock::new(Archive::with_full_scan(
|
// initialise our cached upload count. this doesn't include temp uploads!
|
||||||
cache_full_scan_freq,
|
upl_count: AtomicUsize::new(disk.count()),
|
||||||
cache_lifetime,
|
|
||||||
cache_mem_capacity,
|
|
||||||
)),
|
|
||||||
upl_count: AtomicUsize::new(WalkDir::new(&save_path).min_depth(1).into_iter().count()), // count the amount of files in the save path and initialise our cached count with it
|
|
||||||
|
|
||||||
base_url,
|
cfg,
|
||||||
save_path,
|
|
||||||
upload_key,
|
|
||||||
|
|
||||||
cache_max_length,
|
cache,
|
||||||
|
disk,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns if an upload would be able to be cached
|
/// Fetch an upload
|
||||||
#[inline(always)]
|
///
|
||||||
fn will_use_cache(&self, length: usize) -> bool {
|
/// This will first try to read from cache, and then disk after.
|
||||||
length <= self.cache_max_length
|
/// If an upload is eligible to be cached, it will be cached and
|
||||||
|
/// sent back as a cache response instead of a disk response.
|
||||||
|
pub async fn get(&self, saved_name: &str) -> anyhow::Result<Option<UploadData>> {
|
||||||
|
// check the cache first
|
||||||
|
if let Some(u) = self.cache.get(saved_name) {
|
||||||
|
return Ok(Some(UploadData::Cache(u)));
|
||||||
|
}
|
||||||
|
|
||||||
|
// now, check if we have it on disk
|
||||||
|
let mut f = if let Some(f) = self.disk.open(saved_name).await? {
|
||||||
|
f
|
||||||
|
} else {
|
||||||
|
// file didn't exist
|
||||||
|
return Ok(None);
|
||||||
|
};
|
||||||
|
|
||||||
|
let len = self.disk.len(&f).await?;
|
||||||
|
|
||||||
|
// can this be recached?
|
||||||
|
if self.cache.will_use(len) {
|
||||||
|
// read file from disk
|
||||||
|
let mut full = BytesMut::with_capacity(len);
|
||||||
|
|
||||||
|
// read file from disk and if it fails at any point, return 500
|
||||||
|
loop {
|
||||||
|
match f.read_buf(&mut full).await {
|
||||||
|
Ok(n) => {
|
||||||
|
if n == 0 {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => Err(e)?,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let full = full.freeze();
|
||||||
|
|
||||||
|
// re-insert it into cache
|
||||||
|
self.cache.add(saved_name, full.clone());
|
||||||
|
|
||||||
|
return Ok(Some(UploadData::Cache(full)));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(Some(UploadData::Disk(f, len)))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Check if an upload exists in cache or on disk
|
pub async fn has(&self, saved_name: &str) -> bool {
|
||||||
pub async fn upload_exists(&self, path: &Path) -> bool {
|
if self.cache.has(saved_name) {
|
||||||
let cache = self.cache.read().await;
|
|
||||||
|
|
||||||
// extract file name, since that's what cache uses
|
|
||||||
let name = path
|
|
||||||
.file_name()
|
|
||||||
.and_then(OsStr::to_str)
|
|
||||||
.unwrap_or_default()
|
|
||||||
.to_string();
|
|
||||||
|
|
||||||
// check in cache
|
|
||||||
if cache.contains_key(&name) {
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
// check on disk
|
// sidestep handling the error properly
|
||||||
if path.exists() {
|
// that way we can call this in gen_saved_name easier
|
||||||
|
if self.disk.open(saved_name).await.is_ok_and(|f| f.is_some()) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
return false;
|
false
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Generate a new save path for an upload.
|
/// Generate a new saved name for an upload.
|
||||||
///
|
///
|
||||||
/// This will call itself recursively if it picks
|
/// This will call itself recursively if it picks
|
||||||
/// a name that's already used. (it is rare)
|
/// a name that's already used. (it is rare)
|
||||||
#[async_recursion::async_recursion]
|
#[async_recursion::async_recursion]
|
||||||
pub async fn gen_path(&self, original_path: &PathBuf) -> PathBuf {
|
pub async fn gen_saved_name(&self, ext: &str) -> String {
|
||||||
// generate a 6-character alphanumeric string
|
// generate a 6-character alphanumeric string
|
||||||
let id: String = rand::thread_rng()
|
let mut saved_name: String = Alphanumeric.sample_string(&mut rand::thread_rng(), 6);
|
||||||
.sample_iter(&rand::distributions::Alphanumeric)
|
|
||||||
.take(6)
|
|
||||||
.map(char::from)
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
// extract the extension from the original path
|
// if we have an extension, add it now
|
||||||
let original_extension = original_path
|
if !ext.is_empty() {
|
||||||
.extension()
|
saved_name.push('.');
|
||||||
.and_then(OsStr::to_str)
|
saved_name.push_str(ext);
|
||||||
.unwrap_or_default()
|
}
|
||||||
.to_string();
|
|
||||||
|
|
||||||
// path on disk
|
if !self.has(&saved_name).await {
|
||||||
let mut path = self.save_path.clone();
|
saved_name
|
||||||
path.push(&id);
|
|
||||||
path.set_extension(original_extension);
|
|
||||||
|
|
||||||
if !self.upload_exists(&path).await {
|
|
||||||
path
|
|
||||||
} else {
|
} else {
|
||||||
// we had a name collision! try again..
|
// we had a name collision! try again..
|
||||||
self.gen_path(original_path).await
|
info!("name collision! saved_name= {}", saved_name);
|
||||||
|
self.gen_saved_name(ext).await
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Process an upload.
|
/// Save a file to disk, and optionally cache.
|
||||||
/// This is called by the /new route.
|
///
|
||||||
pub async fn process_upload(
|
/// This also handles custom file lifetimes and EXIF data removal.
|
||||||
|
pub async fn save(
|
||||||
&self,
|
&self,
|
||||||
path: PathBuf,
|
saved_name: &str,
|
||||||
name: String, // we already extract it in the route handler, and it'd be wasteful to do it in gen_path
|
provided_len: usize,
|
||||||
content_length: usize,
|
mut use_cache: bool,
|
||||||
mut stream: BodyStream,
|
mut stream: BodyDataStream,
|
||||||
) {
|
lifetime: Option<Duration>,
|
||||||
// if the upload size is smaller than the specified maximum, we use the cache!
|
keep_exif: bool,
|
||||||
let mut use_cache = self.will_use_cache(content_length);
|
) -> Result<(), anyhow::Error> {
|
||||||
|
|
||||||
// if we're using cache, make some space to store the upload in
|
// if we're using cache, make some space to store the upload in
|
||||||
let mut data = if use_cache {
|
let mut data = if use_cache {
|
||||||
BytesMut::with_capacity(content_length)
|
BytesMut::with_capacity(provided_len)
|
||||||
} else {
|
} else {
|
||||||
BytesMut::new()
|
BytesMut::new()
|
||||||
};
|
};
|
||||||
|
|
||||||
// start a task that handles saving files to disk (we can save to cache/disk in parallel that way)
|
// don't begin a disk save if we're using temporary lifetimes
|
||||||
let (tx, mut rx): (Sender<Bytes>, Receiver<Bytes>) = mpsc::channel(1);
|
let tx = if lifetime.is_none() {
|
||||||
|
Some(self.disk.start_save(saved_name).await)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
tokio::spawn(async move {
|
// whether or not we're gonna coalesce the data
|
||||||
// create file to save upload to
|
// in order to strip the exif data at the end,
|
||||||
let mut file = File::create(path)
|
// instead of just sending it off to the i/o task
|
||||||
.await
|
let coalesce_and_strip = use_cache
|
||||||
.expect("could not open file! make sure your upload path is valid");
|
&& matches!(
|
||||||
|
std::path::Path::new(saved_name)
|
||||||
// receive chunks and save them to file
|
.extension()
|
||||||
while let Some(chunk) = rx.recv().await {
|
.map(|s| s.to_str()),
|
||||||
debug!("writing chunk to disk (length: {})", chunk.len());
|
Some(Some("png" | "jpg" | "jpeg" | "webp" | "tiff"))
|
||||||
file.write_all(&chunk)
|
)
|
||||||
.await
|
&& !keep_exif
|
||||||
.expect("error while writing file to disk");
|
&& provided_len <= self.cfg.max_strip_len;
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// read and save upload
|
// read and save upload
|
||||||
while let Some(chunk) = stream.next().await {
|
while let Some(chunk) = stream.next().await {
|
||||||
let chunk = chunk.unwrap();
|
// if we error on a chunk, fail out
|
||||||
|
let chunk = chunk?;
|
||||||
|
|
||||||
// send chunk to io task
|
// if we have an i/o task, send it off
|
||||||
debug!("sending data to io task");
|
// also cloning this is okay because it's a Bytes
|
||||||
tx.send(chunk.clone())
|
if !coalesce_and_strip {
|
||||||
.await
|
if let Some(ref tx) = tx {
|
||||||
.expect("failed to send data to io task");
|
debug!("sending chunk to i/o task");
|
||||||
|
tx.send(chunk.clone()).await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if use_cache {
|
if use_cache {
|
||||||
debug!("receiving data into buffer");
|
debug!("receiving data into buffer");
|
||||||
|
|
||||||
if data.len() + chunk.len() > data.capacity() {
|
if data.len() + chunk.len() > data.capacity() {
|
||||||
error!("the amount of data sent exceeds the content-length provided by the client! caching will be cancelled for this upload.");
|
info!("the amount of data sent exceeds the content-length provided by the client! caching will be cancelled for this upload.");
|
||||||
|
|
||||||
// if we receive too much data, drop the buffer and stop using cache (it is still okay to use disk, probably)
|
// if we receive too much data, drop the buffer and stop using cache (it is still okay to use disk, probably)
|
||||||
data = BytesMut::new();
|
data = BytesMut::new();
|
||||||
|
@ -201,109 +233,99 @@ impl Engine {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// insert upload into cache if necessary
|
let data = data.freeze();
|
||||||
if use_cache {
|
|
||||||
let mut cache = self.cache.write().await;
|
|
||||||
|
|
||||||
|
// we coalesced the data instead of streaming to disk,
|
||||||
|
// strip the exif data and send it off now
|
||||||
|
let data = if coalesce_and_strip {
|
||||||
|
// strip the exif if we can
|
||||||
|
// if we can't, then oh well
|
||||||
|
let data = if let Ok(Some(data)) = DynImage::from_bytes(data.clone()).map(|o| {
|
||||||
|
o.map(|mut img| {
|
||||||
|
img.set_exif(None);
|
||||||
|
img.encoder().bytes()
|
||||||
|
})
|
||||||
|
}) {
|
||||||
|
info!("stripped exif data");
|
||||||
|
data
|
||||||
|
} else {
|
||||||
|
info!("failed to strip exif data");
|
||||||
|
data
|
||||||
|
};
|
||||||
|
|
||||||
|
// send what we did over to the i/o task, all in one chunk
|
||||||
|
if let Some(ref tx) = tx {
|
||||||
|
debug!("sending filled buffer to i/o task");
|
||||||
|
tx.send(data.clone()).await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
data
|
||||||
|
} else {
|
||||||
|
// or, we didn't do that
|
||||||
|
// keep the data as it is
|
||||||
|
data
|
||||||
|
};
|
||||||
|
|
||||||
|
// insert upload into cache if we're using it
|
||||||
|
if use_cache {
|
||||||
info!("caching upload!");
|
info!("caching upload!");
|
||||||
cache.insert(name, data.freeze());
|
match lifetime {
|
||||||
|
Some(lt) => self.cache.add_with_lifetime(saved_name, data, lt, false),
|
||||||
|
None => self.cache.add(saved_name, data),
|
||||||
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
info!("finished processing upload!!");
|
info!("finished processing upload!!");
|
||||||
|
|
||||||
// if all goes well, increment the cached upload counter
|
// if all goes well, increment the cached upload counter
|
||||||
self.upl_count.fetch_add(1, Ordering::Relaxed);
|
self.upl_count.fetch_add(1, Ordering::Relaxed);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Read an upload from cache, if it exists.
|
pub async fn process(
|
||||||
///
|
&self,
|
||||||
/// Previously, this would lock the cache as
|
ext: &str,
|
||||||
/// writable to renew the upload's cache lifespan.
|
provided_len: usize,
|
||||||
/// Locking the cache as readable allows multiple concurrent
|
stream: BodyDataStream,
|
||||||
/// readers though, which allows me to handle multiple views concurrently.
|
lifetime: Option<Duration>,
|
||||||
async fn read_cached_upload(&self, name: &String) -> Option<Bytes> {
|
keep_exif: bool,
|
||||||
let cache = self.cache.read().await;
|
) -> Result<ProcessOutcome, anyhow::Error> {
|
||||||
|
// if the upload size is greater than our max file size, deny it now
|
||||||
// fetch upload data from cache
|
if self.cfg.max_upload_len.is_some_and(|l| provided_len > l) {
|
||||||
cache.get(name).map(ToOwned::to_owned)
|
return Ok(ProcessOutcome::UploadTooLarge);
|
||||||
}
|
|
||||||
|
|
||||||
/// Reads an upload, from cache or on disk.
|
|
||||||
pub async fn get_upload(&self, original_path: &Path) -> Result<ViewSuccess, ViewError> {
|
|
||||||
// extract upload file name
|
|
||||||
let name = original_path
|
|
||||||
.file_name()
|
|
||||||
.and_then(OsStr::to_str)
|
|
||||||
.unwrap_or_default()
|
|
||||||
.to_string();
|
|
||||||
|
|
||||||
// path on disk
|
|
||||||
let mut path = self.save_path.clone();
|
|
||||||
path.push(&name);
|
|
||||||
|
|
||||||
// check if the upload exists, if not then 404
|
|
||||||
if !self.upload_exists(&path).await {
|
|
||||||
return Err(ViewError::NotFound);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// attempt to read upload from cache
|
// if the upload size is smaller than the specified maximum, we use the cache!
|
||||||
let cached_data = self.read_cached_upload(&name).await;
|
let use_cache: bool = self.cache.will_use(provided_len);
|
||||||
|
|
||||||
if let Some(data) = cached_data {
|
// if a temp file is too big for cache, reject it now
|
||||||
info!("got upload from cache!");
|
if lifetime.is_some() && !use_cache {
|
||||||
|
return Ok(ProcessOutcome::TemporaryUploadTooLarge);
|
||||||
Ok(ViewSuccess::FromCache(data))
|
|
||||||
} else {
|
|
||||||
// we already know the upload exists by now so this is okay
|
|
||||||
let mut file = File::open(&path).await.unwrap();
|
|
||||||
|
|
||||||
// read upload length from disk
|
|
||||||
let metadata = file.metadata().await;
|
|
||||||
|
|
||||||
if metadata.is_err() {
|
|
||||||
error!("failed to get upload file metadata!");
|
|
||||||
return Err(ViewError::InternalServerError);
|
|
||||||
}
|
|
||||||
|
|
||||||
let metadata = metadata.unwrap();
|
|
||||||
|
|
||||||
let length = metadata.len() as usize;
|
|
||||||
|
|
||||||
debug!("read upload from disk, size = {}", length);
|
|
||||||
|
|
||||||
// if the upload is okay to cache, recache it and send a fromcache response
|
|
||||||
if self.will_use_cache(length) {
|
|
||||||
// read file from disk
|
|
||||||
let mut data = BytesMut::with_capacity(length);
|
|
||||||
|
|
||||||
// read file from disk and if it fails at any point, return 500
|
|
||||||
loop {
|
|
||||||
match file.read_buf(&mut data).await {
|
|
||||||
Ok(n) => {
|
|
||||||
if n == 0 {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(_) => {
|
|
||||||
return Err(ViewError::InternalServerError);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let data = data.freeze();
|
|
||||||
|
|
||||||
// re-insert it into cache
|
|
||||||
let mut cache = self.cache.write().await;
|
|
||||||
cache.insert(name, data.clone());
|
|
||||||
|
|
||||||
info!("recached upload from disk!");
|
|
||||||
|
|
||||||
return Ok(ViewSuccess::FromCache(data));
|
|
||||||
}
|
|
||||||
|
|
||||||
info!("got upload from disk!");
|
|
||||||
|
|
||||||
Ok(ViewSuccess::FromDisk(file))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// if a temp file's lifetime is too long, reject it now
|
||||||
|
if lifetime.is_some_and(|lt| lt > self.cfg.max_temp_lifetime) {
|
||||||
|
return Ok(ProcessOutcome::TemporaryUploadLifetimeTooLong);
|
||||||
|
}
|
||||||
|
|
||||||
|
// generate the file name
|
||||||
|
let saved_name = self.gen_saved_name(ext).await;
|
||||||
|
|
||||||
|
// save it
|
||||||
|
self.save(
|
||||||
|
&saved_name,
|
||||||
|
provided_len,
|
||||||
|
use_cache,
|
||||||
|
stream,
|
||||||
|
lifetime,
|
||||||
|
keep_exif,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
// format and send back the url
|
||||||
|
let url = format!("{}/p/{}", self.cfg.base_url, saved_name);
|
||||||
|
|
||||||
|
Ok(ProcessOutcome::Success(url))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
11
src/index.rs
11
src/index.rs
|
@ -6,12 +6,19 @@ use axum::extract::State;
|
||||||
pub async fn index(State(engine): State<Arc<crate::engine::Engine>>) -> String {
|
pub async fn index(State(engine): State<Arc<crate::engine::Engine>>) -> String {
|
||||||
let count = engine.upl_count.load(Ordering::Relaxed);
|
let count = engine.upl_count.load(Ordering::Relaxed);
|
||||||
|
|
||||||
format!("minish's image host, currently hosting {} files", count)
|
let motd = engine.cfg.motd.clone();
|
||||||
|
|
||||||
|
motd.replace("%version%", env!("CARGO_PKG_VERSION"))
|
||||||
|
.replace("%uplcount%", &count.to_string())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn robots_txt() -> &'static str {
|
pub async fn robots_txt() -> &'static str {
|
||||||
/// robots.txt that tells web crawlers not to list uploads
|
/// robots.txt that tells web crawlers not to list uploads
|
||||||
const ROBOTS_TXT: &str = concat!("User-Agent: *\n", "Disallow: /p/*\n", "Allow: /\n");
|
const ROBOTS_TXT: &str = concat!(
|
||||||
|
"User-Agent: *\n",
|
||||||
|
"Disallow: /p/*\n",
|
||||||
|
"Allow: /\n"
|
||||||
|
);
|
||||||
|
|
||||||
ROBOTS_TXT
|
ROBOTS_TXT
|
||||||
}
|
}
|
||||||
|
|
62
src/main.rs
62
src/main.rs
|
@ -1,64 +1,64 @@
|
||||||
use std::{path::PathBuf, sync::Arc};
|
use std::{path::PathBuf, sync::Arc};
|
||||||
|
|
||||||
extern crate axum;
|
use argh::FromArgs;
|
||||||
|
|
||||||
use clap::Parser;
|
|
||||||
use engine::Engine;
|
use engine::Engine;
|
||||||
|
|
||||||
use axum::{
|
use axum::{
|
||||||
routing::{get, post},
|
routing::{get, post},
|
||||||
Router,
|
Router,
|
||||||
};
|
};
|
||||||
use tokio::{fs, signal};
|
use tokio::{fs, net::TcpListener, signal};
|
||||||
use tracing::{info, warn};
|
use tracing::{info, warn};
|
||||||
use tracing_subscriber::filter::LevelFilter;
|
|
||||||
|
|
||||||
|
mod cache;
|
||||||
mod config;
|
mod config;
|
||||||
|
mod disk;
|
||||||
mod engine;
|
mod engine;
|
||||||
mod index;
|
mod index;
|
||||||
mod new;
|
mod new;
|
||||||
mod view;
|
mod view;
|
||||||
|
|
||||||
#[derive(Parser, Debug)]
|
/// breeze file server.
|
||||||
|
#[derive(FromArgs, Debug)]
|
||||||
struct Args {
|
struct Args {
|
||||||
/// The path to configuration file
|
/// the path to *.toml configuration file
|
||||||
config: Option<PathBuf>,
|
#[argh(option, short = 'c', arg_name = "file")]
|
||||||
|
config: PathBuf,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() {
|
async fn main() {
|
||||||
// read & parse args
|
// read & parse args
|
||||||
let args = Args::parse();
|
let args: Args = argh::from_env();
|
||||||
|
|
||||||
// read & parse config
|
// read & parse config
|
||||||
let config_str = fs::read_to_string(args.config.unwrap_or("./breeze.toml".into()))
|
let cfg: config::Config = {
|
||||||
.await
|
let config_str = fs::read_to_string(args.config).await.expect(
|
||||||
.expect("failed to read config file! make sure it exists and you have read permissions");
|
"failed to read config file! make sure it exists and you have read permissions",
|
||||||
|
);
|
||||||
|
|
||||||
let c: config::Config = toml::from_str(&config_str).expect("invalid config! check that you have included all required options and structured it properly (no config options expecting a number getting a string, etc.)");
|
toml::from_str(&config_str).unwrap_or_else(|e| {
|
||||||
|
panic!("invalid config! ensure proper fields and structure. reference config is in readme.\n{e}");
|
||||||
|
})
|
||||||
|
};
|
||||||
|
|
||||||
tracing_subscriber::fmt()
|
tracing_subscriber::fmt()
|
||||||
.with_max_level(c.logger.level.unwrap_or(LevelFilter::WARN))
|
.with_max_level(cfg.logger.level)
|
||||||
.init();
|
.init();
|
||||||
|
|
||||||
if !c.engine.save_path.exists() || !c.engine.save_path.is_dir() {
|
{
|
||||||
panic!("the save path does not exist or is not a directory! this is invalid");
|
let save_path = cfg.engine.disk.save_path.clone();
|
||||||
|
if !save_path.exists() || !save_path.is_dir() {
|
||||||
|
panic!("the save path does not exist or is not a directory! this is invalid");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if c.engine.upload_key.is_none() {
|
if cfg.engine.upload_key.is_empty() {
|
||||||
warn!("engine upload_key is empty! no key will be required for uploading new files");
|
warn!("engine upload_key is empty! no key will be required for uploading new files");
|
||||||
}
|
}
|
||||||
|
|
||||||
// create engine
|
// create engine
|
||||||
let engine = Engine::new(
|
let engine = Engine::from_config(cfg.engine);
|
||||||
c.engine.base_url,
|
|
||||||
c.engine.save_path,
|
|
||||||
c.engine.upload_key.unwrap_or_default(),
|
|
||||||
c.cache.max_length,
|
|
||||||
c.cache.upload_lifetime,
|
|
||||||
c.cache.scan_freq,
|
|
||||||
c.cache.mem_capacity,
|
|
||||||
);
|
|
||||||
|
|
||||||
// build main router
|
// build main router
|
||||||
let app = Router::new()
|
let app = Router::new()
|
||||||
|
@ -69,11 +69,15 @@ async fn main() {
|
||||||
.with_state(Arc::new(engine));
|
.with_state(Arc::new(engine));
|
||||||
|
|
||||||
// start web server
|
// start web server
|
||||||
axum::Server::bind(&"0.0.0.0:8000".parse().unwrap())
|
let listener = TcpListener::bind(&cfg.http.listen_on)
|
||||||
.serve(app.into_make_service())
|
.await
|
||||||
|
.expect("failed to bind to given `http.listen_on` address! make sure it's valid, and the port isn't already bound");
|
||||||
|
|
||||||
|
info!("starting server.");
|
||||||
|
axum::serve(listener, app)
|
||||||
.with_graceful_shutdown(shutdown_signal())
|
.with_graceful_shutdown(shutdown_signal())
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.expect("failed to start server");
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn shutdown_signal() {
|
async fn shutdown_signal() {
|
||||||
|
|
87
src/new.rs
87
src/new.rs
|
@ -1,47 +1,58 @@
|
||||||
use std::{collections::HashMap, ffi::OsStr, path::PathBuf, sync::Arc};
|
use std::{ffi::OsStr, path::PathBuf, sync::Arc, time::Duration};
|
||||||
|
|
||||||
use axum::{
|
use axum::{
|
||||||
extract::{BodyStream, Query, State},
|
body::Body,
|
||||||
http::HeaderValue,
|
extract::{Query, State},
|
||||||
};
|
};
|
||||||
use hyper::{header, HeaderMap, StatusCode};
|
use http::{header, HeaderMap, HeaderValue, StatusCode};
|
||||||
|
use serde::Deserialize;
|
||||||
|
use serde_with::{serde_as, DurationSeconds};
|
||||||
|
|
||||||
|
use crate::engine::ProcessOutcome;
|
||||||
|
|
||||||
|
fn default_keep_exif() -> bool {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
|
||||||
|
#[serde_as]
|
||||||
|
#[derive(Deserialize)]
|
||||||
|
pub struct NewRequest {
|
||||||
|
name: String,
|
||||||
|
key: Option<String>,
|
||||||
|
|
||||||
|
#[serde(rename = "lastfor")]
|
||||||
|
#[serde_as(as = "Option<DurationSeconds>")]
|
||||||
|
last_for: Option<Duration>,
|
||||||
|
|
||||||
|
#[serde(rename = "keepexif", default = "default_keep_exif")]
|
||||||
|
keep_exif: bool,
|
||||||
|
}
|
||||||
|
|
||||||
/// The request handler for the /new path.
|
/// The request handler for the /new path.
|
||||||
/// This handles all new uploads.
|
/// This handles all new uploads.
|
||||||
#[axum::debug_handler]
|
#[axum::debug_handler]
|
||||||
pub async fn new(
|
pub async fn new(
|
||||||
State(engine): State<Arc<crate::engine::Engine>>,
|
State(engine): State<Arc<crate::engine::Engine>>,
|
||||||
Query(params): Query<HashMap<String, String>>,
|
Query(req): Query<NewRequest>,
|
||||||
headers: HeaderMap,
|
headers: HeaderMap,
|
||||||
stream: BodyStream,
|
body: Body,
|
||||||
) -> Result<String, StatusCode> {
|
) -> Result<String, StatusCode> {
|
||||||
let key = params.get("key");
|
|
||||||
|
|
||||||
const EMPTY_STRING: &String = &String::new();
|
|
||||||
|
|
||||||
// check upload key, if i need to
|
// check upload key, if i need to
|
||||||
if !engine.upload_key.is_empty() && key.unwrap_or(EMPTY_STRING) != &engine.upload_key {
|
if !engine.cfg.upload_key.is_empty() && req.key.unwrap_or_default() != engine.cfg.upload_key {
|
||||||
return Err(StatusCode::FORBIDDEN);
|
return Err(StatusCode::FORBIDDEN);
|
||||||
}
|
}
|
||||||
|
|
||||||
let original_name = params.get("name");
|
|
||||||
|
|
||||||
// the original file name wasn't given, so i can't work out what the extension should be
|
// the original file name wasn't given, so i can't work out what the extension should be
|
||||||
if original_name.is_none() {
|
if req.name.is_empty() {
|
||||||
return Err(StatusCode::BAD_REQUEST);
|
return Err(StatusCode::BAD_REQUEST);
|
||||||
}
|
}
|
||||||
|
|
||||||
let original_path = PathBuf::from(original_name.unwrap());
|
let extension = PathBuf::from(req.name)
|
||||||
|
.extension()
|
||||||
let path = engine.gen_path(&original_path).await;
|
|
||||||
let name = path
|
|
||||||
.file_name()
|
|
||||||
.and_then(OsStr::to_str)
|
.and_then(OsStr::to_str)
|
||||||
.unwrap_or_default()
|
.unwrap_or_default()
|
||||||
.to_string();
|
.to_string();
|
||||||
|
|
||||||
let url = format!("{}/p/{}", engine.base_url, name);
|
|
||||||
|
|
||||||
// read and parse content-length, and if it fails just assume it's really high so it doesn't cache
|
// read and parse content-length, and if it fails just assume it's really high so it doesn't cache
|
||||||
let content_length = headers
|
let content_length = headers
|
||||||
.get(header::CONTENT_LENGTH)
|
.get(header::CONTENT_LENGTH)
|
||||||
|
@ -51,10 +62,34 @@ pub async fn new(
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.unwrap_or(usize::MAX);
|
.unwrap_or(usize::MAX);
|
||||||
|
|
||||||
// pass it off to the engine to be processed!
|
// turn body into stream
|
||||||
engine
|
let stream = Body::into_data_stream(body);
|
||||||
.process_upload(path, name, content_length, stream)
|
|
||||||
.await;
|
|
||||||
|
|
||||||
Ok(url)
|
// pass it off to the engine to be processed!
|
||||||
|
match engine
|
||||||
|
.process(
|
||||||
|
&extension,
|
||||||
|
content_length,
|
||||||
|
stream,
|
||||||
|
req.last_for,
|
||||||
|
req.keep_exif,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(outcome) => match outcome {
|
||||||
|
// 200 OK
|
||||||
|
ProcessOutcome::Success(url) => Ok(url),
|
||||||
|
|
||||||
|
// 413 Payload Too Large
|
||||||
|
ProcessOutcome::UploadTooLarge | ProcessOutcome::TemporaryUploadTooLarge => {
|
||||||
|
Err(StatusCode::PAYLOAD_TOO_LARGE)
|
||||||
|
}
|
||||||
|
|
||||||
|
// 400 Bad Request
|
||||||
|
ProcessOutcome::TemporaryUploadLifetimeTooLong => Err(StatusCode::BAD_REQUEST),
|
||||||
|
},
|
||||||
|
|
||||||
|
// 500 Internal Server Error
|
||||||
|
Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
92
src/view.rs
92
src/view.rs
|
@ -1,86 +1,39 @@
|
||||||
use std::{
|
use std::{ffi::OsStr, path::PathBuf, sync::Arc};
|
||||||
path::{Component, PathBuf},
|
|
||||||
sync::Arc,
|
|
||||||
};
|
|
||||||
|
|
||||||
use axum::{
|
use axum::{
|
||||||
body::StreamBody,
|
body::Body,
|
||||||
extract::{Path, State},
|
extract::{Path, State},
|
||||||
response::{IntoResponse, Response},
|
response::{IntoResponse, Response},
|
||||||
};
|
};
|
||||||
|
|
||||||
use bytes::Bytes;
|
use http::{HeaderValue, StatusCode};
|
||||||
use hyper::{http::HeaderValue, StatusCode};
|
|
||||||
use tokio::{fs::File, runtime::Handle};
|
|
||||||
use tokio_util::io::ReaderStream;
|
use tokio_util::io::ReaderStream;
|
||||||
use tracing::{error, debug, warn};
|
|
||||||
|
|
||||||
/// Responses for a successful view operation
|
use crate::engine::UploadData;
|
||||||
pub enum ViewSuccess {
|
|
||||||
/// A file read from disk, suitable for larger files.
|
|
||||||
///
|
|
||||||
/// The file provided will be streamed from disk and
|
|
||||||
/// back to the viewer.
|
|
||||||
///
|
|
||||||
/// This is only ever used if a file exceeds the
|
|
||||||
/// cache's maximum file size.
|
|
||||||
FromDisk(File),
|
|
||||||
|
|
||||||
/// A file read from in-memory cache, best for smaller files.
|
|
||||||
///
|
|
||||||
/// The file is taken from the cache in its entirety
|
|
||||||
/// and sent back to the viewer.
|
|
||||||
///
|
|
||||||
/// If a file can be fit into cache, this will be
|
|
||||||
/// used even if it's read from disk.
|
|
||||||
FromCache(Bytes),
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Responses for a failed view operation
|
/// Responses for a failed view operation
|
||||||
pub enum ViewError {
|
pub enum ViewError {
|
||||||
/// Will send status code 404 witha plaintext "not found" message.
|
/// Will send status code 404 with a plaintext "not found" message.
|
||||||
NotFound,
|
NotFound,
|
||||||
|
|
||||||
/// Will send status code 500 with a plaintext "internal server error" message.
|
/// Will send status code 500 with a plaintext "internal server error" message.
|
||||||
InternalServerError,
|
InternalServerError,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl IntoResponse for ViewSuccess {
|
impl IntoResponse for UploadData {
|
||||||
fn into_response(self) -> Response {
|
fn into_response(self) -> Response {
|
||||||
match self {
|
match self {
|
||||||
ViewSuccess::FromDisk(file) => {
|
UploadData::Disk(file, len) => {
|
||||||
// get handle to current tokio runtime
|
// create our content-length header
|
||||||
// i use this to block on futures here (not async)
|
let len_str = len.to_string();
|
||||||
let handle = Handle::current();
|
|
||||||
let _ = handle.enter();
|
|
||||||
|
|
||||||
// read the metadata of the file on disk
|
|
||||||
// this function isn't async
|
|
||||||
// .. so we have to use handle.block_on() to get the metadata
|
|
||||||
let metadata = futures::executor::block_on(file.metadata());
|
|
||||||
|
|
||||||
// if we error then return 500
|
|
||||||
if metadata.is_err() {
|
|
||||||
error!("failed to read metadata from disk");
|
|
||||||
return ViewError::InternalServerError.into_response();
|
|
||||||
}
|
|
||||||
|
|
||||||
// unwrap (which we know is safe) and read the file size as a string
|
|
||||||
let metadata = metadata.unwrap();
|
|
||||||
let len_str = metadata.len().to_string();
|
|
||||||
|
|
||||||
debug!("file is {} bytes on disk", &len_str);
|
|
||||||
|
|
||||||
// HeaderValue::from_str will never error if only visible ASCII characters are passed (32-127)
|
|
||||||
// .. so unwrapping this should be fine
|
|
||||||
let content_length = HeaderValue::from_str(&len_str).unwrap();
|
let content_length = HeaderValue::from_str(&len_str).unwrap();
|
||||||
|
|
||||||
// create a streamed body response (we want to stream larger files)
|
// create a streamed body response (we want to stream larger files)
|
||||||
let reader = ReaderStream::new(file);
|
let stream = ReaderStream::new(file);
|
||||||
let stream = StreamBody::new(reader);
|
let body = Body::from_stream(stream);
|
||||||
|
|
||||||
// extract mutable headers from the response
|
// extract mutable headers from the response
|
||||||
let mut res = stream.into_response();
|
let mut res = body.into_response();
|
||||||
let headers = res.headers_mut();
|
let headers = res.headers_mut();
|
||||||
|
|
||||||
// clear headers, browser can imply content type
|
// clear headers, browser can imply content type
|
||||||
|
@ -92,7 +45,7 @@ impl IntoResponse for ViewSuccess {
|
||||||
|
|
||||||
res
|
res
|
||||||
}
|
}
|
||||||
ViewSuccess::FromCache(data) => {
|
UploadData::Cache(data) => {
|
||||||
// extract mutable headers from the response
|
// extract mutable headers from the response
|
||||||
let mut res = data.into_response();
|
let mut res = data.into_response();
|
||||||
let headers = res.headers_mut();
|
let headers = res.headers_mut();
|
||||||
|
@ -128,16 +81,17 @@ impl IntoResponse for ViewError {
|
||||||
pub async fn view(
|
pub async fn view(
|
||||||
State(engine): State<Arc<crate::engine::Engine>>,
|
State(engine): State<Arc<crate::engine::Engine>>,
|
||||||
Path(original_path): Path<PathBuf>,
|
Path(original_path): Path<PathBuf>,
|
||||||
) -> Result<ViewSuccess, ViewError> {
|
) -> Result<UploadData, ViewError> {
|
||||||
// (hopefully) prevent path traversal, just check for any non-file components
|
let saved_name = if let Some(Some(n)) = original_path.file_name().map(OsStr::to_str) {
|
||||||
if original_path
|
n
|
||||||
.components()
|
} else {
|
||||||
.any(|x| !matches!(x, Component::Normal(_)))
|
|
||||||
{
|
|
||||||
warn!("a request attempted path traversal");
|
|
||||||
return Err(ViewError::NotFound);
|
return Err(ViewError::NotFound);
|
||||||
}
|
};
|
||||||
|
|
||||||
// get result from the engine!
|
// get result from the engine!
|
||||||
engine.get_upload(&original_path).await
|
match engine.get(saved_name).await {
|
||||||
|
Ok(Some(u)) => Ok(u),
|
||||||
|
Ok(None) => Err(ViewError::NotFound),
|
||||||
|
Err(_) => Err(ViewError::InternalServerError),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue