Compare commits
No commits in common. "main" and "jemalloc" have entirely different histories.
File diff suppressed because it is too large
Load Diff
58
Cargo.toml
58
Cargo.toml
|
|
@ -1,55 +1,37 @@
|
||||||
[package]
|
[package]
|
||||||
name = "breeze"
|
name = "breeze"
|
||||||
version = "0.3.3"
|
version = "0.2.8"
|
||||||
edition = "2024"
|
edition = "2021"
|
||||||
|
|
||||||
[profile.dev.package]
|
[profile.dev.package]
|
||||||
tikv-jemalloc-sys = { opt-level = 3 }
|
tikv-jemalloc-sys = { opt-level = 3 }
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
argh = "0.1.12"
|
axum-extra = { version = "0.10.0", default-features = false, features = [
|
||||||
atomic-time = "0.1.4"
|
|
||||||
axum = { version = "0.8.9", features = ["macros"] }
|
|
||||||
axum-extra = { version = "0.12.6", default-features = false, features = [
|
|
||||||
"tracing",
|
"tracing",
|
||||||
"typed-header",
|
"typed-header",
|
||||||
] }
|
] }
|
||||||
base64 = "0.22"
|
axum = { version = "0.8.1", features = ["macros", "http2"] }
|
||||||
bytes = "1"
|
tower = "0.5"
|
||||||
color-eyre = "0.6"
|
|
||||||
dashmap = { version = "6.1.0", features = ["inline"] }
|
|
||||||
headers = "0.4"
|
|
||||||
hmac = "0.12.1"
|
|
||||||
http = "1.2"
|
http = "1.2"
|
||||||
img-parts = "0.3"
|
headers = "0.4"
|
||||||
rand = "0.9"
|
tokio = { version = "1", features = ["full"] }
|
||||||
serde = { version = "1.0", features = ["derive"] }
|
tokio-util = { version = "0.7", features = ["full"] }
|
||||||
serde_with = "3.19"
|
|
||||||
sha2 = "0.10.9"
|
|
||||||
tokio = { version = "1", features = [
|
|
||||||
"rt-multi-thread",
|
|
||||||
"macros",
|
|
||||||
"net",
|
|
||||||
"fs",
|
|
||||||
"io-util",
|
|
||||||
"signal",
|
|
||||||
"test-util",
|
|
||||||
] }
|
|
||||||
tokio-stream = "0.1"
|
tokio-stream = "0.1"
|
||||||
tokio-util = { version = "0.7", features = ["io"] }
|
|
||||||
toml = { version = "0.9", default-features = false, features = [
|
|
||||||
"std",
|
|
||||||
"parse",
|
|
||||||
"serde",
|
|
||||||
] }
|
|
||||||
tracing = "0.1"
|
tracing = "0.1"
|
||||||
tracing-subscriber = "0.3"
|
tracing-subscriber = "0.3"
|
||||||
twox-hash = "2"
|
bytes = "1"
|
||||||
|
rand = "0.8.5"
|
||||||
[dev-dependencies]
|
walkdir = "2"
|
||||||
http-body-util = "0.1"
|
anyhow = "1.0"
|
||||||
tower = "0.5"
|
serde = { version = "1.0", features = ["derive"] }
|
||||||
tracing-test = "0.2"
|
serde_with = "3.12"
|
||||||
|
toml = "0.8.2"
|
||||||
|
argh = "0.1.12"
|
||||||
|
dashmap = { version = "6.1.0", features = ["rayon", "inline"] }
|
||||||
|
rayon = "1.8"
|
||||||
|
atomic-time = "0.1.4"
|
||||||
|
img-parts = "0.3"
|
||||||
|
|
||||||
[target.'cfg(not(target_env = "msvc"))'.dependencies]
|
[target.'cfg(not(target_env = "msvc"))'.dependencies]
|
||||||
tikv-jemallocator = "0.6"
|
tikv-jemallocator = "0.6"
|
||||||
|
|
|
||||||
76
README.md
76
README.md
|
|
@ -1,34 +1,27 @@
|
||||||
# breeze
|
# breeze
|
||||||
|
|
||||||
breeze is a simple, performant file upload server.
|
breeze is a simple, performant file upload server.
|
||||||
|
|
||||||
The primary instance is https://picture.wtf.
|
The primary instance is https://picture.wtf.
|
||||||
|
|
||||||
## Features
|
## Features
|
||||||
|
|
||||||
- Basic upload API tailored towards ShareX
|
- Basic upload API tailored towards ShareX
|
||||||
- Streamed uploading
|
- Streamed uploading
|
||||||
- Streamed downloading (on larger files)
|
- Streamed downloading (on larger files)
|
||||||
- Pause/continue download support with `Range` header
|
|
||||||
- Upload caching in memory
|
- Upload caching in memory
|
||||||
- Support for ShareX file deletion URLs
|
|
||||||
- Temporary uploads
|
- Temporary uploads
|
||||||
- Automatic exif data removal
|
- Automatic exif data removal
|
||||||
|
|
||||||
## Installation
|
## Installation
|
||||||
|
On picture.wtf, breeze's primary instance, it is ran using a NixOS module. If you would like to do that too, it is provided by the Nix flake in this repository.
|
||||||
|
|
||||||
On picture.wtf, breeze is ran with the NixOS module provided by `flake.nix`. [Take a look at the config](https://git.min.rip/min/infra/src/branch/main/nixos/hosts/silver/services/breeze.nix) if you want!
|
It is very much possible to run and deploy breeze without doing that, though. Containerised and bare-metal deployments are also supported. Instructions for those are below.
|
||||||
|
|
||||||
Containerised and bare-metal deployments are also supported. Instructions for those are below.
|
|
||||||
|
|
||||||
To begin, clone the Git repository:
|
To begin, clone the Git repository:
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
git clone https://git.min.rip/min/breeze.git
|
git clone https://git.min.rip/min/breeze.git
|
||||||
```
|
```
|
||||||
|
|
||||||
If you want to run it as a Docker container, here is an example `docker-compose.yaml` that may be useful for reference.
|
If you would like to run it as a Docker container, here is an example `docker-compose.yaml` that may be useful for reference.
|
||||||
|
|
||||||
```
|
```
|
||||||
version: '3.6'
|
version: '3.6'
|
||||||
|
|
||||||
|
|
@ -46,51 +39,25 @@ services:
|
||||||
ports:
|
ports:
|
||||||
- 8383:8000
|
- 8383:8000
|
||||||
```
|
```
|
||||||
|
|
||||||
With this configuration, it is expected that:
|
With this configuration, it is expected that:
|
||||||
|
* there is a clone of the Git repository in the `./breeze` folder
|
||||||
- there is a clone of the Git repository in the `./breeze` folder
|
* there is a `breeze.toml` config file in current directory
|
||||||
- there is a `breeze.toml` config file in current directory
|
* there is a directory at `/srv/uploads` for storing uploads
|
||||||
- there is a directory at `/srv/uploads` for storing uploads
|
* port 8383 will be made accessible to the Internet somehow (either forwarding the port through your firewall directly, or passing it through a reverse proxy)
|
||||||
- port 8383 will be made accessible to the Internet somehow (either forwarding the port through your firewall directly, or passing it through a reverse proxy)
|
* you want the uploads to be owned by the user on your system with id 1000. (this is usually your user)
|
||||||
- you want the uploads to be owned by the user on your system with id 1000. (this is usually your user)
|
|
||||||
|
|
||||||
It can also be installed directly if the Rust toolchain is installed:
|
It can also be installed directly if the Rust toolchain is installed:
|
||||||
|
|
||||||
```bash
|
```bash
|
||||||
cd breeze
|
|
||||||
cargo install --path .
|
cargo install --path .
|
||||||
|
|
||||||
# then, you can run w/ a path to your `breeze.toml` config file
|
|
||||||
breeze --config /path/to/breeze.toml
|
|
||||||
```
|
|
||||||
|
|
||||||
### Exposing publicly
|
|
||||||
|
|
||||||
If you want to expose a breeze server to the internet, I highly recommend using a reverse proxy instead of just forwarding its HTTP port.
|
|
||||||
|
|
||||||
Caddy is probably the easiest to set up if you are new to reverse proxies. Here is an example `Caddyfile` for the Docker Compose file above (assuming `yourdomain.com` is a domain that points to your server's IP).
|
|
||||||
|
|
||||||
```
|
|
||||||
yourdomain.com {
|
|
||||||
# enable compression
|
|
||||||
encode
|
|
||||||
|
|
||||||
# forward request to breeze
|
|
||||||
reverse_proxy 127.0.0.1:8383
|
|
||||||
}
|
|
||||||
```
|
```
|
||||||
|
|
||||||
## Usage
|
## Usage
|
||||||
|
|
||||||
### Hosting
|
### Hosting
|
||||||
|
|
||||||
Configuration is read through a toml file.
|
Configuration is read through a toml file.
|
||||||
|
|
||||||
The config file path is specified using the `-c`/`--config` command line switch.
|
The config file path is specified using the `-c`/`--config` command line switch.
|
||||||
|
|
||||||
Here is an example config file:
|
Here is an example config file:
|
||||||
|
|
||||||
```toml
|
```toml
|
||||||
[engine]
|
[engine]
|
||||||
# The base URL that the HTTP server will be accessible on.
|
# The base URL that the HTTP server will be accessible on.
|
||||||
|
|
@ -103,12 +70,6 @@ base_url = "http://127.0.0.1:8000"
|
||||||
# If it is not set, no key will be required.
|
# If it is not set, no key will be required.
|
||||||
upload_key = "hiiiiiiii"
|
upload_key = "hiiiiiiii"
|
||||||
|
|
||||||
# OPTIONAL - If set, the secret key used to verify ShareX deletion URLs.
|
|
||||||
# If it is not set, deletion URLs will not be created or made usable.
|
|
||||||
# WARNING: Do not share this!! If somebody else obtains it, they can
|
|
||||||
# generate deletion URLs for any upload!!
|
|
||||||
deletion_secret = "asdfhjkasdhjfashjlfhjkaskdfjkhdjkh"
|
|
||||||
|
|
||||||
# OPTIONAL - specifies what to show when the site is visited on http
|
# OPTIONAL - specifies what to show when the site is visited on http
|
||||||
# It is sent with text/plain content type.
|
# It is sent with text/plain content type.
|
||||||
# There are two variables you can use:
|
# There are two variables you can use:
|
||||||
|
|
@ -125,13 +86,13 @@ max_temp_lifetime = 43200
|
||||||
# OPTIONAL - the maximum length (in bytes) a file being uploaded may be.
|
# OPTIONAL - the maximum length (in bytes) a file being uploaded may be.
|
||||||
# A word of warning about this: the error shown to ShareX users who
|
# A word of warning about this: the error shown to ShareX users who
|
||||||
# hit the limit is *not* very clear. ("connection closed" or similar)
|
# hit the limit is *not* very clear. ("connection closed" or similar)
|
||||||
max_upload_len = 2_147_483_647
|
max_upload_len = 2_147_483_648
|
||||||
|
|
||||||
# The maximum length (in bytes) an image file may be before the server
|
# The maximum length (in bytes) an image file may be before the server
|
||||||
# will skip removing its EXIF data.
|
# will skip removing its EXIF data.
|
||||||
# The performance impact of breeze's EXIF data removal is not
|
# The performance impact of breeze's EXIF data removal is not
|
||||||
# very high in everyday usage, so something like 16MiB is reasonable.
|
# very high in everyday usage, so something like 16MiB is reasonable.
|
||||||
max_strip_len = 16_777_215
|
max_strip_len = 16_777_216
|
||||||
|
|
||||||
[engine.disk]
|
[engine.disk]
|
||||||
# The location that uploads will be saved to.
|
# The location that uploads will be saved to.
|
||||||
|
|
@ -141,17 +102,17 @@ save_path = "/data"
|
||||||
[engine.cache]
|
[engine.cache]
|
||||||
# The file size (in bytes) that a file must be under
|
# The file size (in bytes) that a file must be under
|
||||||
# to get cached.
|
# to get cached.
|
||||||
max_length = 134_217_727
|
max_length = 134_217_728
|
||||||
|
|
||||||
# How long a cached upload will remain cached. (in seconds)
|
# How long a cached upload will remain cached. (in seconds)
|
||||||
upload_lifetime = 1800
|
upload_lifetime = 1800
|
||||||
|
|
||||||
# How often the cache will be checked for expired uploads
|
# How often the cache will be checked for expired uploads.
|
||||||
# in the background.
|
# It is not a continuous scan, and only is triggered upon a cache operation.
|
||||||
scan_freq = 60
|
scan_freq = 60
|
||||||
|
|
||||||
# How much memory (in bytes) the cache is allowed to consume.
|
# How much memory (in bytes) the cache is allowed to consume.
|
||||||
mem_capacity = 4_294_967_295
|
mem_capacity = 4_294_967_296
|
||||||
|
|
||||||
[http]
|
[http]
|
||||||
# The address that the HTTP server will listen on. (ip:port)
|
# The address that the HTTP server will listen on. (ip:port)
|
||||||
|
|
@ -166,17 +127,13 @@ level = "warn"
|
||||||
```
|
```
|
||||||
|
|
||||||
### Uploading
|
### Uploading
|
||||||
|
|
||||||
The HTTP API is pretty simple, and it's easy to make a ShareX configuration for it.
|
The HTTP API is pretty simple, and it's easy to make a ShareX configuration for it.
|
||||||
|
|
||||||
Uploads should be sent to `/new?name={original filename}` as a POST request. If the server uses upload keys, it should be sent to `/new?name={original filename}&key={upload key}`. The uploaded file's content should be sent as raw binary in the request body.
|
Uploads should be sent to `/new?name={original filename}` as a POST request. If the server uses upload keys, it should be sent to `/new?name={original filename}&key={upload key}`. The uploaded file's content should be sent as raw binary in the request body.
|
||||||
|
|
||||||
Also you can specify `&lastfor={time in seconds}` to make your upload temporary, or `&keepexif=true` to tell the server not to clear EXIF data on image uploads. (if you don't know what EXIF data is, you can leave it as default. you'll know if you need it)
|
Additionally, you may specify `&lastfor={time in seconds}` to make your upload temporary, or `&keepexif=true` to tell the server not to clear EXIF data on image uploads. (if you don't know what EXIF data is, just leave it as default. you'll know if you need it)
|
||||||
|
|
||||||
The endpoint's response will just be the URL of the upload in plain text, and the deletion URL will be sent in the `Breeze-Deletion-Url` header (if it's enabled).
|
|
||||||
|
|
||||||
Here's an example ShareX configuration for it (with a key):
|
Here's an example ShareX configuration for it (with a key):
|
||||||
|
|
||||||
```json
|
```json
|
||||||
{
|
{
|
||||||
"Version": "14.1.0",
|
"Version": "14.1.0",
|
||||||
|
|
@ -188,7 +145,6 @@ Here's an example ShareX configuration for it (with a key):
|
||||||
"name": "{filename}",
|
"name": "{filename}",
|
||||||
"key": "hiiiiiiii"
|
"key": "hiiiiiiii"
|
||||||
},
|
},
|
||||||
"Body": "Binary",
|
"Body": "Binary"
|
||||||
"DeletionURL": "{header:Breeze-Deletion-Url}"
|
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
|
||||||
12
flake.lock
12
flake.lock
|
|
@ -2,11 +2,11 @@
|
||||||
"nodes": {
|
"nodes": {
|
||||||
"crane": {
|
"crane": {
|
||||||
"locked": {
|
"locked": {
|
||||||
"lastModified": 1748047550,
|
"lastModified": 1734808813,
|
||||||
"narHash": "sha256-t0qLLqb4C1rdtiY8IFRH5KIapTY/n3Lqt57AmxEv9mk=",
|
"narHash": "sha256-3aH/0Y6ajIlfy7j52FGZ+s4icVX0oHhqBzRdlOeztqg=",
|
||||||
"owner": "ipetkov",
|
"owner": "ipetkov",
|
||||||
"repo": "crane",
|
"repo": "crane",
|
||||||
"rev": "b718a78696060df6280196a6f992d04c87a16aef",
|
"rev": "72e2d02dbac80c8c86bf6bf3e785536acf8ee926",
|
||||||
"type": "github"
|
"type": "github"
|
||||||
},
|
},
|
||||||
"original": {
|
"original": {
|
||||||
|
|
@ -35,11 +35,11 @@
|
||||||
},
|
},
|
||||||
"nixpkgs": {
|
"nixpkgs": {
|
||||||
"locked": {
|
"locked": {
|
||||||
"lastModified": 1747958103,
|
"lastModified": 1735821806,
|
||||||
"narHash": "sha256-qmmFCrfBwSHoWw7cVK4Aj+fns+c54EBP8cGqp/yK410=",
|
"narHash": "sha256-cuNapx/uQeCgeuhUhdck3JKbgpsml259sjUQnWM7zW8=",
|
||||||
"owner": "NixOS",
|
"owner": "NixOS",
|
||||||
"repo": "nixpkgs",
|
"repo": "nixpkgs",
|
||||||
"rev": "fe51d34885f7b5e3e7b59572796e1bcb427eccb1",
|
"rev": "d6973081434f88088e5321f83ebafe9a1167c367",
|
||||||
"type": "github"
|
"type": "github"
|
||||||
},
|
},
|
||||||
"original": {
|
"original": {
|
||||||
|
|
|
||||||
33
flake.nix
33
flake.nix
|
|
@ -77,8 +77,6 @@
|
||||||
with lib; let
|
with lib; let
|
||||||
cfg = config.services.breeze;
|
cfg = config.services.breeze;
|
||||||
settingsFormat = pkgs.formats.toml {};
|
settingsFormat = pkgs.formats.toml {};
|
||||||
defaultUser = "breeze";
|
|
||||||
defaultGroup = "breeze";
|
|
||||||
in {
|
in {
|
||||||
options = {
|
options = {
|
||||||
services.breeze = {
|
services.breeze = {
|
||||||
|
|
@ -92,13 +90,13 @@
|
||||||
|
|
||||||
user = mkOption {
|
user = mkOption {
|
||||||
type = types.str;
|
type = types.str;
|
||||||
default = defaultUser;
|
default = "breeze";
|
||||||
description = "User that `breeze` will run under";
|
description = "User that `breeze` will run under";
|
||||||
};
|
};
|
||||||
|
|
||||||
group = mkOption {
|
group = mkOption {
|
||||||
type = types.str;
|
type = types.str;
|
||||||
default = defaultGroup;
|
default = "breeze";
|
||||||
description = "Group that `breeze` will run under";
|
description = "Group that `breeze` will run under";
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
@ -113,7 +111,7 @@
|
||||||
default = {};
|
default = {};
|
||||||
description = ''
|
description = ''
|
||||||
The *.toml configuration to run `breeze` with.
|
The *.toml configuration to run `breeze` with.
|
||||||
The options aren't formally documented, but the [readme](https://git.min.rip/min/breeze/src/branch/main/README.md) provides examples.
|
There is no formal documentation, but there is an example in the [readme](https://git.min.rip/min/breeze/src/branch/main/README.md).
|
||||||
'';
|
'';
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
@ -134,29 +132,16 @@
|
||||||
This is useful for loading it from a secret management system.
|
This is useful for loading it from a secret management system.
|
||||||
'';
|
'';
|
||||||
};
|
};
|
||||||
|
|
||||||
deletionSecretFile = mkOption {
|
|
||||||
type = types.nullOr types.path;
|
|
||||||
default = null;
|
|
||||||
description = ''
|
|
||||||
File to load the `engine.deletion_secret` from, if desired.
|
|
||||||
This is useful for loading it from a secret management system.
|
|
||||||
'';
|
|
||||||
};
|
|
||||||
};
|
};
|
||||||
};
|
};
|
||||||
|
|
||||||
config = mkIf cfg.enable {
|
config = mkIf cfg.enable {
|
||||||
users.users = mkIf (cfg.user == defaultUser) {
|
users.users.${cfg.user} = {
|
||||||
${cfg.user} = {
|
isSystemUser = true;
|
||||||
isSystemUser = true;
|
inherit (cfg) group;
|
||||||
inherit (cfg) group;
|
|
||||||
};
|
|
||||||
};
|
};
|
||||||
|
|
||||||
users.groups = mkIf (cfg.group == defaultGroup) {
|
users.groups.${cfg.group} = {};
|
||||||
${cfg.group} = {};
|
|
||||||
};
|
|
||||||
|
|
||||||
systemd.tmpfiles.rules = [
|
systemd.tmpfiles.rules = [
|
||||||
"d '${cfg.configDir}' 0750 ${cfg.user} ${cfg.group} - -"
|
"d '${cfg.configDir}' 0750 ${cfg.user} ${cfg.group} - -"
|
||||||
|
|
@ -164,7 +149,6 @@
|
||||||
|
|
||||||
services.breeze.settings = mkMerge [
|
services.breeze.settings = mkMerge [
|
||||||
(mkIf (cfg.uploadKeyFile != null) {engine.upload_key = "@UPLOAD_KEY@";})
|
(mkIf (cfg.uploadKeyFile != null) {engine.upload_key = "@UPLOAD_KEY@";})
|
||||||
(mkIf (cfg.deletionSecretFile != null) {engine.deletion_secret = "@DELETION_SECRET@";})
|
|
||||||
];
|
];
|
||||||
|
|
||||||
systemd.services.breeze = let
|
systemd.services.breeze = let
|
||||||
|
|
@ -180,9 +164,6 @@
|
||||||
''
|
''
|
||||||
+ lib.optionalString (cfg.uploadKeyFile != null) ''
|
+ lib.optionalString (cfg.uploadKeyFile != null) ''
|
||||||
${pkgs.replace-secret}/bin/replace-secret '@UPLOAD_KEY@' "${cfg.uploadKeyFile}" ${cfgFile}
|
${pkgs.replace-secret}/bin/replace-secret '@UPLOAD_KEY@' "${cfg.uploadKeyFile}" ${cfgFile}
|
||||||
''
|
|
||||||
+ lib.optionalString (cfg.deletionSecretFile != null) ''
|
|
||||||
${pkgs.replace-secret}/bin/replace-secret '@DELETION_SECRET@' "${cfg.deletionSecretFile}" ${cfgFile}
|
|
||||||
'';
|
'';
|
||||||
|
|
||||||
serviceConfig = rec {
|
serviceConfig = rec {
|
||||||
|
|
|
||||||
278
src/cache.rs
278
src/cache.rs
|
|
@ -1,22 +1,16 @@
|
||||||
use std::{
|
use std::{
|
||||||
sync::atomic::{AtomicU64, AtomicUsize, Ordering},
|
sync::atomic::{AtomicUsize, Ordering},
|
||||||
time::Duration,
|
time::{Duration, SystemTime},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use atomic_time::AtomicSystemTime;
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use color_eyre::eyre::{self, bail};
|
use dashmap::{mapref::one::Ref, DashMap};
|
||||||
use dashmap::{DashMap, mapref::one::Ref};
|
use rayon::prelude::*;
|
||||||
use tokio::time;
|
use tokio::time;
|
||||||
|
|
||||||
use crate::config;
|
use crate::config;
|
||||||
|
|
||||||
#[cfg(not(test))]
|
|
||||||
use atomic_time::AtomicSystemTime;
|
|
||||||
#[cfg(not(test))]
|
|
||||||
use std::time::SystemTime;
|
|
||||||
#[cfg(test)]
|
|
||||||
use tests::{MockAtomicSystemTime as AtomicSystemTime, MockSystemTime as SystemTime};
|
|
||||||
|
|
||||||
/// An entry stored in the cache.
|
/// An entry stored in the cache.
|
||||||
///
|
///
|
||||||
/// It contains basic metadata and the actual value.
|
/// It contains basic metadata and the actual value.
|
||||||
|
|
@ -68,29 +62,18 @@ pub struct Cache {
|
||||||
/// Total length of data stored in cache currently
|
/// Total length of data stored in cache currently
|
||||||
length: AtomicUsize,
|
length: AtomicUsize,
|
||||||
|
|
||||||
/// How many times the scanner has ran,
|
|
||||||
/// for testing purposes
|
|
||||||
scan_count: AtomicU64,
|
|
||||||
|
|
||||||
/// How should it behave
|
/// How should it behave
|
||||||
cfg: config::CacheConfig,
|
cfg: config::CacheConfig,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Cache {
|
impl Cache {
|
||||||
pub fn with_config(cfg: config::CacheConfig) -> eyre::Result<Self> {
|
pub fn with_config(cfg: config::CacheConfig) -> Self {
|
||||||
// Sanity check chosen limits
|
Self {
|
||||||
if cfg.mem_capacity < cfg.max_length {
|
|
||||||
bail!("`max_length` should not exceed `mem_capacity`");
|
|
||||||
}
|
|
||||||
|
|
||||||
// Return
|
|
||||||
Ok(Self {
|
|
||||||
map: DashMap::with_capacity(64),
|
map: DashMap::with_capacity(64),
|
||||||
length: AtomicUsize::new(0),
|
length: AtomicUsize::new(0),
|
||||||
scan_count: AtomicU64::new(0),
|
|
||||||
|
|
||||||
cfg,
|
cfg,
|
||||||
})
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Figure out who should be bumped out of cache next
|
/// Figure out who should be bumped out of cache next
|
||||||
|
|
@ -98,7 +81,7 @@ impl Cache {
|
||||||
let mut sorted: Vec<_> = self.map.iter().collect();
|
let mut sorted: Vec<_> = self.map.iter().collect();
|
||||||
|
|
||||||
// Sort by least recently used
|
// Sort by least recently used
|
||||||
sorted.sort_unstable_by_key(|e| e.last_used());
|
sorted.par_sort_unstable_by(|e1, e2| e1.last_used().cmp(&e2.last_used()));
|
||||||
|
|
||||||
// Total bytes we would be removing
|
// Total bytes we would be removing
|
||||||
let mut total = 0;
|
let mut total = 0;
|
||||||
|
|
@ -159,12 +142,12 @@ impl Cache {
|
||||||
// How far we went above the limit
|
// How far we went above the limit
|
||||||
let needed = new_total - self.cfg.mem_capacity;
|
let needed = new_total - self.cfg.mem_capacity;
|
||||||
|
|
||||||
self.next_out(needed).iter().for_each(|k| {
|
self.next_out(needed).par_iter().for_each(|k| {
|
||||||
// Remove the element, and ignore the result
|
// Remove the element, and ignore the result
|
||||||
// The only reason it should be failing is if it couldn't find it,
|
// The only reason it should be failing is if it couldn't find it,
|
||||||
// in which case it was already removed
|
// in which case it was already removed
|
||||||
self.remove(k);
|
self.remove(k);
|
||||||
});
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Atomically add to total cached data length
|
// Atomically add to total cached data length
|
||||||
|
|
@ -187,7 +170,7 @@ impl Cache {
|
||||||
///
|
///
|
||||||
/// It exists so we can run the expiry check before
|
/// It exists so we can run the expiry check before
|
||||||
/// actually working with any entries, so no weird bugs happen
|
/// actually working with any entries, so no weird bugs happen
|
||||||
fn get_(&self, key: &str) -> Option<Ref<'_, String, Entry>> {
|
fn _get(&self, key: &str) -> Option<Ref<String, Entry>> {
|
||||||
let e = self.map.get(key)?;
|
let e = self.map.get(key)?;
|
||||||
|
|
||||||
// if the entry is expired get rid of it now
|
// if the entry is expired get rid of it now
|
||||||
|
|
@ -207,7 +190,7 @@ impl Cache {
|
||||||
|
|
||||||
/// Get an item from the cache, if it exists.
|
/// Get an item from the cache, if it exists.
|
||||||
pub fn get(&self, key: &str) -> Option<Bytes> {
|
pub fn get(&self, key: &str) -> Option<Bytes> {
|
||||||
let e = self.get_(key)?;
|
let e = self._get(key)?;
|
||||||
|
|
||||||
if e.update_used {
|
if e.update_used {
|
||||||
e.last_used.store(SystemTime::now(), Ordering::Relaxed);
|
e.last_used.store(SystemTime::now(), Ordering::Relaxed);
|
||||||
|
|
@ -223,14 +206,14 @@ impl Cache {
|
||||||
/// We don't use [`DashMap::contains_key`] here because it would just do
|
/// We don't use [`DashMap::contains_key`] here because it would just do
|
||||||
/// the exact same thing I do here, but without running the expiry check logic
|
/// the exact same thing I do here, but without running the expiry check logic
|
||||||
pub fn has(&self, key: &str) -> bool {
|
pub fn has(&self, key: &str) -> bool {
|
||||||
self.get_(key).is_some()
|
self._get(key).is_some()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Returns if an upload is able to be cached
|
/// Returns if an upload is able to be cached
|
||||||
/// with the current caching rules
|
/// with the current caching rules
|
||||||
#[inline]
|
#[inline(always)]
|
||||||
pub fn will_use(&self, length: u64) -> bool {
|
pub fn will_use(&self, length: u64) -> bool {
|
||||||
length <= (self.cfg.max_length as u64)
|
length <= self.cfg.max_length
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The background job that scans through the cache and removes inactive elements.
|
/// The background job that scans through the cache and removes inactive elements.
|
||||||
|
|
@ -239,13 +222,10 @@ impl Cache {
|
||||||
/// letting each entry keep track of expiry with its own task
|
/// letting each entry keep track of expiry with its own task
|
||||||
pub async fn scanner(&self) {
|
pub async fn scanner(&self) {
|
||||||
let mut interval = time::interval(self.cfg.scan_freq);
|
let mut interval = time::interval(self.cfg.scan_freq);
|
||||||
interval.set_missed_tick_behavior(time::MissedTickBehavior::Skip);
|
|
||||||
interval.tick().await; // Skip first tick
|
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
// We put this first so that it doesn't scan the instant the server starts
|
// We put this first so that it doesn't scan the instant the server starts
|
||||||
interval.tick().await;
|
interval.tick().await;
|
||||||
self.scan_count.fetch_add(1, Ordering::Relaxed);
|
|
||||||
|
|
||||||
// Save current timestamp so we aren't retrieving it constantly
|
// Save current timestamp so we aren't retrieving it constantly
|
||||||
// If we don't do this it'll be a LOT of system api calls
|
// If we don't do this it'll be a LOT of system api calls
|
||||||
|
|
@ -255,7 +235,7 @@ impl Cache {
|
||||||
// If we fail to compare the times, it gets added to the list anyways
|
// If we fail to compare the times, it gets added to the list anyways
|
||||||
let expired: Vec<_> = self
|
let expired: Vec<_> = self
|
||||||
.map
|
.map
|
||||||
.iter()
|
.par_iter()
|
||||||
.filter_map(|e| {
|
.filter_map(|e| {
|
||||||
let elapsed = now.duration_since(e.last_used()).unwrap_or(Duration::MAX);
|
let elapsed = now.duration_since(e.last_used()).unwrap_or(Duration::MAX);
|
||||||
let is_expired = elapsed >= e.lifetime;
|
let is_expired = elapsed >= e.lifetime;
|
||||||
|
|
@ -272,230 +252,8 @@ impl Cache {
|
||||||
if !expired.is_empty() {
|
if !expired.is_empty() {
|
||||||
// Use a retain call, should be less locks that way
|
// Use a retain call, should be less locks that way
|
||||||
// (instead of many remove calls)
|
// (instead of many remove calls)
|
||||||
self.map.retain(|k, _| !expired.contains(k));
|
self.map.retain(|k, _| !expired.contains(k))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
use std::{
|
|
||||||
sync::{
|
|
||||||
Arc,
|
|
||||||
atomic::{AtomicU64, Ordering},
|
|
||||||
},
|
|
||||||
time::Duration,
|
|
||||||
};
|
|
||||||
|
|
||||||
use bytes::Bytes;
|
|
||||||
|
|
||||||
use crate::{cache::Cache, config::CacheConfig};
|
|
||||||
|
|
||||||
thread_local! {
|
|
||||||
static MOCK_CLOCK: AtomicU64 = AtomicU64::new(0);
|
|
||||||
}
|
|
||||||
fn get_clock() -> u64 {
|
|
||||||
MOCK_CLOCK.with(|mc| mc.load(Ordering::Relaxed))
|
|
||||||
}
|
|
||||||
fn advance_clock(ms: u64) {
|
|
||||||
MOCK_CLOCK.with(|mc| mc.fetch_add(ms, Ordering::Relaxed));
|
|
||||||
}
|
|
||||||
async fn advance_clock_async(ms: u64) {
|
|
||||||
advance_clock(ms);
|
|
||||||
tokio::time::advance(Duration::from_millis(ms)).await;
|
|
||||||
tokio::task::yield_now().await; // make sure scanner tick runs
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct MockSystemTimeError;
|
|
||||||
|
|
||||||
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug)]
|
|
||||||
pub(super) struct MockSystemTime(u64);
|
|
||||||
impl MockSystemTime {
|
|
||||||
pub fn now() -> Self {
|
|
||||||
Self(get_clock())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn duration_since(
|
|
||||||
&self,
|
|
||||||
earlier: MockSystemTime,
|
|
||||||
) -> Result<Duration, MockSystemTimeError> {
|
|
||||||
if self.0 >= earlier.0 {
|
|
||||||
Ok(Duration::from_millis(self.0 - earlier.0))
|
|
||||||
} else {
|
|
||||||
Err(MockSystemTimeError)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn elapsed(&self) -> Result<Duration, MockSystemTimeError> {
|
|
||||||
Self::now().duration_since(*self)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(super) struct MockAtomicSystemTime(AtomicU64);
|
|
||||||
impl MockAtomicSystemTime {
|
|
||||||
pub fn now() -> Self {
|
|
||||||
Self(AtomicU64::new(get_clock()))
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn load(&self, order: Ordering) -> MockSystemTime {
|
|
||||||
MockSystemTime(self.0.load(order))
|
|
||||||
}
|
|
||||||
pub fn store(&self, system_time: MockSystemTime, order: Ordering) {
|
|
||||||
self.0.store(system_time.0, order);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
const KEY: &str = "abcdef.png";
|
|
||||||
const VALUE: Bytes = Bytes::from_static(&[0, 1, 2, 3, 4, 5, 6, 7]);
|
|
||||||
|
|
||||||
fn simple() -> Cache {
|
|
||||||
return Cache::with_config(CacheConfig {
|
|
||||||
max_length: 10_000_000,
|
|
||||||
mem_capacity: 100_000_000,
|
|
||||||
scan_freq: Duration::from_secs(5),
|
|
||||||
upload_lifetime: Duration::from_secs(15),
|
|
||||||
})
|
|
||||||
.unwrap();
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn scanning() -> Arc<Cache> {
|
|
||||||
let cache = Arc::new(simple());
|
|
||||||
|
|
||||||
tokio::spawn({
|
|
||||||
let cache = cache.clone();
|
|
||||||
async move { cache.scanner().await }
|
|
||||||
});
|
|
||||||
// allow 0ms scanner tick to run
|
|
||||||
tokio::task::yield_now().await;
|
|
||||||
|
|
||||||
cache
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Make sure that cache use check
|
|
||||||
/// decides properly for multiple lengths
|
|
||||||
#[test]
|
|
||||||
fn will_use() {
|
|
||||||
let cache = simple();
|
|
||||||
|
|
||||||
// use something
|
|
||||||
assert!(cache.will_use(4_000_000));
|
|
||||||
|
|
||||||
// don't use something
|
|
||||||
assert!(!cache.will_use(12_000_001));
|
|
||||||
|
|
||||||
// use something edge
|
|
||||||
assert!(cache.will_use(10_000_000));
|
|
||||||
|
|
||||||
// use something mini
|
|
||||||
assert!(cache.will_use(0));
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Make sure that [`Cache::add`]'s return value
|
|
||||||
/// is `false` when an entry was replaced
|
|
||||||
#[test]
|
|
||||||
fn store_replacement() {
|
|
||||||
let cache = simple();
|
|
||||||
|
|
||||||
// store
|
|
||||||
assert!(cache.add(KEY, VALUE));
|
|
||||||
|
|
||||||
// store w replace
|
|
||||||
assert!(!cache.add(KEY, VALUE));
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Make sure that the scanner ticks at
|
|
||||||
/// the right times, and removes entries
|
|
||||||
/// when expected.
|
|
||||||
#[tokio::test(start_paused = true)]
|
|
||||||
async fn store_expire_on_hit_with_scanner() {
|
|
||||||
let cache = scanning().await;
|
|
||||||
|
|
||||||
// store
|
|
||||||
assert!(cache.add(KEY, VALUE));
|
|
||||||
|
|
||||||
// get again so that scanner timing
|
|
||||||
// doesn't align w expiration
|
|
||||||
advance_clock_async(4999).await;
|
|
||||||
assert_eq!(cache.scan_count.load(Ordering::Relaxed), 0);
|
|
||||||
assert_eq!(cache.get(KEY), Some(VALUE));
|
|
||||||
|
|
||||||
// next scanner tick
|
|
||||||
advance_clock_async(1).await;
|
|
||||||
assert_eq!(cache.scan_count.load(Ordering::Relaxed), 1);
|
|
||||||
|
|
||||||
// advance a bit more
|
|
||||||
// make sure we don't expire early
|
|
||||||
advance_clock_async(7000).await;
|
|
||||||
assert_eq!(cache.scan_count.load(Ordering::Relaxed), 2);
|
|
||||||
assert!(cache.map.get(KEY).is_some());
|
|
||||||
|
|
||||||
// advance to next scanner tick
|
|
||||||
advance_clock_async(3000).await;
|
|
||||||
assert_eq!(cache.scan_count.load(Ordering::Relaxed), 3);
|
|
||||||
|
|
||||||
// advance to after expiry
|
|
||||||
advance_clock_async(4999).await;
|
|
||||||
assert_eq!(cache.scan_count.load(Ordering::Relaxed), 3);
|
|
||||||
|
|
||||||
// it should be there because we
|
|
||||||
// offset ourselves by 1ms
|
|
||||||
assert!(cache.map.get(KEY).is_some());
|
|
||||||
assert_eq!(cache.get(KEY), None);
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Make sure that the scanner removes
|
|
||||||
/// expired entries.
|
|
||||||
#[tokio::test(start_paused = true)]
|
|
||||||
async fn store_expire_by_scanner() {
|
|
||||||
let cache = scanning().await;
|
|
||||||
|
|
||||||
// store
|
|
||||||
assert!(cache.add(KEY, VALUE));
|
|
||||||
|
|
||||||
// make sure we don't expire early
|
|
||||||
advance_clock_async(6500).await;
|
|
||||||
assert!(cache.map.get(KEY).is_some());
|
|
||||||
|
|
||||||
// advance to after expiry
|
|
||||||
advance_clock_async(8500).await;
|
|
||||||
|
|
||||||
// it should get hit by scanner
|
|
||||||
assert!(cache.map.get(KEY).is_none());
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Make sure that entries expire on hit,
|
|
||||||
/// even when there is no scanner
|
|
||||||
#[test]
|
|
||||||
fn store_get_expire_on_hit() {
|
|
||||||
let cache = simple();
|
|
||||||
|
|
||||||
// store, get
|
|
||||||
let added_at = MockSystemTime::now();
|
|
||||||
assert!(cache.add(KEY, VALUE));
|
|
||||||
assert_eq!(cache.get(KEY), Some(VALUE));
|
|
||||||
|
|
||||||
// get after delay
|
|
||||||
// (upload gets used)
|
|
||||||
advance_clock(2000);
|
|
||||||
assert_eq!(cache.map.get(KEY).unwrap().last_used(), added_at);
|
|
||||||
assert_eq!(cache.get(KEY), Some(VALUE));
|
|
||||||
assert_eq!(
|
|
||||||
cache.map.get(KEY).unwrap().last_used(),
|
|
||||||
MockSystemTime::now()
|
|
||||||
);
|
|
||||||
|
|
||||||
// get after longer delay
|
|
||||||
// (upload should have been used so no expire)
|
|
||||||
advance_clock(14000);
|
|
||||||
assert_eq!(cache.get(KEY), Some(VALUE));
|
|
||||||
assert_eq!(
|
|
||||||
cache.map.get(KEY).unwrap().last_used(),
|
|
||||||
MockSystemTime::now()
|
|
||||||
);
|
|
||||||
|
|
||||||
// get after expiration
|
|
||||||
advance_clock(15000);
|
|
||||||
assert!(cache.get(KEY).is_none());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
||||||
|
|
@ -1,14 +1,12 @@
|
||||||
use std::{path::PathBuf, time::Duration};
|
use std::{path::PathBuf, time::Duration};
|
||||||
|
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use serde_with::{DisplayFromStr, DurationSeconds, serde_as};
|
use serde_with::{serde_as, DisplayFromStr, DurationSeconds};
|
||||||
use tracing_subscriber::filter::LevelFilter;
|
use tracing_subscriber::filter::LevelFilter;
|
||||||
|
|
||||||
#[derive(Deserialize)]
|
#[derive(Deserialize)]
|
||||||
pub struct Config {
|
pub struct Config {
|
||||||
pub engine: EngineConfig,
|
pub engine: EngineConfig,
|
||||||
pub cache: CacheConfig,
|
|
||||||
pub disk: DiskConfig,
|
|
||||||
pub http: HttpConfig,
|
pub http: HttpConfig,
|
||||||
pub logger: LoggerConfig,
|
pub logger: LoggerConfig,
|
||||||
}
|
}
|
||||||
|
|
@ -22,18 +20,18 @@ fn default_motd() -> String {
|
||||||
pub struct EngineConfig {
|
pub struct EngineConfig {
|
||||||
/// The url that the instance of breeze is meant to be accessed from.
|
/// The url that the instance of breeze is meant to be accessed from.
|
||||||
///
|
///
|
||||||
/// ex: `https://picture.wtf` would generate links like `https://picture.wtf/p/abcdef.png`
|
/// ex: https://picture.wtf would generate links like https://picture.wtf/p/abcdef.png
|
||||||
pub base_url: String,
|
pub base_url: String,
|
||||||
|
|
||||||
/// Authentication key for new uploads, will be required if this is specified. (optional)
|
/// Authentication key for new uploads, will be required if this is specified. (optional)
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub upload_key: String,
|
pub upload_key: String,
|
||||||
|
|
||||||
/// Secret key to use when generating or verifying deletion tokens.
|
/// Configuration for disk system
|
||||||
/// Leave blank to disable.
|
pub disk: DiskConfig,
|
||||||
///
|
|
||||||
/// If this secret is leaked, anyone can delete any file. Be careful!!!
|
/// Configuration for cache system
|
||||||
pub deletion_secret: Option<String>,
|
pub cache: CacheConfig,
|
||||||
|
|
||||||
/// Maximum size of an upload that will be accepted.
|
/// Maximum size of an upload that will be accepted.
|
||||||
/// Files above this size can not be uploaded.
|
/// Files above this size can not be uploaded.
|
||||||
|
|
@ -65,8 +63,8 @@ pub struct DiskConfig {
|
||||||
#[derive(Deserialize, Clone)]
|
#[derive(Deserialize, Clone)]
|
||||||
pub struct CacheConfig {
|
pub struct CacheConfig {
|
||||||
/// The maximum length in bytes that a file can be
|
/// The maximum length in bytes that a file can be
|
||||||
/// before it skips cache (in bytes)
|
/// before it skips cache (in seconds)
|
||||||
pub max_length: usize,
|
pub max_length: u64,
|
||||||
|
|
||||||
/// The amount of time a file can last inside the cache (in seconds)
|
/// The amount of time a file can last inside the cache (in seconds)
|
||||||
#[serde_as(as = "DurationSeconds")]
|
#[serde_as(as = "DurationSeconds")]
|
||||||
|
|
|
||||||
|
|
@ -1,89 +0,0 @@
|
||||||
use std::sync::{Arc, atomic::Ordering};
|
|
||||||
|
|
||||||
use axum::extract::{Query, State};
|
|
||||||
use base64::{Engine as _, prelude::BASE64_URL_SAFE_NO_PAD};
|
|
||||||
use bytes::{Buf, BytesMut};
|
|
||||||
use hmac::Mac;
|
|
||||||
use http::StatusCode;
|
|
||||||
use serde::Deserialize;
|
|
||||||
|
|
||||||
use crate::engine::{Engine, update_hmac};
|
|
||||||
|
|
||||||
#[derive(Deserialize)]
|
|
||||||
pub struct DeleteRequest {
|
|
||||||
name: String,
|
|
||||||
hash: String,
|
|
||||||
hmac: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn delete(
|
|
||||||
State(engine): State<Arc<Engine>>,
|
|
||||||
Query(req): Query<DeleteRequest>,
|
|
||||||
) -> (StatusCode, &'static str) {
|
|
||||||
let Some(mut hmac) = engine.deletion_hmac.clone() else {
|
|
||||||
return (StatusCode::CONFLICT, "Deletion is not enabled");
|
|
||||||
};
|
|
||||||
|
|
||||||
// -- decode provided data
|
|
||||||
|
|
||||||
// decode user-given hmac
|
|
||||||
let Ok(provided_hmac) = BASE64_URL_SAFE_NO_PAD.decode(req.hmac) else {
|
|
||||||
return (StatusCode::BAD_REQUEST, "Could not decode hmac");
|
|
||||||
};
|
|
||||||
|
|
||||||
// decode hash from base64
|
|
||||||
let Ok(mut provided_hash_data) = BASE64_URL_SAFE_NO_PAD
|
|
||||||
.decode(req.hash)
|
|
||||||
.map(|v| BytesMut::from(&v[..]))
|
|
||||||
else {
|
|
||||||
return (StatusCode::BAD_REQUEST, "Could not decode partial hash");
|
|
||||||
};
|
|
||||||
// read hash
|
|
||||||
if provided_hash_data.len() != 16 {
|
|
||||||
return (StatusCode::BAD_REQUEST, "Partial hash length is invalid");
|
|
||||||
}
|
|
||||||
let provided_hash = provided_hash_data.get_u128();
|
|
||||||
|
|
||||||
// -- verify it
|
|
||||||
|
|
||||||
// check if info is valid
|
|
||||||
let is_hmac_valid = {
|
|
||||||
// update hmad
|
|
||||||
update_hmac(&mut hmac, &req.name, provided_hash);
|
|
||||||
// verify..
|
|
||||||
hmac.verify_slice(&provided_hmac).is_ok()
|
|
||||||
};
|
|
||||||
if !is_hmac_valid {
|
|
||||||
return (StatusCode::BAD_REQUEST, "Hmac is invalid");
|
|
||||||
}
|
|
||||||
|
|
||||||
// -- ensure hash matches
|
|
||||||
|
|
||||||
// okay, now check if we compute the same hash as the req
|
|
||||||
// this makes sure it's (probably) the same file
|
|
||||||
let actual_hash = match engine.get_hash(&req.name).await {
|
|
||||||
Ok(Some(h)) => h,
|
|
||||||
Ok(None) => return (StatusCode::NOT_FOUND, "File not found"),
|
|
||||||
Err(err) => {
|
|
||||||
tracing::error!(%err, "failed to get hash");
|
|
||||||
return (StatusCode::INTERNAL_SERVER_ERROR, "Internal server error!!");
|
|
||||||
}
|
|
||||||
};
|
|
||||||
// compare
|
|
||||||
if provided_hash != actual_hash {
|
|
||||||
return (StatusCode::BAD_REQUEST, "Partial hash did not match");
|
|
||||||
}
|
|
||||||
|
|
||||||
// -- delete file
|
|
||||||
|
|
||||||
// everything seems okay so try to delete
|
|
||||||
if let Err(err) = engine.remove(&req.name).await {
|
|
||||||
tracing::error!(%err, "failed to delete upload");
|
|
||||||
return (StatusCode::INTERNAL_SERVER_ERROR, "Delete failed");
|
|
||||||
}
|
|
||||||
|
|
||||||
// decrement upload count
|
|
||||||
engine.upl_count.fetch_sub(1, Ordering::Relaxed);
|
|
||||||
|
|
||||||
(StatusCode::OK, "Deleted successfully!")
|
|
||||||
}
|
|
||||||
65
src/disk.rs
65
src/disk.rs
|
|
@ -1,4 +1,4 @@
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::PathBuf;
|
||||||
|
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use tokio::{
|
use tokio::{
|
||||||
|
|
@ -6,6 +6,8 @@ use tokio::{
|
||||||
io::{self, AsyncWriteExt},
|
io::{self, AsyncWriteExt},
|
||||||
sync::mpsc,
|
sync::mpsc,
|
||||||
};
|
};
|
||||||
|
use tracing::debug;
|
||||||
|
use walkdir::WalkDir;
|
||||||
|
|
||||||
use crate::config;
|
use crate::config;
|
||||||
|
|
||||||
|
|
@ -21,23 +23,17 @@ impl Disk {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Counts the number of files saved to disk we have
|
/// Counts the number of files saved to disk we have
|
||||||
pub fn count(&self) -> io::Result<usize> {
|
pub fn count(&self) -> usize {
|
||||||
std::fs::read_dir(&self.cfg.save_path)?.try_fold(0, |acc, x| {
|
WalkDir::new(&self.cfg.save_path)
|
||||||
Ok(if x?.file_type()?.is_file() {
|
.min_depth(1)
|
||||||
acc + 1
|
.into_iter()
|
||||||
} else {
|
.count()
|
||||||
acc
|
|
||||||
})
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Formats the path on disk for a `saved_name`.
|
/// Formats the path on disk for a `saved_name`.
|
||||||
fn path_for(&self, saved_name: &str) -> PathBuf {
|
fn path_for(&self, saved_name: &str) -> PathBuf {
|
||||||
// try to prevent path traversal by ignoring everything except the file name
|
let mut p = self.cfg.save_path.clone();
|
||||||
let name = Path::new(saved_name).file_name().unwrap_or_default();
|
p.push(saved_name);
|
||||||
|
|
||||||
let mut p: PathBuf = self.cfg.save_path.clone();
|
|
||||||
p.push(name);
|
|
||||||
|
|
||||||
p
|
p
|
||||||
}
|
}
|
||||||
|
|
@ -69,46 +65,25 @@ impl Disk {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Create a background I/O task
|
/// Create a background I/O task
|
||||||
pub fn start_save<
|
pub async fn start_save(&self, saved_name: &str) -> mpsc::UnboundedSender<Bytes> {
|
||||||
Fut: Future + Send + 'static,
|
|
||||||
F: FnOnce(io::Error) -> Fut + Send + 'static,
|
|
||||||
>(
|
|
||||||
&self,
|
|
||||||
saved_name: &str,
|
|
||||||
fail_callback: F,
|
|
||||||
) -> mpsc::Sender<Bytes> {
|
|
||||||
// start a task that handles saving files to disk (we can save to cache/disk in parallel that way)
|
// start a task that handles saving files to disk (we can save to cache/disk in parallel that way)
|
||||||
// a large buffer size is chosen so uploads can be received quickly,
|
let (tx, mut rx): (mpsc::UnboundedSender<Bytes>, mpsc::UnboundedReceiver<Bytes>) =
|
||||||
// but with less possibility of running out of memory.
|
mpsc::unbounded_channel();
|
||||||
// (thats probably only possible w very high link speed tho......)
|
|
||||||
let (tx, mut rx): (mpsc::Sender<Bytes>, mpsc::Receiver<Bytes>) = mpsc::channel(30000);
|
|
||||||
|
|
||||||
let p = self.path_for(saved_name);
|
let p = self.path_for(saved_name);
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
// create file to save upload to
|
// create file to save upload to
|
||||||
let mut file = match File::create(p).await {
|
let mut file = File::create(p)
|
||||||
Ok(f) => f,
|
.await
|
||||||
Err(err) => {
|
.expect("could not open file! make sure your upload path is valid");
|
||||||
tracing::error!(%err, "could not open file! make sure your upload path is valid");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// receive chunks and save them to file
|
// receive chunks and save them to file
|
||||||
while let Some(chunk) = rx.recv().await {
|
while let Some(chunk) = rx.recv().await {
|
||||||
tracing::debug!(length = chunk.len(), "writing chunk to disk");
|
debug!("writing chunk to disk (length: {})", chunk.len());
|
||||||
if let Err(err) = file.write_all(&chunk).await {
|
file.write_all(&chunk)
|
||||||
drop(rx);
|
.await
|
||||||
fail_callback(err).await;
|
.expect("error while writing file to disk");
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// flush to disk
|
|
||||||
// this should catch "no space left on device" i hope...
|
|
||||||
if let Err(err) = file.flush().await {
|
|
||||||
fail_callback(err).await;
|
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
|
||||||
354
src/engine.rs
354
src/engine.rs
|
|
@ -1,27 +1,22 @@
|
||||||
use std::{
|
use std::{
|
||||||
io::SeekFrom,
|
ops::Bound,
|
||||||
ops::{Bound, RangeBounds},
|
|
||||||
sync::{
|
sync::{
|
||||||
Arc,
|
|
||||||
atomic::{AtomicUsize, Ordering},
|
atomic::{AtomicUsize, Ordering},
|
||||||
|
Arc,
|
||||||
},
|
},
|
||||||
time::Duration,
|
time::Duration,
|
||||||
};
|
};
|
||||||
|
|
||||||
use axum::body::BodyDataStream;
|
use axum::body::BodyDataStream;
|
||||||
use base64::{Engine as _, prelude::BASE64_URL_SAFE_NO_PAD};
|
|
||||||
use bytes::{BufMut, Bytes, BytesMut};
|
use bytes::{BufMut, Bytes, BytesMut};
|
||||||
use color_eyre::eyre::{self, WrapErr};
|
|
||||||
use hmac::Mac;
|
|
||||||
use img_parts::{DynImage, ImageEXIF};
|
use img_parts::{DynImage, ImageEXIF};
|
||||||
use rand::distr::{Alphanumeric, SampleString};
|
use rand::distributions::{Alphanumeric, DistString};
|
||||||
use tokio::{
|
use tokio::{
|
||||||
fs::File,
|
fs::File,
|
||||||
io::{AsyncReadExt, AsyncSeekExt},
|
io::{AsyncReadExt, AsyncSeekExt},
|
||||||
};
|
};
|
||||||
use tokio_stream::StreamExt;
|
use tokio_stream::StreamExt;
|
||||||
use tracing::{debug, error, info};
|
use tracing::{debug, error, info};
|
||||||
use twox_hash::XxHash3_128;
|
|
||||||
|
|
||||||
use crate::{cache, config, disk};
|
use crate::{cache, config, disk};
|
||||||
|
|
||||||
|
|
@ -33,7 +28,6 @@ pub enum UploadData {
|
||||||
Disk(tokio::io::Take<File>),
|
Disk(tokio::io::Take<File>),
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Upload data and metadata needed to build a view response
|
|
||||||
pub struct UploadResponse {
|
pub struct UploadResponse {
|
||||||
pub full_len: u64,
|
pub full_len: u64,
|
||||||
pub range: (u64, u64),
|
pub range: (u64, u64),
|
||||||
|
|
@ -44,11 +38,8 @@ pub struct UploadResponse {
|
||||||
/// Some are rejections.
|
/// Some are rejections.
|
||||||
pub enum ProcessOutcome {
|
pub enum ProcessOutcome {
|
||||||
/// The upload was successful.
|
/// The upload was successful.
|
||||||
/// We give the user their file's URL (and deletion URL if one was created)
|
/// We give the user their file's URL
|
||||||
Success {
|
Success(String),
|
||||||
url: String,
|
|
||||||
deletion_url: Option<String>,
|
|
||||||
},
|
|
||||||
|
|
||||||
/// Occurs when an upload exceeds the chosen maximum file size.
|
/// Occurs when an upload exceeds the chosen maximum file size.
|
||||||
UploadTooLarge,
|
UploadTooLarge,
|
||||||
|
|
@ -72,9 +63,6 @@ pub enum GetOutcome {
|
||||||
RangeNotSatisfiable,
|
RangeNotSatisfiable,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Type alias to make using HMAC SHA256 easier
|
|
||||||
type HmacSha256 = hmac::Hmac<sha2::Sha256>;
|
|
||||||
|
|
||||||
/// breeze engine
|
/// breeze engine
|
||||||
pub struct Engine {
|
pub struct Engine {
|
||||||
/// Cached count of uploaded files
|
/// Cached count of uploaded files
|
||||||
|
|
@ -83,132 +71,63 @@ pub struct Engine {
|
||||||
/// Engine configuration
|
/// Engine configuration
|
||||||
pub cfg: config::EngineConfig,
|
pub cfg: config::EngineConfig,
|
||||||
|
|
||||||
/// HMAC state initialised with the deletion secret (if present)
|
|
||||||
pub deletion_hmac: Option<HmacSha256>,
|
|
||||||
|
|
||||||
/// The in-memory cache that cached uploads are stored in
|
/// The in-memory cache that cached uploads are stored in
|
||||||
cache: Arc<cache::Cache>,
|
cache: Arc<cache::Cache>,
|
||||||
|
|
||||||
/// An interface to the on-disk upload store
|
/// An interface to the on-disk upload store
|
||||||
disk: Arc<disk::Disk>,
|
disk: disk::Disk,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Wipe out an upload from all storage.
|
|
||||||
/// * Intended for deletion URLs and failed uploads
|
|
||||||
/// * Separated from [`Engine`] for use in [`disk::Disk`]
|
|
||||||
async fn remove(cache: &cache::Cache, disk: &disk::Disk, saved_name: &str) -> eyre::Result<()> {
|
|
||||||
info!(saved_name, "!! removing upload");
|
|
||||||
|
|
||||||
cache.remove(saved_name);
|
|
||||||
disk.remove(saved_name)
|
|
||||||
.await
|
|
||||||
.wrap_err("failed to remove file from disk")?;
|
|
||||||
|
|
||||||
info!("!! successfully removed upload");
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Try to parse a `Range` header into an easier format to work with
|
|
||||||
fn resolve_range(range: Option<headers::Range>, full_len: u64) -> Option<(u64, u64)> {
|
fn resolve_range(range: Option<headers::Range>, full_len: u64) -> Option<(u64, u64)> {
|
||||||
// Prepare default range
|
let last_byte = full_len - 1;
|
||||||
let default = Some((0, full_len));
|
|
||||||
|
|
||||||
// Take range, otherwise return
|
let (start, end) =
|
||||||
let Some(range) = range else {
|
if let Some((start, end)) = range.and_then(|r| r.satisfiable_ranges(full_len).next()) {
|
||||||
return default; // unspecified; use default
|
// satisfiable_ranges will never return Excluded so this is ok
|
||||||
};
|
let start = if let Bound::Included(start_incl) = start {
|
||||||
|
start_incl
|
||||||
|
} else {
|
||||||
|
0
|
||||||
|
};
|
||||||
|
let end = if let Bound::Included(end_incl) = end {
|
||||||
|
end_incl
|
||||||
|
} else {
|
||||||
|
last_byte
|
||||||
|
};
|
||||||
|
|
||||||
// Get iterator of satisfiable ranges
|
(start, end)
|
||||||
let mut ranges = range.satisfiable_ranges(full_len);
|
} else {
|
||||||
|
(0, last_byte)
|
||||||
|
};
|
||||||
|
|
||||||
// Take first range
|
// catch ranges we can't satisfy
|
||||||
let Some(range) = ranges.next() else {
|
if end > last_byte || start > end {
|
||||||
return default; // empty; use default
|
|
||||||
};
|
|
||||||
|
|
||||||
// If there are multiple ranges, we will
|
|
||||||
// not process the request
|
|
||||||
if ranges.next().is_some() {
|
|
||||||
return None;
|
return None;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Convert into a..b range
|
|
||||||
let start = match range.start_bound() {
|
|
||||||
Bound::Included(&x) => x,
|
|
||||||
Bound::Excluded(&x) => x.checked_add(1)?,
|
|
||||||
Bound::Unbounded => 0,
|
|
||||||
};
|
|
||||||
let end = match range.end_bound() {
|
|
||||||
Bound::Included(&x) => x.checked_add(1)?,
|
|
||||||
Bound::Excluded(&x) => x,
|
|
||||||
Bound::Unbounded => full_len,
|
|
||||||
};
|
|
||||||
|
|
||||||
// We can't handle bounds
|
|
||||||
// out of order
|
|
||||||
if start > end {
|
|
||||||
return None;
|
|
||||||
}
|
|
||||||
|
|
||||||
// We can't return more bytes
|
|
||||||
// than we have
|
|
||||||
if end > full_len {
|
|
||||||
return None;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Return
|
|
||||||
Some((start, end))
|
Some((start, end))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Calculate HMAC of field values.
|
|
||||||
pub fn update_hmac(hmac: &mut HmacSha256, saved_name: &str, hash: u128) {
|
|
||||||
// mix deletion req fields into one buf
|
|
||||||
let mut field_bytes = BytesMut::new();
|
|
||||||
field_bytes.put(saved_name.as_bytes());
|
|
||||||
field_bytes.put_u128(hash);
|
|
||||||
|
|
||||||
// take the hmac
|
|
||||||
hmac.update(&field_bytes);
|
|
||||||
}
|
|
||||||
|
|
||||||
/// How many bytes of a file should be used for hash calculation.
|
|
||||||
const SAMPLE_WANTED_BYTES: usize = 32768;
|
|
||||||
|
|
||||||
/// Format some info about an upload and hash it
|
|
||||||
///
|
|
||||||
/// This should not change between versions!!
|
|
||||||
/// That would break deletion urls
|
|
||||||
fn calculate_hash(len: u64, data_sample: Bytes) -> u128 {
|
|
||||||
let mut buf = BytesMut::new();
|
|
||||||
buf.put_u64(len);
|
|
||||||
buf.put(data_sample);
|
|
||||||
|
|
||||||
XxHash3_128::oneshot(&buf)
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Engine {
|
impl Engine {
|
||||||
/// Creates a new instance of the engine
|
/// Creates a new instance of the engine
|
||||||
pub fn new(
|
pub fn with_config(cfg: config::EngineConfig) -> Self {
|
||||||
cfg: config::EngineConfig,
|
let cache = cache::Cache::with_config(cfg.cache.clone());
|
||||||
cache: Arc<cache::Cache>,
|
let disk = disk::Disk::with_config(cfg.disk.clone());
|
||||||
disk: disk::Disk,
|
|
||||||
) -> std::io::Result<Self> {
|
|
||||||
let deletion_hmac = cfg
|
|
||||||
.deletion_secret
|
|
||||||
.as_ref()
|
|
||||||
.map(|s| HmacSha256::new_from_slice(s.as_bytes()).unwrap());
|
|
||||||
|
|
||||||
Ok(Self {
|
let cache = Arc::new(cache);
|
||||||
|
|
||||||
|
let cache_scanner = cache.clone();
|
||||||
|
tokio::spawn(async move { cache_scanner.scanner().await });
|
||||||
|
|
||||||
|
Self {
|
||||||
// initialise our cached upload count. this doesn't include temp uploads!
|
// initialise our cached upload count. this doesn't include temp uploads!
|
||||||
upl_count: AtomicUsize::new(disk.count()?),
|
upl_count: AtomicUsize::new(disk.count()),
|
||||||
deletion_hmac,
|
|
||||||
|
|
||||||
cfg,
|
cfg,
|
||||||
|
|
||||||
cache,
|
cache,
|
||||||
disk: Arc::new(disk),
|
disk,
|
||||||
})
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Fetch an upload.
|
/// Fetch an upload.
|
||||||
|
|
@ -222,17 +141,18 @@ impl Engine {
|
||||||
&self,
|
&self,
|
||||||
saved_name: &str,
|
saved_name: &str,
|
||||||
range: Option<headers::Range>,
|
range: Option<headers::Range>,
|
||||||
) -> eyre::Result<GetOutcome> {
|
) -> anyhow::Result<GetOutcome> {
|
||||||
let data = if let Some(u) = self.cache.get(saved_name) {
|
let data = if let Some(u) = self.cache.get(saved_name) {
|
||||||
u
|
u
|
||||||
} else {
|
} else {
|
||||||
// now, check if we have it on disk
|
// now, check if we have it on disk
|
||||||
let Some(mut f) = self.disk.open(saved_name).await? else {
|
let mut f = if let Some(f) = self.disk.open(saved_name).await? {
|
||||||
|
f
|
||||||
|
} else {
|
||||||
// file didn't exist
|
// file didn't exist
|
||||||
return Ok(GetOutcome::NotFound);
|
return Ok(GetOutcome::NotFound);
|
||||||
};
|
};
|
||||||
|
|
||||||
// read length from disk
|
|
||||||
let full_len = self.disk.len(&f).await?;
|
let full_len = self.disk.len(&f).await?;
|
||||||
|
|
||||||
// if possible, recache and send a cache response
|
// if possible, recache and send a cache response
|
||||||
|
|
@ -260,15 +180,17 @@ impl Engine {
|
||||||
|
|
||||||
data
|
data
|
||||||
} else {
|
} else {
|
||||||
let Some((start, end)) = resolve_range(range, full_len) else {
|
let (start, end) = if let Some(range) = resolve_range(range, full_len) {
|
||||||
|
range
|
||||||
|
} else {
|
||||||
return Ok(GetOutcome::RangeNotSatisfiable);
|
return Ok(GetOutcome::RangeNotSatisfiable);
|
||||||
};
|
};
|
||||||
|
|
||||||
// Set up file handle
|
let range_len = (end - start) + 1;
|
||||||
f.seek(SeekFrom::Start(start)).await?;
|
|
||||||
let f = f.take(end - start);
|
f.seek(std::io::SeekFrom::Start(start)).await?;
|
||||||
|
let f = f.take(range_len);
|
||||||
|
|
||||||
// Return
|
|
||||||
let res = UploadResponse {
|
let res = UploadResponse {
|
||||||
full_len,
|
full_len,
|
||||||
range: (start, end),
|
range: (start, end),
|
||||||
|
|
@ -278,27 +200,17 @@ impl Engine {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// Resolve a..b range
|
|
||||||
let full_len = data.len() as u64;
|
let full_len = data.len() as u64;
|
||||||
let Some((start, end)) = resolve_range(range, full_len) else {
|
let (start, end) = if let Some(range) = resolve_range(range, full_len) {
|
||||||
|
range
|
||||||
|
} else {
|
||||||
return Ok(GetOutcome::RangeNotSatisfiable);
|
return Ok(GetOutcome::RangeNotSatisfiable);
|
||||||
};
|
};
|
||||||
|
|
||||||
// Cut down to range
|
// cut down to range
|
||||||
let data = {
|
let data = data.slice((start as usize)..=(end as usize));
|
||||||
// Convert types.
|
|
||||||
// These should never be greater than usize::MAX
|
|
||||||
// if I recall, because max cache length is a usize.
|
|
||||||
let (start, end): (usize, usize) = (
|
|
||||||
start.try_into().expect("start bound"),
|
|
||||||
end.try_into().expect("end bound"),
|
|
||||||
);
|
|
||||||
|
|
||||||
// Slice bytes
|
// build response
|
||||||
data.slice(start..end)
|
|
||||||
};
|
|
||||||
|
|
||||||
// Build response
|
|
||||||
let res = UploadResponse {
|
let res = UploadResponse {
|
||||||
full_len,
|
full_len,
|
||||||
range: (start, end),
|
range: (start, end),
|
||||||
|
|
@ -325,51 +237,13 @@ impl Engine {
|
||||||
false
|
false
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Try to read a file and calculate a hash for it.
|
|
||||||
pub async fn get_hash(&self, saved_name: &str) -> eyre::Result<Option<u128>> {
|
|
||||||
// readout sample data and full len
|
|
||||||
let (data_sample, len) = if let Some(full_data) = self.cache.get(saved_name) {
|
|
||||||
// we found it in cache! take as many bytes as we can
|
|
||||||
let taking = full_data.len().min(SAMPLE_WANTED_BYTES);
|
|
||||||
let data = full_data.slice(0..taking);
|
|
||||||
// get len
|
|
||||||
let len = full_data.len() as u64;
|
|
||||||
|
|
||||||
// return
|
|
||||||
(data, len)
|
|
||||||
} else {
|
|
||||||
// not in cache, so try disk
|
|
||||||
let Some(mut f) = self.disk.open(saved_name).await? else {
|
|
||||||
// not found there either so we just dont have it
|
|
||||||
return Ok(None);
|
|
||||||
};
|
|
||||||
|
|
||||||
// find len..
|
|
||||||
let len = f.seek(SeekFrom::End(0)).await?;
|
|
||||||
f.rewind().await?;
|
|
||||||
|
|
||||||
// only take wanted # of bytes for read
|
|
||||||
let mut f = f.take(SAMPLE_WANTED_BYTES as u64);
|
|
||||||
|
|
||||||
// try to read
|
|
||||||
let mut data = Vec::with_capacity(SAMPLE_WANTED_BYTES);
|
|
||||||
f.read_to_end(&mut data).await?;
|
|
||||||
let data = Bytes::from(data);
|
|
||||||
|
|
||||||
(data, len)
|
|
||||||
};
|
|
||||||
|
|
||||||
// calculate hash
|
|
||||||
Ok(Some(calculate_hash(len, data_sample)))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Generate a new saved name for an upload.
|
/// Generate a new saved name for an upload.
|
||||||
///
|
///
|
||||||
/// If it picks a name that already exists, it will try again.
|
/// If it picks a name that already exists, it will try again.
|
||||||
pub async fn gen_saved_name(&self, ext: Option<String>) -> String {
|
pub async fn gen_saved_name(&self, ext: Option<String>) -> String {
|
||||||
loop {
|
loop {
|
||||||
// generate a 6-character alphanumeric string
|
// generate a 6-character alphanumeric string
|
||||||
let mut saved_name: String = Alphanumeric.sample_string(&mut rand::rng(), 6);
|
let mut saved_name: String = Alphanumeric.sample_string(&mut rand::thread_rng(), 6);
|
||||||
|
|
||||||
// if we have an extension, add it now
|
// if we have an extension, add it now
|
||||||
if let Some(ref ext) = ext {
|
if let Some(ref ext) = ext {
|
||||||
|
|
@ -379,18 +253,25 @@ impl Engine {
|
||||||
|
|
||||||
if !self.has(&saved_name).await {
|
if !self.has(&saved_name).await {
|
||||||
break saved_name;
|
break saved_name;
|
||||||
|
} else {
|
||||||
|
// there was a name collision. loop and try again
|
||||||
|
info!("name collision! saved_name= {}", saved_name);
|
||||||
}
|
}
|
||||||
|
|
||||||
// there was a name collision. loop and try again
|
|
||||||
info!("name collision! saved_name= {}", saved_name);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Wipe out an upload from all storage.
|
/// Wipe out an upload from all storage.
|
||||||
///
|
///
|
||||||
/// (Intended for deletion URLs and failed uploads)
|
/// This is for deleting failed uploads only!!
|
||||||
pub async fn remove(&self, saved_name: &str) -> eyre::Result<()> {
|
pub async fn remove(&self, saved_name: &str) -> anyhow::Result<()> {
|
||||||
remove(&self.cache, &self.disk, saved_name).await
|
info!("!! removing upload: {saved_name}");
|
||||||
|
|
||||||
|
self.cache.remove(saved_name);
|
||||||
|
self.disk.remove(saved_name).await?;
|
||||||
|
|
||||||
|
info!("!! successfully removed upload");
|
||||||
|
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Save a file to disk, and optionally cache.
|
/// Save a file to disk, and optionally cache.
|
||||||
|
|
@ -404,7 +285,7 @@ impl Engine {
|
||||||
mut stream: BodyDataStream,
|
mut stream: BodyDataStream,
|
||||||
lifetime: Option<Duration>,
|
lifetime: Option<Duration>,
|
||||||
keep_exif: bool,
|
keep_exif: bool,
|
||||||
) -> eyre::Result<(Bytes, u64)> {
|
) -> anyhow::Result<()> {
|
||||||
// if we're using cache, make some space to store the upload in
|
// if we're using cache, make some space to store the upload in
|
||||||
let mut data = if use_cache {
|
let mut data = if use_cache {
|
||||||
BytesMut::with_capacity(provided_len.try_into()?)
|
BytesMut::with_capacity(provided_len.try_into()?)
|
||||||
|
|
@ -414,24 +295,12 @@ impl Engine {
|
||||||
|
|
||||||
// don't begin a disk save if we're using temporary lifetimes
|
// don't begin a disk save if we're using temporary lifetimes
|
||||||
let tx = if lifetime.is_none() {
|
let tx = if lifetime.is_none() {
|
||||||
Some(self.disk.start_save(saved_name, {
|
Some(self.disk.start_save(saved_name).await)
|
||||||
let cache = self.cache.clone();
|
|
||||||
let disk = self.disk.clone();
|
|
||||||
let saved_name = saved_name.to_string();
|
|
||||||
|
|
||||||
async move |err| {
|
|
||||||
// try to delete the failed upload
|
|
||||||
error!(%saved_name, %err, "error while saving file to disk");
|
|
||||||
if let Err(err) = remove(&cache, &disk, &saved_name).await {
|
|
||||||
error!(%saved_name, %err, "IO error callback failed to remove upload");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}))
|
|
||||||
} else {
|
} else {
|
||||||
None
|
None
|
||||||
};
|
};
|
||||||
|
|
||||||
// whether or not we are going to coalesce the data
|
// whether or not we're gonna coalesce the data
|
||||||
// in order to strip the exif data at the end,
|
// in order to strip the exif data at the end,
|
||||||
// instead of just sending it off to the i/o task
|
// instead of just sending it off to the i/o task
|
||||||
let coalesce_and_strip = use_cache
|
let coalesce_and_strip = use_cache
|
||||||
|
|
@ -444,11 +313,6 @@ impl Engine {
|
||||||
&& !keep_exif
|
&& !keep_exif
|
||||||
&& provided_len <= self.cfg.max_strip_len;
|
&& provided_len <= self.cfg.max_strip_len;
|
||||||
|
|
||||||
// buffer of sampled data for the deletion hash
|
|
||||||
let mut hash_sample = BytesMut::with_capacity(SAMPLE_WANTED_BYTES);
|
|
||||||
// actual number of bytes processed
|
|
||||||
let mut observed_len = 0;
|
|
||||||
|
|
||||||
// read and save upload
|
// read and save upload
|
||||||
while let Some(chunk) = stream.next().await {
|
while let Some(chunk) = stream.next().await {
|
||||||
// if we error on a chunk, fail out
|
// if we error on a chunk, fail out
|
||||||
|
|
@ -456,30 +320,18 @@ impl Engine {
|
||||||
|
|
||||||
// if we have an i/o task, send it off
|
// if we have an i/o task, send it off
|
||||||
// also cloning this is okay because it's a Bytes
|
// also cloning this is okay because it's a Bytes
|
||||||
if !coalesce_and_strip && let Some(ref tx) = tx {
|
if !coalesce_and_strip {
|
||||||
debug!("sending chunk to i/o task");
|
if let Some(ref tx) = tx {
|
||||||
tx.send(chunk.clone())
|
debug!("sending chunk to i/o task");
|
||||||
.await
|
tx.send(chunk.clone())?;
|
||||||
.wrap_err("failed to send chunk to i/o task!")?;
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// add to sample if we need to
|
|
||||||
let wanted = SAMPLE_WANTED_BYTES - hash_sample.len();
|
|
||||||
if wanted != 0 {
|
|
||||||
// take as many bytes as we can ...
|
|
||||||
let taking = chunk.len().min(wanted);
|
|
||||||
hash_sample.extend_from_slice(&chunk[0..taking]);
|
|
||||||
}
|
|
||||||
// record new len
|
|
||||||
observed_len += chunk.len() as u64;
|
|
||||||
|
|
||||||
if use_cache {
|
if use_cache {
|
||||||
debug!("receiving data into buffer");
|
debug!("receiving data into buffer");
|
||||||
|
|
||||||
if data.len() + chunk.len() > data.capacity() {
|
if data.len() + chunk.len() > data.capacity() {
|
||||||
info!(
|
info!("the amount of data sent exceeds the content-length provided by the client! caching will be cancelled for this upload.");
|
||||||
"the amount of data sent exceeds the content-length provided by the client! caching will be cancelled for this upload."
|
|
||||||
);
|
|
||||||
|
|
||||||
// if we receive too much data, drop the buffer and stop using cache (it is still okay to use disk, probably)
|
// if we receive too much data, drop the buffer and stop using cache (it is still okay to use disk, probably)
|
||||||
data = BytesMut::new();
|
data = BytesMut::new();
|
||||||
|
|
@ -513,9 +365,7 @@ impl Engine {
|
||||||
// send what we did over to the i/o task, all in one chunk
|
// send what we did over to the i/o task, all in one chunk
|
||||||
if let Some(ref tx) = tx {
|
if let Some(ref tx) = tx {
|
||||||
debug!("sending filled buffer to i/o task");
|
debug!("sending filled buffer to i/o task");
|
||||||
tx.send(data.clone())
|
tx.send(data.clone())?;
|
||||||
.await
|
|
||||||
.wrap_err("failed to send coalesced buffer to i/o task!")?;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
data
|
data
|
||||||
|
|
@ -534,8 +384,7 @@ impl Engine {
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
// return w/ info for hash calculation
|
Ok(())
|
||||||
Ok((hash_sample.freeze(), observed_len))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn process(
|
pub async fn process(
|
||||||
|
|
@ -545,7 +394,7 @@ impl Engine {
|
||||||
stream: BodyDataStream,
|
stream: BodyDataStream,
|
||||||
lifetime: Option<Duration>,
|
lifetime: Option<Duration>,
|
||||||
keep_exif: bool,
|
keep_exif: bool,
|
||||||
) -> eyre::Result<ProcessOutcome> {
|
) -> anyhow::Result<ProcessOutcome> {
|
||||||
// if the upload size is greater than our max file size, deny it now
|
// if the upload size is greater than our max file size, deny it now
|
||||||
if self.cfg.max_upload_len.is_some_and(|l| provided_len > l) {
|
if self.cfg.max_upload_len.is_some_and(|l| provided_len > l) {
|
||||||
return Ok(ProcessOutcome::UploadTooLarge);
|
return Ok(ProcessOutcome::UploadTooLarge);
|
||||||
|
|
@ -579,47 +428,22 @@ impl Engine {
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
// handle result
|
// If anything fails, delete the upload and return the error
|
||||||
let (hash_sample, len) = match save_result {
|
if save_result.is_err() {
|
||||||
// Okay so just extract metadata
|
error!("failed processing upload!");
|
||||||
Ok(m) => m,
|
|
||||||
// If anything fails, delete the upload and return the error
|
|
||||||
Err(err) => {
|
|
||||||
error!(?err, "failed processing upload!");
|
|
||||||
|
|
||||||
self.remove(&saved_name).await?;
|
self.remove(&saved_name).await?;
|
||||||
return Err(err);
|
save_result?;
|
||||||
}
|
}
|
||||||
};
|
|
||||||
|
|
||||||
// if deletion urls are enabled, create one
|
|
||||||
let deletion_url = self.deletion_hmac.clone().map(|mut hmac| {
|
|
||||||
// calculate hash of file metadata
|
|
||||||
let hash = calculate_hash(len, hash_sample);
|
|
||||||
let mut hash_bytes = BytesMut::new();
|
|
||||||
hash_bytes.put_u128(hash);
|
|
||||||
let hash_b64 = BASE64_URL_SAFE_NO_PAD.encode(&hash_bytes);
|
|
||||||
|
|
||||||
// take hmac
|
|
||||||
update_hmac(&mut hmac, &saved_name, hash);
|
|
||||||
let out = hmac.finalize().into_bytes();
|
|
||||||
let out_b64 = BASE64_URL_SAFE_NO_PAD.encode(out);
|
|
||||||
|
|
||||||
// format deletion url
|
|
||||||
format!(
|
|
||||||
"{}/del?name={saved_name}&hash={hash_b64}&hmac={out_b64}",
|
|
||||||
self.cfg.base_url
|
|
||||||
)
|
|
||||||
});
|
|
||||||
|
|
||||||
// format and send back the url
|
// format and send back the url
|
||||||
let url = format!("{}/p/{saved_name}", self.cfg.base_url);
|
let url = format!("{}/p/{}", self.cfg.base_url, saved_name);
|
||||||
|
|
||||||
// if all goes well, increment the cached upload counter
|
// if all goes well, increment the cached upload counter
|
||||||
self.upl_count.fetch_add(1, Ordering::Relaxed);
|
self.upl_count.fetch_add(1, Ordering::Relaxed);
|
||||||
|
|
||||||
info!("finished processing upload!");
|
info!("finished processing upload!");
|
||||||
|
|
||||||
Ok(ProcessOutcome::Success { url, deletion_url })
|
Ok(ProcessOutcome::Success(url))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -1,11 +1,9 @@
|
||||||
use std::sync::{Arc, atomic::Ordering};
|
use std::sync::{atomic::Ordering, Arc};
|
||||||
|
|
||||||
use axum::extract::State;
|
use axum::extract::State;
|
||||||
|
|
||||||
use crate::engine::Engine;
|
|
||||||
|
|
||||||
/// Show index status page with amount of uploaded files
|
/// Show index status page with amount of uploaded files
|
||||||
pub async fn index(State(engine): State<Arc<Engine>>) -> String {
|
pub async fn index(State(engine): State<Arc<crate::engine::Engine>>) -> String {
|
||||||
let count = engine.upl_count.load(Ordering::Relaxed);
|
let count = engine.upl_count.load(Ordering::Relaxed);
|
||||||
|
|
||||||
let motd = engine.cfg.motd.clone();
|
let motd = engine.cfg.motd.clone();
|
||||||
|
|
@ -14,7 +12,6 @@ pub async fn index(State(engine): State<Arc<Engine>>) -> String {
|
||||||
.replace("%uplcount%", &count.to_string())
|
.replace("%uplcount%", &count.to_string())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[rustfmt::skip]
|
|
||||||
pub async fn robots_txt() -> &'static str {
|
pub async fn robots_txt() -> &'static str {
|
||||||
/// robots.txt that tells web crawlers not to list uploads
|
/// robots.txt that tells web crawlers not to list uploads
|
||||||
const ROBOTS_TXT: &str = concat!(
|
const ROBOTS_TXT: &str = concat!(
|
||||||
|
|
|
||||||
65
src/main.rs
65
src/main.rs
|
|
@ -1,19 +1,17 @@
|
||||||
use std::{path::PathBuf, sync::Arc};
|
use std::{path::PathBuf, sync::Arc};
|
||||||
|
|
||||||
use argh::FromArgs;
|
use argh::FromArgs;
|
||||||
use color_eyre::eyre::{self, Context, bail};
|
|
||||||
use engine::Engine;
|
use engine::Engine;
|
||||||
|
|
||||||
use axum::{
|
use axum::{
|
||||||
Router,
|
|
||||||
routing::{get, post},
|
routing::{get, post},
|
||||||
|
Router,
|
||||||
};
|
};
|
||||||
use tokio::{fs, net::TcpListener, signal};
|
use tokio::{fs, net::TcpListener, signal};
|
||||||
use tracing::{info, warn};
|
use tracing::{info, warn};
|
||||||
|
|
||||||
mod cache;
|
mod cache;
|
||||||
mod config;
|
mod config;
|
||||||
mod delete;
|
|
||||||
mod disk;
|
mod disk;
|
||||||
mod engine;
|
mod engine;
|
||||||
mod index;
|
mod index;
|
||||||
|
|
@ -23,8 +21,6 @@ mod view;
|
||||||
#[cfg(not(target_env = "msvc"))]
|
#[cfg(not(target_env = "msvc"))]
|
||||||
use tikv_jemallocator::Jemalloc;
|
use tikv_jemallocator::Jemalloc;
|
||||||
|
|
||||||
use crate::{cache::Cache, disk::Disk};
|
|
||||||
|
|
||||||
#[cfg(not(target_env = "msvc"))]
|
#[cfg(not(target_env = "msvc"))]
|
||||||
#[global_allocator]
|
#[global_allocator]
|
||||||
static GLOBAL: Jemalloc = Jemalloc;
|
static GLOBAL: Jemalloc = Jemalloc;
|
||||||
|
|
@ -37,34 +33,20 @@ struct Args {
|
||||||
config: PathBuf,
|
config: PathBuf,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Instantiates router.
|
|
||||||
fn router(engine: Engine) -> Router {
|
|
||||||
Router::new()
|
|
||||||
.route("/new", post(new::new))
|
|
||||||
.route("/p/{saved_name}", get(view::view))
|
|
||||||
.route("/del", get(delete::delete))
|
|
||||||
.route("/", get(index::index))
|
|
||||||
.route("/robots.txt", get(index::robots_txt))
|
|
||||||
.with_state(Arc::new(engine))
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> eyre::Result<()> {
|
async fn main() {
|
||||||
// Install color-eyre
|
|
||||||
color_eyre::install()?;
|
|
||||||
|
|
||||||
// Read & parse args
|
// Read & parse args
|
||||||
let args: Args = argh::from_env();
|
let args: Args = argh::from_env();
|
||||||
|
|
||||||
// Read & parse config
|
// Read & parse config
|
||||||
let cfg: config::Config = {
|
let cfg: config::Config = {
|
||||||
let config_str = fs::read_to_string(args.config).await.wrap_err(
|
let config_str = fs::read_to_string(args.config).await.expect(
|
||||||
"failed to read config file! make sure it exists and you have read permissions",
|
"failed to read config file! make sure it exists and you have read permissions",
|
||||||
)?;
|
);
|
||||||
|
|
||||||
toml::from_str(&config_str).wrap_err(
|
toml::from_str(&config_str).unwrap_or_else(|e| {
|
||||||
"invalid config! ensure proper fields and structure. reference config is in readme",
|
panic!("invalid config! ensure proper fields and structure. reference config is in readme.\n{e}");
|
||||||
)?
|
})
|
||||||
};
|
};
|
||||||
|
|
||||||
// Set up tracing
|
// Set up tracing
|
||||||
|
|
@ -74,42 +56,35 @@ async fn main() -> eyre::Result<()> {
|
||||||
|
|
||||||
// Check config
|
// Check config
|
||||||
{
|
{
|
||||||
let save_path = cfg.disk.save_path.clone();
|
let save_path = cfg.engine.disk.save_path.clone();
|
||||||
if !save_path.exists() || !save_path.is_dir() {
|
if !save_path.exists() || !save_path.is_dir() {
|
||||||
bail!("the save path does not exist or is not a directory! this is invalid");
|
panic!("the save path does not exist or is not a directory! this is invalid");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if cfg.engine.upload_key.is_empty() {
|
if cfg.engine.upload_key.is_empty() {
|
||||||
warn!("engine upload_key is empty! no key will be required for uploading new files");
|
warn!("engine upload_key is empty! no key will be required for uploading new files");
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create backends
|
|
||||||
let cache = Arc::new(Cache::with_config(cfg.cache)?);
|
|
||||||
let disk = Disk::with_config(cfg.disk);
|
|
||||||
|
|
||||||
// Start cache scanner
|
|
||||||
tokio::spawn({
|
|
||||||
let cache = cache.clone();
|
|
||||||
async move { cache.scanner().await }
|
|
||||||
});
|
|
||||||
|
|
||||||
// Create engine
|
// Create engine
|
||||||
let engine = Engine::new(cfg.engine, cache, disk)?;
|
let engine = Engine::with_config(cfg.engine);
|
||||||
|
|
||||||
// Build main router
|
// Build main router
|
||||||
let app = router(engine);
|
let app = Router::new()
|
||||||
|
.route("/new", post(new::new))
|
||||||
|
.route("/p/{saved_name}", get(view::view))
|
||||||
|
.route("/", get(index::index))
|
||||||
|
.route("/robots.txt", get(index::robots_txt))
|
||||||
|
.with_state(Arc::new(engine));
|
||||||
|
|
||||||
// Start web server
|
// Start web server
|
||||||
info!("starting server.");
|
info!("starting server.");
|
||||||
let listener = TcpListener::bind(&cfg.http.listen_on)
|
let listener = TcpListener::bind(&cfg.http.listen_on)
|
||||||
.await
|
.await
|
||||||
.wrap_err("failed to bind to given `http.listen_on` address! make sure it's valid, and the port isn't already bound")?;
|
.expect("failed to bind to given `http.listen_on` address! make sure it's valid, and the port isn't already bound");
|
||||||
axum::serve(listener, app)
|
axum::serve(listener, app)
|
||||||
.with_graceful_shutdown(shutdown_signal())
|
.with_graceful_shutdown(shutdown_signal())
|
||||||
.await
|
.await
|
||||||
.wrap_err("failed to start server")?;
|
.expect("failed to start server");
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn shutdown_signal() {
|
async fn shutdown_signal() {
|
||||||
|
|
@ -131,8 +106,8 @@ async fn shutdown_signal() {
|
||||||
let terminate = std::future::pending::<()>();
|
let terminate = std::future::pending::<()>();
|
||||||
|
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
() = ctrl_c => {},
|
_ = ctrl_c => {},
|
||||||
() = terminate => {},
|
_ = terminate => {},
|
||||||
}
|
}
|
||||||
|
|
||||||
info!("shutting down!");
|
info!("shutting down!");
|
||||||
|
|
|
||||||
34
src/new.rs
34
src/new.rs
|
|
@ -8,16 +8,14 @@ use std::{
|
||||||
use axum::{
|
use axum::{
|
||||||
body::Body,
|
body::Body,
|
||||||
extract::{Query, State},
|
extract::{Query, State},
|
||||||
response::{IntoResponse as _, Response},
|
|
||||||
};
|
};
|
||||||
use axum_extra::TypedHeader;
|
use axum_extra::TypedHeader;
|
||||||
use headers::ContentLength;
|
use headers::ContentLength;
|
||||||
use http::{HeaderValue, StatusCode};
|
use http::StatusCode;
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use serde_with::{DurationSeconds, serde_as};
|
use serde_with::{serde_as, DurationSeconds};
|
||||||
use tracing::error;
|
|
||||||
|
|
||||||
use crate::engine::{Engine, ProcessOutcome};
|
use crate::engine::ProcessOutcome;
|
||||||
|
|
||||||
fn default_keep_exif() -> bool {
|
fn default_keep_exif() -> bool {
|
||||||
false
|
false
|
||||||
|
|
@ -40,11 +38,11 @@ pub struct NewRequest {
|
||||||
/// The request handler for the /new path.
|
/// The request handler for the /new path.
|
||||||
/// This handles all new uploads.
|
/// This handles all new uploads.
|
||||||
pub async fn new(
|
pub async fn new(
|
||||||
State(engine): State<Arc<Engine>>,
|
State(engine): State<Arc<crate::engine::Engine>>,
|
||||||
Query(req): Query<NewRequest>,
|
Query(req): Query<NewRequest>,
|
||||||
TypedHeader(ContentLength(content_length)): TypedHeader<ContentLength>,
|
TypedHeader(ContentLength(content_length)): TypedHeader<ContentLength>,
|
||||||
body: Body,
|
body: Body,
|
||||||
) -> Result<Response, StatusCode> {
|
) -> Result<String, StatusCode> {
|
||||||
// check upload key, if i need to
|
// check upload key, if i need to
|
||||||
if !engine.cfg.upload_key.is_empty() && req.key.unwrap_or_default() != engine.cfg.upload_key {
|
if !engine.cfg.upload_key.is_empty() && req.key.unwrap_or_default() != engine.cfg.upload_key {
|
||||||
return Err(StatusCode::FORBIDDEN);
|
return Err(StatusCode::FORBIDDEN);
|
||||||
|
|
@ -92,7 +90,7 @@ pub async fn new(
|
||||||
|
|
||||||
// pass it off to the engine to be processed
|
// pass it off to the engine to be processed
|
||||||
// --
|
// --
|
||||||
// also, error responses here don't get presented properly in ShareX most of the time
|
// also, error responses here don't get represented properly in ShareX most of the time
|
||||||
// they don't expect the connection to close before they're done uploading, i think
|
// they don't expect the connection to close before they're done uploading, i think
|
||||||
// so it will just present the user with a "connection closed" error
|
// so it will just present the user with a "connection closed" error
|
||||||
match engine
|
match engine
|
||||||
|
|
@ -101,20 +99,7 @@ pub async fn new(
|
||||||
{
|
{
|
||||||
Ok(outcome) => match outcome {
|
Ok(outcome) => match outcome {
|
||||||
// 200 OK
|
// 200 OK
|
||||||
ProcessOutcome::Success { url, deletion_url } => {
|
ProcessOutcome::Success(url) => Ok(url),
|
||||||
let mut res = url.into_response();
|
|
||||||
|
|
||||||
// insert deletion url header if needed
|
|
||||||
if let Some(deletion_url) = deletion_url {
|
|
||||||
let deletion_url = HeaderValue::from_str(&deletion_url)
|
|
||||||
.expect("deletion url contains invalid chars");
|
|
||||||
|
|
||||||
let headers = res.headers_mut();
|
|
||||||
headers.insert("Breeze-Deletion-Url", deletion_url);
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(res)
|
|
||||||
}
|
|
||||||
|
|
||||||
// 413 Payload Too Large
|
// 413 Payload Too Large
|
||||||
ProcessOutcome::UploadTooLarge | ProcessOutcome::TemporaryUploadTooLarge => {
|
ProcessOutcome::UploadTooLarge | ProcessOutcome::TemporaryUploadTooLarge => {
|
||||||
|
|
@ -126,9 +111,6 @@ pub async fn new(
|
||||||
},
|
},
|
||||||
|
|
||||||
// 500 Internal Server Error
|
// 500 Internal Server Error
|
||||||
Err(err) => {
|
Err(_) => Err(StatusCode::INTERNAL_SERVER_ERROR),
|
||||||
error!("failed to process upload!! {err:#}");
|
|
||||||
Err(StatusCode::INTERNAL_SERVER_ERROR)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
25
src/view.rs
25
src/view.rs
|
|
@ -10,9 +10,8 @@ use axum_extra::TypedHeader;
|
||||||
use headers::Range;
|
use headers::Range;
|
||||||
use http::{HeaderValue, StatusCode};
|
use http::{HeaderValue, StatusCode};
|
||||||
use tokio_util::io::ReaderStream;
|
use tokio_util::io::ReaderStream;
|
||||||
use tracing::error;
|
|
||||||
|
|
||||||
use crate::engine::{Engine, GetOutcome, UploadData, UploadResponse};
|
use crate::engine::{GetOutcome, UploadData, UploadResponse};
|
||||||
|
|
||||||
/// Responses for a failed view operation
|
/// Responses for a failed view operation
|
||||||
pub enum ViewError {
|
pub enum ViewError {
|
||||||
|
|
@ -22,7 +21,7 @@ pub enum ViewError {
|
||||||
/// Will send status code 500 with a plaintext "internal server error" message.
|
/// Will send status code 500 with a plaintext "internal server error" message.
|
||||||
InternalServerError,
|
InternalServerError,
|
||||||
|
|
||||||
/// Sends status code 416 with a plaintext "range not satisfiable" message.
|
/// Sends status code 206 with a plaintext "range not satisfiable" message.
|
||||||
RangeNotSatisfiable,
|
RangeNotSatisfiable,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -45,7 +44,7 @@ impl IntoResponse for ViewError {
|
||||||
impl IntoResponse for UploadResponse {
|
impl IntoResponse for UploadResponse {
|
||||||
fn into_response(self) -> Response {
|
fn into_response(self) -> Response {
|
||||||
let (start, end) = self.range;
|
let (start, end) = self.range;
|
||||||
let range_len = end - start;
|
let range_len = (end - start) + 1;
|
||||||
|
|
||||||
let mut res = match self.data {
|
let mut res = match self.data {
|
||||||
UploadData::Cache(data) => data.into_response(),
|
UploadData::Cache(data) => data.into_response(),
|
||||||
|
|
@ -88,27 +87,23 @@ impl IntoResponse for UploadResponse {
|
||||||
/// GET request handler for /p/* path.
|
/// GET request handler for /p/* path.
|
||||||
/// All file views are handled here.
|
/// All file views are handled here.
|
||||||
pub async fn view(
|
pub async fn view(
|
||||||
State(engine): State<Arc<Engine>>,
|
State(engine): State<Arc<crate::engine::Engine>>,
|
||||||
Path(original_path): Path<PathBuf>,
|
Path(original_path): Path<PathBuf>,
|
||||||
range: Option<TypedHeader<Range>>,
|
range: Option<TypedHeader<Range>>,
|
||||||
) -> Result<UploadResponse, ViewError> {
|
) -> Result<UploadResponse, ViewError> {
|
||||||
// try to extract the file name (if it's the only component)
|
let saved_name = if let Some(Some(n)) = original_path.file_name().map(OsStr::to_str) {
|
||||||
// this makes paths like `asdf%2fabcdef.png` invalid
|
n
|
||||||
let saved_name = match original_path.file_name().map(OsStr::to_str) {
|
} else {
|
||||||
Some(Some(n)) if original_path.components().count() == 1 => n,
|
return Err(ViewError::NotFound);
|
||||||
_ => return Err(ViewError::NotFound),
|
|
||||||
};
|
};
|
||||||
|
|
||||||
let range = range.map(|TypedHeader(range)| range);
|
let range = range.map(|th| th.0);
|
||||||
|
|
||||||
// get result from the engine
|
// get result from the engine
|
||||||
match engine.get(saved_name, range).await {
|
match engine.get(saved_name, range).await {
|
||||||
Ok(GetOutcome::Success(res)) => Ok(res),
|
Ok(GetOutcome::Success(res)) => Ok(res),
|
||||||
Ok(GetOutcome::NotFound) => Err(ViewError::NotFound),
|
Ok(GetOutcome::NotFound) => Err(ViewError::NotFound),
|
||||||
Ok(GetOutcome::RangeNotSatisfiable) => Err(ViewError::RangeNotSatisfiable),
|
Ok(GetOutcome::RangeNotSatisfiable) => Err(ViewError::RangeNotSatisfiable),
|
||||||
Err(err) => {
|
Err(_) => Err(ViewError::InternalServerError),
|
||||||
error!("failed to get upload!! {err:#}");
|
|
||||||
Err(ViewError::InternalServerError)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue