fix (canary): retry mechanism on upload

This commit is contained in:
MickaelK 2024-11-29 17:48:37 +11:00
parent ac9d1a4980
commit 487895abee

View file

@ -185,45 +185,62 @@ function componentUploadQueue(render, { workers$ }) {
}(new Array(MAX_WORKERS).fill(0)));
const updateDOMGlobalTitle = ($page, text) => $page.firstElementChild.nextElementSibling.childNodes[0].textContent = text;
const updateDOMWithStatus = ($task, { status, exec, nworker }) => {
const cancel = () => exec.cancel();
switch (status) {
case "todo":
break;
case "doing":
updateDOMTaskProgress($task, formatPercent(0));
$task.firstElementChild.nextElementSibling.nextElementSibling.appendChild($iconStop);
$iconStop.onclick = () => {
cancel();
const executeMutation = (status) => {
switch (status) {
case "todo":
updateDOMGlobalTitle($page, t("Running") + "...");
break;
case "doing":
const $stop = assert.type($iconStop.cloneNode(true), HTMLElement);
updateDOMTaskProgress($task, formatPercent(0));
$task.classList.remove("error_color");
$task.classList.add("todo_color");
$task.firstElementChild.nextElementSibling.nextElementSibling.replaceChildren($stop);
$stop.onclick = () => {
exec.cancel();
$task.firstElementChild.nextElementSibling.nextElementSibling.classList.add("hidden");
};
$close.addEventListener("click", exec.cancel);
break;
case "done":
updateDOMGlobalTitle($page, t("Done"));
updateDOMTaskProgress($task, t("Done"));
updateDOMGlobalSpeed(nworker, 0);
updateDOMTaskSpeed($task, 0);
$task.removeAttribute("data-path");
$task.classList.remove("todo_color");
$task.firstElementChild.nextElementSibling.nextElementSibling.classList.add("hidden");
};
$close.addEventListener("click", cancel);
break;
case "done":
updateDOMGlobalSpeed(nworker, 0);
updateDOMTaskProgress($task, t("Done"));
updateDOMTaskSpeed($task, 0);
$task.removeAttribute("data-path");
$task.classList.remove("todo_color");
$task.firstElementChild.nextElementSibling.nextElementSibling.classList.add("hidden");
$close.removeEventListener("click", cancel);
break;
case "error":
const $retry = assert.type($iconRetry.cloneNode(true), HTMLElement);
updateDOMGlobalTitle($page, t("Error"));
updateDOMGlobalSpeed(nworker, 0);
updateDOMTaskProgress($task, t("Error"));
updateDOMTaskSpeed($task, 0);
$task.removeAttribute("data-path");
$task.classList.remove("todo_color");
$task.firstElementChild.nextElementSibling.nextElementSibling.firstElementChild.remove();
$task.firstElementChild.nextElementSibling.nextElementSibling.appendChild($retry);
$retry.onclick = () => { console.log("CLICK RETRY"); };
$close.removeEventListener("click", cancel);
$task.classList.add("error_color");
break;
default:
assert.fail(`UNEXPECTED_STATUS status="${status}" path="${$task.getAttribute("path")}"`);
}
$close.removeEventListener("click", exec.cancel);
break;
case "error":
const $retry = assert.type($iconRetry.cloneNode(true), HTMLElement);
updateDOMGlobalTitle($page, t("Error"));
updateDOMTaskProgress($task, t("Error"));
updateDOMGlobalSpeed(nworker, 0);
updateDOMTaskSpeed($task, 0);
$task.removeAttribute("data-path");
$task.classList.remove("todo_color");
$task.classList.add("error_color");
$task.firstElementChild.nextElementSibling.nextElementSibling.firstElementChild.remove();
$task.firstElementChild.nextElementSibling.nextElementSibling.appendChild($retry);
$retry.onclick = async() => {
executeMutation("todo");
executeMutation("doing");
try {
await exec.retry();
executeMutation("done");
} catch (err) {
executeMutation("error");
}
};
$close.removeEventListener("click", exec.cancel);
break;
default:
assert.fail(`UNEXPECTED_STATUS status="${status}" path="${$task.getAttribute("path")}"`);
}
};
executeMutation(status);
};
let tasks = [];
@ -238,7 +255,6 @@ function componentUploadQueue(render, { workers$ }) {
}
const $task = qs($page, `[data-path="${task.path}"]`);
const exec = task.exec({
error: () => updateDOMWithStatus($task, { status: "error", nworker, exec: null }),
progress: (progress) => updateDOMTaskProgress($task, formatPercent(progress)),
speed: (speed) => {
updateDOMTaskSpeed($task, speed);
@ -284,10 +300,11 @@ function componentUploadQueue(render, { workers$ }) {
class IExecutor {
contructor() {}
cancel() { throw new Error("NOT_IMPLEMENTED"); }
retry() { throw new Error("NOT_IMPLEMENTED"); }
run() { throw new Error("NOT_IMPLEMENTED"); }
}
function workerImplFile({ error, progress, speed }) {
function workerImplFile({ progress, speed }) {
return new class Worker extends IExecutor {
constructor() {
super();
@ -306,124 +323,75 @@ function workerImplFile({ error, progress, speed }) {
*/
async run({ file, path, virtual }) {
const _file = await file();
const executeJob = (firstRun) => this.prepareJob({ file: _file, path, virtual, firstRun });
this.retry = () => executeJob(false);
return executeJob(true);
}
async prepareJob({ file, path, virtual, firstRun }) {
if (firstRun === false) virtual.before();
const chunkSize = (window.CONFIG["upload_chunk_size"] || 0) *1024*1024;
const numberOfChunks = Math.ceil(_file.size / chunkSize);
const numberOfChunks = Math.ceil(file.size / chunkSize);
// Case1: basic upload
if (chunkSize === 0 || numberOfChunks === 0 || numberOfChunks === 1) {
try {
await this._http(toHref(`/api/files/cat?path=${path}`), {
await executeHttp.call(this, toHref(`/api/files/cat?path=${encodeURIComponent(path)}`), {
method: "POST",
body: _file,
headers: {},
body: file,
progress,
speed,
});
virtual.afterSuccess();
} catch (err) {
error(err);
virtual.afterError();
if (err === ABORT_ERROR) return;
throw err;
}
return;
}
// Case2: chunked upload => TUS: https://www.ietf.org/archive/id/draft-tus-httpbis-resumable-uploads-protocol-00.html
try {
let resp = await this._http(toHref(`/api/files/cat?path=${path}&proto=tus`), {
let resp = await executeHttp.call(this, toHref(`/api/files/cat?path=${encodeURIComponent(path)}&proto=tus`), {
method: "POST",
headers: { "Upload-Length": _file.size },
headers: { "Upload-Length": file.size },
body: null,
progress: (n) => progress(n),
speed,
});
const url = resp.headers.location;
if (!url.startsWith(toHref("/api/files/cat?"))) {
throw new Error("Internal Error");
return
}
for (let i=0; i<numberOfChunks; i++) {
const offset = chunkSize * i;
const chunk = numberOfChunks - i - 1;
resp = await this._http(url, {
resp = await executeHttp.call(this, url, {
method: "PATCH",
headers: { "Upload-Offset": offset },
body: _file.slice(offset, offset + chunkSize),
body: file.slice(offset, offset + chunkSize),
progress: (p) => {
const chunksAlreadyDownloaded = i * chunkSize;
const currentChunkDownloaded = p / 100 * (
i !== numberOfChunks - 1 ? chunkSize : (_file.size % chunkSize) || chunkSize
i !== numberOfChunks - 1 ? chunkSize : (file.size % chunkSize) || chunkSize
);
progress(Math.floor(100 * (chunksAlreadyDownloaded + currentChunkDownloaded) / _file.size));
progress(Math.floor(100 * (chunksAlreadyDownloaded + currentChunkDownloaded) / file.size));
},
speed,
});
}
virtual.afterSuccess();
} catch (err) {
error(err);
virtual.afterError();
if (err !== ABORT_ERROR) throw new Error(err);
if (err === ABORT_ERROR) return;
throw err;
}
}
_http(url, { method, headers, body, progress }) {
const xhr = new XMLHttpRequest();
this.prevProgress = [];
this.xhr = xhr;
return new Promise((resolve, reject) => {
xhr.open(method, forwardURLParams(url, ["share"]));
xhr.setRequestHeader("X-Requested-With", "XmlHttpRequest");
xhr.withCredentials = true;
for (let key in headers) {
xhr.setRequestHeader(key, headers[key]);
}
xhr.upload.onprogress = (e) => {
if (!e.lengthComputable) return;
const percent = Math.floor(100 * e.loaded / e.total);
progress(percent);
if (this.prevProgress.length === 0) {
this.prevProgress.push(e);
return;
}
this.prevProgress.push(e);
const calculateTime = (p1, pm1) => (p1.timeStamp - pm1.timeStamp)/1000;
const calculateBytes = (p1, pm1) => p1.loaded - pm1.loaded;
let avgSpeed = 0;
for (let i=1; i<this.prevProgress.length; i++) {
const p1 = this.prevProgress[i];
const pm1 = this.prevProgress[i-1];
avgSpeed += calculateBytes(p1, pm1) / calculateTime(p1, pm1);
}
avgSpeed = avgSpeed / (this.prevProgress.length - 1);
speed(avgSpeed);
if (e.timeStamp - this.prevProgress[0].timeStamp > 5000) {
this.prevProgress.shift();
}
};
xhr.upload.onabort = () => reject(ABORT_ERROR);
xhr.onload = () => {
if ([200, 201, 204].indexOf(xhr.status) === -1) {
reject(new Error(xhr.statusText));
return;
}
progress(100);
resolve({
status: xhr.status,
headers: xhr.getAllResponseHeaders()
.split("\r\n")
.reduce((acc, el) => {
const tmp = el.split(": "); acc[tmp[0]] = tmp[1]
return acc;
}, {})
});
};
xhr.onerror = (e) => {
reject(new AjaxError("failed", e, "FAILED"));
};
xhr.send(body);
});
}
}();
}
function workerImplDirectory({ error, progress }) {
function workerImplDirectory({ progress }) {
return new class Worker extends IExecutor {
constructor() {
super();
@ -440,55 +408,104 @@ function workerImplDirectory({ error, progress }) {
/**
* @override
*/
run({ virtual, path }) {
const xhr = new XMLHttpRequest();
this.xhr = xhr;
return new Promise((resolve, reject) => {
xhr.open(
"POST",
forwardURLParams(
"api/files/mkdir?path=" + encodeURIComponent(path),
["share"],
),
);
xhr.withCredentials = true;
xhr.setRequestHeader("X-Requested-With", "XmlHttpRequest");
xhr.onerror = function(e) {
reject(new AjaxError("failed", e, "FAILED"));
};
async run({ virtual, path }) {
const executeJob = (firstRun) => this.prepareJob({ virtual, path, firstRun });
this.retry = () => executeJob(false);
return executeJob(true);
}
let percent = 0;
const id = setInterval(() => {
percent += 10;
if (percent >= 100) {
clearInterval(id);
return;
}
progress(percent);
}, 100);
xhr.upload.onabort = () => {
reject(ABORT_ERROR);
error(ABORT_ERROR);
async prepareJob({ virtual, path, firstRun }) {
if (firstRun === false) virtual.before();
let percent = 0;
const id = setInterval(() => {
percent += 10;
if (percent >= 100) {
clearInterval(id);
virtual.afterError();
};
xhr.onload = () => {
clearInterval(id);
progress(100);
if (xhr.status !== 200) {
virtual.afterError();
reject(new Error(xhr.statusText));
return;
}
virtual.afterSuccess();
resolve(null);
};
xhr.send(null);
});
return;
}
progress(percent);
}, 100);
try {
await executeHttp.call(this, toHref(`/api/files/mkdir?path=${encodeURIComponent(path)}`), {
method: "POST",
headers: {},
body: null,
progress,
speed: () => {},
});
clearInterval(id);
progress(100);
virtual.afterSuccess();
} catch (err) {
clearInterval(id);
virtual.afterError();
if (err === ABORT_ERROR) return;
throw err;
}
}
}();
}
function executeHttp(url, { method, headers, body, progress, speed }) {
const xhr = new XMLHttpRequest();
const prevProgress = [];
this.xhr = xhr;
return new Promise((resolve, reject) => {
xhr.open(method, forwardURLParams(url, ["share"]));
xhr.setRequestHeader("X-Requested-With", "XmlHttpRequest");
xhr.withCredentials = true;
for (const key in headers) {
xhr.setRequestHeader(key, headers[key]);
}
xhr.upload.onprogress = (e) => {
if (!e.lengthComputable) return;
const percent = Math.floor(100 * e.loaded / e.total);
progress(percent);
if (prevProgress.length === 0) {
prevProgress.push(e);
return;
}
prevProgress.push(e);
const calculateTime = (p1, pm1) => (p1.timeStamp - pm1.timeStamp)/1000;
const calculateBytes = (p1, pm1) => p1.loaded - pm1.loaded;
let avgSpeed = 0;
for (let i=1; i<prevProgress.length; i++) {
const p1 = prevProgress[i];
const pm1 = prevProgress[i-1];
avgSpeed += calculateBytes(p1, pm1) / calculateTime(p1, pm1);
}
avgSpeed = avgSpeed / (prevProgress.length - 1);
speed(avgSpeed);
if (e.timeStamp - prevProgress[0].timeStamp > 5000) {
prevProgress.shift();
}
};
xhr.upload.onabort = () => reject(ABORT_ERROR);
xhr.onerror = (e) => reject(new AjaxError("failed", e, "FAILED"));
xhr.onload = () => {
if ([200, 201, 204].indexOf(xhr.status) === -1) {
reject(new Error(xhr.statusText));
return;
}
progress(100);
resolve({
status: xhr.status,
headers: xhr.getAllResponseHeaders()
.split("\r\n")
.reduce((acc, el) => {
const tmp = el.split(": ");
if (typeof tmp[0] === "string" && typeof tmp[1] === "string") {
acc[tmp[0]] = tmp[1];
}
return acc;
}, {})
});
};
xhr.send(body);
});
}
async function processFiles(filelist) {
const tasks = [];
// let size = 0; // TODO