From ac9d1a498010cfdd2af8924074e8d41cb7a2cc19 Mon Sep 17 00:00:00 2001 From: MickaelK Date: Fri, 29 Nov 2024 15:02:59 +1100 Subject: [PATCH] feature (tus): chunked upload via TUS instead of inventing a new protocol for chunked upload that can be resumed, we might as well use something that already exists like TUS. As such we removed our custom implementation to favor that standard --- public/assets/pages/filespage/ctrl_upload.js | 119 +++++++++++-------- server/ctrl/files.go | 110 +++++++++++++---- server/routes.go | 2 +- 3 files changed, 154 insertions(+), 77 deletions(-) diff --git a/public/assets/pages/filespage/ctrl_upload.js b/public/assets/pages/filespage/ctrl_upload.js index 5541f133..14a9ac67 100644 --- a/public/assets/pages/filespage/ctrl_upload.js +++ b/public/assets/pages/filespage/ctrl_upload.js @@ -1,4 +1,5 @@ import { createElement, createFragment, createRender } from "../../lib/skeleton/index.js"; +import { toHref } from "../../lib/skeleton/router.js"; import rxjs, { effect, onClick } from "../../lib/rx.js"; import { forwardURLParams } from "../../lib/path.js"; import { animate, slideYOut } from "../../lib/animate.js"; @@ -291,7 +292,6 @@ function workerImplFile({ error, progress, speed }) { constructor() { super(); this.xhr = null; - this.prevProgress = []; } /** @@ -303,58 +303,75 @@ function workerImplFile({ error, progress, speed }) { /** * @override - * TODO: retry logic on failed upload would be nice */ async run({ file, path, virtual }) { const _file = await file(); const chunkSize = (window.CONFIG["upload_chunk_size"] || 0) *1024*1024; const numberOfChunks = Math.ceil(_file.size / chunkSize); + + // Case1: basic upload if (chunkSize === 0 || numberOfChunks === 0 || numberOfChunks === 1) { - await this._execute({ file: _file, path, virtual, chunk: null, progress }); + try { + await this._http(toHref(`/api/files/cat?path=${path}`), { + method: "POST", + body: _file, + progress, + }); + virtual.afterSuccess(); + } catch (err) { + error(err); + virtual.afterError(); + } return; } - for (let i=0; i { - if (i === 0) virtual.before(); - }, - afterSuccess: () => { - if (i === numberOfChunks - 1) virtual.afterSuccess(); - }, - afterError: () => virtual.afterError(), - }, - progress: (p) => { - const chunksAlreadyDownloaded = i * chunkSize; - const currentChunkDownloaded = p / 100 * ( - i !== numberOfChunks - 1 ? chunkSize : (_file.size % chunkSize) || chunkSize - ); - progress(Math.floor(100 * (chunksAlreadyDownloaded + currentChunkDownloaded) / _file.size)); - }, - chunk, - path, + + // Case2: chunked upload => TUS: https://www.ietf.org/archive/id/draft-tus-httpbis-resumable-uploads-protocol-00.html + try { + let resp = await this._http(toHref(`/api/files/cat?path=${path}&proto=tus`), { + method: "POST", + headers: { "Upload-Length": _file.size }, + progress: (n) => progress(n), }); - this.prevProgress = []; + const url = resp.headers.location; + if (!url.startsWith(toHref("/api/files/cat?"))) { + throw new Error("Internal Error"); + return + } + for (let i=0; i { + const chunksAlreadyDownloaded = i * chunkSize; + const currentChunkDownloaded = p / 100 * ( + i !== numberOfChunks - 1 ? chunkSize : (_file.size % chunkSize) || chunkSize + ); + progress(Math.floor(100 * (chunksAlreadyDownloaded + currentChunkDownloaded) / _file.size)); + }, + }); + } + virtual.afterSuccess(); + } catch (err) { + error(err); + virtual.afterError(); + if (err !== ABORT_ERROR) throw new Error(err); } } - _execute({ file, path, virtual, chunk, progress }) { + _http(url, { method, headers, body, progress }) { const xhr = new XMLHttpRequest(); + this.prevProgress = []; this.xhr = xhr; return new Promise((resolve, reject) => { - xhr.open( - "POST", - forwardURLParams( - "api/files/cat?path=" + encodeURIComponent(path) + - (chunk === null ? "" : `&chunk=${chunk}`), - ["share"], - ), - ); - xhr.withCredentials = true; + xhr.open(method, forwardURLParams(url, ["share"])); xhr.setRequestHeader("X-Requested-With", "XmlHttpRequest"); + xhr.withCredentials = true; + for (let key in headers) { + xhr.setRequestHeader(key, headers[key]); + } xhr.upload.onprogress = (e) => { if (!e.lengthComputable) return; const percent = Math.floor(100 * e.loaded / e.total); @@ -379,28 +396,30 @@ function workerImplFile({ error, progress, speed }) { this.prevProgress.shift(); } }; - xhr.upload.onabort = () => { - reject(ABORT_ERROR); - error(ABORT_ERROR); - virtual.afterError(); - }; + xhr.upload.onabort = () => reject(ABORT_ERROR); xhr.onload = () => { - progress(100); - if (xhr.status !== 200) { - virtual.afterError(); + if ([200, 201, 204].indexOf(xhr.status) === -1) { reject(new Error(xhr.statusText)); return; } - virtual.afterSuccess(); - resolve(null); + progress(100); + resolve({ + status: xhr.status, + headers: xhr.getAllResponseHeaders() + .split("\r\n") + .reduce((acc, el) => { + const tmp = el.split(": "); acc[tmp[0]] = tmp[1] + return acc; + }, {}) + }); }; - xhr.onerror = function(e) { + xhr.onerror = (e) => { reject(new AjaxError("failed", e, "FAILED")); - virtual.afterError(); }; - xhr.send(file); + xhr.send(body); }); } + }(); } diff --git a/server/ctrl/files.go b/server/ctrl/files.go index 6dd7c499..6b6e00f8 100644 --- a/server/ctrl/files.go +++ b/server/ctrl/files.go @@ -441,8 +441,8 @@ func FileSave(ctx *App, res http.ResponseWriter, req *http.Request) { // There is 2 ways to save something: // - case1: regular upload, we just insert the file in the pipe - chunk := req.URL.Query().Get("chunk") - if chunk == "" { + proto := req.URL.Query().Get("proto") + if proto == "" && req.Method == http.MethodPost { err = ctx.Backend.Save(path, req.Body) req.Body.Close() if err != nil { @@ -453,39 +453,83 @@ func FileSave(ctx *App, res http.ResponseWriter, req *http.Request) { SendSuccessResult(res, nil) return } - // - case2: chunked upload. In this scenario, the frontend send the file in chunks, the - // only assumption being that upload is complete when the "chunk" param is "0" - n, err := strconv.Atoi(chunk) - if err != nil { - SendErrorResult(res, NewError(err.Error(), 403)) - } - ctx.Session["path"] = path - res.Header().Set("Connection", "Close") - var uploader *chunkedUpload - if c := chunkedUploadCache.Get(ctx.Session); c == nil { - uploader = createChunkedUploader(ctx.Backend.Save, path) + // - case2: chunked upload which implements the TUS protocol: + // https://www.ietf.org/archive/id/draft-tus-httpbis-resumable-uploads-protocol-00.html + h := res.Header() + ctx.Session["path"] = path + if proto == "tus" && req.Method == http.MethodPost { + if c := chunkedUploadCache.Get(ctx.Session); c != nil { + chunkedUploadCache.Del(ctx.Session) + } + size, err := strconv.ParseUint(req.Header.Get("Upload-Length"), 10, 0) + if err != nil { + SendErrorResult(res, ErrNotValid) + return + } + uploader := createChunkedUploader(ctx.Backend.Save, path, size) chunkedUploadCache.Set(ctx.Session, uploader) - } else { - uploader = c.(*chunkedUpload) - } - if _, err := uploader.Next(req.Body); err != nil { - SendErrorResult(res, NewError(err.Error(), 403)) + h.Set("Content-Length", "0") + h.Set("Location", req.URL.String()) + res.WriteHeader(http.StatusCreated) return } - if n == 0 { - if err = uploader.Close(); err != nil { + if proto == "tus" && req.Method == http.MethodHead { + c := chunkedUploadCache.Get(ctx.Session) + if c == nil { + SendErrorResult(res, ErrNotFound) + return + } + offset, length := c.(*chunkedUpload).Meta() + h.Set("Upload-Offset", fmt.Sprintf("%d", offset)) + h.Set("Upload-Length", fmt.Sprintf("%d", length)) + res.WriteHeader(http.StatusOK) + return + } + if proto == "tus" && req.Method == http.MethodPatch { + requestOffset, err := strconv.ParseUint(req.Header.Get("Upload-Offset"), 10, 0) + if err != nil { + SendErrorResult(res, ErrNotValid) + return + } + c := chunkedUploadCache.Get(ctx.Session) + if c == nil { + SendErrorResult(res, ErrNotFound) + return + } + uploader := c.(*chunkedUpload) + initialOffset, totalSize := uploader.Meta() + if initialOffset != requestOffset { + SendErrorResult(res, ErrNotValid) + return + } + if err := uploader.Next(req.Body); err != nil { SendErrorResult(res, NewError(err.Error(), 403)) return } - chunkedUploadCache.Del(ctx.Session) - SendSuccessResult(res, nil) + newOffset, _ := uploader.Meta() + if newOffset > totalSize { + uploader.Close() + chunkedUploadCache.Del(ctx.Session) + Log.Warning("ctrl::files::tus error=assert_offset size=%d old_offset=%d new_offset=%d", totalSize, initialOffset, newOffset) + SendErrorResult(res, NewError("aborted - offset larger than total size", 403)) + return + } else if newOffset == totalSize { + if err := uploader.Close(); err != nil { + SendErrorResult(res, ErrNotValid) + return + } + chunkedUploadCache.Del(ctx.Session) + } + h.Set("Connection", "Close") + h.Set("Upload-Offset", fmt.Sprintf("%d", newOffset)) + res.WriteHeader(http.StatusNoContent) return } - SendSuccessResult(res, nil) + SendErrorResult(res, ErrNotImplemented) } -func createChunkedUploader(save func(path string, file io.Reader) error, path string) *chunkedUpload { +func createChunkedUploader(save func(path string, file io.Reader) error, path string, size uint64) *chunkedUpload { r, w := io.Pipe() done := make(chan error, 1) go func() { @@ -495,6 +539,8 @@ func createChunkedUploader(save func(path string, file io.Reader) error, path st fn: save, stream: w, done: done, + offset: 0, + size: size, } } @@ -516,14 +562,20 @@ func initChunkedUploader() { type chunkedUpload struct { fn func(path string, file io.Reader) error stream *io.PipeWriter + offset uint64 + size uint64 done chan error once sync.Once + mu sync.Mutex } -func (this *chunkedUpload) Next(body io.ReadCloser) (int64, error) { +func (this *chunkedUpload) Next(body io.ReadCloser) error { n, err := io.Copy(this.stream, body) body.Close() - return n, err + this.mu.Lock() + this.offset += uint64(n) + this.mu.Unlock() + return err } func (this *chunkedUpload) Close() error { @@ -535,6 +587,12 @@ func (this *chunkedUpload) Close() error { return err } +func (this *chunkedUpload) Meta() (uint64, uint64) { + this.mu.Lock() + defer this.mu.Unlock() + return this.offset, this.size +} + func FileMv(ctx *App, res http.ResponseWriter, req *http.Request) { if model.CanEdit(ctx) == false { Log.Debug("mv::permission 'permission denied'") diff --git a/server/routes.go b/server/routes.go index dc19c0ca..1df97898 100644 --- a/server/routes.go +++ b/server/routes.go @@ -56,7 +56,7 @@ func Build(a App) *mux.Router { files.HandleFunc("/unzip", NewMiddlewareChain(FileExtract, middlewares, a)).Methods("POST") middlewares = []Middleware{ApiHeaders, SecureHeaders, SecureOrigin, WithPublicAPI, SessionStart, LoggedInOnly, PluginInjector} files.HandleFunc("/cat", NewMiddlewareChain(FileAccess, middlewares, a)).Methods("OPTIONS") - files.HandleFunc("/cat", NewMiddlewareChain(FileSave, middlewares, a)).Methods("POST") + files.HandleFunc("/cat", NewMiddlewareChain(FileSave, middlewares, a)).Methods("POST", "PATCH") files.HandleFunc("/ls", NewMiddlewareChain(FileLs, middlewares, a)).Methods("GET") files.HandleFunc("/mv", NewMiddlewareChain(FileMv, middlewares, a)).Methods("POST") files.HandleFunc("/rm", NewMiddlewareChain(FileRm, middlewares, a)).Methods("POST")