Compare commits
No commits in common. "main" and "0.1.5-p2" have entirely different histories.
4
.envrc
4
.envrc
|
@ -1,4 +0,0 @@
|
|||
if ! has nix_direnv_version || ! nix_direnv_version 3.0.6; then
|
||||
source_url "https://raw.githubusercontent.com/nix-community/nix-direnv/3.0.6/direnvrc" "sha256-RYcUJaRMf8oF5LznDrlCXbkOQrywm0HDv1VjYGaJGdM="
|
||||
fi
|
||||
use flake
|
|
@ -1,5 +1,2 @@
|
|||
# binaries
|
||||
/target
|
||||
|
||||
# nix-direnv
|
||||
/.direnv
|
||||
|
|
File diff suppressed because it is too large
Load Diff
30
Cargo.toml
30
Cargo.toml
|
@ -1,27 +1,25 @@
|
|||
[package]
|
||||
name = "breeze"
|
||||
version = "0.2.6"
|
||||
version = "0.1.5"
|
||||
edition = "2021"
|
||||
|
||||
[dependencies]
|
||||
axum = { version = "0.7.5", features = ["macros", "http2"] }
|
||||
tower = "0.4.13"
|
||||
http = "1.1.0"
|
||||
axum = { version = "0.6.1", features = ["macros"] }
|
||||
hyper = { version = "0.14", features = ["full"] }
|
||||
tokio = { version = "1", features = ["full"] }
|
||||
tokio-util = { version = "0.7.4", features = ["full"] }
|
||||
tokio-stream = "0.1"
|
||||
tower = "0.4.13"
|
||||
bytes = "1"
|
||||
rand = "0.8.5"
|
||||
async-recursion = "1.0.0"
|
||||
walkdir = "2"
|
||||
futures = "0.3"
|
||||
tracing = "0.1"
|
||||
tracing-subscriber = "0.3"
|
||||
bytes = "1"
|
||||
async-recursion = "1.0.0"
|
||||
rand = "0.8.5"
|
||||
walkdir = "2"
|
||||
anyhow = "1.0"
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_with = "3.4.0"
|
||||
archived = { path = "./archived" }
|
||||
xxhash-rust = { version = "0.8.7", features = ["xxh3"] }
|
||||
serde = { version = "1.0.189", features = ["derive"] }
|
||||
toml = "0.8.2"
|
||||
argh = "0.1.12"
|
||||
dashmap = { version = "5.5.3", features = ["rayon", "inline"] }
|
||||
rayon = "1.8"
|
||||
atomic-time = "0.1.4"
|
||||
img-parts = "0.3.0"
|
||||
clap = { version = "4.4.6", features = ["derive"] }
|
||||
serde_with = "3.4.0"
|
||||
|
|
|
@ -12,4 +12,8 @@ RUN apt-get update && rm -rf /var/lib/apt/lists/*
|
|||
|
||||
COPY --from=builder /usr/local/cargo/bin/breeze /usr/local/bin/breeze
|
||||
|
||||
RUN useradd -m runner
|
||||
USER runner
|
||||
|
||||
EXPOSE 8000
|
||||
CMD [ "breeze", "--config", "/etc/breeze.toml" ]
|
||||
|
|
44
README.md
44
README.md
|
@ -9,8 +9,6 @@ Compared to the old Express.js backend, breeze has
|
|||
- Streamed downloading (on larger files)
|
||||
- Upload caching
|
||||
- Generally faster speeds overall
|
||||
- Temporary uploads
|
||||
- Automatic exif data removal
|
||||
|
||||
At this time, breeze does not support encrypted uploads on disk.
|
||||
|
||||
|
@ -35,17 +33,13 @@ services:
|
|||
- /srv/uploads:/data
|
||||
- ./breeze.toml:/etc/breeze.toml
|
||||
|
||||
user: 1000:1000
|
||||
|
||||
ports:
|
||||
- 8383:8000
|
||||
- 8000:8000
|
||||
```
|
||||
For this configuration, it is expected that:
|
||||
* there is a clone of the Git repository in the `./breeze` folder
|
||||
* there is a clone of the Git repository in the `./breeze` folder.
|
||||
* there is a `breeze.toml` config file in current directory
|
||||
* there is a directory at `/srv/uploads` for storing uploads
|
||||
* port 8383 will be made accessible to the Internet somehow (either forwarding the port through your firewall directly, or passing it through a reverse proxy)
|
||||
* you want the uploads to be owned by the user on your system with id 1000. (this is usually your user)
|
||||
|
||||
It can also be installed directly if you have the Rust toolchain installed:
|
||||
```bash
|
||||
|
@ -56,7 +50,7 @@ cargo install --path .
|
|||
### Hosting
|
||||
Configuration is read through a toml file.
|
||||
|
||||
The config file path is specified using the `-c`/`--config` command line switch.
|
||||
By default it'll try to read `./breeze.toml`, but you can specify a different path using the `-c`/`--config` command line switch.
|
||||
|
||||
Here is an example config file:
|
||||
```toml
|
||||
|
@ -67,6 +61,10 @@ Here is an example config file:
|
|||
# upload urls of "https://picture.wtf/p/abcdef.png", etc.
|
||||
base_url = "http://127.0.0.1:8000"
|
||||
|
||||
# The location that uploads will be saved to.
|
||||
# It should be a path to a directory on disk that you can write to.
|
||||
save_path = "/data"
|
||||
|
||||
# OPTIONAL - If set, the static key specified will be required to upload new files.
|
||||
# If it is not set, no key will be required.
|
||||
upload_key = "hiiiiiiii"
|
||||
|
@ -78,28 +76,6 @@ upload_key = "hiiiiiiii"
|
|||
# %version% - current breeze version (e.g. 0.1.5)
|
||||
motd = "my image host, currently hosting %uplcount% files"
|
||||
|
||||
# The maximum lifetime a temporary upload may be given, in seconds.
|
||||
# It's okay to leave this somewhat high because large temporary uploads
|
||||
# will just be bumped out of the cache when a new upload needs to be
|
||||
# cached anyways.
|
||||
max_temp_lifetime = 43200
|
||||
|
||||
# OPTIONAL - the maximum length (in bytes) a file being uploaded may be.
|
||||
# A word of warning about this: the error shown to ShareX users who
|
||||
# hit the limit is *not* very clear. ("connection closed" or similar)
|
||||
max_upload_len = 2_147_483_648
|
||||
|
||||
# The maximum length (in bytes) an image file may be before the server
|
||||
# will skip removing its EXIF data.
|
||||
# The performance impact of breeze's EXIF data removal is not
|
||||
# very high in everyday usage, so something like 16MiB is reasonable.
|
||||
max_strip_len = 16_777_216
|
||||
|
||||
[engine.disk]
|
||||
# The location that uploads will be saved to.
|
||||
# It should be a path to a directory on disk that you can write to.
|
||||
save_path = "/data"
|
||||
|
||||
[engine.cache]
|
||||
# The file size (in bytes) that a file must be under
|
||||
# to get cached.
|
||||
|
@ -113,7 +89,7 @@ upload_lifetime = 1800
|
|||
scan_freq = 60
|
||||
|
||||
# How much memory (in bytes) the cache is allowed to consume.
|
||||
mem_capacity = 4_294_967_296
|
||||
mem_capacity = 4_294_967_295
|
||||
|
||||
[http]
|
||||
# The address that the HTTP server will listen on. (ip:port)
|
||||
|
@ -128,12 +104,10 @@ level = "warn"
|
|||
```
|
||||
|
||||
### Uploading
|
||||
The HTTP API is pretty simple, and it's easy to make a ShareX configuration for it.
|
||||
The HTTP API is fairly simple, and it's pretty easy to make a ShareX configuration for it.
|
||||
|
||||
Uploads should be sent to `/new?name={original filename}` as a POST request. If the server uses upload keys, it should be sent to `/new?name={original filename}&key={upload key}`. The uploaded file's content should be sent as raw binary in the request body.
|
||||
|
||||
Additionally, you may specify `&lastfor={time in seconds}` to make your upload temporary, or `&keepexif=true` to tell the server not to clear EXIF data on image uploads. (if you don't know what EXIF data is, just leave it as default. you'll know if you need it)
|
||||
|
||||
Here's an example ShareX configuration for it (with a key):
|
||||
```json
|
||||
{
|
||||
|
|
|
@ -0,0 +1,2 @@
|
|||
.idea
|
||||
target
|
|
@ -0,0 +1,30 @@
|
|||
# This file is automatically @generated by Cargo.
|
||||
# It is not intended for manual editing.
|
||||
version = 3
|
||||
|
||||
[[package]]
|
||||
name = "archived"
|
||||
version = "0.2.0"
|
||||
dependencies = [
|
||||
"bytes",
|
||||
"once_cell",
|
||||
"rustc-hash",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "bytes"
|
||||
version = "1.3.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "dfb24e866b15a1af2a1b663f10c6b6b8f397a84aadb828f12e5b289ec23a3a3c"
|
||||
|
||||
[[package]]
|
||||
name = "once_cell"
|
||||
version = "1.3.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b1c601810575c99596d4afc46f78a678c80105117c379eb3650cf99b8a21ce5b"
|
||||
|
||||
[[package]]
|
||||
name = "rustc-hash"
|
||||
version = "1.1.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2"
|
|
@ -0,0 +1,9 @@
|
|||
[package]
|
||||
name = "archived"
|
||||
version = "0.2.0"
|
||||
edition = "2018"
|
||||
license = "MIT"
|
||||
|
||||
[dependencies]
|
||||
bytes = "1.3.0"
|
||||
once_cell = "1.3.1"
|
|
@ -0,0 +1,22 @@
|
|||
MIT License
|
||||
|
||||
Copyright (c) 2020 aikidos
|
||||
Copyright (c) 2023 ot2t7, minish
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
in the Software without restriction, including without limitation the rights
|
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
copies of the Software, and to permit persons to whom the Software is
|
||||
furnished to do so, subject to the following conditions:
|
||||
|
||||
The above copyright notice and this permission notice shall be included in all
|
||||
copies or substantial portions of the Software.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
SOFTWARE.
|
|
@ -0,0 +1,26 @@
|
|||
use std::time::{Duration, SystemTime};
|
||||
|
||||
/// Represents a set of eviction and expiration details for a specific cache entry.
|
||||
pub(crate) struct CacheEntry<B> {
|
||||
/// Entry value.
|
||||
pub(crate) value: B,
|
||||
|
||||
/// Expiration time.
|
||||
///
|
||||
/// - [`None`] if the value must be kept forever.
|
||||
pub(crate) expiration_time: SystemTime,
|
||||
}
|
||||
|
||||
impl<B> CacheEntry<B> {
|
||||
pub(crate) fn new(value: B, lifetime: Duration) -> Self {
|
||||
Self {
|
||||
expiration_time: SystemTime::now() + lifetime,
|
||||
value,
|
||||
}
|
||||
}
|
||||
|
||||
/// Check if a entry is expired.
|
||||
pub(crate) fn is_expired(&self, current_time: SystemTime) -> bool {
|
||||
current_time >= self.expiration_time
|
||||
}
|
||||
}
|
|
@ -0,0 +1,172 @@
|
|||
mod entry;
|
||||
|
||||
use bytes::Bytes;
|
||||
|
||||
use crate::entry::*;
|
||||
use std::collections::hash_map::Entry;
|
||||
use std::collections::HashMap;
|
||||
use std::time::{Duration, SystemTime};
|
||||
|
||||
pub struct Archive {
|
||||
cache_table: HashMap<String, CacheEntry<Bytes>>,
|
||||
full_scan_frequency: Option<Duration>,
|
||||
created_time: SystemTime,
|
||||
last_scan_time: Option<SystemTime>,
|
||||
entry_lifetime: Duration,
|
||||
capacity: usize,
|
||||
length: usize,
|
||||
}
|
||||
|
||||
impl Archive {
|
||||
/* pub fn new(capacity: usize) -> Self {
|
||||
Self {
|
||||
cache_table: HashMap::new(),
|
||||
full_scan_frequency: None,
|
||||
created_time: SystemTime::now(),
|
||||
last_scan_time: None,
|
||||
capacity,
|
||||
length: 0,
|
||||
}
|
||||
} */
|
||||
|
||||
pub fn with_full_scan(
|
||||
full_scan_frequency: Duration,
|
||||
entry_lifetime: Duration,
|
||||
capacity: usize,
|
||||
) -> Self {
|
||||
Self {
|
||||
cache_table: HashMap::with_capacity(256),
|
||||
full_scan_frequency: Some(full_scan_frequency),
|
||||
created_time: SystemTime::now(),
|
||||
last_scan_time: None,
|
||||
entry_lifetime,
|
||||
capacity,
|
||||
length: 0,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn contains_key(&self, key: &String) -> bool {
|
||||
let now = SystemTime::now();
|
||||
|
||||
self.cache_table
|
||||
.get(key)
|
||||
.filter(|cache_entry| !cache_entry.is_expired(now))
|
||||
.is_some()
|
||||
}
|
||||
|
||||
pub fn get_last_scan_time(&self) -> Option<SystemTime> {
|
||||
self.last_scan_time
|
||||
}
|
||||
|
||||
pub fn get_full_scan_frequency(&self) -> Option<Duration> {
|
||||
self.full_scan_frequency
|
||||
}
|
||||
|
||||
pub fn get(&self, key: &String) -> Option<&Bytes> {
|
||||
let now = SystemTime::now();
|
||||
|
||||
self.cache_table
|
||||
.get(key)
|
||||
.filter(|cache_entry| !cache_entry.is_expired(now))
|
||||
.map(|cache_entry| &cache_entry.value)
|
||||
}
|
||||
|
||||
pub fn get_or_insert<F>(&mut self, key: String, factory: F) -> &Bytes
|
||||
where
|
||||
F: Fn() -> Bytes,
|
||||
{
|
||||
let now = SystemTime::now();
|
||||
|
||||
self.try_full_scan_expired_items(now);
|
||||
|
||||
match self.cache_table.entry(key) {
|
||||
Entry::Occupied(mut occupied) => {
|
||||
if occupied.get().is_expired(now) {
|
||||
occupied.insert(CacheEntry::new(factory(), self.entry_lifetime));
|
||||
}
|
||||
|
||||
&occupied.into_mut().value
|
||||
}
|
||||
Entry::Vacant(vacant) => {
|
||||
&vacant
|
||||
.insert(CacheEntry::new(factory(), self.entry_lifetime))
|
||||
.value
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn insert(&mut self, key: String, value: Bytes) -> Option<Bytes> {
|
||||
let now = SystemTime::now();
|
||||
|
||||
self.try_full_scan_expired_items(now);
|
||||
|
||||
if value.len() + self.length > self.capacity {
|
||||
return None;
|
||||
}
|
||||
|
||||
self.length += value.len();
|
||||
|
||||
self.cache_table
|
||||
.insert(key, CacheEntry::new(value, self.entry_lifetime))
|
||||
.filter(|cache_entry| !cache_entry.is_expired(now))
|
||||
.map(|cache_entry| cache_entry.value)
|
||||
}
|
||||
|
||||
pub fn remove(&mut self, key: &String) -> Option<Bytes> {
|
||||
let now = SystemTime::now();
|
||||
|
||||
self.try_full_scan_expired_items(now);
|
||||
|
||||
let mut removed_len: usize = 0;
|
||||
let result = self
|
||||
.cache_table
|
||||
.remove(key)
|
||||
.filter(|cache_entry| !cache_entry.is_expired(now))
|
||||
.and_then(|o| {
|
||||
removed_len += o.value.len();
|
||||
return Some(o);
|
||||
})
|
||||
.map(|cache_entry| cache_entry.value);
|
||||
self.length -= removed_len;
|
||||
return result;
|
||||
}
|
||||
|
||||
pub fn renew(&mut self, key: &String) -> Option<()> {
|
||||
let now = SystemTime::now();
|
||||
|
||||
self.try_full_scan_expired_items(now);
|
||||
|
||||
let entry = self.cache_table.get_mut(key);
|
||||
|
||||
match entry {
|
||||
Some(entry) => {
|
||||
entry.expiration_time = now + self.entry_lifetime;
|
||||
|
||||
Some(())
|
||||
}
|
||||
None => None,
|
||||
}
|
||||
}
|
||||
|
||||
fn try_full_scan_expired_items(&mut self, current_time: SystemTime) {
|
||||
if let Some(full_scan_frequency) = self.full_scan_frequency {
|
||||
let since = current_time
|
||||
.duration_since(self.last_scan_time.unwrap_or(self.created_time))
|
||||
.unwrap();
|
||||
|
||||
if since >= full_scan_frequency {
|
||||
let mut removed_len = 0;
|
||||
self.cache_table.retain(|_, cache_entry| {
|
||||
if cache_entry.is_expired(current_time) {
|
||||
removed_len += cache_entry.value.len();
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
});
|
||||
self.length -= removed_len;
|
||||
|
||||
self.last_scan_time = Some(current_time);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
77
flake.lock
77
flake.lock
|
@ -1,77 +0,0 @@
|
|||
{
|
||||
"nodes": {
|
||||
"crane": {
|
||||
"locked": {
|
||||
"lastModified": 1725409566,
|
||||
"narHash": "sha256-PrtLmqhM6UtJP7v7IGyzjBFhbG4eOAHT6LPYOFmYfbk=",
|
||||
"owner": "ipetkov",
|
||||
"repo": "crane",
|
||||
"rev": "7e4586bad4e3f8f97a9271def747cf58c4b68f3c",
|
||||
"type": "github"
|
||||
},
|
||||
"original": {
|
||||
"owner": "ipetkov",
|
||||
"repo": "crane",
|
||||
"type": "github"
|
||||
}
|
||||
},
|
||||
"flake-utils": {
|
||||
"inputs": {
|
||||
"systems": "systems"
|
||||
},
|
||||
"locked": {
|
||||
"lastModified": 1726560853,
|
||||
"narHash": "sha256-X6rJYSESBVr3hBoH0WbKE5KvhPU5bloyZ2L4K60/fPQ=",
|
||||
"owner": "numtide",
|
||||
"repo": "flake-utils",
|
||||
"rev": "c1dfcf08411b08f6b8615f7d8971a2bfa81d5e8a",
|
||||
"type": "github"
|
||||
},
|
||||
"original": {
|
||||
"owner": "numtide",
|
||||
"repo": "flake-utils",
|
||||
"type": "github"
|
||||
}
|
||||
},
|
||||
"nixpkgs": {
|
||||
"locked": {
|
||||
"lastModified": 1726871744,
|
||||
"narHash": "sha256-V5LpfdHyQkUF7RfOaDPrZDP+oqz88lTJrMT1+stXNwo=",
|
||||
"owner": "NixOS",
|
||||
"repo": "nixpkgs",
|
||||
"rev": "a1d92660c6b3b7c26fb883500a80ea9d33321be2",
|
||||
"type": "github"
|
||||
},
|
||||
"original": {
|
||||
"owner": "NixOS",
|
||||
"ref": "nixpkgs-unstable",
|
||||
"repo": "nixpkgs",
|
||||
"type": "github"
|
||||
}
|
||||
},
|
||||
"root": {
|
||||
"inputs": {
|
||||
"crane": "crane",
|
||||
"flake-utils": "flake-utils",
|
||||
"nixpkgs": "nixpkgs"
|
||||
}
|
||||
},
|
||||
"systems": {
|
||||
"locked": {
|
||||
"lastModified": 1681028828,
|
||||
"narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
|
||||
"owner": "nix-systems",
|
||||
"repo": "default",
|
||||
"rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
|
||||
"type": "github"
|
||||
},
|
||||
"original": {
|
||||
"owner": "nix-systems",
|
||||
"repo": "default",
|
||||
"type": "github"
|
||||
}
|
||||
}
|
||||
},
|
||||
"root": "root",
|
||||
"version": 7
|
||||
}
|
225
flake.nix
225
flake.nix
|
@ -1,225 +0,0 @@
|
|||
{
|
||||
description = "breeze file server";
|
||||
|
||||
inputs = {
|
||||
nixpkgs.url = "github:NixOS/nixpkgs/nixpkgs-unstable";
|
||||
crane.url = "github:ipetkov/crane";
|
||||
flake-utils.url = "github:numtide/flake-utils";
|
||||
};
|
||||
|
||||
outputs = {
|
||||
self,
|
||||
nixpkgs,
|
||||
crane,
|
||||
flake-utils,
|
||||
...
|
||||
}:
|
||||
flake-utils.lib.eachDefaultSystem (system: let
|
||||
pkgs = nixpkgs.legacyPackages.${system};
|
||||
|
||||
craneLib = crane.mkLib pkgs;
|
||||
|
||||
# Common arguments can be set here to avoid repeating them later
|
||||
# Note: changes here will rebuild all dependency crates
|
||||
commonArgs = {
|
||||
src = craneLib.cleanCargoSource ./.;
|
||||
strictDeps = true;
|
||||
|
||||
buildInputs =
|
||||
[
|
||||
pkgs.openssl
|
||||
]
|
||||
++ pkgs.lib.optionals pkgs.stdenv.isDarwin [
|
||||
# Additional darwin specific inputs can be set here
|
||||
pkgs.libiconv
|
||||
];
|
||||
};
|
||||
|
||||
breeze = craneLib.buildPackage (commonArgs
|
||||
// {
|
||||
cargoArtifacts = craneLib.buildDepsOnly commonArgs;
|
||||
|
||||
# Additional environment variables or build phases/hooks can be set
|
||||
# here *without* rebuilding all dependency crates
|
||||
# MY_CUSTOM_VAR = "some value";
|
||||
});
|
||||
in {
|
||||
checks = {
|
||||
inherit breeze;
|
||||
};
|
||||
|
||||
packages.default = breeze;
|
||||
|
||||
apps.default = flake-utils.lib.mkApp {
|
||||
drv = breeze;
|
||||
};
|
||||
|
||||
devShells.default = craneLib.devShell {
|
||||
# Inherit inputs from checks.
|
||||
checks = self.checks.${system};
|
||||
|
||||
# Additional dev-shell environment variables can be set directly
|
||||
# MY_CUSTOM_DEVELOPMENT_VAR = "something else";
|
||||
|
||||
# Extra inputs can be added here; cargo and rustc are provided by default.
|
||||
packages = with pkgs; [
|
||||
alejandra
|
||||
rewrk
|
||||
];
|
||||
};
|
||||
|
||||
nixosModules.breeze = {
|
||||
config,
|
||||
pkgs,
|
||||
lib,
|
||||
...
|
||||
}:
|
||||
with lib; let
|
||||
cfg = config.services.breeze;
|
||||
settingsFormat = pkgs.formats.toml {};
|
||||
in {
|
||||
options = {
|
||||
services.breeze = {
|
||||
enable = mkEnableOption "breeze file server";
|
||||
|
||||
package = mkOption {
|
||||
type = types.package;
|
||||
default = breeze;
|
||||
description = "Package for `breeze` to use";
|
||||
};
|
||||
|
||||
user = mkOption {
|
||||
type = types.str;
|
||||
default = "breeze";
|
||||
description = "User that `breeze` will run under";
|
||||
};
|
||||
|
||||
group = mkOption {
|
||||
type = types.str;
|
||||
default = "breeze";
|
||||
description = "Group that `breeze` will run under";
|
||||
};
|
||||
|
||||
extraGroups = mkOption {
|
||||
type = types.listOf types.str;
|
||||
default = [];
|
||||
description = "Any supplementary groups for `breeze` to run under";
|
||||
};
|
||||
|
||||
settings = mkOption {
|
||||
type = settingsFormat.type;
|
||||
default = {};
|
||||
description = ''
|
||||
The *.toml configuration to run `breeze` with.
|
||||
There is no formal documentation, but there is an example in the [readme](https://git.min.rip/min/breeze/src/branch/main/README.md).
|
||||
'';
|
||||
};
|
||||
|
||||
configDir = mkOption {
|
||||
type = types.path;
|
||||
default = "/etc/breeze";
|
||||
description = ''
|
||||
The directory on disk to store the `breeze` config file in.
|
||||
This does not load pre-existing config files, it only defines where the generated config is saved.
|
||||
'';
|
||||
};
|
||||
|
||||
uploadKeyFile = mkOption {
|
||||
type = types.nullOr types.path;
|
||||
default = null;
|
||||
description = ''
|
||||
File to load the `engine.upload_key` from, if desired.
|
||||
This is useful for loading it from a secret management system.
|
||||
'';
|
||||
};
|
||||
};
|
||||
};
|
||||
|
||||
config = mkIf cfg.enable {
|
||||
users.users.${cfg.user} = {
|
||||
isSystemUser = true;
|
||||
inherit (cfg) group;
|
||||
};
|
||||
|
||||
users.groups.${cfg.group} = {};
|
||||
|
||||
systemd.tmpfiles.rules = [
|
||||
"d '${cfg.configDir}' 0750 ${cfg.user} ${cfg.group} - -"
|
||||
];
|
||||
|
||||
services.breeze.settings = mkMerge [
|
||||
(mkIf (cfg.uploadKeyFile != null) {engine.upload_key = "@UPLOAD_KEY@";})
|
||||
];
|
||||
|
||||
systemd.services.breeze = let
|
||||
cfgFile = "${cfg.configDir}/breeze.toml";
|
||||
in {
|
||||
description = "breeze file server";
|
||||
after = ["local-fs.target" "network.target"];
|
||||
wantedBy = ["multi-user.target"];
|
||||
|
||||
preStart =
|
||||
''
|
||||
install -m 660 ${settingsFormat.generate "breeze.toml" cfg.settings} ${cfgFile}
|
||||
''
|
||||
+ lib.optionalString (cfg.uploadKeyFile != null) ''
|
||||
${pkgs.replace-secret}/bin/replace-secret '@UPLOAD_KEY@' "${cfg.uploadKeyFile}" ${cfgFile}
|
||||
'';
|
||||
|
||||
serviceConfig = rec {
|
||||
User = cfg.user;
|
||||
Group = cfg.group;
|
||||
DynamicUser = false; # we write files, so don't do that
|
||||
SupplementaryGroups = cfg.extraGroups;
|
||||
StateDirectory = "breeze";
|
||||
CacheDirectory = "breeze";
|
||||
ExecStart = escapeShellArgs [
|
||||
"${cfg.package}/bin/breeze"
|
||||
"--config"
|
||||
cfgFile
|
||||
];
|
||||
Restart = "on-failure";
|
||||
|
||||
# Security Options #
|
||||
|
||||
NoNewPrivileges = true; # implied by DynamicUser
|
||||
RemoveIPC = true; # implied by DynamicUser
|
||||
|
||||
AmbientCapabilities = "";
|
||||
CapabilityBoundingSet = "";
|
||||
|
||||
DeviceAllow = "";
|
||||
|
||||
LockPersonality = true;
|
||||
|
||||
PrivateTmp = true; # implied by DynamicUser
|
||||
PrivateDevices = true;
|
||||
PrivateUsers = true;
|
||||
|
||||
ProtectClock = true;
|
||||
ProtectControlGroups = true;
|
||||
ProtectHostname = true;
|
||||
ProtectKernelLogs = true;
|
||||
ProtectKernelModules = true;
|
||||
ProtectKernelTunables = true;
|
||||
|
||||
RestrictNamespaces = true;
|
||||
RestrictAddressFamilies = ["AF_INET" "AF_INET6" "AF_UNIX"];
|
||||
RestrictRealtime = true;
|
||||
RestrictSUIDSGID = true; # implied by DynamicUser
|
||||
|
||||
SystemCallArchitectures = "native";
|
||||
SystemCallErrorNumber = "EPERM";
|
||||
SystemCallFilter = [
|
||||
"@system-service"
|
||||
"~@keyring"
|
||||
"~@memlock"
|
||||
"~@privileged"
|
||||
"~@setuid"
|
||||
];
|
||||
};
|
||||
};
|
||||
};
|
||||
};
|
||||
});
|
||||
}
|
259
src/cache.rs
259
src/cache.rs
|
@ -1,259 +0,0 @@
|
|||
use std::{
|
||||
sync::atomic::{AtomicUsize, Ordering},
|
||||
time::{Duration, SystemTime},
|
||||
};
|
||||
|
||||
use atomic_time::AtomicSystemTime;
|
||||
use bytes::Bytes;
|
||||
use dashmap::{mapref::one::Ref, DashMap};
|
||||
use rayon::prelude::*;
|
||||
use tokio::time;
|
||||
|
||||
use crate::config;
|
||||
|
||||
/// An entry stored in the cache.
|
||||
///
|
||||
/// It contains basic metadata and the actual value.
|
||||
pub struct Entry {
|
||||
/// The data held
|
||||
value: Bytes,
|
||||
|
||||
/// The last time this entry was read/written
|
||||
last_used: AtomicSystemTime,
|
||||
|
||||
/// Whether or not `last_used` should be updated
|
||||
update_used: bool,
|
||||
|
||||
/// How long the entry should last
|
||||
lifetime: Duration,
|
||||
}
|
||||
|
||||
impl Entry {
|
||||
fn new(value: Bytes, lifetime: Duration, update_used: bool) -> Self {
|
||||
let now = AtomicSystemTime::now();
|
||||
|
||||
Self {
|
||||
value,
|
||||
last_used: now,
|
||||
update_used,
|
||||
lifetime,
|
||||
}
|
||||
}
|
||||
|
||||
fn last_used(&self) -> SystemTime {
|
||||
self.last_used.load(Ordering::Relaxed)
|
||||
}
|
||||
|
||||
fn is_expired(&self) -> bool {
|
||||
match self.last_used().elapsed() {
|
||||
Ok(d) => d >= self.lifetime,
|
||||
Err(_) => false, // now > last_used
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A concurrent cache with a maximum memory size (w/ LRU) and expiration.
|
||||
///
|
||||
/// It is designed to keep memory usage low.
|
||||
pub struct Cache {
|
||||
/// Where elements are stored
|
||||
map: DashMap<String, Entry>,
|
||||
|
||||
/// Total length of data stored in cache currently
|
||||
length: AtomicUsize,
|
||||
|
||||
/// How should it behave
|
||||
cfg: config::CacheConfig,
|
||||
}
|
||||
|
||||
impl Cache {
|
||||
pub fn from_config(cfg: config::CacheConfig) -> Self {
|
||||
Self {
|
||||
map: DashMap::with_capacity(256),
|
||||
length: AtomicUsize::new(0),
|
||||
|
||||
cfg,
|
||||
}
|
||||
}
|
||||
|
||||
/// Figure out who should be bumped out of cache next
|
||||
fn next_out(&self, length: usize) -> Vec<String> {
|
||||
let mut sorted: Vec<_> = self.map.iter().collect();
|
||||
|
||||
// Sort by least recently used
|
||||
sorted.par_sort_unstable_by(|e1, e2| e1.last_used().cmp(&e2.last_used()));
|
||||
|
||||
// Total bytes we would be removing
|
||||
let mut total = 0;
|
||||
|
||||
// Pull entries until we have enough free space
|
||||
sorted
|
||||
.iter()
|
||||
.take_while(|e| {
|
||||
let need_more = total < length;
|
||||
|
||||
if need_more {
|
||||
total += e.value.len();
|
||||
}
|
||||
|
||||
need_more
|
||||
})
|
||||
.map(|e| e.key().clone())
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Remove an element from the cache
|
||||
///
|
||||
/// Returns: [`Some`] if successful, [`None`] if element not found
|
||||
pub fn remove(&self, key: &str) -> Option<()> {
|
||||
// Skip expiry checks, we are removing it anyways
|
||||
// And also that could cause an infinite loop which would be pretty stupid.
|
||||
let e = self.map.get(key)?;
|
||||
|
||||
// Atomically subtract from the total cache length
|
||||
self.length.fetch_sub(e.value.len(), Ordering::Relaxed);
|
||||
|
||||
// Drop the entry lock so we can actually remove it
|
||||
drop(e);
|
||||
|
||||
// Remove from map
|
||||
self.map.remove(key);
|
||||
|
||||
Some(())
|
||||
}
|
||||
|
||||
/// Add a new element to the cache with a specified lifetime.
|
||||
///
|
||||
/// Returns: `true` if no value is replaced, `false` if a value was replaced
|
||||
pub fn add_with_lifetime(
|
||||
&self,
|
||||
key: &str,
|
||||
value: Bytes,
|
||||
lifetime: Duration,
|
||||
is_renewable: bool,
|
||||
) -> bool {
|
||||
let e = Entry::new(value, lifetime, is_renewable);
|
||||
|
||||
let len = e.value.len();
|
||||
let cur_total = self.length.load(Ordering::Relaxed);
|
||||
let new_total = cur_total + len;
|
||||
|
||||
if new_total > self.cfg.mem_capacity {
|
||||
// How far we went above the limit
|
||||
let needed = new_total - self.cfg.mem_capacity;
|
||||
|
||||
self.next_out(needed).par_iter().for_each(|k| {
|
||||
// Remove the element, and ignore the result
|
||||
// The only reason it should be failing is if it couldn't find it,
|
||||
// in which case it was already removed
|
||||
self.remove(k);
|
||||
})
|
||||
}
|
||||
|
||||
// Atomically add to total cached data length
|
||||
self.length.fetch_add(len, Ordering::Relaxed);
|
||||
|
||||
// Add to the map, return true if we didn't replace anything
|
||||
self.map.insert(key.to_string(), e).is_none()
|
||||
}
|
||||
|
||||
/// Add a new element to the cache with the default lifetime.
|
||||
///
|
||||
/// Returns: `true` if no value is replaced, `false` if a value was replaced
|
||||
pub fn add(&self, key: &str, value: Bytes) -> bool {
|
||||
self.add_with_lifetime(key, value, self.cfg.upload_lifetime, true)
|
||||
}
|
||||
|
||||
/// Internal function for retrieving entries.
|
||||
///
|
||||
/// Returns: same as [`DashMap::get`], for our purposes
|
||||
///
|
||||
/// It exists so we can run the expiry check before
|
||||
/// actually working with any entries, so no weird bugs happen
|
||||
fn _get(&self, key: &str) -> Option<Ref<String, Entry>> {
|
||||
let e = self.map.get(key)?;
|
||||
|
||||
// if the entry is expired get rid of it now
|
||||
if e.is_expired() {
|
||||
// drop the reference so we don't deadlock
|
||||
drop(e);
|
||||
|
||||
// remove it
|
||||
self.remove(key);
|
||||
|
||||
// and say we never had it
|
||||
return None;
|
||||
}
|
||||
|
||||
Some(e)
|
||||
}
|
||||
|
||||
/// Get an item from the cache, if it exists.
|
||||
pub fn get(&self, key: &str) -> Option<Bytes> {
|
||||
let e = self._get(key)?;
|
||||
|
||||
if e.update_used {
|
||||
e.last_used.store(SystemTime::now(), Ordering::Relaxed);
|
||||
}
|
||||
|
||||
Some(e.value.clone())
|
||||
}
|
||||
|
||||
/// Check if we have an item in cache.
|
||||
///
|
||||
/// Returns: `true` if key exists, `false` if it doesn't
|
||||
///
|
||||
/// We don't use [`DashMap::contains_key`] here because it would just do
|
||||
/// the exact same thing I do here, but without running the expiry check logic
|
||||
pub fn has(&self, key: &str) -> bool {
|
||||
self._get(key).is_some()
|
||||
}
|
||||
|
||||
/// Returns if an upload is able to be cached
|
||||
/// with the current caching rules
|
||||
#[inline(always)]
|
||||
pub fn will_use(&self, length: usize) -> bool {
|
||||
length <= self.cfg.max_length
|
||||
}
|
||||
|
||||
/// The background job that scans through the cache and removes inactive elements.
|
||||
///
|
||||
/// TODO: see if this is actually less expensive than
|
||||
/// letting each entry keep track of expiry with its own task
|
||||
pub async fn scanner(&self) {
|
||||
let mut interval = time::interval(self.cfg.scan_freq);
|
||||
|
||||
loop {
|
||||
// We put this first so that it doesn't scan the instant the server starts
|
||||
interval.tick().await;
|
||||
|
||||
// Save current timestamp so we aren't retrieving it constantly
|
||||
// If we don't do this it'll be a LOT of system api calls
|
||||
let now = SystemTime::now();
|
||||
|
||||
// Collect a list of all the expired keys
|
||||
// If we fail to compare the times, it gets added to the list anyways
|
||||
let expired: Vec<_> = self
|
||||
.map
|
||||
.par_iter()
|
||||
.filter_map(|e| {
|
||||
let elapsed = now.duration_since(e.last_used()).unwrap_or(Duration::MAX);
|
||||
let is_expired = elapsed >= e.lifetime;
|
||||
|
||||
if is_expired {
|
||||
Some(e.key().clone())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
// If we have any, lock the map and drop all of them
|
||||
if !expired.is_empty() {
|
||||
// Use a retain call, should be less locks that way
|
||||
// (instead of many remove calls)
|
||||
self.map.retain(|k, _| !expired.contains(k))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -15,7 +15,6 @@ fn default_motd() -> String {
|
|||
"breeze file server (v%version%) - currently hosting %uplcount% files".to_string()
|
||||
}
|
||||
|
||||
#[serde_as]
|
||||
#[derive(Deserialize)]
|
||||
pub struct EngineConfig {
|
||||
/// The url that the instance of breeze is meant to be accessed from.
|
||||
|
@ -23,28 +22,16 @@ pub struct EngineConfig {
|
|||
/// ex: https://picture.wtf would generate links like https://picture.wtf/p/abcdef.png
|
||||
pub base_url: String,
|
||||
|
||||
/// Location on disk the uploads are to be saved to
|
||||
pub save_path: PathBuf,
|
||||
|
||||
/// Authentication key for new uploads, will be required if this is specified. (optional)
|
||||
#[serde(default)]
|
||||
pub upload_key: String,
|
||||
|
||||
/// Configuration for disk system
|
||||
pub disk: DiskConfig,
|
||||
|
||||
/// Configuration for cache system
|
||||
pub cache: CacheConfig,
|
||||
|
||||
/// Maximum size of an upload that will be accepted.
|
||||
/// Files above this size can not be uploaded.
|
||||
pub max_upload_len: Option<usize>,
|
||||
|
||||
/// Maximum lifetime of a temporary upload
|
||||
#[serde_as(as = "DurationSeconds")]
|
||||
pub max_temp_lifetime: Duration,
|
||||
|
||||
/// Maximum length (in bytes) a file can be before the server will
|
||||
/// decide not to remove its EXIF data.
|
||||
pub max_strip_len: usize,
|
||||
|
||||
/// Motd displayed when the server's index page is visited.
|
||||
///
|
||||
/// This isn't explicitly engine-related but the engine is what gets passed to routes,
|
||||
|
@ -53,14 +40,8 @@ pub struct EngineConfig {
|
|||
pub motd: String,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Clone)]
|
||||
pub struct DiskConfig {
|
||||
/// Location on disk the uploads are to be saved to
|
||||
pub save_path: PathBuf,
|
||||
}
|
||||
|
||||
#[serde_as]
|
||||
#[derive(Deserialize, Clone)]
|
||||
#[derive(Deserialize)]
|
||||
pub struct CacheConfig {
|
||||
/// The maximum length in bytes that a file can be
|
||||
/// before it skips cache (in seconds)
|
||||
|
@ -81,7 +62,6 @@ pub struct CacheConfig {
|
|||
|
||||
#[derive(Deserialize)]
|
||||
pub struct HttpConfig {
|
||||
/// The IP address the HTTP server should listen on
|
||||
pub listen_on: String,
|
||||
}
|
||||
|
||||
|
@ -95,7 +75,7 @@ pub struct LoggerConfig {
|
|||
/// Minimum level a log must be for it to be shown.
|
||||
/// This defaults to "warn" if not specified.
|
||||
#[serde_as(as = "DisplayFromStr")]
|
||||
// yes... kind of a hack but serde doesn't have anything better
|
||||
#[serde(default = "default_level_filter")]
|
||||
// yes... kind of a hack but serde doesn't have anything better
|
||||
pub level: LevelFilter,
|
||||
}
|
||||
|
|
84
src/disk.rs
84
src/disk.rs
|
@ -1,84 +0,0 @@
|
|||
use std::path::PathBuf;
|
||||
|
||||
use bytes::Bytes;
|
||||
use tokio::{
|
||||
fs::File,
|
||||
io::{self, AsyncWriteExt},
|
||||
sync::mpsc::{self, Receiver, Sender},
|
||||
};
|
||||
use tracing::debug;
|
||||
use walkdir::WalkDir;
|
||||
|
||||
use crate::config;
|
||||
|
||||
/// Provides an API to access the disk file store
|
||||
/// like we access the cache.
|
||||
pub struct Disk {
|
||||
cfg: config::DiskConfig,
|
||||
}
|
||||
|
||||
impl Disk {
|
||||
pub fn from_config(cfg: config::DiskConfig) -> Self {
|
||||
Self { cfg }
|
||||
}
|
||||
|
||||
/// Counts the number of files saved to disk we have
|
||||
pub fn count(&self) -> usize {
|
||||
WalkDir::new(&self.cfg.save_path)
|
||||
.min_depth(1)
|
||||
.into_iter()
|
||||
.count()
|
||||
}
|
||||
|
||||
/// Formats the path on disk for a `saved_name`.
|
||||
fn path_for(&self, saved_name: &str) -> PathBuf {
|
||||
let mut p = self.cfg.save_path.clone();
|
||||
p.push(saved_name);
|
||||
|
||||
p
|
||||
}
|
||||
|
||||
/// Try to open a file on disk, and if we didn't find it,
|
||||
/// then return [`None`].
|
||||
pub async fn open(&self, saved_name: &str) -> Result<Option<File>, io::Error> {
|
||||
let p = self.path_for(saved_name);
|
||||
|
||||
match File::open(p).await {
|
||||
Ok(f) => Ok(Some(f)),
|
||||
Err(e) => match e.kind() {
|
||||
io::ErrorKind::NotFound => Ok(None),
|
||||
_ => Err(e)?, // some other error, send it back
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/// Get the size of an upload's file
|
||||
pub async fn len(&self, f: &File) -> Result<usize, io::Error> {
|
||||
Ok(f.metadata().await?.len() as usize)
|
||||
}
|
||||
|
||||
/// Create a background I/O task
|
||||
pub async fn start_save(&self, saved_name: &str) -> Sender<Bytes> {
|
||||
// start a task that handles saving files to disk (we can save to cache/disk in parallel that way)
|
||||
let (tx, mut rx): (Sender<Bytes>, Receiver<Bytes>) = mpsc::channel(256);
|
||||
|
||||
let p = self.path_for(saved_name);
|
||||
|
||||
tokio::spawn(async move {
|
||||
// create file to save upload to
|
||||
let mut file = File::create(p)
|
||||
.await
|
||||
.expect("could not open file! make sure your upload path is valid");
|
||||
|
||||
// receive chunks and save them to file
|
||||
while let Some(chunk) = rx.recv().await {
|
||||
debug!("writing chunk to disk (length: {})", chunk.len());
|
||||
file.write_all(&chunk)
|
||||
.await
|
||||
.expect("error while writing file to disk");
|
||||
}
|
||||
});
|
||||
|
||||
tx
|
||||
}
|
||||
}
|
444
src/engine.rs
444
src/engine.rs
|
@ -1,228 +1,176 @@
|
|||
use std::{
|
||||
sync::{
|
||||
atomic::{AtomicUsize, Ordering},
|
||||
Arc,
|
||||
},
|
||||
time::Duration,
|
||||
ffi::OsStr,
|
||||
path::{Path, PathBuf},
|
||||
sync::atomic::{AtomicUsize, Ordering},
|
||||
};
|
||||
|
||||
use axum::body::BodyDataStream;
|
||||
use archived::Archive;
|
||||
use axum::extract::BodyStream;
|
||||
use bytes::{BufMut, Bytes, BytesMut};
|
||||
use img_parts::{DynImage, ImageEXIF};
|
||||
use rand::distributions::{Alphanumeric, DistString};
|
||||
use tokio::{fs::File, io::AsyncReadExt};
|
||||
use rand::Rng;
|
||||
use tokio::{
|
||||
fs::File,
|
||||
io::{AsyncReadExt, AsyncWriteExt},
|
||||
sync::{
|
||||
mpsc::{self, Receiver, Sender},
|
||||
RwLock,
|
||||
},
|
||||
};
|
||||
use tokio_stream::StreamExt;
|
||||
use tracing::{debug, info};
|
||||
use tracing::{debug, error, info};
|
||||
use walkdir::WalkDir;
|
||||
|
||||
use crate::{cache, config, disk};
|
||||
|
||||
/// Various forms of upload data that can be sent to the client
|
||||
pub enum UploadData {
|
||||
/// Send back the data from memory
|
||||
Cache(Bytes),
|
||||
|
||||
/// Stream the file from disk to the client
|
||||
Disk(File, usize),
|
||||
}
|
||||
|
||||
/// Rejection outcomes of an [`Engine::process`] call
|
||||
pub enum ProcessOutcome {
|
||||
/// The upload was successful.
|
||||
/// We give the user their file's URL
|
||||
Success(String),
|
||||
|
||||
/// Occurs when an upload exceeds the chosen maximum file size.
|
||||
UploadTooLarge,
|
||||
|
||||
/// Occurs when a temporary upload is too big to fit in the cache.
|
||||
TemporaryUploadTooLarge,
|
||||
|
||||
/// Occurs when the user-given lifetime is longer than we will allow
|
||||
TemporaryUploadLifetimeTooLong,
|
||||
}
|
||||
use crate::{
|
||||
config,
|
||||
view::{ViewError, ViewSuccess},
|
||||
};
|
||||
|
||||
/// breeze engine! this is the core of everything
|
||||
pub struct Engine {
|
||||
/// The in-memory cache that cached uploads are stored in.
|
||||
cache: RwLock<Archive>,
|
||||
|
||||
/// Cached count of uploaded files.
|
||||
pub upl_count: AtomicUsize,
|
||||
|
||||
/// Engine configuration
|
||||
pub cfg: config::EngineConfig,
|
||||
|
||||
/// The in-memory cache that cached uploads are stored in.
|
||||
cache: Arc<cache::Cache>,
|
||||
|
||||
/// An interface to the on-disk upload store
|
||||
disk: disk::Disk,
|
||||
}
|
||||
|
||||
impl Engine {
|
||||
/// Creates a new instance of the breeze engine.
|
||||
pub fn from_config(cfg: config::EngineConfig) -> Self {
|
||||
let cache = cache::Cache::from_config(cfg.cache.clone());
|
||||
let disk = disk::Disk::from_config(cfg.disk.clone());
|
||||
|
||||
let cache = Arc::new(cache);
|
||||
|
||||
let cache_scanner = cache.clone();
|
||||
tokio::spawn(async move { cache_scanner.scanner().await });
|
||||
|
||||
pub fn new(cfg: config::EngineConfig) -> Self {
|
||||
Self {
|
||||
// initialise our cached upload count. this doesn't include temp uploads!
|
||||
upl_count: AtomicUsize::new(disk.count()),
|
||||
cache: RwLock::new(Archive::with_full_scan(
|
||||
cfg.cache.scan_freq,
|
||||
cfg.cache.upload_lifetime,
|
||||
cfg.cache.mem_capacity,
|
||||
)),
|
||||
upl_count: AtomicUsize::new(
|
||||
WalkDir::new(&cfg.save_path)
|
||||
.min_depth(1)
|
||||
.into_iter()
|
||||
.count(),
|
||||
), // count the amount of files in the save path and initialise our cached count with it
|
||||
|
||||
cfg,
|
||||
|
||||
cache,
|
||||
disk,
|
||||
}
|
||||
}
|
||||
|
||||
/// Fetch an upload
|
||||
///
|
||||
/// This will first try to read from cache, and then disk after.
|
||||
/// If an upload is eligible to be cached, it will be cached and
|
||||
/// sent back as a cache response instead of a disk response.
|
||||
pub async fn get(&self, saved_name: &str) -> anyhow::Result<Option<UploadData>> {
|
||||
// check the cache first
|
||||
if let Some(u) = self.cache.get(saved_name) {
|
||||
return Ok(Some(UploadData::Cache(u)));
|
||||
}
|
||||
|
||||
// now, check if we have it on disk
|
||||
let mut f = if let Some(f) = self.disk.open(saved_name).await? {
|
||||
f
|
||||
} else {
|
||||
// file didn't exist
|
||||
return Ok(None);
|
||||
};
|
||||
|
||||
let len = self.disk.len(&f).await?;
|
||||
|
||||
// can this be recached?
|
||||
if self.cache.will_use(len) {
|
||||
// read file from disk
|
||||
let mut full = BytesMut::with_capacity(len);
|
||||
|
||||
// read file from disk and if it fails at any point, return 500
|
||||
loop {
|
||||
match f.read_buf(&mut full).await {
|
||||
Ok(n) => {
|
||||
if n == 0 {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(e) => Err(e)?,
|
||||
}
|
||||
}
|
||||
|
||||
let full = full.freeze();
|
||||
|
||||
// re-insert it into cache
|
||||
self.cache.add(saved_name, full.clone());
|
||||
|
||||
return Ok(Some(UploadData::Cache(full)));
|
||||
}
|
||||
|
||||
Ok(Some(UploadData::Disk(f, len)))
|
||||
/// Returns if an upload would be able to be cached
|
||||
#[inline(always)]
|
||||
fn will_use_cache(&self, length: usize) -> bool {
|
||||
length <= self.cfg.cache.max_length
|
||||
}
|
||||
|
||||
pub async fn has(&self, saved_name: &str) -> bool {
|
||||
if self.cache.has(saved_name) {
|
||||
/// Check if an upload exists in cache or on disk
|
||||
pub async fn upload_exists(&self, path: &Path) -> bool {
|
||||
let cache = self.cache.read().await;
|
||||
|
||||
// extract file name, since that's what cache uses
|
||||
let name = path
|
||||
.file_name()
|
||||
.and_then(OsStr::to_str)
|
||||
.unwrap_or_default()
|
||||
.to_string();
|
||||
|
||||
// check in cache
|
||||
if cache.contains_key(&name) {
|
||||
return true;
|
||||
}
|
||||
|
||||
// sidestep handling the error properly
|
||||
// that way we can call this in gen_saved_name easier
|
||||
if self.disk.open(saved_name).await.is_ok_and(|f| f.is_some()) {
|
||||
// check on disk
|
||||
if path.exists() {
|
||||
return true;
|
||||
}
|
||||
|
||||
false
|
||||
return false;
|
||||
}
|
||||
|
||||
/// Generate a new saved name for an upload.
|
||||
/// Generate a new save path for an upload.
|
||||
///
|
||||
/// This will call itself recursively if it picks
|
||||
/// a name that's already used. (it is rare)
|
||||
#[async_recursion::async_recursion]
|
||||
pub async fn gen_saved_name(&self, ext: &str) -> String {
|
||||
pub async fn gen_path(&self, original_path: &PathBuf) -> PathBuf {
|
||||
// generate a 6-character alphanumeric string
|
||||
let mut saved_name: String = Alphanumeric.sample_string(&mut rand::thread_rng(), 6);
|
||||
let id: String = rand::thread_rng()
|
||||
.sample_iter(&rand::distributions::Alphanumeric)
|
||||
.take(6)
|
||||
.map(char::from)
|
||||
.collect();
|
||||
|
||||
// if we have an extension, add it now
|
||||
if !ext.is_empty() {
|
||||
saved_name.push('.');
|
||||
saved_name.push_str(ext);
|
||||
}
|
||||
// extract the extension from the original path
|
||||
let original_extension = original_path
|
||||
.extension()
|
||||
.and_then(OsStr::to_str)
|
||||
.unwrap_or_default()
|
||||
.to_string();
|
||||
|
||||
if !self.has(&saved_name).await {
|
||||
saved_name
|
||||
// path on disk
|
||||
let mut path = self.cfg.save_path.clone();
|
||||
path.push(&id);
|
||||
path.set_extension(original_extension);
|
||||
|
||||
if !self.upload_exists(&path).await {
|
||||
path
|
||||
} else {
|
||||
// we had a name collision! try again..
|
||||
info!("name collision! saved_name= {}", saved_name);
|
||||
self.gen_saved_name(ext).await
|
||||
self.gen_path(original_path).await
|
||||
}
|
||||
}
|
||||
|
||||
/// Save a file to disk, and optionally cache.
|
||||
///
|
||||
/// This also handles custom file lifetimes and EXIF data removal.
|
||||
pub async fn save(
|
||||
/// Process an upload.
|
||||
/// This is called by the /new route.
|
||||
pub async fn process_upload(
|
||||
&self,
|
||||
saved_name: &str,
|
||||
provided_len: usize,
|
||||
mut use_cache: bool,
|
||||
mut stream: BodyDataStream,
|
||||
lifetime: Option<Duration>,
|
||||
keep_exif: bool,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
path: PathBuf,
|
||||
name: String, // we already extract it in the route handler, and it'd be wasteful to do it in gen_path
|
||||
content_length: usize,
|
||||
mut stream: BodyStream,
|
||||
) {
|
||||
// if the upload size is smaller than the specified maximum, we use the cache!
|
||||
let mut use_cache = self.will_use_cache(content_length);
|
||||
|
||||
// if we're using cache, make some space to store the upload in
|
||||
let mut data = if use_cache {
|
||||
BytesMut::with_capacity(provided_len)
|
||||
BytesMut::with_capacity(content_length)
|
||||
} else {
|
||||
BytesMut::new()
|
||||
};
|
||||
|
||||
// don't begin a disk save if we're using temporary lifetimes
|
||||
let tx = if lifetime.is_none() {
|
||||
Some(self.disk.start_save(saved_name).await)
|
||||
} else {
|
||||
None
|
||||
};
|
||||
// start a task that handles saving files to disk (we can save to cache/disk in parallel that way)
|
||||
let (tx, mut rx): (Sender<Bytes>, Receiver<Bytes>) = mpsc::channel(1);
|
||||
|
||||
// whether or not we're gonna coalesce the data
|
||||
// in order to strip the exif data at the end,
|
||||
// instead of just sending it off to the i/o task
|
||||
let coalesce_and_strip = use_cache
|
||||
&& matches!(
|
||||
std::path::Path::new(saved_name)
|
||||
.extension()
|
||||
.map(|s| s.to_str()),
|
||||
Some(Some("png" | "jpg" | "jpeg" | "webp" | "tiff"))
|
||||
)
|
||||
&& !keep_exif
|
||||
&& provided_len <= self.cfg.max_strip_len;
|
||||
tokio::spawn(async move {
|
||||
// create file to save upload to
|
||||
let mut file = File::create(path)
|
||||
.await
|
||||
.expect("could not open file! make sure your upload path is valid");
|
||||
|
||||
// receive chunks and save them to file
|
||||
while let Some(chunk) = rx.recv().await {
|
||||
debug!("writing chunk to disk (length: {})", chunk.len());
|
||||
file.write_all(&chunk)
|
||||
.await
|
||||
.expect("error while writing file to disk");
|
||||
}
|
||||
});
|
||||
|
||||
// read and save upload
|
||||
while let Some(chunk) = stream.next().await {
|
||||
// if we error on a chunk, fail out
|
||||
let chunk = chunk?;
|
||||
let chunk = chunk.unwrap();
|
||||
|
||||
// if we have an i/o task, send it off
|
||||
// also cloning this is okay because it's a Bytes
|
||||
if !coalesce_and_strip {
|
||||
if let Some(ref tx) = tx {
|
||||
debug!("sending chunk to i/o task");
|
||||
tx.send(chunk.clone()).await?;
|
||||
}
|
||||
}
|
||||
// send chunk to io task
|
||||
debug!("sending data to io task");
|
||||
tx.send(chunk.clone())
|
||||
.await
|
||||
.expect("failed to send data to io task");
|
||||
|
||||
if use_cache {
|
||||
debug!("receiving data into buffer");
|
||||
|
||||
if data.len() + chunk.len() > data.capacity() {
|
||||
info!("the amount of data sent exceeds the content-length provided by the client! caching will be cancelled for this upload.");
|
||||
error!("the amount of data sent exceeds the content-length provided by the client! caching will be cancelled for this upload.");
|
||||
|
||||
// if we receive too much data, drop the buffer and stop using cache (it is still okay to use disk, probably)
|
||||
data = BytesMut::new();
|
||||
|
@ -233,99 +181,109 @@ impl Engine {
|
|||
}
|
||||
}
|
||||
|
||||
let data = data.freeze();
|
||||
|
||||
// we coalesced the data instead of streaming to disk,
|
||||
// strip the exif data and send it off now
|
||||
let data = if coalesce_and_strip {
|
||||
// strip the exif if we can
|
||||
// if we can't, then oh well
|
||||
let data = if let Ok(Some(data)) = DynImage::from_bytes(data.clone()).map(|o| {
|
||||
o.map(|mut img| {
|
||||
img.set_exif(None);
|
||||
img.encoder().bytes()
|
||||
})
|
||||
}) {
|
||||
info!("stripped exif data");
|
||||
data
|
||||
} else {
|
||||
info!("failed to strip exif data");
|
||||
data
|
||||
};
|
||||
|
||||
// send what we did over to the i/o task, all in one chunk
|
||||
if let Some(ref tx) = tx {
|
||||
debug!("sending filled buffer to i/o task");
|
||||
tx.send(data.clone()).await?;
|
||||
}
|
||||
|
||||
data
|
||||
} else {
|
||||
// or, we didn't do that
|
||||
// keep the data as it is
|
||||
data
|
||||
};
|
||||
|
||||
// insert upload into cache if we're using it
|
||||
// insert upload into cache if necessary
|
||||
if use_cache {
|
||||
let mut cache = self.cache.write().await;
|
||||
|
||||
info!("caching upload!");
|
||||
match lifetime {
|
||||
Some(lt) => self.cache.add_with_lifetime(saved_name, data, lt, false),
|
||||
None => self.cache.add(saved_name, data),
|
||||
};
|
||||
cache.insert(name, data.freeze());
|
||||
}
|
||||
|
||||
info!("finished processing upload!!");
|
||||
|
||||
// if all goes well, increment the cached upload counter
|
||||
self.upl_count.fetch_add(1, Ordering::Relaxed);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn process(
|
||||
&self,
|
||||
ext: &str,
|
||||
provided_len: usize,
|
||||
stream: BodyDataStream,
|
||||
lifetime: Option<Duration>,
|
||||
keep_exif: bool,
|
||||
) -> Result<ProcessOutcome, anyhow::Error> {
|
||||
// if the upload size is greater than our max file size, deny it now
|
||||
if self.cfg.max_upload_len.is_some_and(|l| provided_len > l) {
|
||||
return Ok(ProcessOutcome::UploadTooLarge);
|
||||
/// Read an upload from cache, if it exists.
|
||||
///
|
||||
/// Previously, this would lock the cache as
|
||||
/// writable to renew the upload's cache lifespan.
|
||||
/// Locking the cache as readable allows multiple concurrent
|
||||
/// readers though, which allows me to handle multiple views concurrently.
|
||||
async fn read_cached_upload(&self, name: &String) -> Option<Bytes> {
|
||||
let cache = self.cache.read().await;
|
||||
|
||||
// fetch upload data from cache
|
||||
cache.get(name).map(ToOwned::to_owned)
|
||||
}
|
||||
|
||||
/// Reads an upload, from cache or on disk.
|
||||
pub async fn get_upload(&self, original_path: &Path) -> Result<ViewSuccess, ViewError> {
|
||||
// extract upload file name
|
||||
let name = original_path
|
||||
.file_name()
|
||||
.and_then(OsStr::to_str)
|
||||
.unwrap_or_default()
|
||||
.to_string();
|
||||
|
||||
// path on disk
|
||||
let mut path = self.cfg.save_path.clone();
|
||||
path.push(&name);
|
||||
|
||||
// check if the upload exists, if not then 404
|
||||
if !self.upload_exists(&path).await {
|
||||
return Err(ViewError::NotFound);
|
||||
}
|
||||
|
||||
// if the upload size is smaller than the specified maximum, we use the cache!
|
||||
let use_cache: bool = self.cache.will_use(provided_len);
|
||||
// attempt to read upload from cache
|
||||
let cached_data = self.read_cached_upload(&name).await;
|
||||
|
||||
// if a temp file is too big for cache, reject it now
|
||||
if lifetime.is_some() && !use_cache {
|
||||
return Ok(ProcessOutcome::TemporaryUploadTooLarge);
|
||||
if let Some(data) = cached_data {
|
||||
info!("got upload from cache!");
|
||||
|
||||
Ok(ViewSuccess::FromCache(data))
|
||||
} else {
|
||||
// we already know the upload exists by now so this is okay
|
||||
let mut file = File::open(&path).await.unwrap();
|
||||
|
||||
// read upload length from disk
|
||||
let metadata = file.metadata().await;
|
||||
|
||||
if metadata.is_err() {
|
||||
error!("failed to get upload file metadata!");
|
||||
return Err(ViewError::InternalServerError);
|
||||
}
|
||||
|
||||
let metadata = metadata.unwrap();
|
||||
|
||||
let length = metadata.len() as usize;
|
||||
|
||||
debug!("read upload from disk, size = {}", length);
|
||||
|
||||
// if the upload is okay to cache, recache it and send a fromcache response
|
||||
if self.will_use_cache(length) {
|
||||
// read file from disk
|
||||
let mut data = BytesMut::with_capacity(length);
|
||||
|
||||
// read file from disk and if it fails at any point, return 500
|
||||
loop {
|
||||
match file.read_buf(&mut data).await {
|
||||
Ok(n) => {
|
||||
if n == 0 {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(_) => {
|
||||
return Err(ViewError::InternalServerError);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let data = data.freeze();
|
||||
|
||||
// re-insert it into cache
|
||||
let mut cache = self.cache.write().await;
|
||||
cache.insert(name, data.clone());
|
||||
|
||||
info!("recached upload from disk!");
|
||||
|
||||
return Ok(ViewSuccess::FromCache(data));
|
||||
}
|
||||
|
||||
info!("got upload from disk!");
|
||||
|
||||
Ok(ViewSuccess::FromDisk(file))
|
||||
}
|
||||
|
||||
// if a temp file's lifetime is too long, reject it now
|
||||
if lifetime.is_some_and(|lt| lt > self.cfg.max_temp_lifetime) {
|
||||
return Ok(ProcessOutcome::TemporaryUploadLifetimeTooLong);
|
||||
}
|
||||
|
||||
// generate the file name
|
||||
let saved_name = self.gen_saved_name(ext).await;
|
||||
|
||||
// save it
|
||||
self.save(
|
||||
&saved_name,
|
||||
provided_len,
|
||||
use_cache,
|
||||
stream,
|
||||
lifetime,
|
||||
keep_exif,
|
||||
)
|
||||
.await?;
|
||||
|
||||
// format and send back the url
|
||||
let url = format!("{}/p/{}", self.cfg.base_url, saved_name);
|
||||
|
||||
Ok(ProcessOutcome::Success(url))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -8,17 +8,14 @@ pub async fn index(State(engine): State<Arc<crate::engine::Engine>>) -> String {
|
|||
|
||||
let motd = engine.cfg.motd.clone();
|
||||
|
||||
motd.replace("%version%", env!("CARGO_PKG_VERSION"))
|
||||
motd
|
||||
.replace("%version%", env!("CARGO_PKG_VERSION"))
|
||||
.replace("%uplcount%", &count.to_string())
|
||||
}
|
||||
|
||||
pub async fn robots_txt() -> &'static str {
|
||||
/// robots.txt that tells web crawlers not to list uploads
|
||||
const ROBOTS_TXT: &str = concat!(
|
||||
"User-Agent: *\n",
|
||||
"Disallow: /p/*\n",
|
||||
"Allow: /\n"
|
||||
);
|
||||
const ROBOTS_TXT: &str = concat!("User-Agent: *\n", "Disallow: /p/*\n", "Allow: /\n");
|
||||
|
||||
ROBOTS_TXT
|
||||
}
|
||||
|
|
57
src/main.rs
57
src/main.rs
|
@ -1,56 +1,48 @@
|
|||
use std::{path::PathBuf, sync::Arc};
|
||||
|
||||
use argh::FromArgs;
|
||||
extern crate axum;
|
||||
|
||||
use clap::Parser;
|
||||
use engine::Engine;
|
||||
|
||||
use axum::{
|
||||
routing::{get, post},
|
||||
Router,
|
||||
};
|
||||
use tokio::{fs, net::TcpListener, signal};
|
||||
use tokio::{fs, signal};
|
||||
use tracing::{info, warn};
|
||||
|
||||
mod cache;
|
||||
mod config;
|
||||
mod disk;
|
||||
mod engine;
|
||||
mod index;
|
||||
mod new;
|
||||
mod view;
|
||||
|
||||
/// breeze file server.
|
||||
#[derive(FromArgs, Debug)]
|
||||
#[derive(Parser, Debug)]
|
||||
struct Args {
|
||||
/// the path to *.toml configuration file
|
||||
#[argh(option, short = 'c', arg_name = "file")]
|
||||
/// The path to configuration file
|
||||
#[arg(short, long, value_name = "file")]
|
||||
config: PathBuf,
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
// read & parse args
|
||||
let args: Args = argh::from_env();
|
||||
let args = Args::parse();
|
||||
|
||||
// read & parse config
|
||||
let cfg: config::Config = {
|
||||
let config_str = fs::read_to_string(args.config).await.expect(
|
||||
"failed to read config file! make sure it exists and you have read permissions",
|
||||
);
|
||||
let config_str = fs::read_to_string(args.config)
|
||||
.await
|
||||
.expect("failed to read config file! make sure it exists and you have read permissions");
|
||||
|
||||
toml::from_str(&config_str).unwrap_or_else(|e| {
|
||||
panic!("invalid config! ensure proper fields and structure. reference config is in readme.\n{e}");
|
||||
})
|
||||
};
|
||||
let cfg: config::Config = toml::from_str(&config_str).expect("invalid config! check that you have included all required options and structured it properly (no config options expecting a number getting a string, etc.)");
|
||||
|
||||
tracing_subscriber::fmt()
|
||||
.with_max_level(cfg.logger.level)
|
||||
.init();
|
||||
|
||||
{
|
||||
let save_path = cfg.engine.disk.save_path.clone();
|
||||
if !save_path.exists() || !save_path.is_dir() {
|
||||
panic!("the save path does not exist or is not a directory! this is invalid");
|
||||
}
|
||||
if !cfg.engine.save_path.exists() || !cfg.engine.save_path.is_dir() {
|
||||
panic!("the save path does not exist or is not a directory! this is invalid");
|
||||
}
|
||||
|
||||
if cfg.engine.upload_key.is_empty() {
|
||||
|
@ -58,7 +50,7 @@ async fn main() {
|
|||
}
|
||||
|
||||
// create engine
|
||||
let engine = Engine::from_config(cfg.engine);
|
||||
let engine = Engine::new(cfg.engine);
|
||||
|
||||
// build main router
|
||||
let app = Router::new()
|
||||
|
@ -69,15 +61,16 @@ async fn main() {
|
|||
.with_state(Arc::new(engine));
|
||||
|
||||
// start web server
|
||||
let listener = TcpListener::bind(&cfg.http.listen_on)
|
||||
.await
|
||||
.expect("failed to bind to given `http.listen_on` address! make sure it's valid, and the port isn't already bound");
|
||||
|
||||
info!("starting server.");
|
||||
axum::serve(listener, app)
|
||||
.with_graceful_shutdown(shutdown_signal())
|
||||
.await
|
||||
.expect("failed to start server");
|
||||
axum::Server::bind(
|
||||
&cfg.http
|
||||
.listen_on
|
||||
.parse()
|
||||
.expect("failed to parse listen_on address"),
|
||||
)
|
||||
.serve(app.into_make_service())
|
||||
.with_graceful_shutdown(shutdown_signal())
|
||||
.await
|
||||
.expect("failed to start server");
|
||||
}
|
||||
|
||||
async fn shutdown_signal() {
|
||||
|
|
85
src/new.rs
85
src/new.rs
|
@ -1,58 +1,47 @@
|
|||
use std::{ffi::OsStr, path::PathBuf, sync::Arc, time::Duration};
|
||||
use std::{collections::HashMap, ffi::OsStr, path::PathBuf, sync::Arc};
|
||||
|
||||
use axum::{
|
||||
body::Body,
|
||||
extract::{Query, State},
|
||||
extract::{BodyStream, Query, State},
|
||||
http::HeaderValue,
|
||||
};
|
||||
use http::{header, HeaderMap, HeaderValue, StatusCode};
|
||||
use serde::Deserialize;
|
||||
use serde_with::{serde_as, DurationSeconds};
|
||||
|
||||
use crate::engine::ProcessOutcome;
|
||||
|
||||
fn default_keep_exif() -> bool {
|
||||
false
|
||||
}
|
||||
|
||||
#[serde_as]
|
||||
#[derive(Deserialize)]
|
||||
pub struct NewRequest {
|
||||
name: String,
|
||||
key: Option<String>,
|
||||
|
||||
#[serde(rename = "lastfor")]
|
||||
#[serde_as(as = "Option<DurationSeconds>")]
|
||||
last_for: Option<Duration>,
|
||||
|
||||
#[serde(rename = "keepexif", default = "default_keep_exif")]
|
||||
keep_exif: bool,
|
||||
}
|
||||
use hyper::{header, HeaderMap, StatusCode};
|
||||
|
||||
/// The request handler for the /new path.
|
||||
/// This handles all new uploads.
|
||||
#[axum::debug_handler]
|
||||
pub async fn new(
|
||||
State(engine): State<Arc<crate::engine::Engine>>,
|
||||
Query(req): Query<NewRequest>,
|
||||
Query(params): Query<HashMap<String, String>>,
|
||||
headers: HeaderMap,
|
||||
body: Body,
|
||||
stream: BodyStream,
|
||||
) -> Result<String, StatusCode> {
|
||||
let key = params.get("key");
|
||||
|
||||
const EMPTY_STRING: &String = &String::new();
|
||||
|
||||
// check upload key, if i need to
|
||||
if !engine.cfg.upload_key.is_empty() && req.key.unwrap_or_default() != engine.cfg.upload_key {
|
||||
if !engine.cfg.upload_key.is_empty() && key.unwrap_or(EMPTY_STRING) != &engine.cfg.upload_key {
|
||||
return Err(StatusCode::FORBIDDEN);
|
||||
}
|
||||
|
||||
let original_name = params.get("name");
|
||||
|
||||
// the original file name wasn't given, so i can't work out what the extension should be
|
||||
if req.name.is_empty() {
|
||||
if original_name.is_none() {
|
||||
return Err(StatusCode::BAD_REQUEST);
|
||||
}
|
||||
|
||||
let extension = PathBuf::from(req.name)
|
||||
.extension()
|
||||
let original_path = PathBuf::from(original_name.unwrap());
|
||||
|
||||
let path = engine.gen_path(&original_path).await;
|
||||
let name = path
|
||||
.file_name()
|
||||
.and_then(OsStr::to_str)
|
||||
.unwrap_or_default()
|
||||
.to_string();
|
||||
|
||||
let url = format!("{}/p/{}", engine.cfg.base_url, name);
|
||||
|
||||
// read and parse content-length, and if it fails just assume it's really high so it doesn't cache
|
||||
let content_length = headers
|
||||
.get(header::CONTENT_LENGTH)
|
||||
|
@ -62,34 +51,10 @@ pub async fn new(
|
|||
.unwrap()
|
||||
.unwrap_or(usize::MAX);
|
||||
|
||||
// turn body into stream
|
||||
let stream = Body::into_data_stream(body);
|
||||
|
||||
// pass it off to the engine to be processed!
|
||||
match engine
|
||||
.process(
|
||||
&extension,
|
||||
content_length,
|
||||
stream,
|
||||
req.last_for,
|
||||
req.keep_exif,
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(outcome) => match outcome {
|
||||
// 200 OK
|
||||
ProcessOutcome::Success(url) => Ok(url),
|
||||
engine
|
||||
.process_upload(path, name, content_length, stream)
|
||||
.await;
|
||||
|
||||
// 413 Payload Too Large
|
||||
ProcessOutcome::UploadTooLarge | ProcessOutcome::TemporaryUploadTooLarge => {
|
||||
Err(StatusCode::PAYLOAD_TOO_LARGE)
|
||||
}
|
||||
|
||||
// 400 Bad Request
|
||||
ProcessOutcome::TemporaryUploadLifetimeTooLong => Err(StatusCode::BAD_REQUEST),
|
||||
},
|
||||
|
||||
// 500 Internal Server Error
|
||||
Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR),
|
||||
}
|
||||
Ok(url)
|
||||
}
|
||||
|
|
90
src/view.rs
90
src/view.rs
|
@ -1,15 +1,40 @@
|
|||
use std::{ffi::OsStr, path::PathBuf, sync::Arc};
|
||||
use std::{
|
||||
path::{Component, PathBuf},
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use axum::{
|
||||
body::Body,
|
||||
body::StreamBody,
|
||||
extract::{Path, State},
|
||||
response::{IntoResponse, Response},
|
||||
};
|
||||
|
||||
use http::{HeaderValue, StatusCode};
|
||||
use bytes::Bytes;
|
||||
use hyper::{http::HeaderValue, StatusCode};
|
||||
use tokio::{fs::File, runtime::Handle};
|
||||
use tokio_util::io::ReaderStream;
|
||||
use tracing::{error, debug, info};
|
||||
|
||||
use crate::engine::UploadData;
|
||||
/// Responses for a successful view operation
|
||||
pub enum ViewSuccess {
|
||||
/// A file read from disk, suitable for larger files.
|
||||
///
|
||||
/// The file provided will be streamed from disk and
|
||||
/// back to the viewer.
|
||||
///
|
||||
/// This is only ever used if a file exceeds the
|
||||
/// cache's maximum file size.
|
||||
FromDisk(File),
|
||||
|
||||
/// A file read from in-memory cache, best for smaller files.
|
||||
///
|
||||
/// The file is taken from the cache in its entirety
|
||||
/// and sent back to the viewer.
|
||||
///
|
||||
/// If a file can be fit into cache, this will be
|
||||
/// used even if it's read from disk.
|
||||
FromCache(Bytes),
|
||||
}
|
||||
|
||||
/// Responses for a failed view operation
|
||||
pub enum ViewError {
|
||||
|
@ -20,20 +45,42 @@ pub enum ViewError {
|
|||
InternalServerError,
|
||||
}
|
||||
|
||||
impl IntoResponse for UploadData {
|
||||
impl IntoResponse for ViewSuccess {
|
||||
fn into_response(self) -> Response {
|
||||
match self {
|
||||
UploadData::Disk(file, len) => {
|
||||
// create our content-length header
|
||||
let len_str = len.to_string();
|
||||
ViewSuccess::FromDisk(file) => {
|
||||
// get handle to current tokio runtime
|
||||
// i use this to block on futures here (not async)
|
||||
let handle = Handle::current();
|
||||
let _ = handle.enter();
|
||||
|
||||
// read the metadata of the file on disk
|
||||
// this function isn't async
|
||||
// .. so we have to use handle.block_on() to get the metadata
|
||||
let metadata = futures::executor::block_on(file.metadata());
|
||||
|
||||
// if we error then return 500
|
||||
if metadata.is_err() {
|
||||
error!("failed to read metadata from disk");
|
||||
return ViewError::InternalServerError.into_response();
|
||||
}
|
||||
|
||||
// unwrap (which we know is safe) and read the file size as a string
|
||||
let metadata = metadata.unwrap();
|
||||
let len_str = metadata.len().to_string();
|
||||
|
||||
debug!("file is {} bytes on disk", &len_str);
|
||||
|
||||
// HeaderValue::from_str will never error if only visible ASCII characters are passed (32-127)
|
||||
// .. so unwrapping this should be fine
|
||||
let content_length = HeaderValue::from_str(&len_str).unwrap();
|
||||
|
||||
// create a streamed body response (we want to stream larger files)
|
||||
let stream = ReaderStream::new(file);
|
||||
let body = Body::from_stream(stream);
|
||||
let reader = ReaderStream::new(file);
|
||||
let stream = StreamBody::new(reader);
|
||||
|
||||
// extract mutable headers from the response
|
||||
let mut res = body.into_response();
|
||||
let mut res = stream.into_response();
|
||||
let headers = res.headers_mut();
|
||||
|
||||
// clear headers, browser can imply content type
|
||||
|
@ -45,7 +92,7 @@ impl IntoResponse for UploadData {
|
|||
|
||||
res
|
||||
}
|
||||
UploadData::Cache(data) => {
|
||||
ViewSuccess::FromCache(data) => {
|
||||
// extract mutable headers from the response
|
||||
let mut res = data.into_response();
|
||||
let headers = res.headers_mut();
|
||||
|
@ -81,17 +128,16 @@ impl IntoResponse for ViewError {
|
|||
pub async fn view(
|
||||
State(engine): State<Arc<crate::engine::Engine>>,
|
||||
Path(original_path): Path<PathBuf>,
|
||||
) -> Result<UploadData, ViewError> {
|
||||
let saved_name = if let Some(Some(n)) = original_path.file_name().map(OsStr::to_str) {
|
||||
n
|
||||
} else {
|
||||
) -> Result<ViewSuccess, ViewError> {
|
||||
// (hopefully) prevent path traversal, just check for any non-file components
|
||||
if original_path
|
||||
.components()
|
||||
.any(|x| !matches!(x, Component::Normal(_)))
|
||||
{
|
||||
info!("a request attempted path traversal");
|
||||
return Err(ViewError::NotFound);
|
||||
};
|
||||
}
|
||||
|
||||
// get result from the engine!
|
||||
match engine.get(saved_name).await {
|
||||
Ok(Some(u)) => Ok(u),
|
||||
Ok(None) => Err(ViewError::NotFound),
|
||||
Err(_) => Err(ViewError::InternalServerError),
|
||||
}
|
||||
engine.get_upload(&original_path).await
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue