diff --git a/public/assets/pages/filespage/ctrl_upload.js b/public/assets/pages/filespage/ctrl_upload.js index 5541f133..14a9ac67 100644 --- a/public/assets/pages/filespage/ctrl_upload.js +++ b/public/assets/pages/filespage/ctrl_upload.js @@ -1,4 +1,5 @@ import { createElement, createFragment, createRender } from "../../lib/skeleton/index.js"; +import { toHref } from "../../lib/skeleton/router.js"; import rxjs, { effect, onClick } from "../../lib/rx.js"; import { forwardURLParams } from "../../lib/path.js"; import { animate, slideYOut } from "../../lib/animate.js"; @@ -291,7 +292,6 @@ function workerImplFile({ error, progress, speed }) { constructor() { super(); this.xhr = null; - this.prevProgress = []; } /** @@ -303,58 +303,75 @@ function workerImplFile({ error, progress, speed }) { /** * @override - * TODO: retry logic on failed upload would be nice */ async run({ file, path, virtual }) { const _file = await file(); const chunkSize = (window.CONFIG["upload_chunk_size"] || 0) *1024*1024; const numberOfChunks = Math.ceil(_file.size / chunkSize); + + // Case1: basic upload if (chunkSize === 0 || numberOfChunks === 0 || numberOfChunks === 1) { - await this._execute({ file: _file, path, virtual, chunk: null, progress }); + try { + await this._http(toHref(`/api/files/cat?path=${path}`), { + method: "POST", + body: _file, + progress, + }); + virtual.afterSuccess(); + } catch (err) { + error(err); + virtual.afterError(); + } return; } - for (let i=0; i { - if (i === 0) virtual.before(); - }, - afterSuccess: () => { - if (i === numberOfChunks - 1) virtual.afterSuccess(); - }, - afterError: () => virtual.afterError(), - }, - progress: (p) => { - const chunksAlreadyDownloaded = i * chunkSize; - const currentChunkDownloaded = p / 100 * ( - i !== numberOfChunks - 1 ? chunkSize : (_file.size % chunkSize) || chunkSize - ); - progress(Math.floor(100 * (chunksAlreadyDownloaded + currentChunkDownloaded) / _file.size)); - }, - chunk, - path, + + // Case2: chunked upload => TUS: https://www.ietf.org/archive/id/draft-tus-httpbis-resumable-uploads-protocol-00.html + try { + let resp = await this._http(toHref(`/api/files/cat?path=${path}&proto=tus`), { + method: "POST", + headers: { "Upload-Length": _file.size }, + progress: (n) => progress(n), }); - this.prevProgress = []; + const url = resp.headers.location; + if (!url.startsWith(toHref("/api/files/cat?"))) { + throw new Error("Internal Error"); + return + } + for (let i=0; i { + const chunksAlreadyDownloaded = i * chunkSize; + const currentChunkDownloaded = p / 100 * ( + i !== numberOfChunks - 1 ? chunkSize : (_file.size % chunkSize) || chunkSize + ); + progress(Math.floor(100 * (chunksAlreadyDownloaded + currentChunkDownloaded) / _file.size)); + }, + }); + } + virtual.afterSuccess(); + } catch (err) { + error(err); + virtual.afterError(); + if (err !== ABORT_ERROR) throw new Error(err); } } - _execute({ file, path, virtual, chunk, progress }) { + _http(url, { method, headers, body, progress }) { const xhr = new XMLHttpRequest(); + this.prevProgress = []; this.xhr = xhr; return new Promise((resolve, reject) => { - xhr.open( - "POST", - forwardURLParams( - "api/files/cat?path=" + encodeURIComponent(path) + - (chunk === null ? "" : `&chunk=${chunk}`), - ["share"], - ), - ); - xhr.withCredentials = true; + xhr.open(method, forwardURLParams(url, ["share"])); xhr.setRequestHeader("X-Requested-With", "XmlHttpRequest"); + xhr.withCredentials = true; + for (let key in headers) { + xhr.setRequestHeader(key, headers[key]); + } xhr.upload.onprogress = (e) => { if (!e.lengthComputable) return; const percent = Math.floor(100 * e.loaded / e.total); @@ -379,28 +396,30 @@ function workerImplFile({ error, progress, speed }) { this.prevProgress.shift(); } }; - xhr.upload.onabort = () => { - reject(ABORT_ERROR); - error(ABORT_ERROR); - virtual.afterError(); - }; + xhr.upload.onabort = () => reject(ABORT_ERROR); xhr.onload = () => { - progress(100); - if (xhr.status !== 200) { - virtual.afterError(); + if ([200, 201, 204].indexOf(xhr.status) === -1) { reject(new Error(xhr.statusText)); return; } - virtual.afterSuccess(); - resolve(null); + progress(100); + resolve({ + status: xhr.status, + headers: xhr.getAllResponseHeaders() + .split("\r\n") + .reduce((acc, el) => { + const tmp = el.split(": "); acc[tmp[0]] = tmp[1] + return acc; + }, {}) + }); }; - xhr.onerror = function(e) { + xhr.onerror = (e) => { reject(new AjaxError("failed", e, "FAILED")); - virtual.afterError(); }; - xhr.send(file); + xhr.send(body); }); } + }(); } diff --git a/server/ctrl/files.go b/server/ctrl/files.go index 6dd7c499..6b6e00f8 100644 --- a/server/ctrl/files.go +++ b/server/ctrl/files.go @@ -441,8 +441,8 @@ func FileSave(ctx *App, res http.ResponseWriter, req *http.Request) { // There is 2 ways to save something: // - case1: regular upload, we just insert the file in the pipe - chunk := req.URL.Query().Get("chunk") - if chunk == "" { + proto := req.URL.Query().Get("proto") + if proto == "" && req.Method == http.MethodPost { err = ctx.Backend.Save(path, req.Body) req.Body.Close() if err != nil { @@ -453,39 +453,83 @@ func FileSave(ctx *App, res http.ResponseWriter, req *http.Request) { SendSuccessResult(res, nil) return } - // - case2: chunked upload. In this scenario, the frontend send the file in chunks, the - // only assumption being that upload is complete when the "chunk" param is "0" - n, err := strconv.Atoi(chunk) - if err != nil { - SendErrorResult(res, NewError(err.Error(), 403)) - } - ctx.Session["path"] = path - res.Header().Set("Connection", "Close") - var uploader *chunkedUpload - if c := chunkedUploadCache.Get(ctx.Session); c == nil { - uploader = createChunkedUploader(ctx.Backend.Save, path) + // - case2: chunked upload which implements the TUS protocol: + // https://www.ietf.org/archive/id/draft-tus-httpbis-resumable-uploads-protocol-00.html + h := res.Header() + ctx.Session["path"] = path + if proto == "tus" && req.Method == http.MethodPost { + if c := chunkedUploadCache.Get(ctx.Session); c != nil { + chunkedUploadCache.Del(ctx.Session) + } + size, err := strconv.ParseUint(req.Header.Get("Upload-Length"), 10, 0) + if err != nil { + SendErrorResult(res, ErrNotValid) + return + } + uploader := createChunkedUploader(ctx.Backend.Save, path, size) chunkedUploadCache.Set(ctx.Session, uploader) - } else { - uploader = c.(*chunkedUpload) - } - if _, err := uploader.Next(req.Body); err != nil { - SendErrorResult(res, NewError(err.Error(), 403)) + h.Set("Content-Length", "0") + h.Set("Location", req.URL.String()) + res.WriteHeader(http.StatusCreated) return } - if n == 0 { - if err = uploader.Close(); err != nil { + if proto == "tus" && req.Method == http.MethodHead { + c := chunkedUploadCache.Get(ctx.Session) + if c == nil { + SendErrorResult(res, ErrNotFound) + return + } + offset, length := c.(*chunkedUpload).Meta() + h.Set("Upload-Offset", fmt.Sprintf("%d", offset)) + h.Set("Upload-Length", fmt.Sprintf("%d", length)) + res.WriteHeader(http.StatusOK) + return + } + if proto == "tus" && req.Method == http.MethodPatch { + requestOffset, err := strconv.ParseUint(req.Header.Get("Upload-Offset"), 10, 0) + if err != nil { + SendErrorResult(res, ErrNotValid) + return + } + c := chunkedUploadCache.Get(ctx.Session) + if c == nil { + SendErrorResult(res, ErrNotFound) + return + } + uploader := c.(*chunkedUpload) + initialOffset, totalSize := uploader.Meta() + if initialOffset != requestOffset { + SendErrorResult(res, ErrNotValid) + return + } + if err := uploader.Next(req.Body); err != nil { SendErrorResult(res, NewError(err.Error(), 403)) return } - chunkedUploadCache.Del(ctx.Session) - SendSuccessResult(res, nil) + newOffset, _ := uploader.Meta() + if newOffset > totalSize { + uploader.Close() + chunkedUploadCache.Del(ctx.Session) + Log.Warning("ctrl::files::tus error=assert_offset size=%d old_offset=%d new_offset=%d", totalSize, initialOffset, newOffset) + SendErrorResult(res, NewError("aborted - offset larger than total size", 403)) + return + } else if newOffset == totalSize { + if err := uploader.Close(); err != nil { + SendErrorResult(res, ErrNotValid) + return + } + chunkedUploadCache.Del(ctx.Session) + } + h.Set("Connection", "Close") + h.Set("Upload-Offset", fmt.Sprintf("%d", newOffset)) + res.WriteHeader(http.StatusNoContent) return } - SendSuccessResult(res, nil) + SendErrorResult(res, ErrNotImplemented) } -func createChunkedUploader(save func(path string, file io.Reader) error, path string) *chunkedUpload { +func createChunkedUploader(save func(path string, file io.Reader) error, path string, size uint64) *chunkedUpload { r, w := io.Pipe() done := make(chan error, 1) go func() { @@ -495,6 +539,8 @@ func createChunkedUploader(save func(path string, file io.Reader) error, path st fn: save, stream: w, done: done, + offset: 0, + size: size, } } @@ -516,14 +562,20 @@ func initChunkedUploader() { type chunkedUpload struct { fn func(path string, file io.Reader) error stream *io.PipeWriter + offset uint64 + size uint64 done chan error once sync.Once + mu sync.Mutex } -func (this *chunkedUpload) Next(body io.ReadCloser) (int64, error) { +func (this *chunkedUpload) Next(body io.ReadCloser) error { n, err := io.Copy(this.stream, body) body.Close() - return n, err + this.mu.Lock() + this.offset += uint64(n) + this.mu.Unlock() + return err } func (this *chunkedUpload) Close() error { @@ -535,6 +587,12 @@ func (this *chunkedUpload) Close() error { return err } +func (this *chunkedUpload) Meta() (uint64, uint64) { + this.mu.Lock() + defer this.mu.Unlock() + return this.offset, this.size +} + func FileMv(ctx *App, res http.ResponseWriter, req *http.Request) { if model.CanEdit(ctx) == false { Log.Debug("mv::permission 'permission denied'") diff --git a/server/routes.go b/server/routes.go index dc19c0ca..1df97898 100644 --- a/server/routes.go +++ b/server/routes.go @@ -56,7 +56,7 @@ func Build(a App) *mux.Router { files.HandleFunc("/unzip", NewMiddlewareChain(FileExtract, middlewares, a)).Methods("POST") middlewares = []Middleware{ApiHeaders, SecureHeaders, SecureOrigin, WithPublicAPI, SessionStart, LoggedInOnly, PluginInjector} files.HandleFunc("/cat", NewMiddlewareChain(FileAccess, middlewares, a)).Methods("OPTIONS") - files.HandleFunc("/cat", NewMiddlewareChain(FileSave, middlewares, a)).Methods("POST") + files.HandleFunc("/cat", NewMiddlewareChain(FileSave, middlewares, a)).Methods("POST", "PATCH") files.HandleFunc("/ls", NewMiddlewareChain(FileLs, middlewares, a)).Methods("GET") files.HandleFunc("/mv", NewMiddlewareChain(FileMv, middlewares, a)).Methods("POST") files.HandleFunc("/rm", NewMiddlewareChain(FileRm, middlewares, a)).Methods("POST")