diff --git a/cmd/main.go b/cmd/main.go index 6300eb64..9b56b803 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -5,12 +5,15 @@ import ( "sync" "github.com/gorilla/mux" + _ "github.com/mattn/go-sqlite3" "github.com/mickael-kerjean/filestash" "github.com/mickael-kerjean/filestash/server" - . "github.com/mickael-kerjean/filestash/server/common" "github.com/mickael-kerjean/filestash/server/ctrl" "github.com/mickael-kerjean/filestash/server/model" + "github.com/mickael-kerjean/filestash/server/workflow" + + . "github.com/mickael-kerjean/filestash/server/common" _ "github.com/mickael-kerjean/filestash/server/plugin" ) @@ -22,6 +25,7 @@ func Run(router *mux.Router, app App) { Log.Info("Filestash %s starting", APP_VERSION) check(InitLogger(), "Logger init failed. err=%s") check(InitConfig(), "Config init failed. err=%s") + check(workflow.Init(), "Worklow Initialisation failure. err=%s") check(model.PluginDiscovery(), "Plugin Discovery failed. err=%s") check(ctrl.InitPluginList(embed.EmbedPluginList, model.PLUGINS), "Plugin Initialisation failed. err=%s") if len(Hooks.Get.Starter()) == 0 { diff --git a/go.mod b/go.mod index c6952864..00b041a2 100644 --- a/go.mod +++ b/go.mod @@ -72,6 +72,7 @@ require ( github.com/Microsoft/go-winio v0.6.2 // indirect github.com/aybabtme/rgbterm v0.0.0-20170906152045-cc83f3b3ce59 // indirect github.com/beevik/etree v1.4.0 // indirect + github.com/bmatcuk/doublestar/v4 v4.9.1 // indirect github.com/boombuler/barcode v1.0.1-0.20190219062509-6c824513bacc // indirect github.com/calebcase/tmpfile v1.0.3 // indirect github.com/census-instrumentation/opencensus-proto v0.4.1 // indirect diff --git a/go.sum b/go.sum index 3205f748..5d0a0d00 100644 --- a/go.sum +++ b/go.sum @@ -68,6 +68,8 @@ github.com/beevik/etree v1.4.0 h1:oz1UedHRepuY3p4N5OjE0nK1WLCqtzHf25bxplKOHLs= github.com/beevik/etree v1.4.0/go.mod h1:cyWiXwGoasx60gHvtnEh5x8+uIjUVnjWqBvEnhnqKDA= github.com/bluekeyes/go-gitdiff v0.7.3 h1:SElKwtm/IQPOwKs0vdowW5uAlip+P+jatagmUU8E0r4= github.com/bluekeyes/go-gitdiff v0.7.3/go.mod h1:QpfYYO1E0fTVHVZAZKiRjtSGY9823iCdvGXBcEzHGbM= +github.com/bmatcuk/doublestar/v4 v4.9.1 h1:X8jg9rRZmJd4yRy7ZeNDRnM+T3ZfHv15JiBJ/avrEXE= +github.com/bmatcuk/doublestar/v4 v4.9.1/go.mod h1:xBQ8jztBU6kakFMg+8WGxn0c6z1fTSPVIjEY1Wr7jzc= github.com/boombuler/barcode v1.0.1-0.20190219062509-6c824513bacc h1:biVzkmvwrH8WK8raXaxBx6fRVTlJILwEwQGL1I/ByEI= github.com/boombuler/barcode v1.0.1-0.20190219062509-6c824513bacc/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8= github.com/calebcase/tmpfile v1.0.3 h1:BZrOWZ79gJqQ3XbAQlihYZf/YCV0H4KPIdM5K5oMpJo= diff --git a/public/assets/pages/adminpage/ctrl_workflow.css b/public/assets/pages/adminpage/ctrl_workflow.css index 2601d235..a2d3eb49 100644 --- a/public/assets/pages/adminpage/ctrl_workflow.css +++ b/public/assets/pages/adminpage/ctrl_workflow.css @@ -6,6 +6,16 @@ padding: 3px 5px; margin: 0 2px; } +.component_page_workflow h2 input { + padding: 0 0 0 20px; + border: none; + font-family: inherit; + color: inherit; + font-size: inherit; + width: 100%; + position: relative; + bottom: 2px; +} .component_page_workflow .box { display: block; background: white; @@ -25,7 +35,7 @@ } .component_page_workflow .box h3 span { font-weight: 300; - font-size: 0.95rem; + font-size: 0.8rem; font-style: italic; color: var(--light); } @@ -33,6 +43,7 @@ float: left; width: 25px; fill: var(--light); + opacity: 0.8; } .component_page_workflow .box .workflow-summary button { color: var(--light); @@ -56,10 +67,10 @@ overflow: hidden; } .component_page_workflow .box [data-bind="form"] .formbuilder { - padding: 10px; + padding: 10px 10px 0px 10px; border-radius: 5px; margin-top: 10px; - border: 2px solid #ebebec; + border-top: 2px solid #ebebec; background: transparent; } .component_page_workflow .box.disabled [data-bind="form"] .formbuilder { @@ -90,6 +101,13 @@ border-radius: 15px; } +/* list page */ +.component_page_workflow h3.empty { + padding: 30px; + border: 2px dashed var(--light); + color: var(--light); +} + /* Workflow creation modal */ .component_workflow_create { font-size: 1rem; @@ -158,11 +176,14 @@ border-bottom-left-radius: 10px; font-weight: 600; } -.workflow-fab svg { +.workflow-fab button > * { height: 30px; fill: var(--bg-color); padding: 7px; } +.workflow-fab button > component-icon img { + filter: brightness(0) invert(1); +} /* ADD BUTTON */ .component_page_workflow [data-bind="add"] { @@ -183,8 +204,10 @@ color: var(--bg-color); font-family: monospace; text-transform: uppercase; - padding-left: 0; - padding-right: 0; + background:var(--border); + font-size: 0.85rem; + letter-spacing: -0.5px; + margin-right: 5px; } .component_page_workflow [data-bind="add"] .box svg { margin-right: 0; @@ -199,9 +222,7 @@ backdrop-filter: blur(2px); padding: 3px 8px; } -.component_page_workflow [data-bind="add"] .box .sub { - padding: 0 5px; -} .component_page_workflow [data-bind="add"] .box > .flex { padding: 0 5px 0 0; + margin-left: -5px; } diff --git a/public/assets/pages/adminpage/ctrl_workflow.js b/public/assets/pages/adminpage/ctrl_workflow.js index 865d36f0..eb50699c 100644 --- a/public/assets/pages/adminpage/ctrl_workflow.js +++ b/public/assets/pages/adminpage/ctrl_workflow.js @@ -2,254 +2,26 @@ import { createElement } from "../../lib/skeleton/index.js"; import { loadCSS } from "../../helpers/loader.js"; import AdminHOC from "./decorator.js"; +import { workflowAll } from "./model_workflow.js"; import ctrlList from "./ctrl_workflow_list.js"; import ctrlDetails from "./ctrl_workflow_details.js"; -const workflows = [ - { - id: "dummy", - name: "My First Workflow", - published: false, - trigger: { name: "user" }, - actions: [ - { - name: "tools/debug", - }, - { - name: "notify/email", - } - ], - }, - { - id: "uuid0", - name: "Detection de fichier d'inbox", - published: true, - lastEdited: "2 days ago", - trigger: { name: "watch" }, - actions: [ - { - name: "run/program", - }, - { - name: "notify/email", - }, - ], - }, - { - id: "uuid1", - name: "Notify team when file is moved", - published: true, - lastEdited: "2 days ago", - history: [], - trigger: { name: "operation" }, - actions: [ - { - name: "notify/email", - }, - ] - }, - { - id: "uuid2", - name: "Any change to the contract folder", - published: true, - history: [], - trigger: { name: "operation", values: {} }, - actions: [ - { - name: "notify/email", - // values: {} // TODO - }, - ] - }, -]; - -const triggers = [ - { - name: "schedule", - title: "On a Schedule", - icon: ``, - specs: { - "start": { - name: "test", - type: "datetime", - }, - "frequency": { - type: "select", - options: ["hourly", "weekly", "monthly", "yearly"], - }, - }, - }, - { - name: "user", - title: "When a User Creates a Request", - icon: ``, - specs: { - name: { type: "text" }, - form: { type: "text", placeholder: "Optional form user should submit" }, - visibility: { type: "text" }, - }, - }, - { - name: "operation", - title: "When a File Action Happens", - icon: ``, - specs: { - event: { - type: "text", - datalist: ["ls", "cat", "mkdir", "mv", "rm", "touch"], - multi: true, - }, - path: { - type: "text", - }, - }, - }, - { - name: "watch", - title: "When the Filesystem Changes", - icon: ``, - specs: { - token: { - type: "text", - }, - path: { - type: "text", - }, - }, - }, - { - name: "webhook", - title: "From a webhook", - icon: ``, - specs: { - url: { - type: "text", - readonly: true, - value: "http://example.com/workflow/webhook?id=generatedID", - } - }, - }, -]; - -const actions = [ - { - name: "run/program", - title: "Execute Program", - subtitle: "name", - icon: ``, - specs: { - name: { - type: "select", - options: ["ocr", "duplicate", "expiry", "sign"], - } - }, - }, - { - name: "run/api", - title: "Make API Call", - icon: ``, - specs: { - url: { - type: "text", - }, - method: { - options: ["POST", "PUT", "GET"], - }, - headers: { - type: "long_text", - }, - body: { - type: "long_text", - }, - }, - }, - { - name: "notify/email", - title: "Notify", - subtitle: "email", - icon: ``, - specs: { - email: { - type: "text", - }, - message: { - type: "long_text", - } - }, - }, - { - name: "approval/email", - title: "Approval", - subtitle: "email", - icon: ``, - specs: { - email: { - type: "text", - }, - message: { - type: "long_text", - }, - }, - }, - { - name: "tools/debug", - title: "Debug", - icon: ``, - specs: {}, - }, - { - name: "tools/map", - title: "Map", - icon: ``, - specs: { - transform: { - type: "long_text", - } - }, - } - // TODO: expand macros -]; - -const macros = [ - { - name: "files/mv", - specs: [ - { name: "from", type: "text" }, - { name: "to", type: "text" }, - ], - run: [ - { - name: "run/api", - values: [ - { name: "url", value: "{{ .endpoint }}/api/files/mv?from={{ .from }}&to={{ .to }}" }, - { name: "method", value: "POST" }, - { name: "headers", value: "Authorization: Bearer {{ .authorization }}" }, - ] - }, - ], - }, - // { - // name: "metadata/add", - // } -] - export default AdminHOC(async function(render) { await loadCSS(import.meta.url, "./ctrl_workflow.css"); render(createElement("")); + const { workflows, triggers, actions } = await workflowAll().toPromise(); + const specs = getSpecs(); if (specs) ctrlDetails(render, { workflow: specs, triggers, actions }); - else { - await new Promise((done) => setTimeout(() => done(), 100)); - ctrlList(render, { workflows, triggers, actions }); - } + else ctrlList(render, { workflows, triggers, actions }); }); function getSpecs() { - const GET = new URLSearchParams(location.search) + const GET = new URLSearchParams(location.search); try { return JSON.parse(atob(GET.get("specs"))); - } catch (err ) { + } catch (err) { return null; } } diff --git a/public/assets/pages/adminpage/ctrl_workflow_details.js b/public/assets/pages/adminpage/ctrl_workflow_details.js index f419038d..aaff9c74 100644 --- a/public/assets/pages/adminpage/ctrl_workflow_details.js +++ b/public/assets/pages/adminpage/ctrl_workflow_details.js @@ -1,31 +1,33 @@ -import { createElement, createFragment } from "../../lib/skeleton/index.js"; +import { createElement, createFragment, navigate } from "../../lib/skeleton/index.js"; +import rxjs, { effect, onClick } from "../../lib/rx.js"; import { animate, slideXIn, slideXOut } from "../../lib/animate.js"; -import { qs, qsa } from "../../lib/dom.js"; +import { qs, qsa, safe } from "../../lib/dom.js"; import { createForm, mutateForm } from "../../lib/form.js"; import { formTmpl } from "../../components/form.js"; -import { renderLeaf, useForm$, formObjToJSON$ } from "./helper_form.js"; +import { workflowUpsert, workflowDelete, workflowGet } from "./model_workflow.js"; import transition from "./animate.js"; -// TODO: auto id: Date.now().toString(36) + Math.random().toString(36).slice(2, 6); - export default async function(render, { workflow, triggers, actions }) { + const { id } = workflow; const $page = createElement(`
-

+

- ${workflow.name} +

- +

Activity

+
...
+
`); @@ -33,12 +35,12 @@ export default async function(render, { workflow, triggers, actions }) { // feature1: setup trigger const $trigger = qs($page, `[data-bind="trigger"]`); - $trigger.appendChild(await createTrigger({ workflow, triggers })); + $trigger.appendChild(await createTrigger({ trigger: workflow.trigger, triggers })); // feature2: setup actions const $actions = qs($page, `[data-bind="actions"]`); - for (let i=0; i { + createAction: async({ action, actions }) => { const $action = await createAction({ action, actions }); qs($action, `button[alt="delete"]`).onclick = (e) => removeAction(e.target); $actions.appendChild($action); @@ -55,7 +57,43 @@ export default async function(render, { workflow, triggers, actions }) { })); // feature4: save button - $page.parentElement.appendChild(createSave()); + effect(rxjs.of(createSave({ withDelete: !!id })).pipe( + rxjs.tap(($fab) => $page.parentElement.appendChild($fab)), + rxjs.mergeMap(($fab) => onClick(qs($fab, "button"))), + rxjs.tap(($button) => { + $button.setAttribute("disabled", "true"); + $button.firstElementChild.classList.add("hidden"); + $button.firstElementChild.nextElementSibling.classList.remove("hidden"); + }), + rxjs.mergeMap(($button) => { + let action = rxjs.of(null); + let redirect = !id; + const workflow = formData($page, { id }); + if (workflow.published === "delete") { + if (window.confirm("delete this workflow?")) { + action = workflowDelete(id); + redirect = true; + } + } else { + workflow.published = workflow.published === "publish"; + action = workflowUpsert(workflow); + } + const start = new Date(); + return action.pipe( + rxjs.switchMap((result) => { + const elapsed = new Date() - start; + return rxjs.of(result).pipe(rxjs.delay(Math.max(0, 400 - elapsed))); + }), + rxjs.finalize(() => { + $button.removeAttribute("disabled"); + $button.firstElementChild.classList.remove("hidden"); + $button.firstElementChild.nextElementSibling.classList.add("hidden"); + if (redirect) navigate(window.location.pathname); + else history.replaceState(null, "", window.location.pathname + "?specs=" + encodeURIComponent(btoa(JSON.stringify(workflow)))); + }), + ); + }), + )); // feature5: toggle form visibility qsa($page, `[data-bind="form"]`).forEach(($form) => { @@ -67,46 +105,92 @@ export default async function(render, { workflow, triggers, actions }) { qsa($page, `button[alt="delete"]`).forEach(($delete) => $delete.onclick = (e) => { removeAction(e.target); }); + + // feature7: history + const $history = qs($page, `[data-bind="history"]`); + if (!workflow.id) { + $history.previousElementSibling.remove(); + $history.remove(); + } else effect(rxjs.of(null).pipe( + rxjs.mergeMap(() => workflowGet(workflow.id)), + rxjs.tap(({ history }) => { + if (history.length === 0) $history.innerText = "Ø"; + else $history.innerText = history.map(({ id, created_at, status, steps }) => { + const [date, time] = created_at.split(" "); + let msg = `date=${safe(date)} time=${safe(time)} id=${safe(String(id))} status=${safe(status)}`; + try { steps = JSON.parse(steps); } + catch (err) { steps = []; } + if (steps.length > 0) { + const s = new Array(steps.length); + for (let i = 0; i < steps.length; i++) { + const { name, done = false } = steps[i]; + const out = name.split("/")[1] || "na"; + if (status === "FAILURE" && !done) s[i] = out + "[✗]"; + else if (status === "RUNNING" && !done && (steps[i-1] ? steps[i-1].done : true)) s[i] = out + "[○]"; + else if (["RUNNING", "PENDING"].indexOf(status) !== -1 && done) s[i] = out + "[✓]"; + else if (["READY", "CLAIMED"].indexOf(status) !== -1) s[i] = out + "[○]"; + else s[i] = out; + } + msg += "[" + s.join(" ➜ ") + "]"; + } + return msg; + }).join("\n"); + }), + rxjs.catchError(() => rxjs.of(null)), + rxjs.delay(5000), + rxjs.repeat(), + )); } -async function createTrigger({ workflow, triggers }) { - const trigger = triggers.find(({ name }) => name === workflow.trigger.name); - if (!trigger) return createElement(`
Trigger not found "${workflow.trigger.name}"
`); - const { title, icon } = trigger; +async function createTrigger({ trigger, triggers }) { + const currentTrigger = triggers.find(({ name }) => name === trigger.name); + if (!currentTrigger) return createElement(`
Trigger not found "${safe(trigger.name)}"
`); + const { title, icon } = currentTrigger; const $trigger = createFragment(`
${icon}

- ${title} +  ${safe(title)}

-
+

`); - const $form = await createForm(trigger.specs, formTmpl()); + const $form = await createForm( + mutateForm(currentTrigger.specs, trigger.params || {}), + formTmpl(), + ); qs($trigger, `[data-bind="form"]`).appendChild($form); return $trigger; } async function createAction({ action, actions }) { const selected = actions.find((_action) => _action.name === action.name); - if (!selected) return createElement(`
Action not found "${action.name}"
`); - const subtitle = selected.subtitle ? `({{ ${action.subtitle} }})` : ""; + if (!selected) return createElement(` +
+
Action not found "${safe(action.name)}"
+
+
+ `); + const subtitle = selected.subtitle && action.params && action.params[selected.subtitle] ? `(${safe(action.params[selected.subtitle])})` : ""; const $action = createElement(`
${selected.icon}

- ${selected.title} ${subtitle} +  ${safe(selected.title)} ${safe(subtitle)}

-
+

`); - const $form = await createForm(selected.specs, formTmpl()); + const $form = await createForm( + mutateForm(selected.specs, action.params || {}), + formTmpl(), + ); qs($action, `[data-bind="form"]`).appendChild($form); return $action; } @@ -130,27 +214,16 @@ async function createAdd({ actions, createAction }) { `); - const categories = actions.reduce((acc, { name }) => { - const s = name.split("/"); - if (!acc[s[0]]) acc[s[0]] = []; - acc[s[0]].push(name); - return acc; - }, {}); const $categories = createElement(` - `); render(transition($page)); - workflows.forEach((workflow) => qs($page, `[data-bind="workflows"]`).appendChild(createWorkflow(workflow))); + const $workflows = qs($page, `[data-bind="workflows"]`); + workflows.forEach((workflow) => $workflows.appendChild(createWorkflow(workflow))); + if (workflows.length === 0) $workflows.appendChild(createEmptyWorkflow()); effect(onClick(qs($page, "h2 > a")).pipe( rxjs.tap((a) => ctrlModal(createModal(), { triggers })), @@ -27,22 +29,31 @@ export default async function(render, { workflows, triggers }) { } function createWorkflow(specs) { - const { name, published, actions, trigger } = specs; + const { name, published, actions, trigger, updated_at } = specs; const summaryHTML = { - trigger: ``, - actions: actions.map(({ name }) => ``).join(""), + trigger: ``, + actions: (actions || []).map(({ name }) => ``).join(""), }; const $workflow = createElement(` - -

${name} (2 weeks ago)

+
+

+ ${safe(name)} + (${Intl.DateTimeFormat(navigator.language).format(new Date(safe(updated_at)))}) +

- ${summaryHTML.trigger} → ${summaryHTML.actions} + ${summaryHTML.trigger} ${summaryHTML.actions ? "→" + summaryHTML.actions : ""}
`); return $workflow; } +function createEmptyWorkflow() { + return createElement(` +

Add a new workflow to get started

+ `); +} + function ctrlModal(render, { triggers }) { const $page = createElement(`
@@ -61,7 +72,7 @@ function ctrlModal(render, { triggers }) { const $input = qs($page, "input"); effect(rxjs.of(triggers).pipe( rxjs.map((arr) => arr.map(({ name, title, icon }) => createElement(` - ${icon} ${title} + ${icon} ${safe(title)} `))), rxjs.map(($els) => { $list.innerHTML = ""; @@ -79,21 +90,21 @@ function ctrlModal(render, { triggers }) { rxjs.tap((e) => { const shouldOpen = e.target.value.length > 0; if ($list.clientHeight === 0 && shouldOpen) animate($list, { - time: Math.max(50, Math.min(height, 150)), + time: 300, keyframes: [{ height: "0" }, { height: `${height}px` }], onExit: () => $list.style.height = "", }); else if ($list.clientHeight > 0 && !shouldOpen) animate($list, { - time: Math.max(50, Math.min(height, 150)), + time: 100, keyframes: [{ height: `${height}px` }, { height: "0" }], onExit: () => $list.style.height = "0", }); - $els.forEach(($el) => $el.setAttribute("href", "./admin/workflow?specs="+btoa(JSON.stringify({ + $els.forEach(($el) => $el.setAttribute("href", "./admin/workflow?specs="+encodeURIComponent(btoa(JSON.stringify({ name: e.target.value, published: false, trigger: { name: $el.getAttribute("data-name") }, - actions: [ { name: "tools/debug" }] - })))); + actions: [{ name: "tools/debug" }] + }))))); }), )), )); diff --git a/public/assets/pages/adminpage/decorator_sidemenu.js b/public/assets/pages/adminpage/decorator_sidemenu.js index 296d1ef5..608e8a98 100644 --- a/public/assets/pages/adminpage/decorator_sidemenu.js +++ b/public/assets/pages/adminpage/decorator_sidemenu.js @@ -31,15 +31,15 @@ export default function(ctrl) {
  • - - Activity - -
  • -
  • Workflow
  • +
  • + + Activity + +
  • Settings diff --git a/public/assets/pages/adminpage/index.css b/public/assets/pages/adminpage/index.css index b17da01e..b77bc523 100644 --- a/public/assets/pages/adminpage/index.css +++ b/public/assets/pages/adminpage/index.css @@ -110,7 +110,7 @@ font-size: 0.9em; padding: 10px; margin-bottom: 0; - border-radius: 2px; + border-radius: 3px; color: white; max-width: 100%; overflow-x: auto; diff --git a/public/assets/pages/adminpage/model_workflow.js b/public/assets/pages/adminpage/model_workflow.js new file mode 100644 index 00000000..a846b105 --- /dev/null +++ b/public/assets/pages/adminpage/model_workflow.js @@ -0,0 +1,46 @@ +import rxjs from "../../lib/rx.js"; +import ajax from "../../lib/ajax.js"; + +export function workflowAll() { + return ajax({ + url: "admin/api/workflow", + responseType: "json", + }).pipe( + rxjs.map(({ responseJSON }) => responseJSON.result), + rxjs.map(({ workflows, triggers, actions }) => ({ + workflows, + triggers, + actions, + })), + ); +} + +export function workflowUpsert(body) { + return ajax({ + url: "admin/api/workflow", + responseType: "json", + method: "POST", + body, + }).pipe( + rxjs.map(({ responseJSON }) => responseJSON.result) + ); +} + +export function workflowDelete(id) { + return ajax({ + url: "admin/api/workflow?id=" + id, + responseType: "json", + method: "DELETE", + }).pipe( + rxjs.map(({ responseJSON }) => responseJSON.result) + ); +} + +export function workflowGet(id) { + return ajax({ + url: "admin/api/workflow/" + id, + responseType: "json", + }).pipe( + rxjs.map(({ responseJSON }) => responseJSON.result) + ); +} diff --git a/server/common/files.go b/server/common/files.go index 26ee6e15..3e580118 100644 --- a/server/common/files.go +++ b/server/common/files.go @@ -5,6 +5,8 @@ import ( "os" "path/filepath" "strings" + + "github.com/bmatcuk/doublestar/v4" ) var MOCK_CURRENT_DIR string @@ -106,6 +108,11 @@ func SafeOsRename(from string, to string) error { return processError(os.Rename(from, to)) } +func GlobMatch(pattern, name string) bool { + m, _ := doublestar.Match(pattern, name) + return m +} + func safePath(path string) error { p, err := filepath.EvalSymlinks(path) if err != nil { diff --git a/server/common/plugin.go b/server/common/plugin.go index 62cbb42e..73f3c73c 100644 --- a/server/common/plugin.go +++ b/server/common/plugin.go @@ -6,6 +6,7 @@ import ( "io/fs" "net/http" "path/filepath" + "sort" "strings" "github.com/gorilla/mux" @@ -286,6 +287,30 @@ func (this Get) Metadata() IMetadata { return meta } +var workflow_triggers []ITrigger + +func (this Register) WorkflowTrigger(t ITrigger) { + workflow_triggers = append(workflow_triggers, t) + sort.Slice(workflow_triggers, func(i, j int) bool { + return workflow_triggers[i].Manifest().Order < workflow_triggers[j].Manifest().Order + }) +} +func (this Get) WorkflowTriggers() []ITrigger { + return workflow_triggers +} + +var workflow_actions []IAction + +func (this Register) WorkflowAction(a IAction) { + workflow_actions = append(workflow_actions, a) + sort.Slice(workflow_actions, func(i, j int) bool { + return workflow_actions[i].Manifest().Order < workflow_actions[j].Manifest().Order + }) +} +func (this Get) WorkflowActions() []IAction { + return workflow_actions +} + func init() { Hooks.Register.FrontendOverrides(OverrideVideoSourceMapper) } diff --git a/server/common/types.go b/server/common/types.go index ea4956d2..5e4f5327 100644 --- a/server/common/types.go +++ b/server/common/types.go @@ -78,6 +78,30 @@ type IMetadata interface { Search(ctx *App, path string, facets map[string]any) (map[string][]FormElement, error) } +type ITrigger interface { + Manifest() WorkflowSpecs + Init() (chan ITriggerEvent, error) +} + +type IAction interface { + Manifest() WorkflowSpecs + Execute(params map[string]string, input map[string]string) (map[string]string, error) +} + +type ITriggerEvent interface { + WorkflowID() string + Input() map[string]string +} + +type WorkflowSpecs struct { + Name string `json:"name"` + Title string `json:"title"` + Subtitle string `json:"subtitle"` + Icon string `json:"icon"` + Specs map[string]FormElement `json:"specs"` + Order int `json:"-"` +} + type File struct { FName string `json:"name"` FType string `json:"type"` diff --git a/server/routes.go b/server/routes.go index df62a5be..5beebafa 100644 --- a/server/routes.go +++ b/server/routes.go @@ -13,6 +13,7 @@ import ( . "github.com/mickael-kerjean/filestash/server/common" . "github.com/mickael-kerjean/filestash/server/ctrl" . "github.com/mickael-kerjean/filestash/server/middleware" + . "github.com/mickael-kerjean/filestash/server/workflow" ) func Build(r *mux.Router, a App) { @@ -39,6 +40,10 @@ func Build(r *mux.Router, a App) { middlewares = []Middleware{ApiHeaders, AdminOnly, SecureOrigin, PluginInjector} admin.HandleFunc("/config", NewMiddlewareChain(PrivateConfigHandler, middlewares, a)).Methods("GET") admin.HandleFunc("/config", NewMiddlewareChain(PrivateConfigUpdateHandler, middlewares, a)).Methods("POST") + admin.HandleFunc("/workflow", NewMiddlewareChain(WorkflowAll, middlewares, a)).Methods("GET") + admin.HandleFunc("/workflow/{workflowID}", NewMiddlewareChain(WorkflowGet, middlewares, a)).Methods("GET") + admin.HandleFunc("/workflow", NewMiddlewareChain(WorkflowUpsert, middlewares, a)).Methods("POST") + admin.HandleFunc("/workflow", NewMiddlewareChain(WorkflowDelete, middlewares, a)).Methods("DELETE") admin.HandleFunc("/middlewares/authentication", NewMiddlewareChain(AdminAuthenticationMiddleware, middlewares, a)).Methods("GET") admin.HandleFunc("/audit", NewMiddlewareChain(FetchAuditHandler, middlewares, a)).Methods("GET") middlewares = []Middleware{IndexHeaders, AdminOnly, PluginInjector} diff --git a/server/workflow/action.go b/server/workflow/action.go new file mode 100644 index 00000000..5b5364aa --- /dev/null +++ b/server/workflow/action.go @@ -0,0 +1,24 @@ +package workflow + +import ( + . "github.com/mickael-kerjean/filestash/server/common" + _ "github.com/mickael-kerjean/filestash/server/workflow/actions" + . "github.com/mickael-kerjean/filestash/server/workflow/model" +) + +func ExecuteAction(action Step, input map[string]string) (map[string]string, error) { + currAction, err := findAction(action.Name) + if err != nil { + return nil, err + } + return currAction.Execute(action.Params, input) +} + +func findAction(action string) (IAction, error) { + for _, currAction := range Hooks.Get.WorkflowActions() { + if currAction.Manifest().Name == action { + return currAction, nil + } + } + return nil, ErrNotFound +} diff --git a/server/workflow/actions/notify_email.go b/server/workflow/actions/notify_email.go new file mode 100644 index 00000000..64d6ab06 --- /dev/null +++ b/server/workflow/actions/notify_email.go @@ -0,0 +1,61 @@ +package actions + +import ( + . "github.com/mickael-kerjean/filestash/server/common" + + "gopkg.in/gomail.v2" +) + +func init() { + Hooks.Register.WorkflowAction(&ActionNotifyEmail{}) +} + +type ActionNotifyEmail struct{} + +func (this *ActionNotifyEmail) Manifest() WorkflowSpecs { + return WorkflowSpecs{ + Name: "notify/email", + Title: "Notify", + Icon: ``, + Specs: map[string]FormElement{ + "email": { + Type: "text", + }, + "subject": { + Type: "text", + }, + "message": { + Type: "long_text", + }, + }, + } +} + +func (this *ActionNotifyEmail) Execute(params map[string]string, input map[string]string) (map[string]string, error) { + email := struct { + Hostname string + Port int + Username string + Password string + From string + To string + Subject string + Message string + }{ + Hostname: Config.Get("email.server").String(), + Port: Config.Get("email.port").Int(), + Username: Config.Get("email.username").String(), + Password: Config.Get("email.password").String(), + From: Config.Get("email.from").String(), + To: params["email"], + Subject: params["subject"], + Message: params["message"], + } + m := gomail.NewMessage() + m.SetHeader("From", email.From) + m.SetHeader("To", email.To) + m.SetHeader("Subject", email.Subject) + m.SetBody("text/html", email.Message) + mail := gomail.NewDialer(email.Hostname, email.Port, email.Username, email.Password) + return input, mail.DialAndSend(m) +} diff --git a/server/workflow/actions/run_api.go b/server/workflow/actions/run_api.go new file mode 100644 index 00000000..ec2caa9a --- /dev/null +++ b/server/workflow/actions/run_api.go @@ -0,0 +1,79 @@ +package actions + +import ( + "bytes" + "fmt" + "io" + "net/http" + "slices" + "strings" + + . "github.com/mickael-kerjean/filestash/server/common" +) + +func init() { + Hooks.Register.WorkflowAction(&RunApi{}) +} + +type RunApi struct{} + +func (this *RunApi) Manifest() WorkflowSpecs { + return WorkflowSpecs{ + Name: "run/api", + Title: "Make API Call", + Icon: ``, + Specs: map[string]FormElement{ + "url": { + Type: "text", + }, + "method": { + Type: "select", + Opts: []string{"POST", "PUT", "GET", "PATCH"}, + }, + "headers": { + Type: "long_text", + }, + "body": { + Type: "long_text", + }, + }, + } +} + +func (this *RunApi) Execute(params map[string]string, input map[string]string) (map[string]string, error) { + req, err := http.NewRequest(params["method"], params["url"], bytes.NewBufferString(params["body"])) + if err != nil { + return input, err + } else if params["headers"] != "" { + for _, header := range strings.Split(params["headers"], "\n") { + if parts := strings.SplitN(strings.TrimSpace(header), ":", 2); len(parts) == 2 { + req.Header.Add( + strings.TrimSpace(parts[0]), + strings.TrimSpace(parts[1]), + ) + } + } + } + if params["body"] != "" && slices.Contains([]string{"POST", "PUT", "PATCH"}, params["method"]) && req.Header.Get("Content-Type") == "" { + req.Header.Set("Content-Type", "application/json") + } + resp, err := HTTP.Do(req) + if err != nil { + return input, err + } + defer resp.Body.Close() + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return input, NewError(fmt.Sprintf("received status code is %d", resp.StatusCode), resp.StatusCode) + } + responseBody, err := io.ReadAll(resp.Body) + if err != nil { + return input, err + } + output := make(map[string]string) + for k, v := range input { + output[k] = v + } + output["http::status"] = string(resp.StatusCode) + output["http::response"] = string(responseBody) + return output, nil +} diff --git a/server/workflow/actions/tools_debug.go b/server/workflow/actions/tools_debug.go new file mode 100644 index 00000000..2e730b57 --- /dev/null +++ b/server/workflow/actions/tools_debug.go @@ -0,0 +1,25 @@ +package actions + +import ( + . "github.com/mickael-kerjean/filestash/server/common" +) + +func init() { + Hooks.Register.WorkflowAction(&ToolsDebug{}) +} + +type ToolsDebug struct{} + +func (this *ToolsDebug) Manifest() WorkflowSpecs { + return WorkflowSpecs{ + Name: "tools/debug", + Title: "Debug", + Icon: ``, + Specs: map[string]FormElement{}, + } +} + +func (this *ToolsDebug) Execute(params map[string]string, input map[string]string) (map[string]string, error) { + Log.Info("[workflow] action=tools/debug input=%v", input) + return input, nil +} diff --git a/server/workflow/config.go b/server/workflow/config.go new file mode 100644 index 00000000..9fb80795 --- /dev/null +++ b/server/workflow/config.go @@ -0,0 +1,40 @@ +package workflow + +import ( + . "github.com/mickael-kerjean/filestash/server/common" +) + +func init() { + Hooks.Register.Onload(func() { + PluginEnable() + PluginNumberWorker() + }) +} + +var PluginEnable = func() bool { + return Config.Get("features.workflow.enable").Schema(func(f *FormElement) *FormElement { + if f == nil { + f = &FormElement{} + } + f.Name = "enable" + f.Type = "enable" + f.Target = []string{"workflow_workers"} + f.Description = "Enable/Disable workflows" + f.Default = true + return f + }).Bool() +} + +var PluginNumberWorker = func() int { + return Config.Get("features.workflow.workers").Schema(func(f *FormElement) *FormElement { + if f == nil { + f = &FormElement{} + } + f.Id = "workflow_workers" + f.Name = "workers" + f.Type = "number" + f.Description = "Number of workers running in parallel. Default: 1" + f.Default = 1 + return f + }).Int() +} diff --git a/server/workflow/handler.go b/server/workflow/handler.go new file mode 100644 index 00000000..284e7a03 --- /dev/null +++ b/server/workflow/handler.go @@ -0,0 +1,73 @@ +package workflow + +import ( + "encoding/json" + "net/http" + + . "github.com/mickael-kerjean/filestash/server/common" + . "github.com/mickael-kerjean/filestash/server/workflow/model" + + "github.com/gorilla/mux" +) + +func WorkflowAll(ctx *App, res http.ResponseWriter, req *http.Request) { + workflows, err := AllWorkflows() + if err != nil { + SendErrorResult(res, err) + return + } + triggers := Hooks.Get.WorkflowTriggers() + tm := make([]WorkflowSpecs, len(triggers)) + for i, t := range triggers { + tm[i] = t.Manifest() + } + actions := Hooks.Get.WorkflowActions() + am := make([]WorkflowSpecs, len(actions)) + for i, a := range actions { + am[i] = a.Manifest() + } + SendSuccessResult(res, map[string]any{ + "workflows": workflows, + "triggers": tm, + "actions": am, + }) +} + +func WorkflowUpsert(ctx *App, res http.ResponseWriter, req *http.Request) { + var workflow Workflow + if err := json.NewDecoder(req.Body).Decode(&workflow); err != nil { + SendErrorResult(res, ErrInternal) + return + } + if err := UpsertWorkflow(workflow); err != nil { + SendErrorResult(res, err) + return + } + SendSuccessResult(res, nil) +} + +func WorkflowGet(ctx *App, res http.ResponseWriter, req *http.Request) { + workflowID := mux.Vars(req)["workflowID"] + if workflowID == "" { + SendErrorResult(res, ErrNotValid) + return + } + workflow, err := GetWorkflow(workflowID) + if err != nil { + SendErrorResult(res, err) + return + } + SendSuccessResult(res, workflow) +} + +func WorkflowDelete(ctx *App, res http.ResponseWriter, req *http.Request) { + id := req.URL.Query().Get("id") + if id == "" { + SendErrorResult(res, ErrNotValid) + return + } else if err := DeleteWorkflow(id); err != nil { + SendErrorResult(res, err) + return + } + SendSuccessResult(res, nil) +} diff --git a/server/workflow/index.go b/server/workflow/index.go new file mode 100644 index 00000000..88a6c94b --- /dev/null +++ b/server/workflow/index.go @@ -0,0 +1,64 @@ +package workflow + +import ( + "time" + + . "github.com/mickael-kerjean/filestash/server/common" + . "github.com/mickael-kerjean/filestash/server/workflow/model" +) + +var ( + job_event = make(chan interface{}, 100) + workflow_enable = false +) + +func Init() error { + if err := InitState(); err != nil { + return err + } else if PluginEnable() == false { + Log.Debug("[workflow] state=disabled") + return nil + } + Log.Debug("[workflow] state=enabled worker=%d", PluginNumberWorker()) + + triggers := Hooks.Get.WorkflowTriggers() + for i := 0; i < len(triggers); i++ { + t, err := triggers[i].Init() + if err != nil { + return err + } + go func(t chan ITriggerEvent) { + for trigger := range t { + if err := CreateJob(trigger.WorkflowID(), trigger.Input()); err != nil { + Log.Error("[workflow] action=createJob err=%s", err.Error()) + } + select { + case job_event <- nil: + default: + } + } + }(t) + } + + for i := 0; i < PluginNumberWorker(); i++ { + go func(i int) { + time.Sleep(time.Duration((i+1)*100) * time.Millisecond) + for { + select { + case <-job_event: + case <-time.After(60 * time.Second): + } + jobID, workflow, input, err := NextJob() + if err == ErrNotFound { + continue + } else if err != nil { + Log.Error("[workflow] type=worker err=%s", err.Error()) + time.Sleep(10 * time.Second) + continue + } + ExecuteJob(jobID, workflow, input) + } + }(i) + } + return nil +} diff --git a/server/workflow/job.go b/server/workflow/job.go new file mode 100644 index 00000000..b043e08a --- /dev/null +++ b/server/workflow/job.go @@ -0,0 +1,29 @@ +package workflow + +import ( + . "github.com/mickael-kerjean/filestash/server/workflow/model" +) + +func ExecuteJob(jobID string, workflow Workflow, input map[string]string) { + var err error + UpdateJob(jobID, "RUNNING", workflow.Actions, input) + for i := 0; i < len(workflow.Actions); i++ { + if workflow.Actions[i].Done { + continue + } + input, err = ExecuteAction(workflow.Actions[i], input) + workflow.Actions[i].Done = true + if err != nil { + status := "FAILURE" + workflow.Actions[i].Done = false + if input["status"] == "PENDING" { + status = "PENDING" + } + UpdateJob(jobID, status, workflow.Actions, input) + return + } + UpdateJob(jobID, "RUNNING", workflow.Actions, input) + } + UpdateJob(jobID, "SUCCESS", workflow.Actions, map[string]string{}) + return +} diff --git a/server/workflow/model/block.go b/server/workflow/model/block.go new file mode 100644 index 00000000..cb4466c0 --- /dev/null +++ b/server/workflow/model/block.go @@ -0,0 +1,13 @@ +package model + +import ( + . "github.com/mickael-kerjean/filestash/server/common" +) + +type StepDefinition struct { + Name string `json:"name"` + Title string `json:"title"` + Subtitle string `json:"subtitle"` + Icon string `json:"icon"` + Specs map[string]FormElement `json:"specs"` +} diff --git a/server/workflow/model/index.go b/server/workflow/model/index.go new file mode 100644 index 00000000..383fd316 --- /dev/null +++ b/server/workflow/model/index.go @@ -0,0 +1,50 @@ +package model + +import ( + "database/sql" + + _ "github.com/mattn/go-sqlite3" + "github.com/mickael-kerjean/filestash/server/common" +) + +var db *sql.DB + +func InitState() (err error) { + db, err = sql.Open("sqlite3", common.GetAbsolutePath(common.DB_PATH, "workflow.sql")) + if err != nil { + return err + } + + db.Exec(` + CREATE TABLE IF NOT EXISTS workflows ( + id TEXT PRIMARY KEY, + name TEXT NOT NULL, + published BOOLEAN DEFAULT 0, + trigger TEXT NOT NULL, -- JSON encoded Step + actions TEXT NOT NULL, -- JSON encoded []Step + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + updated_at DATETIME DEFAULT CURRENT_TIMESTAMP + ); + CREATE INDEX IF NOT EXISTS idx_workflows_trigger_name ON workflows(json_extract(trigger, '$.name'));`) + + db.Exec(` + CREATE TABLE IF NOT EXISTS jobs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + related_workflow TEXT NOT NULL, + status TEXT CHECK(status IN ('READY', 'PENDING', 'CLAIMED', 'RUNNING', 'SUCCESS', 'FAILURE')) DEFAULT 'READY', + steps TEXT NOT NULL, + input TEXT NOT NULL, + created_at DATETIME DEFAULT CURRENT_TIMESTAMP, + updated_at DATETIME DEFAULT CURRENT_TIMESTAMP, + FOREIGN KEY (related_workflow) REFERENCES workflows(id) + ); + CREATE INDEX IF NOT EXISTS idx_jobs_status ON jobs(status); + CREATE INDEX IF NOT EXISTS idx_jobs_workflow ON jobs(related_workflow, created_at DESC);`) + + db.Exec(` + UPDATE jobs + SET status = 'READY', updated_at = CURRENT_TIMESTAMP + WHERE status IN ('RUNNING', 'CLAIMED')`) + + return nil +} diff --git a/server/workflow/model/job.go b/server/workflow/model/job.go new file mode 100644 index 00000000..a7d3fe46 --- /dev/null +++ b/server/workflow/model/job.go @@ -0,0 +1,116 @@ +package model + +import ( + "database/sql" + "encoding/json" + + . "github.com/mickael-kerjean/filestash/server/common" +) + +type Job struct { + ID int `json:"id"` + RelatedWorkflow string `json:"related_workflow"` + Status string `json:"status"` + Steps []Step `json:"steps"` + CreatedAt string `json:"created_at"` + UpdatedAt string `json:"updated_at"` +} + +func CreateJob(workflowID string, input map[string]string) error { + workflow, err := GetWorkflow(workflowID) + if err != nil { + return err + } + stepsJSON, err := json.Marshal(workflow.Actions) + if err != nil { + return err + } + inputJSON, err := json.Marshal(input) + if err != nil { + return err + } + tx, err := db.Begin() + if err != nil { + return err + } + defer tx.Rollback() + if _, err = tx.Exec(` + INSERT INTO jobs (related_workflow, status, steps, input) + VALUES (?, 'READY', ?, ?) + `, workflowID, string(stepsJSON), string(inputJSON)); err != nil { + return err + } + if _, err = tx.Exec(`DELETE FROM jobs WHERE related_workflow = ? AND id NOT IN ( + SELECT id FROM jobs + WHERE related_workflow = ? + ORDER BY created_at DESC + LIMIT 1000 + )`, workflowID, workflowID); err != nil { + return err + } + return tx.Commit() +} + +func NextJob() (string, Workflow, map[string]string, error) { + tx, err := db.Begin() + if err != nil { + return "", Workflow{}, nil, err + } + defer tx.Rollback() + query := ` + SELECT id, related_workflow, steps, input + FROM jobs + WHERE status = 'READY' + ORDER BY updated_at ASC + LIMIT 1` + var ( + jobID string + workflowID string + stepsJSON string + inputJSON string + ) + if err = tx.QueryRow(query).Scan(&jobID, &workflowID, &stepsJSON, &inputJSON); err != nil { + if err == sql.ErrNoRows { + return "", Workflow{}, nil, ErrNotFound + } + return "", Workflow{}, nil, err + } + if _, err = tx.Exec(`UPDATE jobs SET status = 'CLAIMED', updated_at = CURRENT_TIMESTAMP WHERE id = ?`, jobID); err != nil { + return "", Workflow{}, nil, err + } else if err = tx.Commit(); err != nil { + return "", Workflow{}, nil, err + } + workflow, err := GetWorkflow(workflowID) + if err != nil { + return "", Workflow{}, nil, err + } + var input map[string]string + if err = json.Unmarshal([]byte(inputJSON), &input); err != nil { + return "", Workflow{}, nil, err + } + input["jobID"] = jobID + input["workflowID"] = workflow.ID + input["trigger"] = workflow.Trigger.Name + return jobID, workflow, input, nil +} + +func UpdateJob(jobID string, status string, steps []Step, input map[string]string) { + stepsJSON, err := json.Marshal(steps) + if err != nil { + Log.Error("[workflow] from=job on=updateJob step=marshal err=%s", err.Error()) + return + } + inputJSON, err := json.Marshal(input) + if err != nil { + Log.Error("[workflow] from=job on=updateJob step=marshal err=%s", err.Error()) + return + } + query := ` + UPDATE jobs + SET status = ?, steps = ?, input = ?, updated_at = CURRENT_TIMESTAMP + WHERE id = ? + ` + if _, err := db.Exec(query, status, string(stepsJSON), string(inputJSON), jobID); err != nil { + Log.Error("[workflow] from=job on=updateJob err=%s", err.Error()) + } +} diff --git a/server/workflow/model/workflow.go b/server/workflow/model/workflow.go new file mode 100644 index 00000000..4476b7b6 --- /dev/null +++ b/server/workflow/model/workflow.go @@ -0,0 +1,150 @@ +package model + +import ( + "database/sql" + "encoding/json" + + . "github.com/mickael-kerjean/filestash/server/common" +) + +type Workflow struct { + ID string `json:"id"` + Name string `json:"name"` + Published bool `json:"published"` + Trigger Step `json:"trigger"` + Actions []Step `json:"actions"` + UpdatedAt string `json:"updated_at"` + CreatedAt string `json:"created_at"` + History []any `json:"history"` +} + +type Step struct { + Name string `json:"name"` + Params map[string]string `json:"params",omitzero` + Done bool `json:"done,omitempty"` +} + +func FindWorkflows(triggerName string) ([]Workflow, error) { + rows, err := db.Query(` + SELECT w.id, w.name, w.published, w.trigger, w.actions, w.created_at, w.updated_at + FROM workflows w + WHERE json_extract(w.trigger, '$.name') = ? + ORDER BY w.created_at DESC + `, triggerName) + if err != nil { + return nil, err + } + defer rows.Close() + + var workflows = []Workflow{} + for rows.Next() { + var w Workflow + var triggerJSON, actionsJSON string + if err := rows.Scan(&w.ID, &w.Name, &w.Published, &triggerJSON, &actionsJSON, &w.CreatedAt, &w.UpdatedAt); err != nil { + return nil, err + } + if err := json.Unmarshal([]byte(triggerJSON), &w.Trigger); err != nil { + return nil, err + } + if err := json.Unmarshal([]byte(actionsJSON), &w.Actions); err != nil { + return nil, err + } + workflows = append(workflows, w) + } + return workflows, rows.Err() +} + +func AllWorkflows() ([]Workflow, error) { + rows, err := db.Query(` + SELECT id, name, published, trigger, actions, created_at, updated_at + FROM workflows + ORDER BY created_at DESC + `) + if err != nil { + return nil, err + } + defer rows.Close() + + var workflows = []Workflow{} + for rows.Next() { + var w Workflow + var triggerJSON, actionsJSON string + err := rows.Scan(&w.ID, &w.Name, &w.Published, &triggerJSON, &actionsJSON, &w.CreatedAt, &w.UpdatedAt) + if err != nil { + return nil, err + } + if err := json.Unmarshal([]byte(triggerJSON), &w.Trigger); err != nil { + return nil, err + } + if err := json.Unmarshal([]byte(actionsJSON), &w.Actions); err != nil { + return nil, err + } + workflows = append(workflows, w) + } + return workflows, rows.Err() +} + +func UpsertWorkflow(workflow Workflow) error { + triggerJSON, err := json.Marshal(workflow.Trigger) + if err != nil { + return err + } + actionsJSON, err := json.Marshal(workflow.Actions) + if err != nil { + return err + } + query := ` + INSERT OR REPLACE INTO workflows (id, name, published, trigger, actions, updated_at) + VALUES (?, ?, ?, ?, ?, CURRENT_TIMESTAMP)` + _, err = db.Exec(query, workflow.ID, workflow.Name, workflow.Published, string(triggerJSON), string(actionsJSON)) + return err +} + +func GetWorkflow(id string) (Workflow, error) { + query := ` + SELECT w.id, w.name, w.published, w.trigger, w.actions, w.created_at, w.updated_at, + (SELECT COALESCE(JSON_GROUP_ARRAY(JSON_OBJECT( + 'id', j.id, 'status', j.status, 'created_at', j.created_at, 'steps', j.steps + )), JSON_ARRAY()) FROM ( + SELECT * FROM jobs j + WHERE j.related_workflow = w.id + ORDER BY j.created_at DESC + LIMIT 3000 + ) j) as history + FROM workflows w + WHERE w.id = ?` + row := db.QueryRow(query, id) + + var w Workflow + var triggerJSON, actionsJSON, historyJSON string + if err := row.Scan(&w.ID, &w.Name, &w.Published, &triggerJSON, &actionsJSON, &w.CreatedAt, &w.UpdatedAt, &historyJSON); err != nil { + if err == sql.ErrNoRows { + return Workflow{}, ErrNotFound + } + return Workflow{}, err + } + if err := json.Unmarshal([]byte(triggerJSON), &w.Trigger); err != nil { + return Workflow{}, err + } + if err := json.Unmarshal([]byte(actionsJSON), &w.Actions); err != nil { + return Workflow{}, err + } + if err := json.Unmarshal([]byte(historyJSON), &w.History); err != nil { + return Workflow{}, err + } + return w, nil +} + +func DeleteWorkflow(id string) error { + result, err := db.Exec(`DELETE FROM workflows WHERE id = ?`, id) + if err != nil { + return err + } + rowsAffected, err := result.RowsAffected() + if err != nil { + return err + } else if rowsAffected == 0 { + return ErrNotFound + } + return nil +} diff --git a/server/workflow/trigger.go b/server/workflow/trigger.go new file mode 100644 index 00000000..5aaad17a --- /dev/null +++ b/server/workflow/trigger.go @@ -0,0 +1,5 @@ +package workflow + +import ( + _ "github.com/mickael-kerjean/filestash/server/workflow/trigger" +) diff --git a/server/workflow/trigger/fileaction.go b/server/workflow/trigger/fileaction.go new file mode 100644 index 00000000..51b65710 --- /dev/null +++ b/server/workflow/trigger/fileaction.go @@ -0,0 +1,123 @@ +package trigger + +import ( + "strings" + + . "github.com/mickael-kerjean/filestash/server/common" +) + +var ( + fileaction_event = make(chan ITriggerEvent, 1) + fileaction_name = "event" +) + +func init() { + Hooks.Register.WorkflowTrigger(&FileEventTrigger{}) + Hooks.Register.AuthorisationMiddleware(hookAuthorisation{}) +} + +type hookAuthorisation struct{} + +func (this hookAuthorisation) Ls(ctx *App, path string) error { + processFileAction(ctx, map[string]string{"event": "ls", "path": path}) + return nil +} + +func (this hookAuthorisation) Cat(ctx *App, path string) error { + processFileAction(ctx, map[string]string{"event": "cat", "path": path}) + return nil +} + +func (this hookAuthorisation) Mkdir(ctx *App, path string) error { + processFileAction(ctx, map[string]string{"event": "mkdir", "path": path}) + return nil +} + +func (this hookAuthorisation) Rm(ctx *App, path string) error { + processFileAction(ctx, map[string]string{"event": "rm", "path": path}) + return nil +} + +func (this hookAuthorisation) Mv(ctx *App, from string, to string) error { + processFileAction(ctx, map[string]string{"event": "mv", "path": from + ", " + to}) + return nil +} + +func (this hookAuthorisation) Save(ctx *App, path string) error { + processFileAction(ctx, map[string]string{"event": "save", "path": path}) + return nil +} + +func (this hookAuthorisation) Touch(ctx *App, path string) error { + processFileAction(ctx, map[string]string{"event": "touch", "path": path}) + return nil +} + +type FileEventTrigger struct{} + +func (this *FileEventTrigger) Manifest() WorkflowSpecs { + return WorkflowSpecs{ + Name: fileaction_name, + Title: "When Something Happen", + Icon: ``, + Specs: map[string]FormElement{ + "event": { + Type: "text", + Datalist: []string{"ls", "cat", "mkdir", "mv", "rm", "touch"}, + MultiValue: true, + }, + "path": { + Type: "text", + }, + }, + Order: 3, + } +} + +func (this *FileEventTrigger) Init() (chan ITriggerEvent, error) { + return fileaction_event, nil +} + +func processFileAction(ctx *App, params map[string]string) { + if ctx.Context.Value("AUDIT") == false { + return + } + if err := triggerEvents(fileaction_event, fileaction_name, fileactionCallback(params)); err != nil { + Log.Error("[workflow] trigger=event step=triggerEvents err=%s", err.Error()) + } +} + +func fileactionCallback(out map[string]string) func(map[string]string) (map[string]string, bool) { + return func(params map[string]string) (map[string]string, bool) { + if !matchEvent(params["event"], out["event"]) { + return out, false + } else if !matchPath(params["path"], out["path"]) { + return out, false + } + return out, true + } +} + +func matchEvent(paramValue string, eventValue string) bool { + if paramValue == "" { + return true + } + for _, pvalue := range strings.Split(paramValue, ",") { + if strings.TrimSpace(pvalue) == eventValue { + return true + } + } + return false +} + +func matchPath(paramValue string, eventValue string) bool { + if paramValue == "" { + return true + } + for _, epath := range strings.Split(eventValue, ",") { + if GlobMatch(paramValue, strings.TrimSpace(epath)) { + return true + } + } + return false +} diff --git a/server/workflow/trigger/filewatch.go b/server/workflow/trigger/filewatch.go new file mode 100644 index 00000000..f9450421 --- /dev/null +++ b/server/workflow/trigger/filewatch.go @@ -0,0 +1,117 @@ +package trigger + +import ( + "context" + "encoding/json" + "os" + "sync" + "time" + + . "github.com/mickael-kerjean/filestash/server/common" + "github.com/mickael-kerjean/filestash/server/model" +) + +var ( + filewatch_event = make(chan ITriggerEvent, 1) + filewatch_name = "watch" + filewatch_state sync.Map +) + +func init() { + Hooks.Register.WorkflowTrigger(&WatchTrigger{}) +} + +type WatchTrigger struct{} + +func (this *WatchTrigger) Manifest() WorkflowSpecs { + return WorkflowSpecs{ + Name: "watch", + Title: "When the Filesystem Changes", + Icon: ``, + Specs: map[string]FormElement{ + "token": { + Type: "text", + }, + "path": { + Type: "text", + }, + }, + Order: 4, + } +} + +func (this *WatchTrigger) Init() (chan ITriggerEvent, error) { + go func() { + for { + if err := triggerEvents(filewatch_event, filewatch_name, filewatchCallback); err != nil { + Log.Error("[workflow] trigger=watch step=triggerEvents err=%s", err.Error()) + } + time.Sleep(10 * time.Second) + } + }() + return filewatch_event, nil +} + +func filewatchCallback(params map[string]string) (map[string]string, bool) { + out := map[string]string{"path": params["path"]} + backend, session, err := createBackend(params["token"]) + if err != nil { + Log.Error("[workflow] trigger=filewatch step=callback::init err=%s", err.Error()) + return out, false + } + files, err := backend.Ls(params["path"]) + if err != nil { + Log.Error("[workflow] trigger=filewatch step=callback::ls err=%s", err.Error()) + return out, false + } + key := GenerateID(session) + params["path"] + fincache, exists := filewatch_state.Load(key) + if !exists { + filewatch_state.Store(key, files) + return out, false + } + prevFiles := fincache.([]os.FileInfo) + if len(files) != len(prevFiles) { + filewatch_state.Store(key, files) + return out, true + } + changes := []string{} + for i := 0; i < len(files); i++ { + hasChange := false + if files[i].Name() != prevFiles[i].Name() { + hasChange = true + } else if files[i].Size() != prevFiles[i].Size() { + hasChange = true + } else if files[i].ModTime() != prevFiles[i].ModTime() { + hasChange = true + } + if hasChange { + p := JoinPath(params["path"], files[i].Name()) + if files[i].IsDir() { + p = EnforceDirectory(p) + } + changes = append(changes, p) + } + } + if len(changes) > 0 { + filewatch_state.Store(key, files) + return out, true + } + return out, false +} + +func createBackend(token string) (IBackend, map[string]string, error) { + session := map[string]string{} + str, err := DecryptString(SECRET_KEY_DERIVATE_FOR_USER, token) + if err != nil { + return nil, session, err + } + if err = json.Unmarshal([]byte(str), &session); err != nil { + return nil, session, err + } + backend, err := model.NewBackend( + &App{Context: context.Background()}, + session, + ) + return backend, session, err +} diff --git a/server/workflow/trigger/index.go b/server/workflow/trigger/index.go new file mode 100644 index 00000000..0416dbc0 --- /dev/null +++ b/server/workflow/trigger/index.go @@ -0,0 +1,55 @@ +package trigger + +import ( + "encoding/json" + "net/http" + + . "github.com/mickael-kerjean/filestash/server/common" + . "github.com/mickael-kerjean/filestash/server/workflow/model" +) + +type TriggerEvent struct { + ID string + Params map[string]string +} + +func (this TriggerEvent) Input() map[string]string { + return this.Params +} + +func (this *TriggerEvent) WorkflowID() string { + return this.ID +} + +func triggerEvents(event chan ITriggerEvent, triggerID string, callback func(params map[string]string) (map[string]string, bool)) error { + workflows, err := FindWorkflows(triggerID) + if err != nil { + return err + } + for _, workflow := range workflows { + if !workflow.Published { + continue + } + params, emit := callback(workflow.Trigger.Params) + if !emit { + continue + } + select { + case event <- &TriggerEvent{ + ID: workflow.ID, + Params: params, + }: + default: + return NewError("Workflow is busy", http.StatusServiceUnavailable) + } + } + return nil +} + +func toJSON(val any) string { + b, err := json.Marshal(val) + if err != nil { + return "{}" + } + return string(b) +} diff --git a/server/workflow/trigger/schedule.go b/server/workflow/trigger/schedule.go new file mode 100644 index 00000000..e364fb43 --- /dev/null +++ b/server/workflow/trigger/schedule.go @@ -0,0 +1,66 @@ +package trigger + +import ( + "time" + + . "github.com/mickael-kerjean/filestash/server/common" +) + +var ( + cron_name = "schedule" + cron_event = make(chan ITriggerEvent, 10) +) + +func init() { + Hooks.Register.WorkflowTrigger(&ScheduleTrigger{}) +} + +type ScheduleTrigger struct{} + +func (this *ScheduleTrigger) Manifest() WorkflowSpecs { + return WorkflowSpecs{ + Name: cron_name, + Title: "On a Schedule", + Subtitle: "frequency", + Icon: ``, + Specs: map[string]FormElement{ + "frequency": { + Type: "select", + Opts: []string{"per-minute", "hourly", "daily", "weekly", "monthly"}, + Value: "daily", + }, + }, + Order: 1, + } +} + +func (this *ScheduleTrigger) Init() (chan ITriggerEvent, error) { + go func() { + for { + if err := triggerEvents(cron_event, cron_name, scheduleCallback); err != nil { + Log.Error("[workflow] trigger=schedule step=triggerEvents err=%s", err.Error()) + } + time.Sleep(60 * time.Second) + } + }() + return cron_event, nil +} + +func scheduleCallback(params map[string]string) (map[string]string, bool) { + shouldTrigger := false + now := time.Now() + out := map[string]string{"frequency": params["frequency"]} + switch params["frequency"] { + case "per-minute": + shouldTrigger = true + case "hourly": + shouldTrigger = now.Minute() == 0 + case "daily": + shouldTrigger = now.Hour() == 0 && now.Minute() == 0 + case "weekly": + shouldTrigger = now.Weekday() == time.Sunday && now.Hour() == 0 && now.Minute() == 0 + case "monthly": + shouldTrigger = now.Day() == 1 && now.Hour() == 0 && now.Minute() == 0 + } + return out, shouldTrigger +} diff --git a/server/workflow/trigger/webhook.go b/server/workflow/trigger/webhook.go new file mode 100644 index 00000000..29816fbd --- /dev/null +++ b/server/workflow/trigger/webhook.go @@ -0,0 +1,69 @@ +package trigger + +import ( + "net/http" + "strings" + + . "github.com/mickael-kerjean/filestash/server/common" + + "github.com/gorilla/mux" +) + +var ( + webhook_event = make(chan ITriggerEvent, 5) + webhook_name = "webhook" +) + +func init() { + Hooks.Register.WorkflowTrigger(&WebhookTrigger{}) + Hooks.Register.HttpEndpoint(func(r *mux.Router, app *App) error { + r.HandleFunc(WithBase("/api/workflow/webhook"), func(w http.ResponseWriter, r *http.Request) { + if err := triggerEvents(webhook_event, webhook_name, webhookCallback(r)); err != nil { + SendErrorResult(w, err) + return + } + SendSuccessResult(w, nil) + }).Methods("GET", "POST") + return nil + }) +} + +func webhookCallback(r *http.Request) func(params map[string]string) (map[string]string, bool) { + return func(params map[string]string) (map[string]string, bool) { + headers := map[string]any{} + for k, v := range r.Header { + headers[k] = strings.Join(v, ", ") + } + query := map[string]any{} + for k, v := range r.URL.Query() { + query[k] = strings.Join(v, ", ") + } + return map[string]string{ + "method": r.Method, + "headers": toJSON(headers), + "query": toJSON(query), + }, true + } +} + +type WebhookTrigger struct{} + +func (this *WebhookTrigger) Manifest() WorkflowSpecs { + return WorkflowSpecs{ + Name: webhook_name, + Title: "From a WebHook", + Icon: ``, + Specs: map[string]FormElement{ + "url": { + Type: "text", + ReadOnly: true, + Value: "/api/workflow/webhook", + }, + }, + Order: 5, + } +} + +func (this *WebhookTrigger) Init() (chan ITriggerEvent, error) { + return webhook_event, nil +}