feature (tus): chunked upload via TUS

instead of inventing a new protocol for chunked upload that can be
resumed, we might as well use something that already exists like TUS.

As such we removed our custom implementation to favor that standard
This commit is contained in:
MickaelK 2024-11-29 15:02:59 +11:00
parent c556a404f7
commit ac9d1a4980
3 changed files with 154 additions and 77 deletions

View file

@ -1,4 +1,5 @@
import { createElement, createFragment, createRender } from "../../lib/skeleton/index.js";
import { toHref } from "../../lib/skeleton/router.js";
import rxjs, { effect, onClick } from "../../lib/rx.js";
import { forwardURLParams } from "../../lib/path.js";
import { animate, slideYOut } from "../../lib/animate.js";
@ -291,7 +292,6 @@ function workerImplFile({ error, progress, speed }) {
constructor() {
super();
this.xhr = null;
this.prevProgress = [];
}
/**
@ -303,58 +303,75 @@ function workerImplFile({ error, progress, speed }) {
/**
* @override
* TODO: retry logic on failed upload would be nice
*/
async run({ file, path, virtual }) {
const _file = await file();
const chunkSize = (window.CONFIG["upload_chunk_size"] || 0) *1024*1024;
const numberOfChunks = Math.ceil(_file.size / chunkSize);
// Case1: basic upload
if (chunkSize === 0 || numberOfChunks === 0 || numberOfChunks === 1) {
await this._execute({ file: _file, path, virtual, chunk: null, progress });
try {
await this._http(toHref(`/api/files/cat?path=${path}`), {
method: "POST",
body: _file,
progress,
});
virtual.afterSuccess();
} catch (err) {
error(err);
virtual.afterError();
}
return;
}
for (let i=0; i<numberOfChunks; i++) {
const offset = chunkSize * i;
const chunk = numberOfChunks - i - 1;
await this._execute({
file: _file.slice(offset, offset+chunkSize),
virtual: {
before: () => {
if (i === 0) virtual.before();
},
afterSuccess: () => {
if (i === numberOfChunks - 1) virtual.afterSuccess();
},
afterError: () => virtual.afterError(),
},
progress: (p) => {
const chunksAlreadyDownloaded = i * chunkSize;
const currentChunkDownloaded = p / 100 * (
i !== numberOfChunks - 1 ? chunkSize : (_file.size % chunkSize) || chunkSize
);
progress(Math.floor(100 * (chunksAlreadyDownloaded + currentChunkDownloaded) / _file.size));
},
chunk,
path,
// Case2: chunked upload => TUS: https://www.ietf.org/archive/id/draft-tus-httpbis-resumable-uploads-protocol-00.html
try {
let resp = await this._http(toHref(`/api/files/cat?path=${path}&proto=tus`), {
method: "POST",
headers: { "Upload-Length": _file.size },
progress: (n) => progress(n),
});
this.prevProgress = [];
const url = resp.headers.location;
if (!url.startsWith(toHref("/api/files/cat?"))) {
throw new Error("Internal Error");
return
}
for (let i=0; i<numberOfChunks; i++) {
const offset = chunkSize * i;
const chunk = numberOfChunks - i - 1;
resp = await this._http(url, {
method: "PATCH",
headers: { "Upload-Offset": offset },
body: _file.slice(offset, offset + chunkSize),
progress: (p) => {
const chunksAlreadyDownloaded = i * chunkSize;
const currentChunkDownloaded = p / 100 * (
i !== numberOfChunks - 1 ? chunkSize : (_file.size % chunkSize) || chunkSize
);
progress(Math.floor(100 * (chunksAlreadyDownloaded + currentChunkDownloaded) / _file.size));
},
});
}
virtual.afterSuccess();
} catch (err) {
error(err);
virtual.afterError();
if (err !== ABORT_ERROR) throw new Error(err);
}
}
_execute({ file, path, virtual, chunk, progress }) {
_http(url, { method, headers, body, progress }) {
const xhr = new XMLHttpRequest();
this.prevProgress = [];
this.xhr = xhr;
return new Promise((resolve, reject) => {
xhr.open(
"POST",
forwardURLParams(
"api/files/cat?path=" + encodeURIComponent(path) +
(chunk === null ? "" : `&chunk=${chunk}`),
["share"],
),
);
xhr.withCredentials = true;
xhr.open(method, forwardURLParams(url, ["share"]));
xhr.setRequestHeader("X-Requested-With", "XmlHttpRequest");
xhr.withCredentials = true;
for (let key in headers) {
xhr.setRequestHeader(key, headers[key]);
}
xhr.upload.onprogress = (e) => {
if (!e.lengthComputable) return;
const percent = Math.floor(100 * e.loaded / e.total);
@ -379,28 +396,30 @@ function workerImplFile({ error, progress, speed }) {
this.prevProgress.shift();
}
};
xhr.upload.onabort = () => {
reject(ABORT_ERROR);
error(ABORT_ERROR);
virtual.afterError();
};
xhr.upload.onabort = () => reject(ABORT_ERROR);
xhr.onload = () => {
progress(100);
if (xhr.status !== 200) {
virtual.afterError();
if ([200, 201, 204].indexOf(xhr.status) === -1) {
reject(new Error(xhr.statusText));
return;
}
virtual.afterSuccess();
resolve(null);
progress(100);
resolve({
status: xhr.status,
headers: xhr.getAllResponseHeaders()
.split("\r\n")
.reduce((acc, el) => {
const tmp = el.split(": "); acc[tmp[0]] = tmp[1]
return acc;
}, {})
});
};
xhr.onerror = function(e) {
xhr.onerror = (e) => {
reject(new AjaxError("failed", e, "FAILED"));
virtual.afterError();
};
xhr.send(file);
xhr.send(body);
});
}
}();
}

View file

@ -441,8 +441,8 @@ func FileSave(ctx *App, res http.ResponseWriter, req *http.Request) {
// There is 2 ways to save something:
// - case1: regular upload, we just insert the file in the pipe
chunk := req.URL.Query().Get("chunk")
if chunk == "" {
proto := req.URL.Query().Get("proto")
if proto == "" && req.Method == http.MethodPost {
err = ctx.Backend.Save(path, req.Body)
req.Body.Close()
if err != nil {
@ -453,39 +453,83 @@ func FileSave(ctx *App, res http.ResponseWriter, req *http.Request) {
SendSuccessResult(res, nil)
return
}
// - case2: chunked upload. In this scenario, the frontend send the file in chunks, the
// only assumption being that upload is complete when the "chunk" param is "0"
n, err := strconv.Atoi(chunk)
if err != nil {
SendErrorResult(res, NewError(err.Error(), 403))
}
ctx.Session["path"] = path
res.Header().Set("Connection", "Close")
var uploader *chunkedUpload
if c := chunkedUploadCache.Get(ctx.Session); c == nil {
uploader = createChunkedUploader(ctx.Backend.Save, path)
// - case2: chunked upload which implements the TUS protocol:
// https://www.ietf.org/archive/id/draft-tus-httpbis-resumable-uploads-protocol-00.html
h := res.Header()
ctx.Session["path"] = path
if proto == "tus" && req.Method == http.MethodPost {
if c := chunkedUploadCache.Get(ctx.Session); c != nil {
chunkedUploadCache.Del(ctx.Session)
}
size, err := strconv.ParseUint(req.Header.Get("Upload-Length"), 10, 0)
if err != nil {
SendErrorResult(res, ErrNotValid)
return
}
uploader := createChunkedUploader(ctx.Backend.Save, path, size)
chunkedUploadCache.Set(ctx.Session, uploader)
} else {
uploader = c.(*chunkedUpload)
}
if _, err := uploader.Next(req.Body); err != nil {
SendErrorResult(res, NewError(err.Error(), 403))
h.Set("Content-Length", "0")
h.Set("Location", req.URL.String())
res.WriteHeader(http.StatusCreated)
return
}
if n == 0 {
if err = uploader.Close(); err != nil {
if proto == "tus" && req.Method == http.MethodHead {
c := chunkedUploadCache.Get(ctx.Session)
if c == nil {
SendErrorResult(res, ErrNotFound)
return
}
offset, length := c.(*chunkedUpload).Meta()
h.Set("Upload-Offset", fmt.Sprintf("%d", offset))
h.Set("Upload-Length", fmt.Sprintf("%d", length))
res.WriteHeader(http.StatusOK)
return
}
if proto == "tus" && req.Method == http.MethodPatch {
requestOffset, err := strconv.ParseUint(req.Header.Get("Upload-Offset"), 10, 0)
if err != nil {
SendErrorResult(res, ErrNotValid)
return
}
c := chunkedUploadCache.Get(ctx.Session)
if c == nil {
SendErrorResult(res, ErrNotFound)
return
}
uploader := c.(*chunkedUpload)
initialOffset, totalSize := uploader.Meta()
if initialOffset != requestOffset {
SendErrorResult(res, ErrNotValid)
return
}
if err := uploader.Next(req.Body); err != nil {
SendErrorResult(res, NewError(err.Error(), 403))
return
}
chunkedUploadCache.Del(ctx.Session)
SendSuccessResult(res, nil)
newOffset, _ := uploader.Meta()
if newOffset > totalSize {
uploader.Close()
chunkedUploadCache.Del(ctx.Session)
Log.Warning("ctrl::files::tus error=assert_offset size=%d old_offset=%d new_offset=%d", totalSize, initialOffset, newOffset)
SendErrorResult(res, NewError("aborted - offset larger than total size", 403))
return
} else if newOffset == totalSize {
if err := uploader.Close(); err != nil {
SendErrorResult(res, ErrNotValid)
return
}
chunkedUploadCache.Del(ctx.Session)
}
h.Set("Connection", "Close")
h.Set("Upload-Offset", fmt.Sprintf("%d", newOffset))
res.WriteHeader(http.StatusNoContent)
return
}
SendSuccessResult(res, nil)
SendErrorResult(res, ErrNotImplemented)
}
func createChunkedUploader(save func(path string, file io.Reader) error, path string) *chunkedUpload {
func createChunkedUploader(save func(path string, file io.Reader) error, path string, size uint64) *chunkedUpload {
r, w := io.Pipe()
done := make(chan error, 1)
go func() {
@ -495,6 +539,8 @@ func createChunkedUploader(save func(path string, file io.Reader) error, path st
fn: save,
stream: w,
done: done,
offset: 0,
size: size,
}
}
@ -516,14 +562,20 @@ func initChunkedUploader() {
type chunkedUpload struct {
fn func(path string, file io.Reader) error
stream *io.PipeWriter
offset uint64
size uint64
done chan error
once sync.Once
mu sync.Mutex
}
func (this *chunkedUpload) Next(body io.ReadCloser) (int64, error) {
func (this *chunkedUpload) Next(body io.ReadCloser) error {
n, err := io.Copy(this.stream, body)
body.Close()
return n, err
this.mu.Lock()
this.offset += uint64(n)
this.mu.Unlock()
return err
}
func (this *chunkedUpload) Close() error {
@ -535,6 +587,12 @@ func (this *chunkedUpload) Close() error {
return err
}
func (this *chunkedUpload) Meta() (uint64, uint64) {
this.mu.Lock()
defer this.mu.Unlock()
return this.offset, this.size
}
func FileMv(ctx *App, res http.ResponseWriter, req *http.Request) {
if model.CanEdit(ctx) == false {
Log.Debug("mv::permission 'permission denied'")

View file

@ -56,7 +56,7 @@ func Build(a App) *mux.Router {
files.HandleFunc("/unzip", NewMiddlewareChain(FileExtract, middlewares, a)).Methods("POST")
middlewares = []Middleware{ApiHeaders, SecureHeaders, SecureOrigin, WithPublicAPI, SessionStart, LoggedInOnly, PluginInjector}
files.HandleFunc("/cat", NewMiddlewareChain(FileAccess, middlewares, a)).Methods("OPTIONS")
files.HandleFunc("/cat", NewMiddlewareChain(FileSave, middlewares, a)).Methods("POST")
files.HandleFunc("/cat", NewMiddlewareChain(FileSave, middlewares, a)).Methods("POST", "PATCH")
files.HandleFunc("/ls", NewMiddlewareChain(FileLs, middlewares, a)).Methods("GET")
files.HandleFunc("/mv", NewMiddlewareChain(FileMv, middlewares, a)).Methods("POST")
files.HandleFunc("/rm", NewMiddlewareChain(FileRm, middlewares, a)).Methods("POST")