Mirror of https://github.com/mickael-kerjean/filestash (synced 2025-12-06 08:22:24 +01:00)
commit b2c9b29384 (parent ae7e6d0801)
chore (refactoring): revamp sqlite full text search
19 changed files with 1195 additions and 914 deletions
@@ -1,161 +0,0 @@
package plg_search_sqlitefts

import (
	. "github.com/mickael-kerjean/filestash/server/common"
	"time"
)

var (
	SEARCH_ENABLE      func() bool
	SEARCH_PROCESS_MAX func() int
	SEARCH_PROCESS_PAR func() int
	SEARCH_REINDEX     func() int
	CYCLE_TIME         func() int
	INDEXING_EXT       func() string
	MAX_INDEXING_FSIZE func() int
	INDEXING_EXCLUSION = []string{
		"/node_modules/", "/bower_components/",
		"/.cache/", "/.npm/", "/.git/",
	}
)

func init() {
	SEARCH_ENABLE = func() bool {
		return Config.Get("features.search.enable").Schema(func(f *FormElement) *FormElement {
			if f == nil {
				f = &FormElement{}
			}
			f.Name = "enable"
			f.Type = "enable"
			f.Target = []string{
				"process_max", "process_par", "reindex_time",
				"cycle_time", "max_size", "indexer_ext",
			}
			f.Description = "Enable/Disable full text search"
			f.Placeholder = "Default: false"
			f.Default = false
			return f
		}).Bool()
	}
	SEARCH_PROCESS_MAX = func() int {
		return Config.Get("features.search.process_max").Schema(func(f *FormElement) *FormElement {
			if f == nil {
				f = &FormElement{}
			}
			f.Id = "process_max"
			f.Name = "process_max"
			f.Type = "number"
			f.Description = "Size of the pool containing the indexers"
			f.Placeholder = "Default: 5"
			f.Default = 5
			return f
		}).Int()
	}
	SEARCH_PROCESS_PAR = func() int {
		return Config.Get("features.search.process_par").Schema(func(f *FormElement) *FormElement {
			if f == nil {
				f = &FormElement{}
			}
			f.Id = "process_par"
			f.Name = "process_par"
			f.Type = "number"
			f.Description = "How many concurrent indexers are running in the same time (requires a restart)"
			f.Placeholder = "Default: 2"
			f.Default = 2
			return f
		}).Int()
	}
	SEARCH_REINDEX = func() int {
		return Config.Get("features.search.reindex_time").Schema(func(f *FormElement) *FormElement {
			if f == nil {
				f = &FormElement{}
			}
			f.Id = "reindex_time"
			f.Name = "reindex_time"
			f.Type = "number"
			f.Description = "Time in hours after which we consider our index to be stale and needs to be reindexed"
			f.Placeholder = "Default: 24h"
			f.Default = 24
			return f
		}).Int()
	}
	CYCLE_TIME = func() int {
		return Config.Get("features.search.cycle_time").Schema(func(f *FormElement) *FormElement {
			if f == nil {
				f = &FormElement{}
			}
			f.Id = "cycle_time"
			f.Name = "cycle_time"
			f.Type = "number"
			f.Description = "Time the indexer needs to spend for each cycle in seconds (discovery, indexing and maintenance)"
			f.Placeholder = "Default: 10s"
			f.Default = 10
			return f
		}).Int()
	}
	MAX_INDEXING_FSIZE = func() int {
		return Config.Get("features.search.max_size").Schema(func(f *FormElement) *FormElement {
			if f == nil {
				f = &FormElement{}
			}
			f.Id = "max_size"
			f.Name = "max_size"
			f.Type = "number"
			f.Description = "Maximum size of files the indexer will perform full text search"
			f.Placeholder = "Default: 524288000 => 512MB"
			f.Default = 524288000
			return f
		}).Int()
	}
	INDEXING_EXT = func() string {
		return Config.Get("features.search.indexer_ext").Schema(func(f *FormElement) *FormElement {
			if f == nil {
				f = &FormElement{}
			}
			f.Id = "indexer_ext"
			f.Name = "indexer_ext"
			f.Type = "text"
			f.Description = "File extension we want to see indexed"
			f.Placeholder = "Default: org,txt,docx,pdf,md,form"
			f.Default = "org,txt,docx,pdf,md,form"
			return f
		}).String()
	}

	Hooks.Register.Onload(func() {
		SEARCH_ENABLE()
		SEARCH_PROCESS_MAX()
		SEARCH_PROCESS_PAR()
		SEARCH_REINDEX()
		CYCLE_TIME()
		MAX_INDEXING_FSIZE()
		INDEXING_EXT()

		onChange := Config.ListenForChange()
		runner := func() {
			startSearch := false
			for {
				if SEARCH_ENABLE() == false {
					select {
					case <-onChange.Listener:
						startSearch = SEARCH_ENABLE()
					}
					if startSearch == false {
						continue
					}
				}
				sidx := SProc.Peek()
				if sidx == nil {
					time.Sleep(5 * time.Second)
					continue
				}
				sidx.mu.Lock()
				sidx.Execute()
				sidx.mu.Unlock()
			}
		}
		for i := 0; i < SEARCH_PROCESS_PAR(); i++ {
			go runner()
		}
	})
}
server/plugin/plg_search_sqlitefts/converter/index.go (new file)
@@ -0,0 +1,34 @@
package converter

import (
	"io"

	. "github.com/mickael-kerjean/filestash/server/common"
	"github.com/mickael-kerjean/filestash/server/model/formater"
)

func Convert(path string, reader io.ReadCloser) (out io.ReadCloser, err error) {
	switch GetMimeType(path) {
	case "text/plain":
		out, err = formater.TxtFormater(reader)
	case "text/org":
		out, err = formater.TxtFormater(reader)
	case "text/markdown":
		out, err = formater.TxtFormater(reader)
	case "application/x-form":
		out, err = formater.TxtFormater(reader)
	case "application/pdf":
		out, err = formater.PdfFormater(reader)
	case "application/powerpoint":
		out, err = formater.OfficeFormater(reader)
	case "application/vnd.ms-powerpoint":
		out, err = formater.OfficeFormater(reader)
	case "application/word":
		out, err = formater.OfficeFormater(reader)
	case "application/msword":
		out, err = formater.OfficeFormater(reader)
	default:
		err = ErrNotImplemented
	}
	return out, err
}
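For context, a minimal sketch of how this new converter package can be driven, using only the Convert signature added above; the sample filename and the main wrapper are illustrative, not part of this commit:

package main

import (
	"fmt"
	"io"
	"os"

	"github.com/mickael-kerjean/filestash/server/plugin/plg_search_sqlitefts/converter"
)

func main() {
	f, err := os.Open("notes.md") // hypothetical input file
	if err != nil {
		panic(err)
	}
	// Convert picks a formater from the mime type derived from the path;
	// unsupported types come back as ErrNotImplemented.
	out, err := converter.Convert("notes.md", f)
	if err != nil {
		panic(err)
	}
	defer out.Close()
	text, _ := io.ReadAll(out)
	fmt.Println(string(text))
}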
server/plugin/plg_search_sqlitefts/crawler/configuration.go (new file)
@@ -0,0 +1,128 @@
package plg_search_sqlitefts

import (
	. "github.com/mickael-kerjean/filestash/server/common"
)

func init() {
	Hooks.Register.Onload(func() {
		SEARCH_ENABLE()
		SEARCH_PROCESS_MAX()
		SEARCH_PROCESS_PAR()
		SEARCH_REINDEX()
		CYCLE_TIME()
		MAX_INDEXING_FSIZE()
		INDEXING_EXT()
	})
}

var INDEXING_EXCLUSION = []string{
	"/node_modules/", "/bower_components/",
	"/.cache/", "/.npm/", "/.git/",
}

var SEARCH_ENABLE = func() bool {
	return Config.Get("features.search.enable").Schema(func(f *FormElement) *FormElement {
		if f == nil {
			f = &FormElement{}
		}
		f.Name = "enable"
		f.Type = "enable"
		f.Target = []string{
			"process_max", "process_par", "reindex_time",
			"cycle_time", "max_size", "indexer_ext",
		}
		f.Description = "Enable/Disable full text search"
		f.Placeholder = "Default: false"
		f.Default = false
		return f
	}).Bool()
}

var SEARCH_PROCESS_MAX = func() int {
	return Config.Get("features.search.process_max").Schema(func(f *FormElement) *FormElement {
		if f == nil {
			f = &FormElement{}
		}
		f.Id = "process_max"
		f.Name = "process_max"
		f.Type = "number"
		f.Description = "Size of the pool containing the indexers"
		f.Placeholder = "Default: 5"
		f.Default = 5
		return f
	}).Int()
}

var SEARCH_PROCESS_PAR = func() int {
	return Config.Get("features.search.process_par").Schema(func(f *FormElement) *FormElement {
		if f == nil {
			f = &FormElement{}
		}
		f.Id = "process_par"
		f.Name = "process_par"
		f.Type = "number"
		f.Description = "How many concurrent indexers are running in the same time (requires a restart)"
		f.Placeholder = "Default: 2"
		f.Default = 2
		return f
	}).Int()
}

var SEARCH_REINDEX = func() int {
	return Config.Get("features.search.reindex_time").Schema(func(f *FormElement) *FormElement {
		if f == nil {
			f = &FormElement{}
		}
		f.Id = "reindex_time"
		f.Name = "reindex_time"
		f.Type = "number"
		f.Description = "Time in hours after which we consider our index to be stale and needs to be reindexed"
		f.Placeholder = "Default: 24h"
		f.Default = 24
		return f
	}).Int()
}

var CYCLE_TIME = func() int {
	return Config.Get("features.search.cycle_time").Schema(func(f *FormElement) *FormElement {
		if f == nil {
			f = &FormElement{}
		}
		f.Id = "cycle_time"
		f.Name = "cycle_time"
		f.Type = "number"
		f.Description = "Time the indexer needs to spend for each cycle in seconds (discovery, indexing and maintenance)"
		f.Placeholder = "Default: 10s"
		f.Default = 10
		return f
	}).Int()
}

var MAX_INDEXING_FSIZE = func() int {
	return Config.Get("features.search.max_size").Schema(func(f *FormElement) *FormElement {
		if f == nil {
			f = &FormElement{}
		}
		f.Id = "max_size"
		f.Name = "max_size"
		f.Type = "number"
		f.Description = "Maximum size of files the indexer will perform full text search"
		f.Placeholder = "Default: 524288000 => 512MB"
		f.Default = 524288000
		return f
	}).Int()
}

var INDEXING_EXT = func() string {
	return Config.Get("features.search.indexer_ext").Schema(func(f *FormElement) *FormElement {
		if f == nil {
			f = &FormElement{}
		}
		f.Id = "indexer_ext"
		f.Name = "indexer_ext"
		f.Type = "text"
		f.Description = "File extension we want to see indexed"
		f.Placeholder = "Default: org,txt,docx,pdf,md,form"
		f.Default = "org,txt,docx,pdf,md,form"
		return f
	}).String()
}
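All the getters above follow one pattern: a package-level func-typed var that reads a key from the central config and registers its form schema and default as a side effect. A self-contained sketch of the same pattern, where the config map and names are illustrative stand-ins rather than filestash's API:

package main

import "fmt"

// config is a stand-in for filestash's Config store.
var config = map[string]interface{}{}

// intSetting returns a lazy getter that yields the configured value or a
// default, mirroring the SEARCH_PROCESS_MAX-style getters above.
func intSetting(key string, def int) func() int {
	return func() int {
		if v, ok := config[key].(int); ok {
			return v
		}
		return def
	}
}

var processMax = intSetting("features.search.process_max", 5)

func main() {
	fmt.Println(processMax()) // 5 until the admin overrides it
	config["features.search.process_max"] = 10
	fmt.Println(processMax()) // 10, picked up without a restart
}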
server/plugin/plg_search_sqlitefts/crawler/daemon.go (new file)
@@ -0,0 +1,41 @@
package plg_search_sqlitefts

import (
	"time"

	. "github.com/mickael-kerjean/filestash/server/common"
)

var onConfigChange ChangeListener

func init() {
	onConfigChange = Config.ListenForChange()
	Hooks.Register.Onload(func() {
		for i := 0; i < SEARCH_PROCESS_PAR(); i++ {
			go runner()
		}
	})
}

func runner() {
	startSearch := false
	for {
		if SEARCH_ENABLE() == false {
			select {
			case <-onConfigChange.Listener:
				startSearch = SEARCH_ENABLE()
			}
			if startSearch == false {
				continue
			}
		}
		crwlr := NextCrawler()
		if crwlr == nil {
			time.Sleep(5 * time.Second)
			continue
		}
		crwlr.mu.Lock()
		crwlr.Run()
		crwlr.mu.Unlock()
	}
}
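The runner parks itself on the config-change channel whenever search is disabled, so idle workers cost nothing until an admin flips the setting. A self-contained sketch of the same parking pattern, with illustrative names:

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

var enabled atomic.Bool
var configChanged = make(chan struct{})

// worker blocks on configChanged while the feature is off, and only polls
// for work while it is on - the same shape as runner() above.
func worker(id int) {
	for {
		if !enabled.Load() {
			<-configChanged // sleep until a setting changes
			continue
		}
		fmt.Printf("worker %d: doing one unit of work\n", id)
		time.Sleep(100 * time.Millisecond)
	}
}

func main() {
	go worker(1)
	time.Sleep(200 * time.Millisecond) // worker stays parked
	enabled.Store(true)
	configChanged <- struct{}{} // wake it up
	time.Sleep(300 * time.Millisecond)
}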
server/plugin/plg_search_sqlitefts/crawler/daemon_state.go (new file)
@@ -0,0 +1,70 @@
package plg_search_sqlitefts

import (
	"container/heap"
	"sync"

	. "github.com/mickael-kerjean/filestash/server/common"
	"github.com/mickael-kerjean/filestash/server/plugin/plg_search_sqlitefts/indexer"
)

var DaemonState = daemonState{
	idx: make([]Crawler, 0),
	n:   -1,
}

type daemonState struct {
	idx []Crawler
	n   int
	mu  sync.RWMutex
}

type Crawler struct {
	Id             string
	FoldersUnknown HeapDoc
	CurrentPhase   string
	Backend        IBackend
	State          indexer.Index
	mu             sync.Mutex
	lastHash       string
}

func NewCrawler(id string, b IBackend) (Crawler, error) {
	s := Crawler{
		Id:             id,
		Backend:        b,
		State:          indexer.NewIndex(id),
		FoldersUnknown: make(HeapDoc, 0, 1),
	}
	if err := s.State.Init(); err != nil {
		return s, err
	}
	heap.Init(&s.FoldersUnknown)
	return s, nil
}

func GetCrawler(app *App) *Crawler {
	id := GenerateID(app.Session)
	DaemonState.mu.RLock()
	defer DaemonState.mu.RUnlock()
	for i := len(DaemonState.idx) - 1; i >= 0; i-- {
		if id == DaemonState.idx[i].Id {
			return &DaemonState.idx[i]
		}
	}
	return nil
}

func NextCrawler() *Crawler {
	DaemonState.mu.Lock()
	defer DaemonState.mu.Unlock()
	if len(DaemonState.idx) == 0 {
		return nil
	}
	if DaemonState.n >= len(DaemonState.idx)-1 || DaemonState.n < 0 {
		DaemonState.n = 0
	} else {
		DaemonState.n = DaemonState.n + 1
	}
	return &DaemonState.idx[DaemonState.n]
}
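NextCrawler hands each runner the next indexer in a wrapping round-robin over the pool. The cursor logic in isolation, with illustrative types:

package main

import "fmt"

type pool struct {
	items []string
	n     int // index of the last item handed out; -1 before first use
}

// next mirrors NextCrawler: wrap to 0 at the end (or on a reset cursor),
// otherwise advance by one.
func (p *pool) next() string {
	if len(p.items) == 0 {
		return ""
	}
	if p.n >= len(p.items)-1 || p.n < 0 {
		p.n = 0
	} else {
		p.n++
	}
	return p.items[p.n]
}

func main() {
	p := pool{items: []string{"a", "b", "c"}, n: -1}
	for i := 0; i < 5; i++ {
		fmt.Print(p.next(), " ") // a b c a b
	}
	fmt.Println()
}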
server/plugin/plg_search_sqlitefts/crawler/events.go (new file)
@@ -0,0 +1,191 @@
package plg_search_sqlitefts

import (
	"container/heap"
	"context"
	"path/filepath"
	"reflect"

	. "github.com/mickael-kerjean/filestash/server/common"
)

/*
 * We're listening to what the user is doing to hint the crawler over
 * what needs to be updated in priority, what file got updated and would need
 * to be reindexed, what should disappear from the index, ....
 * This way we can fine tune how full text search is behaving
 */

type FileHook struct{}

func (this FileHook) Ls(ctx *App, path string) error {
	if this.record(ctx) {
		go DaemonState.HintLs(ctx, path)
	}
	return nil
}

func (this FileHook) Cat(ctx *App, path string) error {
	if this.record(ctx) {
		go DaemonState.HintLs(ctx, filepath.Dir(path)+"/")
	}
	return nil
}

func (this FileHook) Mkdir(ctx *App, path string) error {
	if this.record(ctx) {
		go DaemonState.HintLs(ctx, filepath.Dir(path)+"/")
	}
	return nil
}

func (this FileHook) Rm(ctx *App, path string) error {
	if this.record(ctx) {
		go DaemonState.HintRm(ctx, path)
	}
	return nil
}

func (this FileHook) Mv(ctx *App, from string, to string) error {
	if this.record(ctx) {
		go DaemonState.HintRm(ctx, filepath.Dir(from)+"/")
		go DaemonState.HintLs(ctx, filepath.Dir(to)+"/")
	}
	return nil
}

func (this FileHook) Save(ctx *App, path string) error {
	if this.record(ctx) {
		go DaemonState.HintLs(ctx, filepath.Dir(path)+"/")
		go DaemonState.HintFile(ctx, path)
	}
	return nil
}

func (this FileHook) Touch(ctx *App, path string) error {
	if this.record(ctx) {
		go DaemonState.HintLs(ctx, filepath.Dir(path)+"/")
	}
	return nil
}

func (this FileHook) record(ctx *App) bool {
	if ctx.Context.Value("AUDIT") == false {
		return false
	}
	return true
}

func (this *daemonState) HintLs(app *App, path string) {
	id := GenerateID(app.Session)
	this.mu.Lock()
	defer this.mu.Unlock()

	// try to find the search indexer among the existing ones
	for i := len(this.idx) - 1; i >= 0; i-- {
		if id != this.idx[i].Id {
			continue
		}
		alreadyHasPath := false
		for j := 0; j < len(this.idx[i].FoldersUnknown); j++ {
			if this.idx[i].FoldersUnknown[j].Path == path {
				alreadyHasPath = true
				break
			}
		}
		if alreadyHasPath == false {
			heap.Push(&this.idx[i].FoldersUnknown, &Document{
				Type:        "directory",
				Path:        path,
				InitialPath: path,
				Name:        filepath.Base(path),
			})
		}
		return
	}

	// Having all indexers running in memory could be expensive => instead we're cycling a pool
	search_process_max := SEARCH_PROCESS_MAX()
	lenIdx := len(this.idx)
	if lenIdx > 0 && search_process_max > 0 && lenIdx > (search_process_max-1) {
		toDel := this.idx[0 : lenIdx-(search_process_max-1)]
		for i := range toDel {
			toDel[i].Close()
		}
		this.idx = this.idx[lenIdx-(search_process_max-1):]
	}
	// instantiate the new indexer
	s, err := NewCrawler(id, app.Backend)
	if err != nil {
		Log.Warning("plg_search_sqlitefs::init message=cannot_create_crawler err=%s", err.Error())
		return
	}
	defer func() {
		// recover from panic if one occurred. Set err to nil otherwise.
		if r := recover(); r != nil {
			name := "na"
			for _, el := range app.Backend.LoginForm().Elmnts {
				if el.Name == "type" {
					name = el.Value.(string)
				}
			}
			Log.Error("plg_search_sqlitefs::panic backend=\"%s\" recover=\"%s\"", name, r)
		}
	}()
	v := reflect.ValueOf(app.Backend).Elem().FieldByName("Context")
	if v.IsValid() && v.CanSet() {
		// prevent context expiration which is often default as r.Context()
		// as we need to make queries outside the scope of a normal http request
		v.Set(reflect.ValueOf(context.Background()))
	}

	heap.Push(&s.FoldersUnknown, &Document{
		Type:        "directory",
		Path:        path,
		InitialPath: path,
		Name:        filepath.Base(path),
	})
	this.idx = append(this.idx, s)
}

func (this *daemonState) HintRm(app *App, path string) {
	id := GenerateID(app.Session)
	this.mu.RLock()
	for i := len(this.idx) - 1; i >= 0; i-- {
		if id != this.idx[i].Id {
			continue
		}
		if op, err := this.idx[i].State.Change(); err == nil {
			op.RemoveAll(path)
			op.Commit()
		}
		break
	}
	this.mu.RUnlock()
}

func (this *daemonState) HintFile(app *App, path string) {
	id := GenerateID(app.Session)
	this.mu.RLock()
	for i := len(this.idx) - 1; i >= 0; i-- {
		if id != this.idx[i].Id {
			continue
		}
		if op, err := this.idx[i].State.Change(); err == nil {
			op.IndexTimeClear(path)
			op.Commit()
		}
		break
	}
	this.mu.RUnlock()
}

func (this *daemonState) Reset() {
	this.mu.Lock()
	for i := range this.idx {
		this.idx[i].Close()
	}
	this.idx = make([]Crawler, 0)
	this.n = -1
	this.mu.Unlock()
}
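The pool-trimming branch in HintLs keeps at most SEARCH_PROCESS_MAX() indexers alive by closing the oldest entries before appending a new one. The slice arithmetic in isolation, with illustrative types:

package main

import "fmt"

// trimToCapacity drops the oldest items so that, after appending one more,
// the pool holds at most max entries - the same window arithmetic as above.
func trimToCapacity(pool []string, max int) []string {
	n := len(pool)
	if n > 0 && max > 0 && n > max-1 {
		evicted := pool[0 : n-(max-1)]
		for _, name := range evicted {
			fmt.Println("closing", name) // stand-in for Crawler.Close()
		}
		pool = pool[n-(max-1):]
	}
	return pool
}

func main() {
	pool := []string{"a", "b", "c", "d", "e"}
	pool = trimToCapacity(pool, 3) // closes a, b, c; keeps d, e
	fmt.Println(pool, "+ one new entry stays within max = 3")
}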
server/plugin/plg_search_sqlitefts/crawler/phase.go (new file)
@@ -0,0 +1,54 @@
package plg_search_sqlitefts

import (
	"time"

	. "github.com/mickael-kerjean/filestash/server/common"
	"github.com/mickael-kerjean/filestash/server/plugin/plg_search_sqlitefts/indexer"
)

func (this *Crawler) Run() {
	if this.CurrentPhase == "" {
		time.Sleep(1 * time.Second)
		this.CurrentPhase = PHASE_EXPLORE
	}
	Log.Debug("Search::indexing Execute %s", this.CurrentPhase)

	cycleExecute := func(fn func(indexer.Manager) bool) {
		stopTime := time.Now().Add(time.Duration(CYCLE_TIME()) * time.Second)
		op, err := this.State.Change()
		if err != nil {
			Log.Warning("search::index cycle_begin (%+v)", err)
			time.Sleep(5 * time.Second)
			return // without a transaction there is nothing to run or commit
		}
		for {
			if fn(op) == false {
				break
			}
			if stopTime.After(time.Now()) == false {
				break
			}
		}
		if err = op.Commit(); err != nil {
			Log.Warning("search::index cycle_commit (%+v)", err)
		}
	}
	if this.CurrentPhase == PHASE_EXPLORE {
		cycleExecute(this.Discover)
		return
	} else if this.CurrentPhase == PHASE_INDEXING {
		cycleExecute(this.Indexing)
		return
	} else if this.CurrentPhase == PHASE_MAINTAIN {
		cycleExecute(this.Consolidate)
		return
	} else if this.CurrentPhase == PHASE_PAUSE {
		time.Sleep(5 * time.Second)
		this.CurrentPhase = ""
	}
	return
}

func (this *Crawler) Close() error {
	return this.State.Close()
}
server/plugin/plg_search_sqlitefts/crawler/phase_explore.go (new file)
@@ -0,0 +1,110 @@
package plg_search_sqlitefts

import (
	"container/heap"
	"encoding/base64"
	"hash/fnv"
	"path/filepath"
	"strconv"
	"time"

	. "github.com/mickael-kerjean/filestash/server/common"
	"github.com/mickael-kerjean/filestash/server/plugin/plg_search_sqlitefts/indexer"
)

func (this *Crawler) Discover(tx indexer.Manager) bool {
	if this.FoldersUnknown.Len() == 0 {
		this.CurrentPhase = PHASE_INDEXING
		return false
	}
	doc := heap.Pop(&this.FoldersUnknown).(*Document)
	if doc == nil {
		this.CurrentPhase = PHASE_INDEXING
		return false
	}
	files, err := this.Backend.Ls(doc.Path)
	if err != nil {
		this.CurrentPhase = ""
		return true
	}
	if len(files) == 0 {
		return true
	}

	// We don't want our indexer to go wild and diverge over time. As such we need to detect those edge cases: aka
	// recursive folder structure. Our detection is relying on a Hash of []os.FileInfo
	hashFiles := func() string {
		step := len(files) / 50
		if step == 0 {
			step = 1
		}
		hasher := fnv.New32()
		hasher.Write([]byte(strconv.Itoa(len(files))))
		for i := 0; i < len(files); i = i + step {
			hasher.Write([]byte(files[i].Name()))
		}
		return base64.StdEncoding.EncodeToString(hasher.Sum(nil))
	}()
	if hashFiles == this.lastHash {
		return true
	}
	this.lastHash = ""
	for i := 0; i < this.FoldersUnknown.Len(); i++ {
		if this.FoldersUnknown[i].Hash == hashFiles && filepath.Base(doc.Path) != filepath.Base(this.FoldersUnknown[i].Path) {
			this.lastHash = hashFiles
			return true
		}
	}

	// Insert the newly found data within our index
	for i := range files {
		f := files[i]
		name := f.Name()
		if f.IsDir() {
			performPush := false
			p := filepath.Join(doc.Path, name)
			p += "/"
			if err = dbInsert(doc.Path, f, tx); err == nil {
				performPush = true
			} else if err == indexer.ErrConstraint {
				performPush = func(path string) bool {
					tm, err := tx.IndexTimeGet(p)
					if err != nil {
						Log.Warning("search::discovery unknown_path (%v)", err)
						return false
					}
					if time.Now().Add(time.Duration(-SEARCH_REINDEX()) * time.Hour).Before(tm) {
						return false
					}
					if err = tx.IndexTimeUpdate(p, time.Now()); err != nil {
						Log.Warning("search::discovery insertion_failed (%v)", err)
						return false
					}
					return true
				}(p)
			} else {
				Log.Error("search::indexing insert_index (%v)", err)
			}
			if performPush == true {
				heap.Push(&this.FoldersUnknown, &Document{
					Type:    "directory",
					Name:    name,
					Path:    p,
					Size:    f.Size(),
					ModTime: f.ModTime(),
					Hash:    hashFiles,
				})
			}
		} else {
			if err = dbInsert(doc.Path, f, tx); err != nil {
				if err == indexer.ErrConstraint {
					return false
				}
				Log.Warning("search::insert index_error (%v)", err)
				return false
			}
		}
	}
	return true
}
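The sampling hash that guards against recursive folder structures is worth seeing in isolation: it hashes the file count plus roughly every fiftieth name, so two directories that keep listing identically are caught cheaply. A standalone version of the same closure, with illustrative input data:

package main

import (
	"encoding/base64"
	"fmt"
	"hash/fnv"
	"strconv"
)

// hashListing mirrors the hashFiles closure in Discover: sample at most
// ~50 names plus the total count, then base64 the 32-bit FNV digest.
func hashListing(names []string) string {
	step := len(names) / 50
	if step == 0 {
		step = 1
	}
	hasher := fnv.New32()
	hasher.Write([]byte(strconv.Itoa(len(names))))
	for i := 0; i < len(names); i += step {
		hasher.Write([]byte(names[i]))
	}
	return base64.StdEncoding.EncodeToString(hasher.Sum(nil))
}

func main() {
	a := hashListing([]string{"src", "docs", "README.md"})
	b := hashListing([]string{"src", "docs", "README.md"})
	fmt.Println(a == b, a) // identical listings collide by design
}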
server/plugin/plg_search_sqlitefts/crawler/phase_indexing.go (new file)
@@ -0,0 +1,36 @@
package plg_search_sqlitefts

import (
	"strings"

	. "github.com/mickael-kerjean/filestash/server/common"
	"github.com/mickael-kerjean/filestash/server/plugin/plg_search_sqlitefts/indexer"
)

func (this *Crawler) Indexing(tx indexer.Manager) bool {
	rows, err := tx.FindNew(MAX_INDEXING_FSIZE(), strings.Split(INDEXING_EXT(), ","))
	if err != nil {
		Log.Warning("search::insert index_query (%v)", err)
		return false
	}
	defer rows.Close()
	hasRows := false
	for rows.Next() {
		hasRows = true
		r, err := rows.Value()
		if err != nil {
			Log.Warning("search::indexing index_scan (%v)", err)
			return false
		}
		if err = updateFile(r.Path, this.Backend, tx); err != nil {
			Log.Warning("search::indexing index_update (%v)", err)
			return false
		}
	}

	if hasRows == false {
		this.CurrentPhase = PHASE_MAINTAIN
		return false
	}
	return true
}
server/plugin/plg_search_sqlitefts/crawler/phase_maintain.go (new file)
@@ -0,0 +1,40 @@
package plg_search_sqlitefts

import (
	"time"

	. "github.com/mickael-kerjean/filestash/server/common"
	"github.com/mickael-kerjean/filestash/server/plugin/plg_search_sqlitefts/indexer"
)

func (this *Crawler) Consolidate(tx indexer.Manager) bool {
	rows, err := tx.FindBefore(time.Now().Add(-time.Duration(SEARCH_REINDEX()) * time.Hour))
	if err != nil {
		if err == indexer.ErrNoRows {
			this.CurrentPhase = PHASE_PAUSE
			return false
		}
		this.CurrentPhase = ""
		return false
	}
	defer rows.Close()
	hasRows := false
	for rows.Next() {
		hasRows = true
		r, err := rows.Value()
		if err != nil {
			Log.Warning("search::index db_stale (%v)", err)
			return false
		}
		if r.CType == "directory" {
			updateFolder(r.Path, this.Backend, tx)
		} else {
			updateFile(r.Path, this.Backend, tx)
		}
	}
	if hasRows == false {
		this.CurrentPhase = PHASE_PAUSE
		return false
	}
	return true
}
server/plugin/plg_search_sqlitefts/crawler/phase_utils.go (new file)
@@ -0,0 +1,131 @@
package plg_search_sqlitefts

import (
	"io/fs"
	"os"
	"path/filepath"
	"strings"
	"time"

	. "github.com/mickael-kerjean/filestash/server/common"
	"github.com/mickael-kerjean/filestash/server/plugin/plg_search_sqlitefts/converter"
	"github.com/mickael-kerjean/filestash/server/plugin/plg_search_sqlitefts/indexer"
)

func updateFile(path string, backend IBackend, tx indexer.Manager) error {
	if err := tx.IndexTimeUpdate(path, time.Now()); err != nil {
		return err
	}
	for i := 0; i < len(INDEXING_EXCLUSION); i++ {
		if strings.Contains(path, INDEXING_EXCLUSION[i]) {
			return nil
		}
	}
	reader, err := backend.Cat(path)
	if err != nil {
		tx.RemoveAll(path)
		return err
	}
	defer reader.Close()
	convertedReader, err := converter.Convert(path, reader)
	if err != nil {
		return nil
	}
	defer convertedReader.Close()
	// index the converted text, not the raw bytes coming off the backend
	if err = tx.FileContentUpdate(path, convertedReader); err != nil {
		Log.Warning("search::index index_update (%v)", err)
		return err
	}
	return nil
}

func updateFolder(path string, backend IBackend, tx indexer.Manager) error {
	if err := tx.IndexTimeUpdate(path, time.Now()); err != nil {
		return err
	}

	for i := 0; i < len(INDEXING_EXCLUSION); i++ {
		if strings.Contains(path, INDEXING_EXCLUSION[i]) {
			return nil
		}
	}

	// Fetch list of folders as in the remote filesystem
	currFiles, err := backend.Ls(path)
	if err != nil {
		tx.RemoveAll(path)
		return err
	}

	// Fetch FS as it appears in our search cache
	rows, err := tx.FindParent(path)
	if err != nil {
		return err
	}
	defer rows.Close()
	previousFiles := make([]File, 0)
	for rows.Next() {
		r, err := rows.Value()
		if err != nil {
			return err
		}
		previousFiles = append(previousFiles, File{
			FName: r.Name,
			FSize: r.Size,
			FPath: r.Path,
		})
	}

	// Perform the DB operations to ensure previousFiles and currFiles are in sync
	// 1. Find the content that has been created and did not exist before
	for i := 0; i < len(currFiles); i++ {
		currFilenameAlreadyExist := false
		currFilename := currFiles[i].Name()
		for j := 0; j < len(previousFiles); j++ {
			if currFilename == previousFiles[j].Name() {
				// a name match means the entry already exists; refresh its
				// metadata when the size changed instead of re-inserting it
				currFilenameAlreadyExist = true
				if currFiles[i].Size() != previousFiles[j].Size() {
					if err = dbUpdate(path, currFiles[i], tx); err != nil {
						return err
					}
				}
				break
			}
		}
		if currFilenameAlreadyExist == false {
			dbInsert(path, currFiles[i], tx)
		}
	}
	// 2. Find the content that was existing before but got removed
	for i := 0; i < len(previousFiles); i++ {
		previousFilenameStillExist := false
		previousFilename := previousFiles[i].Name()
		for j := 0; j < len(currFiles); j++ {
			if previousFilename == currFiles[j].Name() {
				previousFilenameStillExist = true
				break
			}
		}
		if previousFilenameStillExist == false {
			p := filepath.Join(path, previousFiles[i].Name())
			if previousFiles[i].IsDir() {
				p += "/"
			}
			tx.RemoveAll(p)
		}
	}
	return nil
}

func dbInsert(parent string, f os.FileInfo, tx indexer.Manager) error {
	return tx.FileCreate(f, parent)
}

func dbUpdate(parent string, f fs.FileInfo, tx indexer.Manager) error {
	path := filepath.Join(parent, f.Name())
	if f.IsDir() {
		path += "/"
	}
	return tx.FileMetaUpdate(path, f)
}
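updateFolder is a two-way reconciliation: insert or update whatever the backend now reports, then delete index rows whose names no longer appear. The same diff in isolation, using maps for clarity; the data and names are illustrative:

package main

import "fmt"

// reconcile returns the names to insert into the index and the names to
// remove from it, given the live listing and the cached one - the same two
// passes updateFolder performs with nested loops.
func reconcile(live, cached []string) (toInsert, toRemove []string) {
	cachedSet := map[string]bool{}
	for _, name := range cached {
		cachedSet[name] = true
	}
	liveSet := map[string]bool{}
	for _, name := range live {
		liveSet[name] = true
		if !cachedSet[name] {
			toInsert = append(toInsert, name)
		}
	}
	for _, name := range cached {
		if !liveSet[name] {
			toRemove = append(toRemove, name)
		}
	}
	return toInsert, toRemove
}

func main() {
	ins, del := reconcile([]string{"a.txt", "b.txt"}, []string{"b.txt", "c.txt"})
	fmt.Println(ins, del) // [a.txt] [c.txt]
}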
@@ -5,6 +5,13 @@ import (
	"time"
)

const (
	PHASE_EXPLORE  = "PHASE_EXPLORE"
	PHASE_INDEXING = "PHASE_INDEXING"
	PHASE_MAINTAIN = "PHASE_MAINTAIN"
	PHASE_PAUSE    = "PHASE_PAUSE"
)

const MAX_HEAP_SIZE = 100000

type Document struct {
@@ -1,140 +0,0 @@
package plg_search_sqlitefts

import (
	"container/heap"
	"context"
	"path/filepath"
	"reflect"
	"sync"

	. "github.com/mickael-kerjean/filestash/server/common"
)

var SProc SearchProcess = SearchProcess{
	idx: make([]SearchIndexer, 0),
	n:   -1,
}

type SearchProcess struct {
	idx []SearchIndexer
	n   int
	mu  sync.RWMutex
}

func (this *SearchProcess) HintLs(app *App, path string) *SearchIndexer {
	id := GenerateID(app.Session)
	this.mu.Lock()
	defer this.mu.Unlock()

	// try to find the search indexer among the existing ones
	for i := len(this.idx) - 1; i >= 0; i-- {
		if id == this.idx[i].Id {
			alreadyHasPath := false
			for j := 0; j < len(this.idx[i].FoldersUnknown); j++ {
				if this.idx[i].FoldersUnknown[j].Path == path {
					alreadyHasPath = true
					break
				}
			}
			if alreadyHasPath == false {
				heap.Push(&this.idx[i].FoldersUnknown, &Document{
					Type:        "directory",
					Path:        path,
					InitialPath: path,
					Name:        filepath.Base(path),
				})
			}
			ret := &this.idx[i]
			return ret
		}
	}

	// Having all indexers running in memory could be expensive => instead we're cycling a pool
	search_process_max := SEARCH_PROCESS_MAX()
	lenIdx := len(this.idx)
	if lenIdx > 0 && search_process_max > 0 && lenIdx > (search_process_max-1) {
		toDel := this.idx[0 : lenIdx-(search_process_max-1)]
		for i := range toDel {
			toDel[i].DB.Close()
		}
		this.idx = this.idx[lenIdx-(search_process_max-1):]
	}
	// instantiate the new indexer
	s := NewSearchIndexer(id, app.Backend)
	defer func() {
		// recover from panic if one occurred. Set err to nil otherwise.
		if recover() != nil {
			name := "na"
			for _, el := range app.Backend.LoginForm().Elmnts {
				if el.Name == "type" {
					name = el.Value.(string)
				}
			}
			Log.Error("plg_search_sqlitefs::panic backend=\"%s\"", name)
		}
	}()
	v := reflect.ValueOf(app.Backend).Elem().FieldByName("Context")
	if v.IsValid() && v.CanSet() {
		// prevent context expiration which is often default as r.Context()
		// as we need to make queries outside the scope of a normal http request
		v.Set(reflect.ValueOf(context.Background()))
	}

	heap.Push(&s.FoldersUnknown, &Document{
		Type:        "directory",
		Path:        path,
		InitialPath: path,
		Name:        filepath.Base(path),
	})
	this.idx = append(this.idx, s)
	return &s
}

func (this *SearchProcess) HintRm(app *App, path string) {
	id := GenerateID(app.Session)
	this.mu.RLock()
	for i := len(this.idx) - 1; i >= 0; i-- {
		if id == this.idx[i].Id {
			this.idx[i].DB.Exec("DELETE FROM file WHERE path >= ? AND path < ?", path, path+"~")
			break
		}
	}
	this.mu.RUnlock()
}

func (this *SearchProcess) HintFile(app *App, path string) {
	id := GenerateID(app.Session)
	this.mu.RLock()
	for i := len(this.idx) - 1; i >= 0; i-- {
		if id == this.idx[i].Id {
			this.idx[i].DB.Exec("UPDATE file set indexTime = NULL WHERE path = ?", path)
			break
		}
	}
	this.mu.RUnlock()
}

func (this *SearchProcess) Peek() *SearchIndexer {
	if len(this.idx) == 0 {
		return nil
	}
	this.mu.Lock()
	if this.n >= len(this.idx)-1 || this.n < 0 {
		this.n = 0
	} else {
		this.n = this.n + 1
	}
	s := &this.idx[this.n]
	this.mu.Unlock()
	return s
}

func (this *SearchProcess) Reset() {
	this.mu.Lock()
	for i := range this.idx {
		this.idx[i].DB.Close()
	}
	this.idx = make([]SearchIndexer, 0)
	this.mu.Unlock()
	this.n = -1
}
@@ -2,133 +2,10 @@ package plg_search_sqlitefts

import (
	. "github.com/mickael-kerjean/filestash/server/common"
	"path/filepath"
	"regexp"
	"time"
)

const (
	PHASE_EXPLORE  = "PHASE_EXPLORE"
	PHASE_INDEXING = "PHASE_INDEXING"
	PHASE_MAINTAIN = "PHASE_MAINTAIN"
	PHASE_PAUSE    = "PHASE_PAUSE"
	. "github.com/mickael-kerjean/filestash/server/plugin/plg_search_sqlitefts/crawler"
)

func init() {
	sh := SearchHint{}
	Hooks.Register.SearchEngine(SqliteSearch{Hint: &sh})
	Hooks.Register.AuthorisationMiddleware(&sh)
}

type SqliteSearch struct {
	Hint *SearchHint
}

func (this SqliteSearch) Query(app App, path string, keyword string) ([]IFile, error) {
	files := []IFile{}

	// extract our search indexer
	s := SProc.HintLs(&app, path)
	if s == nil {
		return files, ErrNotReachable
	}

	if path == "" {
		path = "/"
	}

	rows, err := s.DB.Query(
		"SELECT type, path, size, modTime FROM file WHERE path IN ("+
			" SELECT path FROM file_index WHERE file_index MATCH ? AND path > ? AND path < ?"+
			" ORDER BY rank LIMIT 2000"+
			")",
		regexp.MustCompile(`(\.|\-)`).ReplaceAllString(keyword, "\"$1\""),
		path, path+"~",
	)
	if err != nil {
		Log.Warning("search::query DBQuery (%s)", err.Error())
		return files, ErrNotReachable
	}
	defer rows.Close()
	for rows.Next() {
		f := File{}
		var t string
		if err = rows.Scan(&f.FType, &f.FPath, &f.FSize, &t); err != nil {
			Log.Warning("search::query scan (%s)", err.Error())
			return files, ErrNotReachable
		}
		if tm, err := time.Parse(time.RFC3339, t); err == nil {
			f.FTime = tm.Unix() * 1000
		}
		f.FName = filepath.Base(f.FPath)
		files = append(files, f)
	}
	return files, nil
}

/*
 * We're listening to what the user is doing to hint the crawler over
 * what needs to be updated in priority, what file got updated and would need
 * to be reindexed, what should disappear from the index, ....
 * This way we can fine tune how full text search is behaving
 */

type SearchHint struct{}

func (this SearchHint) Ls(ctx *App, path string) error {
	if this.record(ctx) {
		go SProc.HintLs(ctx, path)
	}
	return nil
}

func (this SearchHint) Cat(ctx *App, path string) error {
	if this.record(ctx) {
		go SProc.HintLs(ctx, filepath.Dir(path)+"/")
	}
	return nil
}

func (this SearchHint) Mkdir(ctx *App, path string) error {
	if this.record(ctx) {
		go SProc.HintLs(ctx, filepath.Dir(path)+"/")
	}
	return nil
}

func (this SearchHint) Rm(ctx *App, path string) error {
	if this.record(ctx) {
		go SProc.HintRm(ctx, path)
	}
	return nil
}

func (this SearchHint) Mv(ctx *App, from string, to string) error {
	if this.record(ctx) {
		go SProc.HintRm(ctx, filepath.Dir(from)+"/")
		go SProc.HintLs(ctx, filepath.Dir(to)+"/")
	}
	return nil
}

func (this SearchHint) Save(ctx *App, path string) error {
	if this.record(ctx) {
		go SProc.HintLs(ctx, filepath.Dir(path)+"/")
		go SProc.HintFile(ctx, path)
	}
	return nil
}

func (this SearchHint) Touch(ctx *App, path string) error {
	if this.record(ctx) {
		go SProc.HintLs(ctx, filepath.Dir(path)+"/")
	}
	return nil
}

func (this SearchHint) record(ctx *App) bool {
	if ctx.Context.Value("AUDIT") == false {
		return false
	}
	return true
	Hooks.Register.SearchEngine(SearchEngine{})
	Hooks.Register.AuthorisationMiddleware(FileHook{})
}
server/plugin/plg_search_sqlitefts/indexer/error.go (new file)
@@ -0,0 +1,24 @@
package indexer

import (
	"database/sql"
	"fmt"

	"github.com/mattn/go-sqlite3"
)

var (
	ErrConstraint = fmt.Errorf("DB_CONSTRAINT_FAILED_ERROR")
	ErrNoRows     = fmt.Errorf("NO_ROWS")
)

func toErr(err error) error {
	// sql.ErrNoRows is a sentinel from database/sql, never a sqlite3.Error,
	// so it has to be checked before the driver-specific type assertion
	if err == sql.ErrNoRows {
		return ErrNoRows
	}
	if sqliteErr, ok := err.(sqlite3.Error); ok {
		if sqliteErr.Code == sqlite3.ErrConstraint {
			return ErrConstraint
		}
	}
	return err
}
server/plugin/plg_search_sqlitefts/indexer/index.go (new file)
@@ -0,0 +1,266 @@
package indexer

import (
	"database/sql"
	"io"
	"io/fs"
	"path/filepath"
	"strings"
	"time"

	. "github.com/mickael-kerjean/filestash/server/common"
)

type Index interface {
	Init() error
	Search(path string, q string) ([]IFile, error)
	Change() (Manager, error)
	Close() error
}

type Manager interface {
	FindParent(path string) (RowMapper, error)
	FindBefore(time time.Time) (RowMapper, error)
	FindNew(maxSize int, toOmit []string) (RowMapper, error)

	FileCreate(f fs.FileInfo, parent string) error
	FileContentUpdate(path string, f io.ReadCloser) error
	FileMetaUpdate(path string, f fs.FileInfo) error

	IndexTimeGet(path string) (time.Time, error)
	IndexTimeUpdate(path string, t time.Time) error
	IndexTimeClear(path string) error

	RemoveAll(path string) error
	Commit() error
}

func NewIndex(id string) Index {
	return &sqliteIndex{id, nil}
}

type sqliteIndex struct {
	id string
	db *sql.DB
}

func (this *sqliteIndex) Init() error {
	path := GetAbsolutePath(FTS_PATH, "fts_"+this.id+".sql")
	db, err := sql.Open("sqlite3", path+"?_journal_mode=wal")
	if err != nil {
		Log.Warning("search::init can't open database (%v)", err)
		return toErr(err)
	}
	this.db = db

	queryDB := func(sqlQuery string) error {
		stmt, err := db.Prepare(sqlQuery)
		if err != nil {
			Log.Warning("search::initschema prepare schema error(%v)", err)
			return toErr(err)
		}
		defer stmt.Close()
		_, err = stmt.Exec()
		if err != nil {
			Log.Warning("search::initschema execute error(%v)", err)
			return toErr(err)
		}
		return nil
	}
	if err = queryDB("CREATE TABLE IF NOT EXISTS file(path VARCHAR(1024) PRIMARY KEY, filename VARCHAR(64), filetype VARCHAR(16), type VARCHAR(16), parent VARCHAR(1024), size INTEGER, modTime timestamp, indexTime timestamp DEFAULT NULL);"); err != nil {
		return err
	}
	if err = queryDB("CREATE INDEX IF NOT EXISTS idx_file_index_time ON file(indexTime) WHERE indexTime IS NOT NULL;"); err != nil {
		return err
	}
	if err = queryDB("CREATE INDEX IF NOT EXISTS idx_file_parent ON file(parent);"); err != nil {
		return err
	}
	if err = queryDB("CREATE VIRTUAL TABLE IF NOT EXISTS file_index USING fts5(path UNINDEXED, filename, filetype, content, tokenize = 'porter');"); err != nil {
		return err
	}
	if err = queryDB("CREATE TRIGGER IF NOT EXISTS after_file_insert AFTER INSERT ON file BEGIN INSERT INTO file_index (path, filename, filetype) VALUES(new.path, new.filename, new.filetype); END;"); err != nil {
		return err
	}
	if err = queryDB("CREATE TRIGGER IF NOT EXISTS after_file_delete AFTER DELETE ON file BEGIN DELETE FROM file_index WHERE path = old.path; END;"); err != nil {
		return err
	}
	if err = queryDB("CREATE TRIGGER IF NOT EXISTS after_file_update_path UPDATE OF path ON file BEGIN UPDATE file_index SET path = new.path, filename = new.filename, filetype = new.filetype WHERE path = old.path; END;"); err != nil {
		return err
	}
	return nil
}

func (this sqliteIndex) Change() (Manager, error) {
	tx, err := this.db.Begin()
	if err != nil {
		return nil, toErr(err)
	}
	return sqliteQueries{tx}, nil
}

func (this sqliteIndex) Close() error {
	return this.db.Close()
}

type sqliteQueries struct {
	tx *sql.Tx
}

func (this sqliteQueries) Commit() error {
	return this.tx.Commit()
}

func (this sqliteQueries) IndexTimeGet(path string) (time.Time, error) {
	var t string
	if err := this.tx.QueryRow("SELECT indexTime FROM file WHERE path = ?", path).Scan(&t); err != nil {
		return time.Now(), toErr(err)
	}
	tm, err := time.Parse(time.RFC3339, t)
	if err != nil {
		return tm, toErr(err)
	}
	return tm, nil
}

func (this sqliteQueries) IndexTimeUpdate(path string, time time.Time) error {
	if _, err := this.tx.Exec("UPDATE file SET indexTime = ? WHERE path = ?", time, path); err != nil {
		return toErr(err)
	}
	return nil
}

func (this sqliteQueries) IndexTimeClear(path string) error {
	if _, err := this.tx.Exec("UPDATE file SET indexTime = NULL WHERE path = ?", path); err != nil {
		return toErr(err)
	}
	return nil
}

type Record struct {
	Name  string
	Path  string
	Size  int64
	CType string
}

type RowMapper struct {
	rows *sql.Rows
}

func (this *RowMapper) Next() bool {
	return this.rows.Next()
}

func (this *RowMapper) Value() (Record, error) {
	var r Record
	if err := this.rows.Scan(&r.Name, &r.CType, &r.Path, &r.Size); err != nil {
		return r, toErr(err)
	}
	return r, nil
}

func (this *RowMapper) Close() error {
	return this.rows.Close()
}

func (this sqliteQueries) FindNew(maxSize int, toOmit []string) (RowMapper, error) {
	// despite its name, toOmit holds the extensions to keep: each entry is
	// quoted and joined into the "filetype IN (...)" clause below
	for i := 0; i < len(toOmit); i++ {
		toOmit[i] = "'" + strings.TrimSpace(toOmit[i]) + "'"
	}

	rows, err := this.tx.Query(
		"SELECT filename, type, path, size FROM file WHERE ("+
			" type = 'file' AND size < ? AND filetype IN ("+strings.Join(toOmit, ",")+") AND indexTime IS NULL "+
			") LIMIT 2",
		maxSize,
	)
	if err != nil {
		return RowMapper{}, toErr(err)
	}
	return RowMapper{rows: rows}, nil
}

func (this sqliteQueries) FindBefore(t time.Time) (RowMapper, error) {
	rows, err := this.tx.Query(
		"SELECT filename, type, path, size FROM file WHERE indexTime < ? ORDER BY indexTime DESC LIMIT 5",
		t,
	)
	if err != nil {
		return RowMapper{}, toErr(err)
	}
	return RowMapper{rows: rows}, nil
}

func (this sqliteQueries) FindParent(path string) (RowMapper, error) {
	rows, err := this.tx.Query("SELECT filename, type, path, size FROM file WHERE parent = ?", path)
	if err != nil {
		return RowMapper{}, err
	}
	return RowMapper{rows: rows}, nil
}

func (this sqliteQueries) FileMetaUpdate(path string, f fs.FileInfo) error {
	_, err := this.tx.Exec(
		"UPDATE file SET size = ?, modTime = ?, indexTime = NULL WHERE path = ?",
		f.Size(), f.ModTime(), path,
	)
	return toErr(err)
}

func (this sqliteQueries) FileContentUpdate(path string, reader io.ReadCloser) error {
	content, err := io.ReadAll(reader)
	if err != nil {
		return toErr(err)
	}
	if _, err := this.tx.Exec("UPDATE file_index SET content = ? WHERE path = ?", content, path); err != nil {
		return toErr(err)
	}
	return nil
}

func (this sqliteQueries) FileCreate(f fs.FileInfo, parentPath string) (err error) {
	name := f.Name()
	path := filepath.Join(parentPath, f.Name())
	if f.IsDir() {
		_, err = this.tx.Exec(
			"INSERT INTO file(path, parent, filename, type, size, modTime, indexTime) "+
				"VALUES(?, ?, ?, ?, ?, ?, ?)",
			path+"/",
			parentPath,
			name,
			"directory",
			f.Size(),
			f.ModTime(),
			time.Now(),
		)
	} else {
		_, err = this.tx.Exec(
			"INSERT INTO file(path, parent, filename, type, size, modTime, indexTime, filetype) "+
				"VALUES(?, ?, ?, ?, ?, ?, ?, ?)",
			path,
			parentPath,
			name,
			"file",
			f.Size(),
			f.ModTime(),
			nil,
			strings.TrimPrefix(filepath.Ext(name), "."),
		)
	}
	return toErr(err)
}

func (this sqliteQueries) Remove(path string) error {
	if _, err := this.tx.Exec("DELETE FROM file WHERE path = ?", path); err != nil {
		return toErr(err)
	}
	return nil
}

func (this sqliteQueries) RemoveAll(path string) error {
	if _, err := this.tx.Exec("DELETE FROM file WHERE path >= ? AND path < ?", path, path+"~"); err != nil {
		return toErr(err)
	}
	return nil
}
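A hedged sketch of how the new Index/Manager pair is meant to be driven, using the signatures declared above; it assumes a configured filestash runtime (FTS_PATH, the sqlite driver) and a local file standing in for a backend entry, so it illustrates the call sequence rather than a standalone program:

package main

import (
	"fmt"
	"os"
	"time"

	"github.com/mickael-kerjean/filestash/server/plugin/plg_search_sqlitefts/indexer"
)

func main() {
	idx := indexer.NewIndex("demo") // one sqlite database per session id
	if err := idx.Init(); err != nil {
		panic(err)
	}
	defer idx.Close()

	// All writes go through a Manager, which wraps one sqlite transaction.
	op, err := idx.Change()
	if err != nil {
		panic(err)
	}
	info, _ := os.Stat("README.md") // hypothetical file to index
	if err = op.FileCreate(info, "/"); err != nil {
		panic(err)
	}
	_ = op.IndexTimeUpdate("/README.md", time.Now())
	if err = op.Commit(); err != nil {
		panic(err)
	}

	files, _ := idx.Search("/", "readme")
	fmt.Println(len(files), "hit(s)")
}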
server/plugin/plg_search_sqlitefts/indexer/query.go (new file)
@@ -0,0 +1,40 @@
package indexer

import (
	"path/filepath"
	"regexp"
	"time"

	. "github.com/mickael-kerjean/filestash/server/common"
)

func (this sqliteIndex) Search(path string, q string) ([]IFile, error) {
	files := []IFile{}
	rows, err := this.db.Query(
		"SELECT type, path, size, modTime FROM file WHERE path IN ("+
			" SELECT path FROM file_index WHERE file_index MATCH ? AND path > ? AND path < ?"+
			" ORDER BY rank LIMIT 2000"+
			")",
		regexp.MustCompile(`(\.|\-)`).ReplaceAllString(q, "\"$1\""),
		path, path+"~",
	)
	if err != nil {
		Log.Warning("search::query DBQuery (%s)", err.Error())
		return files, ErrNotReachable
	}
	defer rows.Close()
	for rows.Next() {
		f := File{}
		var t string
		if err = rows.Scan(&f.FType, &f.FPath, &f.FSize, &t); err != nil {
			Log.Warning("search::query scan (%s)", err.Error())
			return files, ErrNotReachable
		}
		if tm, err := time.Parse(time.RFC3339, t); err == nil {
			f.FTime = tm.Unix() * 1000
		}
		f.FName = filepath.Base(f.FPath)
		files = append(files, f)
	}
	return files, nil
}
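The regexp in Search exists because FTS5 treats bare dots and hyphens as query syntax; wrapping each one in double quotes keeps a filename-like keyword as literal tokens. What the substitution actually produces:

package main

import (
	"fmt"
	"regexp"
)

func main() {
	escape := regexp.MustCompile(`(\.|\-)`)
	// Each dot or hyphen gets wrapped in quotes so the FTS5 MATCH parser
	// reads it as part of the phrase rather than as an operator.
	fmt.Println(escape.ReplaceAllString("my-notes.txt", "\"$1\""))
	// Output: my"-"notes"."txt
}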
server/plugin/plg_search_sqlitefts/query.go (new file)
@@ -0,0 +1,20 @@
package plg_search_sqlitefts

import (
	. "github.com/mickael-kerjean/filestash/server/common"
	. "github.com/mickael-kerjean/filestash/server/plugin/plg_search_sqlitefts/crawler"
)

type SearchEngine struct{}

func (this SearchEngine) Query(app App, path string, keyword string) ([]IFile, error) {
	DaemonState.HintLs(&app, path)
	s := GetCrawler(&app)
	if s == nil {
		return nil, ErrNotReachable
	}
	if path == "" {
		path = "/"
	}
	return s.State.Search(path, keyword)
}
@ -1,487 +0,0 @@
|
|||
package plg_search_sqlitefts
|
||||
|
||||
import (
|
||||
"container/heap"
|
||||
"database/sql"
|
||||
"encoding/base64"
|
||||
"github.com/mattn/go-sqlite3"
|
||||
. "github.com/mickael-kerjean/filestash/server/common"
|
||||
"github.com/mickael-kerjean/filestash/server/model/formater"
|
||||
"hash/fnv"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
type SearchIndexer struct {
|
||||
Id string
|
||||
FoldersUnknown HeapDoc
|
||||
CurrentPhase string
|
||||
Backend IBackend
|
||||
DBPath string
|
||||
DB *sql.DB
|
||||
mu sync.Mutex
|
||||
lastHash string
|
||||
}
|
||||
|
||||
func NewSearchIndexer(id string, b IBackend) SearchIndexer {
|
||||
s := SearchIndexer{
|
||||
DBPath: GetAbsolutePath(FTS_PATH, "fts_"+id+".sql"),
|
||||
Id: id,
|
||||
Backend: b,
|
||||
FoldersUnknown: make(HeapDoc, 0, 1),
|
||||
}
|
||||
heap.Init(&s.FoldersUnknown)
|
||||
|
||||
db, err := sql.Open("sqlite3", s.DBPath+"?_journal_mode=wal")
|
||||
if err != nil {
|
||||
Log.Warning("search::init can't open database (%v)", err)
|
||||
return s
|
||||
}
|
||||
s.DB = db
|
||||
queryDB := func(sqlQuery string) error {
|
||||
stmt, err := db.Prepare(sqlQuery)
|
||||
if err != nil {
|
||||
Log.Warning("search::initschema prepare schema error(%v)", err)
|
||||
return err
|
||||
}
|
||||
defer stmt.Close()
|
||||
_, err = stmt.Exec()
|
||||
if err != nil {
|
||||
Log.Warning("search::initschema execute error(%v)", err)
|
||||
return err
|
||||
}
|
||||
return err
|
||||
}
|
||||
if queryDB("CREATE TABLE IF NOT EXISTS file(path VARCHAR(1024) PRIMARY KEY, filename VARCHAR(64), filetype VARCHAR(16), type VARCHAR(16), parent VARCHAR(1024), size INTEGER, modTime timestamp, indexTime timestamp DEFAULT NULL);"); err != nil {
|
||||
return s
|
||||
}
|
||||
if queryDB("CREATE INDEX IF NOT EXISTS idx_file_index_time ON file(indexTime) WHERE indexTime IS NOT NULL;"); err != nil {
|
||||
return s
|
||||
}
|
||||
if queryDB("CREATE INDEX IF NOT EXISTS idx_file_parent ON file(parent);"); err != nil {
|
||||
return s
|
||||
}
|
||||
if queryDB("CREATE VIRTUAL TABLE IF NOT EXISTS file_index USING fts5(path UNINDEXED, filename, filetype, content, tokenize = 'porter');"); err != nil {
|
||||
return s
|
||||
}
|
||||
if queryDB("CREATE TRIGGER IF NOT EXISTS after_file_insert AFTER INSERT ON file BEGIN INSERT INTO file_index (path, filename, filetype) VALUES(new.path, new.filename, new.filetype); END;"); err != nil {
|
||||
return s
|
||||
}
|
||||
if queryDB("CREATE TRIGGER IF NOT EXISTS after_file_delete AFTER DELETE ON file BEGIN DELETE FROM file_index WHERE path = old.path; END;"); err != nil {
|
||||
return s
|
||||
}
|
||||
if queryDB("CREATE TRIGGER IF NOT EXISTS after_file_update_path UPDATE OF path ON file BEGIN UPDATE file_index SET path = new.path, filepath = new.filepath, filetype = new.filetype WHERE path = old.path; END;"); err != nil {
|
||||
return s
|
||||
}
|
||||
return s
|
||||
}
|
||||
|
||||
func (this *SearchIndexer) Execute() {
|
||||
if this.CurrentPhase == "" {
|
||||
time.Sleep(1 * time.Second)
|
||||
this.CurrentPhase = PHASE_EXPLORE
|
||||
}
|
||||
Log.Debug("Search::indexing Execute %s", this.CurrentPhase)
|
||||
|
||||
cycleExecute := func(fn func(*sql.Tx) bool) {
|
||||
stopTime := time.Now().Add(time.Duration(CYCLE_TIME()) * time.Second)
|
||||
tx, err := this.DB.Begin()
|
||||
if err != nil {
|
||||
Log.Warning("search::index cycle_begin (%+v)", err)
|
||||
time.Sleep(5 * time.Second)
|
||||
}
|
||||
for {
|
||||
if fn(tx) == false {
|
||||
break
|
||||
}
|
||||
if stopTime.After(time.Now()) == false {
|
||||
break
|
||||
}
|
||||
}
|
||||
if err = tx.Commit(); err != nil {
|
||||
Log.Warning("search::index cycle_commit (%+v)", err)
|
||||
}
|
||||
}
|
||||
if this.CurrentPhase == PHASE_EXPLORE {
|
||||
cycleExecute(this.Discover)
|
||||
return
|
||||
} else if this.CurrentPhase == PHASE_INDEXING {
|
||||
cycleExecute(this.Indexing)
|
||||
return
|
||||
} else if this.CurrentPhase == PHASE_MAINTAIN {
|
||||
cycleExecute(this.Consolidate)
|
||||
return
|
||||
} else if this.CurrentPhase == PHASE_PAUSE {
|
||||
time.Sleep(5 * time.Second)
|
||||
this.CurrentPhase = ""
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
func (this *SearchIndexer) Discover(tx *sql.Tx) bool {
|
||||
if this.FoldersUnknown.Len() == 0 {
|
||||
this.CurrentPhase = PHASE_INDEXING
|
||||
return false
|
||||
}
|
||||
var doc *Document
|
||||
doc = heap.Pop(&this.FoldersUnknown).(*Document)
|
||||
if doc == nil {
|
||||
this.CurrentPhase = PHASE_INDEXING
|
||||
return false
|
||||
}
|
||||
files, err := this.Backend.Ls(doc.Path)
|
||||
if err != nil {
|
||||
this.CurrentPhase = ""
|
||||
return true
|
||||
}
|
||||
if len(files) == 0 {
|
||||
return true
|
||||
}
|
||||
|
||||
// We don't want our indexer to go wild and diverge over time. As such we need to detect those edge cases: aka
|
||||
// recursive folder structure. Our detection is relying on a Hash of []os.FileInfo
|
||||
hashFiles := func() string {
|
||||
var step int = len(files) / 50
|
||||
if step == 0 {
|
||||
step = 1
|
||||
}
|
||||
hasher := fnv.New32()
|
||||
hasher.Write([]byte(strconv.Itoa(len(files))))
|
||||
for i := 0; i < len(files); i = i + step {
|
||||
hasher.Write([]byte(files[i].Name()))
|
||||
}
|
||||
return base64.StdEncoding.EncodeToString(hasher.Sum(nil))
|
||||
}()
	if hashFiles == this.lastHash {
		return true
	}
	this.lastHash = ""
	for i := 0; i < this.FoldersUnknown.Len(); i++ {
		if this.FoldersUnknown[i].Hash == hashFiles && filepath.Base(doc.Path) != filepath.Base(this.FoldersUnknown[i].Path) {
			this.lastHash = hashFiles
			return true
		}
	}

	// Insert the newly found data within our index
	for i := range files {
		f := files[i]
		name := f.Name()
		if f.IsDir() {
			var performPush bool = false
			p := filepath.Join(doc.Path, name) + "/"
			if err = this.dbInsert(doc.Path, f, tx); err == nil {
				performPush = true
			} else if e, ok := err.(sqlite3.Error); ok && e.Code == sqlite3.ErrConstraint {
				// the folder is already known: only revisit it if its entry is stale
				performPush = func(path string) bool {
					var t string
					if err := tx.QueryRow("SELECT indexTime FROM file WHERE path = ?", path).Scan(&t); err != nil {
						Log.Warning("search::discovery unknown_path (%v)", err)
						return false
					}
					tm, err := time.Parse(time.RFC3339, t)
					if err != nil {
						Log.Warning("search::discovery invalid_time (%v)", err)
						return false
					}
					if time.Now().Add(time.Duration(-SEARCH_REINDEX()) * time.Hour).Before(tm) {
						return false
					}
					if _, err = tx.Exec("UPDATE file SET indexTime = ? WHERE path = ?", time.Now(), path); err != nil {
						Log.Warning("search::discovery insertion_failed (%v)", err)
						return false
					}
					return true
				}(p)
			} else {
				Log.Error("search::indexing insert_index (%v)", err)
			}
			if performPush == true {
				heap.Push(&this.FoldersUnknown, &Document{
					Type:    "directory",
					Name:    name,
					Path:    p,
					Size:    f.Size(),
					ModTime: f.ModTime(),
					Hash:    hashFiles,
				})
			}
		} else {
			if err = this.dbInsert(doc.Path, f, tx); err != nil {
				if e, ok := err.(sqlite3.Error); ok && e.Code == sqlite3.ErrConstraint {
					return false
				}
				Log.Warning("search::insert index_error (%v)", err)
				return false
			}
		}
	}
	return true
}

func (this *SearchIndexer) Indexing(tx *sql.Tx) bool {
	ext := strings.Split(INDEXING_EXT(), ",")
	for i := 0; i < len(ext); i++ {
		ext[i] = "'" + strings.TrimSpace(ext[i]) + "'"
	}

	rows, err := tx.Query(
		"SELECT path FROM file WHERE ("+
			" type = 'file' AND size < ? AND filetype IN ("+strings.Join(ext, ",")+") AND indexTime IS NULL "+
			") LIMIT 2",
		MAX_INDEXING_FSIZE(),
	)
	if err != nil {
		Log.Warning("search::insert index_query (%v)", err)
		return false
	}
	defer rows.Close()
	i := 0
	for rows.Next() {
		i += 1
		var path string
		if err = rows.Scan(&path); err != nil {
			Log.Warning("search::indexing index_scan (%v)", err)
			return false
		}
		if err = this.updateFile(path, tx); err != nil {
			Log.Warning("search::indexing index_update (%v)", err)
			return false
		}
	}

	if i == 0 {
		this.CurrentPhase = PHASE_MAINTAIN
		return false
	}
	return true
}
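
// Indexing fetches only two candidates per call on purpose: cycleExecute keeps
// calling it until the CYCLE_TIME() budget expires, so a small LIMIT keeps the
// budget check frequent.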

func (this *SearchIndexer) updateFile(path string, tx *sql.Tx) error {
	if _, err := tx.Exec("UPDATE file SET indexTime = ? WHERE path = ?", time.Now(), path); err != nil {
		return err
	}

	for i := 0; i < len(INDEXING_EXCLUSION); i++ {
		if strings.Contains(path, INDEXING_EXCLUSION[i]) {
			return nil
		}
	}

	reader, err := this.Backend.Cat(path)
	if err != nil {
		if _, delErr := tx.Exec("DELETE FROM file WHERE path = ?", path); delErr != nil {
			return delErr
		}
		return err
	}
	defer reader.Close()

	switch GetMimeType(path) {
	case "text/plain", "text/org", "text/markdown", "application/x-form":
		reader, err = formater.TxtFormater(reader)
	case "application/pdf":
		reader, err = formater.PdfFormater(reader)
	case "application/powerpoint", "application/vnd.ms-powerpoint", "application/word", "application/msword":
		reader, err = formater.OfficeFormater(reader)
	default:
		return nil
	}

	if err != nil {
		// a formater error is not fatal: keep the metadata, skip the content
		return nil
	}
	var content []byte
	if content, err = ioutil.ReadAll(reader); err != nil {
		Log.Warning("search::index content_read (%v)", err)
		return nil
	}
	if _, err = tx.Exec("UPDATE file_index SET content = ? WHERE path = ?", content, path); err != nil {
		Log.Warning("search::index index_update (%v)", err)
		return err
	}
	return nil
}

func (this *SearchIndexer) updateFolder(path string, tx *sql.Tx) error {
	if _, err := tx.Exec("UPDATE file SET indexTime = ? WHERE path = ?", time.Now(), path); err != nil {
		return err
	}

	for i := 0; i < len(INDEXING_EXCLUSION); i++ {
		if strings.Contains(path, INDEXING_EXCLUSION[i]) {
			return nil
		}
	}

	// Fetch the list of files as seen on the remote filesystem
	currFiles, err := this.Backend.Ls(path)
	if err != nil {
		tx.Exec("DELETE FROM file WHERE path >= ? AND path < ?", path, path+"~")
		return err
	}

	// Fetch the same folder as it appears in our search cache
	rows, err := tx.Query("SELECT filename, type, size FROM file WHERE parent = ?", path)
	if err != nil {
		return err
	}
	defer rows.Close()
	previousFiles := make([]File, 0)
	for rows.Next() {
		var f File
		if err = rows.Scan(&f.FName, &f.FType, &f.FSize); err != nil {
			return err
		}
		previousFiles = append(previousFiles, f)
	}

	// Perform the DB operations to bring previousFiles and currFiles in sync
	// 1. Insert the content that didn't exist before and refresh entries whose size changed
	for i := 0; i < len(currFiles); i++ {
		currFilenameAlreadyExist := false
		currFilename := currFiles[i].Name()
		for j := 0; j < len(previousFiles); j++ {
			if currFilename == previousFiles[j].Name() {
				if currFiles[i].Size() != previousFiles[j].Size() {
					if err = this.dbUpdate(path, currFiles[i], tx); err != nil {
						return err
					}
				}
				currFilenameAlreadyExist = true
				break
			}
		}
		if currFilenameAlreadyExist == false {
			this.dbInsert(path, currFiles[i], tx)
		}
	}
	// 2. Remove the content that existed before but is now gone
	for i := 0; i < len(previousFiles); i++ {
		previousFilenameStillExist := false
		previousFilename := previousFiles[i].Name()
		for j := 0; j < len(currFiles); j++ {
			if previousFilename == currFiles[j].Name() {
				previousFilenameStillExist = true
				break
			}
		}
		if previousFilenameStillExist == false {
			this.dbDelete(path, previousFiles[i], tx)
		}
	}
	return nil
}
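
// Note the asymmetry above: when the remote listing fails, the whole subtree is
// evicted from the cache with a range delete, whereas individual changes only
// touch their own rows.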

func (this *SearchIndexer) Consolidate(tx *sql.Tx) bool {
	rows, err := tx.Query(
		"SELECT path, type FROM file WHERE indexTime < ? ORDER BY indexTime DESC LIMIT 5",
		time.Now().Add(-time.Duration(SEARCH_REINDEX())*time.Hour),
	)
	if err != nil {
		if err == sql.ErrNoRows {
			this.CurrentPhase = PHASE_PAUSE
			return false
		}
		this.CurrentPhase = ""
		return false
	}
	defer rows.Close()
	i := 0
	for rows.Next() {
		i += 1
		var path string
		var cType string
		if err = rows.Scan(&path, &cType); err != nil {
			Log.Warning("search::index db_stale (%v)", err)
			return false
		}
		if cType == "directory" {
			this.updateFolder(path, tx)
		} else {
			this.updateFile(path, tx)
		}
	}
	if i == 0 {
		this.CurrentPhase = PHASE_PAUSE
		return false
	}
	return true
}

func (this *SearchIndexer) dbInsert(parent string, f os.FileInfo, tx *sql.Tx) error {
	name := f.Name()
	path := filepath.Join(parent, name)
	var err error

	if f.IsDir() {
		_, err = tx.Exec(
			"INSERT INTO file(path, parent, filename, type, size, modTime, indexTime) "+
				"VALUES(?, ?, ?, ?, ?, ?, ?)",
			path+"/",
			parent,
			name,
			"directory",
			f.Size(),
			f.ModTime(),
			time.Now(),
		)
	} else {
		_, err = tx.Exec(
			"INSERT INTO file(path, parent, filename, type, size, modTime, indexTime, filetype) "+
				"VALUES(?, ?, ?, ?, ?, ?, ?, ?)",
			path,
			parent,
			name,
			"file",
			f.Size(),
			f.ModTime(),
			nil,
			strings.TrimPrefix(filepath.Ext(name), "."),
		)
	}
	return err
}
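
// Two conventions matter here: directory paths carry a trailing "/" (which is
// what dbDelete's range scan relies on), and files start with indexTime = NULL
// so that Indexing() picks them up on the next cycle.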

func (this *SearchIndexer) dbUpdate(parent string, f os.FileInfo, tx *sql.Tx) error {
	path := filepath.Join(parent, f.Name())
	if f.IsDir() {
		path += "/"
	}
	_, err := tx.Exec(
		"UPDATE file SET size = ?, modTime = ?, indexTime = NULL WHERE path = ?",
		f.Size(), f.ModTime(), path,
	)
	return err
}

func (this *SearchIndexer) dbDelete(parent string, f os.FileInfo, tx *sql.Tx) error {
	path := filepath.Join(parent, f.Name())
	if f.IsDir() {
		path += "/"
	}
	_, err := tx.Exec(
		"DELETE FROM file WHERE path >= ? AND path < ?",
		path, path+"~",
	)
	return err
}
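
// The half-open range [path, path+"~") acts as a cheap prefix scan: "~" (0x7E)
// sorts after the other printable ASCII characters, so deleting a directory
// stored as, say, "/docs/" removes the row itself plus everything beneath it:
//
//	DELETE FROM file WHERE path >= '/docs/' AND path < '/docs/~'
//
// This assumes path bytes stay below "~", which holds for typical filenames.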