diff --git a/public/assets/pages/filespage/ctrl_upload.js b/public/assets/pages/filespage/ctrl_upload.js index ba48ad71..a08e4191 100644 --- a/public/assets/pages/filespage/ctrl_upload.js +++ b/public/assets/pages/filespage/ctrl_upload.js @@ -303,15 +303,52 @@ function workerImplFile({ error, progress, speed }) { /** * @override + * TODO: retry logic on failed upload would be nice */ async run({ file, path, virtual }) { + const _file = await file(); + const chunkSize = (CONFIG.upload_chunk_size || 0) *1024*1024; + const numberOfChunks = Math.ceil(_file.size / chunkSize); + if (chunkSize === 0 || numberOfChunks === 0 || numberOfChunks === 1) { + await this._execute({ file: _file, path, virtual, chunk: null, progress }); + return; + } + for (let i=0; i { + if (i === 0) virtual.before(); + }, + afterSuccess: () => { + if (i === numberOfChunks - 1) virtual.afterSuccess(); + }, + afterError: () => virtual.afterError(), + }, + progress: (p) => { + const chunksAlreadyDownloaded = i * chunkSize; + const currentChunkDownloaded = p / 100 * ( + i !== numberOfChunks - 1 ? chunkSize : _file.size % chunkSize + ); + progress(Math.floor(100 * (chunksAlreadyDownloaded + currentChunkDownloaded) / _file.size)); + }, + chunk, path, + }); + this.prevProgress = []; + } + } + + _execute({ file, path, virtual, chunk, progress }) { const xhr = new XMLHttpRequest(); this.xhr = xhr; return new Promise((resolve, reject) => { xhr.open( "POST", forwardURLParams( - "api/files/cat?path=" + encodeURIComponent(path), + "api/files/cat?path=" + encodeURIComponent(path) + + (chunk === null ? "" : `&chunk=${chunk}`), ["share"], ), ); @@ -360,7 +397,7 @@ function workerImplFile({ error, progress, speed }) { reject(new AjaxError("failed", e, "FAILED")); virtual.afterError(); }; - file().then((f) => xhr.send(f)).catch((err) => xhr.onerror && xhr.onerror(err)); + xhr.send(file); }); } }(); diff --git a/server/common/config.go b/server/common/config.go index 3d7b7c45..c78b9a4c 100644 --- a/server/common/config.go +++ b/server/common/config.go @@ -75,7 +75,8 @@ func NewConfiguration() Configuration { FormElement{Name: "display_hidden", Type: "boolean", Default: false, Description: "Should files starting with a dot be visible by default?"}, FormElement{Name: "refresh_after_upload", Type: "boolean", Default: false, Description: "Refresh directory listing after upload"}, FormElement{Name: "upload_button", Type: "boolean", Default: false, Description: "Display the upload button on any device"}, - FormElement{Name: "upload_pool_size", Type: "number", Default: 15, Description: "Maximum number of files upload in parallel (Default: 15)"}, + FormElement{Name: "upload_pool_size", Type: "number", Default: 15, Description: "Maximum number of files upload in parallel. Default: 15"}, + FormElement{Name: "upload_chunk_size", Type: "number", Default: 0, Description: "Size of Chunks for Uploads in MB."}, FormElement{Name: "filepage_default_view", Type: "select", Default: "grid", Opts: []string{"list", "grid"}, Description: "Default layout for files and folder on the file page"}, FormElement{Name: "filepage_default_sort", Type: "select", Default: "type", Opts: []string{"type", "date", "name"}, Description: "Default order for files and folder on the file page"}, FormElement{Name: "cookie_timeout", Type: "number", Default: 60 * 24 * 7, Description: "Authentication Cookie expiration in minutes. Default: 60 * 24 * 7 = 1 week"}, @@ -359,6 +360,7 @@ func (this *Configuration) Export() interface{} { Logout string `json:"logout"` MimeTypes map[string]string `json:"mime"` UploadPoolSize int `json:"upload_pool_size"` + UploadChunkSize int `json:"upload_chunk_size"` RefreshAfterUpload bool `json:"refresh_after_upload"` FilePageDefaultSort string `json:"default_sort"` FilePageDefaultView string `json:"default_view"` @@ -378,6 +380,7 @@ func (this *Configuration) Export() interface{} { Logout: this.Get("general.logout").String(), MimeTypes: AllMimeTypes(), UploadPoolSize: this.Get("general.upload_pool_size").Int(), + UploadChunkSize: this.Get("general.upload_chunk_size").Int(), RefreshAfterUpload: this.Get("general.refresh_after_upload").Bool(), FilePageDefaultSort: this.Get("general.filepage_default_sort").String(), FilePageDefaultView: this.Get("general.filepage_default_view").String(), diff --git a/server/ctrl/files.go b/server/ctrl/files.go index 9d6f6c7a..464f3c51 100644 --- a/server/ctrl/files.go +++ b/server/ctrl/files.go @@ -13,6 +13,7 @@ import ( "path/filepath" "strconv" "strings" + "sync" "time" . "github.com/mickael-kerjean/filestash/server/common" @@ -66,6 +67,7 @@ func init() { zip_timeout() disable_csp() }) + initChunkedUploader() } func FileLs(ctx *App, res http.ResponseWriter, req *http.Request) { @@ -395,6 +397,8 @@ func FileAccess(ctx *App, res http.ResponseWriter, req *http.Request) { SendSuccessResult(res, nil) } +var chunkedUploadCache AppCache + func FileSave(ctx *App, res http.ResponseWriter, req *http.Request) { path, err := PathBuilder(ctx, req.URL.Query().Get("path")) if err != nil { @@ -435,16 +439,101 @@ func FileSave(ctx *App, res http.ResponseWriter, req *http.Request) { } } - err = ctx.Backend.Save(path, req.Body) - req.Body.Close() + // There is 2 ways to save something: + // - case1: regular upload, we just insert the file in the pipe + chunk := req.URL.Query().Get("chunk") + if chunk == "" { + err = ctx.Backend.Save(path, req.Body) + req.Body.Close() + if err != nil { + Log.Debug("save::backend '%s'", err.Error()) + SendErrorResult(res, NewError(err.Error(), 403)) + return + } + SendSuccessResult(res, nil) + return + } + // - case2: chunked upload. In this scenario, the frontend send the file in chunks, the + // only assumption being that upload is complete when the "chunk" param is "0" + n, err := strconv.Atoi(chunk) if err != nil { - Log.Debug("save::backend '%s'", err.Error()) + SendErrorResult(res, NewError(err.Error(), 403)) + } + ctx.Session["path"] = path + + var uploader *chunkedUpload + if c := chunkedUploadCache.Get(ctx.Session); c == nil { + uploader = createChunkedUploader(ctx.Backend.Save, path) + chunkedUploadCache.Set(ctx.Session, uploader) + } else { + uploader = c.(*chunkedUpload) + } + if _, err := uploader.Next(req.Body); err != nil { SendErrorResult(res, NewError(err.Error(), 403)) return } + if n == 0 { + if err = uploader.Close(); err != nil { + SendErrorResult(res, NewError(err.Error(), 403)) + return + } + chunkedUploadCache.Del(ctx.Session) + SendSuccessResult(res, nil) + return + } SendSuccessResult(res, nil) } +func createChunkedUploader(save func(path string, file io.Reader) error, path string) *chunkedUpload { + r, w := io.Pipe() + done := make(chan error, 1) + go func() { + done <- save(path, r) + }() + return &chunkedUpload{ + fn: save, + stream: w, + done: done, + } +} + +func initChunkedUploader() { + chunkedUploadCache = NewAppCache(60*24, 1) + chunkedUploadCache.OnEvict(func(key string, value interface{}) { + c := value.(*chunkedUpload) + if c == nil { + Log.Warning("ctrl::files::chunked::cleanup nil on close") + return + } + if err := c.Close(); err != nil { + Log.Warning("ctrl::files::chunked::cleanup action=close err=%s", err.Error()) + return + } + }) +} + +type chunkedUpload struct { + fn func(path string, file io.Reader) error + stream *io.PipeWriter + done chan error + once sync.Once +} + +func (this *chunkedUpload) Next(body io.ReadCloser) (int64, error) { + n, err := io.Copy(this.stream, body) + body.Close() + return n, err +} + +func (this *chunkedUpload) Close() error { + this.stream.Close() + err := <-this.done + this.once.Do(func() { + close(this.done) + }) + return err +} + func FileMv(ctx *App, res http.ResponseWriter, req *http.Request) { if model.CanEdit(ctx) == false { Log.Debug("mv::permission 'permission denied'")