From b2c9b293843ff4c09d8de776c4f703be15ff82c4 Mon Sep 17 00:00:00 2001 From: MickaelK Date: Mon, 3 Feb 2025 16:01:50 +1100 Subject: [PATCH] chore (refactoring): revamp sqlite full text search --- server/plugin/plg_search_sqlitefts/config.go | 161 ------ .../plg_search_sqlitefts/converter/index.go | 34 ++ .../crawler/configuration.go | 128 +++++ .../plg_search_sqlitefts/crawler/daemon.go | 41 ++ .../crawler/daemon_state.go | 70 +++ .../plg_search_sqlitefts/crawler/events.go | 191 +++++++ .../plg_search_sqlitefts/crawler/phase.go | 54 ++ .../crawler/phase_explore.go | 110 ++++ .../crawler/phase_indexing.go | 36 ++ .../crawler/phase_maintain.go | 40 ++ .../crawler/phase_utils.go | 131 +++++ .../{utils.go => crawler/types.go} | 7 + .../plugin/plg_search_sqlitefts/crawlstate.go | 140 ----- server/plugin/plg_search_sqlitefts/index.go | 129 +---- .../plg_search_sqlitefts/indexer/error.go | 24 + .../plg_search_sqlitefts/indexer/index.go | 266 ++++++++++ .../plg_search_sqlitefts/indexer/query.go | 40 ++ server/plugin/plg_search_sqlitefts/query.go | 20 + server/plugin/plg_search_sqlitefts/spider.go | 487 ------------------ 19 files changed, 1195 insertions(+), 914 deletions(-) delete mode 100644 server/plugin/plg_search_sqlitefts/config.go create mode 100644 server/plugin/plg_search_sqlitefts/converter/index.go create mode 100644 server/plugin/plg_search_sqlitefts/crawler/configuration.go create mode 100644 server/plugin/plg_search_sqlitefts/crawler/daemon.go create mode 100644 server/plugin/plg_search_sqlitefts/crawler/daemon_state.go create mode 100644 server/plugin/plg_search_sqlitefts/crawler/events.go create mode 100644 server/plugin/plg_search_sqlitefts/crawler/phase.go create mode 100644 server/plugin/plg_search_sqlitefts/crawler/phase_explore.go create mode 100644 server/plugin/plg_search_sqlitefts/crawler/phase_indexing.go create mode 100644 server/plugin/plg_search_sqlitefts/crawler/phase_maintain.go create mode 100644 server/plugin/plg_search_sqlitefts/crawler/phase_utils.go rename server/plugin/plg_search_sqlitefts/{utils.go => crawler/types.go} (89%) delete mode 100644 server/plugin/plg_search_sqlitefts/crawlstate.go create mode 100644 server/plugin/plg_search_sqlitefts/indexer/error.go create mode 100644 server/plugin/plg_search_sqlitefts/indexer/index.go create mode 100644 server/plugin/plg_search_sqlitefts/indexer/query.go create mode 100644 server/plugin/plg_search_sqlitefts/query.go delete mode 100644 server/plugin/plg_search_sqlitefts/spider.go diff --git a/server/plugin/plg_search_sqlitefts/config.go b/server/plugin/plg_search_sqlitefts/config.go deleted file mode 100644 index 2d36fe95..00000000 --- a/server/plugin/plg_search_sqlitefts/config.go +++ /dev/null @@ -1,161 +0,0 @@ -package plg_search_sqlitefts - -import ( - . 
"github.com/mickael-kerjean/filestash/server/common" - "time" -) - -var ( - SEARCH_ENABLE func() bool - SEARCH_PROCESS_MAX func() int - SEARCH_PROCESS_PAR func() int - SEARCH_REINDEX func() int - CYCLE_TIME func() int - INDEXING_EXT func() string - MAX_INDEXING_FSIZE func() int - INDEXING_EXCLUSION = []string{ - "/node_modules/", "/bower_components/", - "/.cache/", "/.npm/", "/.git/", - } -) - -func init() { - SEARCH_ENABLE = func() bool { - return Config.Get("features.search.enable").Schema(func(f *FormElement) *FormElement { - if f == nil { - f = &FormElement{} - } - f.Name = "enable" - f.Type = "enable" - f.Target = []string{ - "process_max", "process_par", "reindex_time", - "cycle_time", "max_size", "indexer_ext", - } - f.Description = "Enable/Disable full text search" - f.Placeholder = "Default: false" - f.Default = false - return f - }).Bool() - } - SEARCH_PROCESS_MAX = func() int { - return Config.Get("features.search.process_max").Schema(func(f *FormElement) *FormElement { - if f == nil { - f = &FormElement{} - } - f.Id = "process_max" - f.Name = "process_max" - f.Type = "number" - f.Description = "Size of the pool containing the indexers" - f.Placeholder = "Default: 5" - f.Default = 5 - return f - }).Int() - } - SEARCH_PROCESS_PAR = func() int { - return Config.Get("features.search.process_par").Schema(func(f *FormElement) *FormElement { - if f == nil { - f = &FormElement{} - } - f.Id = "process_par" - f.Name = "process_par" - f.Type = "number" - f.Description = "How many concurrent indexers are running in the same time (requires a restart)" - f.Placeholder = "Default: 2" - f.Default = 2 - return f - }).Int() - } - SEARCH_REINDEX = func() int { - return Config.Get("features.search.reindex_time").Schema(func(f *FormElement) *FormElement { - if f == nil { - f = &FormElement{} - } - f.Id = "reindex_time" - f.Name = "reindex_time" - f.Type = "number" - f.Description = "Time in hours after which we consider our index to be stale and needs to be reindexed" - f.Placeholder = "Default: 24h" - f.Default = 24 - return f - }).Int() - } - CYCLE_TIME = func() int { - return Config.Get("features.search.cycle_time").Schema(func(f *FormElement) *FormElement { - if f == nil { - f = &FormElement{} - } - f.Id = "cycle_time" - f.Name = "cycle_time" - f.Type = "number" - f.Description = "Time the indexer needs to spend for each cycle in seconds (discovery, indexing and maintenance)" - f.Placeholder = "Default: 10s" - f.Default = 10 - return f - }).Int() - } - MAX_INDEXING_FSIZE = func() int { - return Config.Get("features.search.max_size").Schema(func(f *FormElement) *FormElement { - if f == nil { - f = &FormElement{} - } - f.Id = "max_size" - f.Name = "max_size" - f.Type = "number" - f.Description = "Maximum size of files the indexer will perform full text search" - f.Placeholder = "Default: 524288000 => 512MB" - f.Default = 524288000 - return f - }).Int() - } - INDEXING_EXT = func() string { - return Config.Get("features.search.indexer_ext").Schema(func(f *FormElement) *FormElement { - if f == nil { - f = &FormElement{} - } - f.Id = "indexer_ext" - f.Name = "indexer_ext" - f.Type = "text" - f.Description = "File extension we want to see indexed" - f.Placeholder = "Default: org,txt,docx,pdf,md,form" - f.Default = "org,txt,docx,pdf,md,form" - return f - }).String() - } - - Hooks.Register.Onload(func() { - SEARCH_ENABLE() - SEARCH_PROCESS_MAX() - SEARCH_PROCESS_PAR() - SEARCH_REINDEX() - CYCLE_TIME() - MAX_INDEXING_FSIZE() - INDEXING_EXT() - - onChange := Config.ListenForChange() - runner := func() { - 
startSearch := false - for { - if SEARCH_ENABLE() == false { - select { - case <-onChange.Listener: - startSearch = SEARCH_ENABLE() - } - if startSearch == false { - continue - } - } - sidx := SProc.Peek() - if sidx == nil { - time.Sleep(5 * time.Second) - continue - } - sidx.mu.Lock() - sidx.Execute() - sidx.mu.Unlock() - } - } - for i := 0; i < SEARCH_PROCESS_PAR(); i++ { - go runner() - } - }) -} diff --git a/server/plugin/plg_search_sqlitefts/converter/index.go b/server/plugin/plg_search_sqlitefts/converter/index.go new file mode 100644 index 00000000..2e18f393 --- /dev/null +++ b/server/plugin/plg_search_sqlitefts/converter/index.go @@ -0,0 +1,34 @@ +package converter + +import ( + "io" + + . "github.com/mickael-kerjean/filestash/server/common" + "github.com/mickael-kerjean/filestash/server/model/formater" +) + +func Convert(path string, reader io.ReadCloser) (out io.ReadCloser, err error) { + switch GetMimeType(path) { + case "text/plain": + out, err = formater.TxtFormater(reader) + case "text/org": + out, err = formater.TxtFormater(reader) + case "text/markdown": + out, err = formater.TxtFormater(reader) + case "application/x-form": + out, err = formater.TxtFormater(reader) + case "application/pdf": + out, err = formater.PdfFormater(reader) + case "application/powerpoint": + out, err = formater.OfficeFormater(reader) + case "application/vnd.ms-powerpoint": + out, err = formater.OfficeFormater(reader) + case "application/word": + out, err = formater.OfficeFormater(reader) + case "application/msword": + out, err = formater.OfficeFormater(reader) + default: + err = ErrNotImplemented + } + return out, err +} diff --git a/server/plugin/plg_search_sqlitefts/crawler/configuration.go b/server/plugin/plg_search_sqlitefts/crawler/configuration.go new file mode 100644 index 00000000..9fa80c26 --- /dev/null +++ b/server/plugin/plg_search_sqlitefts/crawler/configuration.go @@ -0,0 +1,128 @@ +package plg_search_sqlitefts + +import ( + . 
"github.com/mickael-kerjean/filestash/server/common" +) + +func init() { + Hooks.Register.Onload(func() { + SEARCH_ENABLE() + SEARCH_PROCESS_MAX() + SEARCH_PROCESS_PAR() + SEARCH_REINDEX() + CYCLE_TIME() + MAX_INDEXING_FSIZE() + INDEXING_EXT() + }) +} + +var INDEXING_EXCLUSION = []string{ + "/node_modules/", "/bower_components/", + "/.cache/", "/.npm/", "/.git/", +} + +var SEARCH_ENABLE = func() bool { + return Config.Get("features.search.enable").Schema(func(f *FormElement) *FormElement { + if f == nil { + f = &FormElement{} + } + f.Name = "enable" + f.Type = "enable" + f.Target = []string{ + "process_max", "process_par", "reindex_time", + "cycle_time", "max_size", "indexer_ext", + } + f.Description = "Enable/Disable full text search" + f.Placeholder = "Default: false" + f.Default = false + return f + }).Bool() +} + +var SEARCH_PROCESS_MAX = func() int { + return Config.Get("features.search.process_max").Schema(func(f *FormElement) *FormElement { + if f == nil { + f = &FormElement{} + } + f.Id = "process_max" + f.Name = "process_max" + f.Type = "number" + f.Description = "Size of the pool containing the indexers" + f.Placeholder = "Default: 5" + f.Default = 5 + return f + }).Int() +} + +var SEARCH_PROCESS_PAR = func() int { + return Config.Get("features.search.process_par").Schema(func(f *FormElement) *FormElement { + if f == nil { + f = &FormElement{} + } + f.Id = "process_par" + f.Name = "process_par" + f.Type = "number" + f.Description = "How many concurrent indexers are running in the same time (requires a restart)" + f.Placeholder = "Default: 2" + f.Default = 2 + return f + }).Int() +} + +var SEARCH_REINDEX = func() int { + return Config.Get("features.search.reindex_time").Schema(func(f *FormElement) *FormElement { + if f == nil { + f = &FormElement{} + } + f.Id = "reindex_time" + f.Name = "reindex_time" + f.Type = "number" + f.Description = "Time in hours after which we consider our index to be stale and needs to be reindexed" + f.Placeholder = "Default: 24h" + f.Default = 24 + return f + }).Int() +} +var CYCLE_TIME = func() int { + return Config.Get("features.search.cycle_time").Schema(func(f *FormElement) *FormElement { + if f == nil { + f = &FormElement{} + } + f.Id = "cycle_time" + f.Name = "cycle_time" + f.Type = "number" + f.Description = "Time the indexer needs to spend for each cycle in seconds (discovery, indexing and maintenance)" + f.Placeholder = "Default: 10s" + f.Default = 10 + return f + }).Int() +} + +var MAX_INDEXING_FSIZE = func() int { + return Config.Get("features.search.max_size").Schema(func(f *FormElement) *FormElement { + if f == nil { + f = &FormElement{} + } + f.Id = "max_size" + f.Name = "max_size" + f.Type = "number" + f.Description = "Maximum size of files the indexer will perform full text search" + f.Placeholder = "Default: 524288000 => 512MB" + f.Default = 524288000 + return f + }).Int() +} +var INDEXING_EXT = func() string { + return Config.Get("features.search.indexer_ext").Schema(func(f *FormElement) *FormElement { + if f == nil { + f = &FormElement{} + } + f.Id = "indexer_ext" + f.Name = "indexer_ext" + f.Type = "text" + f.Description = "File extension we want to see indexed" + f.Placeholder = "Default: org,txt,docx,pdf,md,form" + f.Default = "org,txt,docx,pdf,md,form" + return f + }).String() +} diff --git a/server/plugin/plg_search_sqlitefts/crawler/daemon.go b/server/plugin/plg_search_sqlitefts/crawler/daemon.go new file mode 100644 index 00000000..e8180c3c --- /dev/null +++ b/server/plugin/plg_search_sqlitefts/crawler/daemon.go @@ -0,0 +1,41 
@@ +package plg_search_sqlitefts + +import ( + "time" + + . "github.com/mickael-kerjean/filestash/server/common" +) + +var onConfigChange ChangeListener + +func init() { + onConfigChange = Config.ListenForChange() + Hooks.Register.Onload(func() { + for i := 0; i < SEARCH_PROCESS_PAR(); i++ { + go runner() + } + }) +} + +func runner() { + startSearch := false + for { + if SEARCH_ENABLE() == false { + select { + case <-onConfigChange.Listener: + startSearch = SEARCH_ENABLE() + } + if startSearch == false { + continue + } + } + crwlr := NextCrawler() + if crwlr == nil { + time.Sleep(5 * time.Second) + continue + } + crwlr.mu.Lock() + crwlr.Run() + crwlr.mu.Unlock() + } +} diff --git a/server/plugin/plg_search_sqlitefts/crawler/daemon_state.go b/server/plugin/plg_search_sqlitefts/crawler/daemon_state.go new file mode 100644 index 00000000..ba881986 --- /dev/null +++ b/server/plugin/plg_search_sqlitefts/crawler/daemon_state.go @@ -0,0 +1,70 @@ +package plg_search_sqlitefts + +import ( + "container/heap" + "sync" + + . "github.com/mickael-kerjean/filestash/server/common" + "github.com/mickael-kerjean/filestash/server/plugin/plg_search_sqlitefts/indexer" +) + +var DaemonState = daemonState{ + idx: make([]Crawler, 0), + n: -1, +} + +type daemonState struct { + idx []Crawler + n int + mu sync.RWMutex +} + +type Crawler struct { + Id string + FoldersUnknown HeapDoc + CurrentPhase string + Backend IBackend + State indexer.Index + mu sync.Mutex + lastHash string +} + +func NewCrawler(id string, b IBackend) (Crawler, error) { + s := Crawler{ + Id: id, + Backend: b, + State: indexer.NewIndex(id), + FoldersUnknown: make(HeapDoc, 0, 1), + } + if err := s.State.Init(); err != nil { + return s, err + } + heap.Init(&s.FoldersUnknown) + return s, nil +} + +func GetCrawler(app *App) *Crawler { + id := GenerateID(app.Session) + DaemonState.mu.RLock() + defer DaemonState.mu.RUnlock() + for i := len(DaemonState.idx) - 1; i >= 0; i-- { + if id == DaemonState.idx[i].Id { + return &DaemonState.idx[i] + } + } + return nil +} + +func NextCrawler() *Crawler { + DaemonState.mu.Lock() + defer DaemonState.mu.Unlock() + if len(DaemonState.idx) == 0 { + return nil + } + if DaemonState.n >= len(DaemonState.idx)-1 || DaemonState.n < 0 { + DaemonState.n = 0 + } else { + DaemonState.n = DaemonState.n + 1 + } + return &DaemonState.idx[DaemonState.n] +} diff --git a/server/plugin/plg_search_sqlitefts/crawler/events.go b/server/plugin/plg_search_sqlitefts/crawler/events.go new file mode 100644 index 00000000..d1348906 --- /dev/null +++ b/server/plugin/plg_search_sqlitefts/crawler/events.go @@ -0,0 +1,191 @@ +package plg_search_sqlitefts + +import ( + "container/heap" + "context" + "path/filepath" + "reflect" + + . "github.com/mickael-kerjean/filestash/server/common" +) + +/* + * We're listening to what the user is doing to hint the crawler over + * what needs to be updated in priority, what file got updated and would need + * to be reindexed, what should disappear from the index, .... 
+ * This way we can fine-tune how full text search behaves
+ */
+
+type FileHook struct{}
+
+func (this FileHook) Ls(ctx *App, path string) error {
+	if this.record(ctx) {
+		go DaemonState.HintLs(ctx, path)
+	}
+	return nil
+}
+
+func (this FileHook) Cat(ctx *App, path string) error {
+	if this.record(ctx) {
+		go DaemonState.HintLs(ctx, filepath.Dir(path)+"/")
+	}
+	return nil
+}
+
+func (this FileHook) Mkdir(ctx *App, path string) error {
+	if this.record(ctx) {
+		go DaemonState.HintLs(ctx, filepath.Dir(path)+"/")
+	}
+	return nil
+}
+
+func (this FileHook) Rm(ctx *App, path string) error {
+	if this.record(ctx) {
+		go DaemonState.HintRm(ctx, path)
+	}
+	return nil
+}
+
+func (this FileHook) Mv(ctx *App, from string, to string) error {
+	if this.record(ctx) {
+		go DaemonState.HintRm(ctx, filepath.Dir(from)+"/")
+		go DaemonState.HintLs(ctx, filepath.Dir(to)+"/")
+	}
+	return nil
+}
+
+func (this FileHook) Save(ctx *App, path string) error {
+	if this.record(ctx) {
+		go DaemonState.HintLs(ctx, filepath.Dir(path)+"/")
+		go DaemonState.HintFile(ctx, path)
+	}
+	return nil
+}
+
+func (this FileHook) Touch(ctx *App, path string) error {
+	if this.record(ctx) {
+		go DaemonState.HintLs(ctx, filepath.Dir(path)+"/")
+	}
+	return nil
+}
+
+func (this FileHook) record(ctx *App) bool {
+	if ctx.Context.Value("AUDIT") == false {
+		return false
+	}
+	return true
+}
+
+func (this *daemonState) HintLs(app *App, path string) {
+	id := GenerateID(app.Session)
+	this.mu.Lock()
+	defer this.mu.Unlock()
+
+	// try to find the search indexer among the existing ones
+	for i := len(this.idx) - 1; i >= 0; i-- {
+		if id != this.idx[i].Id {
+			continue
+		}
+		alreadyHasPath := false
+		for j := 0; j < len(this.idx[i].FoldersUnknown); j++ {
+			if this.idx[i].FoldersUnknown[j].Path == path {
+				alreadyHasPath = true
+				break
+			}
+		}
+		if alreadyHasPath == false {
+			heap.Push(&this.idx[i].FoldersUnknown, &Document{
+				Type:        "directory",
+				Path:        path,
+				InitialPath: path,
+				Name:        filepath.Base(path),
+			})
+		}
+		return
+	}
+
+	// Having all indexers running in memory could be expensive => instead we're cycling a pool
+	search_process_max := SEARCH_PROCESS_MAX()
+	lenIdx := len(this.idx)
+	if lenIdx > 0 && search_process_max > 0 && lenIdx > (search_process_max-1) {
+		toDel := this.idx[0 : lenIdx-(search_process_max-1)]
+		for i := range toDel {
+			toDel[i].Close()
+		}
+		this.idx = this.idx[lenIdx-(search_process_max-1):]
+	}
+	// instantiate the new indexer
+	s, err := NewCrawler(id, app.Backend)
+	if err != nil {
+		Log.Warning("plg_search_sqlitefts::init message=cannot_create_crawler err=%s", err.Error())
+		return
+	}
+	defer func() {
+		// recover from panic if one occurred
+		if r := recover(); r != nil {
+			name := "na"
+			for _, el := range app.Backend.LoginForm().Elmnts {
+				if el.Name == "type" {
+					name = el.Value.(string)
+				}
+			}
+			Log.Error("plg_search_sqlitefts::panic backend=\"%s\" recover=\"%s\"", name, r)
+		}
+	}()
+	v := reflect.ValueOf(app.Backend).Elem().FieldByName("Context")
+	if v.IsValid() && v.CanSet() {
+		// prevent context expiration, which often defaults to r.Context(),
+		// as we need to make queries outside the scope of a normal http request
+		v.Set(reflect.ValueOf(context.Background()))
+	}
+
+	heap.Push(&s.FoldersUnknown, &Document{
+		Type:        "directory",
+		Path:        path,
+		InitialPath: path,
+		Name:        filepath.Base(path),
+	})
+	this.idx = append(this.idx, s)
+}
+
+func (this *daemonState) HintRm(app *App, path string) {
+	id := GenerateID(app.Session)
+	this.mu.RLock()
+	for i := len(this.idx) - 1; i >= 0; i-- {
+		if id != this.idx[i].Id {
+			continue
+		}
+		if op, err := this.idx[i].State.Change(); err == nil {
+			op.RemoveAll(path)
+			op.Commit()
+		}
+		break
+	}
+	this.mu.RUnlock()
+}
+
+func (this *daemonState) HintFile(app *App, path string) {
+	id := GenerateID(app.Session)
+	this.mu.RLock()
+	for i := len(this.idx) - 1; i >= 0; i-- {
+		if id != this.idx[i].Id {
+			continue
+		}
+		if op, err := this.idx[i].State.Change(); err == nil {
+			op.IndexTimeClear(path)
+			op.Commit()
+		}
+		break
+	}
+	this.mu.RUnlock()
+}
+
+func (this *daemonState) Reset() {
+	this.mu.Lock()
+	for i := range this.idx {
+		this.idx[i].Close()
+	}
+	this.idx = make([]Crawler, 0)
+	this.n = -1
+	this.mu.Unlock()
+}
diff --git a/server/plugin/plg_search_sqlitefts/crawler/phase.go b/server/plugin/plg_search_sqlitefts/crawler/phase.go
new file mode 100644
index 00000000..22f80fa4
--- /dev/null
+++ b/server/plugin/plg_search_sqlitefts/crawler/phase.go
@@ -0,0 +1,54 @@
+package plg_search_sqlitefts
+
+import (
+	"time"
+
+	. "github.com/mickael-kerjean/filestash/server/common"
+	"github.com/mickael-kerjean/filestash/server/plugin/plg_search_sqlitefts/indexer"
+)
+
+func (this *Crawler) Run() {
+	if this.CurrentPhase == "" {
+		time.Sleep(1 * time.Second)
+		this.CurrentPhase = PHASE_EXPLORE
+	}
+	Log.Debug("Search::indexing Execute %s", this.CurrentPhase)
+
+	cycleExecute := func(fn func(indexer.Manager) bool) {
+		stopTime := time.Now().Add(time.Duration(CYCLE_TIME()) * time.Second)
+		op, err := this.State.Change()
+		if err != nil {
+			Log.Warning("search::index cycle_begin (%+v)", err)
+			time.Sleep(5 * time.Second)
+			return
+		}
+		for {
+			if fn(op) == false {
+				break
+			}
+			if stopTime.After(time.Now()) == false {
+				break
+			}
+		}
+		if err = op.Commit(); err != nil {
+			Log.Warning("search::index cycle_commit (%+v)", err)
+		}
+	}
+	if this.CurrentPhase == PHASE_EXPLORE {
+		cycleExecute(this.Discover)
+		return
+	} else if this.CurrentPhase == PHASE_INDEXING {
+		cycleExecute(this.Indexing)
+		return
+	} else if this.CurrentPhase == PHASE_MAINTAIN {
+		cycleExecute(this.Consolidate)
+		return
+	} else if this.CurrentPhase == PHASE_PAUSE {
+		time.Sleep(5 * time.Second)
+		this.CurrentPhase = ""
+	}
+	return
+}
+
+func (this *Crawler) Close() error {
+	return this.State.Close()
+}
diff --git a/server/plugin/plg_search_sqlitefts/crawler/phase_explore.go b/server/plugin/plg_search_sqlitefts/crawler/phase_explore.go
new file mode 100644
index 00000000..3e6f4767
--- /dev/null
+++ b/server/plugin/plg_search_sqlitefts/crawler/phase_explore.go
@@ -0,0 +1,110 @@
+package plg_search_sqlitefts
+
+import (
+	"container/heap"
+	"encoding/base64"
+	"hash/fnv"
+	"path/filepath"
+	"strconv"
+	"time"
+
+	. 
"github.com/mickael-kerjean/filestash/server/common" + "github.com/mickael-kerjean/filestash/server/plugin/plg_search_sqlitefts/indexer" +) + +func (this *Crawler) Discover(tx indexer.Manager) bool { + if this.FoldersUnknown.Len() == 0 { + this.CurrentPhase = PHASE_INDEXING + return false + } + var doc *Document + doc = heap.Pop(&this.FoldersUnknown).(*Document) + if doc == nil { + this.CurrentPhase = PHASE_INDEXING + return false + } + files, err := this.Backend.Ls(doc.Path) + if err != nil { + this.CurrentPhase = "" + return true + } + if len(files) == 0 { + return true + } + + // We don't want our indexer to go wild and diverge over time. As such we need to detect those edge cases: aka + // recursive folder structure. Our detection is relying on a Hash of []os.FileInfo + hashFiles := func() string { + var step int = len(files) / 50 + if step == 0 { + step = 1 + } + hasher := fnv.New32() + hasher.Write([]byte(strconv.Itoa(len(files)))) + for i := 0; i < len(files); i = i + step { + hasher.Write([]byte(files[i].Name())) + } + return base64.StdEncoding.EncodeToString(hasher.Sum(nil)) + }() + if hashFiles == this.lastHash { + return true + } + this.lastHash = "" + for i := 0; i < this.FoldersUnknown.Len(); i++ { + if this.FoldersUnknown[i].Hash == hashFiles && filepath.Base(doc.Path) != filepath.Base(this.FoldersUnknown[i].Path) { + this.lastHash = hashFiles + return true + } + } + + // Insert the newly found data within our index + for i := range files { + f := files[i] + name := f.Name() + if f.IsDir() { + var performPush bool = false + p := filepath.Join(doc.Path, name) + p += "/" + if err = dbInsert(doc.Path, f, tx); err == nil { + performPush = true + } else if err == indexer.ErrConstraint { + performPush = func(path string) bool { + tm, err := tx.IndexTimeGet(p) + if err != nil { + Log.Warning("search::discovery unknown_path (%v)", err) + return false + } + if time.Now().Add(time.Duration(-SEARCH_REINDEX()) * time.Hour).Before(tm) { + return false + } + if err = tx.IndexTimeUpdate(p, time.Now()); err != nil { + Log.Warning("search::discovery insertion_failed (%v)", err) + return false + } + return true + }(p) + } else { + Log.Error("search::indexing insert_index (%v)", err) + } + if performPush == true { + heap.Push(&this.FoldersUnknown, &Document{ + Type: "directory", + Name: name, + Path: p, + Size: f.Size(), + ModTime: f.ModTime(), + Hash: hashFiles, + }) + } + } else { + if err = dbInsert(doc.Path, f, tx); err != nil { + if err == indexer.ErrConstraint { + return false + } + Log.Warning("search::insert index_error (%v)", err) + return false + } + } + } + return true +} diff --git a/server/plugin/plg_search_sqlitefts/crawler/phase_indexing.go b/server/plugin/plg_search_sqlitefts/crawler/phase_indexing.go new file mode 100644 index 00000000..32d6eaf4 --- /dev/null +++ b/server/plugin/plg_search_sqlitefts/crawler/phase_indexing.go @@ -0,0 +1,36 @@ +package plg_search_sqlitefts + +import ( + "strings" + + . 
"github.com/mickael-kerjean/filestash/server/common" + "github.com/mickael-kerjean/filestash/server/plugin/plg_search_sqlitefts/indexer" +) + +func (this *Crawler) Indexing(tx indexer.Manager) bool { + rows, err := tx.FindNew(MAX_INDEXING_FSIZE(), strings.Split(INDEXING_EXT(), ",")) + if err != nil { + Log.Warning("search::insert index_query (%v)", err) + return false + } + defer rows.Close() + hasRows := false + for rows.Next() { + hasRows = true + r, err := rows.Value() + if err != nil { + Log.Warning("search::indexing index_scan (%v)", err) + return false + } + if err = updateFile(r.Path, this.Backend, tx); err != nil { + Log.Warning("search::indexing index_update (%v)", err) + return false + } + } + + if hasRows == false { + this.CurrentPhase = PHASE_MAINTAIN + return false + } + return true +} diff --git a/server/plugin/plg_search_sqlitefts/crawler/phase_maintain.go b/server/plugin/plg_search_sqlitefts/crawler/phase_maintain.go new file mode 100644 index 00000000..acf36d99 --- /dev/null +++ b/server/plugin/plg_search_sqlitefts/crawler/phase_maintain.go @@ -0,0 +1,40 @@ +package plg_search_sqlitefts + +import ( + "time" + + . "github.com/mickael-kerjean/filestash/server/common" + "github.com/mickael-kerjean/filestash/server/plugin/plg_search_sqlitefts/indexer" +) + +func (this *Crawler) Consolidate(tx indexer.Manager) bool { + rows, err := tx.FindBefore(time.Now().Add(-time.Duration(SEARCH_REINDEX()) * time.Hour)) + if err != nil { + if err == indexer.ErrNoRows { + this.CurrentPhase = PHASE_PAUSE + return false + } + this.CurrentPhase = "" + return false + } + defer rows.Close() + hasRows := false + for rows.Next() { + hasRows = true + r, err := rows.Value() + if err != nil { + Log.Warning("search::index db_stale (%v)", err) + return false + } + if r.CType == "directory" { + updateFolder(r.Path, this.Backend, tx) + } else { + updateFile(r.Path, this.Backend, tx) + } + } + if hasRows == false { + this.CurrentPhase = PHASE_PAUSE + return false + } + return true +} diff --git a/server/plugin/plg_search_sqlitefts/crawler/phase_utils.go b/server/plugin/plg_search_sqlitefts/crawler/phase_utils.go new file mode 100644 index 00000000..cad0bfea --- /dev/null +++ b/server/plugin/plg_search_sqlitefts/crawler/phase_utils.go @@ -0,0 +1,131 @@ +package plg_search_sqlitefts + +import ( + "io/fs" + "os" + "path/filepath" + "strings" + "time" + + . 
"github.com/mickael-kerjean/filestash/server/common" + "github.com/mickael-kerjean/filestash/server/plugin/plg_search_sqlitefts/converter" + "github.com/mickael-kerjean/filestash/server/plugin/plg_search_sqlitefts/indexer" +) + +func updateFile(path string, backend IBackend, tx indexer.Manager) error { + if err := tx.IndexTimeUpdate(path, time.Now()); err != nil { + return err + } + for i := 0; i < len(INDEXING_EXCLUSION); i++ { + if strings.Contains(path, INDEXING_EXCLUSION[i]) { + return nil + } + } + reader, err := backend.Cat(path) + if err != nil { + tx.RemoveAll(path) + return err + } + defer reader.Close() + convertedReader, err := converter.Convert(path, reader) + if err != nil { + return nil + } + defer convertedReader.Close() + if err = tx.FileContentUpdate(path, reader); err != nil { + Log.Warning("search::index index_update (%v)", err) + return err + } + return nil +} + +func updateFolder(path string, backend IBackend, tx indexer.Manager) error { + if err := tx.IndexTimeUpdate(path, time.Now()); err != nil { + return err + } + + for i := 0; i < len(INDEXING_EXCLUSION); i++ { + if strings.Contains(path, INDEXING_EXCLUSION[i]) { + return nil + } + } + + // Fetch list of folders as in the remote filesystem + currFiles, err := backend.Ls(path) + if err != nil { + tx.RemoveAll(path) + return err + } + + // Fetch FS as appear in our search cache + rows, err := tx.FindParent(path) + if err != nil { + return err + } + defer rows.Close() + previousFiles := make([]File, 0) + for rows.Next() { + r, err := rows.Value() + if err != nil { + return err + } + previousFiles = append(previousFiles, File{ + FName: r.Name, + FSize: r.Size, + FPath: r.Path, + }) + } + + // Perform the DB operation to ensure previousFiles and currFiles are in sync + // 1. Find the content that have been created and did not exist before + for i := 0; i < len(currFiles); i++ { + currFilenameAlreadyExist := false + currFilename := currFiles[i].Name() + for j := 0; j < len(previousFiles); j++ { + if currFilename == previousFiles[j].Name() { + if currFiles[i].Size() != previousFiles[j].Size() { + if err = dbUpdate(path, currFiles[i], tx); err != nil { + return err + } + break + } + currFilenameAlreadyExist = true + break + } + } + if currFilenameAlreadyExist == false { + dbInsert(path, currFiles[i], tx) + } + } + // 2. 
Find the content that was existing before but got removed + for i := 0; i < len(previousFiles); i++ { + previousFilenameStillExist := false + previousFilename := previousFiles[i].Name() + for j := 0; j < len(currFiles); j++ { + if previousFilename == currFiles[j].Name() { + previousFilenameStillExist = true + break + } + } + if previousFilenameStillExist == false { + p := filepath.Join(path, previousFiles[i].Name()) + if previousFiles[i].IsDir() { + p += "/" + } + tx.RemoveAll(p) + } + } + return nil +} + +func dbInsert(parent string, f os.FileInfo, tx indexer.Manager) error { + return tx.FileCreate(f, parent) +} + +func dbUpdate(parent string, f fs.FileInfo, tx indexer.Manager) error { + path := filepath.Join(parent, f.Name()) + if f.IsDir() { + path += "/" + } + return tx.FileMetaUpdate(path, f) +} diff --git a/server/plugin/plg_search_sqlitefts/utils.go b/server/plugin/plg_search_sqlitefts/crawler/types.go similarity index 89% rename from server/plugin/plg_search_sqlitefts/utils.go rename to server/plugin/plg_search_sqlitefts/crawler/types.go index b27b1b31..6f558bbb 100644 --- a/server/plugin/plg_search_sqlitefts/utils.go +++ b/server/plugin/plg_search_sqlitefts/crawler/types.go @@ -5,6 +5,13 @@ import ( "time" ) +const ( + PHASE_EXPLORE = "PHASE_EXPLORE" + PHASE_INDEXING = "PHASE_INDEXING" + PHASE_MAINTAIN = "PHASE_MAINTAIN" + PHASE_PAUSE = "PHASE_PAUSE" +) + const MAX_HEAP_SIZE = 100000 type Document struct { diff --git a/server/plugin/plg_search_sqlitefts/crawlstate.go b/server/plugin/plg_search_sqlitefts/crawlstate.go deleted file mode 100644 index 78095ec9..00000000 --- a/server/plugin/plg_search_sqlitefts/crawlstate.go +++ /dev/null @@ -1,140 +0,0 @@ -package plg_search_sqlitefts - -import ( - "container/heap" - "context" - "path/filepath" - "reflect" - "sync" - - . "github.com/mickael-kerjean/filestash/server/common" -) - -var SProc SearchProcess = SearchProcess{ - idx: make([]SearchIndexer, 0), - n: -1, -} - -type SearchProcess struct { - idx []SearchIndexer - n int - mu sync.RWMutex -} - -func (this *SearchProcess) HintLs(app *App, path string) *SearchIndexer { - id := GenerateID(app.Session) - this.mu.Lock() - defer this.mu.Unlock() - - // try to find the search indexer among the existing ones - for i := len(this.idx) - 1; i >= 0; i-- { - if id == this.idx[i].Id { - alreadyHasPath := false - for j := 0; j < len(this.idx[i].FoldersUnknown); j++ { - if this.idx[i].FoldersUnknown[j].Path == path { - alreadyHasPath = true - break - } - } - if alreadyHasPath == false { - heap.Push(&this.idx[i].FoldersUnknown, &Document{ - Type: "directory", - Path: path, - InitialPath: path, - Name: filepath.Base(path), - }) - } - ret := &this.idx[i] - return ret - } - } - - // Having all indexers running in memory could be expensive => instead we're cycling a pool - search_process_max := SEARCH_PROCESS_MAX() - lenIdx := len(this.idx) - if lenIdx > 0 && search_process_max > 0 && lenIdx > (search_process_max-1) { - toDel := this.idx[0 : lenIdx-(search_process_max-1)] - for i := range toDel { - toDel[i].DB.Close() - } - this.idx = this.idx[lenIdx-(search_process_max-1):] - } - // instantiate the new indexer - s := NewSearchIndexer(id, app.Backend) - defer func() { - // recover from panic if one occurred. Set err to nil otherwise. 
- if recover() != nil { - name := "na" - for _, el := range app.Backend.LoginForm().Elmnts { - if el.Name == "type" { - name = el.Value.(string) - } - } - Log.Error("plg_search_sqlitefs::panic backend=\"%s\"", name) - } - }() - v := reflect.ValueOf(app.Backend).Elem().FieldByName("Context") - if v.IsValid() && v.CanSet() { - // prevent context expiration which is often default as r.Context() - // as we need to make queries outside the scope of a normal http request - v.Set(reflect.ValueOf(context.Background())) - } - - heap.Push(&s.FoldersUnknown, &Document{ - Type: "directory", - Path: path, - InitialPath: path, - Name: filepath.Base(path), - }) - this.idx = append(this.idx, s) - return &s -} - -func (this *SearchProcess) HintRm(app *App, path string) { - id := GenerateID(app.Session) - this.mu.RLock() - for i := len(this.idx) - 1; i >= 0; i-- { - if id == this.idx[i].Id { - this.idx[i].DB.Exec("DELETE FROM file WHERE path >= ? AND path < ?", path, path+"~") - break - } - } - this.mu.RUnlock() -} - -func (this *SearchProcess) HintFile(app *App, path string) { - id := GenerateID(app.Session) - this.mu.RLock() - for i := len(this.idx) - 1; i >= 0; i-- { - if id == this.idx[i].Id { - this.idx[i].DB.Exec("UPDATE file set indexTime = NULL WHERE path = ?", path) - break - } - } - this.mu.RUnlock() -} - -func (this *SearchProcess) Peek() *SearchIndexer { - if len(this.idx) == 0 { - return nil - } - this.mu.Lock() - if this.n >= len(this.idx)-1 || this.n < 0 { - this.n = 0 - } else { - this.n = this.n + 1 - } - s := &this.idx[this.n] - this.mu.Unlock() - return s -} - -func (this *SearchProcess) Reset() { - this.mu.Lock() - for i := range this.idx { - this.idx[i].DB.Close() - } - this.idx = make([]SearchIndexer, 0) - this.mu.Unlock() - this.n = -1 -} diff --git a/server/plugin/plg_search_sqlitefts/index.go b/server/plugin/plg_search_sqlitefts/index.go index c9afd2b1..ab77ab8a 100644 --- a/server/plugin/plg_search_sqlitefts/index.go +++ b/server/plugin/plg_search_sqlitefts/index.go @@ -2,133 +2,10 @@ package plg_search_sqlitefts import ( . "github.com/mickael-kerjean/filestash/server/common" - "path/filepath" - "regexp" - "time" -) - -const ( - PHASE_EXPLORE = "PHASE_EXPLORE" - PHASE_INDEXING = "PHASE_INDEXING" - PHASE_MAINTAIN = "PHASE_MAINTAIN" - PHASE_PAUSE = "PHASE_PAUSE" + . "github.com/mickael-kerjean/filestash/server/plugin/plg_search_sqlitefts/crawler" ) func init() { - sh := SearchHint{} - Hooks.Register.SearchEngine(SqliteSearch{Hint: &sh}) - Hooks.Register.AuthorisationMiddleware(&sh) -} - -type SqliteSearch struct { - Hint *SearchHint -} - -func (this SqliteSearch) Query(app App, path string, keyword string) ([]IFile, error) { - files := []IFile{} - - // extract our search indexer - s := SProc.HintLs(&app, path) - if s == nil { - return files, ErrNotReachable - } - - if path == "" { - path = "/" - } - - rows, err := s.DB.Query( - "SELECT type, path, size, modTime FROM file WHERE path IN ("+ - " SELECT path FROM file_index WHERE file_index MATCH ? AND path > ? 
 AND path < ?"+
-			" ORDER BY rank LIMIT 2000"+
-			")",
-		regexp.MustCompile(`(\.|\-)`).ReplaceAllString(keyword, "\"$1\""),
-		path, path+"~",
-	)
-	if err != nil {
-		Log.Warning("search::query DBQuery (%s)", err.Error())
-		return files, ErrNotReachable
-	}
-	defer rows.Close()
-	for rows.Next() {
-		f := File{}
-		var t string
-		if err = rows.Scan(&f.FType, &f.FPath, &f.FSize, &t); err != nil {
-			Log.Warning("search::query scan (%s)", err.Error())
-			return files, ErrNotReachable
-		}
-		if tm, err := time.Parse(time.RFC3339, t); err == nil {
-			f.FTime = tm.Unix() * 1000
-		}
-		f.FName = filepath.Base(f.FPath)
-		files = append(files, f)
-	}
-	return files, nil
-}
-
-/*
- * We're listening to what the user is doing to hint the crawler over
- * what needs to be updated in priority, what file got updated and would need
- * to be reindexed, what should disappear from the index, ....
- * This way we can fine tune how full text search is behaving
- */
-
-type SearchHint struct{}
-
-func (this SearchHint) Ls(ctx *App, path string) error {
-	if this.record(ctx) {
-		go SProc.HintLs(ctx, path)
-	}
-	return nil
-}
-
-func (this SearchHint) Cat(ctx *App, path string) error {
-	if this.record(ctx) {
-		go SProc.HintLs(ctx, filepath.Dir(path)+"/")
-	}
-	return nil
-}
-
-func (this SearchHint) Mkdir(ctx *App, path string) error {
-	if this.record(ctx) {
-		go SProc.HintLs(ctx, filepath.Dir(path)+"/")
-	}
-	return nil
-}
-
-func (this SearchHint) Rm(ctx *App, path string) error {
-	if this.record(ctx) {
-		go SProc.HintRm(ctx, path)
-	}
-	return nil
-}
-
-func (this SearchHint) Mv(ctx *App, from string, to string) error {
-	if this.record(ctx) {
-		go SProc.HintRm(ctx, filepath.Dir(from)+"/")
-		go SProc.HintLs(ctx, filepath.Dir(to)+"/")
-	}
-	return nil
-}
-
-func (this SearchHint) Save(ctx *App, path string) error {
-	if this.record(ctx) {
-		go SProc.HintLs(ctx, filepath.Dir(path)+"/")
-		go SProc.HintFile(ctx, path)
-	}
-	return nil
-}
-
-func (this SearchHint) Touch(ctx *App, path string) error {
-	if this.record(ctx) {
-		go SProc.HintLs(ctx, filepath.Dir(path)+"/")
-	}
-	return nil
-}
-
-func (this SearchHint) record(ctx *App) bool {
-	if ctx.Context.Value("AUDIT") == false {
-		return false
-	}
-	return true
+	Hooks.Register.SearchEngine(SearchEngine{})
+	Hooks.Register.AuthorisationMiddleware(FileHook{})
 }
diff --git a/server/plugin/plg_search_sqlitefts/indexer/error.go b/server/plugin/plg_search_sqlitefts/indexer/error.go
new file mode 100644
index 00000000..d59341a6
--- /dev/null
+++ b/server/plugin/plg_search_sqlitefts/indexer/error.go
@@ -0,0 +1,24 @@
+package indexer
+
+import (
+	"database/sql"
+	"fmt"
+
+	"github.com/mattn/go-sqlite3"
+)
+
+var (
+	ErrConstraint = fmt.Errorf("DB_CONSTRAINT_FAILED_ERROR")
+	ErrNoRows     = fmt.Errorf("NO_ROWS")
+)
+
+func toErr(err error) error {
+	if err == sql.ErrNoRows {
+		return ErrNoRows
+	}
+	if sqliteErr, ok := err.(sqlite3.Error); ok {
+		if sqliteErr.Code == sqlite3.ErrConstraint {
+			return ErrConstraint
+		}
+	}
+	return err
+}
diff --git a/server/plugin/plg_search_sqlitefts/indexer/index.go b/server/plugin/plg_search_sqlitefts/indexer/index.go
new file mode 100644
index 00000000..ccbb0520
--- /dev/null
+++ b/server/plugin/plg_search_sqlitefts/indexer/index.go
@@ -0,0 +1,266 @@
+package indexer
+
+import (
+	"database/sql"
+	"io"
+	"io/fs"
+	"path/filepath"
+	"strings"
+	"time"
+
+	. "github.com/mickael-kerjean/filestash/server/common"
"github.com/mickael-kerjean/filestash/server/common" +) + +type Index interface { + Init() error + Search(path string, q string) ([]IFile, error) + Change() (Manager, error) + Close() error +} + +type Manager interface { + FindParent(path string) (RowMapper, error) + FindBefore(time time.Time) (RowMapper, error) + FindNew(maxSize int, toOmit []string) (RowMapper, error) + + FileCreate(f fs.FileInfo, parent string) error + FileContentUpdate(path string, f io.ReadCloser) error + FileMetaUpdate(path string, f fs.FileInfo) error + + IndexTimeGet(path string) (time.Time, error) + IndexTimeUpdate(path string, t time.Time) error + IndexTimeClear(path string) error + + RemoveAll(path string) error + Commit() error +} + +func NewIndex(id string) Index { + return &sqliteIndex{id, nil} +} + +type sqliteIndex struct { + id string + db *sql.DB +} + +func (this *sqliteIndex) Init() error { + path := GetAbsolutePath(FTS_PATH, "fts_"+this.id+".sql") + db, err := sql.Open("sqlite3", path+"?_journal_mode=wal") + if err != nil { + Log.Warning("search::init can't open database (%v)", err) + return toErr(err) + } + this.db = db + + queryDB := func(sqlQuery string) error { + stmt, err := db.Prepare(sqlQuery) + if err != nil { + Log.Warning("search::initschema prepare schema error(%v)", err) + return toErr(err) + } + defer stmt.Close() + _, err = stmt.Exec() + if err != nil { + Log.Warning("search::initschema execute error(%v)", err) + return toErr(err) + } + return nil + } + if queryDB("CREATE TABLE IF NOT EXISTS file(path VARCHAR(1024) PRIMARY KEY, filename VARCHAR(64), filetype VARCHAR(16), type VARCHAR(16), parent VARCHAR(1024), size INTEGER, modTime timestamp, indexTime timestamp DEFAULT NULL);"); err != nil { + return err + } + if queryDB("CREATE INDEX IF NOT EXISTS idx_file_index_time ON file(indexTime) WHERE indexTime IS NOT NULL;"); err != nil { + return err + } + if queryDB("CREATE INDEX IF NOT EXISTS idx_file_parent ON file(parent);"); err != nil { + return err + } + if queryDB("CREATE VIRTUAL TABLE IF NOT EXISTS file_index USING fts5(path UNINDEXED, filename, filetype, content, tokenize = 'porter');"); err != nil { + return err + } + if queryDB("CREATE TRIGGER IF NOT EXISTS after_file_insert AFTER INSERT ON file BEGIN INSERT INTO file_index (path, filename, filetype) VALUES(new.path, new.filename, new.filetype); END;"); err != nil { + return err + } + if queryDB("CREATE TRIGGER IF NOT EXISTS after_file_delete AFTER DELETE ON file BEGIN DELETE FROM file_index WHERE path = old.path; END;"); err != nil { + return err + } + if queryDB("CREATE TRIGGER IF NOT EXISTS after_file_update_path UPDATE OF path ON file BEGIN UPDATE file_index SET path = new.path, filepath = new.filepath, filetype = new.filetype WHERE path = old.path; END;"); err != nil { + return err + } + return nil +} + +func (this sqliteIndex) Change() (Manager, error) { + tx, err := this.db.Begin() + if err != nil { + return nil, toErr(err) + } + return sqliteQueries{tx}, nil +} + +func (this sqliteIndex) Close() error { + return this.db.Close() +} + +type sqliteQueries struct { + tx *sql.Tx +} + +func (this sqliteQueries) Commit() error { + return this.tx.Commit() +} + +func (this sqliteQueries) IndexTimeGet(path string) (time.Time, error) { + var t string + if err := this.tx.QueryRow("SELECT indexTime FROM file WHERE path = ?", path).Scan(&t); err != nil { + return time.Now(), toErr(err) + } + tm, err := time.Parse(time.RFC3339, t) + if err != nil { + return tm, toErr(err) + } + return tm, nil +} + +func (this sqliteQueries) 
+func (this sqliteQueries) IndexTimeUpdate(path string, t time.Time) error {
+	if _, err := this.tx.Exec("UPDATE file SET indexTime = ? WHERE path = ?", t, path); err != nil {
+		return toErr(err)
+	}
+	return nil
+}
+
+func (this sqliteQueries) IndexTimeClear(path string) error {
+	if _, err := this.tx.Exec("UPDATE file SET indexTime = NULL WHERE path = ?", path); err != nil {
+		return toErr(err)
+	}
+	return nil
+}
+
+type Record struct {
+	Name  string
+	Path  string
+	Size  int64
+	CType string
+}
+
+type RowMapper struct {
+	rows *sql.Rows
+}
+
+func (this *RowMapper) Next() bool {
+	return this.rows.Next()
+}
+
+func (this *RowMapper) Value() (Record, error) {
+	var r Record
+	if err := this.rows.Scan(&r.Name, &r.CType, &r.Path, &r.Size); err != nil {
+		return r, toErr(err)
+	}
+	return r, nil
+}
+
+func (this *RowMapper) Close() error {
+	return this.rows.Close()
+}
+
+func (this sqliteQueries) FindNew(maxSize int, exts []string) (RowMapper, error) {
+	for i := 0; i < len(exts); i++ {
+		exts[i] = "'" + strings.TrimSpace(exts[i]) + "'"
+	}
+
+	rows, err := this.tx.Query(
+		"SELECT filename, type, path, size FROM file WHERE ("+
+			" type = 'file' AND size < ? AND filetype IN ("+strings.Join(exts, ",")+") AND indexTime IS NULL "+
+			") LIMIT 2",
+		maxSize,
+	)
+	if err != nil {
+		return RowMapper{}, toErr(err)
+	}
+	return RowMapper{rows: rows}, nil
+}
+
+func (this sqliteQueries) FindBefore(t time.Time) (RowMapper, error) {
+	rows, err := this.tx.Query(
+		"SELECT filename, type, path, size FROM file WHERE indexTime < ? ORDER BY indexTime DESC LIMIT 5",
+		t,
+	)
+	if err != nil {
+		return RowMapper{}, toErr(err)
+	}
+	return RowMapper{rows: rows}, nil
+}
+
+func (this sqliteQueries) FindParent(path string) (RowMapper, error) {
+	rows, err := this.tx.Query("SELECT filename, type, path, size FROM file WHERE parent = ?", path)
+	if err != nil {
+		return RowMapper{}, err
+	}
+	return RowMapper{rows: rows}, nil
+}
+
+func (this sqliteQueries) FileMetaUpdate(path string, f fs.FileInfo) error {
+	_, err := this.tx.Exec(
+		"UPDATE file SET size = ?, modTime = ?, indexTime = NULL WHERE path = ?",
+		f.Size(), f.ModTime(), path,
+	)
+	return toErr(err)
+}
+
+func (this sqliteQueries) FileContentUpdate(path string, reader io.ReadCloser) error {
+	content, err := io.ReadAll(reader)
+	if err != nil {
+		return toErr(err)
+	}
+	if _, err := this.tx.Exec("UPDATE file_index SET content = ? WHERE path = ?", content, path); err != nil {
+		return toErr(err)
+	}
+	return nil
+}
+
+func (this sqliteQueries) FileCreate(f fs.FileInfo, parentPath string) (err error) {
+	name := f.Name()
+	path := filepath.Join(parentPath, f.Name())
+	if f.IsDir() {
+		_, err = this.tx.Exec(
+			"INSERT INTO file(path, parent, filename, type, size, modTime, indexTime) "+
+				"VALUES(?, ?, ?, ?, ?, ?, ?)",
+			path+"/",
+			parentPath,
+			name,
+			"directory",
+			f.Size(),
+			f.ModTime(),
+			time.Now(),
+		)
+	} else {
+		_, err = this.tx.Exec(
+			"INSERT INTO file(path, parent, filename, type, size, modTime, indexTime, filetype) "+
+				"VALUES(?, ?, ?, ?, ?, ?, ?, ?)",
+			path,
+			parentPath,
+			name,
+			"file",
+			f.Size(),
+			f.ModTime(),
+			nil,
+			strings.TrimPrefix(filepath.Ext(name), "."),
+		)
+	}
+	return toErr(err)
+}
+
+func (this sqliteQueries) Remove(path string) error {
+	if _, a := this.tx.Exec("DELETE FROM file WHERE path = ?", path); a != nil {
+		return toErr(a)
+	}
+	return nil
+}
+
+func (this sqliteQueries) RemoveAll(path string) error {
+	if _, a := this.tx.Exec("DELETE FROM file WHERE path >= ? 
AND path < ?", path, path+"~"); a != nil { + return toErr(a) + } + return nil +} diff --git a/server/plugin/plg_search_sqlitefts/indexer/query.go b/server/plugin/plg_search_sqlitefts/indexer/query.go new file mode 100644 index 00000000..d5d81fa9 --- /dev/null +++ b/server/plugin/plg_search_sqlitefts/indexer/query.go @@ -0,0 +1,40 @@ +package indexer + +import ( + "path/filepath" + "regexp" + "time" + + . "github.com/mickael-kerjean/filestash/server/common" +) + +func (this sqliteIndex) Search(path string, q string) ([]IFile, error) { + files := []IFile{} + rows, err := this.db.Query( + "SELECT type, path, size, modTime FROM file WHERE path IN ("+ + " SELECT path FROM file_index WHERE file_index MATCH ? AND path > ? AND path < ?"+ + " ORDER BY rank LIMIT 2000"+ + ")", + regexp.MustCompile(`(\.|\-)`).ReplaceAllString(q, "\"$1\""), + path, path+"~", + ) + if err != nil { + Log.Warning("search::query DBQuery (%s)", err.Error()) + return files, ErrNotReachable + } + defer rows.Close() + for rows.Next() { + f := File{} + var t string + if err = rows.Scan(&f.FType, &f.FPath, &f.FSize, &t); err != nil { + Log.Warning("search::query scan (%s)", err.Error()) + return files, ErrNotReachable + } + if tm, err := time.Parse(time.RFC3339, t); err == nil { + f.FTime = tm.Unix() * 1000 + } + f.FName = filepath.Base(f.FPath) + files = append(files, f) + } + return files, nil +} diff --git a/server/plugin/plg_search_sqlitefts/query.go b/server/plugin/plg_search_sqlitefts/query.go new file mode 100644 index 00000000..946ee5e2 --- /dev/null +++ b/server/plugin/plg_search_sqlitefts/query.go @@ -0,0 +1,20 @@ +package plg_search_sqlitefts + +import ( + . "github.com/mickael-kerjean/filestash/server/common" + . "github.com/mickael-kerjean/filestash/server/plugin/plg_search_sqlitefts/crawler" +) + +type SearchEngine struct{} + +func (this SearchEngine) Query(app App, path string, keyword string) ([]IFile, error) { + DaemonState.HintLs(&app, path) + s := GetCrawler(&app) + if s == nil { + return nil, ErrNotReachable + } + if path == "" { + path = "/" + } + return s.State.Search(path, keyword) +} diff --git a/server/plugin/plg_search_sqlitefts/spider.go b/server/plugin/plg_search_sqlitefts/spider.go deleted file mode 100644 index c7ccca42..00000000 --- a/server/plugin/plg_search_sqlitefts/spider.go +++ /dev/null @@ -1,487 +0,0 @@ -package plg_search_sqlitefts - -import ( - "container/heap" - "database/sql" - "encoding/base64" - "github.com/mattn/go-sqlite3" - . 
"github.com/mickael-kerjean/filestash/server/common" - "github.com/mickael-kerjean/filestash/server/model/formater" - "hash/fnv" - "io/ioutil" - "os" - "path/filepath" - "strconv" - "strings" - "sync" - "time" -) - -type SearchIndexer struct { - Id string - FoldersUnknown HeapDoc - CurrentPhase string - Backend IBackend - DBPath string - DB *sql.DB - mu sync.Mutex - lastHash string -} - -func NewSearchIndexer(id string, b IBackend) SearchIndexer { - s := SearchIndexer{ - DBPath: GetAbsolutePath(FTS_PATH, "fts_"+id+".sql"), - Id: id, - Backend: b, - FoldersUnknown: make(HeapDoc, 0, 1), - } - heap.Init(&s.FoldersUnknown) - - db, err := sql.Open("sqlite3", s.DBPath+"?_journal_mode=wal") - if err != nil { - Log.Warning("search::init can't open database (%v)", err) - return s - } - s.DB = db - queryDB := func(sqlQuery string) error { - stmt, err := db.Prepare(sqlQuery) - if err != nil { - Log.Warning("search::initschema prepare schema error(%v)", err) - return err - } - defer stmt.Close() - _, err = stmt.Exec() - if err != nil { - Log.Warning("search::initschema execute error(%v)", err) - return err - } - return err - } - if queryDB("CREATE TABLE IF NOT EXISTS file(path VARCHAR(1024) PRIMARY KEY, filename VARCHAR(64), filetype VARCHAR(16), type VARCHAR(16), parent VARCHAR(1024), size INTEGER, modTime timestamp, indexTime timestamp DEFAULT NULL);"); err != nil { - return s - } - if queryDB("CREATE INDEX IF NOT EXISTS idx_file_index_time ON file(indexTime) WHERE indexTime IS NOT NULL;"); err != nil { - return s - } - if queryDB("CREATE INDEX IF NOT EXISTS idx_file_parent ON file(parent);"); err != nil { - return s - } - if queryDB("CREATE VIRTUAL TABLE IF NOT EXISTS file_index USING fts5(path UNINDEXED, filename, filetype, content, tokenize = 'porter');"); err != nil { - return s - } - if queryDB("CREATE TRIGGER IF NOT EXISTS after_file_insert AFTER INSERT ON file BEGIN INSERT INTO file_index (path, filename, filetype) VALUES(new.path, new.filename, new.filetype); END;"); err != nil { - return s - } - if queryDB("CREATE TRIGGER IF NOT EXISTS after_file_delete AFTER DELETE ON file BEGIN DELETE FROM file_index WHERE path = old.path; END;"); err != nil { - return s - } - if queryDB("CREATE TRIGGER IF NOT EXISTS after_file_update_path UPDATE OF path ON file BEGIN UPDATE file_index SET path = new.path, filepath = new.filepath, filetype = new.filetype WHERE path = old.path; END;"); err != nil { - return s - } - return s -} - -func (this *SearchIndexer) Execute() { - if this.CurrentPhase == "" { - time.Sleep(1 * time.Second) - this.CurrentPhase = PHASE_EXPLORE - } - Log.Debug("Search::indexing Execute %s", this.CurrentPhase) - - cycleExecute := func(fn func(*sql.Tx) bool) { - stopTime := time.Now().Add(time.Duration(CYCLE_TIME()) * time.Second) - tx, err := this.DB.Begin() - if err != nil { - Log.Warning("search::index cycle_begin (%+v)", err) - time.Sleep(5 * time.Second) - } - for { - if fn(tx) == false { - break - } - if stopTime.After(time.Now()) == false { - break - } - } - if err = tx.Commit(); err != nil { - Log.Warning("search::index cycle_commit (%+v)", err) - } - } - if this.CurrentPhase == PHASE_EXPLORE { - cycleExecute(this.Discover) - return - } else if this.CurrentPhase == PHASE_INDEXING { - cycleExecute(this.Indexing) - return - } else if this.CurrentPhase == PHASE_MAINTAIN { - cycleExecute(this.Consolidate) - return - } else if this.CurrentPhase == PHASE_PAUSE { - time.Sleep(5 * time.Second) - this.CurrentPhase = "" - } - return -} - -func (this *SearchIndexer) Discover(tx *sql.Tx) bool 
{ - if this.FoldersUnknown.Len() == 0 { - this.CurrentPhase = PHASE_INDEXING - return false - } - var doc *Document - doc = heap.Pop(&this.FoldersUnknown).(*Document) - if doc == nil { - this.CurrentPhase = PHASE_INDEXING - return false - } - files, err := this.Backend.Ls(doc.Path) - if err != nil { - this.CurrentPhase = "" - return true - } - if len(files) == 0 { - return true - } - - // We don't want our indexer to go wild and diverge over time. As such we need to detect those edge cases: aka - // recursive folder structure. Our detection is relying on a Hash of []os.FileInfo - hashFiles := func() string { - var step int = len(files) / 50 - if step == 0 { - step = 1 - } - hasher := fnv.New32() - hasher.Write([]byte(strconv.Itoa(len(files)))) - for i := 0; i < len(files); i = i + step { - hasher.Write([]byte(files[i].Name())) - } - return base64.StdEncoding.EncodeToString(hasher.Sum(nil)) - }() - if hashFiles == this.lastHash { - return true - } - this.lastHash = "" - for i := 0; i < this.FoldersUnknown.Len(); i++ { - if this.FoldersUnknown[i].Hash == hashFiles && filepath.Base(doc.Path) != filepath.Base(this.FoldersUnknown[i].Path) { - this.lastHash = hashFiles - return true - } - } - - // Insert the newly found data within our index - for i := range files { - f := files[i] - name := f.Name() - if f.IsDir() { - var performPush bool = false - p := filepath.Join(doc.Path, name) - p += "/" - if err = this.dbInsert(doc.Path, f, tx); err == nil { - performPush = true - } else if e, ok := err.(sqlite3.Error); ok && e.Code == sqlite3.ErrConstraint { - performPush = func(path string) bool { - var t string - var err error - if err := tx.QueryRow("SELECT indexTime FROM file WHERE path = ?", p).Scan(&t); err != nil { - Log.Warning("search::discovery unknown_path (%v)", err) - return false - } - tm, err := time.Parse(time.RFC3339, t) - if err != nil { - Log.Warning("search::discovery invalid_time (%v)", err) - return false - } - if time.Now().Add(time.Duration(-SEARCH_REINDEX()) * time.Hour).Before(tm) { - return false - } - if _, err = tx.Exec("UPDATE file SET indexTime = ? WHERE path = ?", time.Now(), p); err != nil { - Log.Warning("search::discovery insertion_failed (%v)", err) - return false - } - return true - }(p) - } else { - Log.Error("search::indexing insert_index (%v)", err) - } - if performPush == true { - heap.Push(&this.FoldersUnknown, &Document{ - Type: "directory", - Name: name, - Path: p, - Size: f.Size(), - ModTime: f.ModTime(), - Hash: hashFiles, - }) - } - } else { - if err = this.dbInsert(doc.Path, f, tx); err != nil { - if e, ok := err.(sqlite3.Error); ok && e.Code == sqlite3.ErrConstraint { - return false - } - Log.Warning("search::insert index_error (%v)", err) - return false - } - } - } - return true -} - -func (this *SearchIndexer) Indexing(tx *sql.Tx) bool { - ext := strings.Split(INDEXING_EXT(), ",") - for i := 0; i < len(ext); i++ { - ext[i] = "'" + strings.TrimSpace(ext[i]) + "'" - } - - rows, err := tx.Query( - "SELECT path FROM file WHERE ("+ - " type = 'file' AND size < ? 
AND filetype IN ("+strings.Join(ext, ",")+") AND indexTime IS NULL "+ - ") LIMIT 2", - MAX_INDEXING_FSIZE(), - ) - if err != nil { - Log.Warning("search::insert index_query (%v)", err) - return false - } - defer rows.Close() - i := 0 - for rows.Next() { - i += 1 - var path string - if err = rows.Scan(&path); err != nil { - Log.Warning("search::indexing index_scan (%v)", err) - return false - } - if err = this.updateFile(path, tx); err != nil { - Log.Warning("search::indexing index_update (%v)", err) - return false - } - } - - if i == 0 { - this.CurrentPhase = PHASE_MAINTAIN - return false - } - return true -} - -func (this *SearchIndexer) updateFile(path string, tx *sql.Tx) error { - if _, err := tx.Exec("UPDATE file SET indexTime = ? WHERE path = ?", time.Now(), path); err != nil { - return err - } - - for i := 0; i < len(INDEXING_EXCLUSION); i++ { - if strings.Contains(path, INDEXING_EXCLUSION[i]) { - return nil - } - } - - reader, err := this.Backend.Cat(path) - if err != nil { - if _, a := tx.Exec("DELETE FROM file WHERE path = ?", path); a != nil { - return a - } - return err - } - defer reader.Close() - - switch GetMimeType(path) { - case "text/plain": - reader, err = formater.TxtFormater(reader) - case "text/org": - reader, err = formater.TxtFormater(reader) - case "text/markdown": - reader, err = formater.TxtFormater(reader) - case "application/x-form": - reader, err = formater.TxtFormater(reader) - case "application/pdf": - reader, err = formater.PdfFormater(reader) - case "application/powerpoint": - reader, err = formater.OfficeFormater(reader) - case "application/vnd.ms-powerpoint": - reader, err = formater.OfficeFormater(reader) - case "application/word": - reader, err = formater.OfficeFormater(reader) - case "application/msword": - reader, err = formater.OfficeFormater(reader) - default: - return nil - } - - if err != nil { - return nil - } - var content []byte - if content, err = ioutil.ReadAll(reader); err != nil { - Log.Warning("search::index content_read (%v)", err) - return nil - } - if _, err = tx.Exec("UPDATE file_index SET content = ? WHERE path = ?", content, path); err != nil { - Log.Warning("search::index index_update (%v)", err) - return err - } - return nil -} - -func (this *SearchIndexer) updateFolder(path string, tx *sql.Tx) error { - if _, err := tx.Exec("UPDATE file SET indexTime = ? WHERE path = ?", time.Now(), path); err != nil { - return err - } - - for i := 0; i < len(INDEXING_EXCLUSION); i++ { - if strings.Contains(path, INDEXING_EXCLUSION[i]) { - return nil - } - } - - // Fetch list of folders as in the remote filesystem - currFiles, err := this.Backend.Ls(path) - if err != nil { - tx.Exec("DELETE FROM file WHERE path >= ? AND path < ?", path, path+"~") - return err - } - - // Fetch FS as appear in our search cache - rows, err := tx.Query("SELECT filename, type, size FROM file WHERE parent = ?", path) - if err != nil { - return err - } - defer rows.Close() - previousFiles := make([]File, 0) - for rows.Next() { - var f File - rows.Scan(&f.FName, &f.FType, f.FSize) - previousFiles = append(previousFiles, f) - } - - // Perform the DB operation to ensure previousFiles and currFiles are in sync - // 1. 
Find the content that have been created and did not exist before - for i := 0; i < len(currFiles); i++ { - currFilenameAlreadyExist := false - currFilename := currFiles[i].Name() - for j := 0; j < len(previousFiles); j++ { - if currFilename == previousFiles[j].Name() { - if currFiles[i].Size() != previousFiles[j].Size() { - err = this.dbUpdate(path, currFiles[i], tx) - if err != nil { - return err - } - break - } - currFilenameAlreadyExist = true - break - } - } - if currFilenameAlreadyExist == false { - this.dbInsert(path, currFiles[i], tx) - } - } - // 2. Find the content that was existing before but got removed - for i := 0; i < len(previousFiles); i++ { - previousFilenameStillExist := false - previousFilename := previousFiles[i].Name() - for j := 0; j < len(currFiles); j++ { - if previousFilename == currFiles[j].Name() { - previousFilenameStillExist = true - break - } - } - if previousFilenameStillExist == false { - this.dbDelete(path, previousFiles[i], tx) - } - } - return nil -} - -func (this *SearchIndexer) Consolidate(tx *sql.Tx) bool { - rows, err := tx.Query( - "SELECT path, type FROM file WHERE indexTime < ? ORDER BY indexTime DESC LIMIT 5", - time.Now().Add(-time.Duration(SEARCH_REINDEX())*time.Hour), - ) - if err != nil { - if err == sql.ErrNoRows { - this.CurrentPhase = PHASE_PAUSE - return false - } - this.CurrentPhase = "" - return false - } - defer rows.Close() - i := 0 - for rows.Next() { - i += 1 - var path string - var cType string - if err = rows.Scan(&path, &cType); err != nil { - Log.Warning("search::index db_stale (%v)", err) - return false - } - if cType == "directory" { - this.updateFolder(path, tx) - } else { - this.updateFile(path, tx) - } - } - if i == 0 { - this.CurrentPhase = PHASE_PAUSE - return false - } - return true -} - -func (this *SearchIndexer) dbInsert(parent string, f os.FileInfo, tx *sql.Tx) error { - var name string = f.Name() - var err error - path := filepath.Join(parent, name) - - if f.IsDir() { - _, err = tx.Exec( - "INSERT INTO file(path, parent, filename, type, size, modTime, indexTime) "+ - "VALUES(?, ?, ?, ?, ?, ?, ?)", - path+"/", - parent, - name, - "directory", - f.Size(), - f.ModTime(), - time.Now(), - ) - } else { - _, err = tx.Exec( - "INSERT INTO file(path, parent, filename, type, size, modTime, indexTime, filetype) "+ - "VALUES(?, ?, ?, ?, ?, ?, ?, ?)", - path, - parent, - name, - "file", - f.Size(), - f.ModTime(), - nil, - strings.TrimPrefix(filepath.Ext(name), "."), - ) - } - return err -} - -func (this *SearchIndexer) dbUpdate(parent string, f os.FileInfo, tx *sql.Tx) error { - path := filepath.Join(parent, f.Name()) - if f.IsDir() { - path += "/" - } - _, err := tx.Exec( - "UPDATE file SET size = ?, modTime = ? indexTime = NULL WHERE path = ?", - f.Size(), f.ModTime(), path, - ) - return err -} - -func (this *SearchIndexer) dbDelete(parent string, f os.FileInfo, tx *sql.Tx) error { - path := filepath.Join(parent, f.Name()) - if f.IsDir() { - path += "/" - } - _, err := tx.Exec( - "DELETE FROM file WHERE path >= ? AND path < ?", - path, path+"~", - ) - return err -}
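
Reviewer note, not part of the patch: below is a minimal sketch of how the new indexer API introduced above is meant to be consumed end to end. It assumes the Filestash runtime is initialised so that FTS_PATH resolves; the session id, paths and keyword are made up for illustration.

package main

import (
	"fmt"

	"github.com/mickael-kerjean/filestash/server/plugin/plg_search_sqlitefts/indexer"
)

func main() {
	// One SQLite FTS5 database is created per session id ("demo" is hypothetical).
	idx := indexer.NewIndex("demo")
	if err := idx.Init(); err != nil { // creates the file table, the FTS5 index and its triggers
		panic(err)
	}
	defer idx.Close()

	// Every write goes through a Manager, which wraps a single transaction.
	op, err := idx.Change()
	if err != nil {
		panic(err)
	}
	op.IndexTimeClear("/docs/report.txt") // flag one file so the crawler reindexes it
	op.RemoveAll("/tmp/")                 // drop a whole subtree from the index
	if err := op.Commit(); err != nil {
		panic(err)
	}

	// Reads go through the Index itself: an FTS5 MATCH scoped to a path prefix.
	files, err := idx.Search("/docs/", "quarterly report")
	if err != nil {
		panic(err)
	}
	fmt.Printf("%d hit(s)\n", len(files))
}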
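
Also for reviewers, a standalone illustration (again not part of the patch) of the recursion guard used by Crawler.Discover in phase_explore.go: each directory listing is fingerprinted by FNV-hashing the entry count plus up to ~50 evenly sampled names, and a folder whose fingerprint matches one already queued under a different name is treated as a likely recursive structure and skipped. The helper name and file names below are made up.

package main

import (
	"encoding/base64"
	"fmt"
	"hash/fnv"
	"strconv"
)

// hashListing mirrors the fingerprint built in Crawler.Discover: the number of
// entries plus a sample of at most ~50 names, FNV-hashed and base64-encoded.
func hashListing(names []string) string {
	step := len(names) / 50
	if step == 0 {
		step = 1
	}
	hasher := fnv.New32()
	hasher.Write([]byte(strconv.Itoa(len(names))))
	for i := 0; i < len(names); i += step {
		hasher.Write([]byte(names[i]))
	}
	return base64.StdEncoding.EncodeToString(hasher.Sum(nil))
}

func main() {
	parent := hashListing([]string{"a.txt", "b.txt", "loop"})
	child := hashListing([]string{"a.txt", "b.txt", "loop"}) // same listing seen again deeper down
	fmt.Println(parent == child)                             // true => likely a recursive folder structure
}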